In [2]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [3]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [4]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x00000244E70426D8>


# Loading Data Using Spark

In [13]:
df = spark.read.csv("C://Users//LENOVO//Documents//big-data-master//notebook//dataset//sf-police-calls-for-service-and-incidents//police-department-incidents.csv", header=True, inferSchema=True)

In [14]:
df.show()

+----------+--------------------+--------------------+---------+-------------------+-----+----------+--------------------+--------------------+-----------------+----------------+--------------------+-------------+
|IncidntNum|            Category|            Descript|DayOfWeek|               Date| Time|PdDistrict|          Resolution|             Address|                X|               Y|            Location|         PdId|
+----------+--------------------+--------------------+---------+-------------------+-----+----------+--------------------+--------------------+-----------------+----------------+--------------------+-------------+
|  50436712|             ASSAULT|             BATTERY|Wednesday|2005-04-20 00:00:00|04:00|   MISSION|                NONE| 18TH ST / CASTRO ST|-122.435002864271|37.7608878061245|"{'longitude': '-...|""city"":""""|
|  80049078|       LARCENY/THEFT|GRAND THEFT FROM ...|   Sunday|2008-01-13 00:00:00|18:00|      PARK|                NONE|1100 Block of CLA...|-

In [15]:
df.count()

2215024

In [16]:
df.schema

StructType(List(StructField(IncidntNum,IntegerType,true),StructField(Category,StringType,true),StructField(Descript,StringType,true),StructField(DayOfWeek,StringType,true),StructField(Date,TimestampType,true),StructField(Time,StringType,true),StructField(PdDistrict,StringType,true),StructField(Resolution,StringType,true),StructField(Address,StringType,true),StructField(X,DoubleType,true),StructField(Y,DoubleType,true),StructField(Location,StringType,true),StructField(PdId,StringType,true)))

In [17]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("incidents")

In [18]:
result = spark.sql("SELECT DISTINCT Date FROM incidents")

In [19]:
result.show()

+-------------------+
|               Date|
+-------------------+
|2006-04-13 00:00:00|
|2006-09-24 00:00:00|
|2006-07-31 00:00:00|
|2011-09-13 00:00:00|
|2004-12-10 00:00:00|
|2003-03-27 00:00:00|
|2004-09-25 00:00:00|
|2013-06-28 00:00:00|
|2009-09-09 00:00:00|
|2004-06-12 00:00:00|
|2004-05-29 00:00:00|
|2011-10-12 00:00:00|
|2004-01-28 00:00:00|
|2013-11-01 00:00:00|
|2009-07-14 00:00:00|
|2010-06-21 00:00:00|
|2010-08-16 00:00:00|
|2003-10-25 00:00:00|
|2005-11-28 00:00:00|
|2012-09-03 00:00:00|
+-------------------+
only showing top 20 rows



# DATA MINING PROCESS

In [58]:
#1.Menampilkan kategori kejadian yang paling banyak terjadi

query1 = spark.sql("SELECT Category, count(Category) as Jumlah \
                    FROM incidents \
                    GROUP BY Category \
                    ORDER BY Jumlah DESC")

In [59]:
query1.show()

+--------------------+------+
|            Category|Jumlah|
+--------------------+------+
|       LARCENY/THEFT|480448|
|      OTHER OFFENSES|309358|
|        NON-CRIMINAL|238323|
|             ASSAULT|194694|
|       VEHICLE THEFT|126602|
|       DRUG/NARCOTIC|119628|
|           VANDALISM|116059|
|            WARRANTS|101379|
|            BURGLARY| 91543|
|      SUSPICIOUS OCC| 80444|
|      MISSING PERSON| 64961|
|             ROBBERY| 55867|
|               FRAUD| 41542|
|     SECONDARY CODES| 25831|
|FORGERY/COUNTERFE...| 23050|
|         WEAPON LAWS| 22234|
|            TRESPASS| 19449|
|        PROSTITUTION| 16701|
|     STOLEN PROPERTY| 11891|
|SEX OFFENSES, FOR...| 11742|
+--------------------+------+
only showing top 20 rows



In [60]:
#2.Menampilkan daftar kejadian yang belum diselesaikan

query2 = spark.sql("SELECT Category, Descript, Resolution FROM incidents WHERE Resolution='NONE' ")

In [61]:
query2.show()

+--------------+--------------------+----------+
|      Category|            Descript|Resolution|
+--------------+--------------------+----------+
|       ASSAULT|             BATTERY|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
|      BURGLARY|BURGLARY OF APART...|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
|  NON-CRIMINAL|STAY AWAY OR COUR...|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
| LARCENY/THEFT| PETTY THEFT BICYCLE|      NONE|
| VEHICLE THEFT|        STOLEN TRUCK|      NONE|
|  NON-CRIMINAL|       LOST PROPERTY|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
| VEHICLE THEFT|VEHICLE, RECOVERE...|      NONE|
|      BURGLARY|BURGLARY OF APART...|      NONE|
| LARCENY/THEFT|GRAND THEFT FROM ...|      NONE|
|OTHER OFFENSES|VIOLATION OF REST...|      NONE|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|      NONE|
| VEHICLE THEFT|   S

In [62]:
#3.Menampilkan distrik yang paling banyak terjadi kejadian 

query3 = spark.sql("SELECT PdDistrict, count(PdDistrict) as Jumlah \
                    FROM incidents \
                    GROUP BY PdDistrict \
                    ORDER BY Jumlah DESC")

In [63]:
query3.show()

+----------+------+
|PdDistrict|Jumlah|
+----------+------+
|  SOUTHERN|399785|
|   MISSION|300076|
|  NORTHERN|272713|
|   CENTRAL|226255|
|   BAYVIEW|221000|
| INGLESIDE|194180|
|TENDERLOIN|191746|
|   TARAVAL|166971|
|      PARK|125479|
|  RICHMOND|116818|
|      null|     0|
+----------+------+



In [64]:
#4.Menampilkan tanggal yang paling banyak terjadi kejadian 

query4 = spark.sql("SELECT Date, count(Date) as Jumlah \
                    FROM incidents \
                    GROUP BY Date \
                    ORDER BY Jumlah DESC")

In [65]:
query4.show()

+-------------------+------+
|               Date|Jumlah|
+-------------------+------+
|2011-01-01 00:00:00|   650|
|2013-01-01 00:00:00|   627|
|2013-11-01 00:00:00|   626|
|2003-01-01 00:00:00|   622|
|2006-01-01 00:00:00|   621|
|2009-01-01 00:00:00|   606|
|2015-06-28 00:00:00|   598|
|2010-01-01 00:00:00|   569|
|2016-01-01 00:00:00|   567|
|2004-04-01 00:00:00|   563|
|2017-06-25 00:00:00|   557|
|2008-01-01 00:00:00|   555|
|2013-10-04 00:00:00|   555|
|2015-10-01 00:00:00|   554|
|2015-01-01 00:00:00|   553|
|2010-10-01 00:00:00|   550|
|2017-09-01 00:00:00|   548|
|2012-10-01 00:00:00|   547|
|2005-01-01 00:00:00|   546|
|2013-03-01 00:00:00|   545|
+-------------------+------+
only showing top 20 rows



In [67]:
# Save the results to CSV --> partitioned CSV
query1.write \
  .option("header", "true") \
  .csv("file:///C:/Users/LENOVO/Documents/big-data-master/notebook/dataset/sf-police-calls-for-service-and-incidents/query1.csv")

In [68]:
# Convert to Pandas
import pandas as pd
queryPandas = query2.toPandas()

In [69]:
# Save to single CSV
queryPandas.to_csv("C:/Users/LENOVO/Documents/big-data-master/notebook/dataset/sf-police-calls-for-service-and-incidents/queryPandas.csv", index=False)