<center><H2> (DataMart) </H2></center>
---

In [0]:
import os
import sys
import time
import json
import random
import shutil
import logging
import hashlib
import requests
from delta.tables import *
from datetime import datetime
from datetime import timedelta
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame

PATH_MNT = "/mnt/datalake/bkt-cleansing-dev/brazil/"
PATH_DATAMART = "/mnt/datalake/bkt-datamart-dev/arrears/datamart/"

In [0]:
@udf("string")
def get_hash_key(values: str):
    '''
        retorna a hashkey da linha (identificador único)
    '''
    hash_key = hashlib.md5(values.encode('utf-8')).hexdigest()
    return hash_key 

In [0]:
def load_data_saldo_parcela_rdl() -> DataFrame:
    '''
        carrega os dados vinculados ao último e penúltimo arquivos processados no saldo parcela. 
    '''
    BASE_SALDO_PARCELA_RDL = "/dbfs/" + PATH_MNT + "/saldo_parcela_rdl/"
    PATH_SALDO_PARCELA_RDL = [root.replace("/dbfs/", "") + "/" for root, subdirs, files in os.walk(BASE_SALDO_PARCELA_RDL) if len(files) > 0][-2:]
    
    dataset = spark\
        .read\
        .format("orc")\
        .load(PATH_SALDO_PARCELA_RDL)\
        .withColumn("_", F.lit(1))\
        .withColumn(
            "DATE_OFFSET", 
            F.dense_rank()\
            .over(
                Window()\
                  .partitionBy("_")\
                  .orderBy(F.col("DataExportacao").cast(DateType()).desc())
            )
        )\
        .withColumnRenamed("COD_DESEMBOLSO", "COD_CONTRATO")\
        .where(F.col("COD_TIPO_PARCELA").isin(['CP', 'CPCM', 'CJR', 'CJRCM', 'CMR', 'CRFPR', 'CRFJR', 'CRFJC', 'CRFMR']))\
        .drop("_")\
        .distinct()
    
    return dataset

In [0]:
def load_data_operacao_rdl() -> DataFrame:
    '''
        carrega os dados vinculados ao último e penúltimo arquivos processados no saldo parcela. 
    '''
    BASE_OPERACAO_RDL = "/dbfs/" + PATH_MNT + "funcao/operacao_rdl/"
    PATH_OPERACAO_RDL = [root.replace("/dbfs/", "") + "/" for root, subdirs, files in os.walk(BASE_OPERACAO_RDL) if len(files) > 0][-2:]
    
    dataset = spark\
        .read\
        .format("orc")\
        .load(PATH_OPERACAO_RDL)\
        .withColumn("_", F.lit(1))\
        .withColumn(
            "DATE_OFFSET", 
            F.dense_rank()\
            .over(
                Window()\
                  .partitionBy("_")\
                  .orderBy(F.col("DataExportacao").cast(DateType()).desc())
            )
        )\
        .withColumnRenamed("COD_DESEMBOLSO", "COD_CONTRATO")\
        .select("DATE_OFFSET", "DTA_REFERENCIA", "COD_CONTRATO", "COD_CLIENTE", "COD_COUNTRY_BOOK")\
        .distinct()
    
    return dataset

In [0]:
def load_data_elcm() -> DataFrame:
    '''
        carrega os dados vinculados ao último e penúltimo arquivos processados no elcm 
    '''
    BASE_ELCM = "/dbfs/" + PATH_MNT + "XML/position/"
    PATH_ELCM = [root.replace("/dbfs/", "") + "/" for root, subdirs, files in os.walk(BASE_ELCM) if len(files) > 0][-2:]
    
    dataset = spark\
      .read\
      .format("orc")\
      .load(PATH_ELCM).withColumn("_", F.lit(1))\
          .withColumn(
              "DATE_OFFSET", 
              F.dense_rank()\
              .over(
                  Window()\
                    .partitionBy("_")\
                    .orderBy(F.to_date("DataExportacao").desc())
              )
      )\
      .withColumnRenamed("trade_ref_id", "TRADE_REF_ID")\
      .withColumnRenamed("dta_referencia", "DTA_REFERENCIA_FIX")\
      .withColumnRenamed("facility_id", "FACILITY_ID")\
      .withColumnRenamed("facility_global_facility_id", "GLOBAL_FACILITY_ID")\
      .filter(F.col("DATE_OFFSET").isin([1, 2]))
    
    return dataset

