# Download data and inspect data

In [1]:
!mkdir data
!wget "http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz" -P data

--2018-05-09 15:47:31--  http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
Resolving www.iro.umontreal.ca (www.iro.umontreal.ca)... 132.204.26.36
Connecting to www.iro.umontreal.ca (www.iro.umontreal.ca)|132.204.26.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 135880312 (130M) [application/x-gzip]
Saving to: ‘data/profiledata_06-May-2005.tar.gz’


2018-05-09 15:47:33 (89.6 MB/s) - ‘data/profiledata_06-May-2005.tar.gz’ saved [135880312/135880312]



In [2]:
!tree data -hs

data
└── [130M]  profiledata_06-May-2005.tar.gz

0 directories, 1 file


In [4]:
!tar -xf "data/profiledata_06-May-2005.tar.gz" -C data
!rm "data/profiledata_06-May-2005.tar.gz"
!tree data -hs

data
└── [4.0K]  profiledata_06-May-2005
    ├── [2.8M]  artist_alias.txt
    ├── [ 53M]  artist_data.txt
    ├── [1.2K]  README.txt
    └── [407M]  user_artist_data.txt

1 directory, 4 files


In [5]:
!cat data/profiledata_06-May-2005/README.txt

Music Listening Dataset
Audioscrobbler.com
6 May 2005
--------------------------------

This data set contains profiles for around 150,000 real people
The dataset lists the artists each person listens to, and a counter
indicating how many times each user played each artist

The dataset is continually growing; at the time of writing (6 May 2005) 
Audioscrobbler is receiving around 2 million song submissions per day

We may produce additional/extended data dumps if anyone is interested 
in experimenting with the data. 

Please let us know if you do anything useful with this data, we're always
up for new ways to visualize it or analyse/cluster it etc :)


License
-------

This data is made available under the following Creative Commons license:
http://creativecommons.org/licenses/by-nc-sa/1.0/


Files
-----

user_artist_data.txt
    3 columns: userid artistid playcount

artist_data.txt
    2 columns: artistid artist_name

artist_alias.txt
    2 columns:

In [6]:
!echo "check user_artist_data.txt"
!head -n 3 data/profiledata_06-May-2005/user_artist_data.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/user_artist_data.txt

check user_artist_data.txt
1000002 1 55
1000002 1000006 33
1000002 1000007 8

line count:
24296858 data/profiledata_06-May-2005/user_artist_data.txt


In [7]:
!echo "check artist_data.txt"
!head -n 3 data/profiledata_06-May-2005/artist_data.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/artist_data.txt

check artist_data.txt
1134999	06Crazy Life
6821360	Pang Nakarin
10113088	Terfel, Bartoli- Mozart: Don

line count:
1848579 data/profiledata_06-May-2005/artist_data.txt


In [8]:
!echo "check artist_alias.txt"
!head -n 3 data/profiledata_06-May-2005/artist_alias.txt
!echo ""
!echo "line count:"
!wc -l data/profiledata_06-May-2005/artist_alias.txt

check artist_alias.txt
1092764	1000311
1095122	1000557
6708070	1007267

line count:
193027 data/profiledata_06-May-2005/artist_alias.txt


# Build Spark Data Frames

In [9]:
# make sure pyspark tells workers to use python3 not 2 if both are installed
import os
os.environ['PYSPARK_PYTHON'] = '/home/augustinus/miniconda3/bin/python'

In [10]:
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession \
    .builder \
    .appName("Recommender") \
    .config('spark.executor.memory','8G') \
    .config('spark.driver.memory','16G')\
    .config('spark.driver.maxResultSize','16G')\
    .getOrCreate()

Create user_artist DataFrame

In [12]:
df_user_artist = spark.read.csv("data/profiledata_06-May-2005/user_artist_data.txt",
                                sep=' ',
                                schema='userid INT, artistid INT, playcount INT')

In [13]:
df_user_artist.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- artistid: integer (nullable = true)
 |-- playcount: integer (nullable = true)



In [14]:
df_user_artist.show(3)

+-------+--------+---------+
| userid|artistid|playcount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
+-------+--------+---------+
only showing top 3 rows



Create artist DataFrame

In [15]:
df_artist = spark.read.csv("data/profiledata_06-May-2005/artist_data.txt",
                            sep='\t',
                            schema='artistid INT, artist_name STRING')
df_artist.printSchema()
df_artist.show(3)

root
 |-- artistid: integer (nullable = true)
 |-- artist_name: string (nullable = true)

+--------+--------------------+
|artistid|         artist_name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
+--------+--------------------+
only showing top 3 rows



Create artist_alias DataFrame

In [16]:
df_artist_alias = spark.read.csv("data/profiledata_06-May-2005/artist_alias.txt",
                                sep='\t',
                                schema='badid INT, goodid INT')
df_artist_alias.printSchema()
df_artist_alias.show(3)

root
 |-- badid: integer (nullable = true)
 |-- goodid: integer (nullable = true)

+-------+-------+
|  badid| goodid|
+-------+-------+
|1092764|1000311|
|1095122|1000557|
|6708070|1007267|
+-------+-------+
only showing top 3 rows



