In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/ProjetoG"))

path,name,size,modificationTime
dbfs:/FileStore/tables/ProjetoG/TB_CEP_BR_2018.csv,TB_CEP_BR_2018.csv,51395868,1709119089000
dbfs:/FileStore/tables/ProjetoG/compras-1.csv,compras-1.csv,9309,1709119852000
dbfs:/FileStore/tables/ProjetoG/compras-2.csv,compras-2.csv,9309,1709119980000
dbfs:/FileStore/tables/ProjetoG/compras.csv,compras.csv,9309,1709119067000
dbfs:/FileStore/tables/ProjetoG/compras_rejeitadas/,compras_rejeitadas/,0,0
dbfs:/FileStore/tables/ProjetoG/compras_rejeitadas.csv/,compras_rejeitadas.csv/,0,0
dbfs:/FileStore/tables/ProjetoG/compras_tratadas/,compras_tratadas/,0,0
dbfs:/FileStore/tables/ProjetoG/compras_tratadas.parquet/,compras_tratadas.parquet/,0,0
dbfs:/FileStore/tables/ProjetoG/condicao_pagamento.csv,condicao_pagamento.csv,192,1709119067000
dbfs:/FileStore/tables/ProjetoG/enderecos_fornecedores.parquet/,enderecos_fornecedores.parquet/,0,0


In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Tipo Endereco Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load the function to compute the stage data for the table 'tipo_pagamento'
def compute_tipo_endereco(spark: SparkSession):
    TIPO_ENDERECO_FILEPATH = f'/FileStore/tables/ProjetoG/tipo_endereco.csv'

    df = spark.read.csv(TIPO_ENDERECO_FILEPATH, header=True, sep=',')
    
    # Data cleaning
    df = df.withColumn('nome_tipo_endereco', F.upper(F.col('nome_tipo_endereco')))
    df = df.withColumn('nome_tipo_endereco', F.trim(F.col('nome_tipo_endereco')))
    
    df = df.withColumn('sigla_endereco', F.upper(F.col('sigla_endereco')))
    df = df.withColumn('sigla_endereco', F.trim(F.col('sigla_endereco')))
    
    
    # Column renaming
    df = df.withColumnRenamed('id_tipo_endereco', 'ID_TIPO_ENDERECO')
    df = df.withColumnRenamed('nome_tipo_endereco', 'DESCRICAO')
    df = df.withColumnRenamed('sigla_endereco', 'SIGLA')
    
    return df

df_tp_endereco = compute_tipo_endereco(SPARK)
df_tp_endereco.show()

+----------------+--------------------+-----------+
|ID_TIPO_ENDERECO|           DESCRICAO|      SIGLA|
+----------------+--------------------+-----------+
|               1|            COBRANCA|        COB|
|               2|             ENTREGA|        ENT|
|               3|         FATURAMENTO|        FAT|
|               4|    COBRANCA/ENTREGA|    COB/ENT|
|               5|COBRANCA/FATURAMENTO|    COB/FAT|
|               6| ENTREGA/FATURAMENTO|    ENT/FAT|
|               7|COBRANCA/ENTREGA/...|COB/ENT/FAT|
|               8|          FORNECEDOR|       FORN|
+----------------+--------------------+-----------+



In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load CONDICAO_PAGAMENTO Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load compute functions 
def compute_condicao_pagamento(spark: SparkSession):
    COND_PAG_FILEPATH = f'/FileStore/tables/ProjetoG/condicao_pagamento.csv'

    df = spark.read.csv(COND_PAG_FILEPATH, header=True, sep=',')
    df = df.select('ID_CONDICAO', 'DESCRICAO', 'QTD_PARCELAS')
    df = df.withColumn('DESCRICAO', F.upper(F.trim(F.col('DESCRICAO'))))
    
    df = df.withColumn('QTD_PARCELAS', F.col('QTD_PARCELAS').cast(T.IntegerType()))
    
    return df