In [0]:
def load_data_ods() -> DataFrame:
    '''
        carrega os dados vinculados ao último e penúltimo arquivos processados no elcm 
    '''
    BASE_ELCM = "/dbfs/" + PATH_MNT + "ods/vw_alocacao_credito/"
    PATH_ELCM = [root.replace("/dbfs/", "") + "/" for root, subdirs, files in os.walk(BASE_ELCM) if len(files) > 0][-2:]
    
    dataset = spark\
        .read\
        .format("orc")\
        .load(PATH_ELCM)\
        .withColumn("_", F.lit(1))\
        .withColumn(
            "DATE_OFFSET", 
            F.dense_rank()\
            .over(
                Window()\
                  .partitionBy("_")\
                  .orderBy(F.to_date("DataExportacao").desc())
            )
        )\
        .withColumnRenamed("COD_CORRESP_SIST_ORIGEM", "COD_CONTRATO")\
        .withColumn("TRADE_REF_ID", 
            F.concat(F.col("DTA_ANO_OPERACAO").cast(IntegerType()), 
                     F.lit("."), 
                     F.col("SEQ_OPERACAO").cast(IntegerType())
            )
         )\
        .filter(F.col("COD_SISTEMA_LEGADO") == "FUNCAO")\
        .select("TRADE_REF_ID", "COD_CONTRATO", "DATE_OFFSET")
    
    return dataset

In [0]:
def table_exists(table: str) -> bool:
    '''
        checa se a tabela do (delta lake) existe
    '''
    ret = True
    try:
      dir_ = dbutils.fs.ls(table)
    except:
       ret = False
    return ret

In [0]:
def create_enviroment():
    '''
        cria a tabela destino, se não existir
    '''
    fields = [
        StructField("arrearsEventCode", StringType(), True),
        StructField("arrearsEventDateTime", TimestampType(), True),
        StructField("effectiveEventDate", TimestampType(), True),
        StructField("arrears", StringType(), True),
        StructField("tradeId", StringType(), True),
        StructField("tradeSourceSystemCode", StringType(), True),
        StructField("facilityId", StringType(), True),
        StructField("facilitySourceSystemCode", StringType(), True),
        StructField("originalDueDate",TimestampType(), True),
        StructField("arrearsCurrency", StringType(), True),
        StructField("arrearsAmount", DecimalType(32,18), True),
        StructField("arrearsLevelOfApplication", StringType(), True),
        StructField("levelOfApplicationId", StringType(), True),
        StructField("levelOfApplicationCode", StringType(), True),
        StructField("levelOfApplicationSourceSystemCode", StringType(), True),
        StructField("businessUnit", StringType(), True),
        StructField("code", StringType(), True),
        StructField("subCode", StringType(), True),
        StructField("arrearsEventSourceSystemCode", StringType(), True),
        StructField("arrearsEventUserId", StringType(), True),
        StructField("cod_backoffice_origem", StringType(), True),
        StructField("dsc_backoffice_origem", StringType(), True),
        StructField("suspensionReasonCode", StringType(), True),
        StructField("data_criacao_datamart", TimestampType(), True),
        StructField("hash_key", StringType(), True)        
    ]

    schema = StructType(fields)
    dataset = sqlContext.createDataFrame(spark.sparkContext.emptyRDD(), schema)
       
    if not table_exists(PATH_TBL_DATAMART): 
      dataset.write.format("delta").mode("overwrite").save(PATH_TBL_DATAMART)

