# Levantamiento Tablas originales de las Vistas

> **PROYECTO : LEVANTAMIENTO TABLAS ORIGINALES QUE ALIMENTAN A LAS VISTAS GESTOR DE CAMPAÑAS TERADATA** <br> 
**Extracción de tablas originales utilizadas en las Vistas** <br>
Los archivos a procesar están en la carpeta ./ArchivosProcesar  <br>
Versión:  1.0  <br>
Fecha: 01-05-2020  <br>
Descripción: Versión Inicial  <br>
Desarrollador: Axity | Adriana Jiménez 

## Librerías

In [24]:
import re
import sys
import os, glob
import pandas as pd 
import numpy as np
import datetime

## Libreria que se integra con Teradata
import giraffez

## Funciones: Insertar en BD Teradata

In [25]:
## Configuracion de conexion
td_config = {
    "username": "exajibl",
    "password": "acjb0610",
    "host": "dataware.bci.cl"}

In [26]:
def create_and_drop_tables_teradata():
    '''
    Función que crea las tablas en Teradata
    '''
    
    print("ELIMINANDO Y CREANDO TABLAS..\n")
    
    drop_views_views   = "DROP TABLE EDW_TEMPUSU.LAC_LISTA_VISTAS_DE_VISTAS"
    drop_views_tables  = "DROP TABLE EDW_TEMPUSU.LAC_VISTAS_Y_TABLAS_ORIGEN"

    create_views_views = """CREATE MULTISET TABLE EDW_TEMPUSU.LAC_LISTA_VISTAS_DE_VISTAS
        (
          TABLA_OUTPUT_COMPUESTA VARCHAR(200) ,
          ESQUEMA_INPUT VARCHAR(50) ,
          TABLA_INPUT_COMPUESTA VARCHAR(200)      
          ) ;"""

    create_views_tables = """CREATE MULTISET TABLE EDW_TEMPUSU.LAC_VISTAS_Y_TABLAS_ORIGEN
        (
          VISTA_OUTPUT VARCHAR(200) ,
          TABLA_INPUT VARCHAR(300)      
          ) ;"""
    
    with giraffez.Cmd(**td_config) as cmd:
        if cmd.exists("EDW_TEMPUSU.LAC_LISTA_VISTAS_DE_VISTAS"):
            cmd.execute(drop_views_views)
        if cmd.exists("EDW_TEMPUSU.LAC_VISTAS_Y_TABLAS_ORIGEN"):
            cmd.execute(drop_views_tables)

        cmd.execute(create_views_views)
        cmd.execute(create_views_tables)

In [27]:
def insert_files_csv_in_teradata(df):
    '''
    Función que inserta los dataframes a Teradata
    '''
    
    print("INSERTANDO ARCHIVOS A LAS TABLAS..\n")

    if (df.empty == False): 
        
        with giraffez.BulkLoad("EDW_TEMPUSU.LAC_LISTA_VISTAS_DE_VISTAS", **td_config) as load:
            load.cleanup()
            load.columns = df.columns.tolist()
            for row in df.values.tolist(): 
                load.put([row[0], row[1], row[2] ])    

In [28]:
def insert_result_in_teradata(df):
    '''
    Función que inserta los dataframes a Teradata
    '''
    
    print("INSERTANDO INFO. DE TABLAS ORIGENES DE LAS VISTAS EN TERADATA..\n")
    
    if (df.empty == False): 
        
        df['VISTA_OUTPUT']  = df['VISTA_OUTPUT'].astype('str')
        df['TABLA_INPUT']   = df['TABLA_INPUT'].astype('str')
        
        with giraffez.BulkLoad("EDW_TEMPUSU.LAC_VISTAS_Y_TABLAS_ORIGEN", **td_config) as load:
            load.cleanup()
            load.columns = df.columns.tolist()
            for row in df.values.tolist():
                load.put([ row[0], row[1] ])    

In [29]:
def get_real_existing_views(views):
    '''Buscar en Teradata las vistas extraídas de los archivos, 
       para ver si existen realmente en base de datos
    '''

    query_get_real_views = """SELECT TRIM(databasename)||'.'||TRIM(tablename) AS NOMBRE_VISTAS
                                 FROM dbc.tables 
                                WHERE tablekind='V' 
                                  and databasename='ARMVIEWS'
                                  and tablename in {} """.format(views)
    
    
    with giraffez.BulkExport(query_get_real_views, **td_config) as export:
        dataset = export.to_list()
    
    headers = ['NOMBRE_VISTAS']
    df = pd.DataFrame(dataset, columns = headers)
    
    return df

## Inicio

### Funciones: Insertar en Teradata las Vistas creadas por Vistas

