In [1]:
%load_ext autoreload
%env SPARK_HOME=/usr/hdp/current/spark2-client

import findspark
findspark.init()

print('findspark initialized ...')

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, column, max, min

spark = SparkSession.builder.appName('mlonspark')\
    .config('spark.executor.instances', '6')\
    .config('spark.jars', '/opt/dev/target/ml-on-spark-1.0.jar')\
    .getOrCreate()

print('pyspark ready ...')

env: SPARK_HOME=/usr/hdp/current/spark2-client
findspark initialized ...
pyspark ready ...


In [3]:
rawData = spark.read.format('text').load("/data/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv")
rawData.show(10, False)
rawData.printSchema()
rawData.count()

+---------------------------------------------------------------------------------------------------------+
|value                                                                                                    |
+---------------------------------------------------------------------------------------------------------+
|00000c289a1829a808ac09c00daf10bc3c4e223b	3bd73256-3905-4f3a-97e2-8b341527f805	betty blowtorch	2137       |
|00000c289a1829a808ac09c00daf10bc3c4e223b	f2fb0ff0-5679-42ec-a55c-15109ce6e320	die Ärzte	1099             |
|00000c289a1829a808ac09c00daf10bc3c4e223b	b3ae82c2-e60b-4551-a76d-6620f1b456aa	melissa etheridge	897      |
|00000c289a1829a808ac09c00daf10bc3c4e223b	3d6bbeb7-f90e-4d10-b440-e153c0d10b53	elvenking	717              |
|00000c289a1829a808ac09c00daf10bc3c4e223b	bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8	juliette & the licks	706   |
|00000c289a1829a808ac09c00daf10bc3c4e223b	8bfac288-ccc5-448d-9573-c33ea2aa5c30	red hot chili peppers	691  |
|00000c289a1829a808ac09c00da

17559530

In [4]:
def parseLine(line) :
    splits = line.value.split("\t")
    if (len(splits) == 4):
        return [splits]
    else:
        return []    
    

parsedData = rawData.rdd.flatMap(parseLine).toDF()
parsedData.show(10, False)
parsedData.count()

+----------------------------------------+------------------------------------+-----------------------+----+
|_1                                      |_2                                  |_3                     |_4  |
+----------------------------------------+------------------------------------+-----------------------+----+
|00000c289a1829a808ac09c00daf10bc3c4e223b|3bd73256-3905-4f3a-97e2-8b341527f805|betty blowtorch        |2137|
|00000c289a1829a808ac09c00daf10bc3c4e223b|f2fb0ff0-5679-42ec-a55c-15109ce6e320|die Ärzte              |1099|
|00000c289a1829a808ac09c00daf10bc3c4e223b|b3ae82c2-e60b-4551-a76d-6620f1b456aa|melissa etheridge      |897 |
|00000c289a1829a808ac09c00daf10bc3c4e223b|3d6bbeb7-f90e-4d10-b440-e153c0d10b53|elvenking              |717 |
|00000c289a1829a808ac09c00daf10bc3c4e223b|bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8|juliette & the licks   |706 |
|00000c289a1829a808ac09c00daf10bc3c4e223b|8bfac288-ccc5-448d-9573-c33ea2aa5c30|red hot chili peppers  |691 |
|00000c289a1829a808

17559530

In [5]:
parsedData = parsedData\
    .withColumnRenamed("_1", "userHash")\
    .withColumnRenamed("_2", "artistMBID")\
    .withColumnRenamed("_3", "artistName")\
    .withColumnRenamed("_4", "listenCount")

parsedData.show(10, False)

+----------------------------------------+------------------------------------+-----------------------+-----------+
|userHash                                |artistMBID                          |artistName             |listenCount|
+----------------------------------------+------------------------------------+-----------------------+-----------+
|00000c289a1829a808ac09c00daf10bc3c4e223b|3bd73256-3905-4f3a-97e2-8b341527f805|betty blowtorch        |2137       |
|00000c289a1829a808ac09c00daf10bc3c4e223b|f2fb0ff0-5679-42ec-a55c-15109ce6e320|die Ärzte              |1099       |
|00000c289a1829a808ac09c00daf10bc3c4e223b|b3ae82c2-e60b-4551-a76d-6620f1b456aa|melissa etheridge      |897        |
|00000c289a1829a808ac09c00daf10bc3c4e223b|3d6bbeb7-f90e-4d10-b440-e153c0d10b53|elvenking              |717        |
|00000c289a1829a808ac09c00daf10bc3c4e223b|bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8|juliette & the licks   |706        |
|00000c289a1829a808ac09c00daf10bc3c4e223b|8bfac288-ccc5-448d-9573-c33ea2

In [6]:
users = parsedData\
    .select(parsedData['userHash'])\
    .dropDuplicates(['userHash'])
users = users.rdd.zipWithIndex().map(lambda row: [row[0][0], row[1]]).toDF()
users = users.select(\
    users["_2"].alias('userId').cast("integer"),\
    users["_1"].alias('userHash')\
)
users.write.mode("overwrite").save("/data/lastfm-dataset-360K/users.parquet")