In [0]:
def load_auxiliary_data() -> DataFrame:
  '''
    carrega as informações adicionais contidas em outras tabelas
  '''
  
  dataset_elcm = load_data_elcm().cache()
  dataset_ods = load_data_ods().cache() 
  operacao_rdl = load_data_operacao_rdl().cache()
  
  ds_join = operacao_rdl.alias("a")\
    .join(F.broadcast(dataset_ods).alias("b"), "COD_CONTRATO", "left_outer")\
    .join(F.broadcast(dataset_elcm).alias("c"), "TRADE_REF_ID", "left_outer")\
    .select(
        "DTA_REFERENCIA", 
        "COD_CONTRATO", 
        "TRADE_REF_ID", 
        "COD_CLIENTE", 
        "COD_COUNTRY_BOOK", 
        "base_date",
        "limit_id", 
        "FACILITY_ID",
        "facility_local_facility_id", 
        "GLOBAL_FACILITY_ID", 
        "DTA_REFERENCIA_FIX",
        "a.DATE_OFFSET"
    )\
    .cache()
  
  return ds_join

In [0]:
def process_workflow() -> DataFrame:
    '''
        processa os dados
    '''   
    # carrega o dataset principal
    saldo_parcela_rdl = load_data_saldo_parcela_rdl()
    
    # carrega o dataset auxiliar
    dataset_auxiliary = load_auxiliary_data()
    dataset_auxiliary_cur = dataset_auxiliary.filter(F.col("DATE_OFFSET") == F.lit(1))
    dataset_auxiliary_prv = dataset_auxiliary.filter(F.col("DATE_OFFSET") == F.lit(2))  
    
    # dados última data
    dataset_cur = saldo_parcela_rdl.alias("a")\
      .join(dataset_auxiliary_cur.alias("b"), ["COD_CONTRATO", "DATE_OFFSET"], "left_outer")\
      .where(F.col("a.DATE_OFFSET") == F.lit(1))\
      .groupby(
          "a.COD_CONTRATO",
          "a.SGL_MOEDA",
          "b.COD_COUNTRY_BOOK",
          "a.COD_BACKOFFICE_ORIGEM",
          "a.DSC_BACKOFFICE_ORIGEM",
          "a.DTA_VENCTO_PARCELA",
          "b.LIMIT_ID",
          "b.FACILITY_ID",
          "b.COD_CLIENTE",
          "b.TRADE_REF_ID",
          "b.GLOBAL_FACILITY_ID"
      )\
      .agg(
          F.max("a.DTA_REFERENCIA").alias("DTA_REFERENCIA"),
          F.sum("a.VLR_PARCELA").cast(DecimalType(32,18)).alias("VLR_PARCELA"),
          F.sum("a.VLR_SALDO_PARCELA").cast(DecimalType(32,18)).alias("VLR_SALDO_PARCELA")
      )

    # dados penúltima data
    dataset_prv = saldo_parcela_rdl.alias("a")\
      .join(dataset_auxiliary_prv.alias("b"), ["COD_CONTRATO", "DATE_OFFSET"], "left_outer")\
      .where(F.col("a.DATE_OFFSET") == F.lit(2))\
      .groupby(
          "a.COD_CONTRATO",
          "a.SGL_MOEDA",
          "b.COD_COUNTRY_BOOK",
          "a.COD_BACKOFFICE_ORIGEM",
          "a.DSC_BACKOFFICE_ORIGEM",
          "a.DTA_VENCTO_PARCELA",
          "b.LIMIT_ID",
          "b.FACILITY_ID",
          "b.COD_CLIENTE",
          "b.TRADE_REF_ID",
          "b.GLOBAL_FACILITY_ID"
      )\
      .agg(
          F.max("a.DTA_REFERENCIA").alias("DTA_REFERENCIA_PREV"),
          F.sum("a.VLR_PARCELA").cast(DecimalType(32,18)).alias("VLR_PARCELA_PREV"),
          F.sum("a.VLR_SALDO_PARCELA").cast(DecimalType(32,18)).alias("VLR_SALDO_PARCELA_PREV")
      )\
      .withColumnRenamed("COD_BACKOFFICE_ORIGEM", "COD_BACKOFFICE_ORIGEM_PREV")\
      .withColumnRenamed("DSC_BACKOFFICE_ORIGEM", "DSC_BACKOFFICE_ORIGEM_PREV")\
      .withColumnRenamed("SGL_MOEDA", "SGL_MOEDA_PREV")\
      .withColumnRenamed("COD_COUNTRY_BOOK", "COD_COUNTRY_BOOK_PREV")\
      .withColumnRenamed("DTA_VENCTO_PARCELA", "DTA_VENCTO_PARCELA_PREV")\
      .withColumnRenamed("LIMIT_ID", "LIMIT_ID_PREV")\
      .withColumnRenamed("COD_CLIENTE", "COD_CLIENTE_PREV")\
      .withColumnRenamed("TRADE_REF_ID", "TRADE_REF_ID_PREV")\
      .withColumnRenamed("FACILITY_ID", "FACILITY_ID_PREV")\
      .withColumnRenamed("GLOBAL_FACILITY_ID", "GLOBAL_FACILITY_ID_PREV")\

    # dataset final
    dataset_all = dataset_cur.alias("a").join(dataset_prv.alias("b"), "COD_CONTRATO", "left_outer")
    
    if dataset_prv.count() > 0:
      
      # converte as regras do workflow em flags
      dataset_rules = dataset_all\
          .withColumn("RULE_000_FACILITY_HAS_CHANGED", 
              F.when(
                  F.col("FACILITY_ID_PREV") != F.col("a.FACILITY_ID"), F.lit("YES")
              ).otherwise(F.lit("NO"))
          )\
          .withColumn("RULE_001_CURRENT_BALANCE_GT_ZERO", 
              F.when(
                  F.col("VLR_SALDO_PARCELA") > 0, F.lit("YES")
              ).otherwise(F.lit("NO"))
          )\
          .withColumn("RULE_002_CURRENT_BALANCE_EQUALS_ZERO", 
              F.when(
                  F.col("VLR_SALDO_PARCELA") == 0, F.lit("YES")
              ).otherwise(F.lit("NO"))
          )\
          .withColumn("RULE_003_CURRENT_BALANCE_LT_PREVIOUS_BALANCE", 
              F.when(
                  F.col("VLR_SALDO_PARCELA") >= F.col("VLR_SALDO_PARCELA_PREV"), F.lit("YES")
              ).otherwise(F.lit("NO"))
          )
    else:
        dataset_rules = dataset_all\
          .withColumn("RULE_000_FACILITY_HAS_CHANGED", F.lit("NO"))\
          .withColumn("RULE_001_CURRENT_BALANCE_GT_ZERO", F.lit("YES"))\
          .withColumn("RULE_002_CURRENT_BALANCE_EQUALS_ZERO", F.lit("NO"))\
          .withColumn("RULE_003_CURRENT_BALANCE_LT_PREVIOUS_BALANCE", F.lit("NO"))
    
    return dataset_rules

