## Using PySpark to Run SQL Queries on the NYPD Shooting Dataset

In [51]:
data.to_csv('cleaned_data.csv', index=False)


## SQL Queries using PySpark

To run the queries, we take the cleaned data, install Java, and download Spark along with the findspark helper.

In [52]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark



In [53]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
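
Both paths are specific to the machine the notebook runs on, so it can help to confirm that they exist before starting Spark. A minimal sketch follows; the helper name `missing_dirs` is hypothetical and is not part of findspark or PySpark.

```python
import os

def missing_dirs(var_names):
    """Return the environment variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# Report anything that would likely make the Spark session fail to start later.
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Check these variables:", ", ".join(problems))
else:
    print("JAVA_HOME and SPARK_HOME both point to existing directories.")
```

If either variable is reported, the install cell probably did not finish or the archive was extracted somewhere else.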


In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession


# Create a Spark session
spark = SparkSession.builder.appName("shooting-analysis").getOrCreate()



In [55]:
# Load a CSV file into a PySpark DataFrame
file_path = "cleaned_data.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)


In [56]:
df.show(5)

+------------+----------+----------+------+-----------------+--------+-----------------+------------------+-------------+-----------------------+--------------+--------+---------+-------------+-------+--------------+------------+-------------+------------------+------------------+--------------------+
|INCIDENT_KEY|OCCUR_DATE|OCCUR_TIME|  BORO|LOC_OF_OCCUR_DESC|PRECINCT|JURISDICTION_CODE|LOC_CLASSFCTN_DESC|LOCATION_DESC|STATISTICAL_MURDER_FLAG|PERP_AGE_GROUP|PERP_SEX|PERP_RACE|VIC_AGE_GROUP|VIC_SEX|      VIC_RACE|  X_COORD_CD|   Y_COORD_CD|          Latitude|         Longitude|             Lon_Lat|
+------------+----------+----------+------+-----------------+--------+-----------------+------------------+-------------+-----------------------+--------------+--------+---------+-------------+-------+--------------+------------+-------------+------------------+------------------+--------------------+
|   228798151|2021-05-27|  21:30:00|QUEENS|          Unknown|     105|              0.0|   

In [61]:
df.printSchema()


root
 |-- INCIDENT_KEY: integer (nullable = true)
 |-- OCCUR_DATE: string (nullable = true)
 |-- OCCUR_TIME: string (nullable = true)
 |-- BORO: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- JURISDICTION_CODE: double (nullable = true)
 |-- LOC_CLASSFCTN_DESC: string (nullable = true)
 |-- LOCATION_DESC: string (nullable = true)
 |-- STATISTICAL_MURDER_FLAG: boolean (nullable = true)
 |-- PERP_AGE_GROUP: string (nullable = true)
 |-- PERP_SEX: string (nullable = true)
 |-- PERP_RACE: string (nullable = true)
 |-- VIC_AGE_GROUP: string (nullable = true)
 |-- VIC_SEX: string (nullable = true)
 |-- VIC_RACE: string (nullable = true)
 |-- X_COORD_CD: double (nullable = true)
 |-- Y_COORD_CD: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Lon_Lat: string (nullable = true)



In [58]:
df.describe().show()


+-------+-------------------+----------+----------+-------------+-----------------+------------------+-------------------+------------------+-------------+--------------+--------+--------------------+-------------+-------+--------------------+------------------+------------------+-------------------+-------------------+--------------------+
|summary|       INCIDENT_KEY|OCCUR_DATE|OCCUR_TIME|         BORO|LOC_OF_OCCUR_DESC|          PRECINCT|  JURISDICTION_CODE|LOC_CLASSFCTN_DESC|LOCATION_DESC|PERP_AGE_GROUP|PERP_SEX|           PERP_RACE|VIC_AGE_GROUP|VIC_SEX|            VIC_RACE|        X_COORD_CD|        Y_COORD_CD|           Latitude|          Longitude|             Lon_Lat|
+-------+-------------------+----------+----------+-------------+-----------------+------------------+-------------------+------------------+-------------+--------------+--------+--------------------+-------------+-------+--------------------+------------------+------------------+-------------------+-------------

In [62]:
# Creating a temporary view from the DataFrame
df.createOrReplaceTempView("shooting_data")


### Aggregate functions

1. This query counts the total incidents per borough (BORO column) in the temporary view shooting_data.

In [63]:
# SQL query
total_incidents = spark.sql("SELECT BORO, COUNT(*) AS Total_Incidents FROM shooting_data GROUP BY BORO")

# Show the results
total_incidents.show()


+-------------+---------------+
|         BORO|Total_Incidents|
+-------------+---------------+
|       QUEENS|           4086|
|     BROOKLYN|          10933|
|        BRONX|           7937|
|    MANHATTAN|           3572|
|STATEN ISLAND|            416|
+-------------+---------------+
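
Raw counts are easier to compare as shares of the total. The sketch below does that arithmetic in plain Python with the counts printed above; the variable names are illustrative only.

```python
# Incident counts copied from the query output above.
borough_counts = {
    "QUEENS": 4086,
    "BROOKLYN": 10933,
    "BRONX": 7937,
    "MANHATTAN": 3572,
    "STATEN ISLAND": 416,
}

total = sum(borough_counts.values())

# Share of all incidents per borough, largest first.
shares = {
    boro: count / total
    for boro, count in sorted(borough_counts.items(), key=lambda kv: kv[1], reverse=True)
}

print(f"Total incidents: {total}")
for boro, share in shares.items():
    print(f"{boro:<14}{share:6.1%}")
```

With these counts the total is 26,944 incidents, and Brooklyn accounts for roughly 41% of them.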



2. This query counts incidents by location classification (LOC_CLASSFCTN_DESC column).



In [65]:
query_loc_class = spark.sql("SELECT LOC_CLASSFCTN_DESC, COUNT(*) AS Total_Incidents FROM shooting_data GROUP BY LOC_CLASSFCTN_DESC")
query_loc_class.show()

+------------------+---------------+
|LOC_CLASSFCTN_DESC|Total_Incidents|
+------------------+---------------+
|           HOUSING|            280|
|            STREET|           1096|
|           Unknown|          25241|
|           VEHICLE|             22|
|       PARKING LOT|              7|
|             OTHER|             29|
|           TRANSIT|             15|
|        PLAYGROUND|             30|
|        COMMERCIAL|             99|
|          DWELLING|            125|
+------------------+---------------+



3. This query groups the incidents by borough and returns the count of incidents along with the average latitude and longitude.

In [66]:
hotspot_query = spark.sql(
    "SELECT BORO, COUNT(*) AS Total_Incidents, AVG(Latitude) AS Avg_Latitude, AVG(Longitude) AS Avg_Longitude " +
    "FROM shooting_data " +
    "GROUP BY BORO " +
    "ORDER BY Total_Incidents DESC"
)
hotspot_query.show()

+-------------+---------------+------------------+------------------+
|         BORO|Total_Incidents|      Avg_Latitude|     Avg_Longitude|
+-------------+---------------+------------------+------------------+
|     BROOKLYN|          10933| 40.66546370692403|-73.93151784010021|
|        BRONX|           7937|40.844312371626984|-73.89061304349094|
|       QUEENS|           4086| 40.69077553157295|-73.80975110728258|
|    MANHATTAN|           3572| 40.80180873336863|-73.95075536573273|
|STATEN ISLAND|            416| 40.62565279985583|-74.08340659814182|
+-------------+---------------+------------------+------------------+



> This query groups the incidents by borough (BORO column) and calculates the total incidents in each borough.
It also computes the average latitude and longitude values within each borough to potentially identify the center or average location of incidents in that area.
Results are sorted in descending order of total incidents to highlight areas with the highest number of reported incidents.
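
Each borough average is a mean over a different number of incidents, so combining them into a citywide center means weighting by Total_Incidents rather than averaging the five rows directly. The sketch below uses the values printed above, rounded to five decimals. It assumes no coordinates are missing, since AVG skips NULLs while COUNT(*) does not. Averaging latitude and longitude is only an approximation of a geographic center, which is reasonable over an area the size of one city.

```python
# (Total_Incidents, Avg_Latitude, Avg_Longitude) per borough, taken from the
# output above and rounded to five decimal places.
borough_stats = {
    "BROOKLYN": (10933, 40.66546, -73.93152),
    "BRONX": (7937, 40.84431, -73.89061),
    "QUEENS": (4086, 40.69078, -73.80975),
    "MANHATTAN": (3572, 40.80181, -73.95076),
    "STATEN ISLAND": (416, 40.62565, -74.08341),
}

total = sum(n for n, _, _ in borough_stats.values())

# Weighted mean: each borough average contributes in proportion to its incident count.
city_lat = sum(n * lat for n, lat, _ in borough_stats.values()) / total
city_lon = sum(n * lon for n, _, lon in borough_stats.values()) / total

print(f"Incident-weighted center: ({city_lat:.4f}, {city_lon:.4f})")
```

The weighted result sits closer to Brooklyn and the Bronx than an unweighted mean of the five rows would, because those two boroughs contribute most of the incidents.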

4. Filtering Incidents by Borough and Jurisdiction Code


In [67]:
filter_incidents = spark.sql(
    "SELECT BORO, JURISDICTION_CODE, COUNT(*) AS Total_Incidents " +
    "FROM shooting_data " +
    "WHERE BORO = 'BRONX' AND JURISDICTION_CODE = 0 " +
    "GROUP BY BORO, JURISDICTION_CODE"
)
filter_incidents.show()

+-----+-----------------+---------------+
| BORO|JURISDICTION_CODE|Total_Incidents|
+-----+-----------------+---------------+
|BRONX|              0.0|           6833|
+-----+-----------------+---------------+



> The above query returns the number of incidents where BORO is BRONX and JURISDICTION_CODE is 0.

5. Joining Two Dataframes


In [79]:
# Load the NYPD hate crimes CSV into a second PySpark DataFrame
file_path = "NYPD_Hate_Crimes.csv"
other_data = spark.read.csv(file_path, header=True, inferSchema=True)
other_data.createOrReplaceTempView("other_data")



In [70]:
other_data.show()

+-----------------+---------------------+------------+------------------+-----------------------+--------------------+------+-----------------------------+--------------------+--------------------+-----------------------+--------------------+-----------+---------+
|Full Complaint ID|Complaint Year Number|Month Number|Record Create Date|Complaint Precinct Code| Patrol Borough Name|County|Law Code Category Description| Offense Description| PD Code Description|Bias Motive Description|    Offense Category|Arrest Date|Arrest Id|
+-----------------+---------------------+------------+------------------+-----------------------+--------------------+------+-----------------------------+--------------------+--------------------+-----------------------+--------------------+-----------+---------+
|  201904612204817|                 2019|           2|        02/08/2019|                     46|   PATROL BORO BRONX| BRONX|                       FELONY|      FELONY ASSAULT|ASSAULT 2,1,UNCLA...|   ANTI-

In [71]:
other_data.describe()

DataFrame[summary: string, Full Complaint ID: string, Complaint Year Number: string, Month Number: string, Record Create Date: string, Complaint Precinct Code: string, Patrol Borough Name: string, County: string, Law Code Category Description: string, Offense Description: string, PD Code Description: string, Bias Motive Description: string, Offense Category: string, Arrest Date: string, Arrest Id: string]

In [72]:
join_query = spark.sql(
    "SELECT s.*, o.* " +
    "FROM shooting_data s " +
    "INNER JOIN other_data o ON s.BORO = o.County"
)
join_query.show()

+------------+----------+----------+------+-----------------+--------+-----------------+------------------+-------------+-----------------------+--------------+--------+---------+-------------+-------+--------+----------+----------+------------------+------------------+--------------------+-----------------+---------------------+------------+------------------+-----------------------+--------------------+------+-----------------------------+--------------------+--------------------+-----------------------+--------------------+-----------+---------+
|INCIDENT_KEY|OCCUR_DATE|OCCUR_TIME|  BORO|LOC_OF_OCCUR_DESC|PRECINCT|JURISDICTION_CODE|LOC_CLASSFCTN_DESC|LOCATION_DESC|STATISTICAL_MURDER_FLAG|PERP_AGE_GROUP|PERP_SEX|PERP_RACE|VIC_AGE_GROUP|VIC_SEX|VIC_RACE|X_COORD_CD|Y_COORD_CD|          Latitude|         Longitude|             Lon_Lat|Full Complaint ID|Complaint Year Number|Month Number|Record Create Date|Complaint Precinct Code| Patrol Borough Name|County|Law Code Category Description|

In [None]:
join_query = spark.sql(
    "SELECT s.*, o.* " +
    "FROM shooting_data s " +
    "INNER JOIN other_data o ON s.BORO = o.County " +
    "WHERE YEAR(s.OCCUR_DATE) = 2022"
)
join_query.show()

+------------+----------+----------+-----+-----------------+--------+-----------------+------------------+-------------+-----------------------+--------------+--------+--------------+-------------+-------+--------+----------+----------+-----------+------------+--------------------+-----------------+---------------------+------------+------------------+-----------------------+-------------------+------+-----------------------------+--------------------+--------------------+-----------------------+--------------------+-----------+---------+
|INCIDENT_KEY|OCCUR_DATE|OCCUR_TIME| BORO|LOC_OF_OCCUR_DESC|PRECINCT|JURISDICTION_CODE|LOC_CLASSFCTN_DESC|LOCATION_DESC|STATISTICAL_MURDER_FLAG|PERP_AGE_GROUP|PERP_SEX|     PERP_RACE|VIC_AGE_GROUP|VIC_SEX|VIC_RACE|X_COORD_CD|Y_COORD_CD|   Latitude|   Longitude|             Lon_Lat|Full Complaint ID|Complaint Year Number|Month Number|Record Create Date|Complaint Precinct Code|Patrol Borough Name|County|Law Code Category Description| Offense Description

> The above queries use an inner join to combine our shooting dataset with the NYPD hate crimes dataset, matching BORO against County. The second query additionally keeps only shootings that occurred in 2022.
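
One thing to keep in mind with this join is that BORO and County are not unique in either table. Each shooting row is therefore paired with every row of the other table that carries the same name, so a borough contributes (shooting rows) × (matching rows) to the result. Names that do not appear verbatim in County produce no rows at all. The sketch below runs the same join shape on tiny made-up tables. It uses Python's built-in sqlite3 rather than Spark so that it runs without a JVM, and both the rows and the `complaint_id` column are purely illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE shooting_data (INCIDENT_KEY INTEGER, BORO TEXT);
    CREATE TABLE other_data (complaint_id INTEGER, County TEXT);
    """
)

# Made-up rows: 3 BRONX, 2 QUEENS and 1 BROOKLYN shooting.
conn.executemany(
    "INSERT INTO shooting_data VALUES (?, ?)",
    [(1, "BRONX"), (2, "BRONX"), (3, "BRONX"),
     (4, "QUEENS"), (5, "QUEENS"), (6, "BROOKLYN")],
)
# Made-up rows: 2 BRONX and 1 QUEENS complaint; no row is labelled BROOKLYN.
conn.executemany(
    "INSERT INTO other_data VALUES (?, ?)",
    [(101, "BRONX"), (102, "BRONX"), (103, "QUEENS")],
)

# Same join condition as above, aggregated so the row multiplication is visible.
rows_per_boro = dict(conn.execute(
    "SELECT s.BORO, COUNT(*) "
    "FROM shooting_data s "
    "INNER JOIN other_data o ON s.BORO = o.County "
    "GROUP BY s.BORO"
).fetchall())

# BRONX yields 3 * 2 joined rows, QUEENS 2 * 1, and BROOKLYN is dropped entirely.
print(rows_per_boro)
conn.close()
```

If the goal is a per-borough comparison, aggregating each table by borough first and then joining the two small results avoids this row multiplication.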

6. Analyzing Incidents with Statistical Murder Flag


In [73]:
Stat_flag = spark.sql(
    "SELECT STATISTICAL_MURDER_FLAG, COUNT(*) AS Total_Incidents " +
    "FROM shooting_data " +
    "GROUP BY STATISTICAL_MURDER_FLAG"
)
Stat_flag.show()


+-----------------------+---------------+
|STATISTICAL_MURDER_FLAG|Total_Incidents|
+-----------------------+---------------+
|                   true|           5186|
|                  false|          21758|
+-----------------------+---------------+



7. Grouping Incidents by Location Description with Minimum and Maximum Coordinates


In [78]:
Loc_desc = spark.sql(
    "SELECT LOCATION_DESC, COUNT(*) AS Total_Incidents, " +
    "MIN(Latitude) AS Min_Latitude, MAX(Latitude) AS Max_Latitude, " +
    "MIN(Longitude) AS Min_Longitude, MAX(Longitude) AS Max_Longitude " +
    "FROM shooting_data " +
    "GROUP BY LOCATION_DESC"
)
Loc_desc.show()

+--------------------+---------------+------------------+------------------+------------------+------------------+
|       LOCATION_DESC|Total_Incidents|      Min_Latitude|      Max_Latitude|     Min_Longitude|     Max_Longitude|
+--------------------+---------------+------------------+------------------+------------------+------------------+
|     COMMERCIAL BLDG|            288| 40.57530440500005| 40.90379283400006|-74.08746849499995|-73.72852514799996|
|         CHAIN STORE|              5| 40.67110691100004| 40.82102634300002|     -73.991190938|-73.75288168799993|
|   FACTORY/WAREHOUSE|              8|40.670557271000064|40.853656355000055|-73.99612283899995|-73.84162434299996|
|    PHOTO/COPY STORE|              1| 40.72709850100006| 40.72709850100006|-73.97664016999995|-73.97664016999995|
|            HOSPITAL|             62|         40.635216|40.896504099000026|        -74.106642|-73.75949597199997|
|      SMALL MERCHANT|             37| 40.60341429300007|         40.858666|    

> From the above SQL queries we can find out which borough has the maximum number of incidents and the average latitude and longitude of incidents in each borough. For each location description, we can also see how many incidents occurred and the minimum and maximum latitude and longitude at which they occurred.