### Programación Concurrente
## 27. Joins con Spark


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

Un Join es una operación que permite combinar datos de dos o más tablas en una sola, basándose en una relación entre ellas. Usualmente, esta relación se establece a través de una columna en común entre las tablas.

El Join permite acceder y manipular datos que están distribuidos en varias tablas de manera conjunta, obteniendo así resultados más complejos y completos.

Vamos a leer tres tablas; estas se encuentran en este [enlace de Kaggle](https://www.kaggle.com/datasets/svbstan/sales-product-and-customer-insight-repository)

In [3]:
customer = spark.read.csv('customer_profile_dataset.csv', header=True)
customer.show(5)

+-----------+----------+---------+------+-------------------+--------------------+------------+-------------------+-------------+-----------+-----+--------+
|customer_id|first_name|last_name|gender|      date_of_birth|               email|phone_number|        signup_date|      address|       city|state|zip_code|
+-----------+----------+---------+------+-------------------+--------------------+------------+-------------------+-------------+-----------+-----+--------+
|          1|    Robert|    Smith|Female|1994-06-14 21:40:27|jane.davis1@mail.com|634-106-4981|2016-10-16 17:23:25| 8465 Main St|San Antonio|   CA|   35566|
|          2|     Emily|   Garcia|Female|1989-09-21 17:56:31|robert.williams2@...|386-635-5998|2021-04-04 14:24:06|  305 Main St|   New York|   AZ|   23187|
|          3|   Jessica|    Brown|  Male|1984-01-21 21:43:13|emily.davis3@mail...|627-341-5213|2018-04-22 04:51:57|  5725 Oak St|    Chicago|   AZ|   99188|
|          4|   Michael|    Brown|  Male|1986-02-06 13:09:

In [4]:
products = spark.read.csv('products_dataset.csv', header=True)
products.show(5)

+----------+------------+--------+--------------+------+--------------------+
|product_id|product_name|category|price_per_unit| brand| product_description|
+----------+------------+--------+--------------+------+--------------------+
|         1|      Butter|   Dairy|         28.58|BrandB|Description for Rice|
|         2|      Butter|   Meats|         22.66|BrandB|Description for B...|
|         3|        Milk|   Meats|         26.52|BrandE|Description for B...|
|         4|      Banana|  Grains|         26.12|BrandB|Description for A...|
|         5|        Rice|  Fruits|         21.94|BrandD|Description for B...|
+----------+------------+--------+--------------+------+--------------------+
only showing top 5 rows



In [5]:
purchase = spark.read.csv('purchase_history_dataset.csv', header=True)
purchase.show(5)

+-----------+-----------+----------+-------------------+--------+------------------+
|purchase_id|customer_id|product_id|      purchase_date|quantity|      total_amount|
+-----------+-----------+----------+-------------------+--------+------------------+
|          1|          1|        42|2018-04-15 14:08:01|       3| 37.64207365077783|
|          2|          1|       138|2022-07-10 23:33:47|       4| 70.24710587172727|
|          3|          1|       403|2021-12-31 03:53:33|       3| 89.16889585975464|
|          4|          1|       193|2017-01-14 01:25:11|       2| 59.70505931112876|
|          5|          1|        26|2018-04-06 11:01:06|       3|101.77886387225126|
+-----------+-----------+----------+-------------------+--------+------------------+
only showing top 5 rows



Ejercicios; contesta desarrollando el correspondiente código en Pyspark:

1. Responde, ¿cuántos clientes llamados "Robert" (nota cómo hay *Males* y *Females*), compraron algún producto lácteo (Dairy) en 2022 ?

2. Eres empleado de *BrandB*. ¿En cuáles ciudades has vendido una mayor cantidad? (total_amount)

3. ¿De cuánto es la mayor cantidad (quantity) que ha sido comprado por algún hombre O cuyo producto sea pan (Bread)?

In [60]:
from pyspark.sql.functions import col, year

# Combinar todo en una sola indicación
robert_count = purchase.filter(year(col("purchase_date")) == 2022) \
    .join(products.filter(col("category") == "Dairy"), "product_id") \
    .join(customer.filter(col("first_name") == "Robert"), "customer_id") \
    .select("customer_id").count()

##Se cuentan todos los roberts aunque este sean repetidos 
print(f"Cantidad de clientes llamados Robert que compraron productos lácteos en 2022: {robert_count}")

Cantidad de clientes llamados Robert que compraron productos lácteos en 2022: 44


In [61]:
from pyspark.sql.functions import col

# Convertir "total_amount" a tipo Double
purchase = purchase.withColumn("total_amount", col("total_amount").cast(DoubleType()))

# Filtrar productos por la marca, unir con compras y clientes, agrupar por ciudad, y calcular total_amount
top_city = (
    purchase.join(products.filter(col("brand") == "BrandB"), "product_id")
    .join(customer, "customer_id")
    .groupBy("city")
    .sum("total_amount")
    .withColumnRenamed("sum(total_amount)", "total_amount")
    .orderBy(col("total_amount").desc())
    .first()
)

# Resultado
print(f"La ciudad con la mayor cantidad vendida de BrandB es {top_city['city']} con un total de {top_city['total_amount']}.")


La ciudad con la mayor cantidad vendida de BrandB es Chicago con un total de 29557.642903425036.


In [63]:
from pyspark.sql.functions import col, max as spark_max

# Filtrar compras realizadas por hombres o productos tipo Bread, y calcular la cantidad máxima
max_quantity = (
    purchase.join(customer, "customer_id")
            .join(products, "product_id")
            .filter((col("gender") == "Male") | (col("product_name") == "Bread"))
            .agg(spark_max("quantity").alias("max_quantity"))
            .collect()[0]["max_quantity"]
)

# Mostrar el resultado
print(f"La mayor cantidad comprada por un hombre o de productos Bread es: {max_quantity}")


La mayor cantidad comprada por un hombre o de productos Bread es: 5.0
