In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName("Queries") \
    .getOrCreate()  


24/01/30 00:35:59 WARN Utils: Your hostname, AORUS resolves to a loopback address: 127.0.1.1; using 192.168.1.101 instead (on interface eno1)
24/01/30 00:35:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/30 00:35:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df1 = spark.read.csv("./data/Crime_Data_from_2020_to_Present.csv", header = True, inferSchema = True)
df2 = spark.read.csv("./data/Crime_Data_from_2010_to_2019.csv", header=True, inferSchema=True)

Dataframe = df1.union(df2)

Dataframe = Dataframe.withColumn("Date Rptd", F.to_date("Date Rptd", "MM/dd/yyyy hh:mm:ss a"))
Dataframe = Dataframe.withColumn("DATE OCC", F.to_date("DATE OCC", "MM/dd/yyyy hh:mm:ss a"))
Dataframe = Dataframe.withColumn("Vict Age", F.col("Vict Age").cast("int"))
Dataframe = Dataframe.withColumn("LAT", F.col("LAT").cast("double"))
Dataframe = Dataframe.withColumn("LON", F.col("LON").cast("double"))

Dataframe.show()

24/01/30 00:36:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+----------+----------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|    DR_NO| Date Rptd|  DATE OCC|TIME OCC|AREA|  AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|             Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|Cross Street|    LAT|      LON|
+---------+----------+----------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---

In [4]:
total_rows = Dataframe.count()
print(f"Total Rows: {total_rows}")


Total Rows: 2979324


