# Collaborative Filtering and Comedy! 
------
<img src="images/seinfeld.jpg" width="400" height="400">

## A demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, Jupyter Notebooks, Spark MlLib, and KMeans 

#### Real Dataset: http://eigentaste.berkeley.edu/dataset/

## Import python packages -- all are required
* Need to tell Jupyter to display with %matplotlib otherwise you will generate the plot but not display it

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.2 pyspark-shell'

In [68]:
import pandas
import cassandra
import pyspark
import re
import os
import matplotlib.pyplot as plt
from IPython.display import IFrame
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

#### Helper function to have nicer formatting of Spark DataFrames

In [47]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  10, truncate = False):
    if(truncate):
        pandas.set_option('display.max_colwidth', 100)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

# Apache Cassandra 
<img src="images/cassandralogo.png" width="200" height="200">

## Creating Tables and Loading Tables

## Connect to Apache Cassandra Local Instance

In [6]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.01'])
session = cluster.connect()

### Create Demo Keyspace 

In [7]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS jokes 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x120416ba8>

### Set keyspace 

In [8]:
session.set_keyspace('jokes')

### Create table called wine. Our PRIMARY will be a unique key (userid) we generate for each row. This will result in an even distribution of the data but we will have to utilize that PRIMARY KEY in our WHERE clause in any of our CQL queries. 

In [56]:
query = "CREATE TABLE IF NOT EXISTS jokes_table1 \
                                    (userid int, jokeid int, rating float, \
                                     PRIMARY KEY (userid, jokeid))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x12124d400>

### Load Joke Dataset
<img src="images/laughing.gif" width="300" height="300">

### Load Jokes dataset from CSV file (jester-data-1.csv)
* No clean up was requried! How nice :)

#### Insert all the Joke Rating Data into the Apache Cassandra table `jokes_table`

In [57]:
fileName = 'data/jester_ratings3.csv'
input_file = open(fileName, 'r')

for line in input_file:
    jokeRow = line.split(',')
    query = "INSERT INTO jokes_table1 (userid, jokeid, rating)"
    
    query = query + "VALUES (%s, %s, %s)"
    
    session.execute(query, (int(jokeRow[0]), int(jokeRow[1]) , float(jokeRow[2]) ))

#### Do a select * on joke_table WHERE wineid = x to verify that data was loaded into the table

In [70]:
query = 'SELECT * FROM jokes_table1 WHERE userid = 100'
rows = session.execute(query)
for row in rows:
    print (row.userid, row.jokeid, row.rating)

100 5 -0.875
100 7 9.906000137329102
100 8 -0.843999981880188
100 13 8.937999725341797
100 15 -0.968999981880188
100 16 -9.75
100 17 9.593999862670898


## Machine Learning with Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

### Finally time for Apache Spark! 

#### Create a spark session that is connected to Cassandra. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [59]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()

jokeTable = spark.read.format("org.apache.spark.sql.cassandra").options(table="jokes_table1", keyspace="jokes").load()

print ("Table Row Count: ")
print (jokeTable.count())

Table Row Count: 
10000


In [60]:
showDF(jokeTable)
joke_df = jokeTable.withColumn("rating", test.rating.cast('int'))
showDF(joke_df)

Unnamed: 0,userid,jokeid,rating
0,218,5,9.531
1,218,7,-0.594
2,218,8,9.875
3,4,5,-5.812
4,4,7,-4.5
5,4,8,-4.906
6,18,5,-0.438
7,18,7,-7.344
8,18,8,2.375
9,18,13,-2.281


AnalysisException: 'Resolved attribute(s) rating#2 missing from userid#9271,jokeid#9272,rating#9273 in operator !Project [userid#9271, jokeid#9272, cast(rating#2 as int) AS rating#9285]. Attribute(s) with the same name appear in the operation: rating. Please check if the right attribute(s) are used.;;\n!Project [userid#9271, jokeid#9272, cast(rating#2 as int) AS rating#9285]\n+- Relation[userid#9271,jokeid#9272,rating#9273] org.apache.spark.sql.cassandra.CassandraSourceRelation@4a0c411b\n'

