## __1. Setup Spark and load other libraries__

In [30]:
import pyspark 
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
np.random.seed(60)

import pandas as pd
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [31]:
spark = pyspark.sql.SparkSession.builder \
    .appName("Crime_Classification") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
sc = spark.sparkContext

## __2. Data Extraction__

In [32]:
crime_dataset = pd.read_csv('train.csv')
crime_dataset['Latlong'] = crime_dataset['X']*crime_dataset['Y']
crime_dataset.drop(['X','Y','Dates'],axis='columns',inplace=True)

In [33]:
crime_dataset

Unnamed: 0,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,Latlong
0,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-4624.588916
1,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-4624.588916
2,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-4627.691645
3,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,NORTHERN,NONE,1500 Block of LOMBARD ST,-4627.847257
4,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,PARK,NONE,100 Block of BRODERICK ST,-4624.699819
...,...,...,...,...,...,...,...
878044,ROBBERY,ROBBERY ON THE STREET WITH A GUN,Monday,TARAVAL,NONE,FARALLONES ST / CAPITOL AV,-4618.426865
878045,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Monday,INGLESIDE,NONE,600 Block of EDNA ST,-4620.177499
878046,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Monday,SOUTHERN,NONE,5TH ST / FOLSOM ST,-4624.432596
878047,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Monday,SOUTHERN,NONE,TOWNSEND ST / 2ND ST,-4623.988577


In [34]:
new_data = crime_dataset.to_csv('preproccesing_data.csv', index=False)

# __3.Define Structure to build Pipeline__

In [35]:
df = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('preproccesing_data.csv')


In [36]:
df.columns

['Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'Latlong']

In [37]:
print('Dataframe Structure')
print('----------------------------------')
print(df.printSchema())
print(' ')
print('Dataframe preview')
print(df.show(5))
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

Dataframe Structure
----------------------------------
root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Latlong: double (nullable = true)

None
 
Dataframe preview
+--------------+--------------------+---------+----------+--------------+--------------------+------------------+
|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|           Latlong|
+--------------+--------------------+---------+----------+--------------+--------------------+------------------+
|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|-4624.588915745816|
|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|-4624.588915745816|
|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARR

In [38]:
def top_n_list(df,name_column, N):
    print("Total number of unique value of"+' '+name_column+''+':'+' '+str(df.select(name_column).distinct().count()))
    print(' ')
    print('Top'+' '+str(N)+' '+'Crime'+' '+name_column)
    df.groupBy(name_column).count().withColumnRenamed('count','totalValue').orderBy(col('totalValue').desc()).show(N)
    
    
top_n_list(df, 'Resolution',12)


Total number of unique value of Resolution: 17
 
Top 12 Crime Resolution
+--------------------+----------+
|          Resolution|totalValue|
+--------------------+----------+
|                NONE|    526790|
|      ARREST, BOOKED|    206403|
|       ARREST, CITED|     77004|
|             LOCATED|     17101|
|   PSYCHOPATHIC CASE|     14534|
|           UNFOUNDED|      9585|
|     JUVENILE BOOKED|      5564|
|COMPLAINANT REFUS...|      3976|
|DISTRICT ATTORNEY...|      3934|
|      NOT PROSECUTED|      3714|
|      JUVENILE CITED|      3332|
|PROSECUTED BY OUT...|      2504|
+--------------------+----------+
only showing top 12 rows



## __4. Partition the dataset into Training and Test dataset__

In [39]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [40]:
# Creando una lista de columnas categóricas, excluyendo 'Category'
categorical_columns = ['Descript','DayOfWeek','PdDistrict','Resolution','Address']

In [43]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in categorical_columns]

encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=[column+"_ohe" for column in categorical_columns])

# Agregar un StringIndexer solo para 'Category'
category_indexer = StringIndexer(inputCol="Category", outputCol="Category_index").fit(df)

pipeline = Pipeline(stages=indexers + [encoder, category_indexer])


In [44]:
df = pipeline.fit(df).transform(df)

In [45]:
df.show()

+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+--------------------+--------------+
|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|            Latlong|Descript_index|DayOfWeek_index|PdDistrict_index|Resolution_index|Address_index|     Descript_ohe|DayOfWeek_ohe|PdDistrict_ohe|Resolution_ohe|         Address_ohe|Category_index|
+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+--------------------+--------------+
|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST| -4624.588915745816|          

In [46]:
# Updating feature_columns
feature_columns = [column+"_ohe" for column in categorical_columns] + ['Latlong']

(train_data, test_data) = df.randomSplit([0.7, 0.3])

In [47]:
feature_columns

['Descript_ohe',
 'DayOfWeek_ohe',
 'PdDistrict_ohe',
 'Resolution_ohe',
 'Address_ohe',
 'Latlong']

In [48]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

lr = LogisticRegression(featuresCol='scaledFeatures', labelCol='Category_index', maxIter=10)

pipeline = Pipeline(stages=[assembler, scaler, lr])

In [49]:
model = pipeline.fit(train_data)

In [50]:
predictions = model.transform(test_data)
#predictions = predictions.select('Category_index', 'prediction')
predictions.select('Category_index', 'prediction')
predictions.show()

