In [1]:
import pandas as pd
import csv
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pyspark as ps
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
import time
from tqdm.notebook import tqdm
import pyspark.sql.functions as f

Loads each of the required files for a given dataset.

In [2]:
# Build (or reuse) the local-mode Spark session that the loading functions read data with.
spark = (
    SparkSession.builder
    .master("local")
    .config(conf=SparkConf())
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/18 23:59:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/18 23:59:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Loads the files using pyspark (for speed and efficiency), extracting the relevant data from each record per user.

In [3]:
def load_file(file_path):
    """
    Loads one file from the MLHD dataset.

    Every line is split on runs of whitespace and the first four fields are
    exposed as the columns ``timestamp`` (int), ``artist-MBID``,
    ``release-MBID`` and ``recording-MBID`` (strings). The raw ``value``
    column read from the file is kept alongside them.

    Input:
        file_path; string
    Return:
        user_without_null; spark dataframe, the file where rows containing
        null values have been dropped, or None (after printing "NONE") when
        no rows remain.
    """
    user = spark.read.format("text").load(file_path)

    # Build the split expression once; the raw string avoids the invalid "\s"
    # escape warning.
    # NOTE(review): splitting on \s+ collapses consecutive separators, so a line
    # with an empty field shifts later fields left - confirm against the file format.
    fields = f.split(user.value, r"\s+")

    # Loads files with column names
    user_with_col = (
        user.withColumn("timestamp", fields[0].cast("int"))
            .withColumn("artist-MBID", fields[1].cast("string"))
            .withColumn("release-MBID", fields[2].cast("string"))
            .withColumn("recording-MBID", fields[3].cast("string"))
    )

    # Drops rows with empty values. (`drop()` without column names removes
    # nothing; `dropna()` is the row-wise null filter.)
    user_without_null = user_with_col.dropna()

    # Checks if the file is empty
    if user_without_null.count() == 0:
        print("NONE")
        return None
    return user_without_null

In [4]:
def get_total_listens(df):
    """Returns the number of rows (listening events) in the dataframe."""
    n_rows = df.count()
    return n_rows

def get_unique_songs(df):
    """Returns the number of distinct values in the ``recording-MBID`` column."""
    recording_col = df["recording-MBID"]
    distinct_recordings = df.select(recording_col).distinct()
    return distinct_recordings.count()

def get_max_repeated(df):
    """
    Returns the largest play count among recordings that occur more than once.

    The rows are grouped by ``recording-MBID``, groups with a count of 1 are
    discarded, and the maximum remaining count is read from the first result row.
    NOTE(review): when no recording is repeated the aggregate runs over an
    empty set, so the result is presumably None - confirm callers handle that.
    """
    plays_per_recording = df.groupBy(df["recording-MBID"]).count()
    repeated_only = plays_per_recording.where(f.col('count') > 1)
    top_row = repeated_only.select(f.max('count')).first()
    return top_row[0]
    
    

def get_exploratoryness(df):
    """
    Returns 1 - (1/L) * sum(1 / c_i), where L is the total number of rows and
    c_i is the number of rows for each distinct ``recording-MBID``.

    The per-recording counts are collected into pandas before the sum is taken.
    """
    total_rows = df.count()
    per_recording = df.groupBy("recording-MBID").count().toPandas()
    inverse_counts = 1 / per_recording["count"]
    weighted_sum = (1 / total_rows) * np.sum(inverse_counts)
    return 1 - weighted_sum

In [5]:
def extract_information(df):
    """
    Extracts the basic stats for each user or the total dataset.

    Input
        df: Dataframe, the test dataset
    Output
        dictionary {string, value}, the stats obtained from the dataset, with
        the keys "total_listens", "unique_songs", "repeated_songs" and
        "exploratoryness".
    """
    # Call the helper by its real name. Assigning the result to a local called
    # `total_listens` while also calling `total_listens(...)` raises
    # UnboundLocalError, because the name is local for the whole function body.
    total_listens = get_total_listens(df)

    unique_songs = get_unique_songs(df)

    repeated_songs = get_max_repeated(df)

    exploratoryness = get_exploratoryness(df)

    return {
        "total_listens": total_listens,
        "unique_songs": unique_songs,
        "repeated_songs": repeated_songs,
        "exploratoryness": exploratoryness,
    }

## Song Count Dataframe

In [6]:
def get_song_count(df):
    """
    Counts how often each track appears in a user's listening history.

    Input:
        df; Dataframe, the user dataset
    Output
        grouped df, one row per ``recording-MBID`` with its ``count``.
    """
    grouped_by_track = df.groupBy('recording-MBID')
    return grouped_by_track.count()

In [1]:
def load_user_features (directory):
    """
    Loads in users with feature extraction for EDA.

    Walks every sub-directory of `directory` (entries whose name contains
    ".tar" or ".DS_Store" are printed and skipped), loads each "*.txt.gz"
    user file with `load_file`, computes its stats with `extract_information`
    and stores one row per user, indexed by the file name without ".txt.gz".
    The result is also written to "../../user_features.csv".

    Input
        directory; string, the file path for the directory of users.
    Output
        df; Dataframe, dataframe consisting of each users' features
    """
    suffix = ".txt.gz"
    stat_columns = ["n_listens", "n_unique", "n_same", "exploratoryness"]
    # NOTE(review): "mainsteamness" looks like a typo for "mainstreamness"; it is
    # kept unchanged because it is the column name written to the output CSV.
    all_columns = stat_columns + ["mainsteamness"]

    # Rows are collected in plain lists and turned into a frame once at the
    # end instead of growing a DataFrame inside the loop.
    row_index = []
    row_values = []

    i = 0
    # loop through each external file
    for ex_directory in os.listdir(directory):
        if ".tar" in ex_directory or ".DS_Store" in ex_directory:
            print(ex_directory)
            continue

        i += 1
        ex_path = os.path.join(directory, ex_directory)
        inner_files = os.listdir(ex_path)
        print("Processing file: " + ex_directory + "    file number: " + str(i))
        print("Internal Files: ", len(inner_files))

        # Loop through each internal file
        for file in inner_files:
            if not file.endswith(suffix):
                continue
            # Slice the suffix off: str.strip(".txt.gz") removes any of those
            # *characters* from both ends, not the suffix itself.
            uuid = file[:-len(suffix)]
            # Files loaded using spark to reduce time cost.
            spark_file = load_file(os.path.join(ex_path, file))
            if spark_file is None:
                continue
            # Extracts desired information about each user.
            entry = extract_information(spark_file)
            # Map the stats onto the frame's columns explicitly; assigning the
            # dict directly aligns by key, and only "exploratoryness" matches
            # a column name, leaving the other three columns empty.
            row_index.append(uuid)
            row_values.append([
                entry["total_listens"],
                entry["unique_songs"],
                entry["repeated_songs"],
                entry["exploratoryness"],
            ])

    df = pd.DataFrame(row_values, index=row_index, columns=stat_columns)
    # Keep the (never populated) fifth column so the CSV layout is unchanged.
    df = df.reindex(columns=all_columns)

    # Saves the dataframe as user_features
    df.to_csv("../../user_features.csv")
    return df

In [None]:
def get_value_count (users, file_number):
    """
    Loads files, while calculating the number of times a track is listened to by a user.
    This is the desired file structure for the recommendation system

    Input
        users; pandas object of whitelisted users. Membership is tested with
            `uuid in users.index`, so it must be indexed by uuid.
        file_number; int, the file buffer refering to the next file to load in.
    Output
        value_count; Dataframe with the columns "recording-MBID", "artist-MBID",
            "count" and "uuid", the grouped data structure used for the RS model.
            Empty when no whitelisted user file was found.
        bool; whether the final file has been downloaded
        file_number; int, the current number refering to the current file downloaded
    """

    directory = '../dirty_data/MLHD-SS-00'
    suffix = ".txt.gz"
    result_columns = ["recording-MBID", "artist-MBID", "count", "uuid"]

    def _combine(frames):
        # pd.concat raises ValueError on an empty list, which happens when no
        # whitelisted file is found or when the previous batch ended exactly on
        # the last directory.
        if frames:
            return pd.concat(frames)
        return pd.DataFrame(columns=result_columns)

    files = []

    # os.listdir gives no ordering guarantee; sorting keeps `file_number`
    # pointing at the same entry across the repeated calls of a batched load.
    entries = sorted(os.listdir(directory))

    # Loop through each file directory based on the file buffer (filenumber)
    for ex_directory in entries[file_number:]:
        file_number += 1
        if ".tar" in ex_directory or ".DS_Store" in ex_directory:
            print(ex_directory)
        else:
            ex_path = os.path.join(directory, ex_directory)
            for file in os.listdir(ex_path):
                if not file.endswith(suffix):
                    continue
                # Slice the suffix off: str.strip(".txt.gz") removes a set of
                # characters from both ends, not the suffix.
                uuid = file[:-len(suffix)]
                if uuid not in users.index:
                    continue
                # Load data using spark.
                spark_file = load_file(os.path.join(ex_path, file))
                if spark_file is None:
                    continue
                # Group by the recording-MBID to calculate number of times a track is listened to,
                df = spark_file.groupBy('recording-MBID', 'artist-MBID').count().toPandas()
                df = df[df['recording-MBID'] != ""]
                # Remove null values.
                df = df.dropna()
                # Assign the uuid to the users listening history
                df['uuid'] = uuid

                files.append(df)
        # Record in batches of 300 to prevent overloading memory
        if len(files) > 300:
            # if it is not the final file then repeat with new file number
            return (_combine(files), False, file_number)

    # If it is the final file then the loading has finished
    return (_combine(files), True, file_number)

In [None]:
def add_track_names(data):
    """
    Attaches track names from the recording mbdump to the dataset.

    The pandas frame is written to a temporary CSV and read back with spark so
    that the join with the recording dump happens in spark. The join is a left
    join on ``recording-MBID`` == ``recordingMBID``; the left-hand
    ``recording-MBID`` column is dropped from the result.

    Input
        data; Dataframe, the dataset implemented in the simulation
    Output
        filtered_data; Dataframe, the dataset with track names added.
    """
    temp_csv = '../../temp/data.csv'
    unused_track_cols = ['_c0','_c3','_c4','_c5','_c6','_c7','_c8']

    # The recording dump is read with spark because of its size.
    tracks = (
        spark.read.csv("../../dirty_data/mbdump/mbdump/recording", sep = '\t')
        .withColumnRenamed("_c1", "recordingMBID")
        .withColumnRenamed("_c2", "trackName")
        .drop(*unused_track_cols)
    )

    # Round-trip through a CSV; "_c0" is the unnamed index column pandas writes.
    data.to_csv(temp_csv)
    data_spark = spark.read.option("header", "true").csv(temp_csv).drop("_c0")

    same_recording = data_spark['recording-MBID'] == tracks["recordingMBID"]
    joined = data_spark.join(tracks, same_recording, 'left').drop('recording-MBID')

    return joined.toPandas()

In [None]:
def convert_to_int_ID(df, user_buffer, track_buffer):
    """
    Converts the dataframe's string based IDs for users and recordings to integers.

    The columns 'recording-MBID' and 'uuid' are renamed to 'recordingMBID' and
    'userMBID'. Each distinct value, in order of first appearance, is numbered
    from 0 and offset by the corresponding buffer; the results are stored in
    the new columns 'trackID' and 'userID'. The lookup is built from this
    batch only, so a value that appears in two batches gets two different
    integers.

    Input
        df; Dataframe, the dataset implemented in the simulation
        user_buffer; int, buffer added since data is loaded in batches
        track_buffer; int, buffer added since data is loaded in batches
    Output
        df; Dataframe, the updated dataframe
        user_buffer; int, the updated buffer (first unused user ID)
        track_buffer; int, the updated buffer (first unused track ID)

    """

    df = df.rename(columns = {'recording-MBID':'recordingMBID', 'uuid':'userMBID'})

    unique_users = df['userMBID'].unique()
    unique_tracks = df['recordingMBID'].unique()

    # Lookups from the string ID to its position among the unique values.
    user_lookup = pd.Series(range(len(unique_users)), index=unique_users)
    track_lookup = pd.Series(range(len(unique_tracks)), index=unique_tracks)

    # Map the values to dataset and shift them by the buffers.
    df['userID'] = df['userMBID'].map(user_lookup) + user_buffer
    df['trackID'] = df['recordingMBID'].map(track_lookup) + track_buffer

    # Advance the buffers by the number of IDs handed out. Using max(ID) here
    # would make the next batch start at this batch's last ID (a collision)
    # and would fail on an empty batch.
    user_buffer = user_buffer + len(unique_users)
    track_buffer = track_buffer + len(unique_tracks)
    return (df, user_buffer, track_buffer)

In [None]:
def strip_data(users):
    """
    Keeps only the users whose features fall inside the ranges chosen during EDA.

    Each filter is inclusive at both ends and they are applied one after
    another: 'mainstreamness_track' in [0, 0.28], 'exploratoryness_track' in
    [0.96, 1] and 'age' in [18, 35].

    Input
        users; Dataframe of users before pruning
    Output
        users; Dataframe of users after preprocessing
    """
    accepted_ranges = (
        ('mainstreamness_track', 0, 0.28),
        ('exploratoryness_track', 0.96, 1),
        ('age', 18, 35),
    )
    for column, lower, upper in accepted_ranges:
        users = users[users[column].between(lower, upper)]

    return users

In [None]:
def whitelists_users():
    """
    Builds the list of whitelisted users from the MLHD demographic and
    behavioural feature files.

    The two tab-separated files are reduced to the columns of interest,
    merged on 'uuid', pruned with `strip_data`, and the remaining 'uuid'
    column is pickled to '../cleaned_data/users.pkl'.

    Returns:
        whitelisted_users; Pandas series, the uuid of every whitelisted user.
    """
    demographic_cols = ['uuid','age', 'country', 'playcount']
    behaviour_cols = ['uuid','exploratoryness_artist', 'exploratoryness_track', 'mainstreamness_artist','mainstreamness_track']

    # Read both user tables and keep only the needed columns.
    demographics = pd.read_csv('../dirty_data/MLHD_demographics.csv', sep = '\t')[demographic_cols]
    behaviour = pd.read_csv('../dirty_data/MLHD_behavioural_features.csv', sep = '\t')[behaviour_cols]

    # Inner merge on the shared user id.
    merged_users = pd.merge(demographics, behaviour, on = "uuid")

    # Prune, then persist only the uuid column.
    whitelisted_users = strip_data(merged_users)['uuid']
    whitelisted_users.to_pickle('../cleaned_data/users.pkl')
    return whitelisted_users

In [None]:
def load_by_country(country):
    """
    Loads data for every whitelisted user based on their country in batches.

    The whitelist is read from '../cleaned_data/users.pkl' (rebuilt with
    `whitelists_users` when the file is missing) and the artists from
    '../cleaned_data/artists.csv' (rebuilt with `load_artists` when missing).
    Batches from `get_value_count` are enriched with track names, integer IDs
    and artist names, concatenated, and rows containing the string 'None' or
    missing values are removed.

    NOTE(review): this function filters on users['country'] and
    `get_value_count` tests `uuid in users.index`, so the whitelist must be a
    dataframe with a 'country' column indexed by uuid, whereas
    `whitelists_users` returns only the 'uuid' column - confirm the expected
    format of users.pkl.

    Input:
        country; string, the users country abbreviation.
    Output;
        data; dataframe, the resulting dataframe of user histories.

    """
    # Only a missing file triggers the rebuild; other errors are not swallowed.
    try:
        users = pd.read_pickle('../cleaned_data/users.pkl')
    except FileNotFoundError:
        print("users.pkl file not found loading from mbdump")
        # The function defined in this notebook is `whitelists_users`.
        users = whitelists_users()

    try:
        artists = pd.read_csv('../cleaned_data/artists.csv', index_col = [0])
    except FileNotFoundError:
        print("artists.csv not found, loading artists")
        # NOTE(review): `load_artists` is not defined in the visible part of
        # the notebook - confirm it exists before relying on this fallback.
        artists = load_artists()

    # Filters users by their country
    country_users = users[users['country'] == country.upper()]

    file_number = 0
    finished = False

    batches = []
    user_buffer = 0
    track_buffer = 0
    while not finished:

        # Gets the number of times each user has listened each track.
        # `get_value_count` takes exactly (users, file_number).
        test_data, finished, file_number = get_value_count(country_users, file_number)

        # Adds the tracks names to the dataset
        data = add_track_names(test_data)

        # Adds the integer based ID for users and tracks.
        data, user_buffer, track_buffer = convert_to_int_ID(data, user_buffer, track_buffer)

        # Makes sure the count data is an integer
        data['count'] = data['count'].astype('int64')

        # Merges the data with artist id and name
        data = pd.merge(data, artists[['artist-MBID','artistName']], on="artist-MBID", how="left")

        batches.append(data)
        print("batch: ", len(batches))

    data = pd.concat(batches)
    # Removes any nans.
    data = data.mask(data.eq('None')).dropna()
    return data