In [0]:

# passo 1: PROCESSAMENTO DO ARQUIVO DE VENDAS

#através do .text é possível acessar os dados, visto que com o .csv foi impossível realizar a leitura


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split
from pyspark.sql.functions import col, to_date, trim, when, regexp_replace, monotonically_increasing_id, split, size, lower, translate, exists, transform, count, sum, split

file_path_vendas = "/Volumes/workspace/default/ray/vendas.csv"
file_path_cotacoes = "/Volumes/workspace/default/ray/cotacoes.csv"


df_vendas_raw = spark.read.text(file_path_vendas)
display(df_vendas_raw)


value
"""id_venda,data_venda,local_venda,tipo_maquina,valor_total,revendedor_id"""
"""id\_venda,data\_venda,local\_venda,tipo\_maquina,valor\_total,revendedor\_id"""
"""1,2025-06-01,Sao Paulo,Trator,150000.00,101"""
"""2,2025-06-02, Rio de Janeiro,Colheitadeira,300000.00,102"""
"""3,2025-06-03,S. Paulo,Pulverizador,80000.00,101"""
"""4,2025-006-04,Belo Horizote,Trator,160000.00,103"""
"""5,2025-06-05,Florianopolis,,200000.00,104"""
"""6,2025-06-06,Brasilia,Colheitadeira,280000.00,105"""
"""7,2025-06-07,RIO DE JANEIRO,Pulverizador,75000.00,102"""
"""8,2025-06-08,Curitiba,Trator,145000.00,106"""


In [0]:
# Passo 2: removendo os dois cabeçalhos inúteis 

df_vendas_indexed = df_vendas_raw.withColumn("index", monotonically_increasing_id())

df_vendas_datalines = df_vendas_indexed.filter(col("index") > 1).drop("index")

In [0]:
# Passo 3: Limpeza das Strings, retirando acentos

df_vendas_cleaned_str = df_vendas_datalines.withColumn("value", regexp_replace(col("value"), '^"|"$', ''))

caracteres_acentuados = 'áàâãéèêíìîóòôõúùûçÁÀÂÃÉÈÊÍÌÎÓÒÔÕÚÙÛÇ'
caracteres_nao_acentuados = 'aaaaeeeiiioooouuucAAAAEEEIIIOOOUUUC'

df_vendas_cleaned_str = df_vendas_cleaned_str.withColumn("value", translate(col("value"), caracteres_acentuados, caracteres_nao_acentuados))


In [0]:
# Passo 4: Divisão da String em um Array
# Dividindo as strings de cada linha em arrays de string
df_vendas_split = df_vendas_cleaned_str.withColumn("data_array", split(col("value"), ","))

display(df_vendas_split)


value,data_array
"1,2025-06-01,Sao Paulo,Trator,150000.00,101","List(1, 2025-06-01, Sao Paulo, Trator, 150000.00, 101)"
"2,2025-06-02, Rio de Janeiro,Colheitadeira,300000.00,102","List(2, 2025-06-02, Rio de Janeiro, Colheitadeira, 300000.00, 102)"
"3,2025-06-03,S. Paulo,Pulverizador,80000.00,101","List(3, 2025-06-03, S. Paulo, Pulverizador, 80000.00, 101)"
"4,2025-006-04,Belo Horizote,Trator,160000.00,103","List(4, 2025-006-04, Belo Horizote, Trator, 160000.00, 103)"
"5,2025-06-05,Florianopolis,,200000.00,104","List(5, 2025-06-05, Florianopolis, , 200000.00, 104)"
"6,2025-06-06,Brasilia,Colheitadeira,280000.00,105","List(6, 2025-06-06, Brasilia, Colheitadeira, 280000.00, 105)"
"7,2025-06-07,RIO DE JANEIRO,Pulverizador,75000.00,102","List(7, 2025-06-07, RIO DE JANEIRO, Pulverizador, 75000.00, 102)"
"8,2025-06-08,Curitiba,Trator,145000.00,106","List(8, 2025-06-08, Curitiba, Trator, 145000.00, 106)"
"9,2025-06-09,Sao Paulo,Colheitadeira,310000.00,101","List(9, 2025-06-09, Sao Paulo, Colheitadeira, 310000.00, 101)"
"10,2025-06-10,Recife,Trator,130000.00,107","List(10, 2025-06-10, Recife, Trator, 130000.00, 107)"


