# BIF - Basic Import Framework
  Framework de ingestão básica de dados no padrão Nestlé.
      Serão criadas para o dado ingestionado as camadas :
    - RAW -----> camada bronze
    - STAGE ---> camada silver
    - CURATED -> camada gold

## Import de bibliotecas

In [0]:
V_STEP = '#I# IMPORTACAO DE LIBS.'
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, regexp_replace, udf, concat_ws
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from datetime import datetime
import chardet

In [0]:
%python
V_STEP = '#I# RECEPCAO DE PARAMETROS.'
# Ultima camada para processar
# camadas válidas: bronze, silver, gold
try:
  par_processar_ate = dbutils.widgets.get("par_processar_ate")
  print('# Processar até a camada => ' + str(par_processar_ate))
except:
  print('#E# Parametro par_processar_ate nao informado.')
  print('#E# PARAMETRO OBRIGATORIO.')
  par_processar_ate = None
  #dbutils.notebook.exit(1)

# recebe parametro - nome do CSV a ingestionar
try:
  par_nome_csv = dbutils.widgets.get("par_nome_csv")
  print('# Arquivo CSV para processar => ' + par_nome_csv)
except:
  print('#E# Parametro par_nome_csv não informado.')
  par_nome_csv = None

# recebe parametro - nome do XLSX a ingestionar
try:
  par_nome_xlsx = dbutils.widgets.get("par_nome_xlsx")
  print('# Arquivo XLSX para processar => ' + par_nome_xlsx)
except:
  print('#E# Parametro par_nome_xlsx não informado.')
  par_nome_xlsx = None

# recebe parametro - caminho do arquivo
try:
  par_path_arquivo = dbutils.widgets.get("par_path_arquivo")
  print('# Path do arquivo => ' + par_path_arquivo)
except:
  print('#E# Parametro par_path_arquivo não informado.')
  par_path_arquivo = None

# recebe parametro - nome da tabela a ingestionar
try:
  par_nome_tabela = dbutils.widgets.get("par_nome_tabela")
  print('# Tabela para processar => ' + par_nome_tabela)
except:
  print('#E# Parametro par_nome_tabela não informado.')
  par_nome_tabela = None

# recebe parametro - nome do schema que sera utilizado
try:
  v_schema = dbutils.widgets.get("par_schema_destino")
  print('# Schema destino => ' + str(par_schema_destino))
except:
  print('#E# Schema destino não informado.')
  print('#E# PARAMETRO OBRIGATORIO.')
  v_schema = None
  #dbutils.notebook.exit(1)

In [0]:
#--- TESTE - PODE SER EXCLUIDO
par_nome_csv = 'produtos2'
par_path_arquivo = 'x'

In [0]:
%python
V_STEP = '#I# IDENTIFICAO DE ORIGEM DO DADO.'
fl_escape = 'N'

if par_nome_csv is None:
  v_csv = 0
else:
  v_csv = 1
  v_origem = 'CSV'

if par_nome_xlsx is None:
  v_xls = 0
else:
  v_xls = 1
  v_origem = 'XLSX'

if par_nome_tabela is None:
  v_tab = 0
else:
  v_tab = 1
  v_origem = 'TABELA'
  
v_total_fontes = v_csv + v_xls + v_tab

if v_total_fontes == 0:
  print('#E# OBRIGATÓRIO INFORMAR UMA ORIGEM DE DADOS (ARQUIVO OU TABELA).')
  fl_escape = 'Y'
elif v_total_fontes > 1:
  print('#E# SOMENTE UMA ORIGEM PERMITIDA. AJUSTES OS PARAMETROS E REEXECUTE O PROCESSO.')
  fl_escape = 'Y'
else:
  v_msg = '# ORIGEM DOS DADOS IDENTIFICADA: {0}'.format(v_origem)
  if v_origem == 'CSV' or v_origem == 'XLSX':
    if par_path_arquivo is None:
      print('#E# OBRIGATORIO INFORMAR PATH PARA INGESTAO DE ARQUIVOS.')
      fl_escape = 'Y'
    else:
      pass
  print(v_msg)
  
