- **setup dataframe**

In [0]:
signup=[(1,'2017-09-22'), (3,'2017-04-21')]

columns=['userid','gold_signup_date']

goldusers_signup=spark.createDataFrame(signup,columns)

usersdata=[(1,'2014-09-02'), (2,'2015-01-15'), (3,'2014-04-11')]

users=spark.createDataFrame(usersdata,schema=['userid','signup_date'])

sales_data=[(1,'2017-04-18',2),
(3,'2019-12-18',1), 
(2,' 2019-12-18',3), 
(1,'2019-10-23',2), 
(1,'2018-03-19',3), 
(3,'2016-12-20',2), 
(1,'2016-11-09',1), 
(1,'2016-05-20',3), 
(2,'2017-09-24',1), 
(1,'2017-03-11',2), 
(1,'2016-03-11',1), 
(3,'2016-11-10',1), 
(3,'2017-12-07',2), 
(3,'2016-12-15',2), 
(2,'2017-11-08',2), 
(2,'2018-09-10',3)]

sales=spark.createDataFrame(sales_data,schema=['userid','created_date','product_id'])

product_info=[(1,'p1',980),
(2,'p2',870),
(3,'p3',330)]

product=spark.createDataFrame(product_info,schema=['product_id','product_name','price'])

**Reviewing dataset**

In [0]:
goldusers_signup.limit(1).display()
product.limit(1).display()
sales.limit(1).display()
users.limit(1).display()


userid,gold_signup_date
1,2017-09-22


product_id,product_name,price
1,p1,980


userid,created_date,product_id
1,2017-04-18,2


userid,signup_date
1,2014-09-02


**Date column datatype need to change in date format**

In [0]:
goldusers_signup.printSchema()

users.printSchema()

sales.printSchema()

product.printSchema()


root
 |-- userid: long (nullable = true)
 |-- gold_signup_date: string (nullable = true)

root
 |-- userid: long (nullable = true)
 |-- signup_date: string (nullable = true)

root
 |-- userid: long (nullable = true)
 |-- created_date: string (nullable = true)
 |-- product_id: long (nullable = true)

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: long (nullable = true)



In [0]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import col

goldusers_signup=goldusers_signup.withColumn('gold_signup_date',col('gold_signup_date').cast(DateType()))

users=users.withColumn('signup_date',col('signup_date').cast(DateType()))

sales=sales.withColumn('created_date',col('created_date').cast(DateType()))


**_1 ---- what is total amount each customer spent on zomato ?_**

In [0]:
from pyspark.sql.functions import count,sum

users.join(sales,users['userid']==sales['userid'],'inner')\
     .join(product,sales['product_id']==product['product_id'],'left')\
        .select(users['userid'],product['price']).groupBy('userid').agg(sum('price')
                                                                    .alias('Total Spend')).display()

userid,Total Spend
1,5230
2,2510
3,4570


**_2 ---- How many days has each customer visited zomato?_**

In [0]:
from pyspark.sql.functions import count,col,desc

sales.groupBy('userid').agg(count('created_date').alias('Total Visited'))\
     .orderBy(col('Total Visited').desc()).display()

userid,Total Visited
1,7
3,5
2,4


**_3 --- what was the first product purchased by each customer?_**

In [0]:
from pyspark.sql.functions import window,rank,row_number,dense_rank,col
from pyspark.sql.window import Window  

sales.withColumn('rank',row_number().over(Window.partitionBy('userid').orderBy(col('created_date').asc())))\
    .filter(col('rank')==1).select(col('userid'),col('created_date').alias('purchased_on'),col('product_id'))\
    .display()

userid,purchased_on,product_id
1,2016-03-11,1
2,2017-09-24,1
3,2016-11-10,1


**_4 -- what is most purchased item on menu & how many times was it purchased by all customers ?_**

In [0]:
from pyspark.sql.functions import max,count
sales.groupBy('product_id').agg(count('product_id')\
    .alias('Total Purchased times'),count('product_id').alias('Most Fav Iteam'))\
    .withColumn('Rank',row_number().over(Window.orderBy(col('Most Fav Iteam').desc())))\
    .filter(col('Rank')==1).select('product_id','Total Purchased times').display()


product_id,Total Purchased times
2,7


**_5 ---- which item was most popular for each customer?_**

In [0]:
sales.groupBy('userid','product_id').agg(count('product_id').alias('total times purchased'))\
    .withColumn('rank',row_number().over(Window.partitionBy('userid')\
        .orderBy(col('total times purchased').desc())))\
            .filter(col('rank')==1)\
                .select('userid','product_id','total times purchased').display()

userid,product_id,total times purchased
1,2,3
2,3,2
3,2,3


**_6 --- which item was purchased first by customer after they become a member ?_**

In [0]:
users.alias('u').join(goldusers_signup.alias('g'),users["userid"]==goldusers_signup["userid"],'left')\
    .select("u.userid","g.gold_signup_date")\
    .join(sales.alias('s'),(users["userid"]==sales["userid"]),'inner')\
        .select("u.userid","g.gold_signup_date","s.created_date","s.product_id")\
            .filter(col("created_date")<col("gold_signup_date"))\
            .withColumn("rank",row_number().over(Window.partitionBy("userid").orderBy(col("created_date").asc())))\
                .where(col("rank")==1).select('userid','gold_signup_date','created_date','product_id').display()