In [0]:
# Passo 5: Filtro para retirar linhas com dados incompletos 
#retirando do data frame linhas com tamanho diferente de 6 
df_vendas_filtered = df_vendas_split.filter(size(col("data_array")) == 6)

In [0]:
# passo 6 criando um novo dataframe com as colunas organizadas e entituladas

df_vendas_text = df_vendas_filtered.select(
    col("data_array").getItem(0).alias("id_venda"),
    col("data_array").getItem(1).alias("data_venda"),
    col("data_array").getItem(2).alias("local_venda"),
    col("data_array").getItem(3).alias("tipo_maquina"),
    col("data_array").getItem(4).alias("valor_total"),
    col("data_array").getItem(5).alias("revendedor_id")
)

display(df_vendas_text)



id_venda,data_venda,local_venda,tipo_maquina,valor_total,revendedor_id
1,2025-06-01,Sao Paulo,Trator,150000.0,101
2,2025-06-02,Rio de Janeiro,Colheitadeira,300000.0,102
3,2025-06-03,S. Paulo,Pulverizador,80000.0,101
4,2025-006-04,Belo Horizote,Trator,160000.0,103
5,2025-06-05,Florianopolis,,200000.0,104
6,2025-06-06,Brasilia,Colheitadeira,280000.0,105
7,2025-06-07,RIO DE JANEIRO,Pulverizador,75000.0,102
8,2025-06-08,Curitiba,Trator,145000.0,106
9,2025-06-09,Sao Paulo,Colheitadeira,310000.0,101
10,2025-06-10,Recife,Trator,130000.0,107


In [0]:
# passo 7 padronizando o nome das cidades
df_vendas_text = df_vendas_text.withColumn("local_venda",
    when(trim(col("local_venda")) == "S. Paulo", "Sao Paulo")
    .when(trim(col("local_venda")) == "RIO DE JANEIRO", "Rio de Janeiro")
    .when(trim(col("local_venda")) == "Rio de janeiro", "Rio de Janeiro")
    .when(trim(col("local_venda")) == "Belo Horizote", "Belo Horizonte")
    .otherwise(trim(col("local_venda")))
)
#função trim retira espaços do começo e do fim do objeto

In [0]:
# Passo 8: Limpeza e Padronização das Datas (como Texto)
# Aqui, preparo a coluna de data para a conversão final
# 1. Substitui todas as barras ("/") por hífens ("-").
df_vendas_text = df_vendas_text.withColumn("data_venda_str", regexp_replace(col("data_venda"), "/", "-"))
# 2. Corrigi erros de digitação comuns no mês, como "-006-", para "-06-".
df_vendas_text = df_vendas_text.withColumn("data_venda_str", regexp_replace(col("data_venda_str"), "-00", "-0"))


In [0]:
# Passo 9: Conversão Final dos Tipos de Dados
# fazendo o cast das colunas para o tipo de dado apropriado
df_vendas = df_vendas_text.select(
    col("id_venda").cast("int"),
    to_date(col("data_venda_str"), "yyyy-MM-dd").alias("data_venda"),
    col("local_venda"), col("tipo_maquina"),
    col("valor_total").cast("double"), col("revendedor_id").cast("int")
)
display(df_vendas)

id_venda,data_venda,local_venda,tipo_maquina,valor_total,revendedor_id
1,2025-06-01,Sao Paulo,Trator,150000.0,101
2,2025-06-02,Rio de Janeiro,Colheitadeira,300000.0,102
3,2025-06-03,Sao Paulo,Pulverizador,80000.0,101
4,2025-06-04,Belo Horizonte,Trator,160000.0,103
5,2025-06-05,Florianopolis,,200000.0,104
6,2025-06-06,Brasilia,Colheitadeira,280000.0,105
7,2025-06-07,Rio de Janeiro,Pulverizador,75000.0,102
8,2025-06-08,Curitiba,Trator,145000.0,106
9,2025-06-09,Sao Paulo,Colheitadeira,310000.0,101
10,2025-06-10,Recife,Trator,130000.0,107


In [0]:

# 3. PROCESSAMENTO DO ARQUIVO DE COTAÇÕES

