In [54]:
import numpy as np
from math import sqrt

import datetime
import csv
import matplotlib.pyplot as plt

from pyspark.mllib.clustering import KMeans, KMeansModel

In [55]:
with open('data/detroit.csv') as f:
    csv_file = csv.reader(f)
    next(csv_file)
    crime_list = []
    for row in csv_file:
        crime_list.append(row)

In [56]:
data = []
for row in crime_list:
    crime_date = datetime.datetime.strptime(row[2], '%m/%d/%Y').date()
    crime_pos  = np.array([float(row[-2]), float(row[-1])])
    crime_type = row[3].split('-')[0].strip()
    crime_type = crime_type.split('(')[0].strip()
    crime_type = crime_type.split(',')[0].strip()
    data.append([crime_date, crime_pos, crime_type])

In [57]:
data[1:5]

[[datetime.date(2014, 1, 5), array([ 42.3005, -83.1012]), 'MURDER'],
 [datetime.date(2014, 1, 8), array([ 42.3665, -82.9364]), 'HOMICIDE'],
 [datetime.date(2014, 1, 11), array([ 42.3931, -83.0033]), 'HOMICIDE'],
 [datetime.date(2014, 1, 15), array([ 42.3927, -82.983 ]), 'MURDER']]

In [20]:
data_rdd = sc.parallelize(data)
histogram_year_rdd = data_rdd.map(lambda x: (x[0].month, 1)).reduceByKey(lambda x, y: x + y).sortByKey()
year_count = histogram_year_rdd.collect()
histogram_category_rdd = data_rdd.flatMap(lambda x: [(category, 1) for category in x[2]]).reduceByKey(lambda x, y: x + y).sortByKey()
category_count = histogram_category_rdd.collect()

In [36]:
def category_filter(entry, category_types):
    return category_types == [] or entry in category_types

In [37]:
def date_filter(entry, dates):
    return dates == [] or entry in dates

In [46]:
def cluster(date_range, category_types, num_clusters):
    #Label data
    filtered_rdd = data_rdd.filter(lambda x: date_filter(x[0], date_range) and category_filter(x[2], category_types))
    pos_rdd= filtered_rdd.map(lambda x: x[1])
    clusters = KMeans.train(pos_rdd, num_clusters, maxIterations=100, initializationMode="random")
    label_rdd = filtered_rdd.map(lambda x: [x[1], clusters.centers[clusters.predict(x[1])]])
    
    #Cluster and distribution
    return label_rdd, clusters

In [39]:
def sse(point, center):
    return sqrt(sum([i**2 for i in (point - center)]))

In [40]:
def error_plot(dates, categories, cluster_range):
    error = []
    for num_cluster in cluster_range:
        label_rdd, clusters = cluster(dates, categories, num_cluster)
        error.append(label_rdd.map(lambda x: x[0], x[1]).reduce(lambda x, y: x + y))

In [53]:
error_plot([], [], range(50))

Py4JJavaError: An error occurred while calling o264.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 21, localhost): java.lang.ArrayIndexOutOfBoundsException: 0
	at org.apache.spark.mllib.clustering.KMeans$$anonfun$11.apply(KMeans.scala:195)
	at org.apache.spark.mllib.clustering.KMeans$$anonfun$11.apply(KMeans.scala:191)
	at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
	at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