df_condpag = compute_condicao_pagamento(SPARK)
df_condpag.show()

+-----------+--------------------+------------+
|ID_CONDICAO|           DESCRICAO|QTD_PARCELAS|
+-----------+--------------------+------------+
|          1|             A VISTA|           1|
|          2|             30 DIAS|           1|
|          3|          30/60 DIAS|           2|
|          4|       30/60/90 DIAS|           3|
|          5|     ENTRADA/30 DIAS|           2|
|          6|  ENTRADA/30/60 DIAS|           3|
|          7|ENTRADA/30/60/90 ...|           4|
+-----------+--------------------+------------+



In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load CEP Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load the function to compute the stage data for the file 'cep'
def compute_cep_table(spark: SparkSession):
    CEP_FILEPATH = f'/FileStore/tables/ProjetoG/TB_CEP_BR_2018.csv'
    SCHEMA = T.StructType([
        T.StructField('CEP', T.IntegerType(), True),
        T.StructField('UF', T.StringType(), True),
        T.StructField('CIDADE', T.StringType(), True),
        T.StructField('BAIRRO', T.StringType(), True),
        T.StructField('LOGRADOURO', T.StringType(), True),
    ])

    df = spark.read.csv(CEP_FILEPATH, header=False, sep=';', schema=SCHEMA)
    
    df = df.withColumn('UF', F.trim(F.upper(F.col('UF'))))
    df = df.withColumn('CIDADE', F.trim(F.upper(F.col('CIDADE'))))
    df = df.withColumn('BAIRRO', F.trim(F.upper(F.col('BAIRRO'))))
    df = df.withColumn('LOGRADOURO', F.trim(F.upper(F.col('LOGRADOURO'))))
    
    return df

df_cep = compute_cep_table(SPARK)
df_cep.show(10)

+--------+---+--------------+------------+--------------------+
|     CEP| UF|        CIDADE|      BAIRRO|          LOGRADOURO|
+--------+---+--------------+------------+--------------------+
|23085680| RJ|RIO DE JANEIRO|CAMPO GRANDE| RUA CHARLES DICKENS|
|23085690| RJ|RIO DE JANEIRO|CAMPO GRANDE|       PRAÇA CENTRAL|
|23085700| RJ|RIO DE JANEIRO|CAMPO GRANDE|RUA BOM JESUS DA ...|
|23085710| RJ|RIO DE JANEIRO|CAMPO GRANDE|   RUA PRIMEIRA CRUZ|
|23085720| RJ|RIO DE JANEIRO|CAMPO GRANDE|         RUA DONEGAL|
|23085730| RJ|RIO DE JANEIRO|CAMPO GRANDE|    RUA POUSO ALEGRE|
|23085740| RJ|RIO DE JANEIRO|CAMPO GRANDE|           RUA EMAUS|
|23085750| RJ|RIO DE JANEIRO|CAMPO GRANDE|RUA OSVALDO SENTO SÉ|
|23085760| RJ|RIO DE JANEIRO|CAMPO GRANDE|RUA CORONEL JOSÉ ...|
|23085770| RJ|RIO DE JANEIRO|CAMPO GRANDE|RUA JERÔNIMO BARB...|
+--------+---+--------------+------------+--------------------+
only showing top 10 rows



In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Staging Data") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

def cnpj_valido(cnpj):
    cnpj = re.sub(r'[^0-9]', '', cnpj)
    if len(cnpj) !=14:
        return False
    
    total = 0 
    resto = 0 
    digito_verificador_1 = 0
    digito_verificador_2 = 0
    multiplicadores1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    multiplicadores2 = [6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]

    for i in range(0,12,1):
        total += int(cnpj[i]) * int(multiplicadores1[i])
    resto = total % 11
    
    if resto < 2:
        digito_verificador_1 = 0
    else:
        digito_verificador_1 = 11 - resto

    total = 0
    resto = 0

    for i in range(0,13,1):
        total += int(cnpj[i]) * int(multiplicadores2[i])

    resto = total % 11
    if resto < 2:
        digito_verificador_2 = 0 
    else:
        digito_verificador_2 = 11 - resto

    return cnpj[-2:] == str(digito_verificador_1) + str(digito_verificador_2) 
    cnpj_valido = udf(cnpj_valido, T.BooleanType())