df_cotacoes_raw = spark.read.text(file_path_cotacoes) #lendo como um arquivo de texto
df_cotacoes_indexed = df_cotacoes_raw.withColumn("index", monotonically_increasing_id()) 
df_cotacoes_datalines = df_cotacoes_indexed.filter(col("index") > 0).drop("index") #removendo novamente a primeira linha que contem o nome das colunas
df_cotacoes_cleaned_str = df_cotacoes_datalines.withColumn("value", regexp_replace(col("value"), '^"|"$', '')) #retirando as aspas
df_cotacoes_cleaned_str = df_cotacoes_cleaned_str.withColumn("value", translate(col("value"), caracteres_acentuados, caracteres_nao_acentuados)) #retirando acentos

df_cotacoes_split = df_cotacoes_cleaned_str.withColumn("data_array", split(col("value"), ",")) #separando os dados de cada linha

df_cotacoes_filtered = df_cotacoes_split.filter(size(col("data_array")) == 7) #filtrando linhas que tem 7 elementos em cada array


#adicionando cabeçalho e nomeando as colunas
df_cotacoes_text = df_cotacoes_filtered.select(
    col("data_array").getItem(0).alias("id_cotacao"),
    col("data_array").getItem(1).alias("data_cotacao"),
    col("data_array").getItem(2).alias("local_cotacao"),
    col("data_array").getItem(3).alias("maquina_cotada"),
    col("data_array").getItem(4).alias("valor_estimado"),
    col("data_array").getItem(5).alias("revendedor_id"),
    col("data_array").getItem(6).alias("status_cotacao")
)
#padronizando o nome das cidades para facilitar o B.I
df_cotacoes_text = df_cotacoes_text.withColumn("local_cotacao",
    when(trim(col("local_cotacao")) == "sao paulo", "Sao Paulo")
    .when(trim(col("local_cotacao")) == "S. Paulo", "Sao Paulo")
    .when(trim(col("local_cotacao")) == "Rio de janeiro", "Rio de Janeiro")
    .when(trim(col("local_cotacao")) == "Belo Horizote", "Belo Horizonte")
    .when(trim(col("local_cotacao")) == "Floripa", "Florianopolis")
    .when(trim(col("local_cotacao")) == "S Paulo", "Sao Paulo")
    .when(trim(col("local_cotacao")) == "RIO", "Rio de Janeiro")
    .otherwise(trim(col("local_cotacao")))
)
# Limpeza de múltiplos erros na string de data ANTES da conversão
df_cotacoes_text = df_cotacoes_text.withColumn("data_cotacao_str", regexp_replace(col("data_cotacao"), "/", "-"))
df_cotacoes_text = df_cotacoes_text.withColumn("data_cotacao_str", regexp_replace(col("data_cotacao_str"), "^220", "20"))


#realizando o cast para cada tipo certo de dado
df_cotacoes = df_cotacoes_text.select(
    col("id_cotacao").cast("int"),
    to_date(col("data_cotacao_str"), "yyyy-MM-dd").alias("data_cotacao"),
    col("local_cotacao"), col("maquina_cotada"),
    col("valor_estimado").cast("double"), col("revendedor_id").cast("int"),
    col("status_cotacao")
)

display(df_cotacoes)

id_cotacao,data_cotacao,local_cotacao,maquina_cotada,valor_estimado,revendedor_id,status_cotacao
1001,2025-06-01,Sao Paulo,Trator Modelo A,155000.0,101,Em negociacao
1002,2025-06-01,Rio de Janeiro,Colheitadeira Modelo B,300000.0,102,Proposta enviada
1003,2025-06-02,Sao Paulo,Pulverizador Modelo C,82000.0,101,Em negociacao
1004,2025-06-03,Belo Horizonte,Trator Modelo D,165000.0,103,Proposta enviada
1005,2025-06-03,Florianopolis,Plantadeira Modelo E,210000.0,104,Em negociacao
1006,2025-06-04,Brasilia,Colheitadeira Modelo F,290000.0,105,Proposta enviada
1007,2025-06-04,Rio de Janeiro,Pulverizador Modelo G,78000.0,102,Em negociacao
1008,2025-06-05,Curitiba,Trator Modelo H,150000.0,106,Proposta enviada
1009,2025-06-05,Sao Paulo,Colheitadeira Modelo I,320000.0,101,Em negociacao
1010,2025-06-06,Recife,Trator Modelo J,130000.0,107,Proposta enviada


