# All installs

In [18]:
# Environment setup, left commented out: uncomment and run once if pyspark is not installed.
# !pip install pyspark

In [19]:
# Left commented out: uncomment and run to write the installed package versions to requirements.txt.
# !pip freeze > requirements.txt

# Imports

In [20]:

from All_details import *

from pyspark.sql import SparkSession , functions as F

from pyspark.sql.types import FloatType

from pyspark.sql.window import Window 

import os

# Location of the PostgreSQL JDBC driver jar. The path differs between machines,
# so it can be overridden with the POSTGRES_JDBC_JAR environment variable; the
# fallback is the path this notebook was originally written with.
POSTGRES_JDBC_JAR = os.environ.get(
    'POSTGRES_JDBC_JAR',
    '/usr/lib/jvm/java-17-openjdk-amd64/lib/postgresql-42.5.0.jar',
)

# Create (or reuse) the Spark session with the JDBC jar on the driver classpath,
# so the JDBC writes later in the notebook can find the PostgreSQL driver.
spark = (
    SparkSession.builder
    .appName('Crimes_in_Boston')
    .config('spark.driver.extraClassPath', POSTGRES_JDBC_JAR)
    .getOrCreate()
)

# Load csv into spark

In [21]:
# All three CSV files are read the same way: the first row is the header and
# Spark infers each column's type from the data.
csv_options = {'header': True, 'inferSchema': True}

crimes_df = spark.read.csv('DATA/crimes.csv', **csv_options)
offense_codes_df = spark.read.csv('DATA/offense_codes.csv', **csv_options)
police_district_codes_df = spark.read.csv('DATA/police_district_codes.csv', **csv_options)

# Preprocessing

Perform all preprocessing here.

In [22]:
# Replace missing SHOOTING values with 'N'; non-null values are kept as they are.
shooting = F.col('SHOOTING')
shooting_filled = F.when(shooting.isNull(), 'N').otherwise(shooting)
crimes_df = crimes_df.withColumn('SHOOTING', shooting_filled)

# OFFENSE_DESCRIPTION is not used by the analysis, so drop it.
crimes_df = crimes_df.drop('OFFENSE_DESCRIPTION')
crimes_df.show(5)

# Keep a single row per offense CODE. Note that dropDuplicates keeps one
# arbitrary row for each CODE, not necessarily the first one in the file.
offense_codes_df = offense_codes_df.dropDuplicates(['CODE'])
offense_codes_df.orderBy('CODE').show(5)

