In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder\
    .appName('classifiction_first_notebook')\
    .getOrCreate()
spark

In [3]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Read Data

In [4]:
path = 'Toddler Autism dataset July 2018.csv'
df = spark.read.csv(path,
                    inferSchema=True,
                    header=True)
df.limit(5).toPandas()

Unnamed: 0,Case_No,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,Age_Mons,Qchat-10-Score,Sex,Ethnicity,Jaundice,Family_mem_with_ASD,Who completed the test,Class/ASD Traits
0,1,0,0,0,0,0,0,1,1,0,1,28,3,f,middle eastern,yes,no,family member,No
1,2,1,1,0,0,0,1,1,0,0,0,36,4,m,White European,yes,no,family member,Yes
2,3,1,0,0,0,0,0,1,1,0,1,36,4,m,middle eastern,yes,no,family member,Yes
3,4,1,1,1,1,1,1,1,1,1,1,24,10,m,Hispanic,no,no,family member,Yes
4,5,1,1,0,1,1,1,1,1,1,1,20,9,f,White European,no,yes,family member,Yes


In [5]:
df.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- A5: integer (nullable = true)
 |-- A6: integer (nullable = true)
 |-- A7: integer (nullable = true)
 |-- A8: integer (nullable = true)
 |-- A9: integer (nullable = true)
 |-- A10: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- Class/ASD Traits : string (nullable = true)



# Class distribution

In [6]:
df.groupBy("Class/ASD Traits ").count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  326|
|              Yes|  728|
+-----------------+-----+



# Inputs and outputs formatting

In [7]:
input_columns = df.columns[1:-1]
output_column = 'Class/ASD Traits '
input_columns

['A1',
 'A2',
 'A3',
 'A4',
 'A5',
 'A6',
 'A7',
 'A8',
 'A9',
 'A10',
 'Age_Mons',
 'Qchat-10-Score',
 'Sex',
 'Ethnicity',
 'Jaundice',
 'Family_mem_with_ASD',
 'Who completed the test']

In [8]:
# output columns formatting
renamed = df.withColumn('label_str', 
                        df[output_column].cast(StringType()))
indexer = StringIndexer(
                    inputCol='label_str', 
                    outputCol='label')
indexed = indexer.fit(renamed).transform(renamed)

In [9]:
indexed.select(['label', 'label_str']).show(5)

+-----+---------+
|label|label_str|
+-----+---------+
|  1.0|       No|
|  0.0|      Yes|
|  0.0|      Yes|
|  0.0|      Yes|
|  0.0|      Yes|
+-----+---------+
only showing top 5 rows



In [10]:
# input columns

numeric_inputs = []
string_inputs = []

for column in input_columns:
    if str(indexed.schema[column].dataType) == 'StringType':
        indexer = StringIndexer(
            inputCol=column,
            outputCol=column + "_num"
        )
        indexed = indexer.fit(indexed).transform(
            indexed
        )
        string_inputs.append(f"{column}_num")
    else:
        numeric_inputs.append(column)



In [11]:
indexed.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- A5: integer (nullable = true)
 |-- A6: integer (nullable = true)
 |-- A7: integer (nullable = true)
 |-- A8: integer (nullable = true)
 |-- A9: integer (nullable = true)
 |-- A10: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- Class/ASD Traits : string (nullable = true)
 |-- label_str: string (nullable = true)
 |-- label: double (nullable = false)
 |-- Sex_num: double (nullable = false)
 |-- Ethnicity_num: double (nullable = false)
 |-- Jaundice_num: double (nullable = false)
 |-- Family_mem_with_ASD_num: double

In [12]:
d = {}

for col in numeric_inputs:
    d[col] = indexed.approxQuantile(
        col, [0.01, 0.99], 0.25
    )
    

In [13]:
print(d)

{'A1': [0.0, 1.0], 'A2': [0.0, 1.0], 'A3': [0.0, 1.0], 'A4': [0.0, 1.0], 'A5': [0.0, 1.0], 'A6': [0.0, 1.0], 'A7': [0.0, 1.0], 'A8': [0.0, 1.0], 'A9': [0.0, 1.0], 'A10': [0.0, 1.0], 'Age_Mons': [12.0, 36.0], 'Qchat-10-Score': [0.0, 10.0]}


