In [15]:
# LEITURA DOS ARQUIVOS PARQUET NA PASTA NOVOS

# Lê o arquivo Parquet contendo os dados da aba "Receita" e cria um DataFrame Spark
df_receita = spark.read.parquet("Files/Abas/Receita.parquet")

# CRIAR UMA VISÃO TEMPORÁRIA PARA USAR SQL

# Cria ou substitui uma *temporary view* chamada "df_receita"
# Isso permite executar consultas SQL diretamente sobre esse DataFrame
df_receita.createOrReplaceTempView("df_receita")

# A consulta SQL abaixo transforma os dados da receita
df_receita_alucar = spark.sql("""
    SELECT
       Nome_Alucar as NOME,                             -- Renomeia a coluna "Nome_Alucar" para "NOME"
       DATE_FORMAT(Data, 'dd/MM/yyyy') AS DATA,         -- Formata a data para o formato brasileiro (ex: 08/07/2025)
       MONTH(Data) AS MES,                              -- Extrai o número do mês da coluna "Data"
       year(Data) AS ANO,                               -- Extrai o ano da coluna "Data"
       CAST(Valor_Receita AS FLOAT) AS VALOR_RECEITA    -- Converte a receita para tipo FLOAT
    FROM
        df_receita                                      -- Fonte: visão temporária criada acima
    WHERE MONTH(Data) IS NOT NULL                       -- Filtra apenas linhas com valor de mês válido
    ORDER BY MONTH(Data), Data  ASC                     -- Ordena por mês e depois por data
""")


# Salva o resultado da consulta como uma tabela permanente chamada "receita_alucar"
# O modo "overwrite" substitui a tabela se ela já existir
df_receita_alucar.write.mode("overwrite").saveAsTable("receita_alucar")


StatementMeta(, 971db938-9b8c-4ea3-ac3c-3850dbbc882a, 17, Finished, Available, Finished)

In [16]:
# Importações necessárias do PySpark
from pyspark.sql.functions import col, lag, trim, monotonically_increasing_id, when, lit, collect_list, udf
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType

# Lê o arquivo Parquet da aba "Receita"
df_receita = spark.read.parquet("Files/Abas/Receita.parquet")

# Cria uma visão temporária para usar SQL
df_receita.createOrReplaceTempView("df_receita")

# Consulta os dados relevantes para o ConsigCar (nome do mês, ano e valor)
df_ConsigCar = spark.sql("""
    SELECT Faturamento_ConsigCar, Ano,Valor
    FROM df_receita
""")

# Cria uma coluna com ID artificial (para manter a ordem original, já que Parquet não garante isso)
df_raw_com_id = df_ConsigCar.withColumn("row_id", monotonically_increasing_id())

# Define uma janela de ordenação baseada no ID artificial
window_spec = Window.orderBy("row_id")

# Usa a função `lag` para pegar o valor da linha anterior (supostamente o nome do mês da linha anterior à "Pagseguro")
df_with_mes = df_raw_com_id.withColumn("MES", lag("Faturamento_ConsigCar").over(window_spec))

# Converte os nomes dos meses em números
df_with_mes = df_with_mes.withColumn(
    "MES_NUMERO",
    when(col("MES") == "JANEIRO", 1)
    .when(col("MES") == "FEVEREIRO", 2)
    .when(col("MES") == "MARÇO", 3)
    .when(col("MES") == "ABRIL", 4)
    .when(col("MES") == "MAIO", 5)
    .when(col("MES") == "JUNHO", 6)
    .when(col("MES") == "JULHO", 7)
    .when(col("MES") == "AGOSTO", 8)
    .when(col("MES") == "SETEMBRO", 9)
    .when(col("MES") == "OUTUBRO", 10)
    .when(col("MES") == "NOVEMBRO", 11)
    .when(col("MES") == "DEZEMBRO", 12)
    .otherwise(None)
)

# Filtra apenas as linhas que correspondem ao item "Pagseguro" (os valores monetários) e transforma o ano em inteiro para facilitar operações posteriores
df_pagseguro = df_with_mes.filter(trim(col("Faturamento_ConsigCar")) == "Pagseguro") \
    .withColumn("Ano_int", col("Ano").cast("int"))

# Coleta os meses que já têm ano 2025 para evitar repetição incorreta de ano
meses_2025 = (
    df_pagseguro.filter(col("Ano") == 2025)
    .select("MES_NUMERO")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)

# Compartilha a lista de meses de 2025 com os nós do cluster (broadcast variable)
meses_2025_broadcast = spark.sparkContext.broadcast(set(meses_2025))

# Define uma função UDF para corrigir o ano com base no mês:
#    - Se o ano já estiver preenchido, mantém
#    - Se o mês já apareceu em 2025, assume que pertence a 2026
#    - Caso contrário, assume que é de 2025
def corrigir_ano(mes, ano):
    if ano is not None:
        return int(ano)
    if mes in meses_2025_broadcast.value:
        return 2026
    return 2025


# Registra a função UDF para uso no DataFrame
corrigir_ano_udf = udf(corrigir_ano, IntegerType())

# Aplica a UDF no DataFrame para gerar a nova coluna "Ano_corrigido"
df_final = df_pagseguro.withColumn("Ano_corrigido", corrigir_ano_udf(col("MES_NUMERO"), col("Ano_int")))

