# Users analysis

In [None]:
import datetime

import boto3
import matplotlib.pyplot as plt
import pandas as pd
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, count, date_format, dayofmonth, dayofweek, dayofyear, from_unixtime,
    isnull, month, unix_timestamp, weekofyear, when, year,
)
from pyspark.sql.types import DateType

# `spark` is not defined in this notebook; presumably it is the SparkSession
# injected by the Databricks runtime — confirm when running elsewhere.
sc = spark.sparkContext

In [None]:
import os

# Credentials are read from the environment so real keys never have to be
# written into the notebook; "****" is the placeholder the cell used before.
ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "****")  # student account access key
SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "****")  # student account secret key

# Give Spark's s3a connector the same credentials.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

S3_RESOURCE = 's3'
# NOTE(review): paths use the 's3' scheme while credentials are set on 's3a';
# this relies on the runtime routing s3:// through S3A — confirm.
SCHEME = 's3'

BUCKET_NAME = 'full-stack-bigdata-datasets'
PREFIX = "Big_Data"
# NOTE(review): no matching BEGIN STRIP marker is visible in this cell.
### END STRIP ###
INPUT_FILENAME = 'youtube_playlog.csv'

# Reuse the constants above instead of repeating the credentials literally.
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
)

# We create an S3 resource and a Bucket from this same resource
s3 = session.resource(S3_RESOURCE)
bucket = s3.Bucket(BUCKET_NAME)

In [None]:
def get_s3_path(key, bucket_name, prefix, scheme):
  """Build the URI ``{scheme}://{bucket_name}/{prefix}/{key}``.

  An empty or ``None`` prefix is skipped. Leading/trailing slashes on the
  prefix and leading slashes on the key are trimmed, so the result never
  contains an empty ``//`` path segment.
  """
  path_parts = ((prefix or "").strip("/"), key.lstrip("/"))
  path = "/".join(part for part in path_parts if part)
  return f"{scheme}://{bucket_name}/{path}"

# Full URI of the play-log CSV; the bare name on the last line echoes it as cell output.
file_path = get_s3_path(key=INPUT_FILENAME, bucket_name=BUCKET_NAME,
                        prefix=PREFIX, scheme=SCHEME)
file_path

In [None]:
# Read the CSV with a header row and let Spark infer column types.
playlog = (
  spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(file_path)
)

playlog.printSchema()
print("Shape: ", (playlog.count(), len(playlog.columns)))

In [None]:
# TODO: derive a `datetime` column and calendar features from the raw Unix `timestamp`
# NOTE: perform the usual checks
### BEGIN STRIP ###
# `from_unixtime` returns a 'yyyy-MM-dd HH:mm:ss' string; cast it so `datetime`
# is a real timestamp column (typed min/max, arithmetic, pandas datetime dtype).
playlog = (playlog
           .withColumn('datetime', from_unixtime('timestamp').cast('timestamp'))
           .drop('timestamp')
           .withColumn('year', year('datetime'))
           .withColumn('month', month('datetime'))
           .withColumn('dayofmonth', dayofmonth('datetime'))
           .withColumn('dayofweek', dayofweek('datetime'))
           .withColumn('dayofyear', dayofyear('datetime'))
           .withColumn('weekofyear', weekofyear('datetime')))

playlog.printSchema()
# Sort only for this preview: keeping a global sort inside `playlog` would make
# every later action on it (all the aggregations below) redo a full shuffle-sort.
playlog.orderBy(F.desc('datetime')).limit(5).toPandas()
### END STRIP ###

Unnamed: 0,user,song,datetime,year,month,dayofmonth,dayofweek,dayofyear,weekofweek
0,44248,G3omRltvVrQ,2019-04-03 19:51:53,2019,4,3,4,93,14
1,25650,fdixQDPA2h0,2019-04-03 19:51:35,2019,4,3,4,93,14
2,32188,Hs3Wki_uYzQ,2019-04-03 19:50:55,2019,4,3,4,93,14
3,1702,Hs3Wki_uYzQ,2019-04-03 19:50:39,2019,4,3,4,93,14
4,1702,InyT9Gyoz_o,2019-04-03 19:50:35,2019,4,3,4,93,14


