# Demo Data Pre-processing

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
import pandas as pd
import cassandra
import pyspark
import re
import os
import random
from random import randint, randrange
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.clustering import KMeans
import seaborn as sns
from pyspark.ml.stat import Correlation
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.feature import PCA, Imputer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, asc
from pyspark.sql.functions import isnan
from pyspark.sql.functions import udf
from pyspark.sql import functions as F

#### Helper function to have nicer formatting of Spark DataFrames

In [None]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pd.set_option('display.max_colwidth', 50)
    else:
        pd.set_option('display.max_colwidth', -1)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pd.reset_option('display.max_rows')

In [None]:
def correlation_matrix(df, corr_columns, method='pearson'):
    vector_col = "corr_features"
    assembler = VectorAssembler(inputCols=corr_columns, outputCol=vector_col)
    df_vector = assembler.transform(df).select(vector_col)
    matrix = Correlation.corr(df_vector, vector_col, method)

    result = matrix.collect()[0]["pearson({})".format(vector_col)].values
    return pd.DataFrame(result.reshape(-1, len(corr_columns)), columns=corr_columns, index=corr_columns)

<img src="images/dselogo.png" width="400" height="200">

## Creating Tables and Loading Tables

### Connect to Cassandra

In [None]:
from cassandra.cluster import Cluster

cluster = Cluster(['dse'])
session = cluster.connect()

### Create Demo Keyspace 

In [None]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS accelerate 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

### Set keyspace 

In [None]:
session.set_keyspace('accelerate')

### Create table called `socialMedia`. Our PRIMARY will be a unique key (status_id) we generate for each row. 

In [None]:
query = "CREATE TABLE IF NOT EXISTS iris \
                                   (Id int, SepalLengthCm float, SepalWidthCm float, \
                                   PetalLengthCm float, PetalWidthCm float, Species text, \
                                   PRIMARY KEY (Id))"
session.execute(query)

### What do these columns represent: 




### Load dataset from CSV file (socialMedia.csv)

#### Insert all the Data into the Apache Cassandra table `iris`

In [None]:
fileName = 'data/datasets_19_420_Iris.csv'
input_file = open(fileName, 'r')
next(input_file)
for line in input_file:
    row = line.split(',')

    query = "INSERT INTO iris (Id, SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species)"
    query = query + " VALUES (%s, %s, %s, %s, %s, %s)"
    session.execute(query, (int(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), str(row[5])))
    

## Loading with Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

In [None]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()


irisDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="iris", keyspace="accelerate").load()

print ("Table Row Count: ")
print (irisDF.count())

In [None]:
showDF(irisDF)

In [None]:
labelIndexer = StringIndexer(inputCol="species", outputCol="label", handleInvalid='keep')
training = labelIndexer.fit(irisDF).transform(irisDF)

showDF(training)

In [None]:
irisPD = training.toPandas()
sns.countplot(y=irisPD.label)
plt.xlabel("Count of each Target class")
plt.ylabel("Target classes")
plt.show()

In [None]:
p=sns.pairplot(irisPD, hue = 'species')

In [None]:
plt.figure(figsize=(15,15))
r1 = correlation_matrix(training, ['id', 'petallengthcm', 'sepalwidthcm', 'petalwidthcm', 'sepallengthcm', 'label'])
p=sns.heatmap(r1, annot=True,cmap='RdYlGn')

In [None]:
assembler = VectorAssembler(
    inputCols=['petallengthcm', 'sepalwidthcm', 'petalwidthcm', 'sepallengthcm'],
    outputCol='features')

trainingData = assembler.transform(training)

In [None]:
splits = trainingData.randomSplit([0.8, 0.2], 124)
train = splits[0]
test = splits[1]

print ("Train Dataframe Row Count: ")
print (train.count())
print ("Test Datafram Row Count: ")
print (test.count())

In [None]:
pca = PCA(k=4, inputCol="features", outputCol="pca")
model = pca.fit(train)
transformed = model.transform(train)
showDF(transformed)

