**Desafio 1.1**

Com base nos dados disponibilizados no arquivo mencionado acima, traga como resultado o **valor de venda mensal agrupado por mês e por tipo de serviço** e **valor de venda acumulado por serviço até o mês atual**. 

Salve um arquivo csv de nome **‘desafio1.1.csv’** com o resultado final.



In [1]:
# ===== Imports principais =====
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import regexp_replace, format_number, col


Explicação dos imports

from pyspark.sql import SparkSession, DataFrame

SparkSession: Ponto de entrada principal para usar o PySpark. Usado para criar ou recuperar uma sessão Spark com SparkSession.builder.getOrCreate().

DataFrame: Usado para declarar o tipo de retorno de funções que retornam DataFrames (ex: -> DataFrame).

from pyspark.sql import functions as F

Importa todas as funções do módulo functions com o alias F, permitindo acessar funções como F.col(), F.sum(), F.date_format(), F.round() etc.

Essa abordagem ajuda a deixar o código mais conciso e organizado.

from pyspark.sql.window import Window

Usado para criar especificações de janela para funções do tipo over(), que permitem realizar agregações acumuladas ou por grupos com ordenação (como soma acumulada por mês).

Exemplo de uso: Window.partitionBy("tipo_servico").orderBy("mes").

from pyspark.sql.functions import regexp_replace, format_number, col

regexp_replace: Permite substituir partes de strings usando expressões regulares. Usado para formatar números no padrão brasileiro.

format_number: Formata números com separadores de milhar e casas decimais (padrão americano).

col: Refere-se a colunas de um DataFrame. Necessário para expressões como col("valor_total").

In [2]:
def read_csv(file_path: str) -> DataFrame:
    """
    Função responsável por ler arquivo csv
    """
    df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv(file_path))
    return df

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df  = read_csv("/home/jovyan/work/dados/tab_venda_servicos.csv")

df.show(5, truncate=False)


+----------+--------+------------+-------+-----------+
|data_venda|cliente |filial_venda|servico|valor_total|
+----------+--------+------------+-------+-----------+
|2022-10-22|6309189 |256         |5      |232.27     |
|2023-01-30|90585447|256         |5      |425.13     |
|2022-12-09|10733220|256         |5      |122.67     |
|2023-05-26|6338901 |256         |5      |166.68     |
|2023-06-30|6278693 |256         |5      |509.95     |
+----------+--------+------------+-------+-----------+
only showing top 5 rows



In [4]:
#verificar se o spark catalogou certo mos tipos de colunas 
df.printSchema()


root
 |-- data_venda: date (nullable = true)
 |-- cliente: integer (nullable = true)
 |-- filial_venda: integer (nullable = true)
 |-- servico: integer (nullable = true)
 |-- valor_total: double (nullable = true)



In [5]:
def classificar_servico(df: DataFrame) -> DataFrame:
    """
    Adiciona a coluna 'tipo_servico' ao DataFrame conforme as regras:
    - servico ∈ (5,6,7,8)  → 'garantia estendida'
    - servico ∈ (19,20)    → 'seguro roubo e furto'
    - outros valores       → 'outros'
    """
    df = df.withColumn(
        "tipo_servico",
        F.when(F.col("servico").isin(5,6,7,8), "garantia estendida")
        .when(F.col("servico").isin(19,20), "seguro roubo e furto")
        .otherwise("outros")
    )
    return df


In [6]:
df_completo = classificar_servico(df)
df_completo.select( "tipo_servico").distinct().show()

+--------------------+
|        tipo_servico|
+--------------------+
|  garantia estendida|
|seguro roubo e furto|
+--------------------+



In [7]:
df_completo.show()

