### CFilter Demo: What kind of Halloween Candy to Recommend

<img src="images/happyHalloween.jpg" width="250" height="250">

#### A Halloween Demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, Jupyter Notebooks and CFilter

#### Add some environment variables to find dse verision of pyspark. Edit these varibles with your path.

In [29]:
pysparkzip = "/opt/dse/resources/spark/python/lib/pyspark.zip"
py4jzip = "/opt/dse/resources/spark/python/lib/py4j-0.10.4-src.zip"

In [30]:
# Needed to be able to find pyspark libaries
import sys
sys.path.append(pysparkzip)
sys.path.append(py4jzip)

#### Import python packages -- all are required
##### Ignore any errors shown

In [31]:
import pandas
import cassandra
import pyspark
import re
import os
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

#### Helper function to have nicer formatting of Spark DataFrames

In [32]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  10, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 100)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

### Creating Tables and Loading Tweets

#### Connect to DSE Analytics Cluster

In [33]:
from cassandra.cluster import Cluster

cluster = Cluster(['dse'])
session = cluster.connect()

#### Create Demo Keyspace 

In [34]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS halloween 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x7fbf5e6f4590>

#### Set keyspace 

In [35]:
session.set_keyspace('halloween')

In [43]:
query = "CREATE TABLE IF NOT EXISTS candy_ranking_train (key1 int, userid int, candyid int, ranking int, PRIMARY KEY (key1))"
print query
session.execute(query)

CREATE TABLE IF NOT EXISTS candy_ranking_train (key1 int, userid int, candyid int, ranking int, PRIMARY KEY (key1))


<cassandra.cluster.ResultSet at 0x7fbf64146410>

In [44]:
query = "CREATE TABLE IF NOT EXISTS candy_ranking_test (key1 int, userid int, candyid int, ranking int, PRIMARY KEY (key1))"
print query
session.execute(query)

CREATE TABLE IF NOT EXISTS candy_ranking_test (key1 int, userid int, candyid int, ranking int, PRIMARY KEY (key1))


<cassandra.cluster.ResultSet at 0x7fbf5e67e5d0>

In [45]:
query = "CREATE TABLE IF NOT EXISTS candy (candyid int, name text, PRIMARY KEY (candyid))"
print query
session.execute(query)


CREATE TABLE IF NOT EXISTS candy (candyid int, name text, PRIMARY KEY (candyid))


<cassandra.cluster.ResultSet at 0x7fbf5e5ea190>

#### Load Candy Names from CSV File

In [46]:
fileName = 'data/candy.csv'
input_file = open(fileName, 'r')
for line in input_file:
    columns = line.split(',')
    query = "INSERT INTO candy (candyid, name)"
    query = query + " VALUES (%s, %s)"
    session.execute(query, (int(columns[0]), columns[1]))

#### Load Candy Ranking Training from CSV file

In [49]:
fileName = 'data/candy_rating.csv'
input_file = open(fileName, 'r')
for line in input_file:
    columns = line.split(',')
    query = "INSERT INTO candy_ranking_train (key1, userid, candyid, ranking)"
    query = query + " VALUES (%s, %s, %s, %s)"
    print query
    print int(columns[0])
    session.execute(query, (int(columns[0]), int(columns[1]), int(columns[2]), int(columns[3])))

INSERT INTO candy_ranking_train (key1, userid, candyid, ranking) VALUES (%s, %s, %s, %s)
1


InvalidRequest: Error from server: code=2200 [Invalid query] message="Undefined column name key1"

#### Load Candy Ranking Test from CSV file

In [22]:
fileName = 'data/candy_rating_testing.csv'
input_file = open(fileName, 'r')
for line in input_file:
    columns = line.split(',')
    query = "INSERT INTO candy_ranking_test (key1, userid, candyid, ranking)"
    query = query + " VALUES (%s, %s, %s, %s)"
    session.execute(query, (int(columns[0]), int(columns[1]), int(columns[2]), int(columns[3])))

InvalidRequest: Error from server: code=2200 [Invalid query] message="Undefined column name key"

#### Do a select * on the Candy table and verify that the data have been inserted into each Cassandra table

In [23]:
query = 'SELECT * FROM candy limit 10'
rows = session.execute(query)
for user_row in rows:
    print (user_row.name)

 Laffy Taffy

 Almond Joy

 Starbust

 Dum Dums

 Junior Mints

 KitKat

 Milky Way

 Snickers



In [24]:
query = 'SELECT * FROM candy_ranking_train'
rows = session.execute(query)
for user_row in rows:
    print (user_row.userid, user_row.candyid, user_row.ranking)

(10, 5, 5)
(11, 3, 5)
(1, 8, 1)
(2, 2, 1)
(12, 7, 5)
(3, 8, 5)


#### Create a spark session that is connected to Cassandra. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [25]:
#countTokens = udf(lambda words: len(words), IntegerType())

spark = SparkSession.builder.appName('demo').master("dse://dse:9042").getOrCreate()


tableTrain = spark.read.format("org.apache.spark.sql.cassandra").options(table="candy_ranking_train", keyspace="halloween").load()
tableTest = spark.read.format("org.apache.spark.sql.cassandra").options(table="candy_ranking_test", keyspace="halloween").load()
tableCandy = spark.read.format("org.apache.spark.sql.cassandra").options(table="candy", keyspace="halloween").load()

print "Training DataSet: "
print tableTrain.count()
print "Number of Unique Candies: "
print tableCandy.count()


Training DataSet: 
6
Number of Unique Candies: 
8


In [None]:
als = ALS(maxIter=5, regParam=0.01, userCol="userid", itemCol="candyid", ratingCol="ranking",
          coldStartStrategy="drop")
model = als.fit(tableTrain)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(tableTest)
showDF(predictions)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ranking",
                               predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(3)
candyRecs = model.recommendForAllItems(5)

showDF(userRecs)
showDF(candyRecs)

<img src="images/trickOrTreat.jpg" width="250" height="250">

In [42]:
session.execute("drop table candy_ranking_test")
session.execute("drop table candy_ranking_train")

InvalidRequest: Error from server: code=2200 [Invalid query] message="table candy_ranking_test does not exist"