In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession, Row, Column, SQLContext, DataFrame, DataFrameStatFunctions
from pyspark.sql.functions import split
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("app1").setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [19]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

import calendar
from datetime import date,datetime
import math
from pyspark.sql.functions import date_format, lit

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
import pyarrow.parquet as pq
import scipy.io
import pandas as pd
le = LabelEncoder()

from sklearn.feature_selection import VarianceThreshold
from skfeature.function.similarity_based import fisher_score
from skfeature.utility import construct_W

from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler

from sklearn.naive_bayes import GaussianNB,BernoulliNB,MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score

import time

In [3]:
def two_class_feature_selection(variance, Xtrain, y, Xtest):   
    score = fisher_score.fisher_score(Xtrain, y)
    idx = fisher_score.feature_ranking(score)
    features_number = len([i for i in range(len(idx)) if idx[i]==0])
    Xfisher = Xtrain[:, idx[0:features_number]]
    XtestFisher = Xtest[:, idx[0:features_number]]
    #print(Xfisher.shape)
    
    sel = VarianceThreshold(threshold=variance)
    Xvariance = sel.fit_transform(Xtrain)  
    #print(Xvariance.shape)
    XtestVariance = sel.transform(Xtest)
    #print(XtestVariance.shape)
    
    return Xvariance,Xfisher,XtestVariance,XtestFisher

In [4]:
wines = sqlContext.read.format('csv').options(header='true',delimiter=';',mode='DROPMALFORMED',inferSchema=True).load("winequality-white.csv")

In [5]:
wines.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [6]:
wines = wines.withColumn("quality", wines["quality"].cast(StringType()))

In [7]:
#wines.select(wines["*"]).where(wines["quality"]==10).count()

In [8]:
# 3- 20
# 4- 163
# 5- 1457
# 6- 2198
# 7- 880
# 8- 175
# 9- 5

In [9]:
winesTrain = wines.sample(False,0.75,40)
winesTest = wines.subtract(winesTrain)
print(winesTrain.count())
print(winesTest.count())
print(winesTrain.count()+winesTest.count())
print(wines.count())

3635
859
4494
4898


In [10]:
winesTrainPandas = winesTrain.toPandas()
winesTestPandas = winesTest.toPandas()

In [22]:
winesTrain = winesTrainPandas.values
winesTest = winesTestPandas.values

XwinesTrain = winesTrain[:,0:11]
YwinesTrain = winesTrain[:,11]
XwinesTest = winesTest[:,0:11]
Ytest = winesTest[:,11]

scaler = StandardScaler().fit(XwinesTrain)
XwinesTrain = scaler.transform(XwinesTrain)
XwinesTest = scaler.transform(XwinesTest)



In [13]:
#print(XtrainSMOTE.shape,XtrainADASYN.shape,XtrainROS.shape)

In [30]:
results = pd.DataFrame(columns=['classifier','resampler','selection_algorithm','accuracy','precision','recall','duration'])

In [31]:
smote = SMOTE(k=1)
adasyn = ADASYN(k=1)
ros = RandomOverSampler()
resamplers = np.array([smote, adasyn])

In [32]:
#KNeighborsTest
weights = ['uniform','distance']
algorithms = ['ball_tree','kd_tree','brute']

for i in resamplers:
    Xtrain, ytrain = i.fit_sample(XwinesTrain,YwinesTrain)
    #print(Xtrain.shape)
    XtrainVariance,XtrainFisher,XtestVariance,XtestFisher = two_class_feature_selection((0.8*(1-0.8)), XwinesTrain,YwinesTrain,XwinesTest)
    #print("After selection")
    for j in range(len(algorithms)):
        for k in range(len(weights)):
            for l in (range(4)):
                classifier=KNeighborsClassifier(n_neighbors=((2*l)+1),weights=weights[k],algorithm=algorithms[j])
                
                start=time.clock()
                predicted=classifier.fit(XtrainVariance,YwinesTrain).predict(XtestVariance)
                end=time.clock()
                duration=end-start
                
                start=time.clock()
                predicted2=classifier.fit(XtrainFisher,YwinesTrain).predict(XtestFisher)
                end=time.clock()
                duration2=end-start
                
                #print(str(classifier)[:5])
                results.loc[len(results)]=[str(classifier),str(i),"variance",accuracy_score(Ytest,predicted),precision_score(Ytest,predicted,average='micro'), recall_score(Ytest,predicted,average='micro'), duration]
                results.loc[len(results)]=[str(classifier),str(i),"fisher",accuracy_score(Ytest,predicted2),precision_score(Ytest,predicted2,average='micro'), recall_score(Ytest,predicted2,average='micro'), duration2]