In [7]:
users.printSchema()
users.show(10, False)

root
 |-- userId: integer (nullable = true)
 |-- userHash: string (nullable = true)

+------+----------------------------------------+
|userId|userHash                                |
+------+----------------------------------------+
|0     |5410cfa2de1d31506084a6de0ad7d64276ba7f57|
|1     |542b73ac4cd44c43bf185d7d0e6bd65c979a7c1b|
|2     |542bb9a086129a85ad564c76c60b618a8b34f6de|
|3     |5441dfe441148310b14c461a4a333800a7fd304f|
|4     |5452326206fdc325339060394dc4dd750bd8867b|
|5     |545fb35615ce95c7927cae6a531c7288b5a7bba2|
|6     |5461801f3d8a34498ce874992f1aa956c44a4eca|
|7     |5479ce56f4d4def1417d758ad70aa6b4d64da4e5|
|8     |548fadb5d4018702c48e0ac846e53860702c6e5d|
|9     |54a5addc793666cd62e6bf32ca09542f02b2d35c|
+------+----------------------------------------+
only showing top 10 rows



In [8]:
persistedUsers = spark.read.load("/data/lastfm-dataset-360K/users.parquet")
persistedUsers.rdd.getNumPartitions()
persistedUsers.count()

359349

In [9]:
artists = parsedData\
    .select(parsedData['artistName'])\
    .dropDuplicates(['artistName'])
artists = artists.rdd.zipWithIndex().map(lambda row: [row[0][0], row[1]]).toDF()
artists = artists.select(\
    artists["_2"].alias('artistId').cast("integer"),\
    artists["_1"].alias('artistName')\
)
artists.write.mode("overwrite").save("/data/lastfm-dataset-360K/artists.parquet")

In [10]:
artists.printSchema()
artists.show(10, False)
artists.count()

root
 |-- artistId: integer (nullable = true)
 |-- artistName: string (nullable = true)

+--------+------------------+
|artistId|artistName        |
+--------+------------------+
|0       |lagwagon          |
|1       |vengaboys         |
|2       |soulstice         |
|3       |the mountain goats|
|4       |godsmack          |
|5       |clan greco        |
|6       |embrace           |
|7       |kate bush         |
|8       |bun b             |
|9       |vonda shepard     |
+--------+------------------+
only showing top 10 rows



292589

In [11]:
cooMatrixData = parsedData\
    .join(users, parsedData["userHash"]==users["userHash"], 'inner')\
    .join(artists, parsedData["artistName"]==artists["artistName"], 'inner')\
    .select(users["userId"], artists["artistId"], parsedData["listenCount"].cast("float"))

cooMatrixData.printSchema()
cooMatrixData.show(10, False)

root
 |-- userId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- listenCount: float (nullable = true)

+------+--------+-----------+
|userId|artistId|listenCount|
+------+--------+-----------+
|234812|310     |33.0       |
|4878  |1139    |115.0      |
|168416|421     |63.0       |
|201840|1261    |1.0        |
|297568|1008    |33.0       |
|137989|828     |123.0      |
|39940 |903     |257.0      |
|92140 |352     |9.0        |
|81117 |352     |30.0       |
|292123|352     |1.0        |
+------+--------+-----------+
only showing top 10 rows



In [2]:
cooMatrixData.write.mode("overwrite").save("/data/lastfm-dataset-360K/coo-data.parquet")

NameError: name 'cooMatrixData' is not defined

In [3]:
cooMatrixData = spark.read.load("/data/lastfm-dataset-360K/coo-data.parquet")
print(cooMatrixData.rdd.getNumPartitions())

7


In [4]:
%autoreload
from mlonspark.scaler import Scaler

scaleByUser = Scaler().setGroupCol("userId").setInputCol("listenCount").setOutputCol("scaled-by-user")
cooMatrixData = scaleByUser.fit(cooMatrixData).transform(cooMatrixData)

scaleByArtist = Scaler().setGroupCol("artistId").setInputCol("listenCount").setOutputCol("scaled-by-artist")
cooMatrixData = scaleByArtist.fit(cooMatrixData).transform(cooMatrixData)

cooMatrixData.printSchema()
cooMatrixData.show(10, False)

cooMatrixData.write.mode("overwrite").save("/data/lastfm-dataset-360K/coo-data-scaled.parquet")

root
 |-- userId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- listenCount: float (nullable = true)
 |-- scaled-by-user: float (nullable = true)
 |-- scaled-by-artist: float (nullable = true)

+------+--------+-----------+--------------+----------------+
|userId|artistId|listenCount|scaled-by-user|scaled-by-artist|
+------+--------+-----------+--------------+----------------+
|201881|148     |48.0       |2.3076925     |1.4402878       |
|84477 |148     |238.0      |1.0977128     |3.9007194       |
|58282 |148     |231.0      |1.2643003     |3.810072        |
|32705 |148     |77.0       |2.7674417     |1.8158274       |
|65052 |148     |35.0       |1.156282      |1.2719424       |
|30154 |148     |283.0      |2.5765662     |4.4834533       |
|116331|148     |54.0       |1.7285714     |1.5179856       |
|216540|148     |210.0      |1.3335881     |3.5381296       |
|73886 |148     |16.0       |1.1168832     |1.0258993       |
|14553 |148     |14.0       |1.05357

