In [30]:
#installing pyspark
#pip install pyspark

In [31]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer,StandardScaler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

In [32]:
# Create a SparkSession
spark = SparkSession.builder.appName("NasaPredictor").master("local").getOrCreate()

In [33]:
# Load the dataset
data = spark.read.csv("C:/Users/EZENWAJIAKU CHINEDU/Desktop/Course Work/Big Data/nasa.csv", header=True, inferSchema=True)

In [34]:
import pyspark.sql.functions as fc
print((data.count(), len(data.columns)))
#data.describe().show()


# null values in each column
data_null = data.agg(*[fc.count(fc.when(fc.isnull(c), c)).alias(c) for c in data.columns])
#data_null.show()   # no null values

(4687, 40)


In [35]:
# Drop columns not neccessary for classification
new_data = data.drop('Neo Reference ID', 'Name', 'Orbit ID','Orbiting Body','Equinox','Miss Dist(Astronomical)', 'Miss Dist(miles)', 'Miss Dist(Astronomical)')
new_data = new_data.drop('Est Dia in KM(min)','Est Dia in M(min)','Est Dia in Miles(min)','Est Dia in Feet(min)','Relative Velocity km per hr')

In [36]:
#new_data.dtypes
#new_data.summary
#new_data.describe().show()

In [37]:
# Changing the Boolean type of Harzadous column to String
from pyspark.sql.functions import when

new_data = new_data.withColumn('Hazardous_Encoded', when(new_data.Hazardous==True, 1).otherwise(0))
new_data = new_data.drop("Hazardous")

In [38]:
new_data.groupBy('Hazardous_Encoded').count().orderBy('count').show()

+-----------------+-----+
|Hazardous_Encoded|count|
+-----------------+-----+
|                1|  755|
|                0| 3932|
+-----------------+-----+



In [39]:
print((new_data.count(), len(new_data.columns)))

(4687, 28)


In [40]:
new_data.dtypes

[('Absolute Magnitude', 'double'),
 ('Est Dia in KM(max)', 'double'),
 ('Est Dia in M(max)', 'double'),
 ('Est Dia in Miles(max)', 'double'),
 ('Est Dia in Feet(max)', 'double'),
 ('Close Approach Date', 'string'),
 ('Epoch Date Close Approach', 'double'),
 ('Relative Velocity km per sec', 'double'),
 ('Miles per hour', 'double'),
 ('Miss Dist(lunar)', 'double'),
 ('Miss Dist(kilometers)', 'double'),
 ('Orbit Determination Date', 'string'),
 ('Orbit Uncertainity', 'int'),
 ('Minimum Orbit Intersection', 'double'),
 ('Jupiter Tisserand Invariant', 'double'),
 ('Epoch Osculation', 'double'),
 ('Eccentricity', 'double'),
 ('Semi Major Axis', 'double'),
 ('Inclination', 'double'),
 ('Asc Node Longitude', 'double'),
 ('Orbital Period', 'double'),
 ('Perihelion Distance', 'double'),
 ('Perihelion Arg', 'double'),
 ('Aphelion Dist', 'double'),
 ('Perihelion Time', 'double'),
 ('Mean Anomaly', 'double'),
 ('Mean Motion', 'double'),
 ('Hazardous_Encoded', 'int')]

In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#label encoding of categorical columns
categorical_cols = ['Close Approach Date', 'Orbit Determination Date']
label_encoders = [StringIndexer(inputCol=col, outputCol=col + "_encoded").fit(new_data) for col in categorical_cols]
pipeline = Pipeline(stages=label_encoders)
new_data = pipeline.fit(new_data).transform(new_data)

In [42]:
new_data = new_data.drop('Close Approach Date', 'Orbit Determination Date')

In [43]:
new_data.dtypes

