In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# `sc` is only predefined inside the pyspark shell; create (or reuse) one so the
# notebook also runs from a plain Jupyter kernel.
sc = SparkContext.getOrCreate()
# The name must match its use below: binding `sqlcontext` and then reading through
# `sqlContext` raised NameError.
sqlContext = SQLContext(sc)

# Read the data: the header row supplies column names and Spark infers column types.
df = sqlContext.read.load("Chicago_Crimes_2012-2017.csv", format='com.databricks.spark.csv', header='true', inferSchema='true')


ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Peek at the first two rows (DataFrame.head(n) is take(n)).
df.take(2)

In [None]:
# List the column names read from the CSV header.
df.schema.names

In [None]:
# Print the column names and the types inferred from the CSV as a tree.
df.printSchema()

In [None]:
# Data summary: one row per column (count, mean, stddev, min, max) after transposing.
df.describe().toPandas().T

In [None]:
# Drop unnecessary columns (identifiers, free-text descriptions, raw coordinates, raw date).
# DataFrame.drop ignores names that are not present, so the list is safe to reuse.
df = df.drop(
    "Description",
    "Unnamed: 0",
    "ID",
    "IUCR",
    "Case Number",
    "X Coordinate",
    "Location",
    "Y Coordinate",
    "Date",
)

In [None]:
# Remove rows that contain a null in any column.
df = df.dropna()

In [None]:
# Remove rows that are exact duplicates across all remaining columns.
df = df.drop_duplicates()

In [None]:
# Check the shape after cleaning: (row count, column count).
n_rows = df.count()
n_cols = len(df.columns)
(n_rows, n_cols)

In [None]:
# Re-check the schema after dropping columns and cleaning rows.
df.printSchema()

In [None]:
# Drop rows whose Primary Type is one of the three spellings of a non-criminal incident.
# (Rows with a null Primary Type evaluate to null here and are dropped too, as with `!=`.)
non_criminal = ['NON-CRIMINAL (SUBJECT SPECIFIED)', 'NON-CRIMINAL', 'NON - CRIMINAL']
df = df.filter(~df['Primary Type'].isin(non_criminal))

In [None]:
# Check how many distinct crime categories remain.
df.select('Primary Type').dropDuplicates().count()

# Naive Bayes (NB) Model #

In [None]:
# Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['Beat', 'District', 'Ward', 'Community Area', 'Year', 'Latitude', 'Longitude', 'Month', 'Day', 'Hour', 'Minute'])) ]

In [None]:
# Fit all indexers in one pipeline and append their "_index" columns to the data.
pipeline = Pipeline(stages=indexers)
pipeline_model = pipeline.fit(df)
df_r = pipeline_model.transform(df)
df_r.show(n=1)

In [None]:
# Check which "_index" columns the pipeline added.
df_r.schema.names

In [None]:
# Remove the original categorical columns; their "_index" versions replace them.
df_r = df_r.drop(
    "Primary Type",
    "FBI Code",
    "Arrest",
    "Block",
    "Location Description",
    "Domestic",
)

In [None]:
# Confirm the original categorical columns are gone.
df_r.schema.names

In [None]:
# Convert negative 'Longitude' values to their absolute value.
# Multinomial NaiveBayes (used below) rejects negative feature values.
# Use the namespaced F.abs: `from pyspark.sql.functions import abs` would shadow
# Python's builtin abs for the rest of the kernel.
from pyspark.sql import functions as F

df_r = df_r.withColumn('Longitude', F.abs(df_r.Longitude))
df_r.head()

In [None]:
# Create a single "features" vector column: the indexed categorical columns first,
# then the numeric columns. 'Arrest_index' is deliberately excluded because it
# becomes the label.
from pyspark.ml.feature import VectorAssembler

nb_index_cols = [
    'FBI Code_index',
    'Primary Type_index',
    'Block_index',
    'Location Description_index',
    'Domestic_index',
]
nb_numeric_cols = [
    'Beat', 'District', 'Ward', 'Community Area', 'Year',
    'Latitude', 'Longitude', 'Month', 'Day', 'Hour', 'Minute',
]
vectorAssembler = VectorAssembler(inputCols=nb_index_cols + nb_numeric_cols, outputCol="features")
df_r_features = vectorAssembler.transform(df_r)
df_r_features.head(1)

In [None]:
# Check that the "features" column was appended.
df_r_features.schema.names

In [None]:
# Rename "Arrest_index" to "label", the default label column name for Spark ML estimators.
df_r_features = df_r_features.withColumnRenamed(existing="Arrest_index", new="label")

In [None]:
# Check the schema now that "Arrest_index" has been renamed to "label".
df_r_features.printSchema()

In [None]:
# Check how many distinct label values there are (2 expected for Arrest true/false).
df_r_features.select('label').distinct().count()

In [None]:
# Split the data into ~70% training and ~30% test rows.
# seed=123 makes the split reproducible across runs.
splits = df_r_features.randomSplit([0.7, 0.3], seed=123)
Training_df, Testing_df = splits