In [14]:
d['A1']

[0.0, 1.0]

In [15]:
# for skewness removal
# if right skew then log +1
# if left skew the exponential
for col in numeric_inputs:
    skew = indexed.agg(
        skewness(
            indexed[col])
    ).collect()
    skew = skew[0][0]
    if skew > 1:
        indexed = indexed.withColumn(
            col,
            log(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] >d[col][1], d[col][1])\
                .otherwise(indexed[col]) +1).alias(col)
        )

        print(f'{col} has been treated for positive right skewness')
    elif skew < -1:
        indexed = indexed.withColumn(
            col,
            exp(
                when(df[col] < d[col][0], d[col][0])\
               .when(df[col] > d[col][1], d[col][1])\
               .otherwise(df[col])
               ).alias(col)
               )
        print(f'{col} has been treated for negative (left) skewness')

In [16]:
# In case there is negative values then
# Naive Bayes algorithm can not be implemented
# therefore searching for negative value

# Calculate minimums for all columns
minimums = df.select(
    [ min(c).alias(c) for c in df.columns if c in numeric_inputs]
                   )
minimums.limit(5).toPandas()

Unnamed: 0,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,Age_Mons,Qchat-10-Score
0,0,0,0,0,0,0,0,0,0,0,12,0


In [17]:
min_array = minimums.select(
    array(numeric_inputs).alias('mins')
)
min_array.show(1, False)

+-------------------------------------+
|mins                                 |
+-------------------------------------+
|[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0]|
+-------------------------------------+



In [18]:
df_minimum = min_array.select(
    array_min(min_array.mins)
).collect()
df_minimum[0][0]

0

In [19]:
# Now create your final features list

features_list = numeric_inputs + string_inputs
features_list

['A1',
 'A2',
 'A3',
 'A4',
 'A5',
 'A6',
 'A7',
 'A8',
 'A9',
 'A10',
 'Age_Mons',
 'Qchat-10-Score',
 'Sex_num',
 'Ethnicity_num',
 'Jaundice_num',
 'Family_mem_with_ASD_num',
 'Who completed the test_num']

In [20]:
assembler = VectorAssembler(
    inputCols=features_list,
    outputCol='features'
)
output = assembler.transform(
    indexed
).select('features', 'label')

In [21]:
output.show(5, False)

+-----------------------------------------------------------------------+-----+
|features                                                               |label|
+-----------------------------------------------------------------------+-----+
|(17,[6,7,9,10,11,12,13,14],[1.0,1.0,1.0,28.0,3.0,1.0,2.0,1.0])         |1.0  |
|(17,[0,1,5,6,10,11,14],[1.0,1.0,1.0,1.0,36.0,4.0,1.0])                 |0.0  |
|(17,[0,6,7,9,10,11,13,14],[1.0,1.0,1.0,1.0,36.0,4.0,2.0,1.0])          |0.0  |
|[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24.0,10.0,0.0,5.0,0.0,0.0,0.0]|0.0  |
|[1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,20.0,9.0,1.0,0.0,0.0,1.0,0.0] |0.0  |
+-----------------------------------------------------------------------+-----+
only showing top 5 rows



22/02/06 19:34:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [22]:
scaler = MinMaxScaler(
    inputCol='features',
    outputCol='scaledFeatures',
    min=0,
    max=1000
)

print(f"Features scaled to range:\
     {scaler.getMin(), scaler.getMax()}")

scalerModel = scaler.fit(output)

scaled_data = scalerModel.transform(
    output
)
final_data = scaled_data.select(
    'label','scaledFeatures'
)

# Rename to default value
final_data = final_data.withColumnRenamed(
    'scaledFeatures',
    'features'
)
final_data.show(5, False)