+----------+--------+------------+-------+-----------+------------------+
|data_venda| cliente|filial_venda|servico|valor_total|      tipo_servico|
+----------+--------+------------+-------+-----------+------------------+
|2022-10-22| 6309189|         256|      5|     232.27|garantia estendida|
|2023-01-30|90585447|         256|      5|     425.13|garantia estendida|
|2022-12-09|10733220|         256|      5|     122.67|garantia estendida|
|2023-05-26| 6338901|         256|      5|     166.68|garantia estendida|
|2023-06-30| 6278693|         256|      5|     509.95|garantia estendida|
|2023-04-10|89179573|         256|      5|     231.33|garantia estendida|
|2023-06-29| 6310720|         256|      5|     142.36|garantia estendida|
|2023-06-27|34984970|         256|      5|     396.53|garantia estendida|
|2022-12-21|95746466|         256|      5|      233.2|garantia estendida|
|2023-04-05|10041227|         256|      5|     854.69|garantia estendida|
|2023-04-22| 5810997|         256|    

### **Agregando os dados mensalmente por tipo de serviço**

- **Calculando a soma das vendas no mês**  
- **Calculando o acumulado até o mês atual**



In [8]:
# 1) cria coluna "mes" a partir de data_venda (formato yyyy-MM)
df_mes = df_completo.withColumn("mes", F.date_format(F.date_trunc("month", F.col("data_venda")), "yyyy-MM"))

# 2) agrega por mes + tipo_servico (valor mensal de vendas)
df_agg = (df_mes
    .groupBy("mes", "tipo_servico")
    .agg(F.sum("valor_total").alias("valor_venda_mensal"))
)

# 3) calcula valor acumulado por tipo_servico ao longo do tempo
w = Window.partitionBy("tipo_servico").orderBy("mes").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_final = (df_agg
    .withColumn("valor_venda_acumulado", F.sum("valor_venda_mensal").over(w))
    .select(
        "mes",
        "tipo_servico",
        F.round(F.col("valor_venda_mensal"), 2).alias("valor_venda_mensal"),
        F.round(F.col("valor_venda_acumulado"), 2).alias("valor_venda_acumulado")
    )
    .orderBy("mes", "tipo_servico")
)

# 4) visualizar resultado
df_final.show(20, truncate=False)

+-------+--------------------+------------------+---------------------+
|mes    |tipo_servico        |valor_venda_mensal|valor_venda_acumulado|
+-------+--------------------+------------------+---------------------+
|2022-07|garantia estendida  |5.14809071E7      |5.14809071E7         |
|2022-07|seguro roubo e furto|9310526.16        |9310526.16           |
|2022-08|garantia estendida  |4.271105788E7     |9.419196498E7        |
|2022-08|seguro roubo e furto|8363101.16        |1.767362732E7        |
|2022-09|garantia estendida  |4.080065183E7     |1.3499261681E8       |
|2022-09|seguro roubo e furto|7494609.59        |2.516823691E7        |
|2022-10|garantia estendida  |4.575607256E7     |1.8074868937E8       |
|2022-10|seguro roubo e furto|9533270.17        |3.470150708E7        |
|2022-11|garantia estendida  |6.735803518E7     |2.4810672455E8       |
|2022-11|seguro roubo e furto|1.140347351E7     |4.610498059E7        |
|2022-12|garantia estendida  |4.991802019E7     |2.9802474474E8 

### **Convertendo os valores de vendas para formato contábil**

- **Transformando números em notação científica para formato numérico legível (ex: `5.14809071E7` → `51.480.907,10`)**
- **Aplicando separador de milhar e arredondamento para duas casas decimais**
- **Formatando os valores no padrão brasileiro com a função `format_number()`**


In [10]:
df_formatado = df_final.select(
    "mes",
    "tipo_servico",
    F.format_number("valor_venda_mensal", 2).alias("valor_venda_mensal"),
    F.format_number("valor_venda_acumulado", 2).alias("valor_venda_acumulado")
)

df_formatado.show(20, truncate=False)


