In [1]:
import pandas as pd
import numpy as np
import os
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import __version__ as sklearn_version
from sklearn.decomposition import PCA
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV, learning_curve, StratifiedKFold
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.dummy import DummyRegressor
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, confusion_matrix, classification_report
from sklearn.pipeline import make_pipeline
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.svm import SVC
from sklearn.cluster import KMeans, MiniBatchKMeans

import datetime


In [2]:
kmean100_df = pd.read_csv('../data/interim/all_data/mbKMeans100clusters.csv', usecols=['track_id','clus'])
kmean100_df.rename(columns={'track_id':'track_id_clean'}, inplace=True)
kmean100_df.head()

Unnamed: 0,track_id_clean,clus
0,t_2e8f4b71-8a0b-4b9c-b7d8-fb5208e87f9f,94
1,t_dae2ec0e-ec7b-4b3e-b60c-4a884d0eccb0,36
2,t_cf0164dd-1531-4399-bfa6-dec19cd1fedc,28
3,t_0f90acc7-d5c5-4e53-901d-55610fbd090c,4
4,t_36b9ad02-095a-443d-a697-6c7285d9410a,29


In [3]:
import glob
file_list = glob.glob('../data/raw/training_set/log_0*.csv')

In [None]:
from dask.dataframe import from_pandas
from dask.dataframe.reshape import pivot_table
import dask.array as da

from timeit import default_timer as timer #to see how long the computation will take

def cal_svd(filename):
    start = timer()
    df = pd.read_csv(filename, usecols = ['session_id','skip_2','track_id_clean','session_length','session_position']).merge(kmean100_df)
    df = df.astype({'session_id':'category', 'track_id_clean':'category', 'clus': 'category'})
    df = df.loc[df['session_position']<(df['session_length']/2)]
    df['ListenYes'] = (df['skip_2'] == False)*1
    df['ListenYes'].replace(0, -1, inplace = True)
    df2 = df.groupby(['session_id', 'clus']).agg({'ListenYes':['sum']}).reset_index()
    df2.columns = df2.columns.droplevel(level = 1) # take out the unwanted level

    dd = from_pandas(df2, npartitions=1)
    DfMatrix = pivot_table(dd, values='ListenYes', index='session_id', columns='clus')
    u, s, v = da.linalg.svd(DfMatrix.to_dask_array())
    pd.DataFrame(v.compute()).to_csv('../data/interim/all_data/SVD/v_'+filename[25:])
    print('Runtime: %0.2fs' % (timer() - start))
    
from joblib import Parallel, delayed
Parallel(n_jobs=4, verbose=1)(delayed(cal_svd)(filename) for filename in file_list)



