In [None]:
# Download Audioscrobbler dataset
! wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

In [None]:
# Unzip Audioscrobbler dataset
! tar xzvf profiledata_06-May-2005.tar.gz

In [None]:
# Rename Audioscrobbler data folder
! mv profiledata_06-May-2005 data/audioscrobbler

In [None]:
# Delete downloaded tarball
! rm profiledata_06-May-2005.tar.gz

In [None]:
import pyspark
from pyspark.sql.types import *
import pyspark.ml.recommendation
from pyspark.ml.recommendation import *
import pyspark.ml.evaluation
from pyspark.ml.evaluation import *
import numpy as np

In [None]:
# If you started your notebook server with jupyspark.
sc, spark

In [None]:
# If you don't have the tree command, you can install it with:
#   Mac: brew install tree
#   Ubuntu: sudo apt-get install tree!tree data

In [None]:
# Load mapping of users to artists
# Space delimited fields: user_id, artist_id, play_count
raw_user_artist_data = sc.textFile("data/audioscrobbler/user_artist_data.txt")
raw_user_artist_data.take(5)

In [None]:
def prep_user_artist_data(row):
    """Convert user_artist_data into [(user_id:int, artist_id:int, play_count:int, log_play_count:float)]."""
    try:
        user_id, artist_id, play_count = [int(x) for x in row.split(' ')]
        log_play_count = float(np.log10(play_count))
        return [(user_id, artist_id, play_count, log_play_count)]
    except ValueError:
        return []

In [None]:
# Apply schema to user_artist_data
user_artist_data = raw_user_artist_data.flatMap(prep_user_artist_data)
user_artist_schema = StructType([
        StructField('user_id', IntegerType(), False),
        StructField('artist_id', IntegerType(), False),
        StructField('play_count', IntegerType(), False),
        StructField('log_play_count', FloatType(), False)
    ])
user_artist_df = spark.createDataFrame(user_artist_data, user_artist_schema).persist()
user_artist_df.registerTempTable('play_counts')

In [None]:
user_artist_df.show()

In [None]:
user_artist_df.describe().show()

In [None]:
print("Play Count Stats:")

spark.sql("""
    SELECT
        min(play_count) AS min
    ,   percentile_approx(play_count, 0.25) AS q_25
    ,   percentile_approx(play_count, 0.5) AS median
    ,   percentile_approx(play_count, 0.75) AS q_75
    ,   max(play_count) AS max
    FROM
        play_counts
    """).show()

In [None]:
spark.sql("""
    SELECT 
    1 - (
        SELECT
            Count(*) AS users_over_mean_play_count
        FROM
            play_counts
        WHERE
            play_count > 15
    ) / (
        SELECT
            Count(*) AS total_users
        FROM
            play_counts
    )
    AS mean_percentile
    """).show()

In [None]:
raw_artist_data = sc.textFile("data/audioscrobbler/artist_data.txt", use_unicode=False)
raw_artist_data.take(5)

In [None]:
def prep_artist_data(row):
    """Convert raw_artist_data into [(artist_id:int, artist_name:str)]."""
    try:
        artist_id, artist_name = row.split('\t')
        artist_id = int(artist_id)
        artist_name = artist_name.strip()
        return [(int(artist_id), artist_name.strip())]
    except ValueError:
        return []

In [None]:
artist_data = raw_artist_data.flatMap(prep_artist_data)

In [None]:
artist_data.take(5)

In [None]:
artist_schema = StructType([
    StructField('artist_id', IntegerType(), False),
    StructField('artist_name', StringType(), False)
    ])
artist_df = spark.createDataFrame(artist_data, artist_schema).persist()
artist_df.registerTempTable('artists')

In [None]:
artist_df.show()

In [None]:
raw_artist_alias_data = sc.textFile("data/audioscrobbler/artist_alias.txt", use_unicode=False)
raw_artist_alias_data.take(5)

In [None]:
def prep_artist_alias(row):
    """Convert a row from raw_artist_alias to [(alias_id:int, artist_id:int)]"""
    try:
        alias_id, artist_id = row.split('\t')
        return [(int(alias_id), int(artist_id))]
    except:
        return []