In [0]:


#unificando as tabelas

# Passo 1: Criar a coluna padronizada 'tipo_maquina' em cotações
df_cotacoes_com_tipo = df_cotacoes.withColumn("tipo_maquina", split(col("maquina_cotada"), " ").getItem(0))

# Passo 2: Renomeando a coluna 'local_cotacao' para 'local_venda' no dataframe de cotações para que o join funcione corretamente
df_cotacoes_renomeado = df_cotacoes_com_tipo.withColumnRenamed("local_cotacao", "local_venda")

# Passo 3: Definir as chaves da união como uma lista de strings
chaves_join = ["revendedor_id", "local_venda", "tipo_maquina"]

# Passo 4: Executar o join usando a sintaxe simplificada 'on='
# Ao usar 'on=', o Spark automaticamente unifica as colunas-chave.
df_unificado_final = df_vendas.join(df_cotacoes_renomeado, on=chaves_join, how="left")

# Passo 5: Exibir o resultado final
print("✅ DataFrame Unificado com Colunas-Chave Unificadas:")
display(df_unificado_final)

✅ DataFrame Unificado com Colunas-Chave Unificadas:


revendedor_id,local_venda,tipo_maquina,id_venda,data_venda,valor_total,id_cotacao,data_cotacao,maquina_cotada,valor_estimado,status_cotacao
109,Salvador,Pulverizador,47,2025-07-17,88000.0,1020.0,2025-06-16,Pulverizador Modelo T,85000.0,Proposta enviada
109,Salvador,Pulverizador,47,2025-07-17,88000.0,1029.0,2025-06-25,Pulverizador Modelo CC,81000.0,Em negociacao
109,Salvador,Pulverizador,47,2025-07-17,88000.0,1038.0,2025-07-04,Pulverizador Modelo LL,86000.0,Proposta enviada
107,Recife,Colheitadeira,45,2025-07-15,301000.0,1018.0,2025-06-14,Colheitadeira Modelo R,295000.0,Proposta enviada
107,Recife,Colheitadeira,45,2025-07-15,301000.0,1027.0,2025-06-23,Colheitadeira Modelo AA,288000.0,Em negociacao
107,Recife,Colheitadeira,45,2025-07-15,301000.0,1036.0,2025-07-02,Colheitadeira Modelo JJ,298000.0,Proposta enviada
109,Salvador,Colheitadeira,12,2025-06-12,290000.0,1012.0,2025-06-08,Colheitadeira Modelo L,290000.0,Proposta enviada
108,Porto Alegre,Trator,46,2025-07-16,153000.0,1019.0,2025-06-15,Trator Modelo S,148000.0,Em negociacao
108,Porto Alegre,Trator,46,2025-07-16,153000.0,1028.0,2025-06-24,Trator Modelo BB,149000.0,Proposta enviada
108,Porto Alegre,Trator,46,2025-07-16,153000.0,1037.0,2025-07-03,Trator Modelo KK,151000.0,Em negociacao


In [0]:


# ANÁLISE: TOTAL DE VENDAS POR LOCAL 


total_vendas_por_local = df_vendas.groupBy("local_venda") \
    .agg(
        # Soma todos os valores da coluna 'valor_total'
        sum("valor_total").alias("valor_total_vendas"),
        
        # Simplesmente conta todas as linhas em cada grupo (local)
        count("*").alias("numero_de_vendas")
    ) \
    .orderBy(col("valor_total_vendas").desc())

# Exibe o resultado final
display(total_vendas_por_local)

local_venda,valor_total_vendas,numero_de_vendas
Sao Paulo,1945000.0,8
Florianopolis,1379000.0,5
Recife,1312000.0,5
Brasilia,1117000.0,6
Rio de Janeiro,1087000.0,7
Belo Horizonte,766000.0,6
Porto Alegre,691000.0,5
Salvador,630000.0,5
Curitiba,455000.0,5


In [0]:


# ANÁLISE: NÚMERO TOTAL DE ITENS COTADOS POR LOCAL

itens_cotados_por_local = df_cotacoes.groupBy("local_cotacao") \
    .agg(
        count("*").alias("total_de_itens_cotados")
    ) \
    .orderBy(col("total_de_itens_cotados").desc()) # Ordena pelo local com mais itens cotados

