## Spark Initialization

In [1]:
# Call findspark
import findspark
findspark.init()

In [2]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [3]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x000000000678B198>


## Load Dataset & Pre-processing

In [4]:
# Datasets can be downloaded from https://www.kaggle.com/cityofLA/crime-in-los-angeles
df = spark.read.csv("D:/TC/6BigData/Dataset/crime-in-los-angeles/Crime_Data_2010_2017.csv", header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- DR Number: integer (nullable = true)
 |-- Date Reported: string (nullable = true)
 |-- Date Occurred: string (nullable = true)
 |-- Time Occurred: integer (nullable = true)
 |-- Area ID: integer (nullable = true)
 |-- Area Name: string (nullable = true)
 |-- Reporting District: integer (nullable = true)
 |-- Crime Code: integer (nullable = true)
 |-- Crime Code Description: string (nullable = true)
 |-- MO Codes: string (nullable = true)
 |-- Victim Age: double (nullable = true)
 |-- Victim Sex: string (nullable = true)
 |-- Victim Descent: string (nullable = true)
 |-- Premise Code: double (nullable = true)
 |-- Premise Description: string (nullable = true)
 |-- Weapon Used Code: double (nullable = true)
 |-- Weapon Description: string (nullable = true)
 |-- Status Code: string (nullable = true)
 |-- Status Description: string (nullable = true)
 |-- Crime Code 1: double (nullable = true)
 |-- Crime Code 2: double (nullable = true)
 |-- Crime Code 3: double (nullable = true)


In [6]:
df.count()

1584316

In [7]:
df.show()

+---------+-------------+-------------+-------------+-------+-----------+------------------+----------+----------------------+--------------------+----------+----------+--------------+------------+--------------------+----------------+--------------------+-----------+------------------+------------+------------+------------+------------+--------------------+------------+--------------------+
|DR Number|Date Reported|Date Occurred|Time Occurred|Area ID|  Area Name|Reporting District|Crime Code|Crime Code Description|            MO Codes|Victim Age|Victim Sex|Victim Descent|Premise Code| Premise Description|Weapon Used Code|  Weapon Description|Status Code|Status Description|Crime Code 1|Crime Code 2|Crime Code 3|Crime Code 4|             Address|Cross Street|           Location |
+---------+-------------+-------------+-------------+-------+-----------+------------------+----------+----------------------+--------------------+----------+----------+--------------+------------+-------------

In [8]:
df_select = df.select('DR Number','Location ')

In [9]:
df_select.show()

+---------+--------------------+
|DR Number|           Location |
+---------+--------------------+
|  1208575|(33.9829, -118.3338)|
|102005556|(34.0454, -118.3157)|
|      418| (33.942, -118.2717)|
|101822289|(33.9572, -118.2717)|
| 42104479|(34.2009, -118.6369)|
|120125367|(34.0591, -118.2412)|
|101105609|(34.1211, -118.2048)|
|101620051| (34.241, -118.3987)|
|101910498|(34.3147, -118.4589)|
|120908292|(34.2012, -118.4662)|
|101927640|(34.2354, -118.4517)|
|101323794|(34.0294, -118.2671)|
|121207315|(33.9651, -118.2783)|
|121215506|(33.9651, -118.2754)|
|121504289| (34.2012, -118.416)|
|121801608|(33.9456, -118.2652)|
|130100507|(34.0431, -118.2536)|
|130100508|(34.0503, -118.2504)|
|130100509|(34.0352, -118.2583)|
|130100515| (34.048, -118.2577)|
+---------+--------------------+
only showing top 20 rows



In [10]:
df_select.count()

1584316

In [11]:
# Register the DataFrame as a SQL temporary view
df_select.createOrReplaceTempView("crime")

In [12]:
query = spark.sql("SELECT DISTINCT * \
            FROM crime").na.drop()

In [13]:
query.show()

+---------+--------------------+
|DR Number|           Location |
+---------+--------------------+
|100215572| (34.0472, -118.301)|
|100513021|(33.7324, -118.2879)|
|101000698|(34.2011, -118.5404)|
|101000880|(34.1975, -118.5524)|
|101005206|(34.1867, -118.4948)|
|101005317|  (34.213, -118.537)|
|101007575|(34.1629, -118.5202)|
|101007661| (34.212, -118.5448)|
|101008029|(34.2084, -118.5448)|
|101008526|(34.2069, -118.5033)|
|101009974| (34.1939, -118.512)|
|101010469|(34.1877, -118.5535)|
|101011015|(34.1449, -118.5616)|
|101012476|(34.1939, -118.4815)|
|101012920|(34.1548, -118.4721)|
|101013642|(34.1606, -118.5076)|
|101014484|(34.1392, -118.4991)|
|101014506|(34.1573, -118.4901)|
|101015008|(34.1866, -118.5535)|
|101015247|(34.1615, -118.5142)|
+---------+--------------------+
only showing top 20 rows



In [14]:
query.count()

1584307

In [15]:
# Import pandas
import pandas as pd

In [16]:
# Convert the result into pandas dataframe then save it as single csv file
pd_query = query.toPandas()
pd_query.to_csv("D:/TC/6BigData/Dataset/crime-data.csv", index=False)

## Clustering

In [17]:
df = spark.read.csv("D:/TC/6BigData/Dataset/crime-data.csv", header=True, inferSchema=True)

In [18]:
df.printSchema()

root
 |-- DR Number: integer (nullable = true)
 |-- Location : string (nullable = true)



In [19]:
from pyspark.sql import functions as F
split_col = F.split(df['Location '], ',')
df = df.withColumn('lat', split_col.getItem(0))
df = df.withColumn('lon', split_col.getItem(1))

In [20]:
df.show()

+---------+--------------------+--------+-----------+
|DR Number|           Location |     lat|        lon|
+---------+--------------------+--------+-----------+
|100215572| (34.0472, -118.301)|(34.0472|  -118.301)|
|100513021|(33.7324, -118.2879)|(33.7324| -118.2879)|
|101000698|(34.2011, -118.5404)|(34.2011| -118.5404)|
|101000880|(34.1975, -118.5524)|(34.1975| -118.5524)|
|101005206|(34.1867, -118.4948)|(34.1867| -118.4948)|
|101005317|  (34.213, -118.537)| (34.213|  -118.537)|
|101007575|(34.1629, -118.5202)|(34.1629| -118.5202)|
|101007661| (34.212, -118.5448)| (34.212| -118.5448)|
|101008029|(34.2084, -118.5448)|(34.2084| -118.5448)|
|101008526|(34.2069, -118.5033)|(34.2069| -118.5033)|
|101009974| (34.1939, -118.512)|(34.1939|  -118.512)|
|101010469|(34.1877, -118.5535)|(34.1877| -118.5535)|
|101011015|(34.1449, -118.5616)|(34.1449| -118.5616)|
|101012476|(34.1939, -118.4815)|(34.1939| -118.4815)|
|101012920|(34.1548, -118.4721)|(34.1548| -118.4721)|
|101013642|(34.1606, -118.50

In [21]:
df.printSchema()

root
 |-- DR Number: integer (nullable = true)
 |-- Location : string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)



In [22]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("crime")

In [23]:
query = spark.sql("SELECT `DR Number` as dr_number, REPLACE (lat,'(') as lat, REPLACE (lon,')') as lon\
            FROM crime")

In [24]:
query.show()

+---------+-------+----------+
|dr_number|    lat|       lon|
+---------+-------+----------+
|100215572|34.0472|  -118.301|
|100513021|33.7324| -118.2879|
|101000698|34.2011| -118.5404|
|101000880|34.1975| -118.5524|
|101005206|34.1867| -118.4948|
|101005317| 34.213|  -118.537|
|101007575|34.1629| -118.5202|
|101007661| 34.212| -118.5448|
|101008029|34.2084| -118.5448|
|101008526|34.2069| -118.5033|
|101009974|34.1939|  -118.512|
|101010469|34.1877| -118.5535|
|101011015|34.1449| -118.5616|
|101012476|34.1939| -118.4815|
|101012920|34.1548| -118.4721|
|101013642|34.1606| -118.5076|
|101014484|34.1392| -118.4991|
|101014506|34.1573| -118.4901|
|101015008|34.1866| -118.5535|
|101015247|34.1615| -118.5142|
+---------+-------+----------+
only showing top 20 rows



In [25]:
df.printSchema()

root
 |-- DR Number: integer (nullable = true)
 |-- Location : string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)



