In [0]:
#Pima Indians Diabetes dataset 

In [1]:
%pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.stat import Correlation

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


In [2]:
%pyspark

# Entry point: create (or reuse) the SparkContext and a Hive-enabled SparkSession.
# Later paragraphs rely on the module-level `spark` variable defined here.
if __name__ == '__main__':
    conf = SparkConf()  # default Spark configuration
    sc = SparkContext.getOrCreate(conf=conf)

    # `builder` is a class attribute of SparkSession: use it directly. Constructing
    # SparkSession(sc) first would create a session without Hive support that
    # getOrCreate() could then hand back instead of a Hive-enabled one.
    spark = (
        SparkSession.builder
        .appName("Read-and-write-data-to-Hive-table-spark")
        .config("hive.metastore.uris", "thrift://hive-hive-metastore-1:9083")
        .enableHiveSupport()
        .getOrCreate()
    )

    # Sanity check of the metastore connection: list the visible databases.
    df = spark.sql("show databases")

    # Display up to 10 rows without truncating long names.
    df.show(10, truncate=False)



In [3]:
%pyspark
# Load the raw CSV (first row is the header, column types are inferred).
# NOTE(review): the relative path depends on the interpreter's working directory — confirm.
datafile = spark.read.csv("../../diabetes.csv", header=True, inferSchema=True)

# Persist it as a Parquet-backed Hive table. The default save mode ("errorifexists")
# fails as soon as the table already exists, so overwrite keeps this paragraph re-runnable.
datafile.write.format("parquet").mode("overwrite").saveAsTable("diabetes_tableau")
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# From here on, work from the Hive table rather than the CSV.
datafile = spark.sql("select * from diabetes_tableau")


In [4]:
%pyspark
# Print the column names and types of the table loaded from Hive.
datafile.printSchema()

In [5]:
%pyspark
# List of column names, displayed as the paragraph output.
datafile.columns

In [6]:
%pyspark
# (row count, column count) computed in Spark, without collecting the data to the driver.
(datafile.count(), len(datafile.columns))

In [7]:
%pyspark
# Total number of null/NaN cells across all columns.
# NOTE(review): this only detects real nulls; zeros used as placeholders would not be counted — confirm.
datafile.toPandas().isna().values.sum()

In [8]:
%pyspark
# First five rows as a pandas frame; limiting in Spark avoids collecting the whole table.
datafile.limit(5).toPandas()

In [9]:
%spark.sql
-- Preview the first ten rows of the Hive table.
SELECT * FROM diabetes_tableau LIMIT 10

In [10]:
%spark.sql
-- Average of each listed column for every Outcome value.
SELECT
    Outcome,
    avg(Pregnancies)   AS Pregnancies,
    avg(Glucose)       AS Glucose,
    avg(BloodPressure) AS BloodPressure,
    avg(SkinThickness) AS SkinThickness,
    avg(Insulin)       AS Insulin
FROM diabetes_tableau
GROUP BY Outcome


In [11]:
%pyspark
# Column dtypes and non-null counts, reported by pandas.
diabetes_pdf = datafile.toPandas()
diabetes_pdf.info()

In [12]:
%spark.sql
-- Number of rows for each Outcome value (class balance).
SELECT Outcome, COUNT(1) AS count
FROM diabetes_tableau
GROUP BY Outcome

In [13]:
%pyspark
%matplotlib inline
# Pairwise correlation between all columns (pandas default method), drawn as an annotated heatmap.
corrmat = datafile.toPandas().corr()
f, ax = plt.subplots(figsize=(6, 6))
# Pin both ends of the colour scale to the full correlation range [-1, 1]; with only
# vmax set the lower bound followed the data and skewed the colours. Draw on `ax`
# explicitly instead of relying on pyplot's current-axes state.
sns.heatmap(corrmat, vmin=-1, vmax=1, annot=True, fmt=".2f", ax=ax)
ax.set_title("Correlation matrix")
plt.show()

In [14]:
%pyspark
%matplotlib inline
# Glucose vs Age scatter with a fitted regression line.
glucose_age_pdf = datafile.toPandas()
sns.lmplot(x='Glucose', y='Age', data=glucose_age_pdf)

sns.despine()
plt.show()
plt.clf()