In [17]:
# Register SQL Tables
df_user_artist.createOrReplaceTempView('user_artist')
df_artist.createOrReplaceTempView('artist')
df_artist_alias.createOrReplaceTempView('artist_alias')
spark.sql("show tables").show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|        |      artist|       true|
|        |artist_alias|       true|
|        | user_artist|       true|
+--------+------------+-----------+



# Preprocessing

The aliases data set should be applied to convert all artist IDs (badid) to a canonical ID (goodid), if a different canonical ID exists. 

In [18]:
print('user_artist')
spark.sql(
    """
    describe user_artist
    """).show()

print('artist')
spark.sql(
    """
    describe artist
    """).show()

print('artist_alias')
spark.sql(
    """
    describe artist_alias
    """).show()

user_artist
+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|   userid|      int|   null|
| artistid|      int|   null|
|playcount|      int|   null|
+---------+---------+-------+

artist
+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|   artistid|      int|   null|
|artist_name|   string|   null|
+-----------+---------+-------+

artist_alias
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|   badid|      int|   null|
|  goodid|      int|   null|
+--------+---------+-------+



In [19]:
print('Checking artist')
spark.sql(
    """
    select count(*) as total_ids, 
    count(distinct artistid) as n_unique_ids, 
    count(distinct artistid)/count(*) as `ratio of unique`
    from artist 
    """).show()

Checking artist
+---------+------------+----------------+
|total_ids|n_unique_ids| ratio of unique|
+---------+------------+----------------+
|  1848671|     1848281|0.99978903763839|
+---------+------------+----------------+



In [20]:
print('Checking user_artist')
spark.sql(
    """
    select count(distinct artistid) as n_unique_artistids
    from user_artist 
    """).show()

Checking user_artist
+------------------+
|n_unique_artistids|
+------------------+
|           1631028|
+------------------+



Mapping badid to goodid for user_artist and artist

In [21]:
alias = df_artist_alias.dropna().toPandas()
bad_2_good_mapper = dict(alias[['badid','goodid']].values)
# if x is in bad_2_good_mapper: get goodid, otherwise keep x unchanged 
bad2good = lambda x : bad_2_good_mapper.get(x,x)

Apply bad2good (.map) to the artistid column in df_artist, df_user_artist

In [22]:
df_artist.artistid = df_artist.select('artistid').rdd.map(bad2good).toDF().artistid
df_user_artist = df_user_artist.dropna()
df_user_artist.artistid = df_user_artist.select('artistid').rdd.map(bad2good).toDF().artistid
df_artist.createOrReplaceTempView('artist')
df_user_artist.createOrReplaceTempView('user_artist')

In [23]:
print('Checking artist')
spark.sql(
    """
    select count(*) as total_ids, 
    count(distinct artistid) as n_unique_ids, 
    count(distinct artistid)/count(*) as `ratio of unique`
    from artist 
    """).show()

print('Checking user_artist')
spark.sql(
    """
    select count(distinct artistid) as n_artistids
    from user_artist 
    """).show()

Checking artist
+---------+------------+----------------+
|total_ids|n_unique_ids| ratio of unique|
+---------+------------+----------------+
|  1848671|     1848281|0.99978903763839|
+---------+------------+----------------+

Checking user_artist
+-----------+
|n_artistids|
+-----------+
|    1631028|
+-----------+



In [24]:
data_train = spark.sql(
    """
    select userid, artistid, sum(playcount) as playcount
    from user_artist
    group by userid,artistid
    """)

# Build ALS Recommender

In [25]:
from pyspark.ml.recommendation import ALS

In [27]:
data_train.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- artistid: integer (nullable = true)
 |-- playcount: long (nullable = true)



In [28]:
als = ALS(rank=50,regParam=1,alpha=40,
          userCol='userid',itemCol='artistid',ratingCol='playcount',
          numUserBlocks=4,numItemBlocks=4,implicitPrefs=True,seed=0)

als_recommender = als.fit(data_train)
print('Model fitted')
als_recommender.save('model.bin')

Model fitted


# Recommend top 10 Artists for user 2093760

In [29]:
user_subsets = \
    spark.sql("""
        select distinct userid
        from user_artist
        where userid in (2093760)
        """)
user_subsets.show()

+-------+
| userid|
+-------+
|2093760|
+-------+



In [30]:
recommendations = als_recommender.recommendForUserSubset(user_subsets, 10)
recommendations.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- artistid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [47]:
recommendations.createOrReplaceTempView('recommendations')

top10 = spark.sql(
    """
    select userid, explode(recommendations.artistid) as artistid
    from recommendations 
    where userid = 2093760
    """)
top10.createOrReplaceTempView('top10')

top10_recommended_artist_names = \
spark.sql(
    """
    select userid, artist_name
    from top10 join artist
    on top10.artistid = artist.artistid
    """)

In [48]:
df_top10_artists_for_2093760 = top10_recommended_artist_names.toPandas()
df_top10_artists_for_2093760

Unnamed: 0,userid,artist_name
0,2093760,Green Day
1,2093760,Outkast
2,2093760,Eminem
3,2093760,50 Cent
4,2093760,Snoop Dogg
5,2093760,Black Eyed Peas
6,2093760,Jay-Z
7,2093760,2Pac
8,2093760,Kanye West
9,2093760,[unknown]


# Deploy on AWS

In [None]:
spark-submit wordcount.py | tee output.txt
aws s3 cp output.txt s3://my_bucket/my_folder/