if fl_escape == 'Y':
  #dbutils.notebook.exit(1)
  print('entrou')

In [0]:
%python
V_STEP = '#I# DEFINICAO DE VARIAVEIS DO PROCESSO.'
# variáveis base
v_schema = 'sales'
v_raiz_lake = '/mnt/amsbradls2a4a/'

# schemas
v_schema_bronze = v_schema + '_bronze'
print('# v_schema_bronze => ' + v_schema_bronze)
v_schema_silver = v_schema + '_silver'
print('# v_schema_silver => ' + v_schema_silver)
v_schema_gold   = v_schema + '_gold'
print('# v_schema_gold   => ' + v_schema_gold)
print('#')

# table
v_table = 'tb_delago_teste_produto'

# locations
v_loc_bronze = v_raiz_lake + ''
v_loc_silver = v_raiz_lake + ''
v_loc_gold   = v_raiz_lake + ''

# Definição de funções

In [0]:
%run /sv_utilities/transform_data/commons

In [0]:
%python
V_STEP = '#I# DEFINICAO DE FUNCOES DO PROCESSO.'

def verif_campo_integer(nome_campo, tabela):
  try:
    rc_code = 0
    tot_null=spark.sql('''
    select sum(if(CAST({nome_campo} AS BIGINT) is null,1,0)) as total_nulos 
    from {tabela}
    '''.format(nome_campo = nome_campo, tabela = tabela)).collect()[0][0]
    #print(tot_null)
    if tot_null >= 1:
      msg = '{nome_campo} inválido encontrado.'.format(nome_campo = nome_campo)
      rc_code = 'NOK'
    else:
      msg = 'Coluna {nome_campo} sem erros.'.format(nome_campo = nome_campo)
      rc_code = 'OK'
  except:
    msg = 'erro inesperado'
    rc_code = 9
  finally:
    #print('# ' + msg)
    return rc_code, msg
  
def verif_campo_double(nome_campo, tabela):
  try:
    rc_code = 0
    tot_null=spark.sql('''
    select sum(if(CAST({nome_campo} AS DOUBLE) is null,1,0)) as total_nulos 
    from {tabela}
    '''.format(nome_campo = nome_campo, tabela = tabela)).collect()[0][0]
    #print(tot_null)
    if tot_null >= 1:
      msg = '{nome_campo} inválido encontrado.'.format(nome_campo = nome_campo)
      rc_code = 'NOK'
    else:
      msg = 'Coluna {nome_campo} sem erros.'.format(nome_campo = nome_campo)
      rc_code = 'OK'
  except:
    msg = 'erro inesperado'
    rc_code = 9
  finally:
    #print('# ' + msg)
    return rc_code, msg
  
def pega_virgula(nome_campo, tabela):
  """Busca se existe o caracter virgula na coluna"""
  try:
    rc_code = spark.sql('''
    SELECT IF(COUNT({nome_campo}) > 0,1,0) as fl_virgula FROM {tabela}
    WHERE {nome_campo} LIKE "%,%"
    '''.format(nome_campo = nome_campo, tabela = tabela)).collect()[0][0]
    #print(rc_code)
  except:
    rc_code = 'ERRO'
    msg_code = '# Ocorreu um erro inesperado.'
  finally:
    if rc_code == 0:
      msg_code = '# Consulta Efetuada com Sucesso. Nao ha virgulas.'
      rc_code = 'NOK'
    elif rc_code == 1:
      msg_code = '# Consulta Efetuada com Sucesso. Virgulas encontradas.'
      rc_code = 'OK'
    
    return rc_code, msg_code