In [None]:
artist_alias_data = raw_artist_alias_data.flatMap(prep_artist_alias)

In [None]:
artist_alias_schema = StructType([
    StructField('alias_id', IntegerType(), False),
    StructField('artist_id', IntegerType(), False)
    ])

In [None]:
artist_alias_df = spark.createDataFrame(artist_alias_data, artist_alias_schema).persist()
artist_alias_df.registerTempTable('artist_aliases')

In [None]:
artist_alias_df.show()

In [None]:
# Find artist aliases
user_artist_corrections = spark.sql("""
    SELECT
        play_counts.user_id
    ,   play_counts.artist_id
    ,   artist_aliases.artist_id AS corrected_artist_id
    ,   play_counts.play_count
    ,   play_counts.log_play_count
    FROM
        play_counts
    INNER JOIN
        artist_aliases
    ON
        play_counts.artist_id = artist_aliases.alias_id
""")
user_artist_corrections.show(), user_artist_corrections.count()

In [None]:
# What if it's a left join instead of an inner join?
user_artist_corrections = spark.sql("""
    SELECT
        play_counts.user_id
    ,   play_counts.artist_id
    ,   artist_aliases.artist_id AS corrected_artist_id
    ,   play_counts.play_count
    FROM
        play_counts
    LEFT JOIN
        artist_aliases
    ON
        play_counts.artist_id = artist_aliases.alias_id
""")
user_artist_corrections.show(), user_artist_corrections.count()

In [None]:
# Cleanup artist aliases
dealiased_play_counts_df = spark.sql("""
    SELECT
        play_counts.user_id
    ,   (CASE
            WHEN artist_aliases.artist_id IS Null THEN play_counts.artist_id
            ELSE artist_aliases.artist_id
        END) as artist_id
    ,   play_counts.play_count
    ,   play_counts.log_play_count
    FROM
        play_counts
    LEFT JOIN
        artist_aliases
    ON
        play_counts.artist_id = artist_aliases.alias_id
""")
dealiased_play_counts_df.registerTempTable("dealiased_play_counts")
dealiased_play_counts_df.show()

In [None]:
spark.sql("""
    SELECT
        user_id
    ,   artist_id
    ,   Count(*) AS cnt
    FROM
        dealiased_play_counts
    GROUP BY
        user_id, artist_id
    ORDER BY
        Count(*) DESC
    """).show()

In [None]:
spark.sql("""
    SELECT
        artist_aliases.alias_id,
        artists.artist_name
    FROM
        artist_aliases
    JOIN
        artists
    ON
        artist_aliases.alias_id = artists.artist_id
    WHERE
        artist_aliases.artist_id = 1018110
    """).take(10)

In [None]:
clean_play_counts_df = spark.sql("""
    SELECT
        user_id
    ,   artist_id
    ,   SUM(play_count) AS play_count
    ,   LOG(10, SUM(play_count)) AS log_play_count
    FROM
        dealiased_play_counts
    GROUP BY
        user_id
    ,   artist_id
""")
clean_play_counts_df.registerTempTable("clean_play_counts")
clean_play_counts_df.show()

In [None]:
# Train/Test Split
train, test = clean_play_counts_df.randomSplit([0.8, 0.2])

In [None]:
als = ALS(rank=10, maxIter=5, seed=0, regParam=0.1, implicitPrefs=True, alpha=40,
          userCol="user_id", itemCol="artist_id", ratingCol="log_play_count", nonnegative=True)
model = als.fit(train)

In [None]:
predictions = model.transform(test).persist()

In [None]:
predictions.registerTempTable("predictions")

In [None]:
predictions.show()

In [None]:
spark.sql("SELECT * FROM predictions WHERE NOT ISNAN(prediction) ORDER BY prediction DESC").show()

In [None]:
spark.sql("SELECT * FROM predictions WHERE NOT ISNAN(prediction) ORDER BY prediction").show()