[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.


In [6]:

df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1290405 entries, 0 to 2884471
Data columns (total 7 columns):
 #   Column            Non-Null Count    Dtype   
---  ------            --------------    -----   
 0   session_id        1290405 non-null  category
 1   session_position  1290405 non-null  int64   
 2   session_length    1290405 non-null  int64   
 3   track_id_clean    1290405 non-null  category
 4   skip_2            1290405 non-null  bool    
 5   clus              1290405 non-null  category
 6   ListenYes         1290405 non-null  int64   
dtypes: bool(1), category(3), int64(3)
memory usage: 67.6 MB


In [7]:
df = df.astype({'session_id':'category', 'track_id_clean':'category', 'clus': 'category'})
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1290405 entries, 0 to 2884471
Data columns (total 7 columns):
 #   Column            Non-Null Count    Dtype   
---  ------            --------------    -----   
 0   session_id        1290405 non-null  category
 1   session_position  1290405 non-null  int64   
 2   session_length    1290405 non-null  int64   
 3   track_id_clean    1290405 non-null  category
 4   skip_2            1290405 non-null  bool    
 5   clus              1290405 non-null  category
 6   ListenYes         1290405 non-null  int64   
dtypes: bool(1), category(3), int64(3)
memory usage: 67.6 MB


In [8]:
df = df.loc[df['session_position']<(df['session_length']/2)]
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1290405 entries, 0 to 2884471
Data columns (total 7 columns):
 #   Column            Non-Null Count    Dtype   
---  ------            --------------    -----   
 0   session_id        1290405 non-null  category
 1   session_position  1290405 non-null  int64   
 2   session_length    1290405 non-null  int64   
 3   track_id_clean    1290405 non-null  category
 4   skip_2            1290405 non-null  bool    
 5   clus              1290405 non-null  category
 6   ListenYes         1290405 non-null  int64   
dtypes: bool(1), category(3), int64(3)
memory usage: 67.6 MB


In [9]:
df['ListenYes'] = (df['skip_2'] == False)*1
df['ListenYes'].replace(0, -1, inplace = True)
df.head()

Unnamed: 0,session_id,session_position,session_length,track_id_clean,skip_2,clus,ListenYes
0,7_00007784-914d-4fa0-a1ef-cc4ee90c895b,1,16,t_7bb9ddcd-da46-4fe6-8fa6-c9e295693c48,False,76,1
3,7_00007784-914d-4fa0-a1ef-cc4ee90c895b,2,16,t_5815a934-91b0-4e80-8865-9a48d1319727,False,73,1
5,7_e3b1b79d-7f9f-4528-b29c-e2d513602584,2,17,t_5815a934-91b0-4e80-8865-9a48d1319727,True,73,-1
6,7_00007784-914d-4fa0-a1ef-cc4ee90c895b,3,16,t_7771234d-4da6-4669-882e-5556334b6bda,True,52,-1
10,7_d8bf2bb9-a442-4452-aed3-5fa8e63150a2,2,20,t_7771234d-4da6-4669-882e-5556334b6bda,False,52,1


In [10]:
df2 = df.groupby(['session_id', 'clus']).agg({'ListenYes':['sum']}).reset_index()
df2.columns = df2.columns.droplevel(level = 1) # take out the unwanted level
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17215800 entries, 0 to 17215799
Data columns (total 3 columns):
 #   Column      Dtype   
---  ------      -----   
 0   session_id  category
 1   clus        category
 2   ListenYes   int64   
dtypes: category(2), int64(1)
memory usage: 218.8 MB


In [11]:
from dask.dataframe import from_pandas
dd = from_pandas(df2, npartitions=1)

In [12]:
dd.dtypes

session_id    category
clus          category
ListenYes        int64
dtype: object

In [13]:
from dask.dataframe.reshape import pivot_table
DfMatrix = pivot_table(dd, values='ListenYes', index='session_id', columns='clus')
DfMatrix

Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1,Unnamed: 56_level_1,Unnamed: 57_level_1,Unnamed: 58_level_1,Unnamed: 59_level_1,Unnamed: 60_level_1,Unnamed: 61_level_1,Unnamed: 62_level_1,Unnamed: 63_level_1,Unnamed: 64_level_1,Unnamed: 65_level_1,Unnamed: 66_level_1,Unnamed: 67_level_1,Unnamed: 68_level_1,Unnamed: 69_level_1,Unnamed: 70_level_1,Unnamed: 71_level_1,Unnamed: 72_level_1,Unnamed: 73_level_1,Unnamed: 74_level_1,Unnamed: 75_level_1,Unnamed: 76_level_1,Unnamed: 77_level_1,Unnamed: 78_level_1,Unnamed: 79_level_1,Unnamed: 80_level_1,Unnamed: 81_level_1,Unnamed: 82_level_1,Unnamed: 83_level_1,Unnamed: 84_level_1,Unnamed: 85_level_1,Unnamed: 86_level_1,Unnamed: 87_level_1,Unnamed: 88_level_1,Unnamed: 89_level_1,Unnamed: 90_level_1,Unnamed: 91_level_1,Unnamed: 92_level_1,Unnamed: 93_level_1,Unnamed: 94_level_1,Unnamed: 95_level_1,Unnamed: 96_level_1,Unnamed: 97_level_1,Unnamed: 98_level_1,Unnamed: 99_level_1,Unnamed: 100_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [14]:
len(DfMatrix)

172158

In [28]:
# DfMatrix = pd.pivot_table(df2, values='ListenYes', index='session_id', columns='clus')
# del df2, df
# DfMatrix

In [29]:
# from timeit import default_timer as timer #to see how long the computation will take
# start = timer()
# u, s, vh = np.linalg.svd(DfMatrix.to_numpy())
# end = timer()
# print('\nRuntime: %0.2fs' % (end - start))

In [39]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 60025 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:60025/status,

0,1
Dashboard: http://127.0.0.1:60025/status,Workers: 4
Total threads: 4,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:60026,Workers: 4
Dashboard: http://127.0.0.1:60025/status,Total threads: 4
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:60046,Total threads: 1
Dashboard: http://127.0.0.1:60047/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:60032,
Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-hikk9xak,Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-hikk9xak

0,1
Comm: tcp://127.0.0.1:60037,Total threads: 1
Dashboard: http://127.0.0.1:60040/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:60030,
Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-b9v2d82k,Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-b9v2d82k

0,1
Comm: tcp://127.0.0.1:60038,Total threads: 1
Dashboard: http://127.0.0.1:60039/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:60031,
Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-ufht5fr3,Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-ufht5fr3

0,1
Comm: tcp://127.0.0.1:60043,Total threads: 1
Dashboard: http://127.0.0.1:60044/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:60029,
Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-zcpak9te,Local directory: /Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/dask-worker-space/worker-zcpak9te


In [15]:
from timeit import default_timer as timer #to see how long the computation will take
import dask.array as da
from dask.dataframe import from_pandas
start = timer()
u, s, v = da.linalg.svd(DfMatrix.to_dask_array())
# u, s, v = da.linalg.svd(da.from_array(DfMatrix.to_numpy()))
end = timer()
print('\nRuntime: %0.2fs' % (end - start))


Runtime: 0.02s


In [49]:
u

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan, nan)","(nan, nan)"
Count,20 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes unknown unknown Shape (nan, nan) (nan, nan) Count 20 Tasks 1 Chunks Type float64 numpy.ndarray",,

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan, nan)","(nan, nan)"
Count,20 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [50]:
start = timer()
pd.DataFrame(u.compute())
end = timer()
print('\nRuntime: %0.2fs' % (end - start))