def pega_pto(nome_campo, tabela):
  """Busca se existe o caracter ponto na coluna"""
  try:
    rc_code = spark.sql('''
    SELECT IF(COUNT({nome_campo}) > 0,1,0) as fl_pto FROM {tabela}
    WHERE {nome_campo} LIKE "%.%"
    '''.format(nome_campo = nome_campo, tabela = tabela)).collect()[0][0]
  except:
    rc_code = 'ERRO'
    msg_code = '# Ocorreu um erro inesperado.'
  finally:
    if rc_code == 0:
      msg_code = '# Consulta Efetuada com Sucesso. Nao ha pontos.'
      rc_code = 'NOK'
    elif rc_code == 1:
      msg_code = '# Consulta Efetuada com Sucesso. Pontos encontrados.'
      rc_code = 'OK'
    
    return rc_code, msg_code
  
def pega_numerico(nome_campo,tabela):
  """Pega se campo eh somente numeros"""
  try:
    v_total = spark.sql('''
    SELECT sum(case when cast(replace(replace({nome_campo},",",""),".","") as int) is null then
       1 else 0 end) as fl_int
    FROM {tabela}
    '''.format(nome_campo = nome_campo, tabela = tabela)).collect()[0][0]
    
    if v_total > 0:
      rc_code = 1
    else:
      rc_code = 0
  except:
    rc_code = 'ERRO'
  finally:
    if rc_code == 0:
      msg_code = '# Consulta Efetuado com Sucesso. Campo possui soh numeros.'
      rc_code = 'OK'
    elif rc_code == 1:
      msg_code = '# Consulta Efetuada com Sucesso. Campo possui caracteres texto ou especiais.'
      rc_code = 'NOK'
    else:
      msg_code = '# Erro inesperado.'
      
    return rc_code, msg_code

def apply_date_pattern(input_date_to_convert):
  '''
  Aplica formato padrao para campo de data.
  Entrada:
   input_date - Formato String - data a ser formatada.
   Formatos válidos aceitos:
    YYYY-MM-DD
    DD-MM-YYYY
    YYYY.MM.DD
    DD.MM.YYYY
    YYYY/MM/DD
    DD/MM/YYYY
  Saída
   saida_dt - Formato String - Data Formatada em formato (yyyy-MM-dd)

  Library necessária:
  from pyspark.sql.types import StringType
  from pyspark.sql.functions import udf
  '''
  data_entrada = input_date_to_convert
  saida_dt = None
  retorno_dt = None
  try:
    data_com_traco = data_entrada.replace('/','-').replace('.','-')
    if len(data_com_traco) != 10:
      raise Exception('# Consulta Efetuado com Sucesso. Tamanho da data inválida! Tamanho esperado deve ser 10.')

    if data_com_traco.find('-') == 4:
      ano = data_com_traco[:4]
      mes = data_com_traco[5:7]
      dia = data_com_traco[8:]
    else:
      dia = data_com_traco[:2]
      mes = data_com_traco[3:5]
      ano = data_com_traco[6:]
  
    saida = ano + '-' + mes + '-' + dia
    saida_dt = datetime.strptime(saida,'%Y-%m-%d')
  except ValueError:
    print('# Consulta Efetuado com Sucesso. Data informada invalida. Verifique o dia ou mes se esta valido em um calendario.')
  if saida_dt is None:
    retorno_dt = 'NULL'
  else:
    retorno_dt = str(saida_dt)[:11]
  return retorno_dt
spark.udf.register('apply_date_pattern',apply_date_pattern)  

