## Sara Khosravi
###############################################################################################################################

In [1]:
#set environment
import glob
import os
import sys

# Point this kernel at the cluster's Spark 2 client and its bundled Python libs.
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"

# Find the py4j archive shipped with this Spark install rather than assuming one
# exact version; fall back to the previously hard-coded name if none is found.
# If several archives match, the lexicographically last one is used.
py4j_archives = sorted(glob.glob(os.environ["PYLIB"] + "/py4j-*-src.zip"))
if py4j_archives:
    py4j_zip = py4j_archives[-1]
else:
    py4j_zip = os.environ["PYLIB"] + "/py4j-0.10.4-src.zip"

# pyspark.zip is inserted last so it ends up ahead of py4j on sys.path.
sys.path.insert(0, py4j_zip)
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
#import Sparksession driver
from pyspark.sql import SparkSession

# Reuse an existing session if one is already active, otherwise start a new one.
session_builder = SparkSession.builder.appName("Classification of Shuttle")
spark = session_builder.getOrCreate()

In [3]:
# Read the Shuttle CSV, taking column names from the first row and letting
# Spark infer the column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('data/Shuttle.csv')
df.show()

+---+----------+--------+---------+--------+----+------+---------+--------+-----+
|_c0|being time|Rad Flow|Fpv Close|Fpv Open|High|Bypass|Bpv Close|Bpv Open|class|
+---+----------+--------+---------+--------+----+------+---------+--------+-----+
| 49|        -3|      79|        0|      50|   0|    30|       30|       0|    1|
| 40|        -5|      76|        1|      38|   0|    36|       37|       2|    1|
| 49|         0|      83|        8|      50|   3|    34|       33|       0|    1|
| 55|         0|      98|        0|      50|  -9|    42|       49|       6|    4|
| 56|         4|      77|        0|      16|  -3|    22|       62|      40|    4|
| 42|        -1|     108|        0|      42|   0|    65|       66|       2|    1|
| 42|         0|      87|        8|      42|   0|    45|       46|       2|    1|
| 57|        -2|      80|        0|      56|   0|    23|       23|       0|    1|
| 45|        -1|     108|        0|      44|   0|    63|       64|       2|    1|
| 46|        -3|

In [4]:
#renaming the columns (disabled: the 7 names below do not match this DataFrame's 10 columns)
#df = df.toDF('Temperature', 'Luminosity', 'Radius', 'Absolute magnitud', 'Star type', 'Star color', 'Spectral Class')

In [4]:
# Preview the first five rows of the loaded data.
df.show(n=5)

+---+----------+--------+---------+--------+----+------+---------+--------+-----+
|_c0|being time|Rad Flow|Fpv Close|Fpv Open|High|Bypass|Bpv Close|Bpv Open|class|
+---+----------+--------+---------+--------+----+------+---------+--------+-----+
| 49|        -3|      79|        0|      50|   0|    30|       30|       0|    1|
| 40|        -5|      76|        1|      38|   0|    36|       37|       2|    1|
| 49|         0|      83|        8|      50|   3|    34|       33|       0|    1|
| 55|         0|      98|        0|      50|  -9|    42|       49|       6|    4|
| 56|         4|      77|        0|      16|  -3|    22|       62|      40|    4|
+---+----------+--------+---------+--------+----+------+---------+--------+-----+
only showing top 5 rows



In [5]:
#Check for missing values
from pyspark.sql import functions as F

# Count nulls for every column in a single aggregation pass instead of
# launching one Spark job per column. `when` without `otherwise` yields null
# for non-matching rows, and `count` ignores nulls, so each aggregate is the
# number of null cells in that column.
null_counts = df.select(
    [F.count(F.when(df[column_name].isNull(), column_name)).alias(column_name)
     for column_name in df.columns]
).first()

for column_name in df.columns:
    print("no. of cells in column", column_name, "with null values:", null_counts[column_name])

no. of cells in column _c0 with null values: 0
no. of cells in column being time with null values: 0
no. of cells in column Rad Flow with null values: 0
no. of cells in column Fpv Close with null values: 0
no. of cells in column Fpv Open with null values: 0
no. of cells in column High with null values: 0
no. of cells in column Bypass with null values: 0
no. of cells in column Bpv Close with null values: 0
no. of cells in column Bpv Open with null values: 0
no. of cells in column class with null values: 0


In [7]:
#all the independent variables need to be packed into one column of vector type
from pyspark.ml.feature import VectorAssembler

# NOTE(review): `_c0` is not among the inputs. Its values in the preview are not
# sequential, so it may be a real feature whose header was blank rather than a
# row index - TODO confirm the exclusion is intentional.
feature_columns = [
    'being time',
    'Rad Flow',
    'Fpv Close',
    'Fpv Open',
    'High',
    'Bypass',
    'Bpv Close',
    'Bpv Open',
]
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

# Keep only the assembled vector and the label column for modelling.
assembled = assembler.transform(df)
feature_vec = assembled.select('features', 'class')
feature_vec.show(n=5)

+--------------------+-----+
|            features|class|
+--------------------+-----+
|[-3.0,79.0,0.0,50...|    1|
|[-5.0,76.0,1.0,38...|    1|
|[0.0,83.0,8.0,50....|    1|
|[0.0,98.0,0.0,50....|    4|
|[4.0,77.0,0.0,16....|    4|
+--------------------+-----+
only showing top 5 rows



In [8]:
#Count of target classes
class_counts = feature_vec.groupBy('class').count()
class_counts.show()
#the classes are heavily imbalanced: class 1 dominates, while some classes have only a handful of rows

+-----+-----+
|class|count|
+-----+-----+
|    1|11386|
|    6|    4|
|    3|   39|
|    5|  806|
|    4| 2135|
|    7|    2|
|    2|   13|
+-----+-----+



In [9]:
# Split the data into train and test sets (75% / 25%, fixed seed)
splits = feature_vec.randomSplit(weights=[0.75, 0.25], seed=0)
train_data, test_data = splits

In [10]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model: multinomial family with a small
# regularisation strength.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="class",
    family="multinomial",
    maxIter=100,
    regParam=0.0001,
    elasticNetParam=0.0,
)

# Train model with Training Data, then score the held-out test split
lrModel = lr.fit(train_data)
predictions = lrModel.transform(test_data)

# Show which columns the fitted model appended to the test data
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- class: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [11]:
# Compare true labels with predicted labels for the first 20 test rows.
label_vs_prediction = predictions.select('class', 'prediction')
label_vs_prediction.show(n=20)

+-----+----------+
|class|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows



In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the current `predictions` against the `class` label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='class',
    predictionCol='prediction',
    metricName='accuracy',
)
evaluator.evaluate(predictions)

0.9604743083003953

In [13]:
# F1 score of the current `predictions` against the `class` label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='class',
    predictionCol='prediction',
    metricName='f1',
)
evaluator.evaluate(predictions)

0.9579412601597141

In [14]:
# Number of rows per true class in the scored data.
rows_per_class = predictions.groupBy('class').count()
rows_per_class.show()

+-----+-----+
|class|count|
+-----+-----+
|    1| 2830|
|    6|    2|
|    3|    8|
|    5|  197|
|    4|  501|
|    7|    1|
|    2|    3|
+-----+-----+



In [15]:
#Grid Search
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random forest on the default `features` column; seeded so tree building is repeatable.
rf = RandomForestClassifier(labelCol='class', seed=0)

# 3 x 3 grid over tree depth and forest size (9 candidate models).
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [10, 11, 12])
             .addGrid(rf.numTrees, [20, 30, 40])
             .build())

# Candidates are ranked by F1 score.
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='class', metricName='f1')

# Create 4-fold CrossValidator. The seed pins the fold assignment so the
# cross-validated metrics can be reproduced after a kernel restart.
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=4, seed=0)

cvModel = cv.fit(train_data)

In [16]:
# Pair each grid point's cross-validated average metric with its parameter map.
[(avg_metric, param_map)
 for avg_metric, param_map in zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps())]

[(0.9977159836617104,
  {Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10,
   Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='numTrees', doc='Number of trees to train (>= 1).'): 20}),
 (0.9979056314084556,
  {Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10,
   Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='numTrees', doc='Number of trees to train (>= 1).'): 30}),
 (0.997941087797557,
  {Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10,
   Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', nam

In [17]:
#Best Model Params
score_params_list = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))


def _avg_metric(score_and_params):
    """Return the average metric from a (metric, param map) pair."""
    return score_and_params[0]


# The pair with the highest average cross-validation metric.
max(score_params_list, key=_avg_metric)

(0.9981191980452542,
 {Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 12,
  Param(parent='RandomForestClassifier_4bd0bc841d07ab9c3ab9', name='numTrees', doc='Number of trees to train (>= 1).'): 30})

In [18]:
# Score the test split with the best model found by cross-validation.
best_cv_model = cvModel.bestModel
predictions = best_cv_model.transform(test_data)


In [19]:
# Evaluate with whichever `evaluator` was defined most recently
# (the grid-search cell configures it with metricName='f1').
best_model_score = evaluator.evaluate(predictions)
best_model_score

0.9957334656774008

In [20]:
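#By implementing the Random Forest model (tuned with grid search), we got an F1 score of about 99.6% on the test set.
#By implementing the Logistic Regression model, we got about 96% accuracy (F1 score of about 95.8%) on the test set.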