# 利用py注解实现创建sql

# 概述

我的场景——基于pyspark，其已经提供了强大的数据处理能力，我们当然可以选择全部用pyspark来实现，但往往编写spark sql已经足够，而且可读性比纯代码要好，因此，实际中，注意实现方式的选择。

通常的ETL功能java编写的居多，但针对我的场景。
- 首先，不需要这么强大的ETL功能，pyspark已经足够强大；
- 其次，也不想用java，毕竟scala是个不错的选择。不想增加自己负担。

因此，本文旨在用py通过注解实现一个etl功能，定制sql语句。

# 工程结构

文件 | 说明
---|---
const.py | 常量，必要的符号和sql关键字
etl.py | etl，基本注解功能
etl.ipnb | 测试注解

本文只提供了最基本的定制sql的注解，但是实际中，你可以通过这些基本注解，再定义自己的注解，从而实现更丰富的功能。

# SQL注解

注解| 说明
---|---
select | select {field} from {table} {filter}
subquery | select field from (select * from table) alias
Aggregate | {sub-query} {group}
AggregateSel | select {field} from {table} {filter} {group}
Join | select {field} from ({before}) {before_alias} \[right&#124;left&#124;inner\] join ({after}) {after_alias} on({on})
selectMap | select field_map from (select field from table where f) alias
Map | select mapper from (sub-query) alias
InsertOverWrite | INSERT OVERWRITE TABLE {table} {partition} {select}

In [9]:
from etl import *

# 测试select和subquery

In [18]:
def test_Select_and_SubQuery():
    """

    :return:
    """

    @SubQuery(field=['id', 'name'], alias='t1')
    @Select(field=['*'], f=["1=1", 'and', 'id >100'])
    def table():
        return 'table'

    print('---------------------')
    print('| Select and SubQuery')
    print('---------------------')
    s = table()
    print('s:   {}'.format(s))

In [19]:
test_Select_and_SubQuery()

---------------------
| Select and SubQuery
---------------------
s:   select id,name from (select * from table WHERE 1=1 and id >100) t1


# 测试join

In [20]:
def test_Join():
    """

    :return:
    """

    @Select(field=['*'], f=["1=1"])
    def table1():
        return 'table1'

    @Select(field=['*'], f=["1=1"])
    def table2():
        return 'table2'

    print('---------------------')
    print('| Join')
    print('---------------------')
    s1 = table1()
    s2 = table2()
    print('s1:  {}'.format(s1))
    print('s2:  {}'.format(s2))

    @Join(t="right", before=s1, after=s2, field=['*'], on=['t1.id=t2.id'])
    def join(before_field=[], after_field=[]):
        return {
            "before_field": before_field,
            "after_field": after_field
        }

    s3 = join()
    print('s3:  {}'.format(s3))

In [21]:
test_Join()

---------------------
| Join
---------------------
s1:  select * from table1 WHERE 1=1
s2:  select * from table2 WHERE 1=1
s3:  
    select * from (select * from table1 WHERE 1=1) t1
    right join (select * from table2 WHERE 1=1) t2 on(t1.id=t2.id)
    


# 测试selectmap和map

In [22]:
def test_SelectMap_and_Map():
    """

    :return:
    """
    print('---------------------')
    print('| SelectMap and Map')
    print('---------------------')

    @SelectMap(
        field=['identity', 'start_date', 'len', "regexp_replace(regexp_replace(y, '\\\\]', ''), '\\\\[', '') as y"],
        field_map=['identity', 'start_date', 'len', 'split(y) as y'], alias="t1",
        f=["type='XTL'", "and", "key='sales'"])
    def select_map(table):
        return {"table": table}

    s1 = select_map(table="mytable")
    print('s1:  {}'.format(s1))

    @Map(mapper=['identity', 'start_date', 'len', "concat(',', d1, d2)"], alias="t2")
    @Map(mapper=['identity', 'start_date', 'len', 'd1', 'd2'], alias="t2")
    def mapper():
        sql = select_map("mytable")
        return sql

    s2 = mapper()
    print('s2:  {}'.format(s2))

In [23]:
test_SelectMap_and_Map()

---------------------
| SelectMap and Map
---------------------
s1:  select identity,start_date,len,split(y) as y from (select identity,start_date,len,regexp_replace(regexp_replace(y, '\\]', ''), '\\[', '') as y from mytable WHERE type='XTL' and key='sales') t1
s2:  select identity,start_date,len,concat(',', d1, d2) from (select identity,start_date,len,d1,d2 from (select identity,start_date,len,split(y) as y from (select identity,start_date,len,regexp_replace(regexp_replace(y, '\\]', ''), '\\[', '') as y from mytable WHERE type='XTL' and key='sales') t1) t2) t2
