In [2]:
from pyspark.sql import SparkSession

In [3]:
# Local Spark session for this analysis ('local' = a single worker thread).
# getOrCreate() hands back an already-running session if the kernel has one.
spark = (
    SparkSession.builder
    .appName('Retailer Dataframe')
    .master('local')
    .getOrCreate()
)

In [4]:
from pyspark.sql.types import (
    TimestampType,
    StringType,
    IntegerType,
    StructType,
    StructField
)

In [5]:
# create schema: explicit column names and Spark types, listed in the same order as
# the columns of the CSV file (every field keeps the default nullable=True)
sch = StructType([
    StructField(name, data_type)
    for name, data_type in (
        ('index', IntegerType()),
        ('timestamp_started', TimestampType()),
        ('user_id', StringType()),
        ('event', StringType()),
        ('flyer_id', IntegerType()),
        ('merchant_id', IntegerType()),
        ('timestamp_ended', TimestampType()),
    )
])

In [6]:
# The input file modified-retailer-dataset.csv has already been placed on the local file system

In [7]:
%%sh
# Long listing of the local staging directory, oldest modification first
ls -l -t -r /home/itv000367/kanth

total 5264
-rw-r--r-- 1 itv000367 students 2379540 Sep 29 14:26 retailer-dataset.csv
-rw-r--r-- 1 itv000367 students 3009509 Sep 30 21:37 modified-retailer-dataset.csv


In [39]:
%%sh
# Create the HDFS working directory.
# -p: also create missing parent directories, and do not fail when the directory
# already exists, so the notebook can be re-run from the top.
hadoop fs -mkdir -p /user/itv000367/kanth/lixar

In [8]:
# Copy modified-retailer-dataset.csv from the local file system to HDFS

In [40]:
%%sh
# Upload the input CSV into the HDFS working directory.
# -f: overwrite the destination if it already exists; without it a re-run
# aborts with "File exists".
hadoop fs -put -f /home/itv000367/kanth/modified-retailer-dataset.csv /user/itv000367/kanth/lixar

In [9]:
%%sh
# Confirm the CSV landed in the HDFS working directory
hadoop fs -ls \
    /user/itv000367/kanth/lixar

Found 1 items
-rw-r--r--   3 itv000367 supergroup    3009509 2021-09-30 21:47 /user/itv000367/kanth/lixar/modified-retailer-dataset.csv


In [42]:
%%sh
# Print the last kilobyte of the uploaded file as a quick check of its row format
hadoop fs -tail \
    /user/itv000367/kanth/lixar/modified-retailer-dataset.csv


20716,2018-10-01 11:39:40-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,flyer_open,1968001,1344,2018-10-01 11:43:40-04:00
20717,2018-10-01 11:34:20-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,flyer_open,2005338,2085,2018-10-01 11:40:20-04:00
20718,2018-10-01 12:56:14-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,flyer_open,2016316,2268,2018-10-01 13:02:14-04:00
20719,2018-10-01 10:36:49-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,flyer_open,2026010,2284,2018-10-01 10:42:49-04:00
20720,2018-10-01 12:08:53-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,flyer_open,1996615,2366,2018-10-01 12:14:53-04:00
20721,2018-10-01 11:40:05-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,shopping_list_open,,,2018-10-01 11:41:05-04:00
20722,2018-10-01 13:26:34-04:00,ffdf08cd49a42e94228752c5aceb91d1a12e47bc02a7a1cc9c659365a9bafd13,shopping_list_open,,,20

In [10]:
# read modified-retailer-dataset.csv using spark and create a dataframe;
# the explicit schema `sch` is applied instead of inferring column types,
# and the header line of the file is skipped
retailDf = (
    spark.read
    .schema(sch)
    .options(sep=',', header='True')
    .csv('/user/itv000367/kanth/lixar/modified-retailer-dataset.csv')
)

In [11]:
retailDf.show(n=10)  # first 10 rows, long values truncated

