In [172]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import os
import pandas as pd
import datetime as dt

In [2]:
spark = SparkSession.builder.getOrCreate()
#start a spark session

In [3]:
spark_fp = os.path.join("/", "Volumes", "Marceline Jr.", "Spotify Dataset", "sample_sets", "track_features_subset_0.csv")
spark_fp

'/Volumes/Marceline Jr./Spotify Dataset/sample_sets/track_features_subset_0.csv'

In [4]:
df = spark.read.load(spark_fp, 
                      format="csv", inferSchema="true", header="true")
#load our dataframe lazily so we can sample from it

In [5]:
df.count()

10357741

In [6]:
pandas_df = df.limit(1000000).toPandas()
#if your spark session crashes, reduce the limit of the items you want

In [7]:
pandas_df.head(5)

Unnamed: 0,session_id,session_position,session_length,track_id_clean,skip_1,skip_2,skip_3,not_skipped,context_switch,no_pause_before_play,...,long_pause_before_play,hist_user_behavior_n_seekfwd,hist_user_behavior_n_seekback,hist_user_behavior_is_shuffle,hour_of_day,date,premium,context_type,hist_user_behavior_reason_start,hist_user_behavior_reason_end
0,5_000074db-eb0b-4ecc-a9db-0e8e39f10198,9,10,t_1131add1-4106-4a84-a117-63b8145cb4a8,False,False,False,True,0,1,...,0,0,0,False,17,2018-07-20,True,radio,trackdone,trackdone
1,5_0003aa7a-65e1-42ae-bebd-3097d62ee4e9,17,20,t_0678fe94-c636-407a-983a-b78c672a85c2,False,False,True,False,1,1,...,0,0,0,True,6,2018-07-20,True,catalog,clickrow,endplay
2,5_0007744a-b9db-4bd6-97d5-351cf2959ec1,8,20,t_86f41324-42c4-4011-ba80-851a56971097,False,False,False,True,0,1,...,0,0,0,False,23,2018-07-19,True,catalog,trackdone,trackdone
3,5_0009e4c1-815f-4ea9-811c-2605334f7ef3,2,15,t_77b02acb-1b1f-4b36-b8fc-2c3e01892b9a,False,False,False,True,0,0,...,1,0,0,False,9,2018-07-20,True,editorial_playlist,trackdone,trackdone
4,5_00109067-e6eb-4c24-ab76-32914ccadef8,16,20,t_4bb4aa96-286c-4111-9d2c-5b70a58254f0,False,False,False,True,0,1,...,0,0,0,False,13,2018-07-20,False,catalog,trackdone,trackdone


In [17]:
tf_path_one = os.path.join("/", "Volumes", "Marceline Jr.", "Spotify Dataset", "track_features", "tf_000000000000.csv")

In [18]:
tf_path_two = os.path.join("/", "Volumes", "Marceline Jr.", "Spotify Dataset", "track_features", "tf_000000000001.csv")

In [19]:
track_features_one = pd.read_csv(tf_path_one)
track_features_two = pd.read_csv(tf_path_two)

In [22]:
track_features = pd.concat([track_features_one, track_features_two])

In [23]:
len(track_features), len(track_features_one) + len(track_features_two)

(3706388, 3706388)

In [24]:
track_features.head()

