In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession 

# Build (or reuse) the session first and take the context from it.
# A bare SparkContext() raises "Cannot run multiple SparkContexts at once"
# when this cell is re-run in a live kernel, and creating the context before
# the builder means appName("spo") is not applied to the already-running context.
spark = SparkSession.builder.appName("spo").getOrCreate()
sc = spark.sparkContext

In [2]:
# Load one day of the training log from S3. The first line of the file is the
# header; no schema is supplied, so every column is read as a string.
spotify = spark.read.csv(
    "s3://502-mountain-dew-spotify/training_set/log_0_20180715_000000000000.csv",
    header=True,
)

In [3]:
# Show the column names and types of the raw frame. The reader was given no
# schema and no inferSchema option, so every column is reported as string.
spotify.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- session_position: string (nullable = true)
 |-- session_length: string (nullable = true)
 |-- track_id_clean: string (nullable = true)
 |-- skip_1: string (nullable = true)
 |-- skip_2: string (nullable = true)
 |-- skip_3: string (nullable = true)
 |-- not_skipped: string (nullable = true)
 |-- context_switch: string (nullable = true)
 |-- no_pause_before_play: string (nullable = true)
 |-- short_pause_before_play: string (nullable = true)
 |-- long_pause_before_play: string (nullable = true)
 |-- hist_user_behavior_n_seekfwd: string (nullable = true)
 |-- hist_user_behavior_n_seekback: string (nullable = true)
 |-- hist_user_behavior_is_shuffle: string (nullable = true)
 |-- hour_of_day: string (nullable = true)
 |-- date: string (nullable = true)
 |-- premium: string (nullable = true)
 |-- context_type: string (nullable = true)
 |-- hist_user_behavior_reason_start: string (nullable = true)
 |-- hist_user_behavior_reason_end: string (nullable = true)

In [32]:
# Columns removed before building the ratings; the remaining columns are the
# ids, the skip flags, the context/pause flags, the seek-back count and premium.
columns_to_drop = [
    "hist_user_behavior_reason_start",
    "hist_user_behavior_reason_end",
    "context_type",
    "date",
    "hour_of_day",
    "hist_user_behavior_is_shuffle",
    "hist_user_behavior_n_seekfwd",
    "session_length",
    "session_position",
]
df_drop = spotify.drop(*columns_to_drop)

# Peek at a single row to confirm which columns are left.
df_drop.show(n=1)

+--------------------+--------------------+------+------+------+-----------+--------------+--------------------+-----------------------+----------------------+-----------------------------+-------+
|          session_id|      track_id_clean|skip_1|skip_2|skip_3|not_skipped|context_switch|no_pause_before_play|short_pause_before_play|long_pause_before_play|hist_user_behavior_n_seekback|premium|
+--------------------+--------------------+------+------+------+-----------+--------------+--------------------+-----------------------+----------------------+-----------------------------+-------+
|0_00006f66-33e5-4...|t_0479f24c-27d2-4...| false| false| false|       true|             0|                   0|                      0|                     0|                            0|   true|
+--------------------+--------------------+------+------+------+-----------+--------------+--------------------+-----------------------+----------------------+-----------------------------+-------+
only showing top 1 row

In [38]:
from pyspark.sql.types import *

# The CSV reader produced strings for every column; convert the flag columns
# to booleans and the seek-back count to an integer, one target type at a time.
column_casts = [
    (
        BooleanType(),
        [
            "skip_1",
            "skip_2",
            "skip_3",
            "not_skipped",
            "context_switch",
            "no_pause_before_play",
            "short_pause_before_play",
            "long_pause_before_play",
            "premium",
        ],
    ),
    (
        IntegerType(),
        ["hist_user_behavior_n_seekback"],
    ),
]
for target_type, column_names in column_casts:
    for column_name in column_names:
        # withColumn with an existing name replaces that column in place.
        df_drop = df_drop.withColumn(column_name, df_drop[column_name].cast(target_type))

# Confirm the new column types.
df_drop.printSchema()


root
 |-- session_id: string (nullable = true)
 |-- track_id_clean: string (nullable = true)
 |-- skip_1: boolean (nullable = true)
 |-- skip_2: boolean (nullable = true)
 |-- skip_3: boolean (nullable = true)
 |-- not_skipped: boolean (nullable = true)
 |-- context_switch: boolean (nullable = true)
 |-- no_pause_before_play: boolean (nullable = true)
 |-- short_pause_before_play: boolean (nullable = true)
 |-- long_pause_before_play: boolean (nullable = true)
 |-- hist_user_behavior_n_seekback: integer (nullable = true)
 |-- premium: boolean (nullable = true)



In [62]:
# Bind a second name to the typed frame (no copy is made). Later cells rebind
# `df` as rating columns are added, while `df_drop` keeps pointing at the
# frame as it was after the casts.
df = df_drop

