# Demo 5: Collaborative Filtering and Comedy! 
------
<img src="images/seinfeld.jpg" width="400" height="400">

#### Real Dataset: http://eigentaste.berkeley.edu/dataset/ Dataset 2 
#### Rate Jokes: http://eigentaste.berkeley.edu

## What are we trying to learn from this dataset?

# QUESTION:  Can Collaborative Filtering be used to find which jokes to recommend to our users?


In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
import pandas
import cassandra
import pyspark
import re
import os
import matplotlib.pyplot as plt
from IPython.display import IFrame
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

#### Helper function to have nicer formatting of Spark DataFrames

In [3]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  10, truncate = False):
    if(truncate):
        pandas.set_option('display.max_colwidth', 100)
    else:
        pandas.set_option('display.max_colwidth', None)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

# DataStax Enterprise Analytics
<img src="images/dselogo.png" width="400" height="200">

## Creating Tables and Loading Tables

### Connect to DSE Analytics Cluster

In [4]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

### Create Demo Keyspace 

In [5]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS accelerate 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x1152541d0>

### Set keyspace 

In [6]:
session.set_keyspace('accelerate')

### Create a table called `jokes`. The PRIMARY KEY is the compound key (userid, jokeid): `userid` is the partition key, which distributes the data across the cluster, and `jokeid` is the clustering column, which makes each row unique. Remember that our CQL queries will have to restrict the partition key (`userid`) in the WHERE clause.

In [7]:
query = "CREATE TABLE IF NOT EXISTS jokes \
                                    (userid int, jokeid int, rating float, \
                                     PRIMARY KEY (userid, jokeid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x115254cf8>

### What do these 3 columns represent?

* **Column 1**: User id
* **Column 2**: Joke id
* **Column 3**: Rating of joke (-10.00 to 10.00)
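
To make the role of the primary key (userid, jokeid) more concrete, here is a toy, in-memory model of how such a table could be laid out. It is only a sketch: `ToyJokesTable` is a hypothetical class written for illustration, not part of the DataStax driver, and real Cassandra storage is far more involved. It shows two behaviours: rows are looked up by partition key (`userid`), and writing the same (userid, jokeid) pair twice overwrites the earlier row.

```python
from collections import defaultdict


class ToyJokesTable:
    """Hypothetical stand-in for the `jokes` table, for illustration only."""

    def __init__(self):
        # partition key (userid) -> {clustering key (jokeid) -> rating}
        self._partitions = defaultdict(dict)

    def insert(self, userid, jokeid, rating):
        # The same (userid, jokeid) pair overwrites the old row.
        self._partitions[userid][jokeid] = rating

    def select_by_user(self, userid):
        # One partition lookup; rows come back in jokeid order.
        rows = self._partitions.get(userid, {})
        return [(userid, jokeid, rows[jokeid]) for jokeid in sorted(rows)]


table = ToyJokesTable()
table.insert(100, 7, 9.906)
table.insert(100, 5, -0.875)
table.insert(100, 5, -0.5)   # overwrites the earlier rating for joke 5
table.insert(101, 5, 3.25)

print(table.select_by_user(100))  # [(100, 5, -0.5), (100, 7, 9.906)]
```

A lookup that does not name a `userid` would have to scan every partition in this model, which is the intuition behind restricting the partition key in the WHERE clause.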

### Load Jokes dataset from CSV file (jester_ratings3.csv)
* This is a file I created from the `*.dat` file. It contains only 10,000 rows; the full dataset has over 1 million rows.
<img src="images/laughing.gif" width="300" height="300">

#### Insert all the Joke Rating Data into the DSE table `jokes`

In [8]:
fileName = 'data/jester_ratings3.csv'
query = "INSERT INTO jokes (userid, jokeid, rating) VALUES (%s, %s, %s)"

# Each CSV line holds: userid, jokeid, rating
with open(fileName, 'r') as input_file:
    for line in input_file:
        jokeRow = line.split(',')
        session.execute(query, (int(jokeRow[0]), int(jokeRow[1]), float(jokeRow[2])))
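
The parsing step in the load loop can also be pulled out into a small generator, which makes it easy to try on a few lines before touching the database. This is a minimal sketch that assumes the file has no header row and exactly the column order used above (userid, jokeid, rating); `parse_ratings` is a hypothetical helper name, and the sample lines are made up in the same shape as the file.

```python
import csv
import io


def parse_ratings(lines):
    """Yield (userid, jokeid, rating) tuples from CSV lines, skipping blank lines."""
    for fields in csv.reader(lines):
        if not fields:
            continue  # csv.reader yields an empty list for a blank line
        userid, jokeid, rating = fields[:3]
        yield int(userid), int(jokeid), float(rating)


# In-memory sample standing in for the real file.
sample = io.StringIO("100,5,-0.875\n\n100,7,9.906\n")
rows = list(parse_ratings(sample))
print(rows)  # [(100, 5, -0.875), (100, 7, 9.906)]
```

Each tuple has the shape that `session.execute(query, row)` expects for the three `%s` placeholders, so the generator could feed the insert loop directly.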

#### Run a SELECT * on `jokes` WHERE userid = x to verify that the data was loaded into the table

In [9]:
query = 'SELECT * FROM jokes WHERE userid = 100'
rows = session.execute(query)
for row in rows:
    print (row.userid, row.jokeid, row.rating)

100 5 -0.875
100 7 9.906000137329102
100 8 -0.843999981880188
100 13 8.937999725341797
100 15 -0.968999981880188
100 16 -9.75
100 17 9.593999862670898


## Machine Learning with DSE Analytics and Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

### Finally time for Apache Spark! 

#### Create a Spark session that is connected to DSE. From there, load the `jokes` table into a Spark DataFrame and count its rows.

In [10]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()

jokeTable = spark.read.format("org.apache.spark.sql.cassandra").options(table="jokes", keyspace="accelerate").load()

print ("Table Row Count: ")
print (jokeTable.count())

Table Row Count: 
10000


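#### This demo casts the ratings from float to int before running CFilter with PySpark. Spark's ALS accepts numeric rating columns (it is the user and item ids that must be integers), so the cast is a simplification rather than a requirement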

In [11]:
#joke_df = jokeTable.withColumn("rating", jokeTable.rating.cast('int'))

#### Split dataset into training and testing set 

In [12]:
(training, test) = jokeTable.randomSplit([0.8, 0.2])

training_df = training.withColumn("rating", training.rating.cast('int'))
testing_df = test.withColumn("rating", test.rating.cast('int'))

showDF(training_df)

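| | userid | jokeid | rating |
|---|---|---|---|
| 0 | 1 | 5 | 0 |
| 1 | 1 | 7 | -9 |
| 2 | 1 | 8 | -9 |
| 3 | 1 | 13 | -6 |
| 4 | 1 | 15 | 0 |
| 5 | 1 | 17 | -9 |
| 6 | 1 | 18 | -7 |
| 7 | 1 | 19 | -8 |
| 8 | 1 | 20 | -9 |
| 9 | 1 | 21 | -7 |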
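
The cast to `int` in the split cell discards the fractional part of every rating. The sketch below illustrates the effect on the ratings printed for user 100 earlier (rounded to three decimals here). It uses Python's `int()`, which truncates toward zero; that this matches the Spark float-to-int cast is an assumption of the sketch.

```python
# Ratings for user 100 from the earlier SELECT, rounded to three decimals.
ratings = [-0.875, 9.906, -0.844, 8.938, -0.969, -9.75, 9.594]

# int() truncates toward zero (assumed to mirror the Spark cast).
as_int = [int(r) for r in ratings]
print(as_int)  # [0, 9, 0, 8, 0, -9, 9]

# Largest change introduced by the cast on this sample.
max_error = max(abs(r - i) for r, i in zip(ratings, as_int))
print(round(max_error, 3))  # 0.969
```

On this sample, three mildly negative ratings collapse to 0, so the cast loses close to one rating point in the worst case.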


### Setup for CFilter with ALS

https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

In [13]:
als = ALS(maxIter=5, regParam=0.01, userCol="userid", itemCol="jokeid", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training_df)
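
For intuition about what `als.fit` is doing, here is a toy alternating least squares loop in plain Python. It is a simplified sketch, not Spark's implementation: it uses a single latent factor per user and per joke (Spark's ALS defaults to rank 10), a plain L2 penalty, and a tiny made-up ratings dictionary. With rank 1, each update has a closed form: hold the item factors fixed and solve for each user factor, then swap roles.

```python
# Made-up observed ratings as {(user, item): rating}; user 2 never rated item 0.
ratings = {(0, 0): 4.0, (0, 1): 2.0, (1, 0): 8.0, (1, 1): 4.0, (2, 1): 1.0}


def als_rank1(ratings, n_users, n_items, reg=0.01, iterations=5):
    """Alternate closed-form ridge updates for one factor per user and item."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # Fix item factors, solve each user factor independently.
        for i in range(n_users):
            seen = [(j, r) for (a, j), r in ratings.items() if a == i]
            u[i] = sum(r * v[j] for j, r in seen) / (reg + sum(v[j] ** 2 for j, _r in seen))
        # Fix user factors, solve each item factor independently.
        for j in range(n_items):
            seen = [(i, r) for (i, b), r in ratings.items() if b == j]
            v[j] = sum(r * u[i] for i, r in seen) / (reg + sum(u[i] ** 2 for i, _r in seen))
    return u, v


u, v = als_rank1(ratings, n_users=3, n_items=2)

# Predicted rating for the unobserved pair (user 2, item 0).
print(round(u[2] * v[0], 2))
```

Because the made-up ratings are consistent with a single factor, the prediction for the missing pair lands near 2, slightly shrunk by the regularization term.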

In [14]:
# Generate predictions for the held-out test data
predictions = model.transform(testing_df)

# Generate top 10 joke recommendations for each user
userRecs = model.recommendForAllUsers(10)

showDF(userRecs)

# Generate top 10 user recommendations for each joke
jokeRecs = model.recommendForAllItems(10)

| | userid | recommendations |
|---|---|---|
| 0 | 148 | [(44, 34.13772964477539), (51, 25.700042724609375), (125, 24.875423431396484), (146, 24.236581802368164), (69, 23.100719451904297), (143, 23.069561004638672), (119, 22.743892669677734), (65, 21.494281768798828), (138, 20.9459228515625), (26, 20.843549728393555)] |
| 1 | 243 | [(98, 16.153411865234375), (79, 15.212631225585938), (101, 13.930389404296875), (57, 12.906051635742188), (58, 12.793245315551758), (24, 9.772829055786133), (85, 9.358484268188477), (13, 8.649678230285645), (23, 7.858341693878174), (37, 7.614729404449463)] |
| 2 | 251 | [(55, 4.414163112640381), (52, 4.053103446960449), (124, 3.2235026359558105), (133, 1.8868601322174072), (141, 1.1414566040039062), (120, 0.8265798687934875), (122, 0.6061234474182129), (44, 0.4783670902252197), (114, 0.1717315912246704), (82, -0.7522410750389099)] |
| 3 | 85 | [(40, 93.38774108886719), (48, 70.61991882324219), (90, 61.215755462646484), (86, 56.27168655395508), (114, 55.27031326293945), (43, 53.20564651489258), (60, 47.55717468261719), (100, 42.91171646118164), (33, 42.73453140258789), (104, 42.418277740478516)] |
| 4 | 137 | [(37, 11.013225555419922), (100, 9.195761680603027), (15, 9.175329208374023), (7, 8.999856948852539), (8, 8.987367630004883), (17, 8.032954216003418), (78, 7.779767036437988), (20, 7.432913780212402), (57, 7.157354354858398), (13, 7.101101875305176)] |
| 5 | 65 | [(94, 13.413860321044922), (82, 11.760915756225586), (80, 11.44350528717041), (116, 11.285964012145996), (63, 11.251116752624512), (26, 11.184839248657227), (99, 11.051153182983398), (43, 10.730409622192383), (75, 10.707722663879395), (92, 10.706671714782715)] |
| 6 | 53 | [(114, 13.702998161315918), (116, 13.296106338500977), (117, 12.910265922546387), (80, 11.056073188781738), (105, 9.647844314575195), (148, 9.136292457580566), (91, 8.573720932006836), (60, 8.479909896850586), (55, 8.326761245727539), (52, 8.317741394042969)] |
| 7 | 133 | [(35, 8.389063835144043), (94, 8.131109237670898), (32, 8.074833869934082), (121, 7.820661544799805), (53, 7.75675106048584), (89, 7.586538314819336), (119, 6.810390949249268), (46, 6.713859558105469), (69, 6.705226421356201), (81, 6.654937744140625)] |
| 8 | 155 | [(46, 43.723426818847656), (35, 38.590579986572266), (119, 37.78889846801758), (53, 37.23609161376953), (26, 36.63851547241211), (69, 36.21256637573242), (34, 35.53752899169922), (21, 34.712867736816406), (32, 33.222694396972656), (125, 31.87568473815918)] |
| 9 | 108 | [(75, 37.769920349121094), (40, 33.574073791503906), (102, 28.94316864013672), (114, 26.217697143554688), (43, 25.416234970092773), (124, 25.390714645385742), (100, 24.526344299316406), (48, 24.486526489257812), (81, 23.37546157836914), (86, 23.240493774414062)] |
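
The `predictions` DataFrame built in this cell is never scored. In Spark, the already-imported evaluator could do that with `RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction").evaluate(predictions)`. To show what that number means, here is a plain-Python sketch of the same metric; the (actual, predicted) pairs are hypothetical and not taken from this notebook's output.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) rating pairs."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("need at least one (actual, predicted) pair")
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


# Hypothetical (actual, predicted) pairs for illustration only.
sample = [(9, 7.0), (-9, -6.0), (0, 1.0)]
print(rmse(sample))
```

RMSE is expressed in the same units as the ratings, so on a -10 to 10 scale a value around 2 would mean predictions are typically off by about two rating points.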


In [15]:
showDF(userRecs.filter(userRecs.userid == 65))

| | userid | recommendations |
|---|---|---|
| 0 | 65 | [(94, 13.413860321044922), (82, 11.760915756225586), (80, 11.44350528717041), (116, 11.285964012145996), (63, 11.251116752624512), (26, 11.184839248657227), (99, 11.051153182983398), (43, 10.730409622192383), (75, 10.707722663879395), (92, 10.706671714782715)] |
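
Each recommendation is a (jokeid, predicted rating) pair, so picking which jokes to show a user comes down to ranking by the second element. A minimal sketch using user 65's recommendations from the output above, with the scores rounded to three decimals; `top_joke_ids` is a hypothetical helper name.

```python
# User 65's recommendations as (jokeid, predicted rating), rounded to 3 decimals.
recs_65 = [
    (94, 13.414), (82, 11.761), (80, 11.444), (116, 11.286), (63, 11.251),
    (26, 11.185), (99, 11.051), (43, 10.730), (75, 10.708), (92, 10.707),
]


def top_joke_ids(recs, n=3):
    """Return the n joke ids with the highest predicted rating."""
    ranked = sorted(recs, key=lambda pair: pair[1], reverse=True)
    return [jokeid for jokeid, _score in ranked[:n]]


print(top_joke_ids(recs_65))  # [94, 82, 80]
```

Note that the predicted ratings exceed the dataset's maximum of 10: ALS predictions are not constrained to the rating scale, so they are best read as a ranking signal rather than as literal ratings.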


In [16]:
IFrame(src='images/init94.html', width=700, height=200)

In [17]:
IFrame(src='images/init43.html', width=700, height=200)

In [18]:
session.execute("""drop table jokes""")

<cassandra.cluster.ResultSet at 0x11537e160>