<img src="images/datastaxdevs_banner.png" width="600" height="200">

# Algorithm 4: FP-Growth
------
<img src="images/pixarMovies.jpg" width="500" height="500">


#### Dataset: https://grouplens.org/datasets/movielens/

## What are we trying to learn from this dataset? 

### Can FP-Growth be used to determine movie recommendations?


In [None]:
import os
import pandas
from pyspark.sql import SparkSession
#
from operator import itemgetter
#
from pyspark.sql.functions import collect_set
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import array_contains, col
#
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
#
from dotenv import load_dotenv, find_dotenv

from tools import showDF, examineCassandraTable

In [None]:
# read .env file for connection params
dotenv_file = find_dotenv('.env')
load_dotenv(dotenv_file)
astraUsername = os.environ['ASTRA_DB_CLIENT_ID']
astraPassword = os.environ['ASTRA_DB_CLIENT_SECRET']
astraSecureConnect = os.environ['ASTRA_DB_SECURE_BUNDLE_PATH']
astraKeyspace = os.environ['ASTRA_DB_KEYSPACE']

## Inspect input data: Table(s)

In [None]:
cloud_config = {
    'secure_connect_bundle': '/home/jovyan/' + astraSecureConnect
}
auth_provider = PlainTextAuthProvider(username=astraUsername, password=astraPassword)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

### Set keyspace 

In [None]:
session.set_keyspace(astraKeyspace)

### Examine table `movies` (structure and contents)

In [None]:
print(examineCassandraTable(session, astraKeyspace, 'movies'))

### Examine table `movieratings` (structure and contents)

In [None]:
print(examineCassandraTable(session, astraKeyspace, 'movieratings'))

## Column meaning:

#### `movies`

* **MovieId**
* **Title**
* **Genres**

#### `movieratings`

* **UserId**
* **MovieId**
* **Rating**
* **Timestamp**

<img src="images/bttf3.jpg" width="500" height="300">

# Machine Learning with Apache Cassandra & Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

### Create a Spark session connected to the database, then load the `movieratings` table into a Spark DataFrame and count its rows.

In [None]:
spark = SparkSession \
    .builder \
    .appName('demo') \
    .master('local') \
    .config( \
        'spark.cassandra.connection.config.cloud.path', \
        'file:' + '/home/jovyan/' + astraSecureConnect) \
    .config('spark.cassandra.auth.username', astraUsername) \
    .config('spark.cassandra.auth.password', astraPassword) \
    .getOrCreate()

movieDF = spark \
    .read \
    .format('org.apache.spark.sql.cassandra') \
    .options(table='movieratings', keyspace=astraKeyspace) \
    .load()

print('Table Row Count:')
print(movieDF.count())

In [None]:
showDF(movieDF)

## This dataset is not in the format we need.

### Let's make it into _"one row per user with a list of movies the user reviewed"_

#### Remove the timestamp column since we will not be using that

In [None]:
newMovieDF = movieDF.drop('timestamp')
showDF(newMovieDF)

#### Before we collect the set of movies for each user, let's filter out any movies they rated less than or equal to 3

In [None]:
newestMovies = newMovieDF.filter('rating > 3')
showDF(newestMovies)

#### Group by user id and collect the set of movies each user rated above 3.

In [None]:
group_user = newestMovies.groupBy('userid').agg(collect_set('movieid').alias('moviesRated'))
group_user.show()
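
The filter-then-group step can be illustrated without Spark. The sketch below is a plain-Python stand-in, not the notebook's actual pipeline: the `ratings` list and the helper name `liked_movies_per_user` are hypothetical and exist only for illustration.

```python
from collections import defaultdict

# Hypothetical toy ratings as (userid, movieid, rating) tuples
ratings = [
    (1, 10, 5.0),
    (1, 20, 2.0),
    (1, 30, 4.0),
    (2, 10, 4.5),
    (2, 30, 3.0),
    (3, 20, 4.0),
]

def liked_movies_per_user(rows, threshold=3):
    """Group movie ids by user, keeping only ratings strictly above threshold."""
    baskets = defaultdict(set)
    for user_id, movie_id, rating in rows:
        if rating > threshold:  # same condition as the 'rating > 3' filter
            baskets[user_id].add(movie_id)  # a set drops duplicates, like collect_set
    return dict(baskets)

baskets = liked_movies_per_user(ratings)
print(baskets)
```

Each resulting set plays the role of one "basket" of items, which is the input shape FP-Growth expects.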

#### Let's rename the column with movies to `items`

In [None]:
df = group_user.withColumnRenamed('moviesRated', 'items')
showDF(df)

In [None]:
df.select('userid').distinct().count()

### FP-Growth for Recommendations
#### Use Apache Spark MLlib with FPGrowth to find recommendations
#### https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html
#### https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth

In [None]:
fpGrowth = FPGrowth(itemsCol='items', minSupport=0.1, minConfidence=0.2)
model = fpGrowth.fit(df)
recommendDF=model.transform(df)
recommendDF.show()

In [None]:
# Display frequent itemsets.
model.freqItemsets.show()
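
To make `minSupport` concrete, here is a minimal sketch of what "frequent" means. It enumerates candidate itemsets by brute force rather than building an FP-tree, so it is not the FP-Growth algorithm itself; it only computes the same support values on a tiny, hypothetical set of baskets. The names `support` and `frequent_itemsets` are illustrative.

```python
from itertools import combinations

# Hypothetical baskets: each set holds the movie ids one user liked
transactions = [
    {1, 2, 3},
    {1, 2},
    {1, 3},
    {2, 3},
    {1, 2, 3, 4},
]

def support(itemset, transactions):
    """Fraction of transactions that contain every item of the itemset."""
    itemset = set(itemset)
    hits = sum(1 for t in transactions if itemset <= t)
    return hits / len(transactions)

def frequent_itemsets(transactions, min_support, max_size=2):
    """Brute-force search: keep itemsets whose support reaches min_support."""
    items = sorted(set().union(*transactions))
    result = {}
    for size in range(1, max_size + 1):
        for candidate in combinations(items, size):
            s = support(candidate, transactions)
            if s >= min_support:
                result[candidate] = s
    return result

freq = frequent_itemsets(transactions, min_support=0.5)
for itemset, s in freq.items():
    print(itemset, round(s, 2))
```

Movie `4` appears in only one of five baskets (support 0.2), so it is dropped at a threshold of 0.5. FP-Growth reaches the same kind of result without enumerating every candidate.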

In [None]:
# Display generated association rules.
dfAssociation = model.associationRules
dfAssociation.show()

If you have watched (and liked) the `antecedent` movies, we recommend the corresponding `consequent` movie.
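
The confidence of a rule is the share of baskets containing the antecedent that also contain the consequent; `minConfidence` is the cut-off applied to that value. A minimal plain-Python sketch on hypothetical baskets (the helper names `count_containing` and `confidence` are illustrative, not part of Spark):

```python
# Hypothetical baskets: each set holds the movie ids one user liked
transactions = [
    {1, 2, 3},
    {1, 2},
    {1, 3},
    {2, 3},
    {1, 2, 3, 4},
]

def count_containing(itemset, transactions):
    """Number of transactions that contain every item of the itemset."""
    return sum(1 for t in transactions if itemset <= t)

def confidence(antecedent, consequent, transactions):
    """confidence(A -> C) = count(A and C together) / count(A)."""
    antecedent_count = count_containing(antecedent, transactions)
    if antecedent_count == 0:
        return 0.0  # the rule never applies, so report zero confidence
    both_count = count_containing(antecedent | consequent, transactions)
    return both_count / antecedent_count

rule_confidence = confidence({1, 2}, {3}, transactions)
print(round(rule_confidence, 3))
```

Here three baskets contain both movies 1 and 2, and two of those also contain movie 3, so the rule `{1, 2} -> {3}` has a confidence of 2/3.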

In [None]:
print('If you liked these movies: ', end='')
print(list(dfAssociation.select('antecedent').first()))
print('Then you will like this movie: ', end='')
print(list(dfAssociation.select('consequent').first()))

movieYouLike = list(dfAssociation.select('antecedent').first())
movieToRecommend=list(dfAssociation.select('consequent').first())

### Let's resolve `movieid` to actual names with the `movies` table:

In [None]:
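def resolveMovie(movie_id):
    # look up the title for a movie id in the `movies` table and print it
    query = 'SELECT title FROM movies WHERE movieid=%s;'
    movie_row = session.execute(query, (movie_id,)).one()
    # .one() returns None when no row matches
    title = movie_row.title if movie_row is not None else '(not found)'
    print('    [movieid=%s] => "%s"' % (movie_id, title))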

#### If you liked these...

In [None]:
print('Antecedents:')
# an antecedent can hold one or more movies, so iterate instead of indexing
for movie_id in movieYouLike[0]:
    resolveMovie(movie_id)

#### Then you will like this movie ... 

In [None]:
print('Consequent:')
resolveMovie(movieToRecommend[0][0])

## Example model usage

_Note: in real life, your input is probably massive (as opposed to a single row); also, it is likely read from the database._

In [None]:
def associate_movie_by_id(input_movie_id):
    consequent_counts = dfAssociation\
        .filter(array_contains(col('antecedent'), input_movie_id))\
        .select('consequent')\
        .rdd\
        .flatMap(lambda x: x)\
        .flatMap(lambda x: x)\
        .groupBy(lambda x: x)\
        .map(lambda xy: (xy[0], len(xy[1])))\
        .collect()
    if len(consequent_counts):
        return sorted(consequent_counts, key=itemgetter(1), reverse=True)[0][0]
    else:
        return None
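
The function above picks the consequent that appears in the largest number of rules whose antecedent contains the input movie. The same counting logic can be sketched in plain Python; the `rules` list and the name `most_common_consequent` are hypothetical and only illustrate the idea on toy data.

```python
from collections import Counter

# Hypothetical rules as (antecedent, consequent) pairs of movie ids
rules = [
    ([527, 296], [318]),
    ([527], [318]),
    ([527, 593], [296]),
    ([110], [260]),
]

def most_common_consequent(rules, input_movie_id):
    """Return the consequent seen most often among the matching rules."""
    counts = Counter(
        movie_id
        for antecedent, consequent in rules
        if input_movie_id in antecedent  # keep rules triggered by the input
        for movie_id in consequent
    )
    # most_common(1) yields [(movie_id, count)]; None when nothing matched
    return counts.most_common(1)[0][0] if counts else None

print(most_common_consequent(rules, 527))
print(most_common_consequent(rules, 999))
```

Note that this ranking counts rules and ignores their confidence. Weighting each rule by its confidence would be a reasonable variation, but it is not what the function above does.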

In [None]:
appreciated_movie = 527
recommended_movie = associate_movie_by_id(input_movie_id=appreciated_movie)

In [None]:
print('Example: user liked this:')
resolveMovie(appreciated_movie)
if recommended_movie is not None:
    print('Then we\'ll recommend this:')
    resolveMovie(recommended_movie)
else:
    print('No association rule matched this movie.')

#### Stop the Spark session

In [None]:
spark.stop()