userid,gold_signup_date,created_date,product_id
1,2017-09-22,2016-03-11,1
3,2017-04-21,2016-11-10,1


**7 --- which item was purchased just before customer became a member?**

In [0]:
# goldusers_signup.display()

sales.alias('s').join(goldusers_signup.alias('g'),sales['userid']==goldusers_signup['userid'],'inner')\
    .filter(col('created_date')< col('gold_signup_date'))\
    .withColumn('Last Order',row_number().over(Window.partitionBy('s.userid').orderBy(col('created_date')\
        .desc()))).filter(col('Last Order')==1).select('s.userid','created_date','s.product_id','gold_signup_date')\
            .show()

+------+------------+----------+----------------+
|userid|created_date|product_id|gold_signup_date|
+------+------------+----------+----------------+
|     1|  2017-04-18|         2|      2017-09-22|
|     3|  2016-12-20|         2|      2017-04-21|
+------+------------+----------+----------------+



**_8 ---- what is total orders and amount spent for each member before they become a member ?_**

In [0]:
from pyspark.sql.functions import sum

sales.alias('s').join(product.alias('p'),sales['product_id']==product['product_id'],'inner')\
    .join(goldusers_signup.alias('g'),sales['userid']==goldusers_signup['userid'],'inner')\
        .filter(col('gold_signup_date')>col('created_date'))\
            .groupBy('s.userid').agg(sum('p.price').alias('Total Amount Spend')).show()


+------+------------------+
|userid|Total Amount Spend|
+------+------------------+
|     1|              4030|
|     3|              2720|
+------+------------------+



**_9 --- rank all transaction of the customers_**

In [0]:
from pyspark.sql.functions import row_number,col
from pyspark.sql.window import Window
users.alias('u').join(sales.alias('s'),users['userid']==sales['userid'],'inner')\
    .select('u.userid','u.signup_date','s.created_date','s.product_id')\
        .withColumn('Rank',rank().over(Window.partitionBy('userid').orderBy(col('created_date').asc())))\
            .show()

+------+-----------+------------+----------+----+
|userid|signup_date|created_date|product_id|Rank|
+------+-----------+------------+----------+----+
|     1| 2014-09-02|  2016-03-11|         1|   1|
|     1| 2014-09-02|  2016-05-20|         3|   2|
|     1| 2014-09-02|  2016-11-09|         1|   3|
|     1| 2014-09-02|  2017-03-11|         2|   4|
|     1| 2014-09-02|  2017-04-18|         2|   5|
|     1| 2014-09-02|  2018-03-19|         3|   6|
|     1| 2014-09-02|  2019-10-23|         2|   7|
|     2| 2015-01-15|  2017-09-24|         1|   1|
|     2| 2015-01-15|  2017-11-08|         2|   2|
|     2| 2015-01-15|  2018-09-10|         3|   3|
|     2| 2015-01-15|  2019-12-18|         3|   4|
|     3| 2014-04-11|  2016-11-10|         1|   1|
|     3| 2014-04-11|  2016-12-15|         2|   2|
|     3| 2014-04-11|  2016-12-20|         2|   3|
|     3| 2014-04-11|  2017-12-07|         2|   4|
|     3| 2014-04-11|  2019-12-18|         1|   5|
+------+-----------+------------+----------+----+


**_10 --- rank all transaction for each member whenever they are zomato gold member for every non gold member transaction mark as na_**

In [0]:
from pyspark.sql.functions import rank,lit,when

sales.alias('s').join(goldusers_signup.alias('g'),sales['userid']==goldusers_signup['userid'],'left')\
    .select('s.userid','s.created_date','s.product_id','g.gold_signup_date')\
        .withColumn('Rank',when( col('gold_signup_date').isNotNull(),rank().over(Window.partitionBy('userid').orderBy('created_date'))).otherwise(lit('NA'))).show()
            

+------+------------+----------+----------------+----+
|userid|created_date|product_id|gold_signup_date|Rank|
+------+------------+----------+----------------+----+
|     1|  2016-03-11|         1|      2017-09-22|   1|
|     1|  2016-05-20|         3|      2017-09-22|   2|
|     1|  2016-11-09|         1|      2017-09-22|   3|
|     1|  2017-03-11|         2|      2017-09-22|   4|
|     1|  2017-04-18|         2|      2017-09-22|   5|
|     1|  2018-03-19|         3|      2017-09-22|   6|
|     1|  2019-10-23|         2|      2017-09-22|   7|
|     2|  2017-09-24|         1|            NULL|  NA|
|     2|  2017-11-08|         2|            NULL|  NA|
|     2|  2018-09-10|         3|            NULL|  NA|
|     2|  2019-12-18|         3|            NULL|  NA|
|     3|  2016-11-10|         1|      2017-04-21|   1|
|     3|  2016-12-15|         2|      2017-04-21|   2|
|     3|  2016-12-20|         2|      2017-04-21|   3|
|     3|  2017-12-07|         2|      2017-04-21|   4|
|     3|  