In [26]:
# Convert the type of Latitude and Longitude values into double
query = query.withColumn("lat", query["lat"].cast("double"))
query = query.withColumn("lon", query["lon"].cast("double"))

In [27]:
query.show()

+---------+-------+---------+
|dr_number|    lat|      lon|
+---------+-------+---------+
|100215572|34.0472| -118.301|
|100513021|33.7324|-118.2879|
|101000698|34.2011|-118.5404|
|101000880|34.1975|-118.5524|
|101005206|34.1867|-118.4948|
|101005317| 34.213| -118.537|
|101007575|34.1629|-118.5202|
|101007661| 34.212|-118.5448|
|101008029|34.2084|-118.5448|
|101008526|34.2069|-118.5033|
|101009974|34.1939| -118.512|
|101010469|34.1877|-118.5535|
|101011015|34.1449|-118.5616|
|101012476|34.1939|-118.4815|
|101012920|34.1548|-118.4721|
|101013642|34.1606|-118.5076|
|101014484|34.1392|-118.4991|
|101014506|34.1573|-118.4901|
|101015008|34.1866|-118.5535|
|101015247|34.1615|-118.5142|
+---------+-------+---------+
only showing top 20 rows



In [28]:
query.printSchema()

root
 |-- dr_number: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



In [29]:
# Convert the result into pandas dataframe then save it as single csv file
pd_query = query.toPandas()
pd_query.to_csv("D:/TC/6BigData/Dataset/crime-data-final.csv", index=False)