def compute_compras_stg(spark: SparkSession, df_ceps: DataFrame):
    """Compute the stage data for the table 'compras' and return a DataFrame with the stage data and a rejected DataFrame with the rejected data and the inconsistency type."""
    COMPRAS_FILEPATH = f'/FileStore/tables/ProjetoG/compras.csv'
    
    def subtract_by_index(df: DataFrame, df_to_subtract: DataFrame) -> DataFrame:
        """Subtract the rows of a DataFrame by the index column."""
        return df.join(df_to_subtract, df['index'] == df_to_subtract['index'], 'left_anti').select(df.columns)
    
    DF = spark.read.csv(COMPRAS_FILEPATH, header=True)
    df = DF
        
    # Trim all columns 
    for column in df.columns:
        df = df.withColumn(column, F.trim(F.col(column)))
        
    # Add a temp index column
    df = df.withColumn('index', F.monotonically_increasing_id())
    
    # NOME_FORNECEDOR column treatment
    df = df.withColumn('NOME_FORNECEDOR', F.upper(F.col('NOME_FORNECEDOR')))
    df = df.withColumn('NOME_FORNECEDOR', F.trim(F.col('NOME_FORNECEDOR')))
    
    # CONDICAO_PAGAMENTO column treatment
    df = df.withColumn('CONDICAO_PAGAMENTO', F.upper(F.col('CONDICAO_PAGAMENTO')))
    df = df.withColumn('CONDICAO_PAGAMENTO', F.trim(F.col('CONDICAO_PAGAMENTO')))
    
    df = df.withColumn('CONDICAO_PAGAMENTO',
        F.when(
                    F.col('CONDICAO_PAGAMENTO') == 'ENTRADA/30/60/90 DIAS', '30/60/90 DIAS')
            .when(  F.col('CONDICAO_PAGAMENTO') == 'A VISTA',               'A VISTA')
            .when(  F.col('CONDICAO_PAGAMENTO') == '30 DIAS',               '30 DIAS')
            .when(  F.col('CONDICAO_PAGAMENTO') == '30/60 DIAS',            '30/60 DIAS')
            .when(  F.col('CONDICAO_PAGAMENTO') == 'ENTRADA/30 DIAS',       'ENTRADA/30 DIAS')
            .when(  F.col('CONDICAO_PAGAMENTO') == 'ENTRADA/30/60 DIAS',    'ENTRADA/30/60 DIAS')
        .otherwise( F.col('CONDICAO_PAGAMENTO') + ' (invalid)')
    )
    
    # COMPLEMENTO column treatment
    df = df.na.fill('N/A', subset=['COMPLEMENTO'])

    # Compute a DataFrame with the nulls 
    df_not_null = df.dropna(how='any', subset=[column for column in df.columns if column != 'COMPLEMENTO'])
    
    df_nulls = (
        subtract_by_index(df, df_not_null)
        .withColumn('inconsistency', F.lit('null column(s)'))
    )
    
    # Compute a DataFrame with the invalid payment condition
    df_invalid_payment_condition = (
        df
        .filter(F.col('CONDICAO_PAGAMENTO').contains('(invalid)'))
        .withColumn('inconsistency', F.lit('invalid payment condition'))
    )
    
    # Compute a DataFrame with the invalid CEP
    df_ceps = df_ceps.select('CEP')
    df_ceps = df_ceps.withColumnRenamed('CEP', 'CEP_VALIDO')
    
    df_invalid_ceps = (
        # Lookup the valid CEPs and filter the valid ones
        df 
        .join(df_ceps, 
              df['CEP'].cast('int') == df_ceps['CEP_VALIDO'].cast('int'), 
              'left'
        )
        .filter(F.col('CEP_VALIDO').isNull())
        .select(df.columns)
        .withColumn('inconsistency', F.lit('invalid CEP'))
    )
    
    # Compute a DataFrame with the invalid CNPJ
    @F.udf(T.BooleanType())
    def udf_is_cnpj_valid(cnpj: str) -> bool:
        if cnpj is None:
            return False
        return cnpj_valido(cnpj)
    spark.udf.register("udf_is_cnpj_valid", udf_is_cnpj_valid)
    
    df = df.cache()
    df_invalids_cnpj = df.filter(~udf_is_cnpj_valid(F.col('CNPJ_FORNECEDOR')))
    df_invalids_cnpj = df_invalids_cnpj.withColumn('inconsistency', F.lit('invalid CNPJ'))
    
    # Remove the duplicates AND create a Dataframe with duplicated rows
    df_duplicated = (
        df
        .groupBy(DF.columns)
        .count()
        .where(F.col('count') > 1)
        .drop('count')
        .withColumn('inconsistency', F.lit('duplicated'))
    )
    
    # Remove all rejeted data by the index column
    df = (
        df
        .transform(lambda df: subtract_by_index(df, df_nulls))
        .transform(lambda df: subtract_by_index(df, df_invalid_payment_condition))
        .transform(lambda df: subtract_by_index(df, df_invalids_cnpj))
        .transform(lambda df: subtract_by_index(df, df_invalid_ceps))
        .drop('index').dropDuplicates()
    )
    
    df_rejected = (
        df_duplicated.drop('index')
        .union(df_nulls.drop('index'))
        .union(df_invalid_payment_condition.drop('index'))
        .union(df_invalids_cnpj.drop('index'))
        .union(df_invalid_ceps.drop('index'))
        .dropDuplicates()
        .groupBy(df.columns)
        .agg(F.concat_ws(", ", F.collect_list(F.col('inconsistency'))).alias('inconsistencies'))
    )

    return df, df_rejected
    