def verifica_formato_data(v_nome_coluna,v_nome_tabela):
  '''
  Valida formato de data de coluna
  Entrada:
   v_nome_tabela - Formato String - nome da tabela ou TempView.
   v_nome_coluna - Formato String - nome da coluna de data a validar. 
  Saída:
   rc_code - Formato Integer - código de retorno
   msg_code - Formato String - Mensagem de retorno.
  Library necessária:
   from pyspark.sql.functions import concat_ws
  '''
  try:
    v_tam_min = spark.sql('select min(length({0})) as tam_min from {1} '.format(v_nome_coluna,v_nome_tabela)).collect()[0][0]
    if v_tam_min != 10:
      rc_code = 'NOK'
      msg_code = ' Tamanho minimo do campo data invalido. Tamanho padrão deve ser 10 bytes.'
    else:
      v_total_strg = spark.sql('''
      SELECT count(*) as TOTAL FROM {1}
      WHERE cast(replace(replace(replace({0},".",""),"/",""),"-","") as integer) IS NULL
      '''.format(v_nome_coluna, v_nome_tabela)).collect()[0][0]
      if v_total_strg > 0:
        rc_code = 'NOK'
        msg_code = '#E# Carater invalido encontrado na data. Verifique a coluna {0}.'.format(v_nome_coluna)
      else:
        rc_code = 'OK'
        msg_code = "# Verificação efetuada. Coluna passivel de formatação."  
        v_total_nulos = spark.sql('''SELECT count(*) as tot_regs_null FROM {1}
                                     WHERE to_date(apply_date_pattern({0}),"yyyy-MM-dd") IS NULL
                                  '''.format(v_nome_coluna,v_nome_tabela)).collect()[0][0]
        if str(v_total_nulos) != '0':
          rc_code = 'NOK'
          msg_code = '#E# Encontrado dia ou mês fora do range permitido. Ranges permitidos p/ dias 01 à 31 e p/ meses 01 à 12.'
  except:
    rc_code = 9
    msg_code = '# Ocorreu uma exception. Erro não mapeado. Necessária análise do erro.'
  finally:
    return rc_code,msg_code
  
# PROXIMA IMPLEMENTACAO

## identifica o formato do arquivo csv
def ler_csv(v_path, v_nome_arquivo):
  '''
  Identifica o formato do arquivo CSV que está sendo lido.
  Entrada:
    v_path - string - caminho no datalake onde se encontra o arquivo.
    v_nome_arquivo - string - nome do arquivo ingestionado.
  Saída:
    df_csv - dataframe spark - conteúdo do arquivo em formato de dataframe.
  '''
  try:
    rc_code = 0
    if v_path[0:5] == 'dbfs:':
      v_path = v_path.replace('dbfs:','/dbfs')
    
    if v_path[-1] == '/':
      v_path_csv = v_path + v_nome_arquivo
    else:
      v_path_csv = v_path + '/' + v_nome_arquivo
    
    print('# Leitura do CSV => ' + v_path_csv)
    
    with open(r"{0}".format(v_path_csv),"rb") as f:
      result = chardet.detect(f.read())
      df_csv=spark.read.csv(r"{0}".format(v_path_csv[5:])
                                  ,encoding=result['encoding']
                                  ,sep=';'
                                  ,header=True)
      df_csv.count
  except:
    print('# Erro na leitura do CSV.')
    rc_code = 9
  finally:
    print(rc_code)
    return rc_code,df_csv

def ler_excel(v_path, v_nome_arquivo):
  '''
  Le arquivo CSV.
  Leitura somente da primeira aba (sheet1)
  '''
  try:
    rc_code = 0
    if v_path[0:5] == 'dbfs:':
      v_path = v_path.replace('dbfs:','/dbfs')
    
    if v_path[-1] == '/':
      v_path_xls = v_path + v_nome_arquivo
    else:
      v_path_xls = v_path + '/' + v_nome_arquivo
    
    print('# Leitura do XLS => ' + v_path_xls)
    df_xls = spark.read.format("com.crealytics.spark.excel") \
                       .option("useHeader", "true") \
                       .option("treatEmptyValuesAsNulls", "true") \
                       .option("inferSchema", "true") \
                       .option("addColorColumns", "False") \
                       .option("maxRowsInMey", 2000) \
                       .option("sheetName", "Import") \
                       .load(v_path_xls)
  except:
    rc_code = 9
    print('#E# Erro na leitura do Excel.')
  finally:
    return rc_code, df_xls

# Leitura da origem