### Aggregates

#### `firstPlay`, `lastPlay`, `playCount`, `uniquePlayCount`
For each user, we will compute these metrics:
- `firstPlay`: datetime of the first listening
- `lastPlay`: datetime of the last listening
- `playCount`: total play counts
- `uniquePlayCount`: number of distinct songs played

We'll save all these in a new DataFrame: `users`.  
When you're done, print out the first 5 rows of `users` ordered by descending `playCount`.

In [None]:
# Quick preview of the per-user aggregates. `agg_dict` is a list of
# aggregate expressions, one per output column.
agg_dict = [
  F.min('datetime').alias('firstPlay'),
  F.max('datetime').alias('lastPlay'),
  # F.count() needs a column argument; '*' counts every play (row).
  F.count('*').alias('playCount'),
  F.countDistinct('song').alias('uniquePlayCount'),
]

playlog.groupBy('user').agg(*agg_dict).show()

In [None]:
# TODO: compute, for each user
#       - firstPlay
#       - lastPlay
#       - playCount
#       - uniquePlayCount
# Save the results in a DataFrame with name `users`
### BEGIN STRIP ###
# One aggregate expression per output column.
agg_dict = [
  F.min('datetime').alias('firstPlay'),
  F.max('datetime').alias('lastPlay'),
  F.count('*').alias('playCount'),                    # every play (row) of the user
  F.countDistinct('song').alias('uniquePlayCount'),   # distinct songs played
]

# Keep the DataFrame itself: `.show()` returns None, so it must not be chained
# into the assignment. Later cells read `users`.
users = playlog.groupBy('user').agg(*agg_dict)
df_users = users  # previous name of this DataFrame, kept as an alias

users.orderBy(F.desc('playCount')).limit(5).toPandas()
### END STRIP ###

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount
0,213,2014-02-14 15:34:17,2019-04-02 06:04:08,278749,161406
1,7290,2014-04-30 20:12:41,2019-04-03 06:50:05,151513,83831
2,435,2014-02-14 19:51:09,2019-04-03 19:36:28,144711,20055
3,21950,2014-10-23 09:09:36,2019-02-06 00:54:54,126285,15075
4,6270,2014-04-13 18:45:54,2018-08-11 20:46:08,125056,9247


In [None]:
# TODO: Sanity check that every firstPlay is before lastPlay
### BEGIN STRIP ###
# Number of users whose first play comes after their last one; expected to be 0.
users.where('firstPlay > lastPlay').count()

### END STRIP ###

In [None]:
# Another sanity check: we grouped on `user`, so each user should appear once.
# TODO: make sure all users are unique in the DataFrame
### BEGIN STRIP ###
total_users = users.count()
distinct_users = users.select('user').distinct().count()
print(f"Total users: {total_users}")
print(f"Distinct users: {distinct_users}")
### END STRIP ###

### `timespan`
We will compute `timespan`: the overall span of a user's activity in days, rounded down. For example:
- if a user was active on the service over 23 hours, we say they were active 0 days
- for 53 hours, that would be 2 days of activity

We **will not** transform the `users` DataFrame in place, but instead save the result as a new DataFrame: `users_with_timespan`.

In [None]:
# TODO: Compute timespan and save the result in a new DataFrame: `users_with_timespan`
### BEGIN STRIP ###
from pyspark.sql.types import IntegerType

SECONDS_PER_DAY = 24 * 60 * 60


def compute_timespan(df):
  """Return `df` with an integer `timespan` column: whole days from
  `firstPlay` to `lastPlay`.

  The difference in seconds is divided by SECONDS_PER_DAY and cast to
  IntegerType, which truncates toward zero — i.e. rounds down for the
  non-negative spans produced by the min/max aggregation of `users`.
  """
  elapsed_seconds = F.unix_timestamp('lastPlay') - F.unix_timestamp('firstPlay')
  return df.withColumn('timespan', (elapsed_seconds / SECONDS_PER_DAY).cast(IntegerType()))