df_compras, df_rejected = compute_compras_stg(SPARK, df_cep)
df_compras.show()
df_rejected.show()
df_compras.write.parquet('/FileStore/tables/ProjetoG/compras_tratadas.parquet', mode = 'overwrite')
df_rejected.write.csv('/FileStore/tables/ProjetoG/compras_rejeitadas.csv', header = 'True', mode = 'overwrite')

+--------------------+---------------+--------------------+-------------------+---------+------------+---------+-------------+-----------+------------+--------+------------------+--------+------------+-----------+-------------+------------------+
|     NOME_FORNECEDOR|CNPJ_FORNECEDOR|    EMAIL_FORNECEDOR|TELEFONE_FORNECEDOR|NUMERO_NF|DATA_EMISSAO|VALOR_NET|VALOR_TRIBUTO|VALOR_TOTAL|   NOME_ITEM|QTD_ITEM|CONDICAO_PAGAMENTO|     CEP|NUM_ENDERECO|COMPLEMENTO|TIPO_ENDERECO|DATA_PROCESSAMENTO|
+--------------------+---------------+--------------------+-------------------+---------+------------+---------+-------------+-----------+------------+--------+------------------+--------+------------+-----------+-------------+------------------+
|        3R PETROLEUM| 12091809000155|dbreakwell2d@unes...|        286-44-7158|  9569521|  2023-07-01|   180000|         9000|     189000|Escalade ESV|       1|   ENTRADA/30 DIAS|12605180|         170|    LOTE 74|   fornecedor|        2023-11-13|
|         EN

