In [2]:
import pandas as pd
import pathlib
import re

In [769]:
def to_snake(name):
    name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()

In [770]:
def snake_to_camel(word):
    import re
    return ''.join(x.capitalize() or '_' for x in word.split('_'))

In [4]:
# Cria os diretorios para onde vao os arquivos gerados
dirs = ['json', 'ddl', 'dags', 'datalake', 'sql', 'variables']

pathlib.Path('./enviar').mkdir(parents=True, exist_ok=True)
for directory in dirs:
    pathlib.Path(f"./enviar/{directory}").mkdir(parents=True, exist_ok=True)

In [None]:
############################################
##  BEGIN DEFINICOES MANUAIS OBRIGATORIAS ##
############################################

In [771]:
# Nome da conexao que vai para dentro da DAG
conn_id = "connection_name_id"


In [772]:
# Nome do arquivo que possui os metadados da tabela.
# IMPORTANTE: todos os nomes do cabecalho precisam estar em maiusculas.
# Por exemplo: TABLE_NAME, DATA_TYPE etc
# Toda vez que aparecer "metadado" escrito nos comentarios, se refere a esse arquivo e
# seu conteudo.
meta = pd.read_csv('metadados-example.csv')


In [773]:
# Como o nome do schema esta no arquivo de metadados e ele nao possui a
# convencao interna do time, se faz necessario criar uma estrutura de conversao
# como abaixo.
schemas_mapping = {
    'example1': 'dump__company_co_ods__example1',
    'example2': 'dump__company_co_ods__example2',
    'correios': 'dump__company_co_ods__correios'
}


In [None]:
##########################################
##  END DEFINICOES MANUAIS OBRIGATORIAS ##
##########################################

In [775]:
# Isso aqui serve para o script adicionar ou nao aquela chave:
# format: date_time
# para os tipos de dados de data (que sao considerados como string dentro do arquivo).
# Basicamente o script verifica se o data_type do metadado esta na lista abaixo,
# se estiver, ele adicionar as linhas correpondentes para o "format": "date-time".
tipos_data = ['datetime', 'datetime2', 'time', 'timestamp', 'date']

In [776]:
# Estrutura de conversao de tipos entre os metadados e o que se espera estar inserido
# no json schema.
data_type_conversion_source_json = {
    'int': 'integer',
    'datetime': 'string',
    'datetime2': 'string',
    'varchar': 'string',
    'bit': 'boolean',
    'char': 'string',
    'smallint': 'integer',
    'nvarchar': 'string',
    'bigint': 'number',
    'decimal': 'number',
    'float': 'number',
    'uniqueidentifier': 'string',
    'time': 'string',
    'date': 'string',
    'real': 'number',
    'varbinary': "string",
    'text': 'string',
    'numeric': 'number',
}

In [777]:
# Estrutura de conversao entre os tipos de dados dos metadados e o arquivo DDL para 
# o bigquery.
data_type_conversion_source_ddl = {
    'int': 'INTEGER',
    'datetime': 'TIMESTAMP',
    'datetime2': 'TIMESTAMP',    
    'varchar': 'STRING',
    'bit': 'BOOLEAN',
    'char': 'STRING',
    'smallint': 'INTEGER',
    'nvarchar': 'STRING',
    'bigint': 'INTEGER',
    'decimal': 'BIGNUMERIC',
    'float': 'BIGNUMERIC',
    'uniqueidentifier': 'STRING',
    'time': 'TIMESTAMP',
    'date': 'DATE',
    'real': 'BIGNUMERIC',
    'varbinary': "STRING",
    'text': 'STRING',
    'numeric': 'BIGNUMERIC',
}

In [778]:
# Abaixo eh o que gera a estrutura a partir dos metadados contendo todos os elementos
# que vao ser utilizados para gerar os arquivos.
raw_schema = {}

for index, row in meta.iterrows():

    if not row['TABLE_SCHEMA'] in raw_schema:
        raw_schema[row['TABLE_SCHEMA']] = {}

    column_snake = to_snake(row['COLUMN_NAME'])

    if not row['TABLE_NAME'] in raw_schema[row['TABLE_SCHEMA']]:
        raw_schema[row['TABLE_SCHEMA']][row['TABLE_NAME']] = []

    raw_schema[row['TABLE_SCHEMA']][row['TABLE_NAME']].append({'column_snake': column_snake, 'column': row['COLUMN_NAME'], 'data_type': row['DATA_TYPE'], 'table': row['TABLE_NAME'], 'schema': row['TABLE_SCHEMA'], 'catalog': row['TABLE_CATALOG']})
    tables.append(row['TABLE_NAME'])


In [782]:
# Gera efetivamente os arquivos
# O Mako eh uma ferramenta de templates para Python.
# Optei por ele e nao o Jinja porque o Jinja supoe um contexto de
# aplicativo (coisas do Flask etc) e eu queria algo mais simplificado para usar.
from mako.template import Template
columns = []

