# Clustering Assignment

##### Ilham Muhammad Misbahuddin
##### 05111540000088

## Requirements
1. Operating System: Kali Linux 2019.1
2. Apache Spark 2.3.3
3. Scala 2.12.8
4. Python 2.7.16
5. PySpark 2.4.0
6. Findspark 1.3.0
7. Jupyter Notebook
8. matplotlib 2.2.3
9. bokeh 1.0.4
10. pixiedust 1.1.15
11. pandas 0.24.2

## Dataset Description
* Dataset name: [Traffic Violations in Maryland County](https://www.kaggle.com/rounak041993/traffic-violations-in-maryland-county)

<table>
    <thead>
        <tr>
            <th>Data Source</th>
            <th>Number of Rows</th>
            <th>Number of Columns</th>
            <th>Size</th>
            <th>File Format</th>
        </tr>
    </thead>
    <tbody>
        <tr>
            <td>Traffic_Violations.csv</td>
            <td>1293487</td>
            <td>35</td>
            <td>469.4 MB</td>
            <td>CSV</td>
        </tr>
    </tbody>
</table>
    


## Apache Spark Initialization

In [1]:
# Import findspark to make pyspark importable as a regular library
import findspark
findspark.init('/usr/local/spark')

In [2]:
# Import the required Python libraries
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Create the SparkSession,
# the entry point to programming Spark with the Dataset and DataFrame API
spark = SparkSession.builder.appName("Traffic Violations in Maryland County Clustering").getOrCreate()

In [3]:
# Print the SparkSession object
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fd858752850>


## Load Dataset

In [4]:
# Load the dataset
df = spark.read.csv("/root/Lecture/BIGDATA/Traffic_Violations.csv", header=True, inferSchema=True)

In [5]:
# Print the first 20 rows
df.show()

+------------+------------+------+--------------------+--------------------+--------------------+----------------+-----------------+--------+-----+---------------+---------------+-----+------------------+------+------------------+-------+---------+-----+---------------+----+-----------+--------+------+--------------+-------------+--------------------+-----------------------+--------+------+---------------+------------+--------+-----------------+--------------------+
|Date Of Stop|Time Of Stop|Agency|           SubAgency|         Description|            Location|        Latitude|        Longitude|Accident|Belts|Personal Injury|Property Damage|Fatal|Commercial License|HAZMAT|Commercial Vehicle|Alcohol|Work Zone|State|    VehicleType|Year|       Make|   Model| Color|Violation Type|       Charge|             Article|Contributed To Accident|    Race|Gender|    Driver City|Driver State|DL State|      Arrest Type|         Geolocation|
+------------+------------+------+--------------------+---

In [6]:
# Count the number of rows
df.count()

1293487

In [7]:
# inferSchema=True asks Spark to infer each column's data type from the data; print the resulting schema
df.printSchema()

root
 |-- Date Of Stop: string (nullable = true)
 |-- Time Of Stop: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- SubAgency: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Accident: string (nullable = true)
 |-- Belts: string (nullable = true)
 |-- Personal Injury: string (nullable = true)
 |-- Property Damage: string (nullable = true)
 |-- Fatal: string (nullable = true)
 |-- Commercial License: string (nullable = true)
 |-- HAZMAT: string (nullable = true)
 |-- Commercial Vehicle: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Work Zone: string (nullable = true)
 |-- State: string (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Violati

## Clustering

In [8]:
from pyspark.sql.functions import col

In [9]:
# Cast Latitude and Longitude to float, then drop every row that contains a null in any column
df_preprocess = (df
       .withColumn("Latitude", col("Latitude").cast("float"))
       .withColumn("Longitude", col("Longitude").cast("float"))
       .dropna()
       )
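
As a plain-Python illustration of what this cell does (not Spark code), the sketch below mimics the two steps on a few made-up rows. A string that cannot be parsed as a number becomes `None`, which mirrors how Spark's default (non-ANSI) cast yields null, and any row holding a `None` in any column is then dropped, as `dropna()` does with its default `how='any'`. The helper names `cast_float` and `preprocess` and the sample rows are hypothetical.

```python
def cast_float(value):
    """Return value as a float, or None when it is missing or unparseable."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def preprocess(rows):
    """Cast the coordinate fields, then drop rows that contain any None."""
    cleaned = []
    for row in rows:
        converted = dict(row)
        converted["Latitude"] = cast_float(row["Latitude"])
        converted["Longitude"] = cast_float(row["Longitude"])
        # Keep the row only if no column is None (like dropna(how='any'))
        if all(v is not None for v in converted.values()):
            cleaned.append(converted)
    return cleaned


# Made-up sample rows, for illustration only
sample_rows = [
    {"Latitude": "38.98", "Longitude": "-77.09", "Color": "BLACK"},
    {"Latitude": "", "Longitude": "-77.22", "Color": "RED"},
    {"Latitude": "39.05", "Longitude": "-76.95", "Color": None},
]

cleaned_rows = preprocess(sample_rows)
print(len(cleaned_rows))
```

Note that the third sample row is dropped even though its coordinates are fine, because another column is empty. If only the coordinates matter, `dropna(subset=["Latitude", "Longitude"])` would keep such rows.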

In [10]:
df_preprocess.printSchema()

root
 |-- Date Of Stop: string (nullable = true)
 |-- Time Of Stop: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- SubAgency: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Accident: string (nullable = true)
 |-- Belts: string (nullable = true)
 |-- Personal Injury: string (nullable = true)
 |-- Property Damage: string (nullable = true)
 |-- Fatal: string (nullable = true)
 |-- Commercial License: string (nullable = true)
 |-- HAZMAT: string (nullable = true)
 |-- Commercial Vehicle: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Work Zone: string (nullable = true)
 |-- State: string (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Violation

In [11]:
df_preprocess.count()

1126191

In [12]:
# Register the dataframe as a SQL temporary view
df_preprocess.createOrReplaceTempView("traffic_violations")

In [13]:
# Retrieve the data needed:
# - Latitude
# - Longitude
# - Personal Injury
# - Property Damage

data = spark.sql("SELECT `Latitude` as latitude, `Longitude` as longtitude, `Personal Injury`, `Property Damage`\
                  FROM traffic_violations")

In [14]:
data.show()

+---------+----------+---------------+---------------+
| latitude|longtitude|Personal Injury|Property Damage|
+---------+----------+---------------+---------------+
|38.981724| -77.09276|             No|             No|
|39.162888| -77.22909|             No|            Yes|
|39.056976|-76.954636|             No|            Yes|
|39.093384| -77.07955|             No|             No|
|39.234844| -77.28154|             No|             No|
| 38.98273| -77.10075|             No|             No|
| 39.06914| -76.96968|             No|             No|
|38.983578|  -77.0931|             No|             No|
|39.161808| -77.25358|             No|             No|
|  39.0962| -76.98696|             No|             No|
|39.151665|-77.070625|             No|             No|
|39.129936| -77.16705|             No|             No|
|39.222794| -77.25369|             No|             No|
|39.067535| -77.14941|             No|             No|
|39.048206| -76.98472|             No|             No|
| 39.12324

In [15]:
data.count()

1126191

In [16]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(inputCols=["latitude", "longtitude"], outputCol="features")
df_assembled = vector_assembler.transform(data)

df_assembled.show()

+---------+----------+---------------+---------------+--------------------+
| latitude|longtitude|Personal Injury|Property Damage|            features|
+---------+----------+---------------+---------------+--------------------+
|38.981724| -77.09276|             No|             No|[38.9817237854003...|
|39.162888| -77.22909|             No|            Yes|[39.1628875732421...|
|39.056976|-76.954636|             No|            Yes|[39.0569763183593...|
|39.093384| -77.07955|             No|             No|[39.0933837890625...|
|39.234844| -77.28154|             No|             No|[39.2348442077636...|
| 38.98273| -77.10075|             No|             No|[38.9827308654785...|
| 39.06914| -76.96968|             No|             No|[39.0691413879394...|
|38.983578|  -77.0931|             No|             No|[38.9835777282714...|
|39.161808| -77.25358|             No|             No|[39.1618080139160...|
|  39.0962| -76.98696|             No|             No|[39.0961990356445...|
|39.151665|-

In [17]:
# Import KMeans
from pyspark.ml.clustering import KMeans

# Train a k-means model
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(df_assembled.select('features'))
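
To make the training step less of a black box, here is a minimal plain-Python sketch of the assign-and-update loop (Lloyd's algorithm) that k-means is built on. It is not Spark's implementation: Spark initializes centers with `k-means||` by default, whereas this sketch takes the initial centers as an argument. The function names and the sample points are made up for illustration.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def lloyd_kmeans(points, initial_centers, max_iter=20):
    """Alternate assignment and update steps until the centers stop moving."""
    centers = [tuple(c) for c in initial_centers]
    assignments = []
    for _ in range(max_iter):
        # Assignment step: index of the nearest center for every point
        assignments = [
            min(range(len(centers)), key=lambda i: squared_distance(p, centers[i]))
            for p in points
        ]
        # Update step: move each center to the mean of its members
        new_centers = []
        for i, old_center in enumerate(centers):
            members = [p for p, a in zip(points, assignments) if a == i]
            if members:
                count = float(len(members))
                new_centers.append(tuple(sum(dim) / count for dim in zip(*members)))
            else:
                new_centers.append(old_center)  # keep the center of an empty cluster
        if new_centers == centers:
            break
        centers = new_centers
    return centers, assignments


# Made-up (latitude, longitude) pairs forming two obvious groups
points = [(39.00, -77.10), (39.02, -77.12), (39.20, -77.30), (39.22, -77.32)]
final_centers, assignments = lloyd_kmeans(points, [points[0], points[2]])
print(assignments)
```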

In [18]:
# Make predictions by transforming the initial dataframe
transform = model.transform(df_assembled)

transform.show()

+---------+----------+---------------+---------------+--------------------+----------+
| latitude|longtitude|Personal Injury|Property Damage|            features|prediction|
+---------+----------+---------------+---------------+--------------------+----------+
|38.981724| -77.09276|             No|             No|[38.9817237854003...|         0|
|39.162888| -77.22909|             No|            Yes|[39.1628875732421...|         0|
|39.056976|-76.954636|             No|            Yes|[39.0569763183593...|         0|
|39.093384| -77.07955|             No|             No|[39.0933837890625...|         0|
|39.234844| -77.28154|             No|             No|[39.2348442077636...|         0|
| 38.98273| -77.10075|             No|             No|[38.9827308654785...|         0|
| 39.06914| -76.96968|             No|             No|[39.0691413879394...|         0|
|38.983578|  -77.0931|             No|             No|[38.9835777282714...|         0|
|39.161808| -77.25358|             No|     

In [19]:
# Import ClusteringEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(transform)

In [20]:
# Print the Silhouette score
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.999995346055
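
For readers unfamiliar with the metric, the following brute-force sketch computes the textbook silhouette score using squared Euclidean distance, the distance named in the output above. It is a plain-Python illustration on made-up points, not Spark's implementation, and it assumes at least two clusters. For each point, `a` is the mean distance to the other members of its own cluster, `b` is the smallest mean distance to any other cluster, and the point's score is `(b - a) / max(a, b)`.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(p, q))


def silhouette_score(points, labels):
    """Mean silhouette over all points; assumes at least two clusters."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)

    total = 0.0
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            continue  # a singleton cluster contributes a score of 0
        # a: mean dissimilarity to the other members of the same cluster
        a = sum(squared_distance(p, q) for q in own) / float(len(own) - 1)
        # b: smallest mean dissimilarity to the members of any other cluster
        b = min(
            sum(squared_distance(p, q) for q in members) / float(len(members))
            for other, members in clusters.items()
            if other != label
        )
        if max(a, b) > 0:
            total += (b - a) / max(a, b)
    return total / len(points)


# Made-up points: two tight groups that are far apart
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
labels = [0, 0, 1, 1]
score = silhouette_score(points, labels)
print(score)
```

Because the score approaches 1 whenever clusters are far apart relative to their internal spread, a value very close to 1 can also arise when a few distant points form clusters of their own.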


In [21]:
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 39.08283411 -77.11124616]
[-91.72519021  38.85813823]
[38.88524881 39.10242081]
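
Only the first center lies where Maryland traffic stops would be expected. The second has a latitude below -90, which is not a valid latitude, and the third has a positive longitude. One possible reading, which the notebook does not verify, is that a small number of rows carry swapped or malformed coordinates and pull two of the three centers far away. The sketch below checks the printed centers against the valid coordinate ranges and against a rough bounding box for Maryland. The bounding-box limits are an approximation chosen for illustration, and the function names are hypothetical.

```python
# Approximate bounding box for Maryland; these limits are an assumption
MD_LAT_RANGE = (37.8, 39.8)
MD_LON_RANGE = (-79.6, -74.9)


def is_valid_coordinate(lat, lon):
    """True when the pair is a legal latitude/longitude at all."""
    return -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0


def in_bounding_box(lat, lon, lat_range=MD_LAT_RANGE, lon_range=MD_LON_RANGE):
    """True when the pair falls inside the given bounding box."""
    return (lat_range[0] <= lat <= lat_range[1]
            and lon_range[0] <= lon <= lon_range[1])


# Cluster centers copied from the output above, as (latitude, longitude)
printed_centers = [
    (39.08283411, -77.11124616),
    (-91.72519021, 38.85813823),
    (38.88524881, 39.10242081),
]

report = [
    (is_valid_coordinate(lat, lon), in_bounding_box(lat, lon))
    for lat, lon in printed_centers
]
for center, flags in zip(printed_centers, report):
    print(center, flags)
```

Applying the same range check to `Latitude` and `Longitude` before assembling the feature vector would be one way to keep such rows out of the clustering.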


In [22]:
# Import pandas, which toPandas() requires
import pandas as pd

# Collect the result as a pandas DataFrame and write it to CSV so the Mapbox renderer can read it
transform.toPandas().to_csv('/root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv')

## Visualization

In [23]:
# Import pixiedust to do visualization
import pixiedust

# Load the CSV file as a DataFrame
prediction = pixiedust.sampleData('file:///root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv')

Pixiedust database opened successfully


Downloading 'file:///root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv' from file:///root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv
Downloaded 85236984 bytes
Creating pySpark DataFrame for 'file:///root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv'. Please wait...
Loading file using 'SparkSession'
Successfully created pySpark DataFrame for 'file:///root/Lecture/BIGDATA/datasets/big_data_traffic_violations.csv'


![Mapbox Option](img/Screenshot001.png)

In [None]:
display(prediction)

* Simple
    
    ![img2](img/Screenshot002.png)

* Choropleth

    ![img3](img/Screenshot003.png)

* Choropleth Cluster
    
    ![img4](img/Screenshot004.png)