+-----+-------------------+--------------------+----------+--------+-----------+-------------------+
|index|  timestamp_started|             user_id|     event|flyer_id|merchant_id|    timestamp_ended|
+-----+-------------------+--------------------+----------+--------+-----------+-------------------+
|    0|2018-10-01 11:36:30|0017345b89958a1d8...|flyer_open| 1996661|       2053|2018-10-01 11:42:30|
|    1|2018-10-01 11:36:30|0017345b89958a1d8...|flyer_open| 2031166|       2148|2018-10-01 11:39:30|
|    2|2018-10-01 11:36:06|0017345b89958a1d8...|flyer_open| 2016311|       2268|2018-10-01 11:39:06|
|    3|2018-10-01 11:46:24|0017345b89958a1d8...|flyer_open| 2016316|       2268|2018-10-01 11:47:24|
|    4|2018-10-01 11:41:14|0017345b89958a1d8...|flyer_open| 2005529|       2280|2018-10-01 11:49:14|
|    5|2018-10-01 12:03:52|0017345b89958a1d8...|flyer_open| 2005529|       2280|2018-10-01 12:10:52|
|    6|2018-10-01 11:42:00|0017345b89958a1d8...|flyer_open| 2009325|       2282|2018-10-01 

In [12]:
# Remove the 'index' column. It looks like the source file's 0-based row number
# and is not used in any aggregation below.
retailDf = retailDf.select([c for c in retailDf.columns if c != 'index'])

In [13]:
# Confirm the remaining columns and their types after dropping 'index'
retailDf.printSchema()

root
 |-- timestamp_started: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- flyer_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- timestamp_ended: timestamp (nullable = true)



In [14]:
# Total number of data rows read from the CSV (the header line is not counted)
retailDf.count()

20723

In [15]:
# Keep only events tied to both a flyer and a merchant. Rows such as
# shopping_list_open events have both fields empty and are dropped here.
retailDf_modified = retailDf.dropna(how='any', subset=['flyer_id', 'merchant_id'])

In [16]:
# Rows remaining after removing events with no flyer_id or merchant_id
retailDf_modified.count()

16169

In [17]:
# Put the identifying keys (user, merchant, flyer) first, then the event and its time window
retailDf_modified = retailDf_modified.select([
    'user_id', 'merchant_id', 'flyer_id',
    'event',
    'timestamp_started', 'timestamp_ended',
])

In [19]:
retailDf_modified.show(n=5, truncate=False)  # full user_id hashes, no truncation

+----------------------------------------------------------------+-----------+--------+----------+-------------------+-------------------+
|user_id                                                         |merchant_id|flyer_id|event     |timestamp_started  |timestamp_ended    |
+----------------------------------------------------------------+-----------+--------+----------+-------------------+-------------------+
|0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91|2053       |1996661 |flyer_open|2018-10-01 11:36:30|2018-10-01 11:42:30|
|0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91|2148       |2031166 |flyer_open|2018-10-01 11:36:30|2018-10-01 11:39:30|
|0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91|2268       |2016311 |flyer_open|2018-10-01 11:36:06|2018-10-01 11:39:06|
|0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91|2268       |2016316 |flyer_open|2018-10-01 11:46:24|2018-10-01 11:47:24|
|0017345b89958a1d8cae79020d

In [20]:
from pyspark.sql.functions import to_timestamp,col, round
from pyspark.sql.types import LongType

In [21]:
# dataframe modified to include each event's duration in minutes:
# casting a timestamp to long gives epoch seconds; both ends are scaled to minutes,
# subtracted, and the difference is rounded to 2 decimals
retailDf_modified = retailDf_modified.withColumn(
    'event_time_min',
    round(
        col('timestamp_ended').cast(LongType()) / 60
        - col('timestamp_started').cast(LongType()) / 60,
        2,
    ),
)

In [22]:
retailDf_modified.show(n=30)  # spot-check the computed event_time_min values