Unnamed: 0,track_id,duration,release_year,us_popularity_estimate,acousticness,beat_strength,bounciness,danceability,dyn_range_mean,energy,...,time_signature,valence,acoustic_vector_0,acoustic_vector_1,acoustic_vector_2,acoustic_vector_3,acoustic_vector_4,acoustic_vector_5,acoustic_vector_6,acoustic_vector_7
0,t_2e8f4b71-8a0b-4b9c-b7d8-fb5208e87f9f,326.013336,1971,99.582885,0.716209,0.366495,0.332605,0.439835,5.805774,0.238847,...,4,0.223395,0.146012,-0.706908,0.259496,0.481157,0.238427,-0.098389,-0.25496,-0.227383
1,t_dae2ec0e-ec7b-4b3e-b60c-4a884d0eccb0,147.813324,1963,97.272035,0.83946,0.362212,0.389829,0.50758,6.845427,0.420476,...,4,0.484702,0.039554,-0.539554,0.105141,0.692589,0.226047,-0.468162,0.164389,-0.769024
2,t_cf0164dd-1531-4399-bfa6-dec19cd1fedc,110.400002,1974,99.620384,0.054673,0.495002,0.589378,0.552311,9.361949,0.842938,...,4,0.818441,0.083863,-0.242108,-0.014258,0.096396,0.417641,-0.050576,-0.204757,-0.172563
3,t_0f90acc7-d5c5-4e53-901d-55610fbd090c,237.653336,1988,96.79683,0.042606,0.389634,0.359044,0.585673,6.068578,0.665398,...,4,0.594829,0.192498,0.340039,0.034846,-0.389794,0.518381,0.185008,-0.079907,-0.016978
4,t_36b9ad02-095a-443d-a697-6c7285d9410a,174.600006,1987,97.905891,0.249982,0.51364,0.485435,0.635095,7.198735,0.408715,...,4,0.591289,0.270586,-0.411061,0.165898,0.225652,0.335518,-0.036643,-0.0163,-0.44687


In [177]:
df = pd.merge(pandas_df, track_features, left_on = 'track_id_clean', right_on = 'track_id')

In [178]:
df.columns

