# Introdução ao Big Data com Apache Spark

Este tutorial tem como objetivo facilitar o entendimento do conceito de Big Data utilizando o Apache Spark.
    
## Arquitetura do Apache Spark

Antes de partir para o código, vamos ver uma visão geral da arquitetura do Apache Spark. Esta arquitetura permite que você possa processar seus códigos em várias máquinas como se fosse uma só através da arquitetura master-worker, onde existe um `driver` ou nó master no cluster, acompanhado pelos nós `worker`. O master envia o trabalho para os workers com instruções para carregar os dados da memória ou do disco.

O diagrama abaixo apresenta um exemplo de um cluster com Apache Spark, onde basicamente existe um nó Driver que comunica com os nós executors. Cada um destes nós executors tem slots que são logicamente como núcleos de processsamento.

![spark-architecture](https://miro.medium.com/v2/resize:fit:962/1*AWt1p2zUo9lLQ_X_gN0vZg.png)


## Introdução ao Big Data com Spark

Neste tutorial, vamos explorar o poder do Apache Spark através da análise de um conjunto de dados de vendas e clientes. Nossos dados serão carregados a partir de arquivos CSV, representando transações de vendas e informações de clientes.

Vamos usar esses dados para realizar tarefas comuns de análise de dados, como:

Carregamento e tratamento dos dados: Abordaremos como carregar os dados CSV no Spark e prepará-los para análise.
Transformações e agregações: Realizaremos operações de transformação e agregação para obter insights valiosos dos dados.
Exploração e visualização: Utilizaremos ferramentas de visualização para explorar os dados e apresentar as conclusões de forma clara e intuitiva.

Ao longo do tutorial, você aprenderá os conceitos básicos do Spark e como aplicar suas funcionalidades para analisar dados do mundo real.




# Download dos arquivos

Iremos baixar os arquivos de entrada:
- clients.csv
- vendas.csv

Os dados em `vendas.csv` são relativos a vendas realizadas por atacadistas e distribuidores.

## Dados de clientes

Iremos realizar o download de dados de clientes do link abaixo:

In [1]:
!wget -O clients.csv https://www.dropbox.com/scl/fi/vd5hmlr7ghj2j5rx3w681/clients.csv?rlkey=rmcalhytfjm6nfklw7hhtykid&dl=1

--2024-09-20 23:50:37--  https://www.dropbox.com/scl/fi/vd5hmlr7ghj2j5rx3w681/clients.csv?rlkey=rmcalhytfjm6nfklw7hhtykid
Resolving www.dropbox.com (www.dropbox.com)... 162.125.5.18, 2620:100:601d:18::a27d:512
Connecting to www.dropbox.com (www.dropbox.com)|162.125.5.18|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://uc9e5b4f858a2d8c96631685bfb0.dl.dropboxusercontent.com/cd/0/inline/Ca9hYI-NR8bMV8KM-LAmUhBTcMXBUcQDIl9kfZeUmRkRB1S3mUUIU4grTrFrQBpyuDNh2Fs3tHZoPJarbd58bj7Ncr05wW3ElejUsz0zO8YB_4-6dRTUDtSb7dBCl6XOrd5NQrPZ8zFMrCTUrIX2KW51/file# [following]
--2024-09-20 23:50:38--  https://uc9e5b4f858a2d8c96631685bfb0.dl.dropboxusercontent.com/cd/0/inline/Ca9hYI-NR8bMV8KM-LAmUhBTcMXBUcQDIl9kfZeUmRkRB1S3mUUIU4grTrFrQBpyuDNh2Fs3tHZoPJarbd58bj7Ncr05wW3ElejUsz0zO8YB_4-6dRTUDtSb7dBCl6XOrd5NQrPZ8zFMrCTUrIX2KW51/file
Resolving uc9e5b4f858a2d8c96631685bfb0.dl.dropboxusercontent.com (uc9e5b4f858a2d8c96631685bfb0.dl.dropboxusercontent.com)... 162.125.5.15, 2620:10

**Se não funcionar o download acima, tente o link abaixo:**

---



In [None]:
#!gdown https://drive.google.com/uc?id=1SQn8nCPhdFXFOe2wZ9wn1exTAIdgo2QU

## Dados de vendas

Iremos realizar o download dos dados de vendas presentes no arquivo `vendas.csv`:

In [2]:
!wget -O vendas.csv https://www.dropbox.com/scl/fi/y6h3do8rp9fhovunvj36c/vendas.csv?rlkey=m4yl4h8vzfyg5fq8vyb2sbd2x&st=nz4dme6m&dl=1

--2024-09-20 23:50:50--  https://www.dropbox.com/scl/fi/y6h3do8rp9fhovunvj36c/vendas.csv?rlkey=m4yl4h8vzfyg5fq8vyb2sbd2x
Resolving www.dropbox.com (www.dropbox.com)... 162.125.5.18, 2620:100:601d:18::a27d:512
Connecting to www.dropbox.com (www.dropbox.com)|162.125.5.18|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://uc94023d186ef98b13bf8eea9423.dl.dropboxusercontent.com/cd/0/inline/Ca9hIH5h5HM_w8BM_HXnnUljYYVPthOucFOxgk1Y4gWs1XVHIdECjnPkEbXeDjr9rBvDKlE4VQhDgROPudoeE2hfLNjnGcxnbparKvQra58aLZpz0jcpGcAMEB65lPdTucjx0BMaoCN1M4NMd6Vv4mMl/file# [following]
--2024-09-20 23:50:50--  https://uc94023d186ef98b13bf8eea9423.dl.dropboxusercontent.com/cd/0/inline/Ca9hIH5h5HM_w8BM_HXnnUljYYVPthOucFOxgk1Y4gWs1XVHIdECjnPkEbXeDjr9rBvDKlE4VQhDgROPudoeE2hfLNjnGcxnbparKvQra58aLZpz0jcpGcAMEB65lPdTucjx0BMaoCN1M4NMd6Vv4mMl/file
Resolving uc94023d186ef98b13bf8eea9423.dl.dropboxusercontent.com (uc94023d186ef98b13bf8eea9423.dl.dropboxusercontent.com)... 162.125.5.15, 2620:100

**Se não funcionar o download acima, tente o link abaixo:**

In [None]:
#!gdown https://drive.google.com/uc?id=1ubiLTdjEdy8C86MdkW1HRyPOFZl4irT1

# Analisando dados de vendas
Você está recebendo um conjunto de dados histórico de vendas de ERPs de várias empresas. Temos o histórico de vendas de várias empresas dentro do arquivo e, por isso, podemos ter períodos históricos diferentes de dados de vendas disponíveis.

## Arquivos
- **vendas.parquet** - contém dados históricos de vendas até junho de 2022.
- **clients.csv** - dados dos clientes que compraram o produto.

## Campos do arquivo de vendas

- *client_id*: id do cliente.
- *items_count*: número de itens vendidos
- *list_price*: preço do produto no catálogo da empresa.
- *order_date*: data da venda.
- *order_id*: id do pedido. Cada pedido pode conter vários produtos vendidos dentro dele.
- *product_id*: id do produto vendido.
- *sale_price*: preço vendido ao cliente.
- *salesman_id*: id do vendedor.
- *supplier_id*: id do fornecedor do produto. Por exemplo, a indústria fabricando do produto.
- *company_id*: id da empresa. Temos dentro da base o histórico de vendas de várias empresas para clientes finais.
- *product*: nome do produto.
- *salesman*: nome do vendedor.
- *supplier*: nome do fornecedor.
- *client*: nome do cliente.


## Campos do arquivo clients.csv
- *client_id*: id do cliente.
- *cnae_id*: CNAE do cliente que está realizando a compra.
- *cod_city*: código da cidade no IBGE em que o cliente está localizado.
- *cod_tract*: código do setor censitário no IBGE em que o cliente está localizado.
- *cod_uf*: código da UF no IBGE em que o cliente está localizado.
- *city*: cidade do cliente.
- *state*: UF do cliente.
- *client*: nome do cliente.
- *company_id*: id da empresa. Temos dentro da base o histórico de vendas de várias empresas para clientes finais.


## Iniciando o PySpark

Esta célula de código instala o Spark no ambiente de execução Colab. Aqui está uma explicação passo a passo:

1. **`!apt-get install openjdk-11-jdk-headless -qq > /dev/null`**: este comando instala o OpenJDK 11 (versão headless, sem interface gráfica), que é um requisito para o Spark. O `-qq` suprime a saída e o `> /dev/null` redireciona a saída para o nada, tornando o processo mais silencioso.

2. **`!wget -q https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz`**: Este comando baixa o arquivo compactado do Spark 3.5.2 (construído para o Hadoop 3) do site oficial do Apache Spark. O `-q` suprime a saída de download.

3. **`!tar xf spark-3.5.2-bin-hadoop3.tgz`**: Este comando extrai o arquivo compactado baixado do Spark, criando um diretório chamado `spark-3.5.2-bin-hadoop3`.

4. **`!pip -q install findspark`**: Este comando instala a biblioteca `findspark` usando `pip`. Findspark é uma biblioteca Python que torna mais fácil configurar o Spark em um ambiente Python, principalmente no Colab. Ela define as variáveis de ambiente necessárias para que o Spark funcione corretamente.

Após executar essas linhas, você terá o Spark instalado e pronto para ser usado em seu notebook Colab.

In [3]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip -q install findspark

Defina as variáveis de ambiente do Spark:

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"

O código a seguir garante que o Spark seja configurado corretamente e esteja pronto para uso em seu ambiente Python.

* **`findspark.init()`**: executa a função `init()` do módulo `findspark`. Esta função:
    * Localiza a instalação do Spark em seu sistema.
    * Configura as variáveis de ambiente necessárias para que o Python possa interagir com o Spark. Isso permite que o driver Python (seu código Python) se comunique com o executor Spark (o código que realmente processa os dados).


In [5]:
import findspark
findspark.init()

Depois de executar a célula anterior, você poderá importar e usar as bibliotecas Spark como `pyspark.sql.SparkSession` para criar uma sessão Spark e começar a trabalhar com dados.

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# local define número de cores utilizados
spark = SparkSession.builder.appName('Aula 1').master("local[*]").getOrCreate()

# Primeira etapa: carregar os arquivos

Nessa etapa você deve carregar os quatro arquivos abaixos, utilizando o **Spark**

**Dicas:**

- Separador dos arquivos é , (vírgula)
- Os arquivos possuem cabeçalho

In [None]:
clients_schema = "city string, client_id string, cnae_id string, \
cod_city integer, cod_tract long, cod_uf integer, state string, client string, \
 company_id integer"
clients_df = spark.read.csv("clients.csv", header=True, schema=clients_schema, mode="FAILFAST")

In [None]:
clients_df.show()

+--------------------+---------+-------+--------+---------------+------+-----+--------------+----------+
|                city|client_id|cnae_id|cod_city|      cod_tract|cod_uf|state|        client|company_id|
+--------------------+---------+-------+--------+---------------+------+-----+--------------+----------+
|                NULL|  c855767|   NULL| 5211503|521150305000094|    52|   GO|Client c855767|       567|
|               POSSE|  c836888|   NULL| 5218300|521830005000006|    52|   GO|Client c836888|       567|
|                 POA|  c836597|   NULL| 3539806|353980605000005|    35|   SP|Client c836597|       567|
|           SAO PAULO|  c836596|   NULL| 3550308|355030837000019|    35|   SP|Client c836596|       567|
|              CUIABA|  c855005|   NULL| 5103403|510340310400031|    51|   MT|Client c855005|       567|
|         BREU BRANCO|  c855045|   NULL| 1501782|150178205000045|    15|   PA|Client c855045|       567|
|APARECIDA DE GOIANIA|  c836630|   NULL| 5201405|520140

In [None]:
sales_schema = "client_id string, items_count integer, list_price double, \
order_date date, order_id string, product_id string, sale_price double, \
salesman_id string, supplier_id string, company_id integer, \
product string, salesman string, supplier string, client string"

vendas_df = spark.read.csv("vendas.csv", schema=sales_schema, header=True)

In [None]:
vendas_df.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- items_count: integer (nullable = true)
 |-- list_price: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- salesman_id: string (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- company_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- salesman: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- client: string (nullable = true)



## Spark Pandas

### O que é Spark Pandas?

Spark Pandas é uma biblioteca que fornece uma interface similar ao Pandas para trabalhar com dados em clusters Apache Spark. Isso significa que você pode usar as mesmas funções e métodos do Pandas, mas com a capacidade de processar datasets imensos distribuídos em vários nós.

### Por que usar Spark Pandas?

Para cientistas de dados, o Spark Pandas oferece diversas vantagens:

- Escalabilidade: processa conjuntos de dados enormes com rapidez e eficiência, utilizando a computação distribuída do Spark.
- Familiaridade: permite utilizar a linguagem e as funções do Pandas, que você já conhece, para análise de dados em grande escala.
- Performance: aproveita as otimizações do Spark para acelerar tarefas como leitura, transformação e agregação de dados.
- Integração: funciona perfeitamente com outros componentes do ecossistema Spark, como Spark SQL e MLlib.

Agora vamos fazer a mesma operação de leitura de dados de clientes e vendas com **Spark Pandas** (vide [documentação](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html)).

Os clientes vamos armazenar no Dataframe `clients_pdf` e os dados de vendas em `vendas_pdf`. A leitura do csv é semelhante ao Pandas com o método `read_csv`.

In [None]:
import pyspark.pandas as ps



In [None]:
ps.read_csv('clients.csv', names=clients_schema)



Unnamed: 0,city,client_id,cnae_id,cod_city,cod_tract,cod_uf,state,client,company_id
0,city,client_id,cnae_id,,,,state,client,
1,,c855767,,5211503.0,521150300000000.0,52.0,GO,Client c855767,567.0
2,POSSE,c836888,,5218300.0,521830000000000.0,52.0,GO,Client c836888,567.0
3,POA,c836597,,3539806.0,353980600000000.0,35.0,SP,Client c836597,567.0
4,SAO PAULO,c836596,,3550308.0,355030800000000.0,35.0,SP,Client c836596,567.0
5,CUIABA,c855005,,5103403.0,510340300000000.0,51.0,MT,Client c855005,567.0
6,BREU BRANCO,c855045,,1501782.0,150178200000000.0,15.0,PA,Client c855045,567.0
7,APARECIDA DE GOIANIA,c836630,,5201405.0,520140500000000.0,52.0,GO,Client c836630,567.0
8,PEDRAS MA CRUZ,c904501,,3149150.0,314915000000000.0,31.0,MG,Client c904501,567.0
9,JANUARIA,c903627,,3135209.0,313520900000000.0,31.0,MG,Client c903627,567.0


In [None]:
clients_pdf = ps.read_csv('clients.csv',
                          dtype={"city" :"str", "client_id" :"str", "cnae_id" :"str",
                                 "cod_city": "int", "cod_tract": "float", "cod_uf": "int",
                                 "state" :"str", "client" :"str", "company_id": "int"})



In [None]:
clients_pdf.head(5)

Unnamed: 0,city,client_id,cnae_id,cod_city,cod_tract,cod_uf,state,client,company_id
0,,c855767,,5211503,521150300000000.0,52,GO,Client c855767,567
1,POSSE,c836888,,5218300,521830000000000.0,52,GO,Client c836888,567
2,POA,c836597,,3539806,353980600000000.0,35,SP,Client c836597,567
3,SAO PAULO,c836596,,3550308,355030800000000.0,35,SP,Client c836596,567
4,CUIABA,c855005,,5103403,510340300000000.0,51,MT,Client c855005,567


In [None]:
vendas_pdf = ps.read_csv("vendas.csv")



In [None]:
vendas_pdf.head(5)

Unnamed: 0,client_id,items_count,list_price,order_date,order_id,product_id,sale_price,salesman_id,supplier_id,company_id,product,salesman,supplier,client
0,c2943,3,0.0,2020-05-21,,p1477,25.166666,s69,su28,603,Product p1477,Salesman s69,Supplier su28,Client c2943
1,c2943,12,0.0,2020-05-21,,p156,19.4653,s69,su16,603,Product p156,Salesman s69,Supplier su16,Client c2943
2,c2092,2,0.0,2020-05-21,,p1314,39.985,s19,su16,603,Product p1314,Salesman s19,Supplier su16,Client c2092
3,c3412,5,0.0,2020-05-18,,p272,26.25,s79,su19,603,Product p272,Salesman s79,Supplier su19,Client c3412
4,c3412,10,0.0,2020-05-18,,p339,13.68,s79,su19,603,Product p339,Salesman s79,Supplier su19,Client c3412


# Consultas nas bases de dados

In [None]:
clients_pdf[clients_pdf['state'] == 'GO'].head(3)

Unnamed: 0,city,client_id,cnae_id,cod_city,cod_tract,cod_uf,state,client,company_id
0,,c855767,,5211503,521150300000000.0,52,GO,Client c855767,567
1,POSSE,c836888,,5218300,521830000000000.0,52,GO,Client c836888,567
6,APARECIDA DE GOIANIA,c836630,,5201405,520140500000000.0,52,GO,Client c836630,567


In [None]:
clients_df.filter(clients_df['state'] == 'GO').show(3)

+--------------------+---------+-------+--------+---------------+------+-----+--------------+----------+
|                city|client_id|cnae_id|cod_city|      cod_tract|cod_uf|state|        client|company_id|
+--------------------+---------+-------+--------+---------------+------+-----+--------------+----------+
|                NULL|  c855767|   NULL| 5211503|521150305000094|    52|   GO|Client c855767|       567|
|               POSSE|  c836888|   NULL| 5218300|521830005000006|    52|   GO|Client c836888|       567|
|APARECIDA DE GOIANIA|  c836630|   NULL| 5201405|520140505000019|    52|   GO|Client c836630|       567|
+--------------------+---------+-------+--------+---------------+------+-----+--------------+----------+
only showing top 3 rows



In [None]:
clients_df.createOrReplaceTempView("clients")

In [None]:
sql = """
SELECT state, count(*)
FROM clients
WHERE cnae_id is NOT NULL
GROUP BY STATE
ORDER BY 2 DESC
LIMIT 10
"""

In [None]:
clients_df

DataFrame[city: string, client_id: string, cnae_id: string, cod_city: int, cod_tract: bigint, cod_uf: int, state: string, client: string, company_id: int]

In [None]:
clients_df[clients_df['cnae_id'].isNotNull()].groupby(['state']).agg(count('*')).orderBy(count('*').desc()).show()

+-----+--------+
|state|count(1)|
+-----+--------+
|   SP|    8251|
|   GO|    5883|
|   CE|    5855|
|   MA|    4094|
|   PI|    3862|
|   PA|    3411|
|   PB|    3074|
|   RN|    2025|
|   MG|    1812|
|   RJ|    1679|
|   TO|    1655|
|   RO|    1342|
|   PR|    1125|
|   BA|    1045|
|   RS|     793|
|   SC|     661|
|   AM|     571|
|   DF|     478|
|   PE|     463|
|   ES|     445|
+-----+--------+
only showing top 20 rows



In [None]:
spark.sql(sql).show()

+-----+--------+
|state|count(1)|
+-----+--------+
|   SP|    8251|
|   GO|    5883|
|   CE|    5855|
|   MA|    4094|
|   PI|    3862|
|   PA|    3411|
|   PB|    3074|
|   RN|    2025|
|   MG|    1812|
|   RJ|    1679|
+-----+--------+



# **Tarefa: questões a serem respondidas**

1. Liste o nome dos 10 clientes que mais compraram no ano de 2021.
2. Quais os 5 estados do Brasil que tiveram mais vendas no mês de janeiro de 2022?
3. Quantos clientes tiveram compra com mais de um vendedor no ano de 2022?
4. Quantos fornecedores distintos existem na base de dados?
5. Qual o vendedor com maior volume de vendas analisando todo o histórico de compras?
6. Qual o produto mais vendido (em quantidade) por estado brasileiro e qual o valor total de vendas deste produto em cada estado?
7. Qual a média do valor total das vendas por dia da semana em cada estado no ano de 2022?



# Junção da base de dados de clientes e vendas

In [None]:
joined_df = vendas_df.join(clients_df, "client_id", "left")

In [None]:
joined_df.count()

4367269

In [None]:
joined_pdf = vendas_pdf.merge(clients_pdf, on='client_id', how='left')

In [None]:
len(joined_pdf)

4367269

## **Questão 1: Liste o nome dos 10 clientes que mais compraram no ano de 2021.**

### Questão 1: Apache Spark

A primeira resposta será desenvolvida utilizando a API do Spark.

In [None]:
# Filtrando as vendas de 2021 e removendo valores nulos em 'client_id'
vendas_2021 = vendas_df.filter("order_date >= '2021-01-01' and order_date <= '2021-12-31' and client_id is not null")

In [None]:
# Agrupando por 'client_id' e 'client' e somando 'sale_price'
top_clients = vendas_2021.groupBy(["client_id", "client"]).agg(sum("sale_price").alias("vendas"))

In [None]:
# Ordenando por 'vendas' em ordem decrescente
top_clients = top_clients.orderBy(desc("vendas"))

In [None]:
# Exibindo o resultado
top_clients.show(5)

+---------+------------+------------------+
|client_id|      client|            vendas|
+---------+------------+------------------+
|     c354| Client c354|34151.925634000014|
|    c4658|Client c4658|       28430.28621|
|    c5150|Client c5150|24343.193999999996|
|      c98|  Client c98|21936.962434000015|
|    c5127|Client c5127|20967.980328200007|
+---------+------------+------------------+
only showing top 5 rows



### Questão 1: Spark Pandas

A segunda resposta será desenvolvida utilizando a API do Pandas junto com Spark.

In [None]:
# Filtrando as vendas de 2021 e removendo valores nulos em 'client_id'
vendas_2021 = vendas_pdf[(vendas_pdf['order_date'] >= '2021-01-01') &
                          (vendas_pdf['order_date'] <= '2021-12-31') &
                          (vendas_pdf['client_id'].notna())]

# Agrupando por 'client_id' e 'client' e somando 'sale_price'
top_clients = vendas_2021.groupby(['client_id', 'client'])['sale_price'].sum().reset_index(name='vendas')

# Ordenando por 'vendas' em ordem decrescente
top_clients = top_clients.sort_values('vendas', ascending=False)

# Exibindo o resultado
top_clients.head(5)




Unnamed: 0,client_id,client,vendas
1185,c354,Client c354,34151.925634
2771,c4658,Client c4658,28430.28621
1968,c5150,Client c5150,24343.194
1778,c98,Client c98,21936.962434
831,c5127,Client c5127,20967.980328


### **Questão 2: Quais os 5 estados do Brasil que tiveram mais vendas no mês de janeiro de 2022**

### Questão 2: Apache Spark

Vamos responder utilizando a API do Spark.

In [None]:
# Filtrando as vendas de janeiro de 2022
jan_2022_sales = joined_df.filter("order_date >= '2022-01-01' and order_date <= '2022-01-31'")

# Agrupando por estado e somando as vendas
state_sales = jan_2022_sales.groupBy("state").agg(sum("sale_price").alias("vendas"))

# Ordenando por vendas em ordem decrescente
state_sales = state_sales.orderBy(desc("vendas"))

state_sales.show(5)

+-----+------------------+
|state|            vendas|
+-----+------------------+
|   RJ| 999410.6045631706|
|   SP| 698166.8008689005|
|   GO| 608656.8479156648|
|   ES|448342.31084699964|
|   MG|426551.29759549885|
+-----+------------------+
only showing top 5 rows



### Questão 2: Spark Pandas

Vamos responder utilizando o Spark Pandas.

In [None]:
# Filtrando as vendas de janeiro de 2022
jan_2022_sales = joined_pdf[(joined_pdf['order_date'] >= '2022-01-01') &
                            (joined_pdf['order_date'] <= '2022-01-31')]

# Agrupando por estado e somando as vendas
state_sales = jan_2022_sales.groupby('state')['sale_price'].sum().reset_index(name='vendas')

# Ordenando por vendas em ordem decrescente
state_sales = state_sales.sort_values('vendas', ascending=False)

# Exibindo os 5 primeiros estados
state_sales.head(5)




Unnamed: 0,state,vendas
19,RJ,999410.604563
6,SP,698166.800869
3,GO,608656.847916
7,ES,448342.310847
12,MG,426551.297595


###**Questão 3: Quantos clientes tiveram compra com mais de um vendedor no ano de 2022?**

### Questão 3: Apache Spark



In [None]:

vendas_2022 = joined_df.filter("order_date >= '2022-01-01' and order_date <= '2022-12-31'")


In [None]:
n_clientes = vendas_2022.groupBy('client_id').agg(countDistinct('salesman_id') \
                                     .alias('num_vendedores')) \
                                     .filter('num_vendedores > 1') \
                                     .count() \

print(f"Resposta: {n_clientes} clientes.")

Resposta: 60 clientes.


### Questão 3: Spark Pandas

In [None]:
vendas_2022 = vendas_pdf[vendas_pdf['order_date'].dt.year == 2022]

num_vendedores = vendas_2022.groupby('client_id')['salesman_id'].nunique().reset_index(name='vendedores_distintos')

clientes_filtrados = num_vendedores[num_vendedores['vendedores_distintos'] > 1]

num_clientes = len(clientes_filtrados)

print(f"Resposta: {num_clientes} clientes.")

Resposta: 60 clientes.


### Questão 3: SQL

In [7]:
vendas_df = spark.read.csv("vendas.csv", header=True, inferSchema=True)
clients_df = spark.read.csv("clients.csv", header=True, inferSchema=True)

In [8]:
#view temporária
vendas_df.createOrReplaceTempView("vendas")
clients_df.createOrReplaceTempView("clients")

In [10]:
sql = """SELECT client_id, COUNT(DISTINCT salesman_id) AS NumVendedores
    FROM vendas
    WHERE YEAR(order_date) = 2022
    GROUP BY client_id
    HAVING NumVendedores > 1"""

    #para cada id diferente ele vai buscar o a quantidade de vendedores diferentes pelo id dos vendedores
    #vai pegar pela data de 2022
    #vai agrupar esses clientes pelo id
    #vai pegar apenas os que tem mais que 1

In [11]:
spark.sql(sql).count()

60

###**Questão 4: Quantos fornecedores distintos existem na base de dados?**

### Questão 4: Apache Spark

In [12]:
n_fornecedores = vendas_df.select('supplier_id').distinct().count()

print(f"Resposta {n_fornecedores}")

Resposta 37


### Questão 4: Spark Pandas

In [None]:
num_fornecedores = vendas_pdf['supplier_id'].nunique()

print(f"Resposta {num_fornecedores}")

Resposta 37


### Questão 4: SQL

In [13]:
sql = """SELECT COUNT(DISTINCT supplier_id) AS Num_de_Fornecedores
    FROM vendas"""

    #na tabela de vendas puxa todos os valores de id para vendedores

In [14]:
spark.sql(sql).show()

+-------------------+
|Num_de_Fornecedores|
+-------------------+
|                 37|
+-------------------+



###**Questão 5: Qual o vendedor com maior volume de vendas analisando todo o histórico de compras?**

### Questão 5: Apache Spark

Interpretamos volume de vendas como maior número de produtos vendidos, sem realcionar ao preço de cada um.

In [None]:
sales = vendas_df.groupBy('salesman_id') \
    .agg(sum('items_count').alias('sales_volume')) \

max_sale = sales.agg(max('sales_volume').alias('max_sales_volume'))

salesman_max = sales.filter(f"sales_volume == {max_sale.collect()[0][0]}")

# vendedor com maior volume de vendas
salesman_max.show()

+-----------+------------+
|salesman_id|sales_volume|
+-----------+------------+
|        s25|      494390|
+-----------+------------+



### Questão 5: Spark Pandas

In [None]:
sales = vendas_pdf.groupby('salesman_id')['items_count'].sum().reset_index()

sales = sales.rename(columns={'items_count': 'sales_volume'})

max_sales_volume = sales['sales_volume'].max()

salesman_max = sales[sales['sales_volume'] == max_sales_volume]

salesman_max



Unnamed: 0,salesman_id,sales_volume
1,s25,494390


### Questão 5: SQL

In [21]:
sql = """SELECT salesman_id, SUM(items_count) AS sales_volume
FROM vendas
GROUP BY salesman_id
ORDER BY sales_volume DESC
LIMIT 1
"""

    #na tabela de vendas puxa todos os valores de id para vendedores e soma as vendas de cada um
    #agrupa por id das vendas
    #ordena decrecente
    #pega o primeiro

In [22]:
spark.sql(sql).show()

+-----------+------------+
|salesman_id|sales_volume|
+-----------+------------+
|        s25|      494390|
+-----------+------------+



###**Questão 6: Qual o produto mais vendido (em quantidade) por estado brasileiro e qual o valor total de vendas deste produto em cada estado?**

###Questão 6: Apache spark

In [None]:
product_sales_state = joined_df.groupBy('state', 'product') \
    .agg(sum('items_count').alias('sales_volume'), sum('sale_price').alias('sales_price'))

# product_sales_state.show()

max_per_state = product_sales_state.groupBy('state').agg(max('sales_volume').alias('max_per_state'))
# max_per_state.show()

sales_volume = product_sales_state.join(max_per_state, 'state') \
    .filter("sales_volume == max_per_state") \
    .drop('max_per_state')

sales_volume.show(30)

+-----+-------------+------------+------------------+
|state|      product|sales_volume|       sales_price|
+-----+-------------+------------+------------------+
|   GO| Product p510|      591898|1069.0584495699998|
|   PR|Product p2526|       16590|           11.0262|
|   ES| Product p510|      455642| 841.3383641200022|
|   MG| Product p510|      306859| 568.5343334800012|
|   BA| Product p510|      236388| 427.9196422200028|
|   MT|Product p2354|        5270| 4.779999999999999|
|   PI|Product p2526|       28438|          15.03764|
|   RN| Product p510|      283745| 514.7945784600033|
|   DF| Product p510|       38972| 71.38007778999996|
|   PE|Product p2526|       37114|20.889066650000007|
|   CE| Product p510|      143103| 306.5407165300008|
|   MS|Product p1871|        6165|              8.54|
|   PA|Product p2526|       54404| 38.36999999999999|
|   SC| Product p510|       17795|36.633116669999986|
|   PB| Product p510|       37796|       81.70514001|
|   TO|Product p2526|      1

### Questão 6: Spark Pandas

In [None]:
product_sales_state = joined_pdf.groupby(['state', 'product']).agg({
    'items_count': 'sum',
    'sale_price': 'sum'
}).reset_index()

product_sales_state = product_sales_state.rename(columns={
    'items_count': 'sales_volume',
    'sale_price': 'sales_price'
})

max_per_state = product_sales_state.groupby('state')['sales_volume'].max().reset_index()
max_per_state = max_per_state.rename(columns={'sales_volume': 'max_per_state'})

sales_volume = ps.merge(product_sales_state, max_per_state, on='state')

sales_volume = sales_volume[sales_volume['sales_volume'] == sales_volume['max_per_state']]

sales_volume = sales_volume.drop(columns=['max_per_state'])

sales_volume.head(30)

Unnamed: 0,state,product,sales_volume,sales_price
956,SC,Product p510,17795,36.633117
1672,RO,Product p510,75262,129.379061
3751,PI,Product p2526,28438,15.03764
4807,AM,Product p510,146037,310.962694
6090,,Product p2526,10500,0.61
6534,RR,Product p2368,13675,12.3458
9183,GO,Product p510,591898,1069.05845
9497,0,Product p510,2550,8.546
10408,TO,Product p2526,125762,60.284
12023,MT,Product p2354,5270,4.78


### Questão 6: SQL

In [17]:
sql = """WITH vendas_por_produto AS (
    SELECT c.state, v.product_id, SUM(v.items_count) AS quantidade, SUM(v.sale_price) AS total_vendas
    FROM vendas v
    JOIN clients c ON v.client_id = c.client_id
    GROUP BY c.state, v.product_id
    )

SELECT state, product_id, quantidade, total_vendas
FROM vendas_por_produto
WHERE (state, quantidade) IN (
    SELECT state, MAX(quantidade)
    FROM vendas_por_produto
    GROUP BY state
    )"""

    #primeiro se cria uma subquary temporaria que agrupa todos os produtos por estado para todos os estados
    #seleciona o estado, o produto, a quantidade vendida e o valor total das vendas
    #busca no estado o produto mais vendido e retorna ambos

In [18]:
spark.sql(sql).show(50)

+-----+----------+----------+------------------+
|state|product_id|quantidade|      total_vendas|
+-----+----------+----------+------------------+
|   MA|     p2526|     92547| 42.21990665000001|
|   AM|      p510|    146037| 310.9626944600008|
|   RR|     p2368|     13675|12.345799999999999|
|   PE|     p2526|     37114|20.889066650000007|
|   AC|     p2368|     10845|              3.46|
|   PA|     p2526|     54404| 38.36999999999999|
|   MS|     p1871|      6165|              8.54|
|   GO|      p510|    591898|1069.0584495699998|
|   RJ|      p510|    798446|1506.1491861299999|
|   BA|      p510|    236388| 427.9196422200028|
|   MG|      p510|    306859| 568.5343334800012|
|   RS|      p510|    125075|234.83105111999996|
|   AL|     p2354|      3450|             2.475|
|   SC|      p510|     17795|36.633116669999986|
|   PI|     p2526|     28438|          15.03764|
|   PB|      p510|     37796|       81.70514001|
|   TO|     p2526|    125762| 60.28399999999999|
|   CE|      p510|  

###**Questão 7: Qual a média do valor total das vendas por dia da semana em cada estado no ano de 2022?**

###Questão 7: Apache Spark


In [None]:
from pyspark.sql import functions as F

df_ano_2022 = joined_df.filter("order_date >= '2022-01-01' and order_date <= '2022-12-31'")
df_ano_2022 = df_ano_2022.withColumn('day_of_week', F.dayofweek(F.col('order_date')))

df_result = df_ano_2022.groupBy('state', 'day_of_week').agg(F.mean('sale_price').alias('media_vendas')).orderBy(F.desc('media_vendas'))
df_result.show(1000)

+-----+-----------+------------------+
|state|day_of_week|      media_vendas|
+-----+-----------+------------------+
| NULL|          2| 230.2578333333333|
|   AP|          6|             130.0|
|   AP|          5|119.95515789473684|
|    0|          5|           99.1125|
|   SE|          4| 94.24884722222224|
|   AL|          5| 87.47196497600002|
|   AC|          2|  84.9514524590164|
| NULL|          6| 81.88749999999999|
|   AC|          3| 79.81981347619046|
|   MS|          6| 79.13990500000001|
|   SE|          6|  77.0266212121212|
|   SC|          6| 74.53355287586207|
|   RR|          5| 73.30065777135518|
|   MT|          4| 73.24480118421054|
|   AC|          5| 71.30706666698113|
|   MT|          5| 70.39088801801802|
|   PE|          5| 69.61169956615126|
|   MT|          6| 69.34650370370372|
|   DF|          6| 67.58707004048581|
|   RR|          4| 66.97192033093839|
|   MT|          3|  66.0429950740741|
|   SE|          3| 66.03450000000001|
|   MS|          5| 65.87

### Questão 7: Spark Pandas

In [None]:
dia_da_semana = {0:'Domingo',
                 1:'Segunda',
                 2:'Terça',
                 3:'Quarta',
                 4:'Quinta',
                 5:'Sexta',
                 6:'Sábado'}


df_year = joined_pdf[joined_pdf['order_date'].dt.year == 2022]
df_year['day_of_week'] = df_year['order_date'].apply(lambda x: ps.to_datetime(x)).dt.dayofweek
df_year['day_of_week'] = df_year['day_of_week'].apply(lambda x: dia_da_semana[x])

df_year.groupby(['state','day_of_week'])['sale_price'].mean().reset_index().sort_values(by=['state','day_of_week'],ascending=False)

  return pd.to_datetime(


Unnamed: 0,state,day_of_week,sale_price
121,TO,Terça,53.279302
28,TO,Segunda,61.900046
129,TO,Quinta,52.90083
54,TO,Quarta,56.951347
79,TO,Domingo,58.263176
108,SP,Terça,55.115338
145,SP,Sexta,42.75075
103,SP,Segunda,59.298641
8,SP,Quinta,54.779506
55,SP,Quarta,56.517586


###Questão 7: SQL


In [23]:
sql = """SELECT
    c.state,
    DAYOFWEEK(v.order_date) AS dia,
    AVG(v.sale_price) AS avg_vendas

FROM vendas v
JOIN clients c ON v.client_id = c.client_id
WHERE YEAR(v.order_date) = 2022
  AND c.state IS NOT NULL
  AND c.state != '0'
GROUP BY c.state, DAYOFWEEK(v.order_date)
ORDER BY c.state, DAYOFWEEK(v.order_date)"""

#seleciona o estado do cliente
#extrai o dia da semana a partir da data e retorna um valor de 1 a 7
#calcula a média do valor total das vendas, soma e faz a media
#join das tabeals
#pega somente as do ano de 2022
  #tirar os null
  #tirar os 0
#agrupa por estado e dia
#ordena tudo

In [24]:
spark.sql(sql).show(300)

+-----+---+------------------+
|state|dia|        avg_vendas|
+-----+---+------------------+
|   AC|  2| 84.95145245901644|
|   AC|  3| 79.81981347619046|
|   AC|  4| 57.47847203787879|
|   AC|  5| 71.30706666698114|
|   AC|  6|49.007189752747266|
|   AL|  2| 51.15239948461536|
|   AL|  3|57.141082028248654|
|   AL|  4| 65.46768146341469|
|   AL|  5|      87.471964976|
|   AL|  6| 60.61347169811321|
|   AM|  2|61.603425651396016|
|   AM|  3|58.461317492779784|
|   AM|  4|57.022381783221284|
|   AM|  5| 59.24582814596862|
|   AM|  6| 55.87654099607412|
|   AP|  2| 36.22448421052632|
|   AP|  3| 61.66444444444445|
|   AP|  4|            57.726|
|   AP|  5|119.95515789473684|
|   AP|  6|             130.0|
|   BA|  2| 59.79279370594653|
|   BA|  3| 59.27045894868731|
|   BA|  4|56.297327775734864|
|   BA|  5| 55.67084753542683|
|   BA|  6|51.951975840992944|
|   CE|  2|60.864588976873115|
|   CE|  3| 58.06980395858712|
|   CE|  4|57.438274946743064|
|   CE|  5| 60.16748033163073|
|   CE| 