+--------------------+-----------+--------+----------+-------------------+-------------------+--------------+
|             user_id|merchant_id|flyer_id|     event|  timestamp_started|    timestamp_ended|event_time_min|
+--------------------+-----------+--------+----------+-------------------+-------------------+--------------+
|0017345b89958a1d8...|       2053| 1996661|flyer_open|2018-10-01 11:36:30|2018-10-01 11:42:30|           6.0|
|0017345b89958a1d8...|       2148| 2031166|flyer_open|2018-10-01 11:36:30|2018-10-01 11:39:30|           3.0|
|0017345b89958a1d8...|       2268| 2016311|flyer_open|2018-10-01 11:36:06|2018-10-01 11:39:06|           3.0|
|0017345b89958a1d8...|       2268| 2016316|flyer_open|2018-10-01 11:46:24|2018-10-01 11:47:24|           1.0|
|0017345b89958a1d8...|       2280| 2005529|flyer_open|2018-10-01 11:41:14|2018-10-01 11:49:14|           8.0|
|0017345b89958a1d8...|       2280| 2005529|flyer_open|2018-10-01 12:03:52|2018-10-01 12:10:52|           7.0|
|0017345b8

In [23]:
from pyspark.sql.functions import avg

In [24]:
# finding average time spent by a user with each merchant: the mean of event_time_min
# over all of that user's events for the merchant's flyers, rounded to 2 decimals
retailDf_avgTimeSpent = (
    retailDf_modified
    .groupBy(['user_id', 'merchant_id'])
    .agg(round(avg(col('event_time_min')), 2).alias('avg_time_min'))
)

In [25]:
# ascending by merchant; rows for the same merchant are not ordered by any other key
retailDf_avgTimeSpent_sorted = retailDf_avgTimeSpent.orderBy(col('merchant_id').asc())

In [26]:
retailDf_avgTimeSpent_sorted.show(20, False)  # default 20 rows, no truncation

+----------------------------------------------------------------+-----------+------------+
|user_id                                                         |merchant_id|avg_time_min|
+----------------------------------------------------------------+-----------+------------+
|fef9bdac8e1a7449e860ec25aff07548698bbb431b77e3067289783e44c086fe|221        |6.0         |
|e41d6b6a9fa5b54184709061ae4881f652bf174a17820e7b604c3491f26c43ed|221        |4.0         |
|523e314c60326645ce2fe6c7fe615bd1abeaf2aea17b26d5965c9e6350481779|221        |4.0         |
|986954873eb6d2af7482444d1d50387fd8eab6ce17e908b5977140569a6b82e0|221        |2.0         |
|c013196f6668a03e18f7c18a72d8b6cb66117b913fbf42ce471a5ac99ef9c365|221        |9.0         |
|deb5d2ab528f042676588c6dca31919dd28e0949711414911790607a88796a4a|221        |8.0         |
|74362cf31062b95a3116604d55b6395ff05e8ec5a588329db53d1cd19d2825fd|221        |7.0         |
|0f7683b5e677736d9e82077f7e9f6ed6bd83873f47d51faa14e08126b6f60baf|221        |3.

In [27]:
# Number of distinct (user_id, merchant_id) pairs
retailDf_avgTimeSpent_sorted.count()

11819

In [60]:
# One shuffle partition, so each sorted result below is written as a single part file.
# This is session-wide: it also applies to every later groupBy/sort in this session.
spark.conf.set(key='spark.sql.shuffle.partitions', value=1)

In [63]:
# Persist the per-user/per-merchant averages to HDFS.
# - coalesce(1): always produce one part file, even if spark.sql.shuffle.partitions
#   was not set to 1 beforehand. coalesce does not re-shuffle, so the merchant
#   ordering from the sort is carried through.
# - mode('overwrite'): the default save mode ('errorifexists') fails on a re-run
#   because the results directory already exists.
# NOTE(review): no header row is written (same as before); add .option('header', True)
# if consumers of the CSV need column names.
retailDf_avgTimeSpent_sorted.coalesce(1) \
    .write.mode('overwrite') \
    .csv('/user/itv000367/kanth/lixar/results/')