Index(['session_id', 'session_position', 'session_length', 'track_id_clean',
       'skip_1', 'skip_2', 'skip_3', 'not_skipped', 'context_switch',
       'no_pause_before_play', 'short_pause_before_play',
       'long_pause_before_play', 'hist_user_behavior_n_seekfwd',
       'hist_user_behavior_n_seekback', 'hist_user_behavior_is_shuffle',
       'hour_of_day', 'date', 'premium', 'context_type',
       'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end',
       'track_id', 'duration', 'release_year', 'us_popularity_estimate',
       'acousticness', 'beat_strength', 'bounciness', 'danceability',
       'dyn_range_mean', 'energy', 'flatness', 'instrumentalness', 'key',
       'liveness', 'loudness', 'mechanism', 'mode', 'organism', 'speechiness',
       'tempo', 'time_signature', 'valence', 'acoustic_vector_0',
       'acoustic_vector_1', 'acoustic_vector_2', 'acoustic_vector_3',
       'acoustic_vector_4', 'acoustic_vector_5', 'acoustic_vector_6',
       'acoustic_vector

In [179]:
df.drop(['track_id_clean', 'skip_1', 'skip_2', 'skip_3','hist_user_behavior_reason_end', 'session_id', 'track_id','date'], axis = 1, inplace = True)

In [180]:
df.columns

Index(['session_position', 'session_length', 'not_skipped', 'context_switch',
       'no_pause_before_play', 'short_pause_before_play',
       'long_pause_before_play', 'hist_user_behavior_n_seekfwd',
       'hist_user_behavior_n_seekback', 'hist_user_behavior_is_shuffle',
       'hour_of_day', 'premium', 'context_type',
       'hist_user_behavior_reason_start', 'duration', 'release_year',
       'us_popularity_estimate', 'acousticness', 'beat_strength', 'bounciness',
       'danceability', 'dyn_range_mean', 'energy', 'flatness',
       'instrumentalness', 'key', 'liveness', 'loudness', 'mechanism', 'mode',
       'organism', 'speechiness', 'tempo', 'time_signature', 'valence',
       'acoustic_vector_0', 'acoustic_vector_1', 'acoustic_vector_2',
       'acoustic_vector_3', 'acoustic_vector_4', 'acoustic_vector_5',
       'acoustic_vector_6', 'acoustic_vector_7'],
      dtype='object')

In [181]:
df[['hist_user_behavior_reason_start', 'context_type']]

Unnamed: 0,hist_user_behavior_reason_start,context_type
0,trackdone,radio
1,fwdbtn,radio
2,trackdone,user_collection
3,trackdone,radio
4,trackdone,user_collection
...,...,...
999995,fwdbtn,user_collection
999996,trackdone,radio
999997,fwdbtn,catalog
999998,trackdone,catalog


In [182]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

In [183]:
df['not_skipped'] = df['not_skipped'].apply(lambda x: 1 if x == True else 0)

In [184]:
df.dtypes

session_position                     int32
session_length                       int32
not_skipped                          int64
context_switch                       int32
no_pause_before_play                 int32
short_pause_before_play              int32
long_pause_before_play               int32
hist_user_behavior_n_seekfwd         int32
hist_user_behavior_n_seekback        int32
hist_user_behavior_is_shuffle         bool
hour_of_day                          int32
premium                               bool
context_type                        object
hist_user_behavior_reason_start     object
duration                           float64
release_year                         int64
us_popularity_estimate             float64
acousticness                       float64
beat_strength                      float64
bounciness                         float64
danceability                       float64
dyn_range_mean                     float64
energy                             float64
flatness   

In [185]:
df['premium']= df['premium'].apply(lambda x: 1 if x is True else 0)
df['hist_user_behavior_is_shuffle'] = df['hist_user_behavior_is_shuffle'].apply(lambda x: 1 if x is True else 0)

In [202]:
df['mode']

0         major
1         major
2         major
3         major
4         major
          ...  
999995    major
999996    major
999997    major
999998    major
999999    major
Name: mode, Length: 1000000, dtype: object

In [203]:
as_is = ['session_position', 'session_length','hist_user_behavior_is_shuffle',
       'hour_of_day','premium','duration', 
       'release_year', 'us_popularity_estimate', 'acousticness',
       'beat_strength', 'bounciness', 'danceability', 'dyn_range_mean',
       'energy', 'flatness', 'instrumentalness', 'liveness', 'loudness',
       'mechanism', 'key', 'organism', 'speechiness', 'tempo',
       'time_signature', 'valence', 'acoustic_vector_0', 'acoustic_vector_1',
       'acoustic_vector_2', 'acoustic_vector_3', 'acoustic_vector_4',
       'acoustic_vector_5', 'acoustic_vector_6', 'acoustic_vector_7', 'context_switch', 'no_pause_before_play', 'short_pause_before_play',
       'long_pause_before_play', 'hist_user_behavior_n_seekfwd',
       'hist_user_behavior_n_seekback']
ohe = ['mode','context_type', 'hist_user_behavior_reason_start']

In [204]:
preproc = ColumnTransformer(
    transformers = [
        ('as_is', FunctionTransformer(lambda x: x), as_is),
        ('one_hot', OneHotEncoder(handle_unknown = 'ignore'), ohe),
    ]
)

In [205]:
predict = df['not_skipped']

In [206]:
pl = Pipeline(steps = [('preprocessor', preproc), ('classifier', DecisionTreeClassifier(max_depth = 10))])
x_train, x_test, y_train, y_test = train_test_split(df.drop('not_skipped', axis = 1), df['not_skipped'], test_size= 0.2)

In [207]:
pl.fit(x_train, y_train)

Pipeline(steps=[('preprocessor',
                 ColumnTransformer(transformers=[('as_is',
                                                  FunctionTransformer(func=<function <lambda> at 0x7fb5ba8bc940>),
                                                  ['session_position',
                                                   'session_length',
                                                   'hist_user_behavior_is_shuffle',
                                                   'hour_of_day', 'premium',
                                                   'duration', 'release_year',
                                                   'us_popularity_estimate',
                                                   'acousticness',
                                                   'beat_strength',
                                                   'bounciness', 'danceability',
                                                   'dyn_range_me...
                                                   '

In [208]:
pl.score(x_test, y_test)

0.805635

In [211]:
pl.predict(x_test)

array([1, 0, 0, ..., 0, 0, 1])