# Demo 4: FP-Growth and DataStax Analytics
------
<img src="images/pixarMovies.jpg" width="500" height="500">


#### Dataset: https://grouplens.org/datasets/movielens/

## What are we trying to learn from this dataset? 

# QUESTION: Can Fp-Growth be used to find which movies to recommend to our users?


In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

In [39]:
import pandas
import cassandra
import pyspark
import re
import os
import random
from random import randint, randrange
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Helper function to have nicer formatting of Spark DataFrames

In [3]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

# DataStax Enterprise Analytics
<img src="images/dselogo.png" width="400" height="200">

## Creating Tables and Loading Tables

### Connect to DSE Analytics Cluster

In [4]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.01'])
session = cluster.connect()

### Create Demo Keyspace 

In [5]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS accelerate 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x110e88390>

### Set keyspace 

In [6]:
session.set_keyspace('accelerate')

### Create table called `movies`. Our PRIMARY will be a unique key (movieid) 

In [101]:
query = "CREATE TABLE IF NOT EXISTS movies \
                                   (movieid int, title text, genres text, \
                                   PRIMARY KEY (movieid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x11448cc88>

### Create table called `movieRatings`. Our PRIMARY key will be a compositite key (userid, movieid).

In [102]:
query = "CREATE TABLE IF NOT EXISTS movieratings \
                                   (userid int, movieid int, rating float, timestamp text, \
                                   PRIMARY KEY (userid, movieid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x114474080>

## Movies

* **Movieid**
* **Title**
* **Genres**

## Movie Ratings Table
### What do these of these 4 columns represent:

* **UserId**
* **MovieId**
* **Rating**
* **Timestamp**

### Load 2 Movie Dataset -- Movies and Movie Ratings Table
<img src="images/bttf3.jpg" width="500" height="300">

### Load Movie datasets from CSV file (rating_movies.csv, movies.csv)
* No clean up was requried! How nice :)

#### Insert all the Movie Data into the DSE table `movies` and `movieratings`

In [103]:
fileName = 'data/ratings.csv'
input_file = open(fileName, 'r')

for line in input_file:
    row = line.split(',')
    
    query = "INSERT INTO movieratings (userid, movieid, rating, timestamp)"
    query = query + " VALUES (%s, %s, %s, %s)"
    session.execute(query, (int(row[0]), int(row[1]), float(row[2]), row[3]))

In [104]:
fileName = 'data/movies.csv'
input_file = open(fileName, 'r')

for line in input_file:
    row = line.split(',')
        
    query = "INSERT INTO movies (movieid, title, genres)"
    query = query + " VALUES (%s, %s, %s)"
    session.execute(query, (int(row[0]), row[1], row[2]))

## Machine Learning with DSE Analytics and Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

#### Create a spark session that is connected to DSE. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [105]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()


movieDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="movieratings", keyspace="accelerate").load()

print ("Table Row Count: ")
print (movieDF.count())

Table Row Count: 
100000


In [106]:
showDF(movieDF)

Unnamed: 0,userid,movieid,rating,timestamp
0,381,10,4.0,6/19/96 9:44\n
1,381,50,5.0,6/19/96 9:47\n
2,381,52,4.0,6/19/96 11:50\n
3,381,110,4.0,6/19/96 9:45\n
4,381,111,4.0,6/19/96 10:03\n


In [107]:
newMovieDF = movieDF.drop('timestamp')
showDF(newMovieDF)

Unnamed: 0,userid,movieid,rating
0,381,10,4.0
1,381,50,5.0
2,381,52,4.0
3,381,110,4.0
4,381,111,4.0


In [108]:
newestMovies = newMovieDF.filter("rating > 3")
showDF(newestMovies)

Unnamed: 0,userid,movieid,rating
0,381,10,4.0
1,381,50,5.0
2,381,52,4.0
3,381,110,4.0
4,381,111,4.0


In [110]:
from pyspark.sql import Row
from pyspark.sql.functions import collect_set
group_user = newestMovies.groupBy('userid').agg(collect_set('movieid').alias('moviesRated'))
group_user.show()