In [None]:
with plt.style.context('dark_background'):
    plt.figure(figsize=(6, 4))

    plt.bar(range(4), model.explainedVariance.array, alpha=0.5, align='center',
            label='individual explained variance')
    plt.ylabel('Explained variance ratio')
    plt.xlabel('Principal components')
    plt.legend(loc='best')
    plt.tight_layout()

In [None]:
pca = PCA(k=2, inputCol="features", outputCol="pca")
model = pca.fit(train)
transformed_train = model.transform(train)
transformed_test = model.transform(test)
showDF(transformed_train)

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

model = rf.fit(transformed_train)

predictions = model.transform(test)
#predictions.show()
showDF(predictions)

In [None]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

In [None]:
rf_pca = RandomForestClassifier(labelCol="label", featuresCol="pca", numTrees=10)

model_pca = rf_pca.fit(transformed_train)

predictions_pca = model_pca.transform(transformed_test)
#predictions.show()
showDF(predictions_pca)

In [None]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions_pca)
print("Test set accuracy = " + str(accuracy))

In [None]:
query = "CREATE TABLE IF NOT EXISTS diabetes \
                                   (Id int, timesPregnant int, plasmaGlucose int, bloodPressure int, \
                                   tricepThickness int, serumInsulin int, bmi float, diabetesPedegree float, \
                                   age int, label int, PRIMARY KEY (Id))"
session.execute(query)

In [None]:
fileName = 'data/pima-indians-diabetes.csv'
input_file = open(fileName, 'r')
i = 1
for line in input_file:
    iD = i
    row = line.split(',')

    query = "INSERT INTO diabetes (Id, timesPregnant, plasmaGlucose, bloodPressure, \
                                   tricepThickness, serumInsulin, bmi, diabetesPedegree, \
                                   age, label)"
    query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    session.execute(query, (int(iD), int(row[0]), int(row[1]), int(row[2]), int(row[3]), int(row[4]), float(row[5]), float(row[6]), int(row[7]), int(row[8])))
    i = i + 1

In [None]:
diabetesDF = spark.read.format("org.apache.spark.sql.cassandra").options(table="diabetes", keyspace="accelerate").load()

print ("Table Row Count: ")
print (diabetesDF.count())

In [None]:
showDF(diabetesDF, 20)

In [None]:
print(diabetesDF.schema.names)
print([diabetesDF.where((col(c_name) == 0)).count() for c_name in diabetesDF.schema.names])

In [None]:
diabetesDF = diabetesDF.withColumn("bloodpressure", F.when(F.col("bloodpressure")==0, float("nan")).otherwise(F.col("bloodpressure")))
diabetesDF = diabetesDF.withColumn("plasmaglucose", F.when(F.col("plasmaglucose")==0, float("nan")).otherwise(F.col("plasmaglucose")))
diabetesDF = diabetesDF.withColumn("tricepthickness", F.when(F.col("tricepthickness")==0, float("nan")).otherwise(F.col("tricepthickness")))
diabetesDF = diabetesDF.withColumn("seruminsulin", F.when(F.col("seruminsulin")==0, float("nan")).otherwise(F.col("seruminsulin")))
diabetesDF = diabetesDF.withColumn("bmi", F.when(F.col("bmi")==0, float("nan")).otherwise(F.col("bmi")))

In [None]:
print(diabetesDF.schema.names)
print([diabetesDF.where((col(c_name) == 0)).count() for c_name in diabetesDF.schema.names])

In [None]:
showDF(diabetesDF, 20)

In [None]:
imputer = Imputer()
imputer.setInputCols(["plasmaglucose", "bloodpressure", "bmi"])
imputer.setOutputCols(["out_plasmaglucose", "out_bloodpressure", "out_bmi"])
model = imputer.fit(diabetesDF)
#model.setInputCols(["plasmaglucose", "bloodpressure", "bmi"])
showDF(model.transform(diabetesDF),100)