for schema in raw_schema:

    tables = []
    for table in raw_schema[schema]:
        tables.append(table)
        columns = []
        
        for d in raw_schema[schema][table]:

            catalog = d['catalog']
            column_snake = d['column_snake']
            column = d['column']
            table_snake = to_snake(table)
            catalog_snake = to_snake(catalog)
            schema_orig = schema
            schema_translated = schemas_mapping[schema]
            schema_snake = to_snake(schema)
            data_type_json = data_type_conversion_source_json[d['data_type']]
            data_type_ddl = data_type_conversion_source_ddl[d['data_type']]
            
            # Lembra daquela lista de tipos de dado datetime.
            # Entao, eh aqui que ela eh consultada.
            # Eh so para marcar se o script precisa adicionar a linha "format": "date-time"
            # no json.
            if d['data_type'] in tipos_data:
                datetime = 1
            else:
                datetime = 0

            columns.append({'table_sname': table_snake,
                            'table': table, 
                            'schema_snake': schema_snake,
                            'schema': schema,
                            'catalog_snake': catalog_snake,
                            'catalog': catalog,
                            'schema_orig': schema_orig,
                            'name_snake': column_snake, 
                            'name': column, 
                            'datetime': datetime, 
                            'data_type_ddl': data_type_ddl, 
                            'data_type_json': data_type_json})

        # Aqui comecam a ser gerados os arquivos:
        # ------------------------------------------
        # Create Dags
        
        # Essas variaveis todas vao ser utilizadas dentro do template.
        constant = f"{schemas_mapping[schema].upper()}_{to_snake(table).upper()}"
        task_id = f"{schemas_mapping[schema]}_{to_snake(table).lower()}"
        source_sql = f"{schemas_mapping[schema]}/{to_snake(table).lower()}.sql"
        conn_id = conn_id
        dag_id = f"{schemas_mapping[schema]}_{to_snake(table).lower()}_extractor"
        max_active_runs = f"{to_snake(table).lower()}"
        gcs_path = to_snake(table).lower()
        
        # O arquivo de template
        mytemplate = Template(filename='./templates/dag.tpl')
        
        # Onde o resultado da geracao deve ser escrito
        file_name = f"./enviar/dags/{schemas_mapping[schema]}-{to_snake(table).lower()}_extractor.py"
        
        # Repare que o arquivo eh escrito utilizando o render do Mako.
        with open(file_name, 'w') as f:                    
            f.write(mytemplate.render(constant=constant, task_id=task_id, source_sql = source_sql,
                                     conn_id=conn_id, dag_id=dag_id, max_active_runs=max_active_runs,
                                     schema=schemas_mapping[schema],
                                     gcs_path=gcs_path))

        # Segue o mesmo esquema acima, mas para os arquivos de schema json.
        # Create schemas json
        mytemplate = Template(filename='./templates/json.tpl')
        file_name = f"./enviar/json/{schemas_mapping[schema]}-{to_snake(table).lower()}.json"
        with open(file_name, 'w') as f:                    
            f.write(mytemplate.render(catalog=catalog, schema_orig = schema_orig, schema_snake = schema_snake, schema=schema_translated, table_snake = table_snake, table=table, columns=columns))

        # Create schemas ddl
        mytemplate = Template(filename='./templates/ddl.tpl')            
        file_name = f"./enviar/ddl/{schemas_mapping[schema]}-{to_snake(table).lower()}.json"
        with open(file_name, 'w') as f:                    
            f.write(mytemplate.render(catalog=catalog, schema_orig = schema_orig, schema_snake = schema_snake, schema=schema_translated, table_snake = table_snake, table=table, columns=columns))

        # Create sql files
        mytemplate = Template(filename='./templates/sql.tpl')            
        file_name = f"./enviar/sql/{to_snake(table).lower()}.sql"
        with open(file_name, 'w') as f:                    
            f.write(mytemplate.render(catalog=catalog, schema_orig = schema_orig, schema_snake = schema_snake, schema=schema_translated, table_snake = table_snake, table=table, columns=columns))

    # Aqui sai do loop aninhado porque eh sobre a lista de tabelas
    # e nao a lista de colunas dentro das tabelas.
    tmp_tables = []
    for t in tables:
        tmp_tables.append(to_snake(t))

    tables = tmp_tables
    del tmp_tables

    # Create datalake.tf
    mytemplate = Template(filename='./templates/datalake.tpl')
    file_name = f"./enviar/datalake/datalake.tf_{schemas_mapping[schema]}"
    with open(file_name, 'w') as f:                    
        f.write(mytemplate.render(tables=tables))

    # Create variables.json
    mytemplate = Template(filename='./templates/variables.tpl')
    file_name = f"./enviar/variables/variables.json_{schemas_mapping[schema]}"
    with open(file_name, 'w') as f:                    
        f.write(mytemplate.render(tables=tables, schema=schemas_mapping[schema]))              