+-------+--------------------+------------------+---------------------+
|mes    |tipo_servico        |valor_venda_mensal|valor_venda_acumulado|
+-------+--------------------+------------------+---------------------+
|2022-07|garantia estendida  |51,480,907.10     |51,480,907.10        |
|2022-07|seguro roubo e furto|9,310,526.16      |9,310,526.16         |
|2022-08|garantia estendida  |42,711,057.88     |94,191,964.98        |
|2022-08|seguro roubo e furto|8,363,101.16      |17,673,627.32        |
|2022-09|garantia estendida  |40,800,651.83     |134,992,616.81       |
|2022-09|seguro roubo e furto|7,494,609.59      |25,168,236.91        |
|2022-10|garantia estendida  |45,756,072.56     |180,748,689.37       |
|2022-10|seguro roubo e furto|9,533,270.17      |34,701,507.08        |
|2022-11|garantia estendida  |67,358,035.18     |248,106,724.55       |
|2022-11|seguro roubo e furto|11,403,473.51     |46,104,980.59        |
|2022-12|garantia estendida  |49,918,020.19     |298,024,744.74 

### **Aplicando formatação brasileira com função personalizada**

- Criada uma função `formatar_brasileiro()` para transformar valores numéricos no formato contábil brasileiro (com vírgula decimal e ponto como separador de milhar).
- A função utiliza `format_number()` para arredondar, seguido de `regexp_replace()` para ajustar a pontuação.
- Foram adicionadas duas versões de cada valor: uma com formatação padrão e outra com formatação brasileira (`*_br`).


In [12]:


# Função para formatar em padrão brasileiro
def formatar_brasileiro(coluna):
    return regexp_replace(
        regexp_replace(
            regexp_replace(format_number(col(coluna), 2), ",", "X"),
            "\\.", ","
        ),
        "X", "."
    ).alias(coluna + "_br")

# Adiciona ambas as versões
df_formatado = df_final.select(
    "mes",
    "tipo_servico",
    F.format_number("valor_venda_mensal", 2).alias("valor_venda_mensal"),
    formatar_brasileiro("valor_venda_mensal"),
    F.format_number("valor_venda_acumulado", 2).alias("valor_venda_acumulado"),
    formatar_brasileiro("valor_venda_acumulado")
)

df_formatado.show(20, truncate=False)


+-------+--------------------+------------------+---------------------+---------------------+------------------------+
|mes    |tipo_servico        |valor_venda_mensal|valor_venda_mensal_br|valor_venda_acumulado|valor_venda_acumulado_br|
+-------+--------------------+------------------+---------------------+---------------------+------------------------+
|2022-07|garantia estendida  |51,480,907.10     |51.480.907,10        |51,480,907.10        |51.480.907,10           |
|2022-07|seguro roubo e furto|9,310,526.16      |9.310.526,16         |9,310,526.16         |9.310.526,16            |
|2022-08|garantia estendida  |42,711,057.88     |42.711.057,88        |94,191,964.98        |94.191.964,98           |
|2022-08|seguro roubo e furto|8,363,101.16      |8.363.101,16         |17,673,627.32        |17.673.627,32           |
|2022-09|garantia estendida  |40,800,651.83     |40.800.651,83        |134,992,616.81       |134.992.616,81          |
|2022-09|seguro roubo e furto|7,494,609.59      

### **Desafio 1.2**

**João** é um colaborador do **Magalu** que analisa a venda de serviços. Sempre que um mês é finalizado, João calcula o **percentual de crescimento do mês que acabou de finalizar em relação ao mês anterior a este.**

O líder do João enviou uma solicitação pra ele:

> **"João, preciso enviar para cada filial os 5 meses que tiveram o maior % de crescimento em relação ao mês anterior, você pode gerar uma base que tenha essa informação?"**

Salve um arquivo CSV de nome **‘desafio1.2.csv’** com o resultado final.  
**Utilize PySpark na resolução.**
olução.


In [13]:
df.show()

+----------+--------+------------+-------+-----------+
|data_venda| cliente|filial_venda|servico|valor_total|
+----------+--------+------------+-------+-----------+
|2022-10-22| 6309189|         256|      5|     232.27|
|2023-01-30|90585447|         256|      5|     425.13|
|2022-12-09|10733220|         256|      5|     122.67|
|2023-05-26| 6338901|         256|      5|     166.68|
|2023-06-30| 6278693|         256|      5|     509.95|
|2023-04-10|89179573|         256|      5|     231.33|
|2023-06-29| 6310720|         256|      5|     142.36|
|2023-06-27|34984970|         256|      5|     396.53|
|2022-12-21|95746466|         256|      5|      233.2|
|2023-04-05|10041227|         256|      5|     854.69|
|2023-04-22| 5810997|         256|      5|     600.61|
|2023-04-27| 6298569|         256|      5|     140.43|
|2022-09-06| 6252250|         256|      5|     182.42|
|2022-11-12|40697402|         256|      5|      399.9|
|2023-05-13|13256729|         256|      5|      289.0|
|2022-08-1