In [30]:
# Assembling Vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["lat","lon"],
    outputCol='features')

query = assembler.transform(query)
query.show()

+---------+-------+---------+-------------------+
|dr_number|    lat|      lon|           features|
+---------+-------+---------+-------------------+
|100215572|34.0472| -118.301| [34.0472,-118.301]|
|100513021|33.7324|-118.2879|[33.7324,-118.2879]|
|101000698|34.2011|-118.5404|[34.2011,-118.5404]|
|101000880|34.1975|-118.5524|[34.1975,-118.5524]|
|101005206|34.1867|-118.4948|[34.1867,-118.4948]|
|101005317| 34.213| -118.537|  [34.213,-118.537]|
|101007575|34.1629|-118.5202|[34.1629,-118.5202]|
|101007661| 34.212|-118.5448| [34.212,-118.5448]|
|101008029|34.2084|-118.5448|[34.2084,-118.5448]|
|101008526|34.2069|-118.5033|[34.2069,-118.5033]|
|101009974|34.1939| -118.512| [34.1939,-118.512]|
|101010469|34.1877|-118.5535|[34.1877,-118.5535]|
|101011015|34.1449|-118.5616|[34.1449,-118.5616]|
|101012476|34.1939|-118.4815|[34.1939,-118.4815]|
|101012920|34.1548|-118.4721|[34.1548,-118.4721]|
|101013642|34.1606|-118.5076|[34.1606,-118.5076]|
|101014484|34.1392|-118.4991|[34.1392,-118.4991]|


In [31]:
# Train model
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans().setK(7).setSeed(1)
model = kmeans.fit(query)

In [32]:
# Make a prediction
predictions = model.transform(query)
predictions.show()

+---------+-------+---------+-------------------+----------+
|dr_number|    lat|      lon|           features|prediction|
+---------+-------+---------+-------------------+----------+
|100215572|34.0472| -118.301| [34.0472,-118.301]|         3|
|100513021|33.7324|-118.2879|[33.7324,-118.2879]|         4|
|101000698|34.2011|-118.5404|[34.2011,-118.5404]|         6|
|101000880|34.1975|-118.5524|[34.1975,-118.5524]|         6|
|101005206|34.1867|-118.4948|[34.1867,-118.4948]|         6|
|101005317| 34.213| -118.537|  [34.213,-118.537]|         6|
|101007575|34.1629|-118.5202|[34.1629,-118.5202]|         6|
|101007661| 34.212|-118.5448| [34.212,-118.5448]|         6|
|101008029|34.2084|-118.5448|[34.2084,-118.5448]|         6|
|101008526|34.2069|-118.5033|[34.2069,-118.5033]|         6|
|101009974|34.1939| -118.512| [34.1939,-118.512]|         6|
|101010469|34.1877|-118.5535|[34.1877,-118.5535]|         6|
|101011015|34.1449|-118.5616|[34.1449,-118.5616]|         6|
|101012476|34.1939|-118.

In [33]:
# Shows the result
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[  34.21233957 -118.42095375]
[0. 0.]
[  33.97874926 -118.28550115]
[  34.07017927 -118.27026604]
[  33.7692004  -118.28606439]
[  34.0262328  -118.40587631]
[  34.210646   -118.56138953]