In [0]:
def process_result(dataset_src: DataFrame) -> DataFrame:
    '''
        gera o datamart
    '''   
    
    columns = [
     'arrearsEventCode',
     'arrearsEventDateTime',
     'effectiveEventDate',
     'arrears',
     'tradeId',
     'tradeSourceSystemCode',
     'facilityId',
     'facilitySourceSystemCode',
     'originalDueDate',
     'arrearsCurrency',
     'arrearsAmount',
     'arrearsLevelOfApplication',
     'levelOfApplicationId',
     'levelOfApplicationCode',
     'levelOfApplicationSourceSystemCode',
     'businessUnit',
     'code',
     'subCode',
     'arrearsEventSourceSystemCode',
     'arrearsEventUserId',
     'cod_backoffice_origem',
     'dsc_backoffice_origem',
     'suspensionReasonCode',
     'data_criacao_datamart'
    ]
    
    dataset_step_1 = dataset_src\
        .withColumn(
            "arrearsEventCode", 
            F.when(
               F.col("RULE_001_CURRENT_BALANCE_GT_ZERO") == "YES", 
                   F.when(
                       F.col("RULE_002_CURRENT_BALANCE_EQUALS_ZERO") == "NO",
                       F.lit("E001")
                   ).otherwise( 
                       F.when(
                           F.col("RULE_003_CURRENT_BALANCE_LT_PREVIOUS_BALANCE") == "NO", 
                           F.lit("E001")           
                      )
                 )          
            )
        )
    
    dataset_step_2 = dataset_step_1\
        .withColumn(
            "arrearsEventCode", 
            F.when(
               F.col("RULE_002_CURRENT_BALANCE_EQUALS_ZERO") == "NO", 
                   F.when(
                       F.col("RULE_003_CURRENT_BALANCE_LT_PREVIOUS_BALANCE") == "YES",
                       F.lit("E002")
                   )          
              )
          )
    
    dataset_step_3 = dataset_step_2\
        .withColumn(
            "arrearsEventCode", 
            F.when(
               F.col("RULE_002_CURRENT_BALANCE_EQUALS_ZERO") == "YES", 
                   F.lit("E003")         
              )
          )
    
    dataset_step_4 = dataset_step_3\
        .withColumn(
            "effectiveEventDate", 
            F.when(
                F.col("arrearsEventCode") == "E001", F.col("DTA_VENCTO_PARCELA").cast(TimestampType())
            ).otherwise(F.col("DTA_REFERENCIA").cast(TimestampType()))
        )\
        .withColumn(
            "arrearsAmount", 
             F.when(
                 F.col("arrearsEventCode") == "E002", 
                 F.col("VLR_SALDO_PARCELA_PREV").cast(DecimalType(32,18))           
            ).otherwise(
                  F.when(
                     F.col("arrearsEventCode") == "E001", 
                     F.col("VLR_SALDO_PARCELA").cast(DecimalType(32,18))    
                  ).otherwise(
                      F.lit(0)
                )
            )
        )\
        .filter(F.col("arrearsEventCode").isNotNull())
    
    dataset_step_5 = dataset_step_4\
      .withColumn(
          "facility_changed", 
          F.when(
             F.col("arrearsEventCode").isin(["E001", "E002"]), 
                 F.when(
                     F.col("RULE_000_FACILITY_HAS_CHANGED") == "YES",
                     F.lit("YES")
                 ).otherwise("FALSE")
          ).otherwise("FALSE")
      )\
      .withColumn("arrearsEventDateTime", F.lit(F.current_date()).cast(TimestampType()))\
      .withColumn("effectiveEventDate", F.lit(F.current_date()).cast(TimestampType()))\
      .withColumn("arrears", F.lit("").cast(StringType()))\
      .withColumn("tradeId", F.col("COD_CONTRATO").cast(StringType()))\
      .withColumn("tradeSourceSystemCode", F.lit("FUNCAO"))\
      .withColumn("facilityId", F.col("FACILITY_ID").cast(StringType()))\
      .withColumn("facilitySourceSystemCode", F.lit("ELCM").cast(StringType()))\
      .withColumn("originalDueDate", F.col("DTA_VENCTO_PARCELA").cast(TimestampType()))\
      .withColumn("arrearsCurrency", F.col("SGL_MOEDA").cast(StringType()))\
      .withColumn("arrearsAmount", F.col("VLR_SALDO_PARCELA").cast(DecimalType(32,18)))\
      .withColumn("arrearsLevelOfApplication", F.lit(None).cast(StringType()))\
      .withColumn("levelOfApplicationId", F.lit(None).cast(StringType()))\
      .withColumn("levelOfApplicationCode", F.lit(None).cast(StringType()))\
      .withColumn("levelOfApplicationSourceSystemCode", F.lit(None).cast(StringType()))\
      .withColumn("businessUnit", F.lit("").cast(StringType()))\
      .withColumn("code", F.lit("WR").cast(StringType()))\
      .withColumn("subCode", F.col("COD_COUNTRY_BOOK").cast(StringType()))\
      .withColumn("arrearsEventSourceSystemCode", F.lit("RDLLA").cast(StringType()))\
      .withColumn("arrearsEventUserId", F.lit(None).cast(StringType()))\
      .withColumn("suspensionReasonCode", F.lit(None).cast(StringType()))\
      .withColumn("data_criacao_datamart", F.lit(datetime.fromtimestamp(time.time())))\
      .withColumn("cod_backoffice_origem", F.col("a.COD_BACKOFFICE_ORIGEM").cast(StringType()))\
      .withColumn("dsc_backoffice_origem", F.col("a.DSC_BACKOFFICE_ORIGEM").cast(StringType()))\
      .withColumn("hash_key", F.lit(None).cast(StringType()))
    
    
    #================================================
    #  TROCA DE FACILITY
    #================================================
    dataset_closed = dataset_step_5\
      .withColumn("arrearsEventCode", F.lit("E003"))\
      .withColumn("facilityId", 
          F.when(
              F.col("arrearsEventCode") == "E002", F.coalesce(F.col("FACILITY_ID_PREV"), F.col("GLOBAL_FACILITY_ID_PREV"))
            ).otherwise(F.coalesce(F.col("FACILITY_ID"), F.col("GLOBAL_FACILITY_ID"))
          )
       )\
      .withColumn("originalDueDate", 
          F.when(
              F.col("arrearsEventCode") == "E002", F.col("DTA_VENCTO_PARCELA_PREV")
            ).otherwise(F.col("DTA_VENCTO_PARCELA")
          )
       )\
      .withColumn("arrearsCurrency", 
          F.when(
              F.col("arrearsEventCode") == "E002", F.col("SGL_MOEDA_PREV")
            ).otherwise(F.col("SGL_MOEDA")
          )
       )\
      .withColumn("subCode", 
          F.when(
              F.col("arrearsEventCode") == "E002", F.col("COD_COUNTRY_BOOK_PREV")
            ).otherwise(F.col("COD_COUNTRY_BOOK")
          )
       )\
      .withColumn("arrearsAmount", F.lit(0))\
      .select(columns)\
      .filter(F.col("facility_changed") == "YES")
    
    dataset_all = dataset_step_5.select(columns).union(dataset_closed)
    
    dataset_result = dataset_all\
        .withColumn("hash_key", get_hash_key(F.concat_ws("||", *dataset_all.columns)))\
        .distinct()
    
    return dataset_result
    

In [0]:
def save_results(dataset: DataFrame, path: str):
    '''
      salva os dados do datamart
    '''
    delta_table = DeltaTable.forPath(spark, path)
    delta_table\
      .alias("current")\
      .merge(
          dataset.alias("new"),
          "current.hash_key = new.hash_key"
      )\
      .whenMatchedUpdateAll()\
      .whenNotMatchedInsertAll()\
      .execute()

In [0]:
def erase_enviroment():
    dbutils.fs.rm(PATH_TBL_DATAMART, True)

In [0]:
def start_pipeline():
    '''
        Início do pipeline
    '''
    try:
        start = time.time()
        
        # so roda na primeira execução
        create_enviroment()
        
        # inicio do processamento
        dataset_workflow = process_workflow()
        dataset_result = process_result(dataset_workflow)
        save_results(dataset_result, PATH_TBL_DATAMART)
        end = time.time()
        print("tempo de processamento: %s" % str(end-start))
    except Exception as e:
        print("erro ao processar os dado: %s" % e)

In [0]:
erase_enviroment()
start_pipeline()