# Demo 4: FP-Growth and DataStax Analytics
------
<img src="images/pixarMovies.jpg" width="500" height="500">


#### Dataset: https://grouplens.org/datasets/movielens/

## What are we trying to learn from this dataset? 

# QUESTION: Can FP-Growth be used to find which movies to recommend to our users?


In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
import pandas
import cassandra
import pyspark
import re
import os
import random
import matplotlib.pyplot as plt
from random import randint, randrange
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.sql.functions import collect_set
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%store -r astraUsername astraPassword astraSecureConnect astraKeyspace

In [3]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', None)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

#### Helper function to have nicer formatting of Spark DataFrames

## Creating Tables and Loading Tables

<img src="images/dselogo.png" width="400" height="200">

In [4]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cloud_config = {
    'secure_connect_bundle': '/tmp/'+astraSecureConnect
}
auth_provider = PlainTextAuthProvider(username=astraUsername, password=astraPassword)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

### Set keyspace 

In [5]:
session.set_keyspace(astraKeyspace)

### Create a table called `movies`. Its PRIMARY KEY is the unique `movieid`.

In [6]:
query = "CREATE TABLE IF NOT EXISTS movies \
                                   (movieid int, title text, genres text, \
                                   PRIMARY KEY (movieid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x7ff97b5fd2b0>

### Create a table called `movieratings`. Its PRIMARY KEY is a composite key (userid, movieid).

In [7]:
query = "CREATE TABLE IF NOT EXISTS movieratings \
                                   (userid int, movieid int, rating float, timestamp text, \
                                   PRIMARY KEY (userid, movieid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x7ff96d5ddbe0>
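
A composite key means a row is identified by the pair (userid, movieid), so one user can have at most one stored rating per movie. The sketch below is only a plain-Python model of that idea, assuming the usual Cassandra behaviour that an INSERT with an existing primary key overwrites the earlier row. The `ratings` dict and `insert_rating` helper are hypothetical names and are not part of the driver.

```python
# Toy model: a dict keyed by the full primary key (userid, movieid).
ratings = {}

def insert_rating(userid, movieid, rating):
    """Mimic an upsert: writing the same primary key replaces the old row."""
    ratings[(userid, movieid)] = rating

insert_rating(689, 1, 3.0)
insert_rating(689, 2, 2.5)
insert_rating(689, 1, 4.0)  # same (userid, movieid), so 3.0 is replaced

# All rows that share the first key component (userid) can be read together.
rows_for_user = {m: r for (u, m), r in ratings.items() if u == 689}
print(len(ratings))    # 2
print(rows_for_user)   # {1: 4.0, 2: 2.5}
```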

## Movies

* **Movieid**
* **Title**
* **Genres**

## Movie Ratings Table
### What do these 4 columns represent?

* **UserId**
* **MovieId**
* **Rating**
* **Timestamp**

### Load the 2 Movie Datasets -- Movies and Movie Ratings
<img src="images/bttf3.jpg" width="500" height="300">

In [11]:
#download file to local (working on better way)
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.get_bucket('andygoade-dev')

#download file for ratings
blob = storage.Blob('notebooks/jupyter/data/ratings.csv', bucket)
blob.download_to_filename('/tmp/ratings.csv')

#download file for movies
blob = storage.Blob('notebooks/jupyter/data/movies.csv', bucket)
blob.download_to_filename('/tmp/movies.csv')

### Load Movie datasets from CSV files (ratings.csv, movies.csv)
* No cleanup was required! How nice :)

#### Insert all the Movie Data into the table `movies` and `movieratings`

In [9]:
fileName = '/tmp/ratings.csv'

# The INSERT statement is the same for every row, so build it once
query = "INSERT INTO movieratings (userid, movieid, rating, timestamp)"
query = query + " VALUES (%s, %s, %s, %s)"

with open(fileName, 'r') as input_file:
    for line in input_file:
        # Plain split: row[3] keeps the trailing newline of each line
        row = line.split(',')
        session.execute(query, (int(row[0]), int(row[1]), float(row[2]), row[3]))

In [12]:
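fileName = '/tmp/movies.csv'

# The INSERT statement is the same for every row, so build it once
query = "INSERT INTO movies (movieid, title, genres)"
query = query + " VALUES (%s, %s, %s)"

with open(fileName, 'r') as input_file:
    for line in input_file:
        # Plain split: a quoted title containing a comma is cut at that comma
        row = line.split(',')
        session.execute(query, (int(row[0]), row[1], row[2]))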
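
Some titles in this kind of file are quoted because they contain a comma, and a plain `split(',')` cuts them in two. As a hedged alternative, the sketch below uses Python's standard `csv` module on two illustrative rows in the (movieid, title, genres) layout. The rows are made up for the example and are not read from the real file.

```python
import csv
import io

# Illustrative rows only; the second title is quoted and contains a comma.
sample = io.StringIO(
    '1,Toy Story (1995),Adventure|Animation\n'
    '11,"American President, The (1995)",Comedy|Drama|Romance\n'
)

# Naive approach: split every line on commas.
naive = [line.split(',') for line in sample.getvalue().splitlines()]

# csv.reader understands the quoting and keeps the title in one field.
parsed = list(csv.reader(sample))

print(naive[1][1])    # "American President
print(parsed[1][1])   # American President, The (1995)
```

With `csv.reader`, each row could be passed to the same INSERT statement unchanged, and the last field no longer carries a trailing newline.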

## Machine Learning with Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

#### Create a Spark session that is connected to Cassandra. From there, load the `movieratings` table into a Spark DataFrame and count its rows.

In [13]:
spark = SparkSession \
    .builder \
    .appName('demo4') \
    .master("local") \
    .config( \
        "spark.cassandra.connection.config.cloud.path", \
        "file:/tmp/"+astraSecureConnect) \
    .config("spark.cassandra.auth.username", astraUsername) \
    .config("spark.cassandra.auth.password", astraPassword) \
    .getOrCreate()

movieDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="movieratings", keyspace=astraKeyspace).load()

print ("Table Row Count: ")
print (movieDF.count())

Table Row Count: 
100000


In [14]:
showDF(movieDF)

Unnamed: 0,userid,movieid,rating,timestamp
0,689,1,3.0,3/8/06 11:01\n
1,689,2,2.5,3/8/06 11:09\n
2,689,6,4.5,3/8/06 11:05\n
3,689,10,3.5,3/8/06 11:03\n
4,689,16,4.0,3/8/06 10:53\n


## This dataset is not yet in the format we need. FP-Growth expects a transaction format: one row per user with the list of movies that user has reviewed.

#### Remove the timestamp column since we will not be using that

In [15]:
newMovieDF = movieDF.drop('timestamp')
showDF(newMovieDF)

Unnamed: 0,userid,movieid,rating
0,689,1,3.0
1,689,2,2.5
2,689,6,4.5
3,689,10,3.5
4,689,16,4.0


#### Before we collect the set of movies for each user, let's keep only the movies they rated above 3

In [16]:
newestMovies = newMovieDF.filter("rating > 3")
showDF(newestMovies)

Unnamed: 0,userid,movieid,rating
0,689,6,4.5
1,689,10,3.5
2,689,16,4.0
3,689,21,4.0
4,689,25,3.5


#### Group by user id and collect the set of all the movies each user rated above 3.

In [17]:
group_user = newestMovies.groupBy('userid').agg(collect_set('movieid').alias('moviesRated'))
group_user.show()


+------+--------------------+
|userid|         moviesRated|
+------+--------------------+
|   463|[161, 509, 590, 2...|
|   148|[356, 4995, 539, ...|
|   496|[356, 1953, 1395,...|
|   471|[1961, 356, 4008,...|
|   243|[592, 356, 153, 1...|
|   392|[3254, 596, 4995,...|
|   540|[356, 70286, 5874...|
|   623|[356, 165, 593, 5...|
|    31|[54997, 45517, 70...|
|   516|[356, 785, 1345, ...|
|    85|[945, 916, 1293, ...|
|   137|[356, 3173, 1222,...|
|   251|[466, 356, 110, 3...|
|   580|[4306, 4973, 1223...|
|   451|[6440, 1222, 714,...|
|   458|[299, 2396, 3798,...|
|    65|[356, 40870, 7609...|
|    53|[1953, 1649, 1172...|
|   588|[4995, 4343, 3450...|
|   255|[2, 2006, 380, 20...|
+------+--------------------+
only showing top 20 rows
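
The filter and group-by steps above can be pictured in plain Python. This is only a sketch of the idea, not how Spark executes it. The first five tuples come from the sample output shown earlier; the rows for user 700 are hypothetical, and `to_transactions` is an illustrative helper name.

```python
from collections import defaultdict

# (userid, movieid, rating) rows; user 700 is made up for the example.
rows = [
    (689, 1, 3.0), (689, 2, 2.5), (689, 6, 4.5), (689, 10, 3.5), (689, 16, 4.0),
    (700, 6, 5.0), (700, 16, 3.0),
]

def to_transactions(rows, min_rating=3.0):
    """Keep ratings strictly above min_rating, then collect movie ids per user."""
    baskets = defaultdict(set)
    for userid, movieid, rating in rows:
        if rating > min_rating:          # same condition as filter("rating > 3")
            baskets[userid].add(movieid)  # a set drops duplicates, like collect_set
    return dict(baskets)

transactions = to_transactions(rows)
print(transactions[689] == {6, 10, 16})  # True
print(transactions[700] == {6})          # True
```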



#### For FP-Growth the list needs to be in the column named by `itemsCol` (here `items`)

In [18]:
df = group_user.withColumnRenamed("moviesRated", "items")
showDF(df)

Unnamed: 0,userid,items
0,463,"[161, 509, 590, 277, 105, 410, 207, 25, 381, 5..."
1,148,"[356, 4995, 539, 916, 2340, 4285, 1680, 4062, ..."
2,496,"[356, 1953, 1395, 1476, 1222, 3421, 2028, 1266..."
3,471,"[1961, 356, 4008, 1272, 8982, 3450, 2078, 2028..."
4,243,"[592, 356, 153, 150, 165, 288, 318, 253, 296, ..."


In [19]:
df.select('userid').distinct().count()

702
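
The next cell uses `minSupport=0.1`, which means an itemset is kept only if it appears in at least 10% of these 702 transactions. The sketch below shows that threshold and a brute-force count of frequent itemsets on toy data. It is not the FP-Growth algorithm itself (which avoids enumerating candidates by building a prefix tree); rounding the threshold up with `ceil` is an assumption about the implementation, and all names here are illustrative.

```python
import math
from collections import Counter
from itertools import combinations

def min_count(min_support, n_transactions):
    """Smallest number of transactions an itemset must appear in (assumed ceil)."""
    return math.ceil(min_support * n_transactions)

def frequent_itemsets(transactions, min_support, max_size=2):
    """Brute force: count every itemset up to max_size, keep the frequent ones."""
    threshold = min_count(min_support, len(transactions))
    counts = Counter()
    for basket in transactions:
        for size in range(1, max_size + 1):
            for itemset in combinations(sorted(basket), size):
                counts[itemset] += 1
    return {s: c for s, c in counts.items() if c >= threshold}

print(min_count(0.1, 702))  # 71

# Toy transactions (hypothetical users), using movie ids as items.
toy = [{296, 593, 608}, {296, 593}, {296, 593, 318}, {318, 593}, {608}, {296}]
result = frequent_itemsets(toy, min_support=0.5)
print(result)  # {(296,): 4, (593,): 4, (296, 593): 3}
```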

### FP-Growth for Recommendations
#### Use Apache Spark MLlib with FPGrowth to find recommendations
#### https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html
#### https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth

In [20]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.1, minConfidence=0.2)
model = fpGrowth.fit(df)
recommendDF=model.transform(df)
recommendDF.show()

+------+--------------------+--------------------+
|userid|               items|          prediction|
+------+--------------------+--------------------+
|   463|[161, 509, 590, 2...|[50, 356, 2858, 2...|
|   148|[356, 4995, 539, ...|[4993, 2959, 296,...|
|   496|[356, 1953, 1395,...|[318, 593, 296, 2...|
|   471|[1961, 356, 4008,...|[296, 589, 2858, ...|
|   243|[592, 356, 153, 1...|[593, 260, 527, 2...|
|   392|[3254, 596, 4995,...|[356, 527, 260, 5...|
|   540|[356, 70286, 5874...|[593, 318, 589, 4...|
|   623|[356, 165, 593, 5...|[2858, 47, 260, 5...|
|    31|[54997, 45517, 70...|[356, 527, 296, 5...|
|   516|[356, 785, 1345, ...|[318, 593, 2858, ...|
|    85|[945, 916, 1293, ...|[2959, 296, 2571,...|
|   137|[356, 3173, 1222,...|[50, 47, 5952, 49...|
|   251|[466, 356, 110, 3...|[318, 593, 457, 2...|
|   580|[4306, 4973, 1223...|[318, 356, 593, 5...|
|   451|[6440, 1222, 714,...|[1198, 260, 1196,...|
|   458|[299, 2396, 3798,...|[50, 296, 858, 60...|
|    65|[356, 40870, 7609...|[1
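
The `prediction` column can be understood as follows: for each user, every association rule whose antecedent is fully contained in the user's `items` contributes its consequent, unless the user already has that movie. The plain-Python sketch below illustrates this under that reading of `transform`; `predict` and the `rules` list are illustrative names, and the rule values are examples only.

```python
def predict(items, rules):
    """Collect consequents of all rules whose antecedent is a subset of items."""
    items = set(items)
    prediction = []
    for antecedent, consequent in rules:
        if set(antecedent) <= items:  # rule applies to this user
            for movie in consequent:
                # skip movies the user already has, and avoid duplicates
                if movie not in items and movie not in prediction:
                    prediction.append(movie)
    return prediction

# (antecedent, consequent) pairs used as examples
rules = [([608, 593], [296]), ([608, 593], [318]), ([541], [1198])]

print(predict([608, 593, 318], rules))  # [296]
print(predict([541, 608], rules))       # [1198]
```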

#### If you have watched these movies (`antecedent`) then you will like this movie (`consequent`)

In [21]:
# Display frequent itemsets.
#model.freqItemsets.show()

# Display generated association rules.
dfAssociation = model.associationRules

dfAssociation.show()

+------------+----------+------------------+------------------+
|  antecedent|consequent|        confidence|              lift|
+------------+----------+------------------+------------------+
|  [608, 593]|     [296]|0.8301886792452831| 2.111566858080394|
|  [608, 593]|     [318]|0.7075471698113207| 1.793133982698726|
|[7153, 2959]|    [4993]| 0.935064935064935| 4.405473720909962|
|       [541]|    [1198]|0.5658914728682171|2.2069767441860466|
|       [541]|     [260]|0.6124031007751938|  2.01834261382247|
|       [541]|     [296]|0.6744186046511628|1.7153690596562183|
|       [541]|     [593]|0.5581395348837209|1.6124031007751938|
|       [541]|    [1196]|0.5968992248062015| 2.289744567289363|
|       [541]|    [2571]|0.6589147286821705|2.1922186707814393|
|  [527, 593]|     [356]|0.6428571428571429|1.6714285714285715|
|  [527, 593]|     [296]|0.7053571428571429|1.7940605590062113|
|  [527, 593]|     [318]|0.7142857142857143|1.8102114492006187|
| [2028, 296]|    [2858]| 0.782608695652
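
The `confidence` and `lift` columns can be reproduced by hand on a small example. The sketch below assumes the standard definitions: confidence is support(antecedent and consequent together) divided by support(antecedent), and lift is confidence divided by support(consequent). The transactions are toy data, not the real ratings.

```python
def support(itemset, transactions):
    """Fraction of transactions that contain every item in itemset."""
    itemset = set(itemset)
    return sum(itemset <= t for t in transactions) / len(transactions)

def confidence(antecedent, consequent, transactions):
    both = set(antecedent) | set(consequent)
    return support(both, transactions) / support(antecedent, transactions)

def lift(antecedent, consequent, transactions):
    return confidence(antecedent, consequent, transactions) / support(consequent, transactions)

# Eight toy transactions (hypothetical users)
toy = [
    {608, 593, 296}, {608, 593, 296}, {608, 593, 296}, {608, 593},
    {296}, {318}, {318, 608}, {593},
]

print(confidence([608, 593], [296], toy))  # 0.75
print(lift([608, 593], [296], toy))        # 1.5
```

A lift above 1 means the consequent shows up more often among users who match the antecedent than among users overall. If lift is computed this way, the first rule in the table implies that movie 296 has a support of roughly 0.83 / 2.11 ≈ 0.39.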

In [22]:
print("If you like these movies: ")
print(list(dfAssociation.select('antecedent').first()))
print("Then you will like this movie:")
print(list(dfAssociation.select('consequent').first()))

movieYoulike = list(dfAssociation.select('antecedent').first())
movieToRecommend=list(dfAssociation.select('consequent').first())

If you like these movies: 
[[608, 593]]
Then you will like this movie:
[[296]]


#### Query database to get movie titles

In [23]:
query = "select title from movies WHERE movieid="
query = query + str(movieYoulike[0][0])

rows = session.execute(query)
print(rows)

for user_row in rows:
    print (user_row.title)

query = "select title from movies WHERE movieid="
query = query + str(movieYoulike[0][1])

rows = session.execute(query)
print(rows)

for user_row in rows:
    print (user_row.title)

<cassandra.cluster.ResultSet object at 0x7ff96c3d2d30>
Fargo (1996)
<cassandra.cluster.ResultSet object at 0x7ff96c471668>
"Silence of the Lambs


#### Then you will like this movie ... 

In [24]:
query = "select title from movies WHERE movieid="
query = query + str(movieToRecommend[0][0])

rows = session.execute(query)
print(rows)

for user_row in rows:
    print (user_row.title)

<cassandra.cluster.ResultSet object at 0x7ff96d5e3160>
Pulp Fiction (1994)
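
The three lookups above repeat the same query with a different id. A possible refactoring is sketched below: one helper that takes any object with an `execute(query, params)` method and uses a `%s` placeholder instead of string concatenation, the same placeholder style used for the INSERT statements earlier. `titles_for` and `FakeSession` are hypothetical names; the fake session only stands in for the real Cassandra session so the example runs on its own.

```python
from collections import namedtuple

def titles_for(session, movie_ids):
    """Look up the title of each movie id with a parameterized query."""
    query = "SELECT title FROM movies WHERE movieid = %s"
    titles = []
    for movie_id in movie_ids:
        for row in session.execute(query, (movie_id,)):
            titles.append(row.title)
    return titles

# Stand-in for the real session, holding two titles seen in the outputs above.
Row = namedtuple("Row", ["title"])

class FakeSession:
    _movies = {608: "Fargo (1996)", 296: "Pulp Fiction (1994)"}

    def execute(self, query, params):
        movie_id = params[0]
        title = self._movies.get(movie_id)
        return [Row(title)] if title is not None else []

print(titles_for(FakeSession(), [608, 296]))
# ['Fargo (1996)', 'Pulp Fiction (1994)']
```

With the real session, the calls would look like `titles_for(session, movieYoulike[0])` and `titles_for(session, movieToRecommend[0])`.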


In [25]:
session.execute("""drop table movies""")
session.execute("""drop table movieratings""")

<cassandra.cluster.ResultSet at 0x7ff96c3c1cc0>

In [26]:
spark.stop()