In [5]:
cooMatrixData.agg(min("scaled-by-artist"), max("scaled-by-artist")).show()

+---------------------+---------------------+
|min(scaled-by-artist)|max(scaled-by-artist)|
+---------------------+---------------------+
|                  1.0|                 10.0|
+---------------------+---------------------+



In [6]:
train, test = cooMatrixData.randomSplit([0.7, 0.3])

train.write.mode("overwrite").save("/data/lastfm-dataset-360K/coo-data-train.parquet")
test.write.mode("overwrite").save("/data/lastfm-dataset-360K/coo-data-test.parquet")

In [7]:
train = spark.read.load("/data/lastfm-dataset-360K/coo-data-train.parquet")
print(train.rdd.getNumPartitions())

8


In [8]:
%autoreload
from mlonspark.alternating_least_square import AlternatingLeastSquare

als = AlternatingLeastSquare()\
    .setUserCol("userId")\
    .setItemCol("artistId")\
    .setRatingCol("scaled-by-user")\
    .setNumUserBlocks(7)\
    .setNumItemBlocks(7)\
    .setMaxIter(10)

In [9]:
model = als.fit(train)
print(model)

AlternatingLeastSquare_440e8ecefd884cbfb764


In [10]:
model.write().overwrite().save('/data/lastfm-dataset-360K/model.alsmodel')

In [11]:
predictions = model.transform(test)
predictions.printSchema()
predictions.show(10)

root
 |-- userId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- listenCount: float (nullable = true)
 |-- scaled-by-user: float (nullable = true)
 |-- scaled-by-artist: float (nullable = true)
 |-- prediction: float (nullable = false)

+------+--------+-----------+--------------+----------------+------------+
|userId|artistId|listenCount|scaled-by-user|scaled-by-artist|  prediction|
+------+--------+-----------+--------------+----------------+------------+
| 38304|     148|       23.0|     1.3435115|       1.1165468|5.9921807E-4|
| 54065|     148|      229.0|     3.4793928|       3.7841725|5.4218137E-4|
|144880|     148|      145.0|     1.1409639|        2.696403| 5.722666E-4|
|153214|     148|      166.0|      1.438134|       2.9683454|2.3577473E-4|
| 32705|     148|       77.0|     2.7674417|       1.8158274|5.9686566E-4|
|263510|     148|       59.0|     1.1044855|       1.5827338|2.8515604E-4|
|177946|     148|       25.0|     1.2352941|        1.142446|6.

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import isnan

evaluator = RegressionEvaluator()\
    .setMetricName("rmse")\
    .setLabelCol("scaled-by-user")\
    .setPredictionCol("prediction")

predictionsFiltered = predictions.where(~isnan(col("prediction")))                                
rmse = evaluator.evaluate(predictionsFiltered)

print("RMSE = %f" % rmse)

RMSE = 3.030132


In [13]:
#rmse

In [14]:
#histogram = cooMatrixData.select(cooMatrixData["listenCount"]).rdd.map(lambda x : x[0]).histogram(100)

In [15]:
%autoreload
from mlonspark.alternating_least_square import AlternatingLeastSquare

als = AlternatingLeastSquare()\
    .setUserCol("userId")\
    .setItemCol("artistId")\
    .setRatingCol("scaled-by-artist")\
    .setNumUserBlocks(7)\
    .setNumItemBlocks(7)\
    .setMaxIter(10)

model = als.fit(train)
print(model)

predictions = model.transform(test)
predictions.printSchema()
predictions.show(10)

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import isnan

evaluator = RegressionEvaluator()\
    .setMetricName("rmse")\
    .setLabelCol("scaled-by-artist")\
    .setPredictionCol("prediction")

predictionsFiltered = predictions.where(~isnan(col("prediction")))                                
rmse = evaluator.evaluate(predictionsFiltered)

print("RMSE = %f" % rmse)

AlternatingLeastSquare_40558fe4bf922627dc04
root
 |-- userId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- listenCount: float (nullable = true)
 |-- scaled-by-user: float (nullable = true)
 |-- scaled-by-artist: float (nullable = true)
 |-- prediction: float (nullable = false)

+------+--------+-----------+--------------+----------------+------------+
|userId|artistId|listenCount|scaled-by-user|scaled-by-artist|  prediction|
+------+--------+-----------+--------------+----------------+------------+
|263510|     148|       59.0|     1.1044855|       1.5827338| 6.227538E-4|
| 32705|     148|       77.0|     2.7674417|       1.8158274|8.1333134E-4|
|177946|     148|       25.0|     1.2352941|        1.142446|0.0011173686|
| 54065|     148|      229.0|     3.4793928|       3.7841725| 8.390052E-4|
|144880|     148|      145.0|     1.1409639|        2.696403|0.0010109582|
| 38304|     148|       23.0|     1.3435115|       1.1165468|7.4140425E-4|
|153214|     148|  