In [68]:
%%sh
# Concatenate every file in the HDFS results directory into one local CSV.
# Unlike copying part-00000-<uuid>-c000.csv by name, this does not depend on the
# random UUID in the part-file name, which changes on every write. It also handles
# more than one part file. getmerge replaces an existing local file, whereas
# -copyToLocal without -f refuses to overwrite on a re-run.
hadoop fs -getmerge /user/itv000367/kanth/lixar/results /home/itv000367/kanth/avgTime-perUser-perMerchant.csv

In [28]:
# finding average time spent by a user on each individual flyer (flyer_id) of each
# merchant: mean of event_time_min per (user_id, merchant_id, flyer_id), rounded to 2 decimals
retailDf_avgTimeSpent_perFlyer = (
    retailDf_modified
    .groupBy(['user_id', 'merchant_id', 'flyer_id'])
    .agg(round(avg(col('event_time_min')), 2).alias('avg_time_min'))
)

In [29]:
# order by merchant, then by flyer within each merchant (both ascending)
retailDf_avgTimeSpent_perFlyer_sorted = retailDf_avgTimeSpent_perFlyer.orderBy(
    col('merchant_id').asc(),
    col('flyer_id').asc(),
)

In [30]:
retailDf_avgTimeSpent_perFlyer_sorted.show(20, False)  # default 20 rows, no truncation

+----------------------------------------------------------------+-----------+--------+------------+
|user_id                                                         |merchant_id|flyer_id|avg_time_min|
+----------------------------------------------------------------+-----------+--------+------------+
|b38f97880ccb69e8f89201eedbccc149a9ea914ca86111340a9b1d9de1788a01|221        |1994952 |6.0         |
|797161021befeecfc6efa624f302a6a314a93515eae1d99168d5f87758535947|221        |1994952 |1.0         |
|13b968027c223421411987b5e9343c06bd21986e4d519d2d41ceee33d7f0cb01|221        |1994953 |9.0         |
|97e27e751eb093d1650f1c88be27c2e1642fed6dfe66016b82740d658fb95a18|221        |1994953 |4.0         |
|840f1b021181b43ca0641303390d323cdd0418a1afb5a16e31cca94c391773f5|221        |1994953 |5.0         |
|550cf383001101d860e9bd93eae9ed990a6ca621411dba47391d8a9eae80493e|221        |1994953 |7.0         |
|d452a27cf5d924ee3409a6ad3a294f5eb74fdc1d49fffd61ca3f82d89c19ef9c|221        |1994953 |7.0 

In [31]:
# Number of distinct (user_id, merchant_id, flyer_id) combinations
retailDf_avgTimeSpent_perFlyer_sorted.count()

13684

In [73]:
# Persist the per-user/per-merchant/per-flyer averages to HDFS.
# - coalesce(1): always produce one part file, independent of the
#   spark.sql.shuffle.partitions setting; coalesce does not re-shuffle,
#   so the merchant/flyer ordering from the sort is carried through.
# - mode('overwrite'): the default save mode ('errorifexists') fails on a re-run.
# NOTE(review): no header row is written (same as before).
retailDf_avgTimeSpent_perFlyer_sorted.coalesce(1) \
    .write.mode('overwrite') \
    .csv('/user/itv000367/kanth/lixar/results1/')

In [74]:
%%sh
# Inspect what the write produced (expect _SUCCESS plus the part file)
hadoop fs -ls \
    /user/itv000367/kanth/lixar/results1

Found 2 items
-rw-r--r--   3 itv000367 supergroup          0 2021-10-01 09:26 /user/itv000367/kanth/lixar/results1/_SUCCESS
-rw-r--r--   3 itv000367 supergroup    1120230 2021-10-01 09:26 /user/itv000367/kanth/lixar/results1/part-00000-b079bbd0-3ed6-43c7-8095-ad0f2e950065-c000.csv


In [75]:
%%sh
# Concatenate every file in the HDFS results1 directory into one local CSV.
# This avoids hardcoding the part file's random UUID name, which changes on every
# write, and it works on re-runs: getmerge replaces an existing local file, whereas
# -copyToLocal without -f refuses to overwrite.
hadoop fs -getmerge /user/itv000367/kanth/lixar/results1 /home/itv000367/kanth/avgTime-perUser-perMerchant-perFlyer.csv

