In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.appName("session").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true") #Otimização dinâmicas

# Conversão de Arquivos CSV para Parquet
No uso do PySpark, ainda mais em ambientes de cloud, para diminuir o espaço utilziado e processamento futuro, o ideal usar o formato **Parquet** ao invés de **CSV**

In [2]:
#Importando os arquivos em CSV com a função nativa do PySpark
olist_customers = spark.read.option("header", True).csv("olist_customers_dataset.csv")
olist_orders = spark.read.option("header", True).csv("olist_orders_dataset.csv")
olist_order_reviews = spark.read.option("header", True).csv("olist_order_reviews_dataset.csv")
olist_order_items = spark.read.option("header", True).csv("olist_order_items_dataset.csv")
olist_sellers = spark.read.option("header", True).csv("olist_sellers_dataset.csv")
olist_products = spark.read.option("header", True).csv("olist_products_dataset.csv")

In [3]:
#Convertendo os dfs para Parquet
olist_customers.write.mode("overwrite").parquet("olist_customers_dataset.parquet")
olist_customers.write.mode("overwrite").parquet("olist_customers_dataset.parquet")
olist_orders.write.mode("overwrite").parquet("olist_orders_dataset.parquet")
olist_order_reviews.write.mode("overwrite").parquet("olist_order_reviews_dataset.parquet")
olist_order_items.write.mode("overwrite").parquet("olist_order_items_dataset.parquet")
olist_sellers.write.mode("overwrite").parquet("olist_sellers_dataset.parquet")
olist_products.write.mode("overwrite").parquet("olist_products_dataset.parquet")

In [4]:
#Importando novamente os dfs, mas com a versão em Parquet
olist_customers = spark.read.option("header", True).parquet("olist_customers_dataset.parquet")
olist_orders = spark.read.option("header", True).parquet("olist_orders_dataset.parquet")
olist_order_reviews = spark.read.option("header", True).parquet("olist_order_reviews_dataset.parquet")
olist_order_items = spark.read.option("header", True).parquet("olist_order_items_dataset.parquet")
olist_sellers = spark.read.option("header", True).parquet("olist_sellers_dataset.parquet")
olist_products = spark.read.option("header", True).parquet("olist_products_dataset.parquet")

In [None]:
import os

datasets = {
    "olist_customers": olist_customers,
    "olist_orders": olist_orders,
    "olist_order_reviews": olist_order_reviews,
    "olist_order_items": olist_order_items,
    "olist_sellers": olist_sellers,
    "olist_products": olist_products
}

for name, df in datasets.items():
    csv_path = f"{name}_dataset.csv"
    parquet_path = f"{name}_dataset.parquet"
    
    csv_size = os.path.getsize(csv_path) / (1024 * 1024)
    parquet_size = os.path.getsize(parquet_path) / (1024 * 1024)
    
    print(f"Arquivo: {name}")
    print(f"  CSV:     {csv_size:.2f} MB")
    print(f"  Parquet: {parquet_size:.2f} MB\n")

Arquivo: olist_customers
  CSV:     8.71 MB
  Parquet: 0.00 MB

Arquivo: olist_orders
  CSV:     16.93 MB
  Parquet: 0.00 MB

Arquivo: olist_order_reviews
  CSV:     13.78 MB
  Parquet: 0.00 MB

Arquivo: olist_order_items
  CSV:     14.83 MB
  Parquet: 0.00 MB

Arquivo: olist_sellers
  CSV:     0.17 MB
  Parquet: 0.00 MB

Arquivo: olist_products
  CSV:     2.30 MB
  Parquet: 0.00 MB



Além do tempo de execução para leitura dos arquivos, que passou de 6,1 segundos para 0,6 segundos na minha máquina. \
Todos os aruivos passaram a pesar menos de 1MB, mostrando o ganho que teria para arquivos e bases em GBs ou até TBs.

# Testes com a base da Olist

