### Programación Concurrente
## 27. Joins con Spark


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Un Join es una operación que permite combinar datos de dos o más tablas en una sola, basándose en una relación entre ellas. Usualmente, esta relación se establece a través de una columna en común entre las tablas.

El Join permite acceder y manipular datos que están distribuidos en varias tablas de manera conjunta, obteniendo así resultados más complejos y completos.

Vamos a leer tres tablas; estas se encuentran en este [enlace de Kaggle](https://www.kaggle.com/datasets/svbstan/sales-product-and-customer-insight-repository)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, lit

# Cargar los datos de los clientes
customer_profile = spark.read.csv('/content/drive/My Drive/customer_profile_dataset.csv', header=True, inferSchema=True)

# Cargar los datos de los productos
products = spark.read.csv('/content/drive/My Drive/products_dataset.csv', header=True, inferSchema=True)

# Cargar los datos de historial de compras
purchase = spark.read.csv('/content/drive/My Drive/purchase_history_dataset.csv', header=True, inferSchema=True)



1. Responde, ¿cuántos clientes llamados "Robert" (nota cómo hay *Males* y *Females*), compraron algún producto lácteo (Dairy) en 2022 ?

In [5]:
from pyspark.sql.functions import col, to_timestamp, lit

robert_dairy_purchases_2022 = purchase \
    .filter(col('purchase_date').between(lit("2022-01-01 00:00:00"), lit("2022-12-31 23:59:59"))) \
    .join(products.filter(col('category') == 'Dairy'), "product_id") \
    .join(customer_profile.filter(col('first_name') == 'Robert'), "customer_id") \
    .select('customer_id').distinct().count()

print(f"Cantidad de clientes llamados 'Robert' que compraron productos lácteos en 2022: {robert_dairy_purchases_2022}")


Cantidad de clientes llamados 'Robert' que compraron productos lácteos en 2022: 31


2. Eres empleado de *BrandB*. ¿En cuáles ciudades has vendido una mayor cantidad? (total_amount)

In [6]:
top_cities_brandb = purchase \
    .join(products.filter(col('brand') == 'BrandB'), "product_id") \
    .join(customer_profile, "customer_id") \
    .groupBy('city').sum('total_amount') \
    .orderBy(col('sum(total_amount)').desc()) \
    .show()


+------------+------------------+
|        city| sum(total_amount)|
+------------+------------------+
|     Chicago|29557.642903425036|
|     Houston| 28586.38258782695|
|     Phoenix| 28041.96419969516|
|Philadelphia|27492.074313737838|
|    New York|27246.218068437967|
| San Antonio|25221.051797835673|
| Los Angeles|23819.529899740137|
+------------+------------------+



3. ¿De cuánto es la mayor cantidad (quantity) que ha sido comprado por algún hombre O cuyo producto sea pan (Bread)?

In [7]:
from pyspark.sql.functions import col, to_timestamp, lit

max_quantity_male_or_bread = purchase \
    .join(customer_profile, "customer_id", "left_outer") \
    .join(products, "product_id", "left_outer") \
    .filter((col('gender') == 'Male') | (col('category') == 'Bread')) \
    .select('quantity').groupBy().max('quantity').collect()[0][0]

print(f"La mayor cantidad comprada por algún hombre O en productos de pan es: {max_quantity_male_or_bread}")



La mayor cantidad comprada por algún hombre O en productos de pan es: 5