Features scaled to range:     (0.0, 1000.0)
+-----+----------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                              |
+-----+----------------------------------------------------------------------------------------------------------------------+
|1.0  |(17,[6,7,9,10,11,12,13,14],[1000.0,1000.0,1000.0,666.6666666666666,300.0,1000.0,200.0,1000.0])                        |
|0.0  |(17,[0,1,5,6,10,11,14],[1000.0,1000.0,1000.0,1000.0,1000.0,400.0,1000.0])                                             |
|0.0  |(17,[0,6,7,9,10,11,13,14],[1000.0,1000.0,1000.0,1000.0,1000.0,400.0,200.0,1000.0])                                    |
|0.0  |[1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,500.0,1000.0,0.0,500.0,0.0,0.0,0.0]            |
|0.0  |[1000.0,1000.0,0.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0

In [23]:
# Split into Test and Training datasets

train, test = final_data.randomSplit([0.7, 0.3])

In [24]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import (CrossValidator,
                               ParamGridBuilder)


In [27]:
Bin_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='prediction'
)

Mc_evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy'
)

# Logistic Regression

In [28]:
classifier = LogisticRegression()
fitModel = classifier.fit(train)

predictionAndLabel=fitModel.transform(
    test
)
auc = Bin_evaluator.evaluate(
    predictionAndLabel)
print(f'AUC: {auc}')

22/02/06 19:43:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/02/06 19:43:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


AUC: 1.0


In [29]:
# can be done with multiclass Classification
predictions = fitModel.transform(
    test
)
accuracy = (Mc_evaluator.evaluate(
    predictions))*100

print(f'Accuracy: {accuracy:.2f}')

Accuracy: 100.00


In [30]:
# First which algorithm to use
classifier = LogisticRegression()
paramGrid =(
    ParamGridBuilder().addGrid(
        classifier.maxIter,
        [10, 15, 20]
    ).build()) 

crossval = CrossValidator(
    estimator=classifier,
    estimatorParamMaps=paramGrid,
    evaluator=Mc_evaluator,
    numFolds=2
)

fitModel = crossval.fit(
    train
)
BestModel = fitModel.bestModel

print(f'Intercept: {str(BestModel.interceptVector)}')
print(f'Coefficients: {BestModel.coefficientMatrix}')




Intercept: [51.25340384465132]
Coefficients: DenseMatrix([[-0.01141508, -0.0102881 , -0.01042605, -0.01207538, -0.00996857,
              -0.01165182, -0.010967  , -0.00991785, -0.01118944, -0.01121953,
              -0.00199482, -0.0302907 , -0.00026916, -0.00278132, -0.00057139,
              -0.00028023, -0.00382372]])


In [34]:
predictions= fitModel.transform(test)
accuracy = (
    Mc_evaluator.evaluate(
        predictions
    )
)
print(accuracy)

1.0


In [36]:
coeff_array = BestModel.coefficientMatrix.toArray()
coeff_scores = [float(x) for x in coeff_array[0]]
result = spark.createDataFrame(
    zip(input_columns, coeff_scores),
    schema=['features', 'coeff']
)
result.show()

+--------------------+--------------------+
|            features|               coeff|
+--------------------+--------------------+
|                  A1|-0.01141507992330...|
|                  A2|-0.01028809582257...|
|                  A3|-0.01042604851217...|
|                  A4|-0.01207538131949...|
|                  A5|-0.00996856700474...|
|                  A6|-0.01165182450976...|
|                  A7|-0.01096699813330...|
|                  A8|-0.00991785346613...|
|                  A9| -0.0111894425269281|
|                 A10|-0.01121952828675...|
|            Age_Mons|-0.00199481656435...|
|      Qchat-10-Score|-0.03029070189907398|
|                 Sex|-2.69163921490370...|
|           Ethnicity|-0.00278131811105...|
|            Jaundice|-5.71392924310833...|
| Family_mem_with_ASD|-2.80229083710591...|
|Who completed the...|-0.00382371968582...|
+--------------------+--------------------+



In [37]:
trainingSummary = BestModel.summary

In [39]:
trainingSummary.predictions.describe().show()

+-------+------------------+------------------+
|summary|             label|        prediction|
+-------+------------------+------------------+
|  count|               734|               734|
|   mean|0.3215258855585831|0.3215258855585831|
| stddev|0.4673805718253027|0.4673805718253027|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