In [6]:
datasets = [olist_customers, olist_orders, olist_order_reviews, 
            olist_order_items, olist_sellers, olist_products]
for df in datasets:
    df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable

In [7]:
olist_orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|2d846c03073b1a424...|5a3473648da10d280...|   delivered|     2017-05-10 15:18:16|2017-05-12 02:45:22|         2017-05-15 09:14:35|          2017-05-25 10:49:48|          2017-05-18 00:00:00|
|398c8e07a36c8e104...|dd9fb9905e0a0a4f0...|   delivered|     2017-07-30 19:07:55|2017-07-30 19:23:30|         2017-08-11 19:02:40|          2017-08-22 18:39:52|          2017-08-31 00:00:00|
|33e2db141538ccdaf...|37ff611a66e811a67...|  

In [10]:
olist_orders.createOrReplaceTempView("orders")

query = spark.sql("""
    SELECT 
        order_status AS status,
        COUNT(*) AS qtd,
        COUNT(CASE WHEN order_purchase_timestamp 
                  BETWEEN '2017-10-02' AND '2017-10-30'
                  THEN 1 ELSE NULL END) AS hora_compra
    FROM orders
    GROUP BY order_status
    SORT BY qtd DESC
""")

query.show()

+-----------+-----+-----------+
|     status|  qtd|hora_compra|
+-----------+-----+-----------+
|  delivered|96478|       4060|
|    shipped| 1107|         32|
|   canceled|  625|         23|
|unavailable|  609|         50|
|   invoiced|  314|         13|
| processing|  301|         16|
|    created|    5|          0|
|   approved|    2|          0|
+-----------+-----+-----------+



In [22]:
olist_products.createOrReplaceTempView("products")

query = spark.sql("""
    SELECT 
        product_category_name AS categoria,
        COUNT(DISTINCT product_id) AS produtos
    FROM products
    GROUP BY product_category_name
    SORT BY produtos DESC
""")

query.show()

+--------------------+--------+
|           categoria|produtos|
+--------------------+--------+
|     cama_mesa_banho|    3029|
|       esporte_lazer|    2867|
|    moveis_decoracao|    2657|
|        beleza_saude|    2444|
|utilidades_domest...|    2335|
|          automotivo|    1900|
|informatica_acess...|    1639|
|          brinquedos|    1411|
|  relogios_presentes|    1329|
|           telefonia|    1134|
|               bebes|     919|
|          perfumaria|     868|
|fashion_bolsas_e_...|     849|
|           papelaria|     849|
|          cool_stuff|     789|
|  ferramentas_jardim|     753|
|            pet_shop|     719|
|                NULL|     610|
|         eletronicos|     517|
|construcao_ferram...|     400|
+--------------------+--------+
only showing top 20 rows



In [16]:
olist_products.createOrReplaceTempView("products")

query = spark.sql("""
    SELECT COUNT(DISTINCT product_id) AS produtos,
           COUNT(DISTINCT product_category_name) AS categoria
    FROM products
""")

query.show()

+--------+---------+
|produtos|categoria|
+--------+---------+
|   32951|       73|
+--------+---------+



# 3 maiores salários - Uso do Window

A função Window no PySpark é usada para definir janelas de particionamento e ordenação dentro de um DataFrame, possibilitando a aplicação de funções analíticas (window functions), como row_number, rank, lead, lag, sum, avg, etc., sem agrupar os dados.

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.appName("Top3Salaries").getOrCreate()

# Exemplo de DataFrame
data = [
    ("Alice", "TI", 5000),
    ("Bob", "TI", 7000),
    ("Carol", "TI", 6500),
    ("Dave", "RH", 4500),
    ("Eve", "RH", 6000),
    ("Frank", "RH", 6200),
    ("Grace", "RH", 6100)
]
df = spark.createDataFrame(data, ["employee_name", "department", "salary"])

# Janela e ranking
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
df = df.withColumn("rank", row_number().over(window_spec))
df.filter(col("rank") <= 3).show()