In [30]:
def return_groupby_views(df_vistas):
    
    cols_output = ['ESQUEMA_OUTPUT', 'TABLA_OUTPUT'] 

    cols_input  = ['ESQUEMA_INPUT', 'TABLA_INPUT']

    columns = ['TABLA_OUTPUT_COMPUESTA','ESQUEMA_INPUT','TABLA_INPUT_COMPUESTA']
    
    # Genero un campo compuesto con el formato esquema.tabla 
    df_vistas['TABLA_OUTPUT_COMPUESTA'] = df_vistas[cols_output].\
                                apply(lambda row: '.'.join(row.values.astype(str)), axis=1)

    df_vistas['TABLA_INPUT_COMPUESTA'] = df_vistas[cols_input].\
                                apply(lambda row: '.'.join(row.values.astype(str)), axis=1)

    df_vistas_groupby_tables = df_vistas[columns].groupby(columns).last().reset_index() 
    
    return df_vistas_groupby_tables

In [31]:
'''
Función que por cada vista, retorna las tablas internas que la creó
Ej: Ocurre para los casos que una vista fue creada por otra vista
'''
def loop_return_tables_in_views(list_tables_views):

    list_views = []
    list_querys_show_views = []
    
    list_only_views         = []  # lista que tenga VISTA creada por VISTAS.
    list_final_tables_views = []  # Lista que guardará todo el levantamiento de todas las tablas.
    
    
    ## Lista que retorna TABLA_OUTPUT_COMPUESTA|ESQUEMA_INPUT|TABLA_INPUT_COMPUESTA
    for list_view in list_tables_views:
        
        ouput_view        = list_view[0]
        schema_input_view = list_view[1]
        input_view        = list_view[2]
        
        ## Aqui filtro todo lo que sea INPUT de Vistas
        if (schema_input_view == 'ARMVIEWS'):       
            list_only_views.append(list_view)
        else:
            ## Almaceno las vistas que tienen INPUT de Tablas
            list_final_tables_views.append(list_view)
            
    return list_only_views  

### Creación de queries para envío masivo a Airflow

In [32]:
def unique(_list):
    """
    Makes the list have unique items only and maintains the order
    list(set()) won't provide that
    :type _list list
    :rtype: list
    """
    ret = []

    for item in _list:
        if item not in ret:
            ret.append(item)

    return ret

In [33]:
def create_file_for_airflow(list_only_views):
    '''Crea un archivo que genera una lista de vistas existentes'''
    
    airflow_file =open('Resultados/querys_show_qualified_para_airflow.txt','w')

    # Hacerle split a las vistas output
    list_ = []
    for views in list_only_views:
        view = views[0].split('.')[1]
        list_.append(view)

    # Hacer una lista unica VISTA
    list_unique_views = unique(list_)

    # Convertir la lista en tupla
    def convert_to_tuple(list): 
        return (*list,)

    tuple_views = convert_to_tuple(list_unique_views)

    # Buscar en Teradata si las vistas existen realmente en la BD
    df_unique_views = get_real_existing_views(tuple_views) 
    unique_views    = df_unique_views.values.tolist()

    for view in unique_views:
        print(view)
        #static_select    = "SELECT '"+ view + "';"
        static_show_view = "SHOW QUALIFIED SELECT * FROM "+ view[0]+ ";"

        #airflow_file.write("%s" % static_select +'\n')
        airflow_file.write("%s" % static_show_view +'\n')

    airflow_file.close() 

In [34]:
def convert_list_into_dataframe(total_results_list):

    headers   = ['VISTA_OUTPUT', 'TABLA_INPUT']
    
    df_final  = pd.DataFrame(columns = headers)
    
    for list_views_tables in total_results_list:
        view = list_views_tables[0] 
        lenght_list = len(list_views_tables)
    
        for i in range(1, lenght_list):
            
            table = list_views_tables[i]          
        
            new_df_tables = pd.DataFrame(columns = headers)
            
            new_df_tables = new_df_tables.append(
                [ {'VISTA_OUTPUT' : view, 
                   'TABLA_INPUT'  : table
                  }]) 
            
            df_final = df_final.append(new_df_tables,ignore_index=True,sort=False)
    
    return df_final

In [35]:
def processing_files(log_airflow):
    
    ## Lista de sintaxis SQL que debería tomar cada linea del archivo,
    ## Obtener solo las SHOW QUALIFIED, CREATE TABLES Y ERRORES.
    
    show_qualified_str    = 'SHOW QUALIFIED SELECT'
    create_ddl_str        = 'CREATE SET TABLE'
    failure_message_str   = '*** FAILURE'
    
    view_and_tables_list  = []
    total_results_list    = []
    
    dml_dll_syntax_query = [ show_qualified_str, create_ddl_str, failure_message_str ]   
        
    clear_log  = re.sub('[[\]{}].*[[\]{}]', ' ', log_airflow)   ## elimina las llaves y corchetes con su contenido
    clear_log = clear_log.replace(" INFO - ",'')                ## elimina los INFO mensajes
    clear_log = clear_log.replace("+",'')                       ## reemplaza otros caracteres
    clear_log = re.sub('\-\-.*?\n|\/\*.*?\*\/', ' ', clear_log) ## elimina los comentarios  
    clear_log = clear_log.strip()
        
    lines = clear_log.splitlines();
            
    for line in lines: 
        # elimina espacios a comienzo de la linea y lo edita a mayuscula
        line = line.strip().upper()                
        
        # comienza agregando las querys que interesan
        if line.startswith(tuple(dml_dll_syntax_query)): 
            
            if (line.startswith(show_qualified_str)):  ## comienza con SHOW QUALIFIED
                
                if (len(view_and_tables_list) > 0):
                    total_results_list.append(view_and_tables_list)
                    view_and_tables_list = []
                
                ## Obtiene ESQUEMA.VISTA directamente de la query
                ## Ej: SHOW QUALIFIED SELECT * FROM ARMVIEWS.VISTA_A;
                view = re.findall(r'[\w\.-]+\.[\w\.-]+', line)[0] 
                view_and_tables_list.append(view)
                
            elif(line.startswith(create_ddl_str)):    ## comienza con CREATE TABLE                
                
                ## Obtiene ESQUEMA.TABLA directamente de la query
                ## Ej: CREATE SET TABLE ARMINP.TABLA_1 ,FALLBACK ,
                table = re.findall(r'[\w\.-]+\.[\w\.-]+', line)[0]
                view_and_tables_list.append(table)              
                  
            elif(line.startswith(failure_message_str)):  ## comienza con Failure
                view_and_tables_list.append(line)
    
    
    if (len(view_and_tables_list) > 0):
        total_results_list.append(view_and_tables_list)
                
    return total_results_list


