In [0]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import sum
from pyspark.sql.functions import count
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import array_distinct
from pyspark.sql.functions import desc
from pyspark.sql.functions import row_number
from pyspark.sql.functions import udf

In [0]:
membersDF = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sfperalta2001@gmail.com/members.csv")
menuDF = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sfperalta2001@gmail.com/menu.csv")
salesDF = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sfperalta2001@gmail.com/sales.csv")

In [0]:
spark

In [0]:
membersDF.display()

customer_id,join_date
A,2021-01-07
B,2021-01-09


In [0]:
menuDF.display()

product_id,product_name,price
1,sushi,10
2,curry,15
3,ramen,12


In [0]:
salesDF.display()

customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
A,2021-01-07,2
A,2021-01-10,3
A,2021-01-11,3
A,2021-01-11,3
B,2021-01-01,2
B,2021-01-02,2
B,2021-01-04,1
B,2021-01-11,1


#1) ¿Cuál es la cantidad total que gastó cada cliente en el restaurante?

In [0]:
totalamtDF = (salesDF
              .join(menuDF,"product_id")
              .groupBy("customer_id").agg({'price':'sum'})
              .withColumnRenamed("sum(price)","cantidadtotal")
              .orderBy("customer_id"))
display(totalamtDF)

customer_id,cantidadtotal
A,76.0
B,74.0
C,36.0


#2) ¿Cuántos días ha visitado cada cliente el restaurante?

In [0]:
visitasDF = (salesDF
             .groupBy("customer_id")
             .agg(countDistinct("order_date").alias("visitas"))
)
visitasDF.display()

customer_id,visitas
B,6
C,2
A,4


# 3) ¿Cuál fue el primer artículo del menú comprado por cada cliente?

In [0]:
#salesDF.sort('order_date')#.show()
windowOrder = Window.partitionBy('customer_id').orderBy(col('order_date'))

display(salesDF.withColumn("row",dense_rank().over(windowOrder)) 
               .filter(col("row") == 1).drop("row"))



customer_id,order_date,product_id
A,2021-01-01,1
A,2021-01-01,2
B,2021-01-01,2
C,2021-01-01,3
C,2021-01-01,3


# 4) ¿Cuál es el artículo más comprado en el menú y cuántas veces lo compraron todos los clientes?

In [0]:
display(salesDF.groupBy('product_id')
               .agg(count(col('product_id')).alias('cantidad_comprada'))
               .sort(desc(('cantidad_comprada')))
               .join(menuDF, 'product_id')
               .select("product_id","cantidad_comprada","product_name"))

display(salesDF.filter(col('product_id') == 3).groupBy('customer_id').agg(count(col('customer_id')).alias('ramen_count')))

#salesDF.groupBy('product_id').count().show()

product_id,cantidad_comprada,product_name
3,8,ramen
1,3,sushi
2,4,curry


customer_id,ramen_count
B,2
C,3
A,3


# 5) ¿Qué artículo fue el más popular para cada cliente?

In [0]:
#salesDF.groupBy(['customer_id', 'product_id']).agg(count(col('product_id')).alias('contador')).sort(desc(('contador'))).show()
client_sales = salesDF.groupBy(['customer_id', 'product_id']).agg(count(col('product_id')).alias('contador')).sort(desc(('contador')))#.show()

windowClient = Window.partitionBy("customer_id").orderBy(col("contador").desc())

resDF = client_sales.withColumn("row",dense_rank().over(windowClient)) \
  .filter(col("row") == 1).drop("row") \
  .join(menuDF,"product_id").select('customer_id', 'product_name', 'contador')#.show()

display(resDF)

customer_id,product_name,contador
A,ramen,3
B,curry,2
B,ramen,2
B,sushi,2
C,ramen,3


# 6) ¿Qué artículo compró primero el cliente después de convertirse en miembro?

In [0]:
post_membresiaDF = (salesDF
                        .join(membersDF,"customer_id")
                        .filter(salesDF.order_date >= membersDF.join_date)
                        .withColumn("rank",dense_rank()
                                    .over(Window.partitionBy("customer_id")
                                         .orderBy("order_date"))
                                   )
                        .filter("rank == 1")
                        .join(menuDF,"product_id")
                        .select("customer_id","order_date","product_name")
                       )                     

post_membresiaDF.display()

customer_id,order_date,product_name
A,2021-01-07,curry
B,2021-01-11,sushi


# 7) ¿Qué artículo se compró justo antes de que el cliente se convirtiera en miembro?

In [0]:
pre_membresiaDF = (salesDF
                        .join(membersDF,"customer_id")
                        .filter(salesDF.order_date < membersDF.join_date)
                        .withColumn("rank",dense_rank()
                                    .over(Window.partitionBy("customer_id")
                                         .orderBy(desc("order_date")))
                                   )
                        .filter("rank == 1")
                        .join(menuDF,"product_id")
                        .select("customer_id","order_date","product_name")
                       )                     