In [42]:
objectiveHistory = trainingSummary.objectiveHistory
print(" ")
print("objectiveHistory:(scaled loss + regularization at each iteration)")
[print(objective) for objective in objectiveHistory];

 
objectiveHistory:(scaled loss + regularization at each iteration)
0.6280142815513996
0.5213192624846951
0.45856304664143005
0.29273569949704176
0.2659202876809173
0.22708745847954648
0.20552710550258288
0.18127594989350054
0.14398359590390153
0.08198058700315335
0.07901012687927496
0.028392641840305932
0.019523105695930434
0.012540337225207676
0.007486365235160843
0.005424466566574381
0.002641455221904271
0.0017099239720956847
0.001008991458378112
0.000585111430970679
0.00029716417847435773


In [45]:
for i, rate in enumerate(
    trainingSummary.falsePositiveRateByLabel):
    print(f'label {i}: {rate}')

label 0: 0.0
label 1: 0.0


In [46]:
for i, rate in enumerate(
    trainingSummary.truePositiveRateByLabel):
    print(f'label {i}: {rate}')

label 0: 1.0
label 1: 1.0


In [47]:

print(" ")
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print(" ")
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print(" ")
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

# Generate confusion matrix and print (includes accuracy)
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(" ")
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

 
Precision by label:
label 0: 1.0
label 1: 1.0
 
Recall by label:
label 0: 1.0
label 1: 1.0
 
F-measure by label:
label 0: 1.0
label 1: 1.0
 
Accuracy: 1.0
FPR: 0.0
TPR: 1.0
F-measure: 1.0
Precision: 1.0
Recall: 1.0


In [48]:
classifier = RandomForestClassifier()

paramGrid = (ParamGridBuilder()\
    .addGrid(
        classifier.maxDepth,
        [2, 5, 10 ]
    )\
    .addGrid(
        classifier.numTrees,
        [5, 20, 50])\
    .build())

In [52]:
crossval = CrossValidator(
    estimator=classifier,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
    numFolds=5
)
fitModel = crossval.fit(train)


In [53]:

BestModel = fitModel.bestModel
featureImportances = BestModel.featureImportances.toArray()

print(f'Feature Importances: {featureImportances}')
predictions = fitModel.transform(test)
accuracy = (
    Mc_evaluator.evaluate(
        predictions
    )
)
print(f'Accuracy : {accuracy}')

Feature Importances: [0.12332297 0.0829292  0.         0.072007   0.         0.
 0.06306404 0.         0.07667703 0.         0.         0.58199976
 0.         0.         0.         0.         0.        ]
Accuracy : 1.0


In [59]:
fi = [str(i) for i in featureImportances]

In [61]:
fi_df = spark.createDataFrame(
    zip(fi, input_columns),
    schema=['feature importance', 'Column'])

In [62]:
fi_df.show()

+-------------------+--------------------+
| feature importance|              Column|
+-------------------+--------------------+
|0.12332297371381817|                  A1|
|0.08292920353430017|                  A2|
|                0.0|                  A3|
|0.07200700252508971|                  A4|
|                0.0|                  A5|
|                0.0|                  A6|
|0.06306403730343825|                  A7|
|                0.0|                  A8|
|0.07667702628618182|                  A9|
|                0.0|                 A10|
|                0.0|            Age_Mons|
| 0.5819997566371719|      Qchat-10-Score|
|                0.0|                 Sex|
|                0.0|           Ethnicity|
|                0.0|            Jaundice|
|                0.0| Family_mem_with_ASD|
|                0.0|Who completed the...|
+-------------------+--------------------+



# Gradient Boost Tree Classifier

In [63]:
label_count = final_data.select(
    countDistinct('label')
).collect()
label_count[0][0]

2

In [None]:
# more than 2 class can not be done
# at the moment at pyspark for 
# Gradient boosting


classifier = GBTClassifier()

paramGrid = (
    ParamGridBuilder()\
    .addGrid(
        classifier.maxDepth,
        [2, 5, 10, 20])
    .addGrid(
        classifier.maxBins,
        [10, 20, 40, 80, 100]
    ).GridBuilder()
)


