* ```SparkSession:``` Ponto de entrada para usar o PySpark, responsável por criar e gerenciar a execução do Spark.
* ```col:``` Permite acessar colunas do DataFrame Spark para transformações ou cálculos.
* ```Funções de agregação (sum, avg):``` Realizam operações como soma e média sobre colunas.
* ```Funções de janela (row_number, rank):``` Calculam valores relacionados a partições de dados, como numeração de linhas e rankings.
* ```Window:``` Define especificações de janelas para operações de janela.

In [14]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, row_number, rank
from pyspark.sql.window import Window

os.environ["PYSPARK_PYTHON"] = r"C:\Users\cezar\anaconda3\envs\deprep\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\cezar\anaconda3\envs\deprep\python.exe"

# Criar uma sessão Spark
spark = SparkSession.builder.master("local").appName("Spark SQL Study").getOrCreate()

In [15]:
import sys
print(sys.executable)

C:\Users\cezar\anaconda3\envs\deprep\python.exe


In [16]:
print("Python executado no Spark (workers):", spark.sparkContext.pythonExec)

Python executado no Spark (workers): C:\Users\cezar\anaconda3\envs\deprep\python.exe


* Cria um DataFrame Spark a partir do conjunto de dados fictício.
* Define os nomes das colunas.

In [17]:
# Dados fictícios
data = [
    (1, "Product A", "Category 1", "North", 100, "2023-01-01"),
    (2, "Product B", "Category 1", "North", 200, "2023-01-02"),
    (3, "Product C", "Category 2", "South", 300, "2023-01-03"),
    (4, "Product D", "Category 2", "South", 400, "2023-01-04"),
    (5, "Product E", "Category 1", "North", 150, "2023-01-05"),
]

# Criar um DataFrame Spark
df = spark.createDataFrame(data, ["id", "product", "category", "region", "sales_amount", "transaction_date"])
df.show()

+---+---------+----------+------+------------+----------------+
| id|  product|  category|region|sales_amount|transaction_date|
+---+---------+----------+------+------------+----------------+
|  1|Product A|Category 1| North|         100|      2023-01-01|
|  2|Product B|Category 1| North|         200|      2023-01-02|
|  3|Product C|Category 2| South|         300|      2023-01-03|
|  4|Product D|Category 2| South|         400|      2023-01-04|
|  5|Product E|Category 1| North|         150|      2023-01-05|
+---+---------+----------+------+------------+----------------+



* Registra o DataFrame como uma view temporária chamada ```sales```
* Permite executar consultas SQL diretamente sobre os dados usando ```spark.sql```

In [18]:
# Criar uma view temporária para SQL
df.createOrReplaceTempView("sales")

In [19]:
# 1. Consulta Básica em Spark SQL
print("Consulta Básica: Produtos na região 'North'")
spark.sql("""
SELECT
    product,
    sales_amount
FROM
    sales
WHERE
    region = 'North'
""").show()

Consulta Básica: Produtos na região 'North'
+---------+------------+
|  product|sales_amount|
+---------+------------+
|Product A|         100|
|Product B|         200|
|Product E|         150|
+---------+------------+



In [20]:
# 2. Soma de vendas por região (GROUP BY)
print("Soma de vendas por região")
spark.sql("""
SELECT
    region,
    SUM(sales_amount) AS total_sales
FROM
    sales
GROUP BY
    region
""").show()

Soma de vendas por região
+------+-----------+
|region|total_sales|
+------+-----------+
| South|        700|
| North|        450|
+------+-----------+



* ```Window.partitionBy("region")```: Cria partições por região (```North```, ```South```).
* ```orderBy("transaction_date")```: Ordena os dados por data dentro de cada partição.
* ```sum("sales_amount").over(windowSpec)```: Calcula a soma acumulada de vendas (```running_total```) dentro de cada partição.

In [21]:
# 3. Função de Janela: Soma acumulada por região
print("Soma acumulada por região")
windowSpec = Window.partitionBy("region").orderBy("transaction_date")
df = df.withColumn("running_total", sum("sales_amount").over(windowSpec))
df.select("region", "product", "sales_amount", "running_total").show()

Soma acumulada por região
+------+---------+------------+-------------+
|region|  product|sales_amount|running_total|
+------+---------+------------+-------------+
| North|Product A|         100|          100|
| North|Product B|         200|          300|
| North|Product E|         150|          450|
| South|Product C|         300|          300|
| South|Product D|         400|          700|
+------+---------+------------+-------------+



* ```rank().over(windowSpec)```: Atribui um ranking às vendas de cada região, baseado no valor (```sales_amount```), respeitando a ordem especificada no ```windowSpec```.

In [22]:
# 4. Ranqueamento de vendas por região
print("Ranking de vendas por região")
df = df.withColumn("sales_rank", rank().over(windowSpec))
df.select("region", "product", "sales_amount", "sales_rank").show()

Ranking de vendas por região
+------+---------+------------+----------+
|region|  product|sales_amount|sales_rank|
+------+---------+------------+----------+
| North|Product A|         100|         1|
| North|Product B|         200|         2|
| North|Product E|         150|         3|
| South|Product C|         300|         1|
| South|Product D|         400|         2|
+------+---------+------------+----------+



* ```sales_total```: Calcula a soma total de todas as vendas.
* ```withColumn("sales_percentage", ...)```: Adiciona uma nova coluna com o percentual de vendas de cada produto em relação ao total.

In [23]:
# 5. Cálculo de Percentual de Vendas
from pyspark.sql.functions import col, format_string

# Calculando o total de vendas
sales_total = df.groupBy().sum("sales_amount").collect()[0][0]

# Adicionando a coluna formatada como percentual
df = df.withColumn("sales_percentage", 
                   format_string("%.2f%%", (col("sales_amount") / sales_total) * 100))

# Exibindo as colunas desejadas
df.select("region", "product", "sales_amount", "sales_percentage").show()

+------+---------+------------+----------------+
|region|  product|sales_amount|sales_percentage|
+------+---------+------------+----------------+
| North|Product A|         100|           8.70%|
| North|Product B|         200|          17.39%|
| South|Product C|         300|          26.09%|
| South|Product D|         400|          34.78%|
| North|Product E|         150|          13.04%|
+------+---------+------------+----------------+



In [24]:
# Encerrar a sessão Spark
spark.stop()