+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Frank|        RH|  6200|   1|
|        Grace|        RH|  6100|   2|
|          Eve|        RH|  6000|   3|
|          Bob|        TI|  7000|   1|
|        Carol|        TI|  6500|   2|
|        Alice|        TI|  5000|   3|
+-------------+----------+------+----+



# Linhas para Colunas

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criar SparkSession
spark = SparkSession.builder.appName("RowToColumn").getOrCreate()

# Dados de exemplo
data = [
    ("Alice", "Jan", 3000),
    ("Alice", "Feb", 3200),
    ("Bob", "Jan", 2800),
    ("Bob", "Feb", 2900),
]
df = spark.createDataFrame(data, ["employee", "month", "salary"])

# Pivot
pivot_df = df.groupBy("employee").pivot("month").sum("salary")

pivot_df.show()

+--------+----+----+
|employee| Feb| Jan|
+--------+----+----+
|     Bob|2900|2800|
|   Alice|3200|3000|
+--------+----+----+



In [13]:
df.createOrReplaceTempView("salaries")
result = spark.sql("""
    SELECT
      employee,
      SUM(CASE WHEN month = 'Jan' THEN salary END) AS Jan,
      SUM(CASE WHEN month = 'Feb' THEN salary END) AS Feb
    FROM salaries
    GROUP BY employee
""")
result.show()

+--------+----+----+
|employee| Jan| Feb|
+--------+----+----+
|   Alice|3000|3200|
|     Bob|2800|2900|
+--------+----+----+



# Pipelines

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

indexer = StringIndexer(inputCol="categoria", outputCol="categoria_indexada")
df_indexado = indexer.fit(df).transform(df)



encoder = OneHotEncoder(inputCols=["categoria_indexada"], outputCols=["categoria_vetorizada"])
df_encoded = encoder.fit(df_indexado).transform(df_indexado)


In [18]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [20]:
indexer = StringIndexer(inputCol="customer_state", outputCol="stringIndexer")
df_indexado = indexer.fit(olist_customers).transform(olist_customers)
df_indexado.show(15)

+--------------------+--------------------+------------------------+--------------------+--------------+-------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|stringIndexer|
+--------------------+--------------------+------------------------+--------------------+--------------+-------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|          0.0|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|          0.0|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|          0.0|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|          0.0|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|          0.0|
|879864dab9bc30475...|4c93744516667ad3b...|     

In [23]:
encoder = OneHotEncoder(inputCols=["stringIndexer"], outputCols=["oneHotEnconder"])
df_encoded = encoder.fit(df_indexado).transform(df_indexado)
df_encoded.show(25)

+--------------------+--------------------+------------------------+--------------------+--------------+-------------+---------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|stringIndexer| oneHotEnconder|
+--------------------+--------------------+------------------------+--------------------+--------------+-------------+---------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|          0.0| (26,[0],[1.0])|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|          0.0| (26,[0],[1.0])|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|          0.0| (26,[0],[1.0])|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|          0.0| (26,[0],[1.0])|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|            

# Exemplo de Pipeline e Modelo

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("PipelineExemplo").getOrCreate()

data = [
    ("A", 25.0, 50000.0, 1.0),
    ("B", 45.0, 60000.0, 0.0),
    ("A", 35.0, 65000.0, 1.0),
    ("C", 23.0, 52000.0, 0.0),
    ("B", 52.0, 70000.0, 1.0)
]

