## San Francisco Crime Modeling

See the [Kaggle competition page](https://www.kaggle.com/c/sf-crime) for details.

### Predictive Goal:  "Given time and location, you must predict the category of crime that occurred."

Data profiling is contained in a separate notebook ("SanFranCrime.ipynb").

In [3]:
sc

<pyspark.context.SparkContext at 0x10bfc9490>

### Load the dataset from the prepared Parquet file

In [30]:
parqFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.pqt'
sfc_train = sqlContext.read.parquet(parqFileName)
print sfc_train.count()
sfc_train.printSchema()  # printSchema() prints directly and returns None
sfc_train = sfc_train.cache()

878049
root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)


In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

## Step 1:  Establish and evaluate a baseline 

From the profiling results, the most frequent category of crime by far is "LARCENY/THEFT".  We can set our baseline prediction to assume every crime is LARCENY/THEFT, regardless of the actual category or any of the other attributes.  Then we evaluate how accurate the baseline predictions are.  Later, we will compare the machine learning methods against this baseline to see how much better or worse they perform.

For now, we're going to start with Precision-Recall for our evaluation framework.  Later, we may consider additional evaluation metrics (e.g. AUC).

In [6]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="Category", outputCol="indexedLabel").fit(sfc_train)
sfc_train_t = labelIndexer.transform(sfc_train)
sfc_train_t = sfc_train_t.cache()

# StringIndexer gives index 0.0 to the most frequent label (LARCENY/THEFT),
# so the baseline predicts 0.0 for every row and keeps the true index as the label.
baseline_preds = sfc_train_t.selectExpr('indexedLabel as label', 'double(0) as prediction')
baseline_preds = baseline_preds.cache()

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
print 'Precision: {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'precision'}))
print 'Recall:  {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'recall'}))

Precision: 0.199192
Recall:  0.199192


### Thus, our machine learning results must be better than guessing that every category is LARCENY/THEFT.
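
The identical precision and recall above are what one would expect if the evaluator computes both over all classes at once (micro-averaging), in which case each reduces to plain accuracy. The following is a minimal pure-Python sketch of that reasoning, not the Spark implementation; the function names and the toy labels are hypothetical and chosen only for illustration.

```python
from collections import Counter


def majority_baseline_accuracy(labels):
    """Return (majority_label, accuracy) when always predicting the most common label."""
    counts = Counter(labels)
    majority_label, majority_count = counts.most_common(1)[0]
    return majority_label, majority_count / float(len(labels))


def micro_precision_recall(labels, predictions):
    """Micro-averaged precision and recall over all classes."""
    tp = sum(1 for y, p in zip(labels, predictions) if y == p)
    # Each wrong prediction is a false positive for the predicted class
    # and a false negative for the true class, so the two counts match.
    fp = len(labels) - tp
    fn = len(labels) - tp
    return tp / float(tp + fp), tp / float(tp + fn)


# Toy labels for illustration only (not taken from the dataset).
toy_labels = ["LARCENY/THEFT", "ASSAULT", "LARCENY/THEFT", "VANDALISM", "LARCENY/THEFT"]

majority, accuracy = majority_baseline_accuracy(toy_labels)
precision, recall = micro_precision_recall(toy_labels, [majority] * len(toy_labels))
```

Under that assumption, the 0.199192 figure should simply be the share of rows whose category is LARCENY/THEFT.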

## Step 2:  Prepare the features

In [None]:
sampdatafile = '/Users/bill.walrond/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt'
sampdata = sqlContext.read.format("libsvm").load(sampdatafile)
sampdata.head(1)
# The libsvm reader returns rows with a label column and a features column (SparseVector).

In [12]:
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(sampdata)

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(sampdata)
indexedData.show()


+-----+--------------------+--------------------+
|label|            features|             indexed|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


In [15]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = sqlContext.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show(5)  # show() prints the table itself and returns None
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show(truncate=False)


+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
+---+--------+-------------+
only showing top 5 rows

+---+-------------+
|id |categoryVec  |
+---+-------------+
|0  |(3,[0],[1.0])|
|1  |(3,[2],[1.0])|
|2  |(3,[1],[1.0])|
|3  |(3,[0],[1.0])|
|4  |(3,[0],[1.0])|
|5  |(3,[1],[1.0])|
+---+-------------+
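
As a rough illustration of what the two stages above do, the sketch below reproduces the toy example in plain Python. It assumes indices are assigned by descending label frequency, which matches the output above (`a` gets 0, `c` gets 1, `b` gets 2). The alphabetical tie-break and the helper names `fit_string_index` and `one_hot` are assumptions made for this sketch and are not part of the Spark API.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Ties are broken alphabetically here; that choice is an assumption of this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, size):
    """Dense one-hot vector with a single 1.0 at the given index (no column dropped)."""
    vec = [0.0] * size
    vec[int(index)] = 1.0
    return vec


categories = ["a", "b", "c", "a", "a", "c"]
index = fit_string_index(categories)
encoded = [one_hot(index[c], len(index)) for c in categories]
```

Here `index["b"]` is 2.0, so the second row encodes to `[0.0, 0.0, 1.0]`, the dense equivalent of the `(3,[2],[1.0])` shown above.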



In [21]:
sfc_train.printSchema()  # printSchema() prints directly and returns None

root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)


### Encoding the categorical features ...

In [31]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Note: per the competition's data description, Descript and Resolution are
# present only in train.csv, so they are unavailable for the test set.
cols = ['Descript','DayOfWeek','PdDistrict','Resolution','Address']

# Index each string column by label frequency, then one-hot encode the index.
for col in cols:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col+'Index')
    model = stringIndexer.fit(sfc_train)
    sfc_train = model.transform(sfc_train)
    encoder = OneHotEncoder(dropLast=False, inputCol=col+'Index', outputCol=col+'Vec')
    sfc_train = encoder.transform(sfc_train)

print sfc_train.count()
sfc_train.printSchema()  # printSchema() prints directly and returns None

878049
root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)
 |-- DescriptIndex: double (nullable = true)
 |-- DescriptVec: vector (nullable = true)
 |-- DayOfWeekIndex: double (nullable = true)
 |-- DayOfWeekVec: vector (nullable = true)
 |-- PdDistrictIndex: double (nullable = true)
 |-- PdDistrictVec: vector (nullable = true)
 |-- ResolutionIndex: double (nullable = true)
 |-- ResolutionVec: vector (nullable = true)
 |-- AddressIndex: double (nullable = true)
 |-- AddressVec: vector (nullable = true)


In [41]:
sfc_train.select('Address','AddressIndex','AddressVec').show(10,truncate=False)

+-----------------------------+------------+--------------------+
|Address                      |AddressIndex|AddressVec          |
+-----------------------------+------------+--------------------+
|VANNESS AV / GREENWICH ST    |7781.0      |(23228,[7781],[1.0])|
|JEFFERSON ST / LEAVENWORTH ST|6049.0      |(23228,[6049],[1.0])|
|MENDELL ST / HUDSON AV       |5846.0      |(23228,[5846],[1.0])|
|2000 Block of BUSH ST        |3197.0      |(23228,[3197],[1.0])|
|1600 Block of WEBSTER ST     |3081.0      |(23228,[3081],[1.0])|
|0 Block of STOCKTON ST       |72.0        |(23228,[72],[1.0])  |
|23RD ST / WISCONSIN ST       |4847.0      |(23228,[4847],[1.0])|
|GEARY BL / LAGUNA ST         |246.0       |(23228,[246],[1.0]) |
|400 Block of HYDE ST         |417.0       |(23228,[417],[1.0]) |
|STOCKTON ST / SUTTER ST      |441.0       |(23228,[441],[1.0]) |
+-----------------------------+------------+--------------------+
only showing top 10 rows
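
Each `AddressVec` above is printed in sparse form as `(size, [indices], [values])`: only the nonzero positions are stored, which matters when a single column expands to 23228 slots. The sketch below shows how that notation can be read, using a plain tuple rather than Spark's vector classes; `sparse_get` is a hypothetical helper written for this illustration.

```python
def sparse_get(vec, i):
    """Return the value at position i of a (size, indices, values) sparse vector."""
    size, indices, values = vec
    if not 0 <= i < size:
        raise IndexError("position %d is outside a vector of size %d" % (i, size))
    for idx, val in zip(indices, values):
        if idx == i:
            return val
    return 0.0  # positions that are not listed are implicitly zero


# First row of the output above: VANNESS AV / GREENWICH ST has AddressIndex 7781.
address_vec = (23228, [7781], [1.0])

stored_entries = len(address_vec[1])  # entries actually held in memory
dense_entries = address_vec[0]        # entries a dense vector would need
hit = sparse_get(address_vec, 7781)
miss = sparse_get(address_vec, 0)
```

For this row the sparse form stores one entry where a dense vector would hold 23228.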



### Assembling the feature vector ...
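
VectorAssembler concatenates its input columns in order, so a position inside one one-hot vector is shifted by the combined width of the columns before it. A minimal sketch of that offset arithmetic follows. Only the `AddressVec` width (23228) appears in the output above; the other widths are assumptions, chosen to be consistent with the 24143-wide `features` vector printed below, and `block_offsets` is a hypothetical helper.

```python
def block_offsets(blocks):
    """Return ({name: start position}, total width) for blocks concatenated in order."""
    offsets = {}
    start = 0
    for name, width in blocks:
        offsets[name] = start
        start += width
    return offsets, start


# Widths other than AddressVec are assumed, not read from the notebook output.
blocks = [
    ("DescriptVec", 879),
    ("DayOfWeekVec", 7),
    ("PdDistrictVec", 10),
    ("ResolutionVec", 17),
    ("AddressVec", 23228),
    ("X", 1),
    ("Y", 1),
]

offsets, total_width = block_offsets(blocks)

# AddressIndex 7781 (VANNESS AV / GREENWICH ST) shifted into the assembled vector.
address_position = offsets["AddressVec"] + 7781
```

With these widths, `address_position` is 8694 and the coordinates land at positions 24141 and 24142, which agrees with the first row of the assembled output.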

In [50]:
# Collect the one-hot vector columns plus the raw coordinates.
vector_cols = [name for name, _ in sfc_train.dtypes if 'Vec' in name] + ['X', 'Y']
assembler = VectorAssembler(inputCols=vector_cols, outputCol="features")
output = assembler.transform(sfc_train)
output.select('features').show(10, truncate=False)  # show() prints and returns None

+------------------------------------------------------------------------------------------------------+
|features                                                                                              |
+------------------------------------------------------------------------------------------------------+
|(24143,[44,880,888,897,8694,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.42436218261719,37.8004150390625])  |
|(24143,[8,880,890,896,6962,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.4190902709961,37.80780029296875])   |
|(24143,[10,880,889,897,6759,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.38639831542969,37.738983154296875])|
|(24143,[0,880,888,896,4110,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.43101501464844,37.78738784790039])  |
|(24143,[12,880,888,896,3994,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.43131256103516,37.78586959838867]) |
|(24143,[19,880,891,896,985,24141,24142],[1.0,1.0,1.0,1.0,1.0,-122.4063491821289,37.78602981567383])   |
|(24143,[121,880,889,896,5760,24141,24142],[1.0,1.0,1.0