In [0]:
# TESTE - PODE SER DELETADO
#
#df_original = spark.read.csv('dbfs:/mnt/amsbradls2a4a/inbound/external-files/sellout/arq-auxiliares/produtos.txt',sep=';',header=True)
#df_original = spark.read.csv('dbfs:/mnt/amsbradls2a4a/inbound/external-files/sellout/arq-auxiliares/produtos2.txt',sep=';',header=True)
par_path_arquivo = 'dbfs:/mnt/amsbradls2a4a/inbound/external-files/sellout/arq-auxiliares/'
par_nome_csv = 'produtos2.txt'

In [0]:
%python
V_STEP = '#I# LEITURA DA ORIGEM DOS DADOS.'

if v_origem == 'CSV':
  rc_code, df_original = ler_csv(par_path_arquivo,par_nome_csv)
elif v_origem == 'XLS':
  rc_code, df_original = ler_excel(par_path_arquivo,par_nome_csv)
  pass
else:
  print('# Efetuando leitura da tabela.')
  df_original = spark.sql('''SELECT * FROM {0}'''.format(par_nome_tabela))

In [0]:
df_original.show()

# Tratamento de Camadas

## Cria camada bronze (RAW)

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA BRONZE - IDENTIFICACAO E TRATAMENTO DE NOMES DAS COLUNAS ORIGEM.'
# Copia dataframe para montar campos tratados
df_bronze = df_original

for campo in df_original.columns:
  novo_campo = retira_acento(campo.upper().replace(' ','_'))
  df_bronze = df_bronze.withColumnRenamed(campo, novo_campo)
  
df_bronze.createOrReplaceTempView('tpv_df_bronze')

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA BRONZE - CRIACAO TABELA NA CAMADA BRONZE.'

#
# TESTE -  ESSA LINHA PODE SER EXCLUIDA - INICIO
v_schema_bronze = 'default'
# TESTE -  ESSA LINHA PODE SER EXCLUIDA - FIM

v_mnt_tb = '/mnt/amsbradls2a4a/inbound/external-files/sellout/teste_delago'

# Exclui a tabela caso exista
lst_tb = spark.sql('''show tables in {0} like "{1}" '''.format(v_schema_bronze,v_table)).count()
print(lst_tb)
if lst_tb != 0:
  print('# Ja existe tabela com esse nome. Recriando a tabela.')
  print('#')
  v_mnt_tb = spark.sql('''desc detail {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_bronze
                                                                          ,v_table = v_table)).collect()[0][4]
  v_mnt_tb = v_mnt_tb.replace('dbfs:','')
  spark.sql('''DROP TABLE if exists {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_bronze
                                                                        ,v_table = v_table))
  print(v_mnt_tb)
  dbutils.fs.rm('{0}'.format(v_mnt_tb),True)

query_bronze=('''
CREATE TABLE if not exists {v_schema_table}.{v_table}
USING delta
LOCATION "{v_location}"
AS
SELECT *
FROM tpv_df_bronze
'''.format(v_schema_table = v_schema_bronze
          ,v_location = v_mnt_tb
          ,v_table = v_table))
print(query_bronze)
spark.sql(query_bronze)
print('# Tabela criada.')

In [0]:
%python
spark.sql('desc formatted {0}.{1}'.format(v_schema_bronze,v_table)).show(100,truncate=False)

# Cria Camada Silver

In [0]:
%sql
--# TESTE - PODE SER EXCLUIDO
--# Mais casos de testes
insert into default.tb_delago_teste_produto
values('ARROZ CAMIL PCT. 5 KG.','2021-02-27','5,10','COMIDA');
insert into default.tb_delago_teste_produto
values('APPLE IMAC 15 POL.','2020-02-27','15600','INFO');

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - IDENTIFICA NOMES DOS CAMPOS ORIGINAIS.'
df_original.columns

lst_campos_originais= []

for campo in df_original.columns:
  lst_campos_originais.append(campo)

lst_campos_originais

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - IDENTIFICA NOMES DA CAMADA BRONZE.'
lst_campos_tratados = []