Runtime: 12.45s


In [51]:
pd.DataFrame(u.compute())

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,90,91,92,93,94,95,96,97,98,99
0,0.001806,-0.000876,-0.000379,-2.167887e-04,0.000018,0.000208,0.000051,0.002145,-0.004867,0.004981,...,9.204543e-06,2.723556e-06,-1.408129e-06,-0.000020,0.000004,1.652262e-06,-0.000009,-7.200949e-07,-4.701786e-06,-4.513691e-06
1,-0.002004,0.000979,-0.001164,-1.369003e-03,-0.003002,-0.002361,0.001178,0.000852,0.000410,0.000083,...,5.219788e-06,-8.001421e-06,-1.287585e-05,-0.000005,0.000015,2.695221e-05,0.000016,1.737425e-05,1.973242e-05,5.266568e-06
2,0.002934,-0.003013,0.000073,3.982700e-03,-0.003293,-0.003168,0.000555,0.003154,-0.001945,0.001768,...,-1.033898e-05,-1.195335e-05,1.468999e-05,0.000009,0.000080,-7.706776e-06,-0.000053,-1.160196e-05,-9.213980e-06,-1.343054e-05
3,-0.002187,0.000961,-0.000702,2.426203e-04,-0.000573,-0.003141,0.000259,-0.003620,-0.004255,0.006238,...,1.509885e-05,1.174795e-06,9.143739e-06,-0.000006,0.000017,-8.151524e-06,-0.000003,2.229382e-06,8.045150e-06,-7.015322e-06
4,0.000075,-0.000014,0.000163,9.960268e-05,0.000185,-0.000042,-0.000068,0.000765,0.004548,0.004076,...,-3.984739e-07,-3.294566e-08,-5.155353e-06,0.000007,-0.000006,-4.059865e-07,-0.000017,8.045299e-07,-9.168282e-07,-3.844541e-06
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
172153,-0.001626,0.001158,0.000481,-1.256711e-04,0.000181,0.000102,-0.000275,-0.000762,0.004959,0.009903,...,-1.185812e-05,1.506864e-05,-9.417376e-07,0.000004,0.000006,-2.716683e-05,-0.000066,-7.573359e-06,2.041486e-06,-3.619848e-06
172154,-0.000004,-0.000003,-0.000008,-4.475809e-07,-0.000006,-0.000006,-0.000005,-0.000013,-0.000106,-0.000098,...,1.275618e-05,8.386818e-06,1.708291e-06,0.000003,-0.000014,-6.147002e-06,-0.000001,-2.047378e-05,-3.868651e-05,-2.425714e-06
172155,0.000126,0.000104,0.000267,-8.941617e-05,0.000157,0.000212,0.000274,0.000585,-0.000272,0.000123,...,-5.163471e-05,-2.810449e-05,4.172617e-06,-0.000004,-0.000004,-4.249008e-06,0.000002,-4.848049e-06,1.369834e-05,8.327836e-07
172156,-0.000086,0.000006,-0.000421,-2.968590e-04,-0.000484,0.000065,-0.000100,-0.000376,-0.000270,0.000047,...,1.392430e-04,3.767373e-04,-8.542062e-05,-0.000168,0.000227,-2.703186e-04,0.000416,-7.703179e-06,-3.489510e-07,1.168787e-04