In [61]:
(training, test) = jokeTable.randomSplit([0.8, 0.2])

training_df = training.withColumn("rating", training.rating.cast('int'))
testing_df = test.withColumn("rating", test.rating.cast('int'))

showDF(training_df)

Unnamed: 0,userid,jokeid,rating
0,4,5,-5
1,4,7,-4
2,4,8,-4
3,18,5,0
4,18,7,-7
5,18,8,2
6,18,13,-2
7,18,15,-7
8,18,16,-4
9,18,19,-3


In [62]:
als = ALS(maxIter=5, regParam=0.01, userCol="userid", itemCol="jokeid", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training_df)

In [63]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(testing_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 joke recommendations for each user
userRecs = model.recommendForAllUsers(10)

showDF(userRecs)

# Generate top 10 user recommendations for each joke
jokeRecs = model.recommendForAllItems(10)

showDF(jokeRecs)


Root-mean-square error = 6.265703145303816


Unnamed: 0,userid,recommendations
0,148,"[(75, 30.87855339050293), (66, 28.134014129638672), (64, 25.760520935058594), (83, 23.183927536010742), (40, 21.544038772583008), (98, 20.769433975219727), (31, 20.074989318847656), (74, 19.116016387939453), (69, 18.117467880249023), (35, 18.11456298828125)]"
1,243,"[(33, 17.40928840637207), (87, 15.353056907653809), (125, 12.780736923217773), (122, 11.882739067077637), (109, 10.796052932739258), (46, 9.057601928710938), (139, 8.624897956848145), (44, 8.41999340057373), (51, 8.065143585205078), (56, 7.839664459228516)]"
2,251,"[(139, 7.4646196365356445), (97, 7.374938488006592), (123, 4.722718238830566), (131, 2.6079823970794678), (80, 2.1647560596466064), (133, 1.74586820602417), (135, 1.6211751699447632), (108, 1.6160073280334473), (28, 1.462329387664795), (114, 1.345022201538086)]"
3,85,"[(53, 23.444725036621094), (83, 20.632675170898438), (69, 18.544334411621094), (138, 17.63379669189453), (81, 17.346214294433594), (21, 16.988012313842773), (52, 16.81336212158203), (66, 16.52860450744629), (127, 15.276453018188477), (129, 14.083913803100586)]"
4,137,"[(141, 21.817230224609375), (44, 17.85183334350586), (124, 17.578853607177734), (143, 15.591338157653809), (103, 14.879071235656738), (73, 14.256434440612793), (33, 13.75822925567627), (43, 13.343883514404297), (117, 11.411349296569824), (79, 10.856827735900879)]"
5,65,"[(110, 12.940678596496582), (43, 12.660163879394531), (73, 12.247119903564453), (80, 12.136799812316895), (38, 11.39245891571045), (33, 11.327520370483398), (116, 11.204057693481445), (68, 11.094342231750488), (90, 11.076092720031738), (94, 11.067971229553223)]"
6,53,"[(71, 14.830660820007324), (114, 13.391698837280273), (113, 11.727563858032227), (56, 11.286611557006836), (119, 10.457679748535156), (46, 10.309399604797363), (122, 9.782358169555664), (66, 8.979514122009277), (104, 8.54745101928711), (120, 8.322032928466797)]"
7,133,"[(75, 10.07713794708252), (109, 9.776427268981934), (119, 9.64278507232666), (34, 9.45654582977295), (83, 8.600651741027832), (69, 8.579179763793945), (105, 8.55909538269043), (66, 8.420408248901367), (26, 8.33706283569336), (46, 8.160682678222656)]"
8,155,"[(80, 61.71413040161133), (110, 53.4033203125), (108, 53.13975524902344), (106, 48.280765533447266), (118, 47.96070098876953), (82, 46.35362243652344), (114, 46.20625686645508), (71, 45.10487747192383), (94, 44.62059020996094), (97, 44.161651611328125)]"
9,108,"[(21, 17.170969009399414), (102, 17.048480987548828), (75, 16.843189239501953), (81, 15.443465232849121), (57, 15.087509155273438), (83, 13.270089149475098), (78, 12.704123497009277), (39, 12.266119003295898), (108, 12.039423942565918), (69, 11.403729438781738)]"


Unnamed: 0,jokeid,recommendations
0,148,"[(258, 17.321176528930664), (136, 14.053995132446289), (61, 13.78636646270752), (28, 12.587363243103027), (253, 11.544229507446289), (93, 10.45460319519043), (27, 10.28254222869873), (9, 9.937729835510254), (74, 9.913469314575195), (230, 9.762044906616211)]"
1,31,"[(81, 34.0018424987793), (155, 32.526702880859375), (250, 22.058740615844727), (166, 21.514419555664062), (148, 20.074989318847656), (115, 19.588237762451172), (12, 19.07025718688965), (240, 18.76581382751465), (117, 18.634449005126953), (184, 16.273033142089844)]"
2,85,"[(26, 27.821496963500977), (9, 26.189910888671875), (27, 20.23271369934082), (237, 18.704910278320312), (185, 18.283824920654297), (138, 17.802446365356445), (204, 15.296958923339844), (94, 14.987640380859375), (84, 14.215047836303711), (136, 14.041051864624023)]"
3,137,"[(155, 15.769050598144531), (94, 14.585921287536621), (148, 14.218659400939941), (117, 13.419683456420898), (240, 12.67559814453125), (27, 12.63527774810791), (136, 12.074342727661133), (205, 11.025575637817383), (81, 10.977522850036621), (62, 10.69485855102539)]"
4,65,"[(81, 56.19865417480469), (155, 30.93911361694336), (136, 28.255250930786133), (253, 25.895462036132812), (166, 21.785945892333984), (205, 21.427505493164062), (66, 20.623952865600586), (165, 19.239398956298828), (184, 18.62884521484375), (240, 17.548694610595703)]"
5,53,"[(12, 33.32024002075195), (175, 33.315773010253906), (107, 31.784751892089844), (81, 30.855751037597656), (205, 28.30265235900879), (253, 26.578123092651367), (85, 23.444725036621094), (203, 20.60529136657715), (42, 18.37078094482422), (258, 18.184783935546875)]"
6,133,"[(81, 43.42300033569336), (166, 33.25961685180664), (155, 32.514102935791016), (123, 30.02727508544922), (12, 26.05786895751953), (250, 25.77063751220703), (115, 23.643600463867188), (207, 19.578575134277344), (148, 17.99712562561035), (226, 17.417619705200195)]"
7,78,"[(155, 41.89334487915039), (3, 30.635631561279297), (166, 29.17324447631836), (258, 24.76178741455078), (158, 24.46875762939453), (136, 23.4233341217041), (81, 19.61318588256836), (172, 19.349361419677734), (177, 17.342662811279297), (123, 16.567764282226562)]"
8,108,"[(155, 53.13975524902344), (258, 49.0959358215332), (136, 42.85388946533203), (3, 33.08396530151367), (166, 30.225933074951172), (158, 30.181238174438477), (81, 29.519527435302734), (253, 27.82582664489746), (165, 23.67397117614746), (84, 21.676437377929688)]"
9,34,"[(123, 37.14117431640625), (81, 32.54072952270508), (12, 32.25951385498047), (115, 31.250389099121094), (155, 29.890296936035156), (177, 27.495187759399414), (253, 25.72541046142578), (107, 24.079378128051758), (166, 21.463478088378906), (187, 19.782772064208984)]"


In [64]:
showDF(userRecs.filter(userRecs.userid == 65))

Unnamed: 0,userid,recommendations
0,65,"[(110, 12.940678596496582), (43, 12.660163879394531), (73, 12.247119903564453), (80, 12.136799812316895), (38, 11.39245891571045), (33, 11.327520370483398), (116, 11.204057693481445), (68, 11.094342231750488), (90, 11.076092720031738), (94, 11.067971229553223)]"


In [67]:
IFrame(src='images/init94.html', width=700, height=200)

In [69]:
IFrame(src='images/init43.html', width=700, height=200)

In [55]:
session.execute("""drop table jokes_table1""")

<cassandra.cluster.ResultSet at 0x121045160>