#Trabajo Final

### DataFrame Menu

In [0]:
file_location_menu = "/FileStore/tables/menu.csv"

menuDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", "true")
  .csv(file_location_menu)
           )
# Mostrando el contenido de la data

display(menuDF)

product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


### DataFrame Members

In [0]:
# File location and type
file_location_members = "/FileStore/tables/members.csv"

membersDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", "true")
  .csv(file_location_members)
           )
# Mostrando el contenido de la data

display(membersDF)

customer_id,join_date
A,2021-01-07T00:00:00.000+0000
B,2021-01-09T00:00:00.000+0000


### DataFrame Sales

In [0]:
# File location and type
file_location_sales = "/FileStore/tables/sales.csv"

salesDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", "true")
  .csv(file_location_sales)
           )
# Mostrando el contenido de la data

display(salesDF)

customer_id,order_date,product_id
A,2021-01-01T00:00:00.000+0000,1
A,2021-01-01T00:00:00.000+0000,2
A,2021-01-07T00:00:00.000+0000,2
A,2021-01-10T00:00:00.000+0000,3
A,2021-01-11T00:00:00.000+0000,3
A,2021-01-11T00:00:00.000+0000,3
B,2021-01-01T00:00:00.000+0000,2
B,2021-01-02T00:00:00.000+0000,2
B,2021-01-04T00:00:00.000+0000,1
B,2021-01-11T00:00:00.000+0000,1


### Vistas y dataframes Previos

In [0]:
# Create a view or table

menuDF.createOrReplaceTempView("menuSQL") 
salesDF.createOrReplaceTempView("salesSQL")
membersDF.createOrReplaceTempView("membersSQL")

In [0]:
#View all joined data
from pyspark.sql import functions as F

dataDF=salesDF.join(membersDF,["customer_id"],'left')\
        .join(menuDF,["product_id"],'inner')\
        .withColumn("Miembro",F.when(F.col("join_date").isNull(),"No").otherwise("Si") )

dataDF.createOrReplaceTempView("dataSQL")

spark.sql("select * from dataSQL").show(truncate=False)

+----------+-----------+-------------------+-------------------+------------+-----+-------+
|product_id|customer_id|order_date         |join_date          |product_name|price|Miembro|
+----------+-----------+-------------------+-------------------+------------+-----+-------+
|1         |A          |2021-01-01 00:00:00|2021-01-07 00:00:00|sushi       |10   |Si     |
|2         |A          |2021-01-01 00:00:00|2021-01-07 00:00:00|curry       |15   |Si     |
|2         |A          |2021-01-07 00:00:00|2021-01-07 00:00:00|curry       |15   |Si     |
|3         |A          |2021-01-10 00:00:00|2021-01-07 00:00:00|ramen       |12   |Si     |
|3         |A          |2021-01-11 00:00:00|2021-01-07 00:00:00|ramen       |12   |Si     |
|3         |A          |2021-01-11 00:00:00|2021-01-07 00:00:00|ramen       |12   |Si     |
|2         |B          |2021-01-01 00:00:00|2021-01-09 00:00:00|curry       |15   |Si     |
|2         |B          |2021-01-02 00:00:00|2021-01-09 00:00:00|curry       |15 

### ¿Cuál es la cantidad total que gastó cada cliente en el restaurante?

In [0]:
from pyspark.sql.functions import sum

salesDF.join(menuDF,salesDF.product_id ==  menuDF.product_id,"inner") \
     .select('customer_id','price')\
     .groupBy('customer_id').agg(sum('price').alias('Total'))\
     .show()

+-----------+-----+
|customer_id|Total|
+-----------+-----+
|          B|   74|
|          C|   36|
|          A|   76|
+-----------+-----+



In [0]:
%sql

select s.customer_id, sum(m.price) Total
from menuSQL m inner join salesSQL s on s.product_id=m.product_id
group by s.customer_id

customer_id,Total
B,74
C,36
A,76


### ¿Cuántos días ha visitado cada cliente el restaurante?

In [0]:
from pyspark.sql.functions import countDistinct

