In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 75kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=3c7164d5099ecfba59190856f7d6fef0b39aa5d40bc0d4e7aec9ec1d0bcce361
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
# Mount Google Drive so the datasets stored under MyDrive can be read.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [4]:
# Single Spark session for the notebook; the driver is allowed to collect up to
# 3 GB of results.
spark = (
    SparkSession.builder
    .appName('sparkify')
    .config('spark.driver.maxResultSize', '3g')
    .getOrCreate()
)

In [5]:
EVENTS_PATH = "/content/drive/MyDrive/datasets/dsnd-sparkify/event_labeled.parquet"

# Labeled event log, without the helper column userIdTemp.
df = spark.read.parquet(EVENTS_PATH).drop('userIdTemp')
df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- up_ts: timestamp (nullable = true)
 |-- down_ts: string (nullable = true)
 |-- isChurn: boolean (nullable = true)
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- ts: timestamp (nullable = true)



In [32]:
def change_colname_join_df(join_df, suffix='_temp'):
    '''
    Append `suffix` to every column name of a dataframe (typically the right
    side of a join) so its columns cannot clash with the left side's.

    INPUT:
    join_df - dataframe on the right side of join
    suffix - string appended to each column name

    OUTPUT:
    res_df - dataframe with renamed columns

    | col_1 | col_2 |  -->  | col_1_temp | col_2_temp |
    '''
    # Rename all columns positionally in a single projection. Renaming one
    # name at a time with withColumnRenamed adds a plan node per column and
    # goes wrong when a new name already exists (['a', 'a_temp'] would end
    # up as two 'a_temp_temp' columns).
    new_names = [col_name + suffix for col_name in join_df.columns]
    return join_df.toDF(*new_names)

def remove_cols_suffix(df1, suffix="_temp"):
    '''
    Strip a trailing `suffix` from every column name that ends with it
    (the inverse of change_colname_join_df).

    INPUT:
    df1 - dataframe with suffixed columns
    suffix - trailing string to remove from column names

    OUTPUT:
    result - dataframe whose column names no longer carry the suffix

    | col_1_temp | col_2_temp |  -->  | col_1 | col_2 |
    '''
    if not suffix:
        return df1

    # Only a *trailing* suffix is removed ("my_temp_col" stays as is), and a
    # column named exactly like the suffix is kept rather than renamed to "".
    new_names = [
        col[:-len(suffix)] if col.endswith(suffix) and col != suffix else col
        for col in df1.columns
    ]
    # Positional rename in one step, so one rename cannot feed into another
    # (e.g. 'x_temp_temp' -> 'x_temp' must not then be renamed to 'x').
    return df1.toDF(*new_names)


def chain_and(df1, df2, key_cols, suffix="_temp"):
    '''
    DESCRIPTION:
    Build an equi-join condition that ANDs one equality per key column:
    (df1.col1 == df2.col1_suffix) & (df1.col2 == df2.col2_suffix) & ...

    INPUT:
    df1 - left dataframe
    df2 - right dataframe (its key columns carry `suffix`)
    key_cols - column names for the join condition
    suffix - suffix of the matching column names in df2

    OUTPUT:
    res - combined join condition

    RAISES:
    ValueError - if key_cols is empty (there is no condition to build)
    '''
    import functools
    import operator

    key_cols = list(key_cols)
    if not key_cols:
        raise ValueError("chain_and needs at least one key column")

    return functools.reduce(
        operator.and_,
        (df1[col] == df2[col + suffix] for col in key_cols),
    )