users_with_timespan = users.transform(compute_timespan)
users_with_timespan.limit(5).toPandas()
### END STRIP ###

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan
0,148,2014-02-14 15:53:53,2019-02-09 15:40:00,50280,14826,1820
1,463,2014-02-14 17:49:46,2018-11-18 19:18:31,39796,4601,1738
2,471,2014-02-14 21:11:44,2014-04-04 20:45:32,235,133,48
3,496,2014-02-14 22:03:35,2015-12-26 09:59:10,551,518,679
4,833,2014-02-15 16:01:16,2019-02-02 03:26:18,2606,1502,1812


Let's see what this looks like: we will use Databricks' `display` to plot a histogram of `timespan`.

In [None]:
# TODO: Plot a histogram of `timespan`
### BEGIN STRIP ###
display(users_with_timespan[['timespan']])
### END STRIP ###

timespan
1820
1738
48
679
1812
23
493
1418
187
1740


This looks like a power law, so let's try a log transform.

In [None]:
# TODO: Use describe on the `timespan` column
### BEGIN STRIP ###
users_with_timespan.describe('timespan').toPandas().set_index('summary')
### END STRIP ###

Unnamed: 0_level_0,timespan
summary,Unnamed: 1_level_1
count,45904.0
mean,126.72220285813871
stddev,307.83969796747124
min,0.0
max,1874.0


In [None]:
# TODO: Plot a histogram of log transformed `timespan`
### BEGIN STRIP ###
# log1p maps zero-day users to 0 rather than leaving the log undefined.
display(users_with_timespan.select(F.log1p(F.col('timespan'))))
### END STRIP ###

LOG1P(timespan)
7.507141079727608
7.461065514354283
3.8918202981106265
6.522092798170152
7.502738210754851
3.1780538303479458
6.202535517187923
7.257707677160043
5.236441962829949
7.462214939768189


In [None]:
# TODO: Plot a QQ-Plot of log transformed `timespan`
### BEGIN STRIP ###
from statistics import NormalDist  # requires Python 3.8+

# Collect the single numeric column to the driver (one value per user), sorted.
log_timespan = (users_with_timespan
                .select(F.log1p('timespan').alias('log1p_timespan'))
                .toPandas()['log1p_timespan']
                .dropna()
                .sort_values()
                .reset_index(drop=True))

# Standardize the sample so it can be compared against a standard normal.
standardized = (log_timespan - log_timespan.mean()) / log_timespan.std()

# Theoretical standard-normal quantiles at plotting positions (i + 0.5) / n.
n_points = len(standardized)
std_normal = NormalDist()
theoretical = [std_normal.inv_cdf((i + 0.5) / n_points) for i in range(n_points)]

fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(theoretical, standardized, s=2)
ax.plot(theoretical, theoretical, color='red', linewidth=1)  # y = x reference line
ax.set_xlabel('Theoretical quantiles (standard normal)')
ax.set_ylabel('Standardized log1p(timespan)')
ax.set_title('QQ-plot of log1p(timespan)')
display(fig)
### END STRIP ###

LOG1P(timespan)
7.507141079727608
7.461065514354283
3.8918202981106265
6.522092798170152
7.502738210754851
3.1780538303479458
6.202535517187923
7.257707677160043
5.236441962829949
7.462214939768189


We'll filter out users who stayed for less than a day and plot a histogram of the filtered data.

In [None]:
# TODO: Plot a histogram of log transformed `timespan` of users who stayed at least one day
### BEGIN STRIP ###
# Zero-day users are dropped first because log(0) is undefined.
display(
  users_with_timespan
  .where('timespan <> 0')
  .select(F.log(F.col('timespan')))
)

### END STRIP ###

LOG(timespan)
7.506591780070841
7.460490305825338
3.871201010907891
6.520621127558696
7.502186486602924
3.5263605246161616
6.678342114654332
7.259819610363186
5.717027701406222
7.526717561352706


### `isSingleDayUser`
What percentage of users used the service for less than one day?

In [None]:
# TODO: Compute the percentage of users who used the service for less than a day
### BEGIN STRIP ###
# The mean of a 0/1 flag is the fraction of flagged users: a single aggregation
# job instead of an RDD round-trip plus a second count over `users`.
(users_with_timespan
 .select(F.avg((F.col('timespan') < 1).cast('int')) * 100)
 .first()[0])