salesDF.select('customer_id','order_date')\
     .groupBy('customer_id').agg(countDistinct('order_date').alias('Dias Visitados'))\
     .orderBy('customer_id')\
     .show(truncate=False)

+-----------+--------------+
|customer_id|Dias Visitados|
+-----------+--------------+
|A          |4             |
|B          |6             |
|C          |2             |
+-----------+--------------+



In [0]:
spark.sql("Select customer_id,count(distinct order_date) as `Dias Visitados` " + 
          "from salesSQL Group by customer_id " +
          "order by 1").show()

+-----------+--------------+
|customer_id|Dias Visitados|
+-----------+--------------+
|          A|             4|
|          B|             6|
|          C|             2|
+-----------+--------------+



### ¿Cuál fue el primer artículo del menú comprado por cada cliente?

In [0]:
from pyspark.sql.functions import min
from pyspark.sql import functions as F

primerafechaDF = salesDF.select("customer_id","order_date").groupby("customer_id").agg(min('order_date').alias('order_date'))

salesDF.join(primerafechaDF,["customer_id","order_date"],'inner')\
            .join(menuDF,["product_id"],'inner')\
            .select("customer_id","order_date","product_name")\
            .groupBy('customer_id','order_date').agg(F.collect_set('product_name').alias('Primer Producto Comprado'))\
            .show()

+-----------+-------------------+------------------------+
|customer_id|         order_date|Primer Producto Comprado|
+-----------+-------------------+------------------------+
|          B|2021-01-01 00:00:00|                 [curry]|
|          A|2021-01-01 00:00:00|          [curry, sushi]|
|          C|2021-01-01 00:00:00|                 [ramen]|
+-----------+-------------------+------------------------+



In [0]:
spark.sql("Select S.customer_id,S.order_date,ARRAY_AGG(Distinct M.product_name) `Primer Producto Comprado` from SalesSQL S INNER JOIN " +
          "(select customer_id,min(order_date) as order_date from salesSQL group by customer_id) S2 " +
          "ON S2.customer_id=S.customer_id and S2.order_date=S.order_date " +
          "INNER JOIN menuSQL M ON M.product_id=S.product_id "
         "GROUP BY S.customer_id,S.order_date").show()

+-----------+-------------------+------------------------+
|customer_id|         order_date|Primer Producto Comprado|
+-----------+-------------------+------------------------+
|          B|2021-01-01 00:00:00|                 [curry]|
|          A|2021-01-01 00:00:00|          [curry, sushi]|
|          C|2021-01-01 00:00:00|                 [ramen]|
+-----------+-------------------+------------------------+



### ¿Cuál es el artículo más comprado en el menú y cuántas veces lo compraron todos los clientes?

In [0]:
from pyspark.sql.functions import count,desc

producto_mas_compradoDF = salesDF.select("product_id")\
                            .groupby("product_id")\
                            .agg(count("product_id").alias("Cantidad"))\
                            .sort(desc("Cantidad")).limit(1)\
                            .join(menuDF,["product_id"],'inner')\
                            .select("product_id","product_name")

salesDF.select("customer_id","product_id")\
        .join(producto_mas_compradoDF,["product_id"],'inner')\
        .groupby("product_name","customer_id").agg(count("product_name").alias("Nro veces comprado"))\
        .show()

+------------+-----------+------------------+
|product_name|customer_id|Nro veces comprado|
+------------+-----------+------------------+
|       ramen|          C|                 3|
|       ramen|          A|                 3|
|       ramen|          B|                 2|
+------------+-----------+------------------+



In [0]:

spark.sql("Select p.product_name `Producto mas comprado`,s.customer_id,count(1) `Nro veces comprado` from salesSQL s " +
          "INNER JOIN (Select s.product_id,m.product_name,count(1) Cantidad " + 
                        "from salesSQL s inner join menuSQL m on s.product_id=m.product_id "+
                        "group by s.product_id,m.product_name "+
                        "order by count(1) desc "+
                        "Limit 1) p " +
          "ON s.product_id=p.product_id "+
          "group by p.product_name,s.customer_id").show()

