## Scarico il dataset

In [None]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

Il dataset è contenuto all'interno di un'archivio zip, per estrarlo (ipotizzando che tu sia su Ubuntu) ci servirà unzip, installiamolo

In [None]:
!unzip ml-latest.zip

In [None]:
!ls ml-latest

I file che ci interessano sono:
* **ratings.csv**: che contiene, per ogni riga, id dell'utente, id del film, valutazione da 1.0 a 5.0 e timestamp.
* **movies.csv**: che contiene nome e genere dei film associati agli id.

## Inizializzo Spark

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("chelibrotivendo'") \
        .config("spark.executor.cores", 4) \
        .config("spark.sql.pivotMaxValues", 20000000)\
        .getOrCreate()

## Importo il dataset in un Dataframe

In [2]:
df = spark.read.csv("ml-latest/ratings2.csv", header=True, inferSchema=True)

## Quante sono le transazioni?

In [3]:
df.count()

1048575

## Quali sono i 10 prodotti più venduti?

In [4]:
dfMovies= df.groupBy("movieId")

In [5]:
dfMovies.count().orderBy("count", ascending=False).show(10)

+-------+-----+
|movieId|count|
+-------+-----+
|    356| 3606|
|    318| 3582|
|    296| 3410|
|    593| 3244|
|   2571| 3117|
|    260| 3039|
|    480| 2791|
|    527| 2662|
|    110| 2583|
|      1| 2573|
+-------+-----+
only showing top 10 rows



## Tengo solo le colonne che mi servono

In [6]:
df=df.select('transactionId','movieId')
df.cache()

DataFrame[transactionId: int, movieId: int]

## Pivot che mi serve per portare la cardinalità su Transaction_Id a 1

In [7]:
import pyspark.sql.functions as F
df_a = df.groupBy('transactionId').agg(F.collect_list('movieId').alias("items"))
df_a.cache()
df_a.show(10)


+-------------+--------------------+
|transactionId|               items|
+-------------+--------------------+
|          148|[1, 3, 5, 7, 17, ...|
|          463|[10, 14, 16, 25, ...|
|          471|[1, 10, 32, 47, 5...|
|          496|[3, 110, 144, 266...|
|          833|[327, 431, 475, 7...|
|         1088|[912, 1947, 1989,...|
|         1238|[153, 480, 1485, ...|
|         1342|    [24, 2723, 2826]|
|         1580|[44, 107, 193, 26...|
|         1591|[2, 5, 10, 150, 1...|
+-------------+--------------------+
only showing top 10 rows



In [8]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.6)
model = fpGrowth.fit(df_a)
model.freqItemsets.show()
model.associationRules.show()
model.transform(df_a).show()

+----------+----+
|     items|freq|
+----------+----+
|     [356]|3606|
|     [318]|3582|
|[318, 356]|2223|
|     [296]|3410|
|[296, 318]|2257|
|[296, 356]|2256|
|     [593]|3244|
|[593, 296]|2288|
|[593, 318]|2109|
|[593, 356]|2197|
|    [2571]|3117|
|     [260]|3039|
|     [480]|2791|
|[480, 356]|2130|
|     [527]|2662|
|     [110]|2583|
|       [1]|2573|
|    [1210]|2479|
|    [1196]|2444|
|     [589]|2404|
+----------+----+
only showing top 20 rows

+----------+----------+------------------+------------------+
|antecedent|consequent|        confidence|              lift|
+----------+----------+------------------+------------------+
|     [296]|     [318]|0.6618768328445748|1.9460878848461924|
|     [296]|     [356]|  0.66158357771261| 1.932279046164506|
|     [296]|     [593]|0.6709677419354839| 2.178369993238137|
|     [593]|     [296]|0.7053020961775586| 2.178369993238137|
|     [593]|     [318]|0.6501233045622689|1.9115294929228965|
|     [593]|     [356]|0.6772503082614056|1.97