In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext(appName="Music_Recomendation_Training")

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("CorrelationExample") \
    .getOrCreate()

In [4]:
datasetPath = "./lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv"
df1 = spark.read.csv(datasetPath, sep=r'\t', header=False).select('_c0','_c2')

In [5]:
import pyspark.sql.functions as F
finalDf = df1.groupBy('_c0').agg(F.collect_set('_c2').alias('items')).withColumnRenamed("_c0", "user")
print((finalDf.count(), len(finalDf.columns)))

(359349, 2)


In [6]:
finalDf.show(5)

+--------------------+--------------------+
|                user|               items|
+--------------------+--------------------+
|0018726a8c3b371dc...|[tribalistas, urs...|
|0023836770b560316...|[swod, the beatle...|
|00759937bbc2c3fea...|[serge gainsbourg...|
|008cc3f3075e913d4...|[preston school o...|
|0093a82573bc14f2e...|[wir sind helden,...|
+--------------------+--------------------+
only showing top 5 rows



In [7]:
# Finding parameters for FPGrowth
auxDf = finalDf \
    .select(F.explode("items").alias("items_exploded")) \
    .groupBy("items_exploded") \
    .count() \
    .orderBy(F.desc("count"))
auxDf.show()
userCount = finalDf.count()

+--------------------+-----+
|      items_exploded|count|
+--------------------+-----+
|           radiohead|77348|
|         the beatles|76339|
|            coldplay|66738|
|red hot chili pep...|48989|
|                muse|47015|
|           metallica|45301|
|          pink floyd|44506|
|         the killers|41280|
|         linkin park|39833|
|             nirvana|39534|
|    system of a down|37324|
|               queen|34215|
|                  u2|33247|
|           daft punk|33040|
|            the cure|32673|
|        led zeppelin|32341|
|             placebo|32111|
|        depeche mode|31966|
|         david bowie|31907|
|           bob dylan|31840|
+--------------------+-----+
only showing top 20 rows



In [8]:
quant = auxDf.approxQuantile('count', (0.99, 0.95, 0.85), 0.05)
for value in quant:
    print(value/userCount)

0.215244789883929
0.215244789883929
5.565620051815923e-05


In [10]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.005, minConfidence=0.2)
model = fpGrowth.fit(finalDf)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(finalDf).show()

+--------------------+-----+
|               items| freq|
+--------------------+-----+
|          [ladytron]|10784|
|     [ladytron, air]| 2671|
|[ladytron, the kn...| 2863|
|[ladytron, depech...| 3033|
|[ladytron, röyksopp]| 2295|
|[ladytron, coldplay]| 1871|
|[ladytron, the cure]| 2287|
|[ladytron, goldfr...| 2396|
|    [ladytron, muse]| 1932|
|[ladytron, daft p...| 2213|
|[ladytron, portis...| 2308|
| [ladytron, placebo]| 2096|
|[ladytron, nine i...| 1999|
|[ladytron, the be...| 1952|
|[ladytron, massiv...| 2122|
|   [ladytron, björk]| 2290|
|[ladytron, radioh...| 3695|
|      [gwen stefani]| 6591|
|[gwen stefani, av...| 1818|
|[gwen stefani, ma...| 2699|
+--------------------+-----+
only showing top 20 rows

+--------------------+--------------------+-------------------+------------------+
|          antecedent|          consequent|         confidence|              lift|
+--------------------+--------------------+-------------------+------------------+
|[moby, massive at...|       

In [12]:
predictionDf = model.transform(finalDf)

In [15]:
model.save("mymodel")