for campo in df_bronze.columns:
  lst_campos_tratados.append(campo)
  
lst_campos_tratados

In [0]:
# TESTE - PODE SER REMOVIDO
# PODE SER REMOVIDO - INCLUINDO SOMENTE PARA PEGAR NOVOS CASOS DE TESTE
df_bronze = spark.sql('''
select * from {v_schema_table}.{v_table}
'''.format(v_schema_table = v_schema_bronze
          ,v_table = v_table))
df_bronze.createOrReplaceTempView('tpv_df_bronze')
df_bronze.show()

In [0]:
# TESTE - PODE SER REMOVIDO
spark.sql('desc tpv_df_bronze').show()

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - PADRONIZA DADOS (UPPER) E RETIRA ACENTOS.'
# padroniza dados nas colunas (retirada de acentos e aplicação de uppercase)
df_silver = df_bronze.selectExpr(['retira_acento(UPPER(' + campo + ')) AS ' + campo for campo in df_bronze.columns])
df_silver.createOrReplaceTempView('tpv_silver')

In [0]:
# TESTE - PODE SER EXCLUIDO
df_silver.show()

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - CRIA LISTA DE REGRAS PARA IDENTIFICAO DO TIPO DE DADO.'
#    0             1          2          3           4
#[[nome_campo,fl_virgula,fl_ponto,fl_only_number,fl_data]]
lst_campo_tipo = [[],[],[],[],[]]
lst_campo_tipo[0].append('NOME_CAMPO')
lst_campo_tipo[1].append('PEGA_VIRGULA')
lst_campo_tipo[2].append('PEGA_PTO')
lst_campo_tipo[3].append('SOH_NUMEROS')
lst_campo_tipo[4].append('DATA_VALIDA')
for idx,nome_campo in enumerate(lst_campos_tratados):
  print(idx ,nome_campo)
  
  lst_campo_tipo[0].append(nome_campo)
  #rc_code = verif_campo_double(nome_campo,'tpv_df2')
  rc_code,msg_code = pega_virgula(nome_campo,'tpv_silver')
  print(msg_code)
  lst_campo_tipo[1].append(rc_code)
  
  rc_code,msg_code = pega_pto(nome_campo,'tpv_silver')
  print(msg_code)
  lst_campo_tipo[2].append(rc_code)
  
  rc_code,msg_code = pega_numerico(nome_campo,'tpv_silver')
  print(msg_code)
  lst_campo_tipo[3].append(rc_code)
  #print(rc_code)
  
  rc_code,msg_code = verifica_formato_data(nome_campo,'tpv_silver')
  print(msg_code)
  lst_campo_tipo[4].append(rc_code)

In [0]:
V_STEP = '#I# TRATAMENTO CAMADA SILVER - LISTA DE IDENTIFICAO DO TIPO DE CAMPO.'
lst_campo_tipo

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - FORMATA TIPO DE CAMPO PARA QUERY DE CREATE DA CAMADA SILVER.'
v_campos_query = ''

