In [0]:
from pyspark.sql.functions import *

In [0]:
spark

In [0]:
# Leer un CSV 
membersDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/FileStore/tables/members.csv") #Importante aqui utilizaremos la ruta del DBFS
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto

menuDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/FileStore/tables/menu.csv") #Importante aqui utilizaremos la ruta del DBFS
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto

salesDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/FileStore/tables/sales.csv") #Importante aqui utilizaremos la ruta del DBFS
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto


In [0]:
print("MembersDF")
display(membersDF)
print("MenuDF")
display(menuDF)
print("SalesDF")
display(salesDF)

MembersDF


customer_id,join_date
A,2021-01-07
B,2021-01-09


MenuDF


product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


SalesDF


customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


In [0]:
salesDF.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: integer (nullable = true)



In [0]:
#cols = ["customer_id","product_id"]
#sDF = salesDF.select(*cols)

#sDF.show()

In [0]:
# ●	¿Cuál es la cantidad total que gastó cada cliente en el restaurante?

# creamos la join expression
product_id_join_exp = salesDF["product_id"] == menuDF["product_id"]
join_type = "inner"

#ejecutamos el join
sales_menu_product_joinDF = salesDF.join(menuDF, product_id_join_exp, join_type)

# Agrupamos por cliente, y sumamos los precios de todos los productos que compró
customer_spendingDF = sales_menu_product_joinDF.groupBy("customer_id")\
                                             .agg(sum("price").alias("spending"))\
                                             .orderBy(col("customer_id"))

customer_spendingDF.show()


+-----------+--------+
|customer_id|spending|
+-----------+--------+
|          A|      76|
|          B|      74|
|          C|      36|
+-----------+--------+



In [0]:
# ●	¿Cuántos días ha visitado cada cliente el restaurante?

customer_visitsDF = salesDF.groupBy("customer_id","order_date")\
                            .count()\
                            .groupBy("customer_id").sum()\
                            .orderBy(col("customer_id"))\
                            .select("customer_id", col("sum(count)").alias("visits"))

customer_visitsDF.show()

+-----------+------+
|customer_id|visits|
+-----------+------+
|          A|     6|
|          B|     6|
|          C|     3|
+-----------+------+



In [0]:
# ●	¿Cuál fue el primer artículo del menú comprado por cada cliente?

first_pruchaseDF = salesDF.groupBy("customer_id")\
                           .agg(min("order_date").alias("first_pruchase"))

customer_joinExp = first_pruchaseDF["customer_id"] == salesDF["customer_id"]
date_joinExp = first_pruchaseDF["first_pruchase"] == salesDF["order_date"]
joinExp = customer_joinExp & date_joinExp

first_purchase_productsDF = salesDF.join(first_pruchaseDF, joinExp)\
                                   .drop(first_pruchaseDF["customer_id"])\
                                   .drop(first_pruchaseDF["first_pruchase"])

first_articleDF = first_purchase_productsDF.join(menuDF, ["product_id"])\
                                           .select("customer_id","order_date","product_name")\
                                           .dropDuplicates()\
                                           .orderBy("customer_id")

first_articleDF.show()

+-----------+----------+------------+
|customer_id|order_date|product_name|
+-----------+----------+------------+
|          A|2021-01-01|       curry|
|          A|2021-01-01|       sushi|
|          B|2021-01-01|       curry|
|          C|2021-01-01|       ramen|
+-----------+----------+------------+



In [0]:
# ●	¿Cuál es el artículo más comprado en el menú y cuántas veces lo compraron todos los clientes?

most_purchasedDF = salesDF.groupBy("product_id")\
                           .count()\
                           .orderBy(desc("count"))\
                           .limit(1)

most_purchasedDF.join(menuDF.select("product_id","product_name"),
                      ["product_id"]).show()

most_purchased_product_id = most_purchasedDF.select("product_id").toPandas()['product_id'].values[0]

#print(most_purchased_product_id)
#print(type(int(most_purchased_product_id)))

purchase_by_customerDF = salesDF.filter( col("product_id") == int(most_purchased_product_id) )\
                                .groupBy("customer_id")\
                                .count()\
                                .orderBy("customer_id")\
                                .show()

+----------+-----+------------+
|product_id|count|product_name|
+----------+-----+------------+
|         3|    8|       ramen|
+----------+-----+------------+

+-----------+-----+
|customer_id|count|
+-----------+-----+
|          A|    3|
|          B|    2|
|          C|    3|
+-----------+-----+



In [0]:
# ●	¿Qué artículo fue el más popular para cada cliente?
# https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/

from pyspark.sql.window import Window

win = Window().partitionBy("customer_id").orderBy(col("count").desc())

popular_productsDF = salesDF.groupby("customer_id","product_id")\
                           .count()\
                           .withColumn("row", row_number().over(win))\
                           .filter(col("row") == 1).drop("row")

popular_productsDF.show()

+-----------+----------+-----+
|customer_id|product_id|count|
+-----------+----------+-----+
|          A|         3|    3|
|          B|         1|    2|
|          C|         3|    3|
+-----------+----------+-----+



In [0]:
# ●	¿Qué artículo compró primero el cliente después de convertirse en miembro?

win2 = Window().partitionBy("customer_id").orderBy(col("order_date").asc())

ranked_datesDF = salesDF.join(membersDF,["customer_id"])\
                       .filter(col("order_date") >= col("join_date"))\
                       .withColumn("row", row_number().over(win2))
                       #.drop("join_date")\
                       