+--------+--------+---------+----------+-----------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+----------+
|Category|Descript|DayOfWeek|PdDistrict|       Resolution|             Address|            Latlong|Descript_index|DayOfWeek_index|PdDistrict_index|Resolution_index|Address_index|     Descript_ohe|DayOfWeek_ohe|PdDistrict_ohe|Resolution_ohe|         Address_ohe|Category_index|            features|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------+--------+---------+----------+-----------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+----------------

In [61]:
import numpy as np

coefficients_matrix = model.stages[-1].coefficientMatrix
coefficients_array = coefficients_matrix.toArray()
odds_ratios = np.exp(coefficients_array)
print(odds_ratios)


[[7.04228683 0.63160298 0.71886709 ... 0.99883832 0.99981407 0.99767474]
 [0.765952   0.85642248 0.79185587 ... 0.99952225 1.00178606 1.00309971]
 [0.75937192 3.92294725 0.8596325  ... 0.99957884 1.00020227 1.00086189]
 ...
 [0.99901337 0.99937476 0.9993946  ... 0.99999659 0.99999693 1.00006902]
 [0.99978012 0.99985434 0.9998613  ... 0.99999914 0.99999919 1.00000136]
 [0.99993935 0.99995972 0.99996168 ... 0.99999976 0.99999977 0.99999563]]


In [62]:
len(odds_ratios)

39

In [22]:
evaluator = MulticlassClassificationEvaluator(labelCol="Category_index").setPredictionCol("prediction").evaluate(predictions)
print(' ')
print('--------------------------Accuracy-----------------------------')
print(' ')
print('               accuracy:{}'.format(evaluator))


 
--------------------------Accuracy-----------------------------
 
               accuracy:0.9967394426394165


In [23]:
from pyspark.mllib.evaluation import MulticlassMetrics
predictions.groupBy('Category_index', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = predictions.filter('prediction = 0 AND Category_index  = prediction').count()
TP = predictions.filter('prediction = 1 AND Category_index  = prediction').count()
FN = predictions.filter('prediction = 0 AND Category_index  = 1').count()
FP = predictions.filter('prediction = 1 AND Category_index  = 0').count()

recall = TP / (TP + FN)
precision = TN / (TN + FP)

balance_accuracy = 0.5 * (recall + precision)

print('balance_accuracy = {:.8f}'.format(balance_accuracy))

+--------------+----------+-----+
|Category_index|prediction|count|
+--------------+----------+-----+
|          22.0|       5.0|  248|
|          36.0|      36.0|   45|
|           7.0|       7.0|12702|
|          35.0|      35.0|   43|
|          12.0|      12.0| 5044|
|           3.0|      23.0|   86|
|           1.0|       1.0|37768|
|          31.0|      31.0|  128|
|          10.0|      10.0| 7816|
|          28.0|      28.0|  365|
|          14.0|      14.0| 2993|
|          27.0|      27.0|  429|
|           5.0|      22.0|  446|
|          21.0|      21.0| 1258|
|          17.0|      17.0| 2143|
|          26.0|      26.0|  564|
|           2.0|       2.0|27689|
|           1.0|       0.0|    7|
|          24.0|      24.0|  652|
|          23.0|      23.0|  662|
+--------------+----------+-----+
only showing top 20 rows

balance_accuracy = 0.99990735


In [63]:
from pyspark.ml.classification import DecisionTreeClassifier


dt = DecisionTreeClassifier(featuresCol='scaledFeatures', labelCol='Category_index')
pipeline = Pipeline(stages=[assembler, scaler, dt])
model_dt = pipeline.fit(train_data)
importances = model_dt.stages[-1].featureImportances
for feature, importance in zip(feature_columns, importances):
    print(f"Feature: {feature}, Importance: {importance}")

Feature: Descript_ohe, Importance: 0.30254720845283917
Feature: DayOfWeek_ohe, Importance: 0.19087469107356056
Feature: PdDistrict_ohe, Importance: 0.16916970521804756
Feature: Resolution_ohe, Importance: 0.17667415026118588
Feature: Address_ohe, Importance: 0.0
Feature: Latlong, Importance: 0.16073424499436692


In [64]:
predictions_dt = model_dt.transform(test_data)

# Ver las predicciones
predictions_dt.select('Category_index', 'prediction').show(10)

+--------------+----------+
|Category_index|prediction|
+--------------+----------+
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
|          27.0|       1.0|
+--------------+----------+
only showing top 10 rows



In [65]:
predictions_dt.show()

+--------+--------+---------+----------+-----------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+----------+
|Category|Descript|DayOfWeek|PdDistrict|       Resolution|             Address|            Latlong|Descript_index|DayOfWeek_index|PdDistrict_index|Resolution_index|Address_index|     Descript_ohe|DayOfWeek_ohe|PdDistrict_ohe|Resolution_ohe|         Address_ohe|Category_index|            features|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------+--------+---------+----------+-----------------+--------------------+-------------------+--------------+---------------+----------------+----------------+-------------+-----------------+-------------+--------------+--------------+----------------

In [69]:
evaluator_dt = MulticlassClassificationEvaluator(labelCol="Category_index").setPredictionCol("prediction").evaluate(predictions_dt)
print(' ')
print('--------------------------Accuracy-----------------------------')
print(' ')
print('               accuracy:{}'.format(evaluator_dt))

 
--------------------------Accuracy-----------------------------
 
               accuracy:0.31997874157596834
