In [1]:
# findspark.init() locates the local Spark installation (from the SPARK_HOME
# environment variable when no path is passed) and makes its pyspark package
# importable, so the pyspark imports below work from a plain Python kernel.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build the Spark session every later cell talks to; getOrCreate() reuses an
# already-running session in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

print(spark)

<pyspark.sql.session.SparkSession object at 0x0000024646B3CA20>


In [2]:
# Load the SF311 case export.
# The CSV location can be overridden with the SF311_CSV_PATH environment
# variable so the notebook runs on machines other than the author's; the
# default is the original local path.
import os

SF311_CSV_PATH = os.environ.get(
    "SF311_CSV_PATH",
    "D://5115100117//Big Data//Pengenalan Apache Spark//Case_Data_from_San_Francisco_311__SF311.csv",
)

# Remove rows that contain empty (null) values by chaining .na.drop().
# NOTE(review): with no arguments, na.drop() removes a row if ANY column is
# null, including columns this analysis never uses; consider
# .na.drop(subset=[...]) restricted to the columns actually needed
# (e.g. Point and Source) to keep more geolocated cases.
df = spark.read.csv(SF311_CSV_PATH, header=True, inferSchema=True).na.drop()

In [3]:
# Number of rows remaining after the null filter in the load step.
row_count = df.count()
row_count

293015

In [4]:
# Expose the loaded frame to Spark SQL under the name the queries below read.
raw_view_name = "casedata"
df.createOrReplaceTempView(raw_view_name)

In [5]:
# Keep only cases reported through the Open311 source, and strip the opening
# parenthesis from the "(lat, long)" text in the Point column.
# The result is registered back under "casedata", replacing the raw view from
# the previous cell; the following cells read this cleaned view. Re-running is
# safe: the filter and REPLACE give the same result on already-cleaned data.
# A triple-quoted query replaces backslash line continuation inside the string
# literal, which breaks on any trailing whitespace after the backslash.
dfvoice = spark.sql("""
    SELECT CaseID, Category, Address, Neighborhood,
           REPLACE(Point, '(', '') AS Point,
           Source
    FROM casedata
    WHERE Source = 'Open311'
""")

dfvoice.createOrReplaceTempView("casedata")

In [6]:
# Strip the closing parenthesis from Point, reading the current "casedata"
# view and registering the result back under the same name.
# REPLACE is idempotent, so re-running this cell leaves the view unchanged.
dfvoice = spark.sql("""
    SELECT CaseID, Category, Address, Neighborhood,
           REPLACE(Point, ')', '') AS Point,
           Source
    FROM casedata
""")

dfvoice.createOrReplaceTempView("casedata")

In [7]:
# Remove every space character from Point, so that its text has the form
# "lat,long" once both parentheses have been stripped. The result is registered
# back under "casedata"; re-running is safe because REPLACE is idempotent.
dfvoice = spark.sql("""
    SELECT CaseID, Category, Address, Neighborhood,
           REPLACE(Point, ' ', '') AS Point,
           Source
    FROM casedata
""")

dfvoice.createOrReplaceTempView("casedata")

In [8]:
# Split the cleaned "lat,long" text in Point into Latitude (everything before
# the first comma) and Longitude (everything after the last comma). Both are
# still strings here.
# If Point contains no comma, instr() returns 0, so LEFT/RIGHT receive a length
# of -1 and yield empty strings. Under Spark's default (non-ANSI) casting,
# those become null when later cast to double.
# This cell only reads the "casedata" view; it does not register a new one.
dfvoice = spark.sql("""
    SELECT CaseID, Category, Address, Neighborhood, Point,
           LEFT(Point, instr(Point, ',') - 1) AS Latitude,
           RIGHT(Point, instr(reverse(Point), ',') - 1) AS Longitude
    FROM casedata
""")

dfvoice.show()

+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+
| CaseID|            Category|             Address|       Neighborhood|               Point|        Latitude|        Longitude|
+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+
|5050169|Graffiti Public P...|Intersection of B...|Northern Waterfront|37.7993016356325,...|37.7993016356325|-122.397591392155|
|5085637|Graffiti Public P...|4877 MISSION ST, ...|          Excelsior|37.720311024426,-...| 37.720311024426|-122.437895757944|
|5128084|Graffiti Public P...|Intersection of F...|             Cayuga|37.7200154759567,...|37.7200154759567|-122.438329268192|
|4906662|Street and Sidewa...|Intersection of K...|        North Beach|37.7989693135443,...|37.7989693135443|-122.405645390632|
|4906669|Street and Sidewa...|3681 18TH ST, SAN...|    Mission Dolores|37.761341470692,-...| 37.76134147

In [9]:
# Vectorize: convert the Latitude/Longitude text columns to doubles and pack
# them into the single "features" vector column consumed by Spark ML.

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType

dfvoice = dfvoice.withColumn("Latitude", dfvoice["Latitude"].cast(DoubleType()))
dfvoice = dfvoice.withColumn("Longitude", dfvoice["Longitude"].cast(DoubleType()))

# Malformed Point values yield empty strings that cast to null. VectorAssembler
# raises on null inputs by default, so drop those rows explicitly.
dfvoice = dfvoice.na.drop(subset=["Latitude", "Longitude"])

assembler = VectorAssembler(
    inputCols=["Latitude", "Longitude"],
    outputCol="features",
)

# dfvoice is reassigned in place, so a re-run of this cell would otherwise find
# an existing "features" column, which VectorAssembler refuses to overwrite.
# drop() is a no-op when the column is absent (the first run).
dfvoice = assembler.transform(dfvoice.drop("features"))
dfvoice.show()

+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+--------------------+
| CaseID|            Category|             Address|       Neighborhood|               Point|        Latitude|        Longitude|            features|
+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+--------------------+
|5050169|Graffiti Public P...|Intersection of B...|Northern Waterfront|37.7993016356325,...|37.7993016356325|-122.397591392155|[37.7993016356325...|
|5085637|Graffiti Public P...|4877 MISSION ST, ...|          Excelsior|37.720311024426,-...| 37.720311024426|-122.437895757944|[37.720311024426,...|
|5128084|Graffiti Public P...|Intersection of F...|             Cayuga|37.7200154759567,...|37.7200154759567|-122.438329268192|[37.7200154759567...|
|4906662|Street and Sidewa...|Intersection of K...|        North Beach|37.7989693135443,...|37.79896931354

In [10]:
# Train a k-means model on the "features" vectors (KMeans' default input
# column) assembled in the vectorizing step.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator  # used by the silhouette evaluation cell

# k=10 clusters; the fixed seed makes the random initialization repeatable.
# NOTE(review): the features are raw latitude/longitude degrees. At this
# latitude (~37.7 deg) a degree of longitude covers less ground than a degree of
# latitude, so Euclidean distances here are somewhat anisotropic. Consider
# projecting or scaling the coordinates if that matters for the analysis.
kmeans = KMeans(k=10, seed=1)
model = kmeans.fit(dfvoice)

In [11]:
# Assign every case to its nearest cluster centre. The returned frame is
# dfvoice plus a "prediction" column holding the cluster index.
predictions = model.transform(dfvoice)
predictions.show(n=5)

+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+--------------------+----------+
| CaseID|            Category|             Address|       Neighborhood|               Point|        Latitude|        Longitude|            features|prediction|
+-------+--------------------+--------------------+-------------------+--------------------+----------------+-----------------+--------------------+----------+
|5050169|Graffiti Public P...|Intersection of B...|Northern Waterfront|37.7993016356325,...|37.7993016356325|-122.397591392155|[37.7993016356325...|         5|
|5085637|Graffiti Public P...|4877 MISSION ST, ...|          Excelsior|37.720311024426,-...| 37.720311024426|-122.437895757944|[37.720311024426,...|         8|
|5128084|Graffiti Public P...|Intersection of F...|             Cayuga|37.7200154759567,...|37.7200154759567|-122.438329268192|[37.7200154759567...|         8|
|4906662|Street and Sidewa...|Intersecti

In [12]:
# Score the clustering with ClusteringEvaluator. Its defaults are the silhouette
# metric with squared Euclidean distance; the score ranges from -1 to 1, and
# higher means tighter, better-separated clusters.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

report = "Silhouette with squared euclidean distance = " + str(silhouette)
print(report)

Silhouette with squared euclidean distance = 0.543721119631829


In [13]:
# Print the learned cluster centres. Each centre is a [Latitude, Longitude]
# pair, in the same order as the assembler's input columns.
centers = model.clusterCenters()
print("Cluster Centers: ")
for cluster_idx in range(len(centers)):
    # Position in this list corresponds to the "prediction" index.
    print(centers[cluster_idx])

Cluster Centers: 
[  37.77744699 -122.41324379]
[  37.72947853 -122.39812946]
[  37.77098687 -122.45008883]
[  37.74297677 -122.48774681]
[  37.77394805 -122.49010872]
[  37.79391908 -122.40578954]
[  37.79033236 -122.42805511]
[  37.76494328 -122.42326642]
[  37.72493925 -122.44228068]
[  37.75434326 -122.40898489]


In [14]:
# Visualization using pixiedust.
# Importing pixiedust has a visible side effect: it opens its database and
# prints a confirmation message (shown in this cell's output).
# NOTE(review): presumably this import also provides the display() used in the
# next cell; confirm.
import pixiedust

Pixiedust database opened successfully


In [None]:
# Render the clustered predictions with display().
# NOTE(review): display is not imported explicitly anywhere in the visible
# cells; it is assumed to be the one provided after importing pixiedust. Verify
# that it is not IPython.display.display.
display(predictions)

# Hasil Clustering (Clustering Results)

![cluster](img/cluster_result.png)