In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# Inicializando a SparkSession
spark = SparkSession.builder\
    .appName("ETL com Spark")\
    .getOrCreate()


In [21]:
# Caminhos dos arquivos CSV
file_path_customers = "/content/customers.csv"
file_path_sales = "/content/sales.csv"

# Extraindo os dados dos arquivos CSV
df_customers = spark.read.csv(file_path_customers, header=True, inferSchema=True)
df_sales = spark.read.csv(file_path_sales, header=True, inferSchema=True)

# Exibindo os esquemas dos DataFrames
df_customers.printSchema()
df_sales.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- active: boolean (nullable = true)

root
 |-- sale_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: integer (nullable = true)



In [22]:
# Transformando os dados: Filtrando clientes ativos e que tiveram alguma compra
df_customers_filtered = df_customers.filter(col("active") == True)
df_sales_filtered = df_sales.filter(col("quantity") > 0)


In [23]:
# Realizando um join entre os DataFrames
df_joined = df_customers_filtered.join(df_sales_filtered, df_customers_filtered.customer_id == df_sales_filtered.customer_id, "inner")
df_joined.show(5,False)

+-----------+------------+-----------------------+------+-------+-----------+---------+--------+
|customer_id|name        |email                  |active|sale_id|customer_id|product  |quantity|
+-----------+------------+-----------------------+------+-------+-----------+---------+--------+
|62         |Customer_62 |customer62@example.com |true  |2      |62         |Product_C|5       |
|166        |Customer_166|customer166@example.com|true  |3      |166        |Product_C|5       |
|192        |Customer_192|customer192@example.com|true  |5      |192        |Product_A|2       |
|28         |Customer_28 |customer28@example.com |true  |7      |28         |Product_B|3       |
|2          |Customer_2  |customer2@example.com  |true  |9      |2          |Product_C|5       |
+-----------+------------+-----------------------+------+-------+-----------+---------+--------+
only showing top 5 rows



In [25]:
# Selecionando colunas específicas do DataFrame final
df_result = df_joined.select(
    df_customers_filtered.customer_id,
    df_customers_filtered.name,
    df_sales_filtered.product,
    df_sales_filtered.quantity
 )

# Exibindo os primeiros registros do DataFrame final
df_result.show()

+-----------+------------+---------+--------+
|customer_id|        name|  product|quantity|
+-----------+------------+---------+--------+
|         62| Customer_62|Product_C|       5|
|        166|Customer_166|Product_C|       5|
|        192|Customer_192|Product_A|       2|
|         28| Customer_28|Product_B|       3|
|          2|  Customer_2|Product_C|       5|
|        115|Customer_115|Product_A|       1|
|         41| Customer_41|Product_C|       3|
|         80| Customer_80|Product_A|       4|
|         79| Customer_79|Product_C|       1|
|         52| Customer_52|Product_A|       8|
|         59| Customer_59|Product_C|       3|
|         32| Customer_32|Product_B|       4|
|        101|Customer_101|Product_C|       2|
|        167|Customer_167|Product_A|       7|
|         22| Customer_22|Product_C|       4|
|         80| Customer_80|Product_B|       8|
|        103|Customer_103|Product_A|       5|
|         57| Customer_57|Product_B|       6|
|        182|Customer_182|Product_

In [26]:
# Carregando os dados finais em um novo arquivo CSV
output_path = "/content/result.csv"
df_result.write.csv(output_path, header=True)



In [None]:
# Supondo que eu queira salvar o DataFrame no HDFS
output_path = "/user/seu_usuario/result.csv"  # Caminho no HDFS

# Escrever o DataFrame no HDFS
df_result.write.csv(output_path, header=True)