# Modelo predictivo (Espacial) de siniestros en las calles de Santiago
**Model will be split in train/test (80%/20%), using 2013-2017 Dataset and Validated with 2018 Dataset**
- UDD - MDS18 - BDA
- Final Delivery 
- 41__Final_Geo_Project_PySpark_MLib_XGBoost_Model_Gr100_dataset-2013-17
- 09 August 2019

**Running dataset created from 2013 to 2017 as df**

Developed based on:
- Big Data Analytics, Oscar Peredo 
- [PySpark ML and XGBoost full integration tested on the Kaggle Titanic dataset](https://towardsdatascience.com/pyspark-and-xgboost-integration-tested-on-the-kaggle-titanic-dataset-4e75a568bdb)
- [Machine Learning with PySpark and MLlib — Solving a Binary Classification Problem](https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa)
- [Build an end-to-end Machine Learning Model with MLlib in pySpark.](https://towardsdatascience.com/build-an-end-to-end-machine-learning-model-with-mllib-in-pyspark-4917bdf289c5)

## Preliminar Installations

- Download the XGBoost jars - The python code will need two scala jars dependencies in order to work. You can download them directly from maven:<br>
[xgboost4j]( https://mvnrepository.com/artifact/ml.dmlc/xgboost4j/0.72?source=post_page---------------------------)<br>
[xgboost4j-spark](https://mvnrepository.com/artifact/ml.dmlc/xgboost4j-spark/0.72?source=post_page---------------------------)
<br><br>
- Download the PySpark XGBoost code:<br>
[PySpark XGBoost](https://github.com/dmlc/xgboost/files/2161553/sparkxgb.zip?source=post_page---------------------------)
<br><br>
- FindSpark: PySpark isn't on sys.path by default, but it can be used as a regular library. This can be addressed by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime, as done by findspark.<br>
[findspark](https://github.com/minrk/findspark)<br><br>


## General functions

In [2]:
def find_num_cat_features(df):
    cat_cols = [item[0] for item in df.dtypes if item[1].startswith('string')]
    print(str(len(cat_cols)) + '  categorical features')

    num_cols = [
        item[0] for item in df.dtypes
        if item[1].startswith('int') | item[1].startswith('double')
    ][1:]
    print(str(len(num_cols)) + '  numerical features')
    return num_cols, cat_cols

## Libraries and pySpark initialization

In [4]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'

In [5]:
import numpy as np
import pandas as pd

In [6]:
import findspark
findspark.init()

In [7]:
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from sparkxgb import XGBoostEstimator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql.functions import rank, sum, col, mean, round, when

In [8]:
spark = SparkSession\
        .builder\
        .appName("PySpark XGBOOST Grid 100m Data Siniestros 2013-2017")\
        .getOrCreate()

In [9]:
spark.sparkContext.addPyFile("./sparkxgb/sparkxgb.zip")

## Dataset - Generated from Notebook: 
- 30_Sprint_IV_Geo_Project_Final Datasets_Creation

In [10]:
!ls ../data/

[34mCONASET[m[m                              geo_stgo_100_crash_test_dataset.csv
[34mOSM_Chile[m[m                            geo_stgo_100_crash_train_dataset.csv
final_test_dataset_grid_100.csv      geo_stgo_100_estatic_dataset.csv
final_train_dataset_grid_100.csv


In [11]:
df = spark.read.csv('../data/final_train_dataset_grid_100.csv',
                    header=True,
                    inferSchema=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- bank: integer (nullable = true)
 |-- bench: integer (nullable = true)
 |-- beverages: integer (nullable = true)
 |-- bus_stop: integer (nullable = true)
 |-- bus_stop_100: integer (nullable = true)
 |-- cafe: integer (nullable = true)
 |-- convenience: integer (nullable = true)
 |-- convenience_100: integer (nullable = true)
 |-- convenience_200: integer (nullable = true)
 |-- crossing: integer (nullable = true)
 |-- crossing_100: integer (nullable = true)
 |-- fast_food: integer (nullable = true)
 |-- fast_food_100: integer (nullable = true)
 |-- fast_food_200: integer (nullable = true)
 |-- fuel: integer (nullable = true)
 |-- intercect: integer (nullable = true)
 |-- kindergarten: integer (nullable = true)
 |-- motorway_junction: integer (nullable = true)
 |-- parking: integer (nullable = true)
 |-- parking_bicycle: integer (nullable = tr

## Verifying dataset balance

In [12]:
# now let's see how many categorical and numerical features we have:
num_cols, cat_cols = find_num_cat_features(df)

0  categorical features
54  numerical features


In [13]:
df.groupby('SINIESTRO').count().toPandas()

Unnamed: 0,SINIESTRO,count
0,1,17579
1,0,45450


`There is an imbalanced ratio of (0.72 and 0.28)

## Preparing Data for Machine Learning

 Verify numerical features:

In [14]:
numericCols = num_cols[1:-1] # Taking out id and Target variable
numericCols

['X',
 'Y',
 'bank',
 'bench',
 'beverages',
 'bus_stop',
 'bus_stop_100',
 'cafe',
 'convenience',
 'convenience_100',
 'convenience_200',
 'crossing',
 'crossing_100',
 'fast_food',
 'fast_food_100',
 'fast_food_200',
 'fuel',
 'intercect',
 'kindergarten',
 'motorway_junction',
 'parking',
 'parking_bicycle',
 'pharmacy',
 'railway_station',
 'railway_station_100',
 'restaurant',
 'restaurant_100',
 'school',
 'school_100',
 'school_200',
 'stop',
 'stop_100',
 'taxi',
 'traffic_signals',
 'traffic_signals_100',
 'turning_circle',
 'ATROPELLO_100',
 'ATROPELLO_200',
 'CAIDA_100',
 'CAIDA_200',
 'CHOQUE_100',
 'CHOQUE_200',
 'COLISION_100',
 'COLISION_200',
 'INCENDIO_100',
 'INCENDIO_200',
 'OTRO TIPO_100',
 'OTRO TIPO_200',
 'SEV_Index_100',
 'SEV_Index_200',
 'VOLCADURA_100',
 'VOLCADURA_200']

In [15]:
categoricalColumns = cat_cols
cols = df.columns

Category Indexing, One-Hot Encoding and VectorAssembler - a feature transformer that merges multiple columns into a vector column.

In [16]:
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol='SINIESTRO', outputCol='label')
stages += [label_stringIdx]

# Assemble the columns into a feature vector
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [17]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- bank: integer (nullable = true)
 |-- bench: integer (nullable = true)
 |-- beverages: integer (nullable = true)
 |-- bus_stop: integer (nullable = true)
 |-- bus_stop_100: integer (nullable = true)
 |-- cafe: integer (nullable = true)
 |-- convenience: integer (nullable = true)
 |-- convenience_100: integer (nullable = true)
 |-- convenience_200: integer (nullable = true)
 |-- crossing: integer (nullable = true)
 |-- crossing_100: integer (nullable = true)
 |-- fast_food: integer (nullable = true)
 |-- fast_food_100: integer (nullable = true)
 |-- fast_food_200: integer (nullable = true)
 |-- fuel: integer (nullable = true)
 |-- intercect: integer (nullable = true)
 |-- kindergarten: integer (nullable = true)
 |-- motorway_junction: integer (nullable = true)
 |-- p

In [18]:
train, test = df.randomSplit([0.8, 0.2], seed=24)

## XGBoost Model

In [19]:
xgboost = XGBoostEstimator(
    featuresCol="features", 
    labelCol="label", 
    predictionCol="prediction"
)

In [20]:
xgbModel = xgboost.fit(train)

In [21]:
predictions = xgbModel.transform(test)

In [22]:
predictions.select('id', 'label', 'prediction').show(5)

+---+-----+----------+
| id|label|prediction|
+---+-----+----------+
|  0|  0.0|       0.0|
|  3|  0.0|       0.0|
| 14|  0.0|       0.0|
| 18|  0.0|       0.0|
| 23|  0.0|       0.0|
+---+-----+----------+
only showing top 5 rows



In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName="accuracy")

In [24]:
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.210117


`Almost same as Test Error (0.212002) got with GBT Default parameters.`

In [26]:
spark.stop()

---