for idx_lst_1,valor_lst_1 in enumerate (lst_campo_tipo):
  # verifica se eh numerico
  #print(idx_lst_1)
  print(valor_lst_1)
  if idx_lst_1 != 0:
    break
  for idx_lst_2,valor_lst_2 in enumerate (valor_lst_1):
    
    if idx_lst_2 == 0:
      print('# Sub-lista de ' + str(valor_lst_1[0])) 
    else:
      v_nome_campo = lst_campo_tipo[0][idx_lst_2]
      print('# Campo => ' + v_nome_campo)

      # Trata campo numerico
      #-- É UM CAMPO NUMERICO? OK = SIM / NOK = NAO
      if lst_campo_tipo[3][idx_lst_2] == 'OK': 
        
        #-- POSSUI VIRGULA NO NUMERO? OK = SIM / NOK = NAO
        if lst_campo_tipo[1][idx_lst_2] == 'OK': 
          print('# Campo numerico com virgulas encontrado.')
          print('#--------------------------------------------------#')
          print('# ATENÇÃO! SEPARADOR DE DECIMAIS DEVE SER O PONTO. #')
          print('# SERAO REMOVIDAS TODAS AS VIRGULAS DO CAMPO.      #')
          print('#--------------------------------------------------#')
          print('# CASO SEPARADOR DE DECIMAIS SEJA VIRGULA, DEVE-SE #')
          print('#  SUBSTITUIR POR PONTO E REINICIAR O PROCESSO.    #')
          print('#--------------------------------------------------#')
        
        #-- POSSUI PONTO NO NUMERO? OK = SIM / NOK = NAO
        if lst_campo_tipo[2][idx_lst_2] == 'OK':  
          # PODE SER FLOAT OU DOUBLE OU DECIMAL
          v_linha = ' CAST(REPLACE({0},",","") AS DECIMAL(15,3)) AS {0}, \n'.format(v_nome_campo)
        else:
          # EH INTEGER
          v_linha = ' CAST({0} AS INTEGER) AS {0}, \n'.format(v_nome_campo)
      else:
        # CAMPO PODE SER FORMATADO COMO DATA
        if lst_campo_tipo[4][idx_lst_2] == 'OK':
          v_linha = ' to_date(apply_date_pattern({0}),"yyyy-MM-dd") AS {0}, \n'.format(v_nome_campo)
          pass
        else:
          v_linha = ' CAST({0} AS STRING) AS {0}, \n'.format(v_nome_campo)
        
      v_campos_query = v_campos_query + v_linha

v_campos_query_final = v_campos_query[:-3]

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - FORMATA PRIMEIRA PARTE DA QUERY DA CAMADA SILVER.'

v_linha_create_silver = '''
CREATE TABLE if not exists {0}.{1}
USING delta
LOCATION "{2}" 
AS
SELECT 
'''

In [0]:
v_campos_query_final

v_linha_create_gold = '''
SELECT 
'''

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA SILVER - CRIA TABELA DA CAMADA SILVER.'

v_schema_silver = 'default'
v_table_final = v_table + '_silver'
v_mnt_silver = '/mnt/amsbradls2a4a/inbound/external-files/sellout/teste_delago_silver'

spark.sql('set spark.sql.legacy.timeParserPolicy = LEGACY')

#var_query_silver = v_linha_create_gold + v_campos_query_final + '\n FROM tpv_silver'
var_query_silver = v_linha_create_silver.format(v_schema_silver,v_table_final,v_mnt_silver) + v_campos_query_final + '\n FROM tpv_silver'
print(var_query_silver)
print('#')

lst_tb = spark.sql('''show tables in {0} like "{1}" '''.format(v_schema_silver,v_table_final)).count()
print('# Tabelas encontradas => ' + str(lst_tb))
print('#')

if str(lst_tb) != '0':
  print('# Ja existe tabela com esse nome. Recriando a tabela.')
  print('#')
  v_mnt_tb_silver = spark.sql('''desc detail {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_silver
                                                                                ,v_table = v_table_final)).collect()[0][4]
  v_mnt_silver = v_mnt_tb_silver.replace('dbfs:','')
  spark.sql('''DROP TABLE if exists {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_silver
                                                                        ,v_table = v_table_final))
  print(v_mnt_silver)
  dbutils.fs.rm('{0}'.format(v_mnt_silver),True)
  print('# Tabela excluída!')

spark.sql(var_query_silver)
print('# Tabela criada!')

In [0]:
%sql
-- TESTE - SOMENTE PARA VALIDACAO DA TABELA - PODE SER EXCLUIDO
select * from default.tb_delago_teste_produto_silver