#ranked_datesDF.show()
columns = ranked_datesDF.columns + menuDF.columns
final_columns = []
[final_columns.append(col) for col in columns if  ( (col not in final_columns) and (col != 'row') ) ]

first_purchase_as_memberDF = ranked_datesDF.filter(col("row") == 1)\
                                           .drop("row")\
                                           .join(menuDF, ["product_id"])

#first_purchase_as_memberDF.show() 
first_purchase_as_memberDF.select(*final_columns).show() 

+-----------+----------+----------+----------+------------+-----+
|customer_id|order_date|product_id| join_date|product_name|price|
+-----------+----------+----------+----------+------------+-----+
|          A|2021-01-07|         2|2021-01-07|       curry|   15|
|          B|2021-01-11|         1|2021-01-09|       sushi|   10|
+-----------+----------+----------+----------+------------+-----+



In [0]:
# ●	¿Qué artículo se compró justo antes de que el cliente se convirtiera en miembro?

win3 = Window().partitionBy("customer_id").orderBy(col("order_date").desc())

purchases_before_membershipsDF = salesDF.join(membersDF,["customer_id"])\
                                       .filter(col("order_date") < col("join_date"))\
                                       .withColumn("row", row_number().over(win2))

purchases_before_membershipsDF.show()

last_purchase_before_memberDF = ranked_datesDF.filter(col("row") == 1)\
                                           .drop("row")\
                                           .join(menuDF, ["product_id"])

last_purchase_before_memberDF.select("customer_id","product_id","product_name","order_date","join_date").show()

+-----------+----------+----------+----------+---+
|customer_id|order_date|product_id| join_date|row|
+-----------+----------+----------+----------+---+
|          A|2021-01-01|         1|2021-01-07|  1|
|          A|2021-01-01|         2|2021-01-07|  2|
|          B|2021-01-01|         2|2021-01-09|  1|
|          B|2021-01-02|         2|2021-01-09|  2|
|          B|2021-01-04|         1|2021-01-09|  3|
+-----------+----------+----------+----------+---+

+-----------+----------+------------+----------+----------+
|customer_id|product_id|product_name|order_date| join_date|
+-----------+----------+------------+----------+----------+
|          A|         2|       curry|2021-01-07|2021-01-07|
|          B|         1|       sushi|2021-01-11|2021-01-09|
+-----------+----------+------------+----------+----------+



In [0]:
# ●	¿Cuál es el total de artículos y la cantidad gastada por cada miembro antes de convertirse en miembro?
purchases_before_membershipsDF.join(menuDF, ["product_id"])\
                              .groupBy("customer_id")\
                              .agg(count("product_id").alias("num_products_before_membership"),
                                    sum("price").alias("spent"))\
                              .orderBy("customer_id").show()

+-----------+------------------------------+-----+
|customer_id|num_products_before_membership|spent|
+-----------+------------------------------+-----+
|          A|                             2|   25|
|          B|                             3|   40|
+-----------+------------------------------+-----+



In [0]:
# ●	Si cada $ 1 gastado equivale a 10 puntos y el sushi tiene un multiplicador de puntos 2x, ¿cuántos puntos tendría cada cliente?
# https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/
salesDF.join(menuDF, ["product_id"])\
       .withColumn("points", when(col("product_name") == 'sushi', col("price")*20  )
                            .otherwise(col("price")*10)
                  )\
        .groupBy("customer_id")\
        .sum("points")\
        .orderBy("customer_id")\
        .show()

+-----------+-----------+
|customer_id|sum(points)|
+-----------+-----------+
|          B|        940|
|          C|        360|
|          A|        860|
+-----------+-----------+



In [0]:
# ●	En la primera semana después de que un cliente se une al programa (incluida la fecha de ingreso), gana el doble de puntos en todos los artículos, no solo en sushi. ¿Cuántos puntos tienen los clientes A y B a fines de enero?

pointsDF = salesDF.join(menuDF, ["product_id"])\
                   .join(membersDF, ["customer_id"], "left_outer")\
                   .withColumn("TimeDiffDays", ((unix_timestamp("order_date") - unix_timestamp('join_date'))/(60*60*24) ) )\
                   .withColumn("points", when( (col("product_name") == 'sushi') | (col("TimeDiffDays").between(0,7)),
                                                col("price")*20  )
                                        .otherwise(col("price")*10)
                              )

pointsDF.show()

pointsDF.groupBy("customer_id").sum("points").orderBy("customer_id").show()

+-----------+----------+----------+------------+-----+----------+------------+------+
|customer_id|product_id|order_date|product_name|price| join_date|TimeDiffDays|points|
+-----------+----------+----------+------------+-----+----------+------------+------+
|          A|         1|2021-01-01|       sushi|   10|2021-01-07|        -6.0|   200|
|          A|         2|2021-01-01|       curry|   15|2021-01-07|        -6.0|   150|
|          A|         2|2021-01-07|       curry|   15|2021-01-07|         0.0|   300|
|          A|         3|2021-01-10|       ramen|   12|2021-01-07|         3.0|   240|
|          A|         3|2021-01-11|       ramen|   12|2021-01-07|         4.0|   240|
|          A|         3|2021-01-11|       ramen|   12|2021-01-07|         4.0|   240|
|          B|         2|2021-01-01|       curry|   15|2021-01-09|        -8.0|   150|
|          B|         2|2021-01-02|       curry|   15|2021-01-09|        -7.0|   150|
|          B|         1|2021-01-04|       sushi|   10|