In [17]:
pd.DataFrame(v.compute()).to_csv('test')

In [55]:
s.compute()

100

In [59]:
da.to_npy_stack('test', v, axis=0)  

In [32]:
v2= v+v

In [34]:
v2.compute()

array([[ 4.22225911e-04,  1.29240575e-02,  1.46438350e-02, ...,
         2.35390437e-03,  1.41375475e-02,  1.79287012e-02],
       [-1.10414240e-05, -7.91055006e-04, -5.94932576e-03, ...,
        -3.13036841e-03, -1.33035787e-02, -1.26023700e-02],
       [ 1.58224703e-03,  2.76360596e-02,  7.66325913e-02, ...,
        -2.50345245e-04,  7.15190191e-02,  5.27628918e-02],
       ...,
       [-6.21309654e-02,  4.17717999e-04,  5.34678638e-04, ...,
         5.48531411e-04,  1.19980039e-03,  2.26653951e-03],
       [-4.48902386e-02, -1.42467168e-04, -1.70359093e-04, ...,
         4.46726702e-04,  2.74271989e-04,  3.01475549e-04],
       [-7.52799688e-02, -1.87213534e-03,  6.23844005e-04, ...,
        -2.83391255e-04, -1.29354652e-03, -2.01968654e-04]])

In [None]:
from functions import cal_similarMat
from timeit import default_timer as timer #to see how long the computation will take
start = timer()

sim_output = cal_similarMat(pd.concat(df_list))

end = timer()
print('\nRuntime: %0.2fs' % (end - start))


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data2=DataBinary[['session_id','clus','ListenYes']]
  (2 * xtie * ytie) / m + x0 * y0 / (9 * m * (size - 2)))


In [6]:
from functions import cal_similarMat
from timeit import default_timer as timer #to see how long the computation will take
start = timer()


from joblib import Parallel, delayed
sim_output_List = Parallel(n_jobs=10, verbose=1)(delayed(cal_similarMat)(df) for df in df_list)

end = timer()
print('\nRuntime: %0.2fs' % (end - start))

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.


KeyboardInterrupt: 

66

In [2]:
from dask import dataframe as dd
dask_df_list = []
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_0*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_1*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_2*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_3*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_4*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_5*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_6*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_7*.csv')[['session_id','skip_2','track_id_clean']])
dask_df_list.append(dd.read_csv('../data/raw/training_set/log_8*.csv')[['session_id','skip_2','track_id_clean']])

In [3]:
dask_df = dd.concat(dask_df_list)

dask_df

Unnamed: 0_level_0,session_id,skip_2,track_id_clean
npartitions=4735,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,object,bool,object
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [4]:
kmean100_df = pd.read_csv('../data/interim/all_data/mbKMeans100clusters.csv', usecols=['track_id','clus'])
kmean100_df.rename(columns={'track_id':'track_id_clean'}, inplace=True)
kmean100_df.head()