In [5]:
Dataframe.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: date (nullable = true)
 |-- DATE OCC: date (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- LOCAT

Query 1 - Dataframe API

In [6]:
crime_data = Dataframe.withColumn("year", F.year("Date Rptd"))
crime_data = crime_data.withColumn("month", F.month("Date Rptd"))

count_df = crime_data.groupBy("year", "month").agg(F.count("*").alias("crime_count"))

window_spec = Window.partitionBy("year").orderBy(F.desc("crime_count"))

result = count_df.withColumn("ranking", F.row_number().over(window_spec)).filter("ranking <= 3").orderBy("year", "ranking", "month")

result.show(n=14*3)



+----+-----+-----------+-------+
|year|month|crime_count|ranking|
+----+-----+-----------+-------+
|2010|    3|      17595|      1|
|2010|    7|      17520|      2|
|2010|    5|      17338|      3|
|2011|    8|      17139|      1|
|2011|    5|      17050|      2|
|2011|    3|      16951|      3|
|2012|    8|      17697|      1|
|2012|   10|      17477|      2|
|2012|    5|      17391|      3|
|2013|    8|      17329|      1|
|2013|    7|      16714|      2|
|2013|    5|      16671|      3|
|2014|   10|      14131|      1|
|2014|    7|      14107|      2|
|2014|    9|      13871|      3|
|2015|    8|      18951|      1|
|2015|   10|      18916|      2|
|2015|    7|      18528|      3|
|2016|    8|      19779|      1|
|2016|   10|      19615|      2|
|2016|    7|      19262|      3|
|2017|   10|      20400|      1|
|2017|    8|      20086|      2|
|2017|    7|      19997|      3|
|2018|    5|      20248|      1|
|2018|    7|      19972|      2|
|2018|   10|      19814|      3|
|2019|    

                                                                                

Query 1 - SQL API

In [7]:
Dataframe.createOrReplaceTempView("Crime_data")

result = spark.sql("""
    WITH RankedCrimeData AS (
    SELECT
        YEAR(`Date Rptd`) AS year,
        MONTH(`Date Rptd`) AS month,
        COUNT(*) AS `crime_total`,
        ROW_NUMBER() OVER(PARTITION BY YEAR(`Date Rptd`) ORDER BY COUNT(*) DESC) as `#`
    FROM Crime_data
    GROUP BY year, month
)
SELECT year, month, crime_total, `#`
FROM RankedCrimeData
WHERE `#` <= 3
ORDER BY year, `#`, month;
""")

result.show()




+----+-----+-----------+---+
|year|month|crime_total|  #|
+----+-----+-----------+---+
|2010|    3|      17595|  1|
|2010|    7|      17520|  2|
|2010|    5|      17338|  3|
|2011|    8|      17139|  1|
|2011|    5|      17050|  2|
|2011|    3|      16951|  3|
|2012|    8|      17697|  1|
|2012|   10|      17477|  2|
|2012|    5|      17391|  3|
|2013|    8|      17329|  1|
|2013|    7|      16714|  2|
|2013|    5|      16671|  3|
|2014|   10|      14131|  1|
|2014|    7|      14107|  2|
|2014|    9|      13871|  3|
|2015|    8|      18951|  1|
|2015|   10|      18916|  2|
|2015|    7|      18528|  3|
|2016|    8|      19779|  1|
|2016|   10|      19615|  2|
+----+-----+-----------+---+
only showing top 20 rows



                                                                                

Query 2 - Dataframe API

In [8]:
street_crimes = Dataframe.filter(Dataframe['Premis Desc'] == 'STREET')

total_rows = street_crimes.count()
print(f"Total street crimes: {total_rows}")


street_crimes = street_crimes.withColumn(
    "Time of day",
    F.when((F.col("TIME OCC") >= 500) & (F.col("TIME OCC") < 1200), "morning")
    .when((F.col("TIME OCC") >= 1200) & (F.col("TIME OCC") < 1700), "noon")
    .when((F.col("TIME OCC") >= 1700) & (F.col("TIME OCC") < 2100), "afternoon")
    .when((F.col("TIME OCC") >= 2100) | (F.col("TIME OCC") < 500), "night")
    .otherwise("unknown")
)

street_crime_counts_by_time_of_day = street_crimes.groupBy("Time of day").agg(F.count("*").alias("crime_count")).orderBy("crime_count")

street_crime_counts_by_time_of_day.show()

                                                                                

Total street crimes: 692989




+-----------+-----------+
|Time of day|crime_count|
+-----------+-----------+
|    morning|     123128|
|       noon|     147303|
|  afternoon|     186208|
|      night|     236350|
+-----------+-----------+



                                                                                

Query 2 - RDD API

In [9]:
Dataframe_rdd = Dataframe.rdd

In [10]:
street_crimes_rdd = Dataframe_rdd.filter(lambda row: row['Premis Desc'] == 'STREET')

def time_of_day(hour):
    if 500 <= hour < 1200:
        return "morning"
    elif 1200 <= hour < 1700:
        return "noon"
    elif 1700 <= hour < 2100:
        return "afternoon"
    elif 2100 <= hour or hour < 500:
        return "night"
    else:
        return "unknown"

street_crimes_rdd_with_time_of_day = street_crimes_rdd.map(lambda row: row + ("Time of day", time_of_day(row['TIME OCC'])))

street_crime_counts_by_time_of_day_rdd = street_crimes_rdd_with_time_of_day.map(lambda row: (row[-1], 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1])

street_crime_counts_by_time_of_day_rdd.collect()


                                                                                

[('morning', 123128),
 ('noon', 147303),
 ('afternoon', 186208),
 ('night', 236350)]

Query 3 - Dataframe API

In [11]:
join_hint = "broadcast"

In [12]:
crime_data_2015 = Dataframe.filter(F.year("Date Rptd") == 2015)
crime_data_2015 = crime_data_2015.filter(F.col("Vict Descent").isNotNull())

revgecoding_file = "./data/revgecoding.csv"

revgecoding_df = spark.read.csv(revgecoding_file, header=True, inferSchema=True)


In [13]:
crime_data_2015 = crime_data_2015.hint(join_hint).join(revgecoding_df, ['LAT', 'LON'], 'left_outer').select("Vict Descent", "ZIPcode")
crime_data_2015.show()

24/01/30 00:36:31 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for left outer join.


+------------+-------+
|Vict Descent|ZIPcode|
+------------+-------+
|           W|  91356|
|           K|  90029|
|           W|  90036|
|           B|  90034|
|           W|  91402|
|           H|  90059|
|           B|  90502|
|           H|  90065|
|           H|  90052|
|           H|  91340|
|           H|  90027|
|           H|  90007|
|           H|   NULL|
|           H|  91328|
|           H|  91331|
|           B|  90043|
|           H|  91340|
|           H|  91306|
|           B|  91365|
|           H|   NULL|
+------------+-------+
only showing top 20 rows



                                                                                

In [14]:
print("No corresponding zip code for", crime_data_2015.filter(F.col("ZIPcode").isNull()).count(), "values out of ", crime_data_2015.count())
crime_data_2015 = crime_data_2015.filter(F.col("ZIPcode").isNotNull())

24/01/30 00:36:32 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for left outer join.
24/01/30 00:36:34 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for left outer join.

No corresponding zip code for 1934 values out of  190439


                                                                                

In [15]:
median_income_file = "./data/LA_income_2015.csv"

median_income_df = spark.read.csv(median_income_file, header=True, inferSchema=True)

median_income_df = median_income_df.withColumn("Estimated Median Income", F.regexp_replace(F.col("Estimated Median Income"), "[^\d.]", "").cast("int"))

top_incomes = median_income_df.orderBy(F.col("Estimated Median Income").desc()).limit(3)

lowest_incomes = median_income_df.orderBy(F.col("Estimated Median Income")).limit(3)

In [16]:
top_incomes.show()

+--------+--------------------+-----------------------+
|Zip Code|           Community|Estimated Median Income|
+--------+--------------------+-----------------------+
|   90272|Los Angeles (Cast...|                 166021|
|   90077|Los Angeles (Bel ...|                 164281|
|   91210|Glendale (Glendal...|                 158162|
+--------+--------------------+-----------------------+



In [17]:
lowest_incomes.show()

+--------+--------------------+-----------------------+
|Zip Code|           Community|Estimated Median Income|
+--------+--------------------+-----------------------+
|   90021|Los Angeles (Down...|                  12813|
|   91046|Glendale (Verdugo...|                  15568|
|   90058|Los Angeles (Sout...|                  17018|
+--------+--------------------+-----------------------+



In [18]:
descents_low = crime_data_2015.hint(join_hint).join(lowest_incomes, F.col("ZIPcode") == F.col("Zip Code"), "inner")
descents_high = crime_data_2015.hint(join_hint).join(top_incomes, F.col("ZIPcode") == F.col("Zip Code"), "inner")

descents_low = descents_low.groupBy("Vict Descent").agg(F.count("*"))
descents_high = descents_high.groupBy("Vict Descent").agg(F.count("*"))

descents_low = descents_low.withColumnRenamed("Vict Descent", "Victim Descent")
descents_low = descents_low.withColumnRenamed("count(1)", "#")

descents_high = descents_high.withColumnRenamed("Vict Descent", "Victim Descent")
descents_high = descents_high.withColumnRenamed("count(1)", "#")

In [19]:
descents_high.show()

                                                                                

+--------------+---+
|Victim Descent|  #|
+--------------+---+
|             W|312|
|             H| 51|
|             B| 14|
|             X| 26|
|             O|102|
|             A| 16|
+--------------+---+



In [20]:
descents_high.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Vict Descent#30], functions=[count(1)])
   +- HashAggregate(keys=[Vict Descent#30], functions=[partial_count(1)])
      +- Project [Vict Descent#30]
         +- BroadcastHashJoin [cast(ZIPcode#909 as int)], [Zip Code#1020], Inner, BuildLeft, false
            :- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[1, string, true] as int) as bigint)),false), [plan_id=1612]
            :  +- Project [Vict Descent#30, ZIPcode#909]
            :     +- BroadcastHashJoin [knownfloatingpointnormalized(normalizenanandzero(LAT#43)), knownfloatingpointnormalized(normalizenanandzero(LON#44))], [knownfloatingpointnormalized(normalizenanandzero(LAT#907)), knownfloatingpointnormalized(normalizenanandzero(LON#908))], Inner, BuildLeft, false
            :        :- BroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[1, double, true])), knownfloatingpointn

In [21]:
descents_low.show()

                                                                                

+--------------+---+
|Victim Descent|  #|
+--------------+---+
|             C|  1|
|             H|985|
|             B|323|
|             W|267|
|             O|160|
|             X| 44|
|             A| 28|
|             K|  3|
|             I|  1|
+--------------+---+



In [22]:
descents_low.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Vict Descent#30], functions=[count(1)])
   +- HashAggregate(keys=[Vict Descent#30], functions=[partial_count(1)])
      +- Project [Vict Descent#30]
         +- BroadcastHashJoin [cast(ZIPcode#909 as int)], [Zip Code#1020], Inner, BuildLeft, false
            :- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[1, string, true] as int) as bigint)),false), [plan_id=1943]
            :  +- Project [Vict Descent#30, ZIPcode#909]
            :     +- BroadcastHashJoin [knownfloatingpointnormalized(normalizenanandzero(LAT#43)), knownfloatingpointnormalized(normalizenanandzero(LON#44))], [knownfloatingpointnormalized(normalizenanandzero(LAT#907)), knownfloatingpointnormalized(normalizenanandzero(LON#908))], Inner, BuildLeft, false
            :        :- BroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[1, double, true])), knownfloatingpointn

In [23]:
print("Spark UI: ", spark.sparkContext.uiWebUrl)

Spark UI:  http://AORUS.home:4040


Query 4a - Dataframe API

In [24]:
gun_crimes = Dataframe.filter(F.col('Weapon Used Cd').like('1%'))

gun_crimes.show()

+---------+----------+----------+--------+----+----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+
|    DR_NO| Date Rptd|  DATE OCC|TIME OCC|AREA| AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|             Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|        Cross Street|    LAT|      LON|
+---------+----------+----------+--------+----+----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+-----------

In [25]:
LAPD_stations_file = "./data/LAPD_Police_Stations.csv"

LAPD_stations_df = spark.read.csv(LAPD_stations_file, header=True, inferSchema=True)
LAPD_stations_df= LAPD_stations_df.withColumn("X", F.col("X").cast("double"))
LAPD_stations_df= LAPD_stations_df.withColumn("Y", F.col("Y").cast("double"))

LAPD_stations_df.show()

+--------------+----------------+---+----------------+--------------------+----+
|             X|               Y|FID|        DIVISION|            LOCATION|PREC|
+--------------+----------------+---+----------------+--------------------+----+
|-118.289241553|33.7576608970001|  1|          HARBOR|2175 JOHN S. GIBS...|   5|
|-118.275394206|33.9386273800001|  2|       SOUTHEAST|    145 W. 108TH ST.|  18|
|-118.277669655|33.9703073800001|  3|     77TH STREET|    7600 S. BROADWAY|  12|
|-118.419841576|33.9916553210001|  4|         PACIFIC|  12312 CULVER BLVD.|  14|
|-118.305141563|34.0105753400001|  5|       SOUTHWEST|1546 MARTIN LUTHE...|   3|
|-118.256118891|    34.012355905|  6|          NEWTON|3400 S. CENTRAL AVE.|  13|
|-118.247294123|      34.0440195|  7|         CENTRAL|      251 E. 6TH ST.|   1|
|-118.450779541|34.0437774120001|  8|WEST LOS ANGELES|    1663 BUTLER AVE.|   8|
|-118.213067956|    34.045008769|  9|      HOLLENBECK|     2111 E. 1ST ST.|   4|
|-118.342829525|    34.04674

In [26]:
import geopy.distance

def get_distance(lon, lat, x, y):
    return geopy.distance.geodesic((lat, lon), (y, x)).km

get_distance_udf = F.udf(get_distance)

In [27]:
get_distance(118, -34.5, 118.1, -34)

56.22314650111336

In [28]:
gun_crimes = gun_crimes.withColumn("year", F.year("Date Rptd"))
gun_crimes = gun_crimes.hint(join_hint).join(LAPD_stations_df, F.col("AREA") == F.col("PREC"), "inner").select("year", "DIVISION", "LAT", "LON", "X", "Y")
gun_crimes = gun_crimes.withColumn("Distance", get_distance_udf("LON", "LAT", "X", "Y")).select("year", "DIVISION", "Distance")
gun_crimes.show()

[Stage 62:>                                                         (0 + 1) / 1]

+----+--------+------------------+
|year|DIVISION|          Distance|
+----+--------+------------------+
|2019|  HARBOR| 5.127342291561631|
|2019|  HARBOR| 3.712303302437452|
|2019|  HARBOR| 4.738243911349246|
|2019|  HARBOR|3.2189567144752944|
|2019|  HARBOR| 5.295542135155128|
|2019|  HARBOR|3.7177433963381357|
|2019|  HARBOR|1.8491120400797305|
|2019|  HARBOR| 4.140677660008592|
|2019|  HARBOR|1.6793654889664646|
|2019|  HARBOR|1.9155357951860488|
|2019|  HARBOR|3.2189567144752944|
|2019|  HARBOR|1.9155357951860488|
|2019|  HARBOR|1.3074790808332148|
|2019|  HARBOR|1.7248496567158007|
|2019|  HARBOR|2.4082199069248196|
|2019|  HARBOR|  6.98466056595288|
|2019|  HARBOR| 3.920185606191569|
|2019|  HARBOR| 5.291601159615955|
|2019|  HARBOR|3.1687980374812006|
|2019|  HARBOR|3.7118709805615584|
+----+--------+------------------+
only showing top 20 rows



                                                                                

In [29]:
result = gun_crimes.groupBy("year").agg(
    F.avg("Distance").alias("avg_distance"),
    F.count("Distance").alias("#")
).orderBy("year")
result.show()

[Stage 64:>                                                         (0 + 1) / 1]

+----+------------------+-----+
|year|      avg_distance|    #|
+----+------------------+-----+
|2010|   4.3263411171494| 8162|
|2011|2.7904269556134818| 7225|
|2012| 37.48698144373701| 6539|
|2013| 2.830277105758418| 5851|
|2014|10.470155426256726| 4882|
|2015|2.7062554193223765| 6729|
|2016| 2.717656509409676| 8094|
|2017| 4.339232987437758| 7781|
|2018| 2.735588513251946| 7414|
|2019|2.7408268365459576| 7135|
|2020| 8.613999633753178| 8496|
|2021|32.849532153520606|12101|
|2022|2.6120711144746553|10067|
|2023|2.5574529789345273| 8484|
+----+------------------+-----+



                                                                                

In [30]:
result = gun_crimes.groupBy("DIVISION").agg(
    F.avg("Distance").alias("avg_distance"),
    F.count("Distance").alias("#")
).orderBy(F.col("#").desc())
result.show()

[Stage 68:>                                                         (0 + 1) / 1]

+---------------+------------------+-----+
|       DIVISION|      avg_distance|    #|
+---------------+------------------+-----+
|    77TH STREET| 5.745872330844333|16518|
|      SOUTHEAST|13.561976270352464|13188|
|         NEWTON| 9.896845692177612| 9581|
|      SOUTHWEST| 4.160551099530682| 8615|
|     HOLLENBECK| 15.03408590027148| 6095|
|         HARBOR|13.377359089703047| 5422|
|        RAMPART|  4.10628661100912| 4976|
|        MISSION| 7.545645264792488| 4450|
|        OLYMPIC|1.8346273989145405| 4305|
|      NORTHEAST|10.459075190590417| 3838|
|       FOOTHILL| 3.810820653031423| 3761|
|      HOLLYWOOD|12.119771738174052| 3541|
|        CENTRAL|  4.78918793616405| 3447|
|       WILSHIRE|13.366513228684692| 3420|
|NORTH HOLLYWOOD| 14.03279776776521| 3339|
|    WEST VALLEY| 17.11957243238202| 2782|
|        PACIFIC|13.276221563348493| 2640|
|       VAN NUYS|2.2158642494800787| 2637|
|     DEVONSHIRE| 18.57173283138311| 2598|
|        TOPANGA| 3.478763348039055| 2300|
+----------

                                                                                

Query 4b - Dataframe API

In [31]:
gun_crimes = Dataframe.filter(F.col('Weapon Used Cd').like('1%'))
gun_crimes = gun_crimes.withColumn("year", F.year("Date Rptd"))
gun_crimes = gun_crimes.hint(join_hint).join(LAPD_stations_df, F.col("AREA") == F.col("PREC"), "inner").select("year", "DIVISION", "LAT", "LON", "X", "Y")
gun_crimes.show()



+----+--------+-------+---------+--------------+----------------+
|year|DIVISION|    LAT|      LON|             X|               Y|
+----+--------+-------+---------+--------------+----------------+
|2019|  HARBOR|33.7884|-118.2479|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7252| -118.299|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7836|-118.2486|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7289|-118.2846|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7933|-118.2512|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7242|-118.2869|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7412|-118.2924|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7868|-118.2613|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7433|-118.2835|-118.289241553|33.7576608970001|
|2019|  HARBOR| 33.741|-118.2838|-118.289241553|33.7576608970001|
|2019|  HARBOR|33.7289|-118.2846|-118.289241553|33.7576608970001|
|2019|  HARBOR| 33.741|-118.2838|-118.289241553|33.7576608970001|
|2019|  HA

                                                                                

In [32]:
lapd = spark.read.csv(LAPD_stations_file, header=True, inferSchema=True)
lapd = lapd.withColumn("X", F.col("X").cast("double"))
lapd = lapd.withColumn("Y", F.col("Y").cast("double"))

In [33]:
def find_closest_police_station(longitude, latitude,):
    locs = [
    (-118.289241553,33.7576608970001),
    (-118.275394206,33.9386273800001),
    (-118.277669655,33.9703073800001),
    (-118.419841576,33.9916553210001),
    (-118.305141563,34.0105753400001),
    (-118.256118891,34.012355905),
    (-118.247294123,34.0440195),
    (-118.450779541,34.0437774120001),
    (-118.213067956,34.045008769),
    (-118.342829525,34.046747682),
    (-118.291175911,34.050208529),
    (-118.266979649,34.056690437),
    (-118.33066931,34.095833225),
    (-118.249414484,34.119200666),
    (-118.385859348,34.1716939300001),
    (-118.445225709,34.1837432730001),
    (-118.547454438,34.193397227),
    (-118.599636542,34.221376654),
    (-118.410417183,34.2530912220001),
    (-118.531373363,34.256969059),
    (-118.468197808,34.272979397)]


    min_distance = float('inf')

    for loc in locs:
        station_lon, station_lat = loc
        distance = get_distance(longitude, latitude, station_lon, station_lat)

        if distance < min_distance:
            min_distance = distance

    return min_distance

    
find_closest_police_station_udf = F.udf(find_closest_police_station)

In [33]:
find_closest_police_station(-118.266979649, 34.056690437)

0.0

In [34]:
gun_crimes = gun_crimes.withColumn("Distance", find_closest_police_station_udf("LON", "LAT")).select("year", "DIVISION", "Distance")
gun_crimes.show()

                                                                                

+----+--------+------------------+
|year|DIVISION|          Distance|
+----+--------+------------------+
|2019|  HARBOR| 5.127342291561631|
|2019|  HARBOR| 3.712303302437452|
|2019|  HARBOR| 4.738243911349246|
|2019|  HARBOR|3.2189567144752944|
|2019|  HARBOR| 5.295542135155128|
|2019|  HARBOR|3.7177433963381357|
|2019|  HARBOR|1.8491120400797305|
|2019|  HARBOR| 4.140677660008592|
|2019|  HARBOR|1.6793654889664646|
|2019|  HARBOR|1.9155357951860488|
|2019|  HARBOR|3.2189567144752944|
|2019|  HARBOR|1.9155357951860488|
|2019|  HARBOR|1.3074790808332148|
|2019|  HARBOR|1.7248496567158007|
|2019|  HARBOR|2.4082199069248196|
|2019|  HARBOR|  6.98466056595288|
|2019|  HARBOR| 3.920185606191569|
|2019|  HARBOR| 5.291601159615955|
|2019|  HARBOR|3.1687980374812006|
|2019|  HARBOR|3.7118709805615584|
+----+--------+------------------+
only showing top 20 rows



In [37]:
result = gun_crimes.groupBy("year").agg(
    F.avg("Distance").alias("avg_distance"),
    F.count("Distance").alias("#")
).orderBy("year")
result.show()

[Stage 84:>                                                         (0 + 1) / 1]

+----+------------------+-----+
|year|      avg_distance|    #|
+----+------------------+-----+
|2010|3.9762380227744667| 8162|
|2011|2.4582677442240577| 7225|
|2012| 37.13364209524679| 6539|
|2013| 2.459283926814491| 5851|
|2014|10.104120118078391| 4882|
|2015|  2.38834425479313| 6729|
|2016|2.4258511630950776| 8094|
|2017| 4.007069303740121| 7781|
|2018|2.4114913328041054| 7414|
|2019|2.4303624630677114| 7135|
|2020| 8.304682016179004| 8496|
|2021|  32.5358929041811|12101|
|2022| 2.315125883987165|10067|
|2023|2.2715967638620036| 8484|
+----+------------------+-----+



                                                                                

In [None]:
result = gun_crimes.groupBy("DIVISION").agg(
    F.avg("Distance").alias("avg_distance"),
    F.count("Distance").alias("#")
).orderBy(F.col("#").desc())
result.show()



+---------------+------------------+-----+
|       DIVISION|      avg_distance|    #|
+---------------+------------------+-----+
|    77TH STREET|  5.19309759370878|16518|
|      SOUTHEAST|13.471602738284755|13188|
|         NEWTON| 9.444725167756307| 9581|
|      SOUTHWEST|3.5601573646826825| 8615|
|     HOLLENBECK|15.022619534245688| 6095|
|         HARBOR|13.290425819008904| 5422|
|        RAMPART|3.9680769025657825| 4976|
|        MISSION| 6.768267212875467| 4450|
|        OLYMPIC|1.7393567696353007| 4305|
|      NORTHEAST|10.185143645838624| 3838|
|       FOOTHILL|3.7223003120431004| 3761|
|      HOLLYWOOD|12.112232296565534| 3541|
|        CENTRAL| 4.715568664043192| 3447|
|       WILSHIRE|13.079051083443495| 3420|
|NORTH HOLLYWOOD| 13.83065202357141| 3339|
|    WEST VALLEY|16.481518315328803| 2782|
|        PACIFIC|13.190959733187475| 2640|
|       VAN NUYS|2.1875146795048224| 2637|
|     DEVONSHIRE| 17.58786730446147| 2598|
|        TOPANGA| 3.173372840027675| 2300|
+----------

                                                                                