def chain_left_join(df1, dfs, key_cols, suffix="_temp",
                    path="/content/drive/MyDrive/datasets/dsnd-sparkify/join_temp.parquet",
                    how="inner"):
    '''
    DESCRIPTION:
    Join df1 with every dataframe in dfs, one after another, on key_cols.
    Each intermediate result is written to parquet and read back, which cuts
    the query lineage between steps.

    NOTE: despite the name, the default join type is "inner" (the behaviour
    this function has always had), so rows whose keys are missing from any
    dataframe in dfs are dropped. Pass how="left" to keep every row of df1.

    INPUT:
    df1 - left dataframe
    dfs - list of right dataframes, joined in order
    key_cols - column names of the join condition (present in every frame)
    suffix - temporary suffix used to tell right-side columns apart
    path - parquet location of the final result; intermediate steps are
           written next to it (path + ".step0" / path + ".step1")
    how - join type passed to DataFrame.join ("inner" or "left")

    OUTPUT:
    result - dataframe after chain join (read back from `path`), or df1
             itself when dfs is empty
    '''
    dfs = list(dfs)
    if not dfs:
        return df1

    right_key_cols = [col + suffix for col in key_cols]
    last_step = len(dfs) - 1
    prev_path = None

    for i, df_temp in enumerate(dfs):
        # left side: the input frame first, afterwards the previous step's output
        res_df = df1 if prev_path is None else spark.read.parquet(prev_path)

        # suffix the right side's columns so names cannot collide
        df_temp = change_colname_join_df(df_temp, suffix)

        res_df = res_df.join(df_temp, chain_and(res_df, df_temp, key_cols, suffix), how=how) \
            .drop(*right_key_cols)
        res_df = remove_cols_suffix(res_df, suffix)

        # Spark refuses to overwrite a path the plan is still reading from, so
        # intermediate steps alternate between two scratch paths and only the
        # last step writes to `path`.
        out_path = path if i == last_step else "{}.step{}".format(path, i % 2)
        res_df.write.mode("overwrite").parquet(out_path)
        prev_path = out_path

    return spark.read.parquet(path)

In [29]:
# Sanity check of chain_left_join on two per-subscription feature tables.
# NOTE(review): `song_rate` and `n_playlist_df` are only defined in later cells
# (this cell ran as In[29], after In[12] and In[19]), so Restart & Run All
# fails here with NameError — TODO move this cell below the n_playlist_df cell.
test_df = chain_left_join(song_rate, [n_playlist_df], key_cols=["userId", "up_ts"])
test_df.show(5)

+-------+-------------------+----------+--------+-----------------+------+
| userId|              up_ts|song_count|datediff|        song_rate|n_song|
+-------+-------------------+----------+--------+-----------------+------+
|1003452|2018-11-01 01:43:20|       776|      29|26.75862068965517|    21|
|1005435|2018-10-15 21:04:20|       321|      46|6.978260869565218|     4|
|1024700|2018-10-02 18:09:50|       271|      59|4.593220338983051|    12|
|1049500|2018-11-01 20:46:01|      1637|      29|56.44827586206897|    47|
|1051660|2018-10-06 01:07:59|      1342|      55|             24.4|    36|
+-------+-------------------+----------+--------+-----------------+------+
only showing top 5 rows



In [10]:
# Number of songs played (NextSong events) in each subscription.
n_songs_play_df = (
    df.where(F.col("page") == "NextSong")
    .groupBy("userId", "up_ts")
    .agg(F.count(F.lit(1)).alias("n_songs"))
)
n_songs_play_df.show(5)

+-------+-------------------+-------+
| userId|              up_ts|n_songs|
+-------+-------------------+-------+
|1367536|2018-10-08 14:58:27|   1806|
|1322258|2018-10-08 23:28:14|    997|
|1554972|2018-11-19 21:16:01|    472|
|1655008|2018-10-25 06:50:45|    234|
|1968237|2018-10-27 11:19:26|    700|
+-------+-------------------+-------+
only showing top 5 rows



In [11]:
# Number of days in each subscription. Churned subscriptions are measured up to
# down_ts; all others up to the latest event timestamp in the whole log.
maxdate_df = df.agg(F.max("ts").alias("max_ts"))

datediff_df = (
    df.select("userId", "up_ts", "down_ts", "isChurn")
    .dropDuplicates()
    # maxdate_df has exactly one row: an explicit cross join attaches the
    # global maximum to every row (including rows with a null userId).
    .crossJoin(maxdate_df)
    .withColumn(
        "datediff",
        F.datediff(
            # down_ts is stored as a string; cast it so both branches are timestamps
            F.when(F.col("isChurn"), F.col("down_ts").cast(T.TimestampType()))
            .otherwise(F.col("max_ts")),
            F.col("up_ts"),
        ),
    )
    .drop("max_ts")
)

datediff_df.show(5)