[('Absolute Magnitude', 'double'),
 ('Est Dia in KM(max)', 'double'),
 ('Est Dia in M(max)', 'double'),
 ('Est Dia in Miles(max)', 'double'),
 ('Est Dia in Feet(max)', 'double'),
 ('Epoch Date Close Approach', 'double'),
 ('Relative Velocity km per sec', 'double'),
 ('Miles per hour', 'double'),
 ('Miss Dist(lunar)', 'double'),
 ('Miss Dist(kilometers)', 'double'),
 ('Orbit Uncertainity', 'int'),
 ('Minimum Orbit Intersection', 'double'),
 ('Jupiter Tisserand Invariant', 'double'),
 ('Epoch Osculation', 'double'),
 ('Eccentricity', 'double'),
 ('Semi Major Axis', 'double'),
 ('Inclination', 'double'),
 ('Asc Node Longitude', 'double'),
 ('Orbital Period', 'double'),
 ('Perihelion Distance', 'double'),
 ('Perihelion Arg', 'double'),
 ('Aphelion Dist', 'double'),
 ('Perihelion Time', 'double'),
 ('Mean Anomaly', 'double'),
 ('Mean Motion', 'double'),
 ('Hazardous_Encoded', 'int'),
 ('Close Approach Date_encoded', 'double'),
 ('Orbit Determination Date_encoded', 'double')]

In [44]:
#new_data.show(5)

In [45]:
#seperating the target column from features
features_data = new_data.drop("Hazardous_Encoded")

In [46]:
#features_data.dtypes

In [47]:
#Assembling all the features
features_col = features_data.columns
print(features_col)
assembler = VectorAssembler(inputCols=features_col, outputCol="Vfeatures")
new_data = assembler.transform(new_data)
new_data = new_data.select("Vfeatures", "Hazardous_Encoded")


['Absolute Magnitude', 'Est Dia in KM(max)', 'Est Dia in M(max)', 'Est Dia in Miles(max)', 'Est Dia in Feet(max)', 'Epoch Date Close Approach', 'Relative Velocity km per sec', 'Miles per hour', 'Miss Dist(lunar)', 'Miss Dist(kilometers)', 'Orbit Uncertainity', 'Minimum Orbit Intersection', 'Jupiter Tisserand Invariant', 'Epoch Osculation', 'Eccentricity', 'Semi Major Axis', 'Inclination', 'Asc Node Longitude', 'Orbital Period', 'Perihelion Distance', 'Perihelion Arg', 'Aphelion Dist', 'Perihelion Time', 'Mean Anomaly', 'Mean Motion', 'Close Approach Date_encoded', 'Orbit Determination Date_encoded']


In [48]:
new_data.show(5)

