# Load Dataset

In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import SparkSession

def init_spark():
    """Build (or reuse) the notebook's SparkSession.

    Returns:
        tuple: ``(spark, sc)`` -- the SparkSession named "CrimeOneYear" and
        the SparkContext that belongs to it.
    """
    session = SparkSession.builder.appName("CrimeOneYear").getOrCreate()
    return session, session.sparkContext

In [3]:
spark, sc = init_spark()

# Read the crime CSV: the first line supplies the column names, and rows that
# cannot be parsed are dropped instead of failing the load.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("dataset_crimes.csv")
)

In [4]:
# Number of rows in the loaded DataFrame.
df.count()

264807

In [5]:
# Inspect the schema: no schema inference was requested on load, so every
# column is read as a string.
df.schema

StructType(List(StructField(CASE#,StringType,true),StructField(DATE  OF OCCURRENCE,StringType,true),StructField(BLOCK,StringType,true),StructField( IUCR,StringType,true),StructField( PRIMARY DESCRIPTION,StringType,true),StructField( SECONDARY DESCRIPTION,StringType,true),StructField( LOCATION DESCRIPTION,StringType,true),StructField(ARREST,StringType,true),StructField(DOMESTIC,StringType,true),StructField(BEAT,StringType,true),StructField(WARD,StringType,true),StructField(FBI CD,StringType,true),StructField(X COORDINATE,StringType,true),StructField(Y COORDINATE,StringType,true),StructField(LATITUDE,StringType,true),StructField(LONGITUDE,StringType,true),StructField(LOCATION,StringType,true)))

In [6]:
# Expose the DataFrame to Spark SQL under the name "table_crime".
# createOrReplaceTempView replaces the deprecated registerTempTable and lets
# this cell be re-run without a "view already exists" problem.
df.createOrReplaceTempView("table_crime")

# Select the Columns That Will Be Used

In [7]:
# Keep only the two coordinate columns; they are still strings at this point.
resTable = spark.sql("select `LATITUDE`,`LONGITUDE` from table_crime")

In [8]:
# Convert the string coordinates to floats, then discard rows where either
# coordinate is missing.
# The columns are referenced through resTable (not the original df) and the
# null check runs AFTER the cast: a value that cannot be parsed as a number
# becomes null when cast, and VectorAssembler rejects null inputs.
resTable = resTable.select(
    resTable["LATITUDE"].cast("float"),
    resTable["LONGITUDE"].cast("float"),
)
resTable = resTable.dropna(subset=["LATITUDE", "LONGITUDE"])

In [9]:
# Confirm both coordinate columns are now FloatType.
resTable.schema

StructType(List(StructField(LATITUDE,FloatType,true),StructField(LONGITUDE,FloatType,true)))

In [10]:
# Preview the cleaned coordinates (show() prints the first 20 rows).
resTable.show()

+---------+----------+
| LATITUDE| LONGITUDE|
+---------+----------+
|41.897896| -87.76074|
| 41.85519| -87.62387|
|41.798634| -87.60482|
|41.780945|-87.621994|
|41.965405|-87.736206|
|41.850674|-87.735596|
| 41.93156|-87.712296|
|41.890266| -87.63109|
|41.895947| -87.62976|
| 41.86338|-87.695816|
| 41.86708|   -87.619|
|41.792778| -87.59163|
|41.769917|-87.663956|
| 41.87527| -87.62425|
|41.811363|-87.666275|
|41.787395| -87.69606|
|41.779728|-87.609566|
|41.860172| -87.72988|
|41.676212| -87.62172|
|41.789845|-87.652336|
+---------+----------+
only showing top 20 rows



# Assemble Features

In [11]:
from pyspark.ml.feature import VectorAssembler

In [12]:
# Combine the two coordinate columns into one vector column named "features".
vecAssembler = VectorAssembler(
    inputCols=["LATITUDE", "LONGITUDE"],
    outputCol="features",
)

In [13]:
# Append the assembled "features" vector column to the coordinate rows.
newDf = vecAssembler.transform(resTable)

In [14]:
# Preview the rows together with their assembled feature vectors.
newDf.show()

+---------+----------+--------------------+
| LATITUDE| LONGITUDE|            features|
+---------+----------+--------------------+
|41.897896| -87.76074|[41.8978958129882...|
| 41.85519| -87.62387|[41.8551902770996...|
|41.798634| -87.60482|[41.7986335754394...|
|41.780945|-87.621994|[41.7809448242187...|
|41.965405|-87.736206|[41.9654045104980...|
|41.850674|-87.735596|[41.8506736755371...|
| 41.93156|-87.712296|[41.9315605163574...|
|41.890266| -87.63109|[41.8902664184570...|
|41.895947| -87.62976|[41.8959465026855...|
| 41.86338|-87.695816|[41.8633804321289...|
| 41.86708|   -87.619|[41.8670806884765...|
|41.792778| -87.59163|[41.7927780151367...|
|41.769917|-87.663956|[41.7699165344238...|
| 41.87527| -87.62425|[41.8752708435058...|
|41.811363|-87.666275|[41.8113632202148...|
|41.787395| -87.69606|[41.7873954772949...|
|41.779728|-87.609566|[41.7797279357910...|
|41.860172| -87.72988|[41.8601722717285...|
|41.676212| -87.62172|[41.6762123107910...|
|41.789845|-87.652336|[41.789844

# Fit KMeans Model

In [15]:
from pyspark.ml.clustering import KMeans

In [16]:
# Fit K-Means with 10 clusters (k=10); seed=1 fixes the random seed so the
# run can be repeated. Only the "features" column is passed to fit().
kmeans = KMeans(k=10, seed=1)
model = kmeans.fit(newDf.select('features'))

# Transform Initial Dataframe to Include Cluster

In [17]:
# Apply the fitted model: adds a "prediction" column holding each row's
# cluster index, then preview the result.
transformed = model.transform(newDf)
transformed.show()

+---------+----------+--------------------+----------+
| LATITUDE| LONGITUDE|            features|prediction|
+---------+----------+--------------------+----------+
|41.897896| -87.76074|[41.8978958129882...|         6|
| 41.85519| -87.62387|[41.8551902770996...|         8|
|41.798634| -87.60482|[41.7986335754394...|         2|
|41.780945|-87.621994|[41.7809448242187...|         2|
|41.965405|-87.736206|[41.9654045104980...|         6|
|41.850674|-87.735596|[41.8506736755371...|         0|
| 41.93156|-87.712296|[41.9315605163574...|         6|
|41.890266| -87.63109|[41.8902664184570...|         8|
|41.895947| -87.62976|[41.8959465026855...|         8|
| 41.86338|-87.695816|[41.8633804321289...|         0|
| 41.86708|   -87.619|[41.8670806884765...|         8|
|41.792778| -87.59163|[41.7927780151367...|         2|
|41.769917|-87.663956|[41.7699165344238...|         4|
| 41.87527| -87.62425|[41.8752708435058...|         8|
|41.811363|-87.666275|[41.8113632202148...|         4|
|41.787395

# Draw Graph with Pixiedust

## (Only Showing 10,000 Data Points)

In [18]:
import pixiedust

Pixiedust database opened successfully


In [None]:
# NOTE(review): display() is presumably the visualisation hook provided by the
# pixiedust import above; the chart type and options are not set in code here
# -- confirm where they are configured.
display(transformed)

<img src="img/02_image_3.jpg">

# Additional

### Load a map from Mapbox to show clusters of 10,000 crimes using the pixiedust choropleth cluster view

In [20]:
# Load the same CSV through pixiedust. The file URI is derived from the
# notebook's working directory (the same relative path the Spark read above
# uses) instead of a hard-coded absolute path that only exists on one machine.
from pathlib import Path

dataset = pixiedust.sampleData(Path("dataset_crimes.csv").resolve().as_uri())

Downloading 'file:///Users/User/Downloads/kuliah/big_data/TugasBigData/dataset_crimes.csv' from file:///Users/User/Downloads/kuliah/big_data/TugasBigData/dataset_crimes.csv
Downloaded 49221849 bytes
Creating pySpark DataFrame for 'file:///Users/User/Downloads/kuliah/big_data/TugasBigData/dataset_crimes.csv'. Please wait...
Loading file using 'SparkSession'
Successfully created pySpark DataFrame for 'file:///Users/User/Downloads/kuliah/big_data/TugasBigData/dataset_crimes.csv'


In [None]:
# NOTE(review): presumably rendered as a pixiedust map view of the crime
# locations (see the section heading); the display options are not set in
# code here -- confirm where they are configured.
display(dataset)

<img src="img/02_image_4.jpg">