# Seleciona e renomeia as colunas finais formatadas
df_consigcar = df_final.filter(trim(col("Faturamento_ConsigCar")) == "Pagseguro") \
    .selectExpr(
        "'Faturamento_ConsigCar' as NOME",   # Nome fixo
        "Ano_corrigido as Ano",              # Ano corrigido
        "MES_NUMERO as MES",                 # Número do mês
        "CAST(Valor AS FLOAT) AS Valor"      # Valor convertido para float
    )

# Salva o resultado final como uma tabela chamada "receita_consigcar"
df_consigcar.write.mode("overwrite").saveAsTable("receita_consigcar")

StatementMeta(, 971db938-9b8c-4ea3-ac3c-3850dbbc882a, 18, Finished, Available, Finished)

In [17]:
# Importações necessárias do panda
import pandas as pd

# LEITURA DO ARQUIVO PARQUET DA ABA "Despesas"
df_despesas = spark.read.parquet("Files/Abas/Despesas.parquet")

# CONVERSÃO DO DATAFRAME SPARK PARA PANDAS
df = df_despesas.toPandas()

StatementMeta(, 971db938-9b8c-4ea3-ac3c-3850dbbc882a, 19, Finished, Available, Finished)

In [18]:
# Filtra as linhas da coluna '1' que contenham as palavras 'ALUCAR' ou 'CONSIGCAR' (case-insensitive)
filtro = df['1'].str.contains('ALUCAR|CONSIGCAR', case=False, na=False)

# Aplica o filtro no DataFrame para obter somente as linhas que satisfazem o critério
resultado = df[filtro]

# Seleciona apenas as colunas '1' e 'linha_ordem' para análise
df_filtrado = resultado[['1', 'linha_ordem']]

# Filtra ainda mais para obter só as linhas que contém 'CONSIGCAR' na coluna '1'
df_filtrado_consigcar = df_filtrado['1'].str.contains('CONSIGCAR', case=False, na=False)
reslultato_consigcar = df_filtrado[df_filtrado_consigcar]

# Pega somente a coluna 'linha_ordem' das linhas filtradas com 'CONSIGCAR'
x = reslultato_consigcar[['linha_ordem']]

# Extrai o valor da linha_ordem da primeira ocorrência para usar como ponto de corte
valor_linha = int(x['linha_ordem'].values[0])

# Divide o DataFrame original em duas partes:
# Da linha 0 até valor_linha - 1 (inclusive)
df_1 = df.iloc[0:valor_linha]  # ou .iloc[0:25] se quiser da primeira até a linha 24

# Da linha valor_linha até o fim
df_2 = df.iloc[valor_linha:]

StatementMeta(, 971db938-9b8c-4ea3-ac3c-3850dbbc882a, 20, Finished, Available, Finished)

In [19]:
# Converte o DataFrame Pandas df_1 para um DataFrame Spark
df_1_spark  = spark.createDataFrame(df_1)

# Cria ou substitui uma visão temporária chamada "df_despesas_alucar"
df_1_spark.createOrReplaceTempView("df_despesas_alucar")

# Executa uma consulta SQL para selecionar e renomear as colunas da view criada
df_despesas_alucar = spark.sql("""
    SELECT 
            `0` AS NOME,
            `1` AS JANEIRO,
            `2` AS FEVEIRO,
            `3` AS `MARÇO`,
            `4` AS ABRIL,
            `5` AS MAIO,
            `6` AS JUNHO,
            `7` AS JULHO,            
            `8` AS AGOSTO,
            `9` AS SETEMBRO,
            `10` AS OUTUBRO,
            `11` AS NOVEMBRO,
            `12` AS DEZEMBRO
    FROM df_despesas_alucar
    WHERE  `0` NOT IN ('nan', 'DESPESAS', 'TOTAL')
""")

# Salva o DataFrame Spark como tabela permanente chamada "despesas_alucar"
df_despesas_alucar.write.mode("overwrite").saveAsTable("despesas_alucar")

# Repete o processo para o segundo DataFrame df_2 (que contém dados referentes a ConsigCar)
df_2 = spark.createDataFrame(df_2)

# Cria visão temporária para df_2
df_2.createOrReplaceTempView("df_despesas_consigcar")

# Consulta SQL para selecionar e renomear colunas de df_2
df_despesas_consigcar = spark.sql("""
    SELECT 
            `0` AS NOME,
            `1` AS JANEIRO,
            `2` AS FEVEIRO,
            `3` AS `MARÇO`,
            `4` AS ABRIL,
            `5` AS MAIO,
            `6` AS JUNHO,
            `7` AS JULHO,            
            `8` AS AGOSTO,
            `9` AS SETEMBRO,
            `10` AS OUTUBRO,
            `11` AS NOVEMBRO,
            `12` AS DEZEMBRO
    FROM df_despesas_consigcar
    WHERE  `0` NOT IN ('nan', 'DESPESAS', 'TOTAL')
""")

# Salva o DataFrame Spark como tabela permanente chamada "despesas_consigcar"
df_despesas_consigcar.write.mode("overwrite").saveAsTable("despesas_consigcar")

StatementMeta(, 971db938-9b8c-4ea3-ac3c-3850dbbc882a, 21, Finished, Available, Finished)