## Inicio

In [36]:
def fast_scandir_files(path):
    '''
    Función que escanea los directorios y subdirectorios,
    obteniendo como resultado una lista de archivo con,
    formato: ./Directorio/Subdirectorio/archivo
    '''

    file_list = []
    
    for root, dirs, files in os.walk(path):
        for file in files:
            if(file.endswith(".log")):
                file_list.append(os.path.join(root,file))
                
    return file_list

In [37]:
def first_step():

    df_vistas                = pd.read_csv("LAC_Levantamiento_Vistas.csv",  sep='|') 
    df_vistas_groupby_tables = return_groupby_views(df_vistas)
    list_tables_views        = df_vistas_groupby_tables.values.tolist()
    list_only_views          = loop_return_tables_in_views(list_tables_views)

    headers      = ['TABLA_OUTPUT_COMPUESTA','ESQUEMA_INPUT','TABLA_INPUT_COMPUESTA']
    df_only_view = pd.DataFrame(list_only_views, columns = headers)
    
    create_and_drop_tables_teradata()
    
    insert_files_csv_in_teradata(df_only_view)
    
    return list_only_views

In [38]:
def extract_tables_from_log_airflow():

    dir_logs = 'Logs_Airflow/'
    log_files = fast_scandir_files(dir_logs)
    log_files.sort()

    headers   = ['VISTA_OUTPUT', 'TABLA_INPUT']
    df_final_all_logs  = pd.DataFrame(columns = headers)

    for log in log_files:
        print(log)
        
        filename        = log
        archivo         = open(filename, encoding="utf8",errors='ignore')
        log_airflow     = archivo.read()

        total_results_list      = processing_files(log_airflow)
        df_final_results_by_log = convert_list_into_dataframe(total_results_list)

        df_final_all_logs = df_final_all_logs.append(df_final_results_by_log, ignore_index=True, sort=False)
        
        
    df_final_all_logs.to_csv("Resultados/resultados_logs_airflow.csv", header=True, index=None, sep='|', mode='w') 
    
    # Elimina y crea tablas en teradata
    create_and_drop_tables_teradata()
    
    # Inserta el dataframe a Teradata
    insert_result_in_teradata(df_final_all_logs)

In [40]:
if __name__ == "__main__":

    now = datetime.datetime.now()
    print (now)
    
    ###### PASO 1)
    '''
        Sección que extrae del archivo de vistas, las vistas que estan creadas por otras vistas
        y retorna una lista unica de las mismas. Además inserta la lista en teradata para su 
        posterior consulta.
        
        Condición: Debe existir el archivo LAC_Levantamiento_Vistas.csv generado en el proceso
        del notebook 03_LAC_Logicas_Negocio_Bteq_SP_Vistas.ipynb
    '''       
    list_only_views = first_step()
    
    
    
    ''' ------------------------------------------------------------------- '''
    
    ###### PASO 2)
    '''
        Sección que crea un archivo con la lista unica de vistas y una serie de queries que contiene
        multiples SHOW QUALIFIED para analizarlas masivamente desde Airflow (proceso externo a este notebook)
        
        Solo se genera ese archivo llamado: Resultados/querys_show_qualified_para_airflow.txt
    '''    
    #create_file_for_airflow(list_only_views)

    
    ''' ------------------------------------------------------------------- '''
    
    
    
    ###### PASO 3)
    '''
        Sección que lee el Log que devuelve Airflow como respuesta a las tablas originales de 
        las vistas que se les realizó SHOW QUALIFIED.
        
        Condición: Insertar en la carpeta Logs_Airflow/ el resultado de logs que arrojó Airflow
        como respuesta al paso 2)
    '''        
    #extract_tables_from_log_airflow()

2020-05-11 19:04:38.096386
ELIMINANDO Y CREANDO TABLAS..

INSERTANDO ARCHIVOS A LAS TABLAS..