+-------+-------------------+-------------------+-------+--------+
| userId|              up_ts|            down_ts|isChurn|datediff|
+-------+-------------------+-------------------+-------+--------+
|1111091|2018-11-27 17:43:31|2099-12-31 00:00:00|  false|       3|
|1161080|2018-10-26 19:14:14|2018-11-23 21:55:06|   true|      28|
|1291366|2018-10-01 04:22:41|2099-12-31 00:00:00|  false|      60|
|1335330|2018-11-26 15:00:33|2099-12-31 00:00:00|  false|       4|
|1721316|2018-10-11 17:19:44|2099-12-31 00:00:00|  false|      50|
+-------+-------------------+-------------------+-------+--------+
only showing top 5 rows



In [12]:
# Songs played per day of subscription. The day count comes from datediff_df
# and is exposed as `datediff_temp` (the column name this table has always
# had); zero-day subscriptions count as one day to avoid dividing by zero.
song_rate = (
    df.filter(F.col("page") == "NextSong")
    .groupBy("userId", "up_ts")
    .count()
    .withColumnRenamed("count", "song_count")
    # join on the aggregated frame's own key columns rather than on columns of `df`
    .join(
        datediff_df.select("userId", "up_ts", F.col("datediff").alias("datediff_temp")),
        on=["userId", "up_ts"],
    )
    .withColumn(
        "song_rate",
        F.col("song_count")
        / F.when(F.col("datediff_temp") == 0, 1).otherwise(F.col("datediff_temp")),
    )
)

song_rate.show(5)

+-------+-------------------+----------+-------------+------------------+
| userId|              up_ts|song_count|datediff_temp|         song_rate|
+-------+-------------------+----------+-------------+------------------+
|1071843|2018-11-08 13:16:59|      1190|           22| 54.09090909090909|
|1120784|2018-10-11 14:16:28|      1502|           50|             30.04|
|1128522|2018-11-13 17:46:22|       203|            2|             101.5|
|1130061|2018-10-11 20:04:50|        96|           50|              1.92|
|1135039|2018-10-04 11:31:06|       643|           57|11.280701754385966|
+-------+-------------------+----------+-------------+------------------+
only showing top 5 rows



In [19]:
# Number of songs each user added to a playlist during the subscription.
n_playlist_df = (
    df.select("userId", "up_ts", "page")
    .where(F.col("page") == "Add to Playlist")
    .groupBy("userId", "up_ts")
    .agg(F.count("page").alias("n_song"))
)

n_playlist_df.show(5)

+-------+-------------------+------+
| userId|              up_ts|n_song|
+-------+-------------------+------+
|1367536|2018-10-08 14:58:27|    39|
|1322258|2018-10-08 23:28:14|    30|
|1554972|2018-11-19 21:16:01|    21|
|1655008|2018-10-25 06:50:45|    11|
|1968237|2018-10-27 11:19:26|    15|
+-------+-------------------+------+
only showing top 5 rows



In [14]:
# Number of Thumbs Up / Thumbs Down events per subscription. Listing the pivot
# values skips a distinct-values job and guarantees both columns exist; a user
# who gave only one kind of thumb gets 0 (not null) for the other, in line
# with the fillna used for the other pivoted feature tables.
THUMBS_PAGES = ["Thumbs Down", "Thumbs Up"]

tup_tdown_df = (
    df.filter(F.col("page").isin(THUMBS_PAGES))
    .groupBy("userId", "up_ts")
    .pivot("page", THUMBS_PAGES)
    .count()
    .fillna(0)
)

tup_tdown_df.show(5)


+-------+-------------------+-----------+---------+
| userId|              up_ts|Thumbs Down|Thumbs Up|
+-------+-------------------+-----------+---------+
|1427546|2018-10-17 22:52:00|          4|       25|
|1712775|2018-11-23 17:56:01|          6|       35|
|1358238|2018-10-02 14:25:59|         23|       64|
|1071843|2018-11-08 13:16:59|          6|       51|
|1554972|2018-11-19 21:16:01|          5|       35|
+-------+-------------------+-----------+---------+
only showing top 5 rows