In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Fornecedores Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load the function to compute fornecedores dataframe
def compute_fornecedores(spark: SparkSession):
    COMPRAS_TRAT_FILEPATH = f'/FileStore/tables/ProjetoG/compras_tratadas.parquet'
    
    df = spark.read.parquet(COMPRAS_TRAT_FILEPATH)
    
    # Select the columns, drop duplicates and rename the columns
    df = df.select('NOME_FORNECEDOR', 'CNPJ_FORNECEDOR', 'EMAIL_FORNECEDOR', 'TELEFONE_FORNECEDOR')
    df = df.dropDuplicates()
    
    df = (
        df
        .withColumn('ID_FORNECEDOR', F.row_number().over(W.orderBy(F.monotonically_increasing_id())))
        .withColumnRenamed('NOME_FORNECEDOR', 'NOME')
        .withColumnRenamed('CNPJ_FORNECEDOR', 'CNPJ')
        .withColumnRenamed('EMAIL_FORNECEDOR', 'EMAIL')
        .withColumnRenamed('TELEFONE_FORNECEDOR', 'TELEFONE')
        .select('ID_FORNECEDOR', 'NOME', 'CNPJ', 'EMAIL', 'TELEFONE')
    )
       
    return df

df_fornecedores = compute_fornecedores(SPARK)
df_fornecedores.show()
df_fornecedores.write.parquet('/FileStore/tables/ProjetoG/fornecedores.parquet', mode = 'overwrite')

+-------------+--------------------+--------------+--------------------+-----------+
|ID_FORNECEDOR|                NOME|          CNPJ|               EMAIL|   TELEFONE|
+-------------+--------------------+--------------+--------------------+-----------+
|            1|           EXCELSIOR|95426862000197|oschroeder2k@ocn....|133-95-3943|
|            2|BOSCO PARKER AND ...|12528708000107|wglendinningk@eur...|835-83-3066|
|            3|                EVEN|43470988000165|  ahancock25@wsj.com|214-28-9058|
|            4|        EUROFARMA SA|61190096000192|hhaddington17@cbc.ca|201-38-1891|
|            5|              FLEURY|60840055000131|dbreakwell2d@unes...|286-44-7158|
|            6|             ETERNIT|61092037000181|myelden1u@usatoda...|763-57-5758|
|            7|BOGISICH O'KEEFE ...|82643537000134|  bogissfh.ss@ss.com|742-63-3038|
|            8|              ENJOEI|16922038000151|myelden1u@usatoda...|763-57-5758|
|            9|             FERBASA|15141799000103|wglendinningk@

In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Endereco Fornecedores Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load the function to compute dataframes
def compute_compras_stg(spark: SparkSession):
    """Compute the dataframe for the compras_stg table."""
    COMPRAS_TRAT_FILEPATH = f'/FileStore/tables/ProjetoG/compras_tratadas.parquet'
    return spark.read.parquet(COMPRAS_TRAT_FILEPATH)

def compute_fornecedores_parquet(spark: SparkSession):
    """Compute the dataframe for the fornecedores table."""
    FORN_FILEPATH = f'/FileStore/tables/ProjetoG/fornecedores.parquet'
    return spark.read.parquet(FORN_FILEPATH)

def compute_enderecos_fornecedores_table(df_compras_stg: DataFrame, df_fornecedores: DataFrame, df_tipo_endereco: DataFrame):
    """Compute the dataframe for the enderecos_fornecedores table."""
    
    df = df_compras_stg.select(
            'CNPJ_FORNECEDOR',
            'TIPO_ENDERECO',
            'NUM_ENDERECO',
            'COMPLEMENTO',
            'CEP'
        )
    
    df = df.dropDuplicates()

    df = df.join(df_fornecedores, df['CNPJ_FORNECEDOR'] == df_fornecedores['CNPJ'], 'inner')
    df = df.join(df_tipo_endereco, F.upper(df['TIPO_ENDERECO']) == df_tipo_endereco['DESCRICAO'], 'inner')
    
    df = df.select(
        df_compras_stg['CEP'],
        df_fornecedores['ID_FORNECEDOR'],
        df_tipo_endereco['ID_TIPO_ENDERECO'],
        df_compras_stg['NUM_ENDERECO'],   
        df_compras_stg['COMPLEMENTO']
    )
    
    return df