+---------------+------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|     STREET|        Lat|        Long|            Location|
+---------------+------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|     I182070945|         619|             Larceny|     D14|           808|       N|2018-09-02 13:00:00|2018|    9|     Sunday|  13|  Part One| LINCOLN ST|42.35779134|-71.13937053|(42.35779134, -71...|
|     I182070943|        1402|           Vandalism|     C11|           347|       N|2018-08-21 00:00:00|2018|    8|    Tuesday|   0|  Part Two|   HECLA ST|42.30682138|-71.06030035|(42.30682138

## 6. (Window function) Partition by district, order by year, then count the offenses, including a rolling count for each district

In [23]:
## 6. (Window function) Per district, count the incidents of each year and add a rolling (cumulative) count over the years

# Within each district, rows are ordered by year, so summing over this window
# yields a running total up to and including the current year.
window = Window.partitionBy('DISTRICT').orderBy('YEAR')

window_df = (
    crimes_df
    .filter(F.col('DISTRICT').isNotNull())  # incidents without a district are ignored
    .groupBy('DISTRICT', 'YEAR')
    .agg(F.count('INCIDENT_NUMBER').alias('count_incidents'))
    .withColumn('rolling_count_incidents', F.sum('count_incidents').over(window))
)

window_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
window_df.write.jdbc(url=URL, table='rolling_count', mode='overwrite', properties=Properties)



+--------+----+---------------+-----------------------+
|DISTRICT|YEAR|count_incidents|rolling_count_incidents|
+--------+----+---------------+-----------------------+
|      A1|2015|           6015|                   6015|
|      A1|2016|          10923|                  16938|
|      A1|2017|          11375|                  28313|
|      A1|2018|           7404|                  35717|
|     A15|2015|           1027|                   1027|
|     A15|2016|           1986|                   3013|
|     A15|2017|           2167|                   5180|
|     A15|2018|           1325|                   6505|
|      A7|2015|           2426|                   2426|
|      A7|2016|           4130|                   6556|
|      A7|2017|           4264|                  10820|
|      A7|2018|           2724|                  13544|
|      B2|2015|           8687|                   8687|
|      B2|2016|          15706|                  24393|
|      B2|2017|          15680|                 

## 7. Pivot incident years and count incident monthwise

In [24]:
## 7. Count incidents per month, with one column per year

# One row per MONTH and one column per distinct YEAR; a cell is null when
# there are no incidents for that month/year combination.
pivot_df = (
    crimes_df
    .groupBy('MONTH')
    .pivot('YEAR')
    .count()
    .orderBy('MONTH')
)
pivot_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
pivot_df.write.jdbc(url=URL, table='pivot_YEAR', mode='overwrite', properties=Properties)


+-----+----+----+----+----+
|MONTH|2015|2016|2017|2018|
+-----+----+----+----+----+
|    1|null|7835|7993|7782|
|    2|null|7308|7408|6937|
|    3|null|8199|8179|7768|
|    4|null|8101|8069|7916|
|    5|null|8578|8715|8906|
|    6|4191|8558|8985|8834|
|    7|8324|8619|9075|8538|
|    8|8342|8938|9206|8337|
|    9|8414|8522|8940| 667|
|   10|8308|8583|8846|null|
|   11|7818|7922|7935|null|
|   12|7991|7951|7535|null|
+-----+----+----+----+----+



## 8. Count crimes involving any kind of Robbery in each district

In [25]:
## 8. Count the "Robbery" incidents of each district, labelled with the district name from police_district_codes_df

robbery_df = (
    crimes_df
    .filter(F.col('OFFENSE_CODE_GROUP') == 'Robbery')
    # Left join: robberies are kept even if their district code has no name entry.
    .join(
        police_district_codes_df,
        crimes_df.DISTRICT == police_district_codes_df.District_Code,
        how='left',
    )
    .filter(F.col('DISTRICT').isNotNull())  # incidents without a district are ignored
    .groupBy('DISTRICT', 'District_Name')
    .count()
    .orderBy('DISTRICT')
)

robbery_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
robbery_df.write.jdbc(url=URL, table='robbery_in_each_district', mode='overwrite', properties=Properties)


+--------+-------------+-----+
|DISTRICT|District_Name|count|
+--------+-------------+-----+
|      A1|     Downtown|  659|
|     A15|  Charlestown|   84|
|      A7|  East Boston|  232|
|      B2|      Roxbury|  860|
|      B3|     Mattapan|  530|
|     C11|   Dorchester|  660|
|      C6| South Boston|  263|
|     D14|     Brighton|  175|
|      D4|    South End|  626|
|     E13|Jamaica Plain|  250|
|     E18|    Hyde Park|  170|
|      E5| West Roxbury|   96|
+--------+-------------+-----+



## 9. For each day, list the hour when the incident number is highest.

In [26]:
## 9. For each day of the week, list the hour with the highest number of incidents, along with that count

# Window covering all hours of one day of the week; used to find that day's peak count.
per_day = Window.partitionBy('DAY_OF_WEEK')

incident_df = (
    crimes_df
    .groupBy('DAY_OF_WEEK', 'HOUR')
    .count()
    .withColumn('max_incident', F.max('count').over(per_day))
    # Keep only the peak hour(s); if several hours tie for the maximum, all of them are kept.
    .filter(F.col('count') == F.col('max_incident'))
    .drop('max_incident')
    # Sort last: an ordering applied before the window step is not guaranteed
    # to survive the shuffle that the window introduces.
    .sort('DAY_OF_WEEK', 'HOUR')
)

incident_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
incident_df.write.jdbc(url=URL, table='Day_incident_hour', mode='overwrite', properties=Properties)


+-----------+----+-----+
|DAY_OF_WEEK|HOUR|count|
+-----------+----+-----+
|     Friday|  17| 3252|
|     Monday|  17| 3254|
|   Saturday|   0| 2612|
|     Sunday|   0| 2400|
|   Thursday|  18| 3033|
|    Tuesday|  17| 3241|
|  Wednesday|  17| 3153|
+-----------+----+-----+



## 10. List highest crime/offense group in each district (name) and the number of incidents

In [27]:
## 10. List the most frequent crime/offense group in each district (with the district name) and its number of incidents

# Ranks the offense groups of one district from most to least frequent.
# OFFENSE_CODE_GROUP is a tie-breaker so the chosen row is deterministic.
rank_in_district = Window.partitionBy('DISTRICT').orderBy(
    F.col('count').desc(),
    F.col('OFFENSE_CODE_GROUP').asc(),
)

district_crime_count = (
    crimes_df
    # Left join: incidents are kept even if their district code has no name entry.
    .join(
        police_district_codes_df,
        crimes_df.DISTRICT == police_district_codes_df.District_Code,
        how='left',
    )
    .filter(F.col('DISTRICT').isNotNull())  # incidents without a district are ignored
    .groupBy('DISTRICT', 'District_Name', 'OFFENSE_CODE_GROUP')
    .count()
    # Select the top-ranked row explicitly. Sorting and then calling
    # dropDuplicates(['DISTRICT']) does not guarantee that the first (highest)
    # row of each district is the one that is kept.
    .withColumn('rank_in_district', F.row_number().over(rank_in_district))
    .filter(F.col('rank_in_district') == 1)
    .drop('rank_in_district')
    .sort('DISTRICT')
)

district_crime_count.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
district_crime_count.write.jdbc(url=URL, table='highest_crime_in_each_district_with_count', mode='overwrite', properties=Properties)


+--------+-------------+--------------------+-----+
|DISTRICT|District_Name|  OFFENSE_CODE_GROUP|count|
+--------+-------------+--------------------+-----+
|      A1|     Downtown|             Larceny| 4704|
|     A15|  Charlestown|Motor Vehicle Acc...|  960|
|      A7|  East Boston|Motor Vehicle Acc...| 1516|
|      B2|      Roxbury|Motor Vehicle Acc...| 6407|
|      B3|     Mattapan|Motor Vehicle Acc...| 3836|
|     C11|   Dorchester|Motor Vehicle Acc...| 5305|
|      C6| South Boston|Motor Vehicle Acc...| 2699|
|     D14|     Brighton|Motor Vehicle Acc...| 2857|
|      D4|    South End|             Larceny| 7313|
|     E13|Jamaica Plain|Motor Vehicle Acc...| 2166|
|     E18|    Hyde Park|Motor Vehicle Acc...| 2366|
|      E5| West Roxbury|Motor Vehicle Acc...| 1813|
+--------+-------------+--------------------+-----+



# Extra Question 

## 16. Treating each degree of latitude/longitude as 111 km, list crimes with counts (yearwise) within a 111 km radius of the Boston police headquarters, which is at 42.33397849555639, -71.09079628933894 (lat, long)

In [28]:
## 16. Treating each degree of latitude/longitude as 111 km, list crimes with counts (yearwise)
##     within a 111 km radius of the Boston police headquarters at 42.33397849555639, -71.09079628933894 (lat, long)

KM_PER_DEGREE = 111                    # approximation used by this exercise
RADIUS_KM = 111                        # search radius around the headquarters
BOSTON_PD_HQ_LAT = 42.33397849555639
BOSTON_PD_HQ_LONG = -71.09079628933894


def degree_distance(lat1, long1, lat2, long2, km_per_degree=KM_PER_DEGREE):
    """Return a Column with the approximate distance in km between two points.

    This is a planar approximation: the Euclidean distance in degrees is scaled
    by ``km_per_degree``, with latitude and longitude degrees treated as equally
    long.

    The function only combines Spark column expressions, so it is called
    directly with Columns (or numeric literals) and needs no UDF wrapper.

    Args:
        lat1, long1: latitude/longitude of the first point (Column or number).
        lat2, long2: latitude/longitude of the second point (Column or number).
        km_per_degree: kilometres assumed per degree; defaults to KM_PER_DEGREE.

    Returns:
        A Column holding the distance in kilometres.
    """
    return km_per_degree * F.sqrt(F.pow(lat1 - lat2, 2) + F.pow(long1 - long2, 2))


crimes_radius_111_df = (
    crimes_df
    .withColumn(
        'Distance_Apart',
        degree_distance(F.col('Lat'), F.col('Long'), BOSTON_PD_HQ_LAT, BOSTON_PD_HQ_LONG),
    )
    # Rows with a null Lat/Long get a null distance and are removed by this filter as well.
    .filter(F.col('Distance_Apart') <= RADIUS_KM)
    .groupBy('OFFENSE_CODE_GROUP')
    .pivot('YEAR')
    .count()
    .sort('OFFENSE_CODE_GROUP')
)

crimes_radius_111_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
crimes_radius_111_df.write.jdbc(url=URL, table='crimes_yearwise_in_111km_radius', mode='overwrite', properties=Properties)


+--------------------+----+----+----+----+
|  OFFENSE_CODE_GROUP|2015|2016|2017|2018|
+--------------------+----+----+----+----+
|  Aggravated Assault|1317|2177|2241|1532|
|            Aircraft|   4|   4|  17|   5|
|               Arson|  11|  33|  31|  16|
|Assembly or Gathe...| 236| 313| 233| 130|
|          Auto Theft| 934|1409|1316| 870|
| Auto Theft Recovery| 129| 288| 346| 214|
|          Ballistics| 159| 284| 327| 166|
|   Biological Threat|null|null|   2|null|
|           Bomb Hoax|  16|  36|  10|   9|
|Burglary - No Pro...|   1|   1|null|null|
| Commercial Burglary| 253| 427| 427| 219|
|    Confidence Games| 650|1051| 867| 512|
|      Counterfeiting| 271| 481| 452| 223|
| Criminal Harassment|  42|  35|  28|  26|
|  Disorderly Conduct| 451| 741| 788| 420|
|      Drug Violation|3060|4651|4115|2770|
|        Embezzlement|  61|  84| 108|  42|
|        Evading Fare|  78| 124| 110|  77|
|          Explosives|   6|   7|   5|   6|
|Fire Related Reports| 300| 594| 573| 389|
+----------

## 17. List all crimes that occurred in each district (by name) in August 2016

In [29]:
## 17. For each district (by name), list the distinct crime groups that occurred in August 2016

in_august_2016 = (F.col('YEAR') == 2016) & (F.col('MONTH') == 8)

crimes_august_2016_df = (
    crimes_df
    .filter(in_august_2016)
    # Left join: incidents are kept even if their district code has no name entry.
    .join(
        police_district_codes_df,
        crimes_df.DISTRICT == police_district_codes_df.District_Code,
        how='left',
    )
    .filter(F.col('DISTRICT').isNotNull())  # incidents without a district are ignored
    .groupBy('District_Name')
    # collect_set gathers the distinct offense groups of a district into one array.
    .agg(F.collect_set('OFFENSE_CODE_GROUP').alias('CRIME_GROUP'))
    .sort('DISTRICT_NAME')
)

crimes_august_2016_df.show()

# Persist the result to PostgreSQL, replacing the table if it already exists.
crimes_august_2016_df.write.jdbc(url=URL, table='crime_group_in_all_districts_in_august_2016', mode='overwrite', properties=Properties)


+-------------+--------------------+
|District_Name|         CRIME_GROUP|
+-------------+--------------------+
|     Brighton|[Fraud, Auto Thef...|
|  Charlestown|[Fraud, Auto Thef...|
|   Dorchester|[Fraud, Auto Thef...|
|     Downtown|[Fraud, Auto Thef...|
|  East Boston|[Fraud, Auto Thef...|
|    Hyde Park|[Fraud, Auto Thef...|
|Jamaica Plain|[Fraud, Auto Thef...|
|     Mattapan|[Fraud, Auto Thef...|
|      Roxbury|[Fraud, Auto Thef...|
| South Boston|[Fraud, Auto Thef...|
|    South End|[Fraud, Auto Thef...|
| West Roxbury|[Fraud, Auto Thef...|
+-------------+--------------------+

