# Classification and Clustering Algorithms paired with Wine and Chocolate
------
<img src="images/datastaxlogo.png" width="200" height="200">

#### A demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, Jupyter Notebooks, Twitter tweets, pattern, Kmeans and ...

https://archive.ics.uci.edu/ml/datasets/wine

#### Add some environment variables to find dse verision of pyspark. Edit these varibles with your path.

In [None]:
pysparkzip = "/Users/amanda.moran/dse67/dse-6.7.0/resources/spark/python/lib/pyspark.zip"
py4jzip = "/Users/amanda.moran/dse67/dse-6.7.0/resources/spark/python/lib/py4j-0.10.7-src.zip"

In [None]:
# Needed to be able to find pyspark libaries
import sys
sys.path.append(pysparkzip)
sys.path.append(py4jzip)

#### Import python packages -- all are required
##### Ignore any errors shown

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
import pandas
import cassandra
import pyspark
import re
import os
from uuid import uuid1
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

#### Helper function to have nicer formatting of Spark DataFrames

In [3]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

# DataStax Enterprise Analytics
<img src="images/datastaxlogo.png" width="200" height="200">

### Creating Tables and Loading Tables

#### Connect to DSE Analytics Cluster

In [4]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.01'])
session = cluster.connect()

#### Create Demo Keyspace 

In [5]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS wineChocolate 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x10946d490>

#### Set keyspace 

In [6]:
session.set_keyspace('winechocolate')

In [12]:
query = "CREATE TABLE IF NOT EXISTS chocolate \
                                   (chocolateid int, company text, bar_location text, ref int, \
                                   review_date int, cocoa_percent float, company_location text, rating float, \
                                   bean_type text, bean_origin text, \
                                   PRIMARY KEY (chocolateid))"
session.execute(query)


<cassandra.cluster.ResultSet at 0x1126a9910>

### Load Wine Dataset
<img src="images/" width="100" height="100">

#### Load Negative Tweets from CSV file

Had to do some data cleaning to load this data. Removed `%`, removed extra commas, removed ``"``

In [13]:
fileName = 'data/chocolateFinal.csv'
input_file = open(fileName, 'r')
i = 1

for line in input_file:
    chocolateId = i 
    columns = line.split(',')
    if columns[7] == "\xc2\xa0":
        columns[7] = ""
    columns[8] = columns[8].rstrip("\r\n")
    
    query = "INSERT INTO chocolate (chocolateid, company, bar_location, ref, review_date, cocoa_percent, \
                                    company_location, rating, bean_type, bean_origin)"
    query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    session.execute(query, (chocolateId, columns[0], columns[1], int(columns[2]), int(columns[3]), 
                    float(columns[4]), columns[5], float(columns[6]), columns[7], columns[8]))
    i = i + 1

#### Do a select * on each table and verify that the data have been inserted into each Apache Cassandra table

In [14]:
query = 'SELECT * FROM chocolate WHERE chocolateid=200'
rows = session.execute(query)
for row in rows:
    print (row.chocolateid, row.bar_location, row.ref, row.review_date, row.cocoa_percent, row.company_location,
          row.rating, row.bean_type, row.bean_origin)

(200, u'Sambiao 2009', 565, 2010, 70.0, u'U.S.A.', 3.0, u'Tiitaio', u'Madagasca')


## DSE Analytics with Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

### Finally time for Apache Spark! 

#### Create a spark session that is connected to Cassandra. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [15]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()


chocolateTable = spark.read.format("org.apache.spark.sql.cassandra").options(table="chocolate", keyspace="winechocolate").load()

print ("Table Row Count: ")
print (chocolateTable.count())

Table Row Count: 
1795


In [16]:
showDF(chocolateTable)

Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date
0,381,la Amistad,Costa Rica,,70.0,Chequessett,U.S.A.,3.5,1235,2014
1,671,Guatemala,Guatemala,,73.0,Fech Boad,U.S.A.,3.5,1634,2015
2,622,Wampusipi batch 007,Hoduas,,75.0,ENNA,U.S.A.,3.25,1916,2016
3,444,Ocumae,Veezuela,Ciollo,70.0,Compaia de Chocolate (Salgado),Agetia,3.75,292,2008
4,746,Ghaa,Ghaa,Foasteo,64.0,Guido Castaga,Italy,3.0,355,2009


In [17]:
# Split the data into train and test
splits = chocolateTable.randomSplit([0.8, 0.2], 1234)
train = splits[0]
test = splits[1]

print (train.count())
print (test.count())
showDF(train)
showDF(test)