# Exibe o resultado final
display(itens_cotados_por_local)

local_cotacao,total_de_itens_cotados
Sao Paulo,10
Rio de Janeiro,8
Belo Horizonte,7
Brasilia,6
Curitiba,6
Florianopolis,5
Recife,5
Porto Alegre,4
Salvador,4
Campinas,1


In [0]:
from pyspark.sql.functions import col, round

# MÉTRICA: TAXA DE CONVERSÃO POR LOCAL

# 1.contando o número de vendas por local
num_vendas_por_local = df_vendas.groupBy("local_venda") \
    .count() \
    .withColumnRenamed("count", "numero_de_vendas")

# 2. contando o numero de cotações por local
num_cotacoes_por_local = df_cotacoes.groupBy("local_cotacao") \
    .count() \
    .withColumnRenamed("count", "numero_de_itens_cotados")

# 3. Unir as duas tabelas de contagem
#realizando o join das duas tabelas através do local, renomeando a coluna local cotacao para local venda
conversao_por_local = num_vendas_por_local.join(
    num_cotacoes_por_local.withColumnRenamed("local_cotacao", "local_venda"),
    on="local_venda",
    how="inner"
)

# 4. Calcular a taxa de conversão e ordenar
# Criamos a nova coluna 'taxa_conversao' dividindo as vendas pelos itens cotados
# e arredondando para 2 casas decimais.
conversao_por_local = conversao_por_local.withColumn(
    "taxa_conversao",
    round((col("numero_de_vendas") / col("numero_de_itens_cotados")), 2)
).orderBy("taxa_conversao", ascending=False)

#display(num_cotacoes_por_local)

display(conversao_por_local)

local_venda,numero_de_vendas,numero_de_itens_cotados,taxa_conversao
Porto Alegre,5,4,1.25
Salvador,5,4,1.25
Recife,5,5,1.0
Brasilia,6,6,1.0
Florianopolis,5,5,1.0
Rio de Janeiro,7,8,0.88
Belo Horizonte,6,7,0.86
Curitiba,5,6,0.83
Sao Paulo,8,10,0.8


In [0]:


#ARMAZENAMENTO OTIMIZADO DE TODAS AS TABELAS

print("Iniciando o processo de armazenamento otimizado...")

# --- ARMAZENANDO A TABELA DE VENDAS LIMPA ---
try:
    print("Salvando a tabela 'df_vendas' como Delta Table...")
    df_vendas.write.format("delta").mode("overwrite").saveAsTable("vendas_limpo")
    print(" -> Tabela 'vendas_limpo' salva com sucesso!")
except Exception as e:
    print(f" -> Erro ao salvar 'vendas_limpo': {e}")


# --- ARMAZENANDO A TABELA DE COTAÇÕES LIMPA ---
try:
    print("\nSalvando a tabela 'df_cotacoes' como Delta Table...")
    df_cotacoes.write.format("delta").mode("overwrite").saveAsTable("cotacoes_limpo")
    print(" -> Tabela 'cotacoes_limpo' salva com sucesso!")
except Exception as e:
    print(f" -> Erro ao salvar 'cotacoes_limpo': {e}")


# --- ARMAZENANDO A TABELA FINAL UNIFICADA E CONSOLIDADA ---
try:
    print("\nSalvando a tabela 'df_unificado_final' como Delta Table...")
    df_unificado_final.write.format("delta").mode("overwrite").saveAsTable("vendas_e_cotacoes_unificado")
    print(" -> Tabela 'vendas_e_cotacoes_unificado' salva com sucesso!")
except Exception as e:
    print(f" -> Erro ao salvar 'vendas_e_cotacoes_unificado': {e}")

print("\nProcesso de armazenamento finalizado.")

Iniciando o processo de armazenamento otimizado...
Salvando a tabela 'df_vendas' como Delta Table...
 -> Tabela 'vendas_limpo' salva com sucesso!

Salvando a tabela 'df_cotacoes' como Delta Table...
 -> Tabela 'cotacoes_limpo' salva com sucesso!

Salvando a tabela 'df_unificado_final' como Delta Table...
 -> Tabela 'vendas_e_cotacoes_unificado' salva com sucesso!

Processo de armazenamento finalizado.
