# RNN Model

## Install Packages

In [10]:
# Install your required packages here
!pip install pandas numpy matplotlib sklearn fsspec gcsfs tqdm tensorflow

Collecting tensorflow
  Downloading tensorflow-2.3.1-cp37-cp37m-manylinux2010_x86_64.whl (320.4 MB)
[K     |████████████████████████████████| 320.4 MB 17 kB/s /s eta 0:00:01
Collecting gast==0.3.3
  Downloading gast-0.3.3-py2.py3-none-any.whl (9.7 kB)
Collecting keras-preprocessing<1.2,>=1.1.1
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl (42 kB)
[K     |████████████████████████████████| 42 kB 774 kB/s  eta 0:00:01
[?25hCollecting google-pasta>=0.1.8
  Downloading google_pasta-0.2.0-py3-none-any.whl (57 kB)
[K     |████████████████████████████████| 57 kB 5.9 MB/s  eta 0:00:01
[?25hCollecting tensorflow-estimator<2.4.0,>=2.3.0
  Downloading tensorflow_estimator-2.3.0-py2.py3-none-any.whl (459 kB)
[K     |████████████████████████████████| 459 kB 51.0 MB/s eta 0:00:01
Collecting astunparse==1.6.3
  Downloading astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Collecting opt-einsum>=2.3.2
  Downloading opt_einsum-3.3.0-py3-none-any.whl (65 kB)
[K     |████████████████████

## Imports & Constants

In [14]:
#Import the libraries for RNN LSTM
import numpy as np
import pandas as pd
import math

import sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn import preprocessing
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.python.keras.utils.vis_utils import plot_model

from glob import glob
from tqdm.notebook import tqdm

from google.cloud import storage

In [15]:
# Commented out IPython magic to ensure Python compatibility.
%env GOOGLE_APPLICATION_CREDENTIALS=ai-project-2020-f4dfbc25326c.json

env: GOOGLE_APPLICATION_CREDENTIALS=ai-project-2020-f4dfbc25326c.json


In [16]:
bucket_name = "ai-project-2020-spotify"
client = storage.Client()
bucket = client.get_bucket(bucket_name)

## Utility Functions

In [17]:
def ave_pre(submission, groundtruth):
    """ Calculate average accuracy (which is the same as average precision in this context) """
    s = 0.0
    t = 0.0
    c = 1.0
    for x, y in zip(submission, groundtruth):
        if x != 0 and x != 1:
            raise ValueError()
        if x == y:
            s += 1.0
            t += s / c
        c += 1
    return t / len(groundtruth)

def evaluate(submission, groundtruth):
    """ Calculate metrics for prediction and ground thruth lists (source: starter kit) """
    ap_sum = 0.0
    first_pred_acc_sum = 0.0
    counter = 0
    for sub, tru in zip(submission, groundtruth):
        # if len(sub) != len(tru):
        #     raise Exception('Line {} should contain {} predictions, but instead contains '
        #                     '{}'.format(counter + 1, len(tru), len(sub)))
        try:
            ap_sum += ave_pre(sub, tru)
        except ValueError as e:
            raise ValueError('Invalid prediction in line {}, should be 0 or 1'.format(counter))
        first_pred_acc_sum += sub[0] == tru[0]
        counter += 1
    ap = ap_sum / counter
    first_pred_acc = first_pred_acc_sum / counter
    return ap, first_pred_acc

def normalize(df,feature_name):
    result = df.copy()
    for name in feature_name:
        max_value = df[name].max()
        min_value = df[name].min()
        result[name] = (df[name] - min_value) / (max_value - min_value)
    return result

def categorical_to_dummies(df, categorical_cols):
    """ Create dummies (one hot encoding) for each categorical variables """
    dummies = pd.get_dummies(df[categorical_cols], prefix=categorical_cols)
    return df.drop(columns=categorical_cols).join(dummies)

def split_sessions(data, perc_in=0.6):
    """ Split interactions into train and test sessions. """
    sessions = data['session_id'].unique()
    amt_in = int(perc_in * len(sessions))
    sessions_in = np.random.choice(sessions, amt_in, replace=False)
    sessions_out = np.array(list(set(sessions) - set(sessions_in)))
    indexed_data = data.set_index('session_id')
    data_in = indexed_data.loc[sessions_in]
    data_out = indexed_data.loc[sessions_out]
    return data_in, data_out

## Import Session Logs

In [18]:
# Cloud bucket contains larger datasets:
files = []
train_files = list(bucket.list_blobs(prefix='training_set/'))
for blob in [blob for blob in train_files if '20180715' in blob.name]:
  files.append(f"gs://{bucket_name}/"+blob.name)
  print(blob.name)
print(files)

training_set/log_0_20180715_000000000000.csv.gz
training_set/log_1_20180715_000000000000.csv.gz
training_set/log_2_20180715_000000000000.csv.gz
training_set/log_3_20180715_000000000000.csv.gz
training_set/log_4_20180715_000000000000.csv.gz
training_set/log_5_20180715_000000000000.csv.gz
training_set/log_6_20180715_000000000000.csv.gz
training_set/log_7_20180715_000000000000.csv.gz
['gs://ai-project-2020-spotify/training_set/log_0_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_1_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_2_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_3_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_4_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_5_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_6_20180715_000000000000.csv.gz', 'gs://ai-project-2020-spotify/training_set/log_7_201807

In [19]:
# Cloud bucket contains larger datasets:
logs = pd.read_csv(f"gs://{bucket_name}/training_set/log_0_20180715_000000000000.csv.gz")

In [20]:
# remove date for convenience (could encode this as well)
logs.drop(columns=['date'], inplace=True)

# Create dummies (one hot encoding) for each categorical variable in logs
categorical_cols = ['context_type', 'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end']
logs = categorical_to_dummies(logs, categorical_cols)
print(logs.shape)

(2990609, 44)


## Import Track Features

In [21]:
track_features_1 = pd.read_csv(f"gs://{bucket_name}/track_features/tf_000000000000.csv").set_index('track_id')
track_features_2 = pd.read_csv(f"gs://{bucket_name}/track_features/tf_000000000001.csv").set_index('track_id')
track_features = track_features_1.append(track_features_2)

In [22]:
track_features = categorical_to_dummies(track_features, ['mode'])

## Determine Model Shape


In [23]:
def find_cat_user_behavior(files):
  ''' Will find all categorical values for the columns 'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end', 'context_type'.
  args: 
    files: list of csv files to get categorical values from
  returns:
    hist_user_behavior_reason_end_cat: name of categorical columns for hist_user_behavior_reason_end
    hist_user_behavior_reason_cat: name of categorical columns for hist_user_behavior_reason_start
    context_type_cat: name of categorical columns for context_type
  '''
  iterator_generator = (pd.read_csv(f, usecols=['hist_user_behavior_reason_start', 'hist_user_behavior_reason_end', 'context_type']) for f in files)
  hist_user_behavior_reason_cat = set()
  hist_user_behavior_reason_end_cat = set()
  context_type_cat = set()
  for iterator in iterator_generator:
    for col in iterator['hist_user_behavior_reason_start'].unique():
      hist_user_behavior_reason_cat.add('hist_user_behavior_reason_start_'+col)
    for col in iterator['hist_user_behavior_reason_end'].unique():
      hist_user_behavior_reason_end_cat.add('hist_user_behavior_reason_end_'+col)
    for col in iterator['context_type'].unique():
        context_type_cat.add('context_type_'+col)
  return hist_user_behavior_reason_end_cat, hist_user_behavior_reason_cat, context_type_cat

In [24]:
hist_user_behavior_reason_end_cat, hist_user_behavior_reason_cat, context_type_cat = find_cat_user_behavior(files)

In [25]:
# All track feature columns:
track_features_columns = set(track_features.columns.tolist())
# All log columns:
logs = pd.read_csv(files[0])
log_columns = set(logs.columns.tolist())
log_columns = hist_user_behavior_reason_end_cat.union(hist_user_behavior_reason_cat).union(context_type_cat).union(log_columns)
# Columns we won't input into the model but that are present in the logs:
unwanted_columns = {'session_id','session_position', 'session_length', 'track_id_clean', 'date',
       'skip_1', 'skip_2', 'skip_3', 'not_skipped', 'context_type', 'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end'}
# Columns you have access to in first part of session: all track feature columns + log columns - unwanted_columns
first_part_session_columns = list(track_features_columns.union(log_columns).difference(unwanted_columns))
# Columns you have access to in second part of session: all track feature columns
second_part_session_columns = list(track_features_columns)

In [26]:
print(first_part_session_columns)

['hist_user_behavior_reason_end_clickrow', 'mechanism', 'context_type_catalog', 'duration', 'hour_of_day', 'context_type_charts', 'hist_user_behavior_reason_end_clickside', 'context_type_user_collection', 'acousticness', 'energy', 'beat_strength', 'no_pause_before_play', 'hist_user_behavior_is_shuffle', 'hist_user_behavior_reason_end_backbtn', 'hist_user_behavior_reason_end_endplay', 'hist_user_behavior_reason_start_playbtn', 'context_type_personalized_playlist', 'acoustic_vector_0', 'hist_user_behavior_reason_start_trackdone', 'hist_user_behavior_reason_start_endplay', 'hist_user_behavior_reason_start_clickside', 'hist_user_behavior_reason_start_trackerror', 'bounciness', 'organism', 'context_type_editorial_playlist', 'hist_user_behavior_reason_end_remote', 'mode_major', 'release_year', 'flatness', 'hist_user_behavior_reason_start_uriopen', 'long_pause_before_play', 'acoustic_vector_1', 'acoustic_vector_3', 'time_signature', 'hist_user_behavior_reason_start_remote', 'hist_user_behavio

In [27]:
print(second_part_session_columns)

['time_signature', 'mechanism', 'duration', 'acousticness', 'energy', 'beat_strength', 'valence', 'danceability', 'mode_minor', 'acoustic_vector_7', 'key', 'acoustic_vector_4', 'dyn_range_mean', 'acoustic_vector_0', 'speechiness', 'acoustic_vector_5', 'bounciness', 'organism', 'liveness', 'release_year', 'instrumentalness', 'acoustic_vector_2', 'mode_major', 'flatness', 'loudness', 'acoustic_vector_6', 'acoustic_vector_1', 'us_popularity_estimate', 'acoustic_vector_3', 'tempo']


In [28]:
len(first_part_session_columns)

68

In [29]:
len(second_part_session_columns)

30

## Data Wrangling Functions

In [30]:
def create_matrix(data):
    # print(" ## Splitting into First and Second Part")
    data1 = data[data.session_position <= (data.session_length / 2)]
    data2 = data[data.session_position > (data.session_length / 2)]
    
    # Split into first and second part
    start_sessions = data1.groupby("session_id")
    end_sessions = data2.groupby("session_id")
    
    X1, X2, y = [], [], []
    
    # For first part
    # print(" ## Processing First Part of Sessions")
    for session_id in start_sessions.groups.keys():
        # Get columns based on group
        session = start_sessions.get_group(session_id)[first_part_session_columns + ["skip_2"]]
        # Set skip_2
        session["skip_2"] = session["skip_2"] * 1
        x = session.to_numpy()
        # Padding operation
        X1.append(np.pad(x, ((0, 10 - len(x)), (0, 0)), 'constant', constant_values=(0)))
    
    # For second part
    # print(" ## Processing Second Part of Sessions")
    for session_id in end_sessions.groups.keys():
        session = end_sessions.get_group(session_id)[second_part_session_columns + ["skip_2"]]
        # Set 
        true_y = session["skip_2"].to_numpy() * 1
        # remove skip_2 from second part
        del session["skip_2"]
        x = session.to_numpy()
        X2.append(np.pad(x, ((0, 10 - len(x)), (0, 0)), 'constant', constant_values=(0)))
        y.append(np.pad(true_y, (0, 10 - len(true_y)), 'constant', constant_values=(0)))
    
    X1 = np.array(X1)
    X2 = np.array(X2)
    y = np.array(y)
    return X1,X2,y

In [31]:
def logs_categorical_columns_adder(data):
  ''' 
  Some chunks don't have the same categorical values for hist_user_behavior_reason_end and hist_user_behavior_reason_cat. 
  This function will add empty one-hot-encoded columns to dataframes.

  args:
    data: dataframe to add categorical values to.

  returns:
    data: dataframe with new empty one-hot-encoded columns.
  '''
  col_to_add = hist_user_behavior_reason_end_cat.union(hist_user_behavior_reason_cat).difference(set(data.columns))
  for col in col_to_add:
    data[col] = 0
  return data

In [32]:
def logs_cleaning(data):
  ''' Cleans data. 
  args:
    data: dataframe to clean.
  returns:
    data: cleaned dataframe.
  '''
  # remove date for convenience (could encode this as well)
  data.drop(columns=['date'], inplace=True)
  # Create dummies (one hot encoding) for each categorical variable in logs
  categorical_cols = ['context_type', 'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end']
  data = categorical_to_dummies(data, categorical_cols)

  # Convert booleans to ints
  data['premium'] = data['premium']*1
  data['hist_user_behavior_is_shuffle'] = data['hist_user_behavior_is_shuffle']*1
  data['skip_1'] = data['skip_1']*1
  data['skip_2'] = data['skip_2']*1
  data['skip_3'] = data['skip_3']*1

  # Normalize
  feature_name = ['duration',
  'release_year',
  'us_popularity_estimate',
  'flatness',
  'loudness',
  'tempo',
  'acoustic_vector_0',
  'acoustic_vector_1',
  'acoustic_vector_2',
  'acoustic_vector_3',
  'acoustic_vector_4',
  'acoustic_vector_5',
  'acoustic_vector_6',
  'acoustic_vector_7',
  'key']

  data = normalize(data, feature_name)
  return data

In [33]:
def logs_feature_joining(data):
  ''' Joins a chunk of data from the session logs with the track features. 
  args:
    data: dataframe to join features to.
  returns:
    data: dataframe including logs and track feature columns.
  '''
  data = data.join(track_features, on='track_id_clean', how='left')
  return data

In [34]:
def data_generator(files, chunksize=1000):
  ''' Will infinitely generate chunks of data from all csv files.
  args:
    files: list of path names to CSV files holding session logs.
    chunksize: CSV files will be read in chunks of size chunksize.

  Note:
    Chunk size isn't uniform, will only return full sessions.
    So, one chunk might be of length 997, next might be 1005.
    Reason being that chunks can split a session in two, this is unwanted behavior.
  '''
  assert isinstance(files, list), "files argument should be list of paths"
  while True:
    iterator_generator = (pd.read_csv(f, iterator=True, chunksize=chunksize) for f in files)
    dummy = pd.DataFrame()
    for iterator in iterator_generator:
      # print('\n### Opened new file')
      for chunk in iterator:
        # Get position of last row element
        last_position = chunk.iloc[-1]['session_position']
        # Slice last session
        last_session = chunk.iloc[-last_position:]
        # Drop last session from current chunk
        chunk.drop(last_session.index, inplace=True)
        # Append chunk to previous last session (to get a full session)
        dummy = dummy.append(chunk)
        yield dummy
        # Assign this last session to dummy for next session
        dummy = last_session
  print('\n### Processed all Files')

In [35]:
def batch(iterables_list, batch_size=1):
  ''' Will return batches of a iterables.
  args:
    iterables_list: list of iterables to return batches from.
    batch_size: size of batch.
  
  Note:
    Will always return a batch of batch_size, even if iterables list is smaller than batch_size.
    Does this by mean of addings duplicates.
  '''
  if iterables_list[0].shape[0] != iterables_list[1].shape[0] != iterables_list[2].shape[0]:
    raise ValueError
  l = len(iterables_list[0])
  for ndx in range(0, l, batch_size):
      if min(ndx + batch_size, l) == l:
        yield iterables_list[0][l-batch_size: l], iterables_list[1][l-batch_size: l], iterables_list[2][l-batch_size: l]
      else: 
        yield iterables_list[0][ndx:min(ndx + batch_size, l)], iterables_list[1][ndx:min(ndx + batch_size, l)], iterables_list[2][ndx:min(ndx + batch_size, l)]

In [36]:
from sklearn.metrics.pairwise import cosine_similarity

In [37]:
def cosineSimilarity(data):
  ''' Will calculate the cosine similarity between 
  1. skipped songs and current song
  2. songs in the second half of the data and current song '''

  skipped_songs_data = data[data['skip_2'] == 1]
  second_half_songs_data = data[data['session_position'] > 0.5 * data['session_length']]

  # Option 1
  cosine_distance_columns = ['duration', 'release_year', 'us_popularity_estimate', 'acousticness',
       'beat_strength', 'bounciness', 'danceability', 'dyn_range_mean',
       'energy', 'flatness', 'instrumentalness', 'key', 'liveness', 'loudness',
       'mechanism', 'organism', 'speechiness', 'tempo', 'time_signature',
       'valence', 'acoustic_vector_0', 'acoustic_vector_1',
       'acoustic_vector_2', 'acoustic_vector_3', 'acoustic_vector_4',
       'acoustic_vector_5', 'acoustic_vector_6', 'acoustic_vector_7']
  
  # Option 2
  #cosine_distance_columns = ['acoustic_vector_0', 'acoustic_vector_1',
  #     'acoustic_vector_2', 'acoustic_vector_3', 'acoustic_vector_4',
  #     'acoustic_vector_5', 'acoustic_vector_6', 'acoustic_vector_7']

  # Option 3
  # cosine_distance_columns = ['acousticness',
  #     'beat_strength', 'bounciness', 'danceability', 'dyn_range_mean',
  #     'energy', 'flatness', 'instrumentalness', 'key', 'liveness', 'loudness',
  #     'mechanism', 'organism', 'speechiness', 'tempo', 'time_signature',
  #     'valence']
  
  #skipped_songs_data[cosine_distance_columns]
  #second_half_songs_data[cosine_distance_columns]

  mean_skipped_song = skipped_songs_data[cosine_distance_columns].mean().tolist()
  mean_second_half_songs = second_half_songs_data[cosine_distance_columns].mean().tolist()

  tqdm.pandas()

  data['similarity_mean_skipped_song'] = data[cosine_distance_columns].apply(lambda x: cosine_similarity([mean_skipped_song], [x.tolist()])[0][0], axis=1)

  data['similarity_mean_second_half_songs'] = data[cosine_distance_columns].apply(lambda x: cosine_similarity([mean_second_half_songs], [x.tolist()])[0][0], axis=1)

  return data

In [38]:
def data_processor(files, batchsize=100, chunksize=10000):
  ''' Generator: will clean, join,... each chunk.
  args:
    files: list of path names to CSV files holding session logs.
    chunksize: CSV files will be read in chunks of size chunksize.
    batch_size: size of batch to yield.
  
  yields: tuple of (X1_batch, X2_batch), y_batch. Where y_batch are target values.
    X1_batch, X2_batch are the first and second session part with size = batch_size.
  '''
  generator = data_generator(files, chunksize)
  amount_of_sessions = 0
  
  for chunk in generator:
    amount_of_sessions += chunk['session_id'].nunique()
    chunk = logs_feature_joining(chunk)
    chunk = logs_cleaning(chunk)
    chunk = logs_categorical_columns_adder(chunk)
    chunk = cosineSimilarity(chunk)
    X1, X2, y = create_matrix(chunk)
    for X1_batch, X2_batch, y_batch in batch([X1,X2,y], batchsize):
          yield (X1_batch, X2_batch), y_batch

In [39]:
# Pre-processing data
def data_pre_processor(files, filenameX1, filenameX2, filenamey, batchsize=100, chunksize=20000, delete=False):
  ''' Pipeline: will clean, join,... each chunk and save to csv.
  args:
    files (list): list of path names to CSV files holding session logs.
    chunksize (int): CSV files will be read in chunks of size chunksize.
    filename (string): path to save file to.
    delete (bool): delete file at filename first or not.
  '''
  generator = data_generator(files, chunksize)
  amount_of_sessions = 0
  header = True
  if delete:
    os.remove(filename)
  for chunk in generator:
    amount_of_sessions += chunk['session_id'].nunique()
    chunk = logs_feature_joining(chunk)
    chunk = logs_cleaning(chunk)
    chunk = logs_categorical_columns_adder(chunk)
    chunk = cosineSimilarity(chunk)
    chunk.to_csv(filename, header=True, mode='a')
    header = False

In [40]:
def X_y(data, batchsize = 100):
  X1, X2, y = create_matrix(data)
  for X1_batch, X2_batch, y_batch in batch([X1,X2,y], batchsize):
        yield (X1_batch, X2_batch), y_batch

## Model

In [41]:
# Network 1 - First half of the session
input_layer1 = layers.Input(shape=(10, len(first_part_session_columns)+1))

# Normalisation
norm1 = layers.BatchNormalization()(input_layer1)

# Recurrent layer(s)
lstm1 = layers.Bidirectional(layers.GRU(25, return_sequences=False, input_shape=(10, len(first_part_session_columns)+1)))(norm1)

# Network 2 - Second half of the session
input_layer2 = layers.Input(shape=(10, len(second_part_session_columns)))

# Normalisation
norm2 = layers.BatchNormalization()(input_layer2)

# Recurrent layer(s)
lstm2 = layers.Bidirectional(layers.GRU(25, return_sequences=False, input_shape=(10, len(second_part_session_columns))))(norm2)

# Concatenation & dense layer
concat = layers.Concatenate()([lstm1, lstm2])
dense_last = layers.Dense(10, activation="relu")(concat)

In [42]:
def calculate_steps_per_epoch(files, batchsize):
  # Calculates lower limit of how many batches there will be.
  iterator_generator = (pd.read_csv(f, usecols=['session_id']) for f in files)
  unique_session_count = 0
  for iterator in iterator_generator:
    unique_session_count += iterator['session_id'].nunique()
  steps_per_epoch = unique_session_count / batchsize
  return int(steps_per_epoch)

In [43]:
steps = calculate_steps_per_epoch(files, 100)
print(steps)

14232


In [None]:
# Compile
lossf = keras.losses.MeanAbsoluteError()
model = keras.Model(inputs=[input_layer1, input_layer2], outputs=[dense_last])
model.compile(optimizer='adam', loss=lossf, metrics=["acc"])

processor = data_processor(files, batchsize=100, chunksize=10000)
history = model.fit(processor, epochs=5, steps_per_epoch=steps)

  from pandas import Panel


Epoch 1/5
    4/14232 [..............................] - ETA: 3:04 - loss: 0.4283 - acc: 0.0950

  from pandas import Panel


   13/14232 [..............................] - ETA: 3:13:39 - loss: 0.4173 - acc: 0.1862

  from pandas import Panel


   18/14232 [..............................] - ETA: 4:35:48 - loss: 0.4124 - acc: 0.2261

  from pandas import Panel


   23/14232 [..............................] - ETA: 5:22:02 - loss: 0.4148 - acc: 0.2339

  from pandas import Panel


   30/14232 [..............................] - ETA: 5:29:46 - loss: 0.4096 - acc: 0.2393

  from pandas import Panel


   35/14232 [..............................] - ETA: 5:53:05 - loss: 0.4060 - acc: 0.2374

## Evaluation

In [None]:
# Predict
steps = calculate_steps_per_epoch([files[7]], 100)
processor = data_processor([files[7]], batchsize=100, chunksize=10000)

truth = []
predictions = []

for step in tqdm(range(steps)):
  X_batches, y_batch = processor.__next__()
  prediction = model.predict(X_batches)
  predictions.extend(prediction)
  truth.extend(y_batch)

In [None]:
evaluate([[i >= 0.5 for i in p] for p in predictions], truth)