Unnamed: 0,track_id_clean,clus
0,t_2e8f4b71-8a0b-4b9c-b7d8-fb5208e87f9f,94
1,t_dae2ec0e-ec7b-4b3e-b60c-4a884d0eccb0,36
2,t_cf0164dd-1531-4399-bfa6-dec19cd1fedc,28
3,t_0f90acc7-d5c5-4e53-901d-55610fbd090c,4
4,t_36b9ad02-095a-443d-a697-6c7285d9410a,29


In [6]:
dask_df_merged = dask_df.merge(kmean100_df, on=['track_id_clean'])
dask_df_merged

Unnamed: 0_level_0,session_id,skip_2,track_id_clean,clus
npartitions=4735,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,object,bool,object,int64
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [6]:
dask_df_merged.head()

Unnamed: 0,session_id,skip_2,track_id_clean,clus
0,0_00006f66-33e5-4de7-a324-2d18e439fc1e,False,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,66
1,0_00079a23-1600-486a-91bd-5208be0c745a,False,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,66
2,0_012b0fb4-0cc3-429f-9a78-cc6e622153fb,False,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,66
3,0_013cc010-c476-4ad2-8972-73449e0b2ef4,False,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,66
4,0_01a5f0dc-9938-48c9-92f1-c7e51f34d290,False,t_0479f24c-27d2-46d6-a00c-7ec928f2b539,66


In [11]:
from functions import cal_similarMat
pd.DataFrame()

ddd = dask_df_merged.partitions[0:2]
a = ddd.map_partitions(cal_similarMat)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data2['ListenYes'].replace(0, -1, inplace = True)


ValueError: Metadata inference failed in `cal_similarMat`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
ZeroDivisionError('float division by zero')

Traceback:
---------
  File "/Users/andrewchang/opt/anaconda3/envs/Springboard_DS_2022/lib/python3.8/site-packages/dask/dataframe/utils.py", line 177, in raise_on_meta_error
    yield
  File "/Users/andrewchang/opt/anaconda3/envs/Springboard_DS_2022/lib/python3.8/site-packages/dask/dataframe/core.py", line 6086, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Volumes/AC8888/Capstone_Spotify-Sequential-Skip-Prediction/notebooks/functions.py", line 135, in cal_similarMat
    SessTrackEuclDist.loc[i][j] = sum(SessionListening*TrackEuclDist)/sum(TrackEuclDist)


In [24]:
dask_df_merged.memory_usage()

Dask Series Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: series-groupby-sum-agg, 33825 tasks

In [25]:
dask_df_merged = dask_df_merged.astype({'session_id':'category','track_id_clean':'category'})
dask_df_merged.memory_usage()

Dask Series Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: series-groupby-sum-agg, 38560 tasks

In [8]:
def myadd(df):
    return len(df)

ddd = dask_df_merged.partitions[0:10]
a = ddd.map_partitions(myadd)

In [10]:
a.compute()

0    373902
1    373783
2    373854
3    373824
4    373890
5    373867
6    373723
7    373766
8    376886
9    376919
dtype: int64

In [18]:
def myadd(df):
    return len(df)

for part in dask_df_merged.partitions[1:10]:
    print(len(part))

10
6
14
4


In [17]:
len(dask_df_merged.partitions[0])

373902

In [19]:
sessions = dask_df_merged.drop(columns=['skip_2','track_id_clean','clus']).drop_duplicates()

In [17]:
print(sessions.shape)

(Delayed('int-b970d9ec-1786-426e-9645-d3e7b6d8595b'), 1)


In [21]:
dask_df_merged['session_id'].nunique().compute()

  df = reader(bio, **kwargs)


ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-------------+--------+----------+
| Column      | Found  | Expected |
+-------------+--------+----------+
| hour_of_day | object | int64    |
+-------------+--------+----------+

The following columns also raised exceptions on conversion:

- hour_of_day
  ValueError("invalid literal for int() with base 10: '10\\x012018-08-14'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'hour_of_day': 'object'}

to the call to `read_csv`/`read_table`.

In [19]:
from functions import cal_similarMat

In [None]:
sim_output = cal_similarMat(dask_df_merged)

In [None]:
def cal_similarMat(df_train):
    import numpy as np
    import pandas as pd
    
    
    sessions = list(np.sort(df_train['session_id'].unique())) 
    tracks = list(df_train['clus'].unique()) 
    no_skip_2 = (list(df_train['skip_2']==False))*1 # use *1 to convert bool to integer
    
    DfSessionUnique = []
    DfSessionUnique = pd.DataFrame(sessions,columns=['sessions'])
    
    from scipy import sparse
    from pandas.api.types import CategoricalDtype

    rows = df_train['session_id'].astype(CategoricalDtype(categories=sessions)).cat.codes # unique sessions (index)

    # Get the associated row indices
    cols = df_train['clus'].astype(CategoricalDtype(categories=tracks)).cat.codes # unique tracks (column)
    
    
    # Get the associated column indices
    #Compressed Sparse Row matrix
    listeningSparse = []
    listeningSparse = sparse.csr_matrix((no_skip_2, (rows, cols)), shape=(len(sessions), len(tracks)))
    #csr_matrix((data, (row_ind, col_ind)), [shape=(M, N)])
    #where data, row_ind and col_ind satisfy the relationship a[row_ind[k], col_ind[k]] = data[k]. , see https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html

    listeningSparse
    #a sparse matrix is not a pandas dataframe, but sparse matrices are efficient for row slicing and fast matrix vector products
    
    
    DataBinary = df_train.copy()
    DataBinary['ListenYes'] = (DataBinary['skip_2'] == False)*1
    
    data2=DataBinary[['session_id','clus','ListenYes']]

    data2['ListenYes'].replace(0, -1, inplace = True)

    data3 = data2.groupby(['session_id', 'clus']).agg({'ListenYes':['sum']})
    data3 = data3.reset_index()
    data3.columns = data3.columns.droplevel(level = 1) # take out the unwanted level
    
    
    DfMatrix = pd.pivot_table(data3, values='ListenYes', index='session_id', columns='clus')

    DfMatrix=DfMatrix.fillna(0) #NaN values need to get replaced by 0, meaning they have not been listened yet.
    
    DfResetted = DfMatrix.reset_index().rename_axis(None, axis=1) 

    DfTracksListen = DfResetted.drop(columns=['session_id'])

    #Normalization
    import numpy as np
    DfTracksListenNorm = DfTracksListen / np.sqrt(np.square(DfTracksListen).sum(axis=0)) 

    #### similarity and correlation
    # Calculating with Vectors to compute Cosine Similarities
    TrackTrackSim = DfTracksListenNorm.transpose().dot(DfTracksListenNorm) 

    #Another approach to the above would be using correlation
    TrackTrackCorr = DfTracksListenNorm.corr()
    
    #Spearman correlation
    TrackTrackSpearCorr = DfTracksListenNorm.corr(method = 'spearman')
    
    #Kendall correlation
    TrackTrackKendCorr = DfTracksListenNorm.corr(method = 'kendall')
    
    from scipy.spatial.distance import cdist
    
    #### distances
    # Euclidean distance
    TrackTrackEuclDist = pd.DataFrame(cdist(DfTracksListenNorm.T,DfTracksListenNorm.T, 'euclidean'), index = TrackTrackSim.index, columns = TrackTrackSim.columns)

    # Squared Euclidean distance
    TrackTrackSqEuclDist = pd.DataFrame(cdist(DfTracksListenNorm.T,DfTracksListenNorm.T, 'sqeuclidean'), index = TrackTrackSim.index, columns = TrackTrackSim.columns)
    
    # Manhattan distance
    TrackTrackManhDist = pd.DataFrame(cdist(DfTracksListenNorm.T,DfTracksListenNorm.T, 'cityblock'), index = TrackTrackSim.index, columns = TrackTrackSim.columns)

    # Canberra distance
    TrackTrackCanbDist = pd.DataFrame(cdist(DfTracksListenNorm.T,DfTracksListenNorm.T, 'canberra'), index = TrackTrackSim.index, columns = TrackTrackSim.columns)

    #### boolean distances
    # Hamming distance
    TrackTrackHammDist = pd.DataFrame(cdist(DfTracksListenNorm.T>0,DfTracksListenNorm.T>0, 'hamming'), index = TrackTrackSim.index, columns = TrackTrackSim.columns)

 

    # Create a place holder matrix for similarities, and fill in the session column
    SessTrackSimilarity = pd.DataFrame(index=DfResetted.index, columns=DfResetted.columns)
    SessTrackSimilarity.iloc[:,:1] = DfResetted.iloc[:,:1]
    SessTrackCorrelation = SessTrackSimilarity.copy()
    SessTrackSpearCorr = SessTrackSimilarity.copy()
    SessTrackKendCorr = SessTrackSimilarity.copy()
    SessTrackEuclDist = SessTrackSimilarity.copy()
    SessTrackSqEuclDist = SessTrackSimilarity.copy()
    SessTrackManhDist = SessTrackSimilarity.copy()
    SessTrackCanbDist = SessTrackSimilarity.copy()
    SessTrackHammDist = SessTrackSimilarity.copy()

    #We now loop through the rows and columns filling in empty spaces with similarity scores.
    
    SessionListening = []
    TrackTopSimilarity = []

    for i in range(0,len(SessTrackSimilarity.index)):
        for j in range(1,len(SessTrackSimilarity.columns)):

            ses = SessTrackSimilarity.index[i]
            tra = SessTrackSimilarity.columns[j]

            SessionListening = DfTracksListen.loc[ses,]
            TrackSimilarity = TrackTrackSim[tra]
            TrackCorrelation = TrackTrackCorr[tra]
            TrackSpearCorr = TrackTrackSpearCorr[tra]
            TrackKendCorr = TrackTrackKendCorr[tra]
            TrackEuclDist = TrackTrackEuclDist[tra]
            TrackSqEuclDist = TrackTrackSqEuclDist[tra]
            TrackManhDist = TrackTrackManhDist[tra]
            TrackCanbDist = TrackTrackCanbDist[tra]
            TrackHammDist = TrackTrackHammDist[tra]

            SessTrackSimilarity.loc[i][j] = sum(SessionListening*TrackSimilarity)/sum(TrackSimilarity)
            SessTrackCorrelation.loc[i][j] = sum(SessionListening*TrackCorrelation)/sum(TrackCorrelation)
            SessTrackSpearCorr.loc[i][j] = sum(SessionListening*TrackSpearCorr)/sum(TrackSpearCorr)
            SessTrackKendCorr.loc[i][j] = sum(SessionListening*TrackKendCorr)/sum(TrackKendCorr)
            SessTrackEuclDist.loc[i][j] = sum(SessionListening*TrackEuclDist)/sum(TrackEuclDist)
            SessTrackSqEuclDist.loc[i][j] = sum(SessionListening*TrackSqEuclDist)/sum(TrackSqEuclDist)
            SessTrackManhDist.loc[i][j] = sum(SessionListening*TrackManhDist)/sum(TrackManhDist)
            SessTrackCanbDist.loc[i][j] = sum(SessionListening*TrackCanbDist)/sum(TrackCanbDist)
            SessTrackHammDist.loc[i][j] = sum(SessionListening*TrackHammDist)/sum(TrackHammDist)

    
    SessTrackSimilarity.set_index('session_id', inplace = True)
    SessTrackCorrelation.set_index('session_id', inplace = True)
    SessTrackSpearCorr.set_index('session_id', inplace = True)
    SessTrackKendCorr.set_index('session_id', inplace = True)
    SessTrackEuclDist.set_index('session_id', inplace = True)
    SessTrackSqEuclDist.set_index('session_id', inplace = True)
    SessTrackManhDist.set_index('session_id', inplace = True)
    SessTrackCanbDist.set_index('session_id', inplace = True)
    SessTrackHammDist.set_index('session_id', inplace = True)
    
    
    sim_output = [SessTrackSimilarity, SessTrackCorrelation, SessTrackSpearCorr, SessTrackKendCorr, SessTrackEuclDist, SessTrackSqEuclDist, SessTrackManhDist, SessTrackCanbDist, SessTrackHammDist]
    
    
    return sim_output