In [32]:
# finding average time spent by a user on flyers: mean of event_time_min across
# all of the user's flyer events (any merchant, any flyer), rounded to 2 decimals
retailDf_avgTimeSpent_byUser = (
    retailDf_modified
    .groupBy(['user_id'])
    .agg(round(avg(col('event_time_min')), 2).alias('avg_time_min'))
)

In [33]:
# users with the highest average time per flyer event first
retailDf_avgTimeSpent_byUser_sorted = retailDf_avgTimeSpent_byUser.orderBy(
    col('avg_time_min').desc()
)

In [36]:
retailDf_avgTimeSpent_byUser_sorted.show(20, False)  # default 20 rows, no truncation

+----------------------------------------------------------------+------------+
|user_id                                                         |avg_time_min|
+----------------------------------------------------------------+------------+
|f010739ef41caf432ded2652ea109624065c8a80ca704c3d16fa6a0a48453748|9.0         |
|1177d6bc22b5c5c5a14d5a5f9419915ba1709a65f809309a57a7e5318d6629b8|9.0         |
|2555069cad0e48eafb80c59cc51c652cf10eba9feb9d46e1b234ebfc126249be|9.0         |
|047932250bc4de35f4be96587466e5cd3dfb25e543ba0fb3dd7517593329131e|9.0         |
|80744674a332ddd29e199cced12979857bbd1af49757821b8396aee1dacdd7d2|9.0         |
|c1fac3bddae0ee665d4f17900d7025fe09b637d82123ed3140aaadb3d4408515|9.0         |
|ac71141432e7087df2149275aa4786a5641b1c93fcf5ebd63f70ed136eb7bc1b|9.0         |
|6fd7d6ec0c94b7f9486bcb5863368af360fba38af65d2c4483cc0012f7f01101|9.0         |
|ecbb409adf79b755fe2add6dca11a1af00352de4d6e6fcb96db5cb1588bd277f|9.0         |
|cdd408976b110beea7f721d925ef35038966c71

In [37]:
# Number of distinct users with at least one event that has a flyer_id and merchant_id
retailDf_avgTimeSpent_byUser_sorted.count()

1161

In [82]:
# Persist the per-user averages to HDFS.
# - coalesce(1): always produce one part file, independent of the
#   spark.sql.shuffle.partitions setting; coalesce does not re-shuffle,
#   so the descending avg_time_min ordering from the sort is carried through.
# - mode('overwrite'): the default save mode ('errorifexists') fails on a re-run.
# NOTE(review): no header row is written (same as before).
retailDf_avgTimeSpent_byUser_sorted.coalesce(1) \
    .write.mode('overwrite') \
    .csv('/user/itv000367/kanth/lixar/results2/')

In [83]:
%%sh
# Inspect what the write produced (expect _SUCCESS plus the part file)
hadoop fs -ls \
    /user/itv000367/kanth/lixar/results2

Found 2 items
-rw-r--r--   3 itv000367 supergroup          0 2021-10-01 09:34 /user/itv000367/kanth/lixar/results2/_SUCCESS
-rw-r--r--   3 itv000367 supergroup      80738 2021-10-01 09:34 /user/itv000367/kanth/lixar/results2/part-00000-592a6c49-63c2-4f00-b71c-c66b9d6977a9-c000.csv


In [85]:
%%sh
# Concatenate every file in the HDFS results2 directory into one local CSV.
# This avoids hardcoding the part file's random UUID name, which changes on every
# write, and it works on re-runs: getmerge replaces an existing local file, whereas
# -copyToLocal without -f refuses to overwrite.
# NOTE(review): the local name says "perFlyer", but this data is averaged per user only
# (groupBy('user_id')); kept unchanged for existing consumers, consider renaming.
hadoop fs -getmerge /user/itv000367/kanth/lixar/results2 /home/itv000367/kanth/avgTime-perUser-perFlyer.csv