In [1]:
# Must be included at the beginning of each new notebook.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Data_Mining_GlobalTerrorism').getOrCreate()

# Importing complete dataset

In [2]:
df = spark.read.csv('Complete_dataset.csv', header=True, inferSchema=True)

In [3]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Date: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Attack: string (nullable = true)
 |-- Target: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Individual: integer (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- Ishostkid: double (nullable = true)



In [4]:
from pyspark.sql import *
spark = SparkSession.builder.appName('Final1').getOrCreate()
# Print data columns.
df.columns

['Year',
 'Month',
 'Date',
 'Country',
 'Region',
 'Province',
 'City',
 'Attack',
 'Target',
 'Nationality',
 'Group',
 'Individual',
 'Weapon',
 'Ishostkid']

# Data Transformation - encoding

In [5]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

# Goal 2

In [6]:
#indexing

Country_indexer = StringIndexer(inputCol='Country',outputCol='CountryIndex')
Region_indexer = StringIndexer(inputCol='Region',outputCol='RegionIndex')
Province_indexer = StringIndexer(inputCol='Province',outputCol='ProvinceIndex')
City_indexer = StringIndexer(inputCol='City',outputCol='CityIndex')
Target_indexer = StringIndexer(inputCol='Target',outputCol='TargetIndex')
Nationality_indexer = StringIndexer(inputCol='Nationality',outputCol='NationalityIndex')
Group_indexer = StringIndexer(inputCol='Group',outputCol='GroupIndex')
Weapon_indexer = StringIndexer(inputCol='Weapon',outputCol='WeaponIndex')
Attack_indexer = StringIndexer(inputCol='Attack',outputCol='label')

#encoding

Country_encoder = OneHotEncoder(inputCol='CountryIndex',outputCol='CountryVec')
Region_encoder = OneHotEncoder(inputCol='RegionIndex',outputCol='RegionVec')
Province_encoder = OneHotEncoder(inputCol='ProvinceIndex',outputCol='ProvinceVec')
City_encoder = OneHotEncoder(inputCol='CityIndex',outputCol='CityVec')
Target_encoder = OneHotEncoder(inputCol='TargetIndex',outputCol='TargetVec')
Nationality_encoder = OneHotEncoder(inputCol='NationalityIndex',outputCol='NationalityVec')
Group_encoder = OneHotEncoder(inputCol='GroupIndex',outputCol='GroupVec')
Weapon_encoder = OneHotEncoder(inputCol='WeaponIndex',outputCol='WeaponVec')

label_encoder = OneHotEncoder(inputCol='label',outputCol='label')



In [7]:
# Now we can assemble all of this as one vector in the features column. 
assembler = VectorAssembler(inputCols=['Year','CountryVec',
 'RegionVec',
 'ProvinceVec',
 'CityVec',
 'TargetVec',
 'NationalityVec',
 'GroupVec',
 'WeaponVec'],outputCol='features')

In [8]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[Country_indexer,Region_indexer,Province_indexer,City_indexer,Target_indexer,
                            Nationality_indexer,Group_indexer,Weapon_indexer,Attack_indexer,Country_encoder,Region_encoder,
                            Province_encoder,City_encoder,Target_encoder,Nationality_encoder,Group_encoder,Weapon_encoder,assembler])

# Pipeline

In [9]:
pipeline_model = pipeline.fit(df)
pipe_df = pipeline_model.transform(df)
pipe_df = pipe_df.select('label','features')
pipe_df.describe()

DataFrame[summary: string, label: string]

# Splitting of data

In [10]:
train_data, test_data = pipe_df.randomSplit([0.7,0.3])
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

Training Dataset Count: 78380
Test Dataset Count: 33476


# ML Algorithms

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier, LogisticRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, Normalizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
import pandas as pd
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random Forest

In [12]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train_data)
predictions = rfModel.transform(test_data)

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [14]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.5237483570319035


# Naive Bayes

In [15]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=0.5, modelType="multinomial")

# train the model
model = nb.fit(train_data)

# select example rows to display.
predictions = model.transform(test_data)
#predictions.show()

# compute accuracy on the test set
#evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
#print("Test set accuracy = " + str(accuracy))

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.4582433811544721


In [20]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train_data)

# select example rows to display.
predictions = model.transform(test_data)
#predictions.show()

# compute accuracy on the test set
#evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
#print("Test set accuracy = " + str(accuracy))

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.464374650008304


# Logistic Regression

In [16]:
##Logistics Regression##
from pyspark.ml.classification import LogisticRegression

# Split our data. Note that the new DataFrame is being used.
train_data, test_data = pipe_df.randomSplit([0.8,0.2])
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

# Instantiate the model.
lr_model = LogisticRegression(featuresCol='features',labelCol='label')

# Fit the model.
lr_model = lr_model.fit(train_data)

# And evaluate the model using the test data.
predictions = lr_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Training Dataset Count: 89310
Test Dataset Count: 22546
Test set accuracy = 0.8096336378958574


In [19]:
##Logistics Regression##
from pyspark.ml.classification import LogisticRegression

# Split our data. Note that the new DataFrame is being used.
train_data, test_data = pipe_df.randomSplit([0.7,0.3])
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

# Instantiate the model.
lr_model = LogisticRegression(featuresCol='features',labelCol='label')

# Fit the model.
lr_model = lr_model.fit(train_data)

# And evaluate the model using the test data.
predictions = lr_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Training Dataset Count: 78247
Test Dataset Count: 33609
Test set accuracy = 0.8121931625457467


# One Vs Rest

In [18]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# generate the train/test split.
(train_data, test_data) = pipe_df.randomSplit([0.7, 0.3])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=5, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train_data)

# score the model on test data.
predictions = ovrModel.transform(test_data)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.7437130726522372