+---------------------+-----------+------------------+
|Producto mas comprado|customer_id|Nro veces comprado|
+---------------------+-----------+------------------+
|                ramen|          C|                 3|
|                ramen|          A|                 3|
|                ramen|          B|                 2|
+---------------------+-----------+------------------+



###  ¿Qué artículo fue el más popular para cada cliente?

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window

salesDF.groupby("customer_id","product_id")\
        .agg(f.count("customer_id").alias("cantidad"))\
        .withColumn("max",f.max("cantidad").over(Window.partitionBy("customer_id")))\
        .where(f.col("cantidad") == f.col("max"))\
        .join(menuDF,["product_id"],'inner')\
        .select("customer_id","product_name",f.col("max").alias("Cantidad de veces consumido"))\
        .groupby("customer_id","Cantidad de veces consumido").agg(f.collect_list("product_name").alias("Producto mas consumido")).show(10,0)

+-----------+---------------------------+----------------------+
|customer_id|Cantidad de veces consumido|Producto mas consumido|
+-----------+---------------------------+----------------------+
|A          |3                          |[ramen]               |
|B          |2                          |[sushi, ramen, curry] |
|C          |3                          |[ramen]               |
+-----------+---------------------------+----------------------+



In [0]:

spark.sql("""Select T.customer_id,array_agg(m.product_name) `Producto mas consumido` ,T.max `Cantidad de veces consumido`
            from (Select customer_id,product_id,count(1) cantidad,
                  MAX(count(1)) OVER(PARTITION BY customer_id) Max 
                    from salesSQL group by customer_id,product_id) T 
            inner join menuSQL m ON T.product_id = m.product_id
            Where T.cantidad=T.max
            Group by T.customer_id,T.max
            """).display()

customer_id,Producto mas consumido,Cantidad de veces consumido
A,List(ramen),3
B,"List(sushi, ramen, curry)",2
C,List(ramen),3


###  ¿Qué artículo compró primero el cliente después de convertirse en miembro?

In [0]:
primera_compra_por_miembroDF= dataDF.filter(F.col("Miembro")=="Si")\
                                      .filter(F.col("order_date")>=F.col("join_date"))\
                                      .select("customer_id","order_date")\
                                      .groupby("customer_id").agg(F.min("order_date").alias("order_date"))

dataDF.join(primera_compra_por_miembroDF,["customer_id","order_date"],'inner')\
       .select("customer_id","order_date","product_name").show()

+-----------+-------------------+------------+
|customer_id|         order_date|product_name|
+-----------+-------------------+------------+
|          A|2021-01-07 00:00:00|       curry|
|          B|2021-01-11 00:00:00|       sushi|
+-----------+-------------------+------------+



In [0]:
primera_compra_por_miembro = """Select customer_id,min(order_date) order_date
                                from dataSQL
                                where Miembro = 'Si' and order_date>=join_date
                                group by customer_id"""
spark.sql("""Select d.customer_id,d.order_date,d.product_name from (""" + primera_compra_por_miembro +""")T INNER JOIN dataSQL d ON T.customer_id=d.customer_id and T.order_date=d.order_date""").show()

+-----------+-------------------+------------+
|customer_id|         order_date|product_name|
+-----------+-------------------+------------+
|          B|2021-01-11 00:00:00|       sushi|
|          A|2021-01-07 00:00:00|       curry|
+-----------+-------------------+------------+



###  ¿Qué artículo se compró justo antes de que el cliente se convirtiera en miembro?

In [0]:
ultima_compra_antes_ser_miembroDF= dataDF.filter(F.col("Miembro")=="Si")\
                                      .filter(F.col("order_date")<F.col("join_date"))\
                                      .select("customer_id","order_date")\
                                      .groupby("customer_id").agg(F.min("order_date").alias("order_date"))

dataDF.join(ultima_compra_antes_ser_miembroDF,["customer_id","order_date"],'inner')\
       .select("customer_id","order_date","product_name")\
       .groupby("customer_id","order_date").agg(F.collect_list("product_name").alias("product_name")).show()

+-----------+-------------------+--------------+
|customer_id|         order_date|  product_name|
+-----------+-------------------+--------------+
|          B|2021-01-01 00:00:00|       [curry]|
|          A|2021-01-01 00:00:00|[curry, sushi]|
+-----------+-------------------+--------------+