df_compras_trat = compute_compras_stg(SPARK)
df_fornecedores_p = compute_fornecedores_parquet(SPARK)
df_tipo_endereco =  df_tp_endereco

df_endereco_fornecedores = compute_enderecos_fornecedores_table(df_compras_trat, df_fornecedores_p, df_tipo_endereco)
df_endereco_fornecedores.show()
df_endereco_fornecedores.write.parquet('/FileStore/tables/ProjetoG/enderecos_fornecedores.parquet', mode = 'overwrite')


+--------+-------------+----------------+------------+-----------+
|     CEP|ID_FORNECEDOR|ID_TIPO_ENDERECO|NUM_ENDERECO|COMPLEMENTO|
+--------+-------------+----------------+------------+-----------+
|41301050|           15|               8|         349|    LOTE 68|
|12605180|            5|               8|         170|    LOTE 74|
|41300780|           17|               8|         221|    LOTE 02|
|41301050|           23|               8|         349|    LOTE 68|
|12605120|            6|               8|         917|    LOTE 70|
|12605200|           20|               8|         997|    LOTE 92|
|12605130|            3|               8|          99|        N/A|
|12605140|            1|               8|         445|    LOTE 57|
|12605160|           14|               8|         693|    LOTE 80|
|12605180|           16|               8|         170|    LOTE 74|
|12605160|           11|               8|         693|    LOTE 80|
|12605070|           21|               8|        1223|        

In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Notas Fiscais Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

#%% Load the function to compute dataframes
def compute_compras_stg(spark: SparkSession):
    """Compute the dataframe for the compras_stg table."""
    COMPRAS_TRAT_FILEPATH = f'/FileStore/tables/ProjetoG/compras_tratadas.parquet'
    return spark.read.parquet(COMPRAS_TRAT_FILEPATH)

def compute_fornecedores_parquet(spark: SparkSession):
    """Compute the dataframe for the fornecedores table."""
    FORN_FILEPATH = f'/FileStore/tables/ProjetoG/fornecedores.parquet'
    return spark.read.parquet(FORN_FILEPATH)

def compute_notas_fiscais_entrada_table(df_compras_stg: DataFrame, df_fornecedores: DataFrame, df_condicao_pagamento: DataFrame):
    """Compute the dataframe for the NOTAS_FISCAIS_ENTRADA table."""
    
    df = df_compras_stg.select(
        'NUMERO_NF',
        'DATA_EMISSAO',
        'VALOR_NET',
        'VALOR_TRIBUTO',
        'VALOR_TOTAL',
        'NOME_ITEM',
        'QTD_ITEM',
        'CONDICAO_PAGAMENTO',
        'CNPJ_FORNECEDOR'
    )
    df = df.withColumn('ID_NF_ENTRADA', F.row_number().over(W.orderBy(F.monotonically_increasing_id())))
    df = df.join(df_fornecedores, df['CNPJ_FORNECEDOR'] == df_fornecedores['CNPJ'], 'inner')
    df = df.join(df_condicao_pagamento, df['CONDICAO_PAGAMENTO'] == df_condicao_pagamento['DESCRICAO'], 'LEFT')    
    
    df = df.select(

        df['ID_NF_ENTRADA'],
        df['NUMERO_NF'],
        df_fornecedores['ID_FORNECEDOR'],
        df_condicao_pagamento['ID_CONDICAO'],
        df['DATA_EMISSAO'],
        df['VALOR_NET'],
        df['VALOR_TRIBUTO'],
        df['VALOR_TOTAL'],
        df['NOME_ITEM'],
        df['QTD_ITEM']
    )
    
    df = df.withColumn('DATA_EMISSAO', F.to_date(F.col('DATA_EMISSAO'), 'yyyy-MM-dd'))
    df = df.withColumn('VALOR_NET', F.col('VALOR_NET').cast('decimal'))
    df = df.withColumn('VALOR_TRIBUTO', F.col('VALOR_TRIBUTO').cast('decimal'))
    df = df.withColumn('VALOR_TOTAL', F.col('VALOR_TOTAL').cast('decimal'))
    df = df.withColumn('QTD_ITEM', F.col('QTD_ITEM').cast('integer'))
    
    return df