In [33]:
results.to_csv("resultsKNeighbors.csv",sep=";")
results.drop(results.index, inplace=True)

In [36]:
#BayesTest
multinomial = MultinomialNB()
gauss = GaussianNB()
bernoulli = BernoulliNB()
classifiers = np.array([gauss, bernoulli])

for i in resamplers:
    Xtrain, ytrain = i.fit_sample(XwinesTrain,YwinesTrain)
    #print(Xtrain.shape)
    XtrainVariance,XtrainFisher,XtestVariance,XtestFisher = two_class_feature_selection((0.8*(1-0.8)), XwinesTrain,YwinesTrain,XwinesTest)
    #print("After selection")
    for j in classifiers:
   
        start=time.clock()
        predicted=j.fit(XtrainVariance,YwinesTrain).predict(XtestVariance)
        end=time.clock()
        duration=end-start
    
        start=time.clock()
        predicted2=j.fit(XtrainFisher,YwinesTrain).predict(XtestFisher)
        end=time.clock()
        duration2=end-start
    
        #print(str(j)[:5])
        results.loc[len(results)]=[str(j),str(i),"variance",accuracy_score(Ytest,predicted),precision_score(Ytest,predicted,average='micro'), recall_score(Ytest,predicted,average='micro'), duration]
        results.loc[len(results)]=[str(j),str(i),"fisher",accuracy_score(Ytest,predicted2),precision_score(Ytest,predicted2,average='micro'), recall_score(Ytest,predicted2,average='micro'), duration2]



In [37]:
results.to_csv("resultsBayes.csv",sep=";")
results.drop(results.index, inplace=True)

In [38]:
#DecisionTreeTest
criterion = np.array(['gini','entropy'])
splitters = np.array(['random','best'])
max_features = np.array(['auto','sqrt','log2', None])

for i in resamplers:
    Xtrain, ytrain = i.fit_sample(XwinesTrain,YwinesTrain)
    print(Xtrain.shape)
    XtrainVariance,XtrainFisher,XtestVariance,XtestFisher = two_class_feature_selection((0.8*(1-0.8)), XwinesTrain,YwinesTrain,XwinesTest)
    #print("After selection")
    for j in criterion:
        for k in splitters:
            for l in max_features:
                classifier=DecisionTreeClassifier(criterion=j,splitter=k,max_features=l)
                
                start=time.clock()
                predicted=classifier.fit(XtrainVariance,YwinesTrain).predict(XtestVariance)
                end=time.clock()
                duration=end-start
                
                start=time.clock()
                predicted2=classifier.fit(XtrainFisher,YwinesTrain).predict(XtestFisher)
                end=time.clock()
                duration2=end-start
                
                #print(str(classifier)[:5])
                results.loc[len(results)]=[str(classifier),str(i),"variance",accuracy_score(Ytest,predicted),precision_score(Ytest,predicted,average='micro'), recall_score(Ytest,predicted,average='micro'), duration]
                results.loc[len(results)]=[str(classifier),str(i),"fisher",accuracy_score(Ytest,predicted2),precision_score(Ytest,predicted2,average='micro'), recall_score(Ytest,predicted2,average='micro'), duration2]



(11382, 11)
After selection
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis




(11271, 11)
After selection
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis
Decis


In [39]:
results.to_csv("resultsDT.csv",sep=";")
results.drop(results.index, inplace=True)