### END STRIP ###

Wow, that's a lot! We will flag this as its own column.  
That means we will create a new Boolean column `isSingleDayUser` that is `True` if the user used the service for less than a day and `False` otherwise.

In [None]:
# TODO: Create a new column (isSingleDayUser) to flag if a user used the service for less than a day
### BEGIN STRIP ###
# Boolean flag: True when the whole span of activity is under one day.
users_with_single_day = users_with_timespan.withColumn(
  'isSingleDayUser',
  F.expr('timespan < 1'),
)
users_with_single_day.limit(5).toPandas()
### END STRIP ###

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan,isSingleDayUser
0,148,2014-02-14 15:53:53,2019-02-09 15:40:00,50280,14826,1820,False
1,463,2014-02-14 17:49:46,2018-11-18 19:18:31,39796,4601,1738,False
2,471,2014-02-14 21:11:44,2014-04-04 20:45:32,235,133,48,False
3,496,2014-02-14 22:03:35,2015-12-26 09:59:10,551,518,679,False
4,833,2014-02-15 16:01:16,2019-02-02 03:26:18,2606,1502,1812,False


### Measure of activity: `activeDaysCount` and `dailyAvgPlayCount`
This one is a bit harder, we want to compute:
- the number of active days for each user (not the `timespan`)
- the average play count on these active days for each user

In [None]:
# TODO: create 2 new columns
#       - activeDaysCount: the count of days each user was active
#       - dailyAvgPlayCount: the daily average playcount per user (active days only)
### BEGIN STRIP ###
def computeDailyStats(df):
  """Return per-user activity statistics computed from a play log.

  Plays are first counted per (user, year, dayofyear), giving one row per day
  on which the user played something. Those rows are then summarised per user:
    - dailyAvgPlayCount: mean number of plays over the active days
    - activeDaysCount: number of active days
  """
  plays_per_day = df.groupBy('user', 'year', 'dayofyear').count()
  return plays_per_day.groupBy('user').agg(
    F.mean('count').alias('dailyAvgPlayCount'),
    F.count('count').alias('activeDaysCount'),
  )


users_with_avg = users_with_single_day.join(computeDailyStats(playlog), 'user')
users_with_avg.limit(5).toPandas()
### END STRIP ###

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan,isSingleDayUser,dailyAvgPlayCount,activeDaysCount
0,148,2014-02-14 15:53:53,2019-02-09 15:40:00,50280,14826,1820,False,54.592834,921
1,463,2014-02-14 17:49:46,2018-11-18 19:18:31,39796,4601,1738,False,48.949569,813
2,471,2014-02-14 21:11:44,2014-04-04 20:45:32,235,133,48,False,23.5,10
3,496,2014-02-14 22:03:35,2015-12-26 09:59:10,551,518,679,False,6.8875,80
4,833,2014-02-15 16:01:16,2019-02-02 03:26:18,2606,1502,1812,False,23.908257,109


In [None]:
# TODO: Plot a histogram of log of `activeDaysCount`
### BEGIN STRIP ###
# Each joined user has at least one active day, so the log is always defined.
display(users_with_avg.select(F.log(F.col('activeDaysCount'))))
### END STRIP ###

LOG(activeDaysCount)
6.825460036255307
6.70073110954781
2.302585092994046
4.382026634673881
4.691347882229144
1.0986122886681098
1.0986122886681098
4.204692619390966
0.0
0.0


In [None]:
# TODO: Plot a histogram of log of `dailyAvgPlayCount`
### BEGIN STRIP ###
# A mean of per-day counts (each at least 1) is at least 1, so its log is non-negative.
display(users_with_avg.select(F.log(F.col('dailyAvgPlayCount'))))
### END STRIP ###

LOG(dailyAvgPlayCount)
3.999902626448877
3.89079057416144
3.157000421150113
1.929708174479033
3.1742238754556467
3.791736839553644
2.8134107167600364
3.4135584784857294
0.6931471805599453
1.0986122886681098


## Going further
What else do you think would be interesting to compute?
What about the ratio of activity, e.g. the ratio between `timespan` and `activeDaysCount`?