##You may execute this again.
##df_compras_trat = compute_compras_stg(SPARK)
##df_fornecedores = compute_fornecedores_table(SPARK)
##df_condicao_pagamento = df_condpag

df_nf_entrada = compute_notas_fiscais_entrada_table(df_compras_trat, df_fornecedores_p, df_condpag)
df_nf_entrada.show(truncate=False)
df_nf_entrada.write.parquet('/FileStore/tables/ProjetoG/nfs_entrada.parquet', mode = 'overwrite')

+-------------+---------+-------------+-----------+------------+---------+-------------+-----------+------------+--------+
|ID_NF_ENTRADA|NUMERO_NF|ID_FORNECEDOR|ID_CONDICAO|DATA_EMISSAO|VALOR_NET|VALOR_TRIBUTO|VALOR_TOTAL|NOME_ITEM   |QTD_ITEM|
+-------------+---------+-------------+-----------+------------+---------+-------------+-----------+------------+--------+
|1            |9569521  |13           |5          |2023-07-01  |180000   |9000         |189000     |Escalade ESV|1       |
|2            |9375275  |16           |4          |2023-07-01  |70000    |3000         |73000      |Azera       |1       |
|3            |6402186  |2            |2          |2023-07-01  |110000   |5000         |115000     |Cooper      |1       |
|4            |8934386  |7            |5          |2023-07-01  |150000   |7000         |157000     |Tacoma      |1       |
|5            |1123305  |1            |6          |2023-07-01  |70000    |3000         |73000      |Azera       |1       |
|6            |1

In [0]:
#%% Importing the libraries
import logging

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

#%% Initialize the SparkSession
SPARK = SparkSession.builder \
    .appName("Load Programacao Pagamento Table") \
    .getOrCreate()

logging.basicConfig()
LOGGER = logging.getLogger("pyspark")
LOGGER.setLevel(logging.INFO)

def compute_notas_fiscais_de_entrada(spark: SparkSession):
    """Compute the dataframe using a query"""
    NFS_PATH = f'/FileStore/tables/ProjetoG/nfs_entrada.parquet'
    
    df = spark.read.parquet(NFS_PATH)
    df = df.join(df_condpag, df['ID_CONDICAO'] == df_condpag['ID_CONDICAO'], 'INNER')
    df = df.select(
        df['ID_NF_ENTRADA'],
        df['DATA_EMISSAO'],
        df['VALOR_TOTAL'],
        df_condpag['QTD_PARCELAS']
    )
    return df

def compute_programacao_pagamento_pendente(spark: SparkSession):
    """Compute the programcao_pagamento dataframe"""
    
    schema = df_notas_fiscais_entrada.schema

    df = spark.createDataFrame([], schema)
    
    df = df.withColumn('STATUS_PAGAMENTO', F.lit('PENDENTE'))  # Default status is 'PENDENTE'
    
    return df