schema = StructType([
    StructField("categoria", StringType(), True),
    StructField("idade", DoubleType(), True),
    StructField("renda", DoubleType(), True),
    StructField("label", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression

# 1. StringIndexer para variável categórica
indexer = StringIndexer(inputCol="categoria", outputCol="categoria_index")

# 2. OneHotEncoder para a coluna indexada
encoder = OneHotEncoder(inputCols=["categoria_index"], outputCols=["categoria_ohe"])

# 3. VectorAssembler para juntar todas as features em um vetor
assembler = VectorAssembler(
    inputCols=["categoria_ohe", "idade", "renda"], 
    outputCol="features_raw"
)

# 4. MinMaxScaler para normalizar as features
scaler = MinMaxScaler(inputCol="features_raw", outputCol="features")

# 5. Modelo Estimador
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 6. Pipeline com todas as etapas
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler, lr])


In [27]:
pipeline_model = pipeline.fit(df)  # PipelineModel (estimator aplicado)

# Transformar os dados de entrada e fazer previsões
result = pipeline_model.transform(df)

# Mostrar resultados
result.select("categoria", "idade", "renda", "features", "prediction", "probability").show()


+---------+-----+-------+--------------------+----------+--------------------+
|categoria|idade|  renda|            features|prediction|         probability|
+---------+-----+-------+--------------------+----------+--------------------+
|        A| 25.0|50000.0|[1.0,0.0,0.068965...|       1.0|[1.97174813557347...|
|        B| 45.0|60000.0|[0.0,1.0,0.758620...|       0.0|[0.99999999038469...|
|        A| 35.0|65000.0|[1.0,0.0,0.413793...|       1.0|[2.34638336882296...|
|        C| 23.0|52000.0|       (4,[3],[0.1])|       0.0|           [1.0,0.0]|
|        B| 52.0|70000.0|   [0.0,1.0,1.0,1.0]|       1.0|[9.19521084368343...|
+---------+-----+-------+--------------------+----------+--------------------+



In [32]:
indexer = StringIndexer(inputCol="categoria", outputCol="categoria_index")
df_indexed = indexer.fit(df).transform(df)
df_indexed.select("categoria", "categoria_index").show(5)

+---------+---------------+
|categoria|categoria_index|
+---------+---------------+
|        A|            0.0|
|        B|            1.0|
|        A|            0.0|
|        C|            2.0|
|        B|            1.0|
+---------+---------------+



In [33]:
encoder = OneHotEncoder(inputCols=["categoria_index"], outputCols=["categoria_ohe"])
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
df_encoded.select("categoria_index", "categoria_ohe").show(truncate=False)

+---------------+-------------+
|categoria_index|categoria_ohe|
+---------------+-------------+
|0.0            |(2,[0],[1.0])|
|1.0            |(2,[1],[1.0])|
|0.0            |(2,[0],[1.0])|
|2.0            |(2,[],[])    |
|1.0            |(2,[1],[1.0])|
+---------------+-------------+



In [34]:
assembler = VectorAssembler(inputCols=["categoria_ohe", "idade", "renda"], outputCol="features_raw")
df_assembled = assembler.transform(df_encoded)
df_assembled.select("categoria", "idade", "renda", "categoria_ohe", "features_raw").show(truncate=False)

+---------+-----+-------+-------------+----------------------+
|categoria|idade|renda  |categoria_ohe|features_raw          |
+---------+-----+-------+-------------+----------------------+
|A        |25.0 |50000.0|(2,[0],[1.0])|[1.0,0.0,25.0,50000.0]|
|B        |45.0 |60000.0|(2,[1],[1.0])|[0.0,1.0,45.0,60000.0]|
|A        |35.0 |65000.0|(2,[0],[1.0])|[1.0,0.0,35.0,65000.0]|
|C        |23.0 |52000.0|(2,[],[])    |[0.0,0.0,23.0,52000.0]|
|B        |52.0 |70000.0|(2,[1],[1.0])|[0.0,1.0,52.0,70000.0]|
+---------+-----+-------+-------------+----------------------+



In [35]:
scaler = MinMaxScaler(inputCol="features_raw", outputCol="features")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)
df_scaled.select("features_raw", "features").show(truncate=False)

+----------------------+----------------------------------+
|features_raw          |features                          |
+----------------------+----------------------------------+
|[1.0,0.0,25.0,50000.0]|[1.0,0.0,0.06896551724137931,0.0] |
|[0.0,1.0,45.0,60000.0]|[0.0,1.0,0.7586206896551724,0.5]  |
|[1.0,0.0,35.0,65000.0]|[1.0,0.0,0.41379310344827586,0.75]|
|[0.0,0.0,23.0,52000.0]|(4,[3],[0.1])                     |
|[0.0,1.0,52.0,70000.0]|[0.0,1.0,1.0,1.0]                 |
+----------------------+----------------------------------+



In [39]:
from pyspark.ml import Pipeline

# Definindo o pipeline com todas as etapas
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler, lr])

# Treinando e gerando o modelo final
pipeline_model = pipeline.fit(df)

# Salvando o PipelineModel treinado
pipeline_model.save("meu_pipeline_modelo")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\Computador\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Computador\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
from pyspark.ml import PipelineModel

# Carregando o modelo salvo
modelo_carregado = PipelineModel.load("meu_pipeline_modelo")

# Aplicando em novos dados (deve ter mesmas colunas e tipos)
novos_dados = spark.createDataFrame([
    ("B", 40.0, 58000.0, 1.0),
    ("C", 22.0, 49000.0, 0.0)
], ["categoria", "idade", "renda", "label"])

# Transformando os novos dados
resultado = modelo_carregado.transform(novos_dados)
resultado.select("features", "prediction", "probability").show(truncate=False)

# Exemplos de Otimização

In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("PipelineExemplo").getOrCreate()

data = [
    ("A", 25.0, 50000.0, 1.0),
    ("B", 45.0, 60000.0, 0.0),
    ("A", 35.0, 65000.0, 1.0),
    ("C", 23.0, 52000.0, 0.0),
    ("B", 52.0, 70000.0, 1.0)
]

schema = StructType([
    StructField("categoria", StringType(), True),
    StructField("idade", DoubleType(), True),
    StructField("renda", DoubleType(), True),
    StructField("label", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)

In [None]:
df = spark.read.parquet("dados.parquet")
df.cache()  # ou df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # primeira ação dispara o cache
df.select("coluna").show()

In [44]:
df_repart = df.repartition(10, "idade")
df_repart.show(5)

+---------+-----+-------+-----+
|categoria|idade|  renda|label|
+---------+-----+-------+-----+
|        C| 23.0|52000.0|  0.0|
|        A| 35.0|65000.0|  1.0|
|        B| 52.0|70000.0|  1.0|
|        A| 25.0|50000.0|  1.0|
|        B| 45.0|60000.0|  0.0|
+---------+-----+-------+-----+



In [45]:
df_coalesced = df.coalesce(2)
df_coalesced

DataFrame[categoria: string, idade: double, renda: double, label: double]

In [None]:
from pyspark.sql.functions import broadcast

df_grande = spark.read.parquet("grande.parquet")
df_pequeno = spark.read.parquet("referencia.parquet")

df_join = df_grande.join(broadcast(df_pequeno), "chave")

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 100)  # padrão é 200

In [None]:
df = spark.read.parquet("grande.parquet").select("id", "nome").filter("id > 1000")

In [None]:
from pyspark.sql.functions import lit, concat, rand

# Adiciona "sal" à chave
df1 = df1.withColumn("salt", (rand() * 10).cast("int"))
df1 = df1.withColumn("chave_salgada", concat("chave", "salt"))

# Duplica df2 com sal
salts = spark.range(0, 10).withColumnRenamed("id", "salt")
df2_exp = df2.crossJoin(salts)
df2_exp = df2_exp.withColumn("chave_salgada", concat("chave", "salt"))

# Faz o join usando chave salgada
df_join = df1.join(df2_exp, "chave_salgada")

In [None]:
from pyspark.sql.functions import col

# Bom
df = df.withColumn("novo_valor", col("valor") * 2)

# Evite
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def multiplica(x):
    return x * 2

multiplica_udf = udf(multiplica, IntegerType())
df = df.withColumn("novo_valor", multiplica_udf(col("valor")))  # Menos eficiente