pre_membresiaDF.display()

customer_id,order_date,product_name
A,2021-01-01,sushi
A,2021-01-01,curry
B,2021-01-04,sushi


# 8) ¿Cuál es el total de artículos y la cantidad gastada por cada miembro antes de convertirse en miembro?

In [0]:
pre_member_data = salesDF.join(menuDF, 'product_id').join(membersDF,"customer_id") \
                      .filter(salesDF.order_date < membersDF.join_date) \
                      .groupBy('customer_id') \
                      .agg(count(col('customer_id')).alias('contador'), sum(col('price')).alias('total_gastado')) \
                      .sort(desc(('contador')))#.show()

display(pre_member_data)

customer_id,contador,total_gastado
B,3,40.0
A,2,25.0


# 9) Si cada $ 1 gastado equivale a 10 puntos y el sushi tiene un multiplicador de puntos 2x, ¿cuántos puntos tendría cada cliente?

In [0]:
pointsDF = (salesDF
            .join(menuDF,"product_id")
            .withColumn("points",when(col("product_id") == 1,col("price")*20).otherwise(col("price")*10))
            .groupBy("customer_id")
            .agg(sum("points").alias("total puntos"))
)

pointsDF.display()

customer_id,total puntos
B,940.0
C,360.0
A,860.0


In [0]:
pointsDF = (salesDF
            .join(menuDF,"product_id")
            .withColumn("points",when(col("product_id") == 1,col("price")*20).otherwise(col("price")*10))
            .groupBy("customer_id")
            .agg({'points':'sum'})
            .withColumnRenamed("sum(points)","total puntos")
)

pointsDF.display()

customer_id,total puntos
B,940.0
C,360.0
A,860.0


# 10) En la primera semana después de que un cliente se une al programa (incluida la fecha de ingreso), gana el doble de puntos en todos los artículos, no solo en sushi. ¿Cuántos puntos tienen los clientes A y B a fines de enero?

In [0]:
def oneWeek(date):
    new = ""
    arr = date.split("-")
    day = arr[2]
    newday = int(day) + 7
    new += date[0:8]
    print(newday)
    new += str(newday)[0]
    new += str(newday)[1]
    return new
  
udf_OneWeek = udf(oneWeek)

x2pointsDF = (salesDF
            .join(menuDF,"product_id")
            .join(membersDF, 'customer_id')
            .withColumn("points", when((col("order_date") >= col('join_date')) & 
                                       (col("order_date") < udf_OneWeek(col("join_date"))),col("price")*20)
                                .otherwise(when(col('product_id') == 1, col('price')*20) 
                                .otherwise(col("price")*10)))
            .groupBy("customer_id")
            .agg(sum("points").alias("total puntos"))
)

display(x2pointsDF)

customer_id,total puntos
B,940.0
A,1370.0


In [0]:
# double week points
points2DF = salesDF.join(menuDF,"product_id") \
            .join(membersDF, 'customer_id') \
            .withColumn('double', udf_OneWeek(col("join_date"))) \
            .filter(col("order_date") >= col('join_date')) \
            .filter(col("order_date") < udf_OneWeek(col("join_date"))) \
            .withColumn('points', col("price")*20) \
            .groupBy("customer_id") \
            .agg(sum("points").alias("tot"))

#points pre double week
pre_pointsDF = salesDF.join(menuDF,"product_id") \
            .join(membersDF, 'customer_id') \
            .withColumn('double', udf_OneWeek(col("join_date"))) \
            .filter(col("order_date") < col('join_date')) \
            .withColumn('points', when(col('product_id') == 1, col("price")*20).otherwise(col("price")*10)) \
            .groupBy("customer_id") \
            .agg(sum("points").alias("tot1"))

### point post double week
post_pointsDF = salesDF.join(menuDF,"product_id") \
            .join(membersDF, 'customer_id') \
            .withColumn('double', udf_OneWeek(col("join_date"))) \
            .filter(col("order_date") >= udf_OneWeek(col('join_date'))) \
            .withColumn('points', when(col('product_id') == 1, col("price")*20).otherwise(col("price")*10)) \
            .groupBy("customer_id") \
            .agg(sum("points").alias("tot2"))

#totalDF = points2DF.join(pre_pointsDF, 'customer_id').join(post_pointsDF, 'customer_id').withColumn('total_points', col('tot')+col('tot1')+col('tot2'))
display(pre_pointsDF)
display(post_pointsDF)
display(points2DF)

customer_id,tot1
B,500.0
A,350.0


customer_id,tot2
B,240.0


customer_id,tot
B,200.0
A,1020.0