PRODUTO,DATA_DA_COMPRA,PRECO,CATEGORIA
FOX,2020-07-30,40000.0,VEICULO
REVISTA VEJA,2021-02-15,9.9,REVISTARIA
BOLA DE FUTEBOL,2019-01-30,10.0,BRINQUEDOS
BONECA,2021-03-12,12.5,BRINQUEDOS
HONDAXV,2020-11-10,14500.19,VEICULO
SENHOR DOS ANEIS,2021-06-06,39.9,REVISTARIA
FREESBE,2020-01-01,2.99,BRINQUEDOS
SHAMPOO,2021-01-18,1.99,PERFUMARIA
DOVE SABONETE,2021-01-18,0.99,PERFUMARIA
HOMME COLONIA,1999-09-09,30.99,PERFUMARIA


In [0]:
# TESTE - SOMENTE PARA VALIDACAO DA TABELA - PODE SER EXCLUIDO
spark.sql('DESC default.tb_delago_teste_produto_silver').show(truncate=False)

## Cria Camada Gold

In [0]:
%python
V_STEP = '#I# TRATAMENTO CAMADA GOLD - CRIA TABELA DA CAMADA GOLD.'

v_schema_gold = 'default'
v_table_final = v_table + '_gold'

lst_tb = spark.sql('''show tables in {0} like "{1}" '''.format(v_schema_gold,v_table_final)).count()
print('# Tabelas encontradas => ' + str(lst_tb))
print('#')

if str(lst_tb) != '0':
  print('# Ja existe tabela com esse nome. Recriando a tabela.')
  print('#')
  v_mnt_tb_gold = spark.sql('''desc detail {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_gold
                                                                                ,v_table = v_table_final)).collect()[0][4]
  v_mnt_gold = v_mnt_tb_gold.replace('dbfs:','')
  spark.sql('''DROP TABLE if exists {v_schema_table}.{v_table}'''.format(v_schema_table = v_schema_gold
                                                                        ,v_table = v_table_final))
  print(v_mnt_gold)
  dbutils.fs.rm('{0}'.format(v_mnt_gold),True)
  print('# Tabela excluída!')
else:
  v_mnt_gold = '/mnt/amsbradls2a4a/inbound/external-files/sellout/teste_delago_gold'

v_query_gold = ('''
CREATE TABLE if not exists {0}.{1}
USING delta
LOCATION "{2}"
AS
SELECT * FROM tpv_silver
'''.format(v_schema_gold,v_table_final,v_mnt_gold))
print(v_query_gold)

spark.sql(v_query_gold)

print('# Tabela criada!')

In [0]:
%sql
select * from default.tb_delago_teste_produto_gold

PRODUTO,DATA_DA_COMPRA,PRECO,CATEGORIA
FOX,2020-07-30,40000.0,VEICULO
REVISTA VEJA,2021-02-15,9.9,REVISTARIA
BOLA DE FUTEBOL,2019-01-30,10.0,BRINQUEDOS
BONECA,2021-03-12,12.5,BRINQUEDOS
HONDAXV,2020-11-10,14500.19,VEICULO
SENHOR DOS ANEIS,2021-06-06,39.9,REVISTARIA
FREESBE,2020-01-01,2.99,BRINQUEDOS
SHAMPOO,2021-01-18,1.99,PERFUMARIA
DOVE SABONETE,2021-01-18,0.99,PERFUMARIA
HOMME COLONIA,1999-09-09,30.99,PERFUMARIA


In [0]:
#-- TESTE - SOMENTE PARA VALIDACAO DA TABELA - PODE SER EXCLUIDO
df_original.show()

In [0]:
%sql
-- TESTE - SOMENTE PARA VALIDACAO DA TABELA - PODE SER EXCLUIDO
desc formatted default.tb_delago_teste_produto_gold_final

col_name,data_type,comment
PRODUTO,string,
DATA_DA_COMPRA,string,
PRECO,string,
CATEGORIA,string,
,,
# Partitioning,,
Not partitioned,,
,,
# Detailed Table Information,,
Name,default.tb_delago_teste_produto_gold_final,


In [0]:
print('#')
print('#-------------- FIM DO PROCESSAMENTO --------------#')