In [14]:
# quantidade de filiais 
df.select("filial_venda").distinct().count()


1045

1. **Preparar os dados**

**Extrair o ano e mês da data_venda**

**Agregar valor_total por filial_venda e ano_mes**


In [15]:
# 1. Criar coluna de ano e mês
df_preparado = df.withColumn("ano_mes", F.date_format("data_venda", "yyyy-MM"))

# 2. Agrupar por filial e ano_mes, somar e arredondar o valor
df_mensal = df_preparado.groupBy("filial_venda", "ano_mes").agg(
    F.round(F.sum("valor_total"), 2).alias("venda_mensal")
)

# 3. Aplicar format_number para formatar com 2 casas decimais e separador brasileiro
df_mensal_formatado = df_mensal.withColumn(
    "venda_mensal_br", formatar_brasileiro("venda_mensal")
)


In [16]:
df_mensal_formatado.show()

+------------+-------+------------+---------------+
|filial_venda|ano_mes|venda_mensal|venda_mensal_br|
+------------+-------+------------+---------------+
|        1028|2023-05|    30512.35|      30.512,35|
|         261|2023-05|    49091.46|      49.091,46|
|         264|2022-11|    79988.05|      79.988,05|
|        1038|2023-02|     6588.17|       6.588,17|
|         272|2022-07|   120649.92|     120.649,92|
|         792|2023-03|    33067.14|      33.067,14|
|         256|2023-02|    19602.41|      19.602,41|
|        1029|2023-02|    22581.16|      22.581,16|
|         521|2022-08|    47870.47|      47.870,47|
|          16|2022-12|    41877.23|      41.877,23|
|          17|2023-04|   113128.44|     113.128,44|
|          19|2022-11|   154857.19|     154.857,19|
|         531|2023-01|    30627.91|      30.627,91|
|          20|2022-08|    59789.57|      59.789,57|
|          24|2022-08|   159856.66|     159.856,66|
|         513|2022-09|    23988.48|      23.988,48|
|          1

**Calcular crescimento percentual por filial**

**Objetivo:**

**Usar** `lag()` **para obter a** `venda_mensal` **do mês anterior (por filial)**

**Calcular o** `crescimento_percentual`

**Já arredondar com** `round()` **e formatar com** `formatar_brasileiro()` **também**


In [19]:

# 1. Criar janela para calcular mês anterior por filial
janela = Window.partitionBy("filial_venda").orderBy("ano_mes")

# 2. Calcular a venda do mês anterior
df_crescimento = df_mensal.withColumn(
    "venda_anterior", F.lag("venda_mensal").over(janela)
)

# 3. Calcular o crescimento percentual
df_crescimento = df_crescimento.withColumn(
    "crescimento_percentual",
    F.when(
        F.col("venda_anterior").isNotNull(),
        ((F.col("venda_mensal") - F.col("venda_anterior")) / F.col("venda_anterior")) * 100
    )
)

# 4. Arredondar o crescimento percentual para 2 casas
df_crescimento = df_crescimento.withColumn(
    "crescimento_percentual", F.round("crescimento_percentual", 2)
)

# 5. Adicionar coluna formatada no padrão brasileiro
df_crescimento = df_crescimento.withColumn(
    "crescimento_percentual_br", formatar_brasileiro("crescimento_percentual")
)


In [20]:
df_crescimento.select(
    "filial_venda", "ano_mes", "venda_mensal", "venda_anterior", 
    "crescimento_percentual", "crescimento_percentual_br"
).orderBy("filial_venda", "ano_mes").show(10, truncate=False)