#%% Load the function to compute the stage data for the table 'tipo_pagamento'
def compute_programacao_pagamento(df_notas_fiscais_entrada: DataFrame, df_programacao_pagamento_pendente: DataFrame):
    """Compute the Programacao Pagamento dataframe using the Notas Fiscais de Entrada dataframe"""
    
    df = (
        df_notas_fiscais_entrada
        .withColumn("PARCELAS", F.sequence(F.lit(1), F.col("QTD_PARCELAS")))
        .withColumn("NUM_PARCELA", F.explode(F.col("PARCELAS")))
        .drop("PARCELAS")
    )
    
    df = df.withColumn("VALOR_PARCELA", F.col("VALOR_TOTAL") / F.col("QTD_PARCELAS"))
    # I using the expr function because the F.add_months function not accept a column as integer parameter to increment the date 
    df = df.withColumn("DATA_VENCIMENTO", F.expr("add_months(DATA_EMISSAO, NUM_PARCELA - 1)"))
    
    df = df.select(
        'ID_NF_ENTRADA',
        'DATA_VENCIMENTO',
        'NUM_PARCELA',
        'VALOR_PARCELA'
    )
    
    # All dates equal or minor than today are considered as "PAGO", else "PENDENTE"
    df = (
        df
        .withColumn("STATUS_PAGAMENTO",   
                    F.when(F.col("DATA_VENCIMENTO") <= F.current_date(), F.lit("PAGO")) ##You can put '2023-07-01' to see how the column 'STATUS_PAGAMENTO' reacts
                    .otherwise(F.lit("PENDENTE")))
    )

    df = df.union(df_programacao_pagamento_pendente)
    df = df.withColumn('ID_PROG_PAG', F.row_number().over(W.orderBy(F.monotonically_increasing_id())))
    df = df.select(
        'ID_PROG_PAG',
        'ID_NF_ENTRADA',
        'DATA_VENCIMENTO',
        'NUM_PARCELA',
        'VALOR_PARCELA',
        'STATUS_PAGAMENTO'
    )

    return df

df_notas_fiscais_entrada = compute_notas_fiscais_de_entrada(SPARK)
df_programacao_pendente = compute_programacao_pagamento_pendente(SPARK)

df_programacao_pagamentos = compute_programacao_pagamento(df_notas_fiscais_entrada, df_programacao_pendente)
df_programacao_pagamentos.show(truncate=False)
df_programacao_pagamentos.write.parquet('/FileStore/tables/ProjetoG/prog_pagamentos.parquet', mode = 'overwrite')

df_historico_pagamentos = df_programacao_pagamentos.filter(F.col('STATUS_PAGAMENTO') == 'PAGO')
df_historico_pagamentos = df_historico_pagamentos.withColumn('ID_HIST_PAG', F.row_number().over(W.orderBy(F.monotonically_increasing_id())))
df_historico_pagamentos = df_historico_pagamentos.select(
    'ID_HIST_PAG',
    'ID_NF_ENTRADA',
    'DATA_VENCIMENTO',
    'NUM_PARCELA',
    'VALOR_PARCELA'
)
df_historico_pagamentos.show(truncate=False)
df_historico_pagamentos.write.parquet('/FileStore/tables/ProjetoG/hist_pagamentos.parquet', mode = 'overwrite')



+-----------+-------------+---------------+-----------+------------------+----------------+
|ID_PROG_PAG|ID_NF_ENTRADA|DATA_VENCIMENTO|NUM_PARCELA|VALOR_PARCELA     |STATUS_PAGAMENTO|
+-----------+-------------+---------------+-----------+------------------+----------------+
|1          |1            |2023-07-01     |1          |94500.00000000000 |PAGO            |
|2          |1            |2023-08-01     |2          |94500.00000000000 |PAGO            |
|3          |2            |2023-07-01     |1          |24333.33333333333 |PAGO            |
|4          |2            |2023-08-01     |2          |24333.33333333333 |PAGO            |
|5          |2            |2023-09-01     |3          |24333.33333333333 |PAGO            |
|6          |3            |2023-07-01     |1          |115000.00000000000|PAGO            |
|7          |4            |2023-07-01     |1          |78500.00000000000 |PAGO            |
|8          |4            |2023-08-01     |2          |78500.00000000000 |PAGO  