1488
305


Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date
0,14,Equateu,Ecuado,,70.0,A. Moi,Face,3.75,1011,2013
1,17,Papua New Guiea,Papua New Guiea,,70.0,A. Moi,Face,3.25,1015,2013
2,21,Chachamayo Povice,Peu,,63.0,A. Moi,Face,4.0,1019,2013
3,24,Chulucaas El Plataal,Peu,,70.0,Acalli,U.S.A.,3.75,1462,2015
4,25,Tumbes Noadio,Peu,Ciollo,70.0,Acalli,U.S.A.,3.75,1470,2015


Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date
0,1,Agua Gade,Sao Tome,,63.0,A. Moi,Face,3.75,1876,2016
1,2,Kpime,Togo,,70.0,A. Moi,Face,2.75,1676,2015
2,5,Quilla,Peu,,70.0,A. Moi,Face,3.5,1704,2015
3,42,La Dalia Matagalpa,"Tiitaio""","""Ciollo",70.0,Alexade,Nethelads,3.5,1944,2017
4,72,Domiica Republic,Domiica Republic,,75.0,Ambosia,Caada,3.25,1498,2015


In [18]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [63]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

# Convert target into numerical categories

labelIndexer = StringIndexer(inputCol="bean_origin", outputCol="origin", handleInvalid='skip')
training1 = labelIndexer.fit(train).transform(train)

labelIndexer2 = StringIndexer(inputCol="company_location", outputCol="label", handleInvalid='skip')
training2 = labelIndexer2.fit(training1).transform(training1)


assembler = VectorAssembler(
    inputCols=['cocoa_percent', 'rating', 'origin', 'review_date'],
    outputCol='features')

trainingData = assembler.transform(training2)
showDF(trainingData)



labelIndexer2 = StringIndexer(inputCol="bean_origin", outputCol="origin", handleInvalid='skip')
testing1 = labelIndexer2.fit(test).transform(test)

labelIndexer4 = StringIndexer(inputCol="company_location", outputCol="label", handleInvalid='skip')
testing2 = labelIndexer4.fit(testing1).transform(testing1)

assembler1 = VectorAssembler(
    inputCols=['cocoa_percent', 'rating', 'origin', 'review_date'],
    outputCol='features')

testingData = assembler1.transform(testing2)

showDF(testingData)

Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date,origin,label,features
0,14,Equateu,Ecuado,,70.0,A. Moi,Face,3.75,1011,2013,1.0,1.0,"[70.0, 3.75, 1.0, 2013.0]"
1,17,Papua New Guiea,Papua New Guiea,,70.0,A. Moi,Face,3.25,1015,2013,10.0,1.0,"[70.0, 3.25, 10.0, 2013.0]"
2,21,Chachamayo Povice,Peu,,63.0,A. Moi,Face,4.0,1019,2013,2.0,1.0,"[63.0, 4.0, 2.0, 2013.0]"
3,24,Chulucaas El Plataal,Peu,,70.0,Acalli,U.S.A.,3.75,1462,2015,2.0,0.0,"[70.0, 3.75, 2.0, 2015.0]"
4,25,Tumbes Noadio,Peu,Ciollo,70.0,Acalli,U.S.A.,3.75,1470,2015,2.0,0.0,"[70.0, 3.75, 2.0, 2015.0]"


Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date,origin,label,features
0,3,Atsae,Togo,,70.0,A. Moi,Face,3.0,1676,2015,26.0,1.0,"[70.0, 3.0, 26.0, 2015.0]"
1,9,Pueto Cabello,Veezuela,Ciollo,70.0,A. Moi,Face,3.75,1319,2014,1.0,1.0,"[70.0, 3.75, 1.0, 2014.0]"
2,12,Madagasca,Madagasca,Ciollo,70.0,A. Moi,Face,3.0,1011,2013,4.0,1.0,"[70.0, 3.0, 4.0, 2013.0]"
3,35,Mote Alege D. Badeo,Bazil,Foasteo,75.0,Akesso's (Palus),Switzelad,2.75,508,2010,5.0,13.0,"[75.0, 2.75, 5.0, 2010.0]"
4,87,Toscao Black,,Bled,70.0,Amedei,Italy,5.0,40,2006,7.0,4.0,"[70.0, 5.0, 7.0, 2006.0]"


In [70]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(trainingData)

# select example rows to display.
predictions = model.transform(testingData)
#predictions.show()
showDF(predictions)
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Unnamed: 0,chocolateid,bar_location,bean_origin,bean_type,cocoa_percent,company,company_location,rating,ref,review_date,origin,label,features,rawPrediction,probability,prediction
0,3,Atsae,Togo,,70.0,A. Moi,Face,3.0,1676,2015,26.0,1.0,"[70.0, 3.0, 26.0, 2015.0]","[-478.35419420878793, -474.50585347327467, -47...","[0.004063082448542703, 0.19062013864404476, 0....",11.0
1,9,Pueto Cabello,Veezuela,Ciollo,70.0,A. Moi,Face,3.75,1319,2014,1.0,1.0,"[70.0, 3.75, 1.0, 2014.0]","[-350.39808569729865, -356.8494173211063, -355...","[0.00761993627941294, 1.2027451504795257e-05, ...",5.0
2,12,Madagasca,Madagasca,Ciollo,70.0,A. Moi,Face,3.0,1011,2013,4.0,1.0,"[70.0, 3.0, 4.0, 2013.0]","[-361.4174219171088, -366.6553594534377, -365....","[0.30812929703668246, 0.0016365362311522944, 0...",0.0
3,35,Mote Alege D. Badeo,Bazil,Foasteo,75.0,Akesso's (Palus),Switzelad,2.75,508,2010,5.0,13.0,"[75.0, 2.75, 5.0, 2010.0]","[-381.8766695418385, -386.70156727678807, -385...","[0.4667591264216455, 0.003746850424977749, 0.0...",0.0
4,87,Toscao Black,,Bled,70.0,Amedei,Italy,5.0,40,2006,7.0,4.0,"[70.0, 5.0, 7.0, 2006.0]","[-390.059789921235, -393.9614310878765, -393.2...","[0.5885230233564345, 0.011893296016284666, 0.0...",0.0


Test set accuracy = 0.194539249147


In [71]:
showDF(predictions.select("company_location", "label", "prediction", "probability"))

Unnamed: 0,company_location,label,prediction,probability
0,Face,1.0,1.0,"[0.034617599202334, 0.3186684242623727, 0.0996..."
1,Face,1.0,11.0,"[0.004103930302033418, 0.19067863616832403, 0...."
2,Face,1.0,0.0,"[0.14105077557925086, 0.0005025311749787721, 0..."
3,Nethelads,22.0,0.0,"[0.6296588136936752, 0.017616575500404304, 0.0..."
4,Caada,2.0,5.0,"[0.040456360766822706, 9.455322516524282e-05, ..."


In [None]:
session.execute("""drop table chocolate""")