In [15]:
# Per subscription: mean session duration (seconds between the first and last
# event of a session) and the number of sessions.
session_df = (
    df.groupBy("userId", "up_ts", "sessionId")
    .agg(
        F.min("ts").cast(T.LongType()).alias("min"),
        F.max("ts").cast(T.LongType()).alias("max"),
    )
    .withColumn("diff", F.col("max") - F.col("min"))
    .groupBy("userId", "up_ts")
    .agg(
        F.avg("diff").alias("avg_sess_len"),
        F.count("sessionId").alias("sess_count"),
    )
)

session_df.show(5)

+-------+-------------------+------------------+----------+
| userId|              up_ts|      avg_sess_len|sess_count|
+-------+-------------------+------------------+----------+
|1914133|2018-10-09 11:11:55|           22340.0|        12|
|1071843|2018-11-08 13:16:59|27020.272727272728|        11|
|1582360|2018-10-04 15:48:38|19012.117647058825|        34|
|1576394|2018-10-03 07:51:46| 19205.60606060606|        33|
|1957517|2018-10-11 04:46:39|          28964.75|        12|
+-------+-------------------+------------------+----------+
only showing top 5 rows



In [16]:
# Platform(s) used in each subscription: one boolean column per platform.
# The first matching userAgent substring wins. Agents that match none of
# them (or are null) are ignored instead of producing a "null" column.
PLATFORMS = ["ipad", "iphone", "linux", "macos", "windows"]

platform_col = (
    F.when(F.col("userAgent").contains("Macintosh"), "macos")
    .when(F.col("userAgent").contains("Windows"), "windows")
    .when(F.col("userAgent").contains("iPad"), "ipad")
    .when(F.col("userAgent").contains("iPhone"), "iphone")
    .when(F.col("userAgent").contains("Linux"), "linux")
)

device_df = (
    df.select("userId", "up_ts", platform_col.alias("platform"))
    .groupBy("userId", "up_ts")
    .pivot("platform", PLATFORMS)
    # True wherever the (subscription, platform) pair occurs; absent pairs are
    # null after the pivot and become False below
    .agg(F.max(F.lit(True)))
    .fillna(False)
)

device_df.show(5)

+-------+-------------------+-----+------+-----+-----+-------+
| userId|              up_ts| ipad|iphone|linux|macos|windows|
+-------+-------------------+-----+------+-----+-----+-------+
|1130061|2018-10-11 20:04:50|false| false|false| true|  false|
|1914133|2018-10-09 11:11:55|false| false|false| true|  false|
|1128522|2018-11-13 17:46:22|false| false|false| true|  false|
|1712775|2018-11-23 17:56:01|false| false|false|false|   true|
|1568402|2018-10-18 14:26:47|false|  true|false|false|  false|
+-------+-------------------+-----+------+-----+-----+-------+
only showing top 5 rows



In [17]:
# State(s) seen in each subscription: one boolean column per state code taken
# from the part of `location` after ", " (e.g. "AR-MO" for multi-state areas).
state_df = (
    df.select("userId", "up_ts", F.split(F.col("location"), ", ")[1].alias("state"))
    .groupBy("userId", "up_ts")
    .pivot("state")
    # True wherever the (subscription, state) pair occurs; no reliance on
    # Spark's auto-generated aggregate column names
    .agg(F.max(F.lit(True)))
    .fillna(False)
)

state_df.show(5)

+-------+-------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----------+-----+-----+-----+-----+-----+-----+-----+-----+--------+--------+-----+-----+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+-----+--------+-----+-----+-----+-----+-----+-----+-----+-----------+-----+-----+-----+-----+-----+-----+-----+--------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+-----+-----+-----+-----+
| userId|              up_ts|   AK|   AL|   AR|AR-MO|AR-OK|   AZ|   CA|   CO|   CT|DC-VA-MD-WV|   DE|   FL|   GA|GA-AL|GA-SC|   HI|   IA|IA-IL|IA-IL-MO|IA-NE-SD|   ID|   IL|IL-IN-WI|IL-MO|   IN|IN-KY|IN-MI|   KS|   KY|KY-IL|KY-IN|   LA|   MA|MA-CT|MA-NH|   MD|MD-DE|MD-WV|   ME|   MI|MI-WI|   MN|MN-WI|   MO|MO-IL|MO-KS|   MS|MS-LA|   MT|   NC