+------+--------------------+
|userid|         moviesRated|
+------+--------------------+
|   496|[356, 1953, 1395,...|
|   463|[161, 509, 590, 2...|
|   148|[356, 4995, 539, ...|
|   471|[508, 356, 4008, ...|
|   540|[356, 70286, 5874...|
|   623|[356, 165, 593, 5...|
|   243|[592, 356, 153, 1...|
|   392|[3254, 596, 4995,...|
|    31|[54997, 45517, 70...|
|   516|[356, 785, 1345, ...|
|   451|[6440, 1222, 714,...|
|   137|[356, 3173, 1222,...|
|   251|[466, 356, 110, 3...|
|    85|[945, 916, 1293, ...|
|   580|[4306, 4973, 1223...|
|    65|[356, 40870, 7609...|
|   458|[299, 2396, 3798,...|
|   481|[356, 17, 32, 293...|
|    53|[1953, 1649, 1172...|
|   255|[2, 2006, 380, 20...|
+------+--------------------+
only showing top 20 rows



In [111]:
df = group_user.withColumnRenamed("moviesRated", "items")
showDF(df)

Unnamed: 0,userid,items
0,463,"[161, 509, 590, 277, 105, 410, 207, 25, 381, 5..."
1,496,"[356, 1953, 1395, 1476, 1222, 3421, 2028, 1266..."
2,148,"[356, 4995, 539, 916, 2340, 4285, 1680, 4062, ..."
3,471,"[508, 356, 4008, 1272, 8982, 3450, 2078, 2028,..."
4,623,"[356, 165, 593, 590, 318, 292, 50, 296, 380, 3..."


In [115]:
df.select('userid').distinct().count()

702

In [113]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.2)
model = fpGrowth.fit(df)
recommendDF=model.transform(df)
recommendDF.show()

+------+--------------------+--------------------+
|userid|               items|          prediction|
+------+--------------------+--------------------+
|   148|[356, 4995, 539, ...|[4993, 2959, 296,...|
|   471|[508, 356, 4008, ...|[296, 589, 2858, ...|
|   496|[356, 1953, 1395,...|[318, 593, 296, 2...|
|   463|[161, 509, 590, 2...|[50, 356, 2858, 2...|
|   243|[592, 356, 153, 1...|[593, 260, 527, 2...|
|   392|[3254, 596, 4995,...|[356, 527, 260, 5...|
|   540|[356, 70286, 5874...|[593, 318, 589, 4...|
|   623|[356, 165, 593, 5...|[2858, 47, 260, 5...|
|    31|[54997, 45517, 70...|[356, 527, 296, 5...|
|   516|[356, 785, 1345, ...|[318, 593, 2858, ...|
|    85|[945, 916, 1293, ...|[2959, 296, 2571,...|
|   580|[4306, 4973, 1223...|[318, 356, 593, 5...|
|   451|[6440, 1222, 714,...|[1198, 260, 1196,...|
|   137|[356, 3173, 1222,...|[50, 47, 5952, 49...|
|   251|[466, 356, 110, 3...|[318, 593, 457, 2...|
|    65|[356, 40870, 7609...|[1198, 260, 296, ...|
|   458|[299, 2396, 3798,...|[5

In [127]:
amanda = recommendDF.select('items').first()

print(list(amanda))

[[356, 1953, 1395, 1476, 1222, 3421, 2028, 1266, 2355, 3040, 1291, 21, 1233, 2788, 471, 2301, 908, 1641, 1387, 1256, 858, 2455, 2470, 1077, 3255, 2291, 440, 34, 2699, 2416, 1858, 1198, 2795, 457, 2918, 2012, 3957, 3703, 1242, 1221, 590, 2208, 1171, 1883, 2108, 1288, 2000, 3039, 70, 2023, 1973, 1923, 2779, 551, 2352, 480, 2302, 1307, 799, 2396, 3362, 1969, 2804, 39, 1259, 955, 2065, 368, 849, 2671, 587, 1197, 3506, 1278, 2571, 1220, 589, 235, 3552, 1214, 1020, 910, 2109, 352, 2915, 3671, 2278, 3927, 2657, 3513, 5060, 1845, 3363, 3740, 3088, 1970, 2174, 2064, 3836, 367, 1079, 317, 1304, 1196, 2539, 3070, 588, 3253, 2716, 1, 1200, 1215, 2791, 1136, 2081, 3424, 1544, 1036, 1269, 1240, 1234, 2454, 951, 2150, 1663, 3210, 3814, 1078, 3052, 1405, 2321, 1080, 1580, 3504, 1276, 514, 260, 3608, 1380, 1270, 2867]]


In [132]:
# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
dfAssociation = model.associationRules


+--------------------+----+
|               items|freq|
+--------------------+----+
|               [318]| 277|
|               [296]| 276|
|          [296, 318]| 162|
|               [356]| 270|
|          [356, 296]| 138|
|     [356, 296, 318]|  93|
|          [356, 318]| 152|
|               [593]| 243|
|          [593, 356]| 141|
|     [593, 356, 296]| 100|
|[593, 356, 296, 318]|  73|
|     [593, 356, 318]|  97|
|          [593, 296]| 170|
|     [593, 296, 318]| 119|
|          [593, 318]| 157|
|               [260]| 213|
|          [260, 356]|  88|
|          [260, 296]| 103|
|          [260, 593]|  84|
|          [260, 318]|  95|
+--------------------+----+
only showing top 20 rows



In [137]:
amanda = dfAssociation.select('antecedent').first()

amanda1 = dfAssociation.select('consequent').first()

print(list(amanda))
print(list(amanda1))

movieToRecommend=list(amanda1)
print(movieToRecommend)

[[608, 593]]
[[296]]
[[296]]


In [144]:
query = "select title from movies WHERE movieid="
query = query + str(movieToRecommend[0][0])
session.execute(query)

<cassandra.cluster.ResultSet at 0x11418c828>

In [100]:
session.execute("""drop table movies""")
session.execute("""drop table movieratings""")

<cassandra.cluster.ResultSet at 0x11448c278>