+--------------------+-----------------+
|           Vfeatures|Hazardous_Encoded|
+--------------------+-----------------+
|[21.6,0.284472297...|                1|
|[21.3,0.326617897...|                0|
|[20.3,0.517654482...|                1|
|[27.4,0.019680675...|                0|
|[21.6,0.284472297...|                1|
+--------------------+-----------------+
only showing top 5 rows



In [49]:
#scaling the features
scaled_data = StandardScaler(inputCol="Vfeatures", outputCol="features")
new_data = scaled_data.fit(new_data).transform(new_data)

In [50]:
new_data.show(20)

+--------------------+-----------------+--------------------+
|           Vfeatures|Hazardous_Encoded|            features|
+--------------------+-----------------+--------------------+
|[21.6,0.284472297...|                1|[7.47153547294760...|
|[21.3,0.326617897...|                0|[7.36776414693444...|
|[20.3,0.517654482...|                1|[7.02185972689057...|
|[27.4,0.019680675...|                0|[9.47778110920205...|
|[21.6,0.284472297...|                1|[7.47153547294760...|
|[19.6,0.714562102...|                0|[6.77972663285986...|
|[19.6,0.714562102...|                0|[6.77972663285986...|
|[19.2,0.859092601...|                0|[6.64136486484231...|
|[17.8,1.636967205...|                0|[6.15709867678089...|
|[21.5,0.297879063...|                1|[7.43694503094321...|
|[22.4,0.196806745...|                0|[7.74825900898269...|
|[25.8,0.041118757...|                0|[8.92433403713185...|
|[25.0,0.059434687...|                0|[8.64761050109676...|
|[19.1,0

In [51]:
#renaming the target column to label
new_data = new_data.select("features", "Hazardous_Encoded")
new_data = new_data.withColumnRenamed("Hazardous_Encoded","label")

In [52]:
#splitting into test and train data
train_data, test_data = new_data.randomSplit([0.8, 0.2], seed=42)

In [53]:
# Logistic Regression
log_reg=LogisticRegression().fit(train_data)

#Get Predictions for Logistic Regression Model
predictions = log_reg.transform(test_data)
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

#Metrics for evaluation
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictions)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.9918904237156604
Accuracy:  0.9586129753914989
Precision:  0.958272242707148
Recall:  0.9586129753914988
F-Score:  0.9584425787661427


In [54]:
#Display the Logistic Regresssion predictions
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[4.92221989722427...|    0|[17.8918287416001...|[0.99999998303016...|       0.0|
|[5.05020453264050...|    0|[28.6212693850750...|[0.99999999999962...|       0.0|
|[5.25774718466683...|    0|[15.9383154135761...|[0.99999988030458...|       0.0|
|[5.32692806867560...|    0|[56.3633633172686...|           [1.0,0.0]|       0.0|
|[5.43069939468876...|    0|[19.0731573613145...|[0.99999999479245...|       0.0|
|[5.49988027869754...|    0|[25.6552662788696...|[0.99999999999278...|       0.0|
|[5.53447072070192...|    0|[25.4828339951187...|[0.99999999999143...|       0.0|
|[5.55176594170412...|    0|[14.3997357325042...|[0.99999944246262...|       0.0|
|[5.63824204671508...|    0|[20.9636822446090...|[0.99999999921369...|       0.0|
|[5.638242046715

Gradient Boost

In [55]:
# Gradient Boost Classifier
gradient_boost_class = GBTClassifier(labelCol="label", featuresCol="features")
model = gradient_boost_class.fit(train_data)

#Get predictions for Gradient Boost model
predictionGBT = model.transform(test_data)

#Metrics for evaluation
recall = multi_evaluator.evaluate(predictionGBT, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictionGBT, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionGBT, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictionGBT)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.9935977029334161
Accuracy:  0.9899328859060402
Precision:  0.990064402542591
Recall:  0.9899328859060403
F-Score:  0.989998639856475


In [56]:
predictionGBT.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[4.92221989722427...|    0|[-0.8921173251773...|[0.14378102777132...|       1.0|
|[5.05020453264050...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.25774718466683...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.32692806867560...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.43069939468876...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.49988027869754...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.53447072070192...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.55176594170412...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.63824204671508...|    0|[1.54350200272498...|[0.95635347857270...|       0.0|
|[5.638242046715

RANDOM FOREST CLASSIFIER

In [57]:
random_forest = RandomForestClassifier(labelCol="label", featuresCol="features")
model = random_forest.fit(train_data)

#Get predictions for Randomforest Boost model
predictionRDF = model.transform(test_data)
recall = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictionRDF)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.9949460654974391
Accuracy:  0.9944071588366891
Precision:  0.9944286977654283
Recall:  0.994407158836689
F-Score:  0.9944179281844263


In [58]:
predictionRDF.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[4.92221989722427...|    0|[14.7916318506549...|[0.73958159253274...|       0.0|
|[5.05020453264050...|    0|[18.3187209801349...|[0.91593604900674...|       0.0|
|[5.25774718466683...|    0|[18.9708633182592...|[0.94854316591296...|       0.0|
|[5.32692806867560...|    0|[19.0180934939243...|[0.95090467469622...|       0.0|
|[5.43069939468876...|    0|[19.0794144103474...|[0.95397072051737...|       0.0|
|[5.49988027869754...|    0|[18.6974344318204...|[0.93487172159102...|       0.0|
|[5.53447072070192...|    0|[19.2844739246083...|[0.96422369623041...|       0.0|
|[5.55176594170412...|    0|[18.3187209801349...|[0.91593604900674...|       0.0|
|[5.63824204671508...|    0|[17.3996496297034...|[0.86998248148517...|       0.0|
|[5.638242046715