In [0]:
ultima_compra_antes_ser_miembroDF = """Select customer_id,max(order_date) order_date
                                from dataSQL
                                where Miembro = 'Si' and order_date<join_date
                                group by customer_id"""
spark.sql("""Select d.customer_id,d.order_date,ARRAY_AGG(d.product_name) product_name 
            from (""" + ultima_compra_antes_ser_miembroDF +""")T INNER JOIN dataSQL d ON T.customer_id=d.customer_id and T.order_date=d.order_date
            group by d.customer_id,d.order_date """).show()

+-----------+-------------------+--------------+
|customer_id|         order_date|  product_name|
+-----------+-------------------+--------------+
|          B|2021-01-04 00:00:00|       [sushi]|
|          A|2021-01-01 00:00:00|[curry, sushi]|
+-----------+-------------------+--------------+



###  ¿Cuál es el total de artículos y la cantidad gastada por cada miembro antes de convertirse en miembro?

In [0]:
dataDF.filter((F.col("Miembro")=="Si") & (F.col("order_date")<F.col("join_date")) )\
        .select("customer_id","price")\
        .groupby("customer_id").agg(F.count("customer_id").alias("cantidad_productos"),F.sum("price").alias("Total")).show()

+-----------+------------------+-----+
|customer_id|cantidad_productos|Total|
+-----------+------------------+-----+
|          B|                 3|   40|
|          A|                 2|   25|
+-----------+------------------+-----+



In [0]:
spark.sql("""Select d.customer_id,count(d.customer_id) cantidad_productos,sum(d.price) Total
            from dataSQL d where miembro='Si' and d.order_date<d.join_date
            group by d.customer_id """).show()


+-----------+------------------+-----+
|customer_id|cantidad_productos|Total|
+-----------+------------------+-----+
|          B|                 3|   40|
|          A|                 2|   25|
+-----------+------------------+-----+



###  Si cada $ 1 gastado equivale a 10 puntos y el sushi tiene un multiplicador de puntos 2x, ¿cuántos puntos tendría cada cliente?

In [0]:
dataDF.withColumn("puntos",F.col("price")*F.when(F.col("product_name")=='sushi',20).otherwise(10))\
        .groupby("customer_id").agg(F.sum("puntos")).show()

+-----------+-----------+
|customer_id|sum(puntos)|
+-----------+-----------+
|          B|        940|
|          C|        360|
|          A|        860|
+-----------+-----------+



In [0]:
spark.sql("""Select customer_id,sum(price*case when product_name='sushi' then 20 else 10 end)Puntos from dataSQL group by customer_id""").show()

+-----------+------+
|customer_id|Puntos|
+-----------+------+
|          B|   940|
|          C|   360|
|          A|   860|
+-----------+------+



###  En la primera semana después de que un cliente se une al programa (incluida la fecha de ingreso), gana el doble de puntos en todos los artículos, no solo en sushi. ¿Cuántos puntos tienen los clientes A y B a fines de enero?

In [0]:
dataDF.filter((F.col("Miembro")=="Si") & (F.col("order_date")>='2021-01-01') & (F.col("order_date") <= '2021-01-31'))\
            .withColumn("puntos",F.col("price")*F.when((F.col("order_date")>=F.col("join_date")) & (F.col("order_date")<=F.date_add(F.col("join_date"), 6)),20).otherwise(10))\
            .select("customer_id","puntos")\
            .groupby("customer_id").agg(F.sum("puntos")).show()

+-----------+-----------+
|customer_id|sum(puntos)|
+-----------+-----------+
|          B|        720|
|          A|       1270|
+-----------+-----------+



In [0]:
spark.sql("""Select customer_id,sum(price*CASE WHEN order_date BETWEEN join_date AND date_add(join_date, 6) THEN 20 ELSE 10 END) puntos
            from dataSQL where order_date between '2021-01-01' and '2021-01-31' and Miembro ='Si'
            group by customer_id""").show()

+-----------+------+
|customer_id|puntos|
+-----------+------+
|          B|   720|
|          A|  1270|
+-----------+------+