In [15]:
%pyspark
%matplotlib inline
# Row counts per number of pregnancies, split by Outcome.
# Give the figure a title and end with plt.show() so the Axes repr is not dumped as output.
pregnancies_ax = sns.countplot(x='Pregnancies', hue='Outcome', data=datafile.toPandas())
pregnancies_ax.set_title("Pregnancies by Outcome")
plt.show()

In [16]:
#Machine Learning Algorithms

In [17]:
%pyspark
# Train/test data: 80/20 random split; the fixed seed makes the split repeatable for the same input.
data_train, data_test = datafile.randomSplit(weights=[0.8, 0.2], seed=123)

In [18]:
%pyspark
# Feature columns combined into a single vector column used by every model below.
feature_cols = ["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin",
                "BMI", "DiabetesPedigreeFunction", "Age"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='Input Attributes')
logisticregressor = LogisticRegression(labelCol="Outcome", featuresCol='Input Attributes')

# Pipeline: assemble the feature vector, then fit logistic regression on the training split.
pipeline = Pipeline(stages=[assembler, logisticregressor])
Model = pipeline.fit(data_train)

# Score the held-out split, mirroring the decision-tree and random-forest paragraphs.
predone = Model.transform(data_test)
predone.toPandas().head()

In [19]:
%pyspark

# Decision tree classifier on the same assembled feature vector.
decisiontree = DecisionTreeClassifier(
    featuresCol='Input Attributes',
    labelCol="Outcome",
)

# Pipeline: reuse the VectorAssembler stage, then fit the tree on the training split.
tree_stages = [assembler, decisiontree]
pipelinetwo = Pipeline(stages=tree_stages)
Modeltwo = pipelinetwo.fit(data_train)

# Predictions on the test split; show the first rows.
predtwo = Modeltwo.transform(data_test)
predtwo.toPandas().head()


In [20]:
%pyspark
# Decision-tree accuracy on the test split; the metric is chosen per call through a param map.
evaltwo = MulticlassClassificationEvaluator(labelCol='Outcome')
accuracy_param = {evaltwo.metricName: 'accuracy'}
accuracytwo = evaltwo.evaluate(predtwo, accuracy_param)
accuracytwo


In [21]:
%pyspark
# Random forest classifier trained through the same assemble-then-fit pipeline.
randomclassifier = RandomForestClassifier(featuresCol='Input Attributes', labelCol="Outcome")

pipelinethree = Pipeline(stages=[assembler, randomclassifier])
Modelthree = pipelinethree.fit(data_train)

# Test-split predictions; the evaluator created here is used in the next paragraph.
predthree = Modelthree.transform(data_test)
evalthree = MulticlassClassificationEvaluator(labelCol='Outcome')
predthree.toPandas().head()

In [22]:
%pyspark
# Random-forest accuracy on the test split.
accuracythree = evalthree.evaluate(predthree, params={evalthree.metricName: 'accuracy'})
accuracythree

In [23]:
#Store the results in Hive

In [24]:
%pyspark

# Leaderboard of test-set accuracy for the three classifiers, persisted to Hive.
# The logistic-regression pipeline (`Model`) is scored here with the same accuracy
# metric as the other two models, so all three rows are directly comparable.
predone = Model.transform(data_test)
evalone = MulticlassClassificationEvaluator(labelCol='Outcome', metricName='accuracy')
accuracyone = evalone.evaluate(predone)

algorithms = ["LogisticRegression", "DecisionTree", "Random Forest"]
leaderboard = pd.DataFrame({
    'algorithm': algorithms,
    # One metric label per algorithm: pandas raises ValueError on unequal column lengths.
    'metric': ["ACCURACY"] * len(algorithms),
    'values': [accuracyone, accuracytwo, accuracythree],
})

metrics = spark.createDataFrame(leaderboard)

# overwrite keeps the paragraph re-runnable (the default save mode fails if the table exists).
metrics.write.mode("overwrite").saveAsTable("metric_table")

In [25]:
%spark.sql

-- List the tables in the current database to confirm metric_table was created.
SHOW TABLES


In [26]:
%pyspark

# Read the persisted leaderboard back from Hive and print it.
df3 = spark.table("metric_table")
df3.show()