+------------+-------+------------+--------------+----------------------+-------------------------+
|filial_venda|ano_mes|venda_mensal|venda_anterior|crescimento_percentual|crescimento_percentual_br|
+------------+-------+------------+--------------+----------------------+-------------------------+
|6           |2022-07|335887.17   |NULL          |NULL                  |NULL                     |
|6           |2022-08|257621.15   |335887.17     |-23.3                 |-23,30                   |
|6           |2022-09|264037.04   |257621.15     |2.49                  |2,49                     |
|6           |2022-10|318597.02   |264037.04     |20.66                 |20,66                    |
|6           |2022-11|419139.73   |318597.02     |31.56                 |31,56                    |
|6           |2022-12|434635.57   |419139.73     |3.7                   |3,70                     |
|6           |2023-01|365816.71   |434635.57     |-15.83                |-15,83                   |


**Etapa 3 – Top 5 crescimentos por filial**

**Objetivo:**

**Para cada** `filial_venda`, **pegar os 5 meses com maior crescimento percentual**

**Ignorar casos onde o crescimento é** `null` **(primeiro mês de cada filial)**


In [22]:

# Janela para rankear o crescimento percentual por filial
janela_rank = Window.partitionBy("filial_venda").orderBy(F.col("crescimento_percentual").desc())

# Adicionar coluna com o ranking
df_rankeado = df_crescimento.withColumn(
    "rank_crescimento",
    F.row_number().over(janela_rank)
)

# Filtrar os top 5 por filial
df_top5 = df_rankeado.filter(F.col("rank_crescimento") <= 5)


In [23]:
df_top5.show()

+------------+-------+------------+--------------+----------------------+-------------------------+----------------+
|filial_venda|ano_mes|venda_mensal|venda_anterior|crescimento_percentual|crescimento_percentual_br|rank_crescimento|
+------------+-------+------------+--------------+----------------------+-------------------------+----------------+
|           6|2022-11|   419139.73|     318597.02|                 31.56|                    31,56|               1|
|           6|2022-10|   318597.02|     264037.04|                 20.66|                    20,66|               2|
|           6|2023-03|   316776.62|     279941.49|                 13.16|                    13,16|               3|
|           6|2023-05|    343490.4|      305695.8|                 12.36|                    12,36|               4|
|           6|2022-12|   434635.57|     419139.73|                   3.7|                     3,70|               5|
|           7|2023-03|    30002.26|      16588.38|              

### 💾 Gerando os arquivos finais (.csv)

**Objetivo:**  
Exportar os resultados processados para arquivos `.csv`, organizando-os na estrutura de diretórios desejada para entrega.

- Os arquivos serão salvos na pasta `/home/jovyan/work/dados/Output/`.
- O nome dos arquivos reflete o desafio resolvido:
  - `solucao1.1.csv` → Resultado do desafio 1.1: vendas mensais e acumuladas por tipo de serviço.
  - `solucao1.2.csv` → Resultado do desafio 1.2: top 5 maiores crescimentos mensais percentuais por filial.

**Comando utilizado:**  
Utiliza o método `.toPandas().to_csv(path, index=False)` para salvar em formato CSV.


In [25]:
#salvando em spark desafio 1.1
(df_formatado
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", ";")
    .csv("/home/jovyan/work/dados/Output/solucao1.1.csv"))

In [None]:
#desafio 1.1 salvando com pandas
# Converte para pandas e salva como CSV em um único arquivo
df_formatado.toPandas().to_csv(
    "/home/jovyan/work/dados/Output/solucao1.1.csv",
    index=False,
    sep=","
)

print("✅ Arquivo salvo com sucesso em /home/jovyan/work/dados/Output/solucao1.1.csv")



In [26]:
#salvando em spark desafio 1.2
(df_top5
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", ";")
    .csv("/home/jovyan/work/dados/Output/solucao1.2.csv"))

In [None]:
#desafio 1.2
# Converte para pandas e salva como CSV em um único arquivo
df_top5.toPandas().to_csv(
    "/home/jovyan/work/dados/Output/solucao1.2.csv",
    index=False,
    sep=";"
)

print("✅ Arquivo salvo com sucesso como solucao1.2.csv")