In [63]:
from pyspark.sql.functions import udf

@udf("integer")
def skip_rating_udf(premium, skip_1, skip_2, skip_3, not_skip, nopause):
    """Score one play from its skip flags; the highest applicable score wins.

    Each flag that is set contributes a candidate score, a flag that is not
    set contributes 0, and the maximum candidate is returned.

    Premium rows:
        skip_1   -> 1 - (1 - nopause)
        skip_2   -> 3
        skip_3   -> 5
        not_skip -> 6 + (2 - 2 * nopause)
    Non-premium rows (including premium being None):
        skip_1   -> 0
        skip_2   -> 1
        skip_3   -> 2
        not_skip -> 5 + (1 - 1 * nopause)

    `nopause` is only read when the flag whose score depends on it is set.
    """
    if premium:
        candidates = (
            1 - (1 - nopause) if skip_1 else 0,
            3 if skip_2 else 0,
            5 if skip_3 else 0,
            6 + (2 - 2 * nopause) if not_skip else 0,
        )
    else:
        candidates = (
            0,  # skip_1 scores nothing for non-premium rows
            1 if skip_2 else 0,
            2 if skip_3 else 0,
            5 + (1 - 1 * nopause) if not_skip else 0,
        )
    return max(candidates)

In [70]:
@udf("double")
def final_rating_udf(Skip_Rating, context_switch, seekback):
    """Combine the skip rating, context switch and seek-back count into one score.

    Computes ``(1 - inside ** exponent) * 10`` where
    ``inside = 1 - (Skip_Rating + 2 * context) / 10`` (context is 1 when
    ``context_switch`` is truthy, else 0) and ``exponent`` is the cube root of
    ``seekback``.

    Changes from the previous version:
      * The declared return type is now "double". The formula yields a Python
        float, and Spark turns a float returned from an "integer" UDF into
        NULL, so the column was NULL for every row.
      * The exponent is written ``seekback ** (1.0 / 3)``. The previous
        ``seekback**1/3`` parsed as ``(seekback ** 1) / 3``.
      * Division uses float literals so the result does not depend on
        Python 2 vs Python 3 integer-division rules.
      * A None ``Skip_Rating`` or ``seekback`` yields None (NULL) instead of
        raising TypeError inside the executor.

    NOTE(review): when ``seekback`` is 0 the exponent is 0, ``inside ** 0`` is
    1, and the result is 0.0 regardless of the other inputs. Confirm that this
    is the intended rating for plays with no seek-back.

    Returns:
        float, or None when a required input is None.
    """
    if Skip_Rating is None or seekback is None:
        return None
    context = 1 if context_switch else 0
    inside = 1 - (Skip_Rating + 2 * context) / 10.0
    exponent = seekback ** (1.0 / 3)
    # float() guarantees the Python type matches the declared "double".
    return float((1 - inside ** exponent) * 10)

In [65]:
from pyspark.sql import functions as ft
# Not_Pause is 0 when the play was preceded by a short or a long pause and 1
# in every other case (the otherwise branch also covers a NULL condition).
df = df.withColumn(
    "Not_Pause",
    ft.when(
        ft.col("short_pause_before_play") | ft.col("long_pause_before_play"),
        0,
    ).otherwise(1),
)

In [66]:
# Apply the skip-rating UDF row by row; the column order below must match the
# UDF's parameter order (premium, skip_1, skip_2, skip_3, not_skip, nopause).
df = df.withColumn(
    "Skip_Rating",
    skip_rating_udf(
        *map(
            ft.col,
            ("premium", "skip_1", "skip_2", "skip_3", "not_skipped", "Not_Pause"),
        )
    ),
)

In [71]:
# Apply the final-rating UDF; the column order below must match the UDF's
# parameter order (Skip_Rating, context_switch, seekback).
df = df.withColumn(
    "Final_rating",
    final_rating_udf(
        *map(
            ft.col,
            ("Skip_Rating", "context_switch", "hist_user_behavior_n_seekback"),
        )
    ),
)

In [78]:
# Keep only the session id and skip rating, restricted to rows whose rating
# is strictly positive (rows with a NULL rating are dropped by the comparison).
df1 = (
    df.select("session_id", "Skip_Rating")
    .filter(ft.col("Skip_Rating") > 0)
)

In [79]:
# Display up to 50 rows with a positive skip rating.
# NOTE(review): the recorded output for this cell has no rows, yet the sample
# row shown earlier (premium, not skipped) would score at least 6 with the
# skip_rating_udf defined in this notebook. The cell execution counts are out
# of order, so this output may come from stale kernel state; re-run the
# notebook from a fresh kernel to confirm.
df1.show(50)

+----------+-----------+
|session_id|Skip_Rating|
+----------+-----------+
+----------+-----------+