In [None]:
# Naive Bayes with a multinomial event model; the column names shown are Spark's defaults.
from pyspark.ml.classification import NaiveBayes
NB = NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial")

In [None]:
# Train the Naive Bayes model on the training split only.
nbmodel = NB.fit(dataset=Training_df)

In [None]:
# Predict on the held-out test split (not on the full df_r_features), which appends
# prediction columns to Testing_df.
NB_predictions_df = nbmodel.transform(Testing_df)

In [None]:
# Show the first 20 predictions next to their true labels.
NB_predictions_df.select(['prediction', 'label']).show(n=20)

In [None]:
# Evaluation of the NB model: fraction of test rows whose prediction equals the label.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# "precision" is not an accepted metricName in Spark 2.0+ (it raises
# IllegalArgumentException). "accuracy" is also what the message below reports.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(NB_predictions_df)
print("The Naive Bayes model accuracy = " + str(nbaccuracy))


# Logistic Regression #

In [None]:
# Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['Arrest','Beat', 'District', 'Ward', 'Community Area', 'Year', 'Latitude', 'Longitude', 'Month', 'Day', 'Hour', 'Minute'])) ]



In [None]:
# Fit the indexers in one pipeline and append their "_index" columns.
pipeline = Pipeline(stages=indexers)
pipeline_model = pipeline.fit(df)
df_r_DT = pipeline_model.transform(df)
df_r_DT.show(n=1)

In [None]:
# Check: inspect the schema, including the "_index" columns added by the pipeline.
df_r_DT.printSchema()

In [None]:
# Change Arrest from boolean into string; StringIndexer accepts only string or numeric input.
from pyspark.sql.types import StringType, BooleanType, DateType
from pyspark.sql.functions import col
df_r_DT = df_r_DT.withColumn("Arrest", df_r_DT["Arrest"].cast(StringType()))

In [None]:
# String-index Arrest into a numeric "Arrest_index" column (later renamed to the label).
indexer = StringIndexer(inputCol="Arrest", outputCol="Arrest_index")
indexer_model = indexer.fit(df_r_DT)
indexed = indexer_model.transform(df_r_DT)

In [None]:
# Check that "Arrest_index" has been added.
indexed.printSchema()

In [None]:
# Remove the original categorical columns from `indexed` now that their "_index"
# versions exist. All six drops must target `indexed`; previously five of them were
# applied to the Naive Bayes frame `df_r` by mistake.
indexed = indexed.drop(
    "Primary Type",
    "FBI Code",
    "Arrest",
    "Block",
    "Location Description",
    "Domestic",
)

In [None]:
# Convert negative 'Longitude' values to their absolute value, matching the
# Naive Bayes section's preprocessing.
# Use the namespaced F.abs: `from pyspark.sql.functions import abs` would shadow
# Python's builtin abs for the rest of the kernel.
from pyspark.sql import functions as F

indexed = indexed.withColumn('Longitude', F.abs(indexed.Longitude))
indexed.head()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Create a single "features" vector column: the indexed categorical columns first,
# then the numeric columns. 'Arrest_index' is left out because it is the label.
lr_index_cols = [
    'FBI Code_index',
    'Primary Type_index',
    'Block_index',
    'Location Description_index',
    'Domestic_index',
]
lr_numeric_cols = [
    'Beat', 'District', 'Ward', 'Community Area', 'Year',
    'Latitude', 'Longitude', 'Month', 'Day', 'Hour', 'Minute',
]
vectorAssembler = VectorAssembler(inputCols=lr_index_cols + lr_numeric_cols, outputCol="features")
df_r_features = vectorAssembler.transform(indexed)
df_r_features.head(1)

In [None]:
# Rename "Arrest_index" to "label", the default label column name for Spark ML estimators.
df_r_features = df_r_features.withColumnRenamed(existing="Arrest_index", new="label")

In [None]:
# Keep only the 'features' and 'label' columns the model needs.
df_r2 = df_r_features.select(['features', 'label'])

In [None]:
# Check: only the 'features' and 'label' columns should remain.
df_r2.printSchema()

In [None]:
# Split the data into ~60% training and ~40% test rows.
# seed=123 makes the split reproducible across runs.
splits = df_r2.randomSplit([0.6, 0.4], seed=123)
Training_df1, Testing_df1 = splits


In [None]:
# Logistic regression on the assembled features, limited to 10 optimizer iterations.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, labelCol='label', featuresCol='features')
lrModel = lr.fit(dataset=Training_df1)

In [None]:
# Predict on the test split and show the first 10 rows of model output.
predictions = lrModel.transform(dataset=Testing_df1)
predictions.select(['label', 'rawPrediction', 'prediction', 'probability']).show(n=10)

In [None]:
# Evaluation of logistic regression: area under the ROC curve on the test split.
# The arguments spell out BinaryClassificationEvaluator's defaults.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
print('Test Area Under ROC', evaluator.evaluate(predictions))