In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import datetime
spark = SparkSession.builder.appName("cleaning data").getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

In [0]:
dirpath_crime = 'dbfs:/FileStore/tables/crime' #crime directory in DBFS
dbutils.fs.mkdirs(dirpath_crime)
dirpath_weather = 'dbfs:/FileStore/tables/weather' #weather directory in DBFS
dbutils.fs.mkdirs(dirpath_weather)

Out[2]: True

In [0]:
def myDateParser(dateStr):   
    if len(dateStr) == 6 or len(dateStr) == 7 or len(dateStr) == 8: # "M/d/yy" (6), "M/dd/yy" (7), "MM/dd/yy" (8)
        newDate = datetime.strptime(dateStr, '%m/%d/%y')
    elif len(dateStr) == 10: # "yyyy-MM-dd" (10)
        newDate = datetime.strptime(dateStr, '%Y-%m-%d')
    else:
        return "Error: Date Function"
    parsedDate = newDate.strftime('%m/%d/%Y') #how i want it to look
    return parsedDate


In [0]:
crime_filepaths = [crime_file.path for crime_file in dbutils.fs.ls(dirpath_crime)]
path_to_population = {crime_filepaths[0]: 8345000, crime_filepaths[1]: 8374000, crime_filepaths[2]: 8404000, crime_filepaths[3]: 8434000, 
                      crime_filepaths[4]: 8464000, crime_filepaths[5]: 8494000, crime_filepaths[6]: 8525000, crime_filepaths[7]: 8555000,
                      crime_filepaths[8]: 8585000, crime_filepaths[9]: 8616000, crime_filepaths[10]: 8646000, crime_filepaths[11]: 8677000, 
                      crime_filepaths[12]: 8708000, crime_filepaths[13]: 8739000, crime_filepaths[14]: 8770000, crime_filepaths[15]: 8801000, 
                      crime_filepaths[16]: 8833000, crime_filepaths[17]: 8864000, crime_filepaths[18]: 8862000, crime_filepaths[19]: 8865000,
                      crime_filepaths[20]: 8877000, crime_filepaths[21]: 8901000}

def clean_crime_data(filepath):
    df = spark.read.format("csv").option("header", "true").load(filepath)
    df = df.dropna("all")  # only drop if all null -- should drop if any null 
    parsedDate = udf(myDateParser)
    df = df.withColumn("Date", to_date("Date", 'MM/dd/yyyy')) #.withColumn("Date", date_format("Date", "MM/dd/yyyy"))
   # df = df.withColumnRenamed("Date", "Crime Date")
    df = df.drop("Case Number", "IUCR", "Beat", "Community Area", "FBI Code", "X Coordinate", "Y Coordinate", "Updated On", "Location")
    df = df.dropna(subset=["Latitude","Longitude"])
    
    df = df.withColumn("Year", to_date(df["Year"], "yyyy"))
    df = df.withColumn("year_only", year(df["Year"]))
    #df = df.withColumn("Year", to_date("Year"))
    df =df.drop("Year")
    df = df.withColumnRenamed("year_only", "Year")
    
    df = df.withColumn("Arrest", col("Arrest").cast("integer"))
    df = df.withColumn("Domestic", col("Domestic").cast("integer"))
    df = df.withColumn("District", col("District").cast("double"))
    df = df.withColumn("Ward", col("Ward").cast("integer"))
    #df = df.withColumn("Latitude", col("Latitude").cast("float"))
    #df = df.withColumn("Longitude", col("Longitude").cast("float"))
    df = df.withColumnRenamed("Latitude", "Crime Latitude").withColumn("Crime Latitude", col("Crime Latitude").cast("float"))
    df = df.withColumnRenamed("Longitude", "Crime Longitude").withColumn("Crime Longitude", col("Crime Longitude").cast("float"))
    df = df.withColumn("Population", lit(path_to_population[filepath]))
    # df = df.withColumn("StationID") ... defining a radius so that we know if it's close or not 
    return df
 
crime_df_unioned = None
for filepath in crime_filepaths:
    crime_df = clean_crime_data(filepath)
    if crime_df_unioned == None:
        crime_df_unioned = crime_df
    else:
        crime_df_unioned = crime_df_unioned.union(crime_df)
#crime_df_unioned.printSchema()
crime_df_unioned.show()

+-------+----------+--------------------+-------------------+--------------------+--------------------+------+--------+--------+----+--------------+---------------+----+----------+
|     ID|      Date|               Block|       Primary Type|         Description|Location Description|Arrest|Domestic|District|Ward|Crime Latitude|Crime Longitude|Year|Population|
+-------+----------+--------------------+-------------------+--------------------+--------------------+------+--------+--------+----+--------------+---------------+----+----------+
|1362430|2001-01-25|  059XX S INDIANA AV|          NARCOTICS|POSS: CANNABIS MO...|              STREET|     1|       0|     2.0|null|     41.786106|     -87.620476|2001|   8345000|
|1354040|2001-01-26| 027XX N LAWNDALE AV|            ASSAULT|              SIMPLE|           RESIDENCE|     0|       0|    25.0|null|      41.93044|      -87.71945|2001|   8345000|
|1354039|2001-01-28|048XX W FULLERTON AV|      OTHER OFFENSE|HARASSMENT BY TEL...|          DRU

In [0]:
primary_type_counts = crime_df_unioned.groupBy("Primary Type").agg(F.count("*").alias("Crime Count"))

primary_types = [row["Primary Type"] for row in primary_type_counts.collect()]
primary_counts = [row["Crime Count"] for row in primary_type_counts.collect()]

colors = ["b", 'r', 'g', 'k', 'y', 'm', 'c'] 

fig, ax = plt.subplots(figsize=(15,8))
ax.bar(primary_types, primary_counts, color = colors[:len(primary_types)])
ax.set_title("Total Primary Type Count")
ax.set_xlabel("Primary Type")
ax.set_ylabel("Crime Count")
plt.xticks(rotation=90)
plt.show()

In [0]:
narcoticsCrimes = crime_df_unioned.filter(crime_df_unioned['Primary Type'] == 'NARCOTICS')
narcoticsCrimes = narcoticsCrimes.groupBy('Description').agg(sum('Arrest').alias('Total Arrests'))
narcoticsCrimes = narcoticsCrimes.withColumn("% of Total Arrests", round(narcoticsCrimes["Total Arrests"]/sum("Total Arrests").over(Window.partitionBy()), 10)*100)
# narcoticsCrimes.show()

narcotics_Description_Type = [row["Description"] for row in narcoticsCrimes.collect()]
arrests_type = [row["% of Total Arrests"] for row in narcoticsCrimes.collect()]

colors = ["b", 'r', 'g', 'k', 'y', 'm', 'c'] #color = colors[:len(narcotics_Description_Type)]
fig, ax = plt.subplots(figsize=(10,22))
ax.barh( narcotics_Description_Type, arrests_type , color = colors[:len(narcotics_Description_Type)])
ax.set_ylabel("Narcotics Crime Type")
ax.set_xlabel("% of Arrests Made")
ax.set_title("Narcotics Arrests")
ax.tick_params(axis="x", rotation=0)

plt.subplots_adjust()
plt.show()

In [0]:
'''
total_count = crime_df_unioned.count()

null_count_long = crime_df_unioned.where(col("Crime Longitude").isNull()).count()
null_percent_long = null_count_long / total_count * 100
print(null_percent_long)

null_count_lat = crime_df_unioned.where(col("Crime Latitude").isNull()).count()
null_percent_lat = null_count_lat / total_count * 100
print(null_percent_lat)

null_count_ward = crime_df_unioned.where(col("Ward").isNull()).count()
null_percent_ward = null_count_ward/total_count * 100
print(null_percent_ward)

null_count_district = crime_df_unioned.where(col("District").isNull()).count()
null_percent_district = null_count_district/total_count * 100
print(null_percent_district)
'''

total_count = crime_df_unioned.count()
theft_count = crime_df_unioned.where(col("Primary Type") == "THEFT").count()
theft_percent = theft_count/total_count * 100
print(theft_percent)

In [0]:
rowNum_crime = crime_df_unioned.count()
colNum_crime = len(crime_df_unioned.columns)
crime_df_shape = (rowNum_crime, colNum_crime)
print(crime_df_shape)

In [0]:
# FILE PATH LIST
weather_filepaths = [file.path for file in dbutils.fs.ls(dirpath_weather)]

# FILE NAME LIST
weather_names = [file.name.split(".")[0] for file in dbutils.fs.ls(dirpath_weather)]

'''
# FILE_NAME TO (LATITUDE, LONGITUDE)
name_to_lat_long_dict = {weather_names[0]: (41.78038, -88.30925),
                         weather_names[1]: (42.1153, -88.1639),
                         weather_names[2]: (41.63178, -87.08803), 
                         weather_names[3]: (41.89641, -88.25119), 
                         weather_names[4]: (42.06278, -88.28617), 
                         weather_names[5]: (41.7091, -87.8761),
                         weather_names[6]: (41.78412, -87.75514), 
                         weather_names[7]: (42.25515, -88.07758), 
                         weather_names[8]: (41.96017, -87.93164),
                         weather_names[9]: (41.4947, -87.6802), 
                         weather_names[10]: (42.3091, -88.2533)}

# FILEPATH TO FILE NAME
path_to_name_dict = {weather_filepaths[0]: weather_names[0], 
                     weather_filepaths[1]: weather_names[1], 
                     weather_filepaths[2]: weather_names[2], 
                     weather_filepaths[3]: weather_names[3],
                     weather_filepaths[4]: weather_names[4],
                     weather_filepaths[5]: weather_names[5],
                     weather_filepaths[6]: weather_names[6],
                     weather_filepaths[7]: weather_names[7],
                     weather_filepaths[8]: weather_names[8],
                     weather_filepaths[9]: weather_names[9], 
                     weather_filepaths[10]: weather_names[10]}
 '''


# FILE PATH: (LATITUDE, LONGITUDE)
path_to_lat_long = {weather_filepaths[0]: (41.78038, -88.30925), 
                    weather_filepaths[1]: (42.1153, -88.1639), 
                    weather_filepaths[2]: (41.63178, -87.08803), 
                    weather_filepaths[3]: (41.89641, -88.25119), 
                    weather_filepaths[4]: (42.06278, -88.28617), 
                    weather_filepaths[5]: (41.7091, -87.8761),
                    weather_filepaths[6]: (41.78412, -87.75514), 
                    weather_filepaths[7]: (42.25515, -88.07758), 
                    weather_filepaths[8]: (41.96017, -87.93164),
                    weather_filepaths[9]: (41.4947, -87.6802), 
                    weather_filepaths[10]: (42.3091, -88.2533)}

# FILEPATH: STATION
path_to_station = {weather_filepaths[0]: "Aurora", 
                   weather_filepaths[1]: "Barrington", 
                   weather_filepaths[2]: "Dunes National Park", 
                   weather_filepaths[3]: "Dupage Airport",
                   weather_filepaths[4]: "Elgin",
                   weather_filepaths[5]: "Little Red School House",
                   weather_filepaths[6]: "Midway Airport",
                   weather_filepaths[7]: "Mundelein",
                   weather_filepaths[8]: "O'Hare Airport",
                   weather_filepaths[9]: "Park Forest", 
                   weather_filepaths[10]: "Stratton Lock Dam"}

# STATION: (LAT, LONG)
station_to_lat_long = {"Aurora": (41.78038, -88.30925),
                        "Barrington": (42.1153, -88.1639),
                        "Dunes National Park": (41.63178, -87.08803), 
                        "Dupage Airport": (41.89641, -88.25119), 
                        "Elgin": (42.06278, -88.28617), 
                        "Little Red School House": (41.7091, -87.8761),
                        "Midway Airport": (41.78412, -87.75514), 
                        "Mundelein": (42.25515, -88.07758), 
                        "O'Hare Airport": (41.96017, -87.93164),
                        "Park Forest": (41.4947, -87.6802), 
                        "Stratton Lock Dam": (42.3091, -88.2533)}

In [0]:
def clean_weather_data(filepath):
    parsedDate = udf(myDateParser)
    df = spark.read.format("csv").option("header", "true").load(filepath)
    df = df.dropna("all")
    df = df.withColumn("Date", to_date(parsedDate("Date"), 'MM/dd/yyyy')) #.withColumn("Date", date_format("Date", "MM/dd/yyyy")
  
    condition = (col("Date") >= "2001-01-01") & (col("Date") <= "2022-12-31") #get dates in range[1/1/2001, 12/31/2022]
    df = df.filter(condition) 
    #df = df.withColumnRenamed("Date", "Weather Date")
    
    df = df.withColumn("TMAX", col("TMAX (Degrees Fahrenheit)").cast("double")).drop("TMAX (Degrees Fahrenheit)")
    df = df.withColumn("TMIN", col("TMIN (Degrees Fahrenheit)").cast("double")).drop("TMIN (Degrees Fahrenheit)")
    df = df.withColumn("TAVG", (col("TMAX") + col("TMIN")) / 2).drop("TAVG (Degrees Fahrenheit)") 
    
    df = df.withColumn("PRCP", col("PRCP (Inches)").cast("double")).drop("PRCP (Inches)") 
    df = df.withColumn("PRCP", when(col("PRCP").isNull(), 0).otherwise(col("PRCP")))
    
    df = df.withColumn("SNOW", col("SNOW (Inches)").cast("double")).drop("SNOW (Inches)") 
    df = df.withColumn("SNOW", when(col("SNOW").isNull(), 0).otherwise(col("SNOW")))
    
    df = df.withColumn("SNWD", col("SNWD (Inches)").cast("double")).drop("SNWD (Inches)")
    df = df.withColumn("SNWD", when(col("SNWD").isNull(), 0).otherwise(col("SNWD")))
    
    df = df.withColumn("Station", lit(path_to_station[filepath]))
    df = df.withColumn("Weather Latitude", lit(path_to_lat_long[filepath][0]))
    df = df.withColumn("Weather Longitude", lit(path_to_lat_long[filepath][1]))    
    return df
      
weather_df_unioned = None
for i in range(len(weather_filepaths)):
    #print(weather_filepaths[i])
    weather_df = clean_weather_data(weather_filepaths[i])
    if weather_df_unioned == None:
        weather_df_unioned = weather_df
    else:
        weather_df_unioned = weather_df_unioned.union(weather_df)
#weather_df_unioned.show()

In [0]:
weather_df_unioned.show()

+----------+----+-----+----+----+----+----+-------+----------------+-----------------+
|      Date|TMAX| TMIN|TAVG|PRCP|SNOW|SNWD|Station|Weather Latitude|Weather Longitude|
+----------+----+-----+----+----+----+----+-------+----------------+-----------------+
|2001-01-01|18.0|-10.0| 4.0| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-02|14.0| -2.0| 6.0| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-03|31.0| -4.0|13.5| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-04|36.0| 14.0|25.0| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-05|30.0| 24.0|27.0| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-06|37.0| 24.0|30.5|0.05| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-07|35.0| 29.0|32.0| 0.0| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-08|45.0| 30.0|37.5|0.01| 0.0| 0.0| Aurora|        41.78038|        -88.30925|
|2001-01-09|32.0| 19.0|25.5|0.42| 3.0| 0.0|

In [0]:
rowNum_weather = weather_df_unioned.count()
colNum_weather = len(weather_df_unioned.columns)
weather_df_shape = (rowNum_weather, colNum_weather)
print(weather_df_shape)

In [0]:
ohare_weather = weather_df_unioned.filter(weather_df_unioned.Station == "O'Hare Airport")
from pyspark.sql.functions import year

ohare_weather = ohare_weather.withColumn("Year", year(ohare_weather.Date))
max_temp_by_year = ohare_weather.groupBy("Year").agg(max("TMAX").alias("Max Temp"))
max_temp_by_year = max_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in max_temp_by_year.collect()]
max_temp_col = [row["Max Temp"] for row in max_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Max Temp (Degree Farenheit)', color=color)
ax1.plot(max_temp_by_year.select("Year").collect(), max_temp_by_year.select("Max Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
min_temp_by_year = ohare_weather.groupBy("Year").agg(min("TMIN").alias("Min Temp"))
min_temp_by_year = min_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in min_temp_by_year.collect()]
min_temp_col = [row["Min Temp"] for row in min_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Min Temp (Degree Farenheit)', color=color)
ax1.plot(min_temp_by_year.select("Year").collect(), min_temp_by_year.select("Min Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
avg_temp_by_year = ohare_weather.groupBy("Year").agg(avg("TAVG").alias("Avg Temp"))
avg_temp_by_year = avg_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in avg_temp_by_year.collect()]
avg_temp_col = [row["Avg Temp"] for row in avg_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Avg Temp (Degree Farenheit)', color=color)
ax1.plot(avg_temp_by_year.select("Year").collect(), avg_temp_by_year.select("Avg Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
midway_weather = weather_df_unioned.filter(weather_df_unioned.Station == "Midway Airport")
from pyspark.sql.functions import year

midway_weather = midway_weather.withColumn("Year", year(midway_weather.Date))
max_temp_by_year = midway_weather.groupBy("Year").agg(max("TMAX").alias("Max Temp"))
max_temp_by_year = max_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in max_temp_by_year.collect()]
max_temp_col = [row["Max Temp"] for row in max_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Max Temp (Degree Farenheit)', color=color)
ax1.set_title('Maximum Temperature vs Crime Count at Midway Airport per Year')
ax1.plot(max_temp_by_year.select("Year").collect(), max_temp_by_year.select("Max Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
min_temp_by_year = midway_weather.groupBy("Year").agg(min("TMIN").alias("Min Temp"))
min_temp_by_year = min_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in min_temp_by_year.collect()]
min_temp_col = [row["Min Temp"] for row in min_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Min Temp (Degree Farenheit)', color=color)
ax1.set_title('Minimum Temperature vs Crime Count at Midway Airport per Year')
ax1.plot(min_temp_by_year.select("Year").collect(), min_temp_by_year.select("Min Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
avg_temp_by_year = midway_weather.groupBy("Year").agg(avg("TAVG").alias("Avg Temp"))
avg_temp_by_year = avg_temp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

year_col = [row["Year"] for row in avg_temp_by_year.collect()]
avg_temp_col = [row["Avg Temp"] for row in avg_temp_by_year.collect()]

fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Year')
ax1.set_ylabel('Avg Temp (Degree Farenheit)', color=color)
ax1.set_title('Average Temperature vs Crime Count at Midway Airport per Year')
ax1.plot(avg_temp_by_year.select("Year").collect(), avg_temp_by_year.select("Avg Temp").collect(), color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Crime Count', color=color)
ax2.plot(crime_count_by_year.select("Year").collect(), crime_count_by_year.select("Crime Count").collect(), color=color)
ax2.tick_params(axis='y', labelcolor=color)

plt.show()

In [0]:
# using Haversine formula to calculate the distance between two points using their latitudes and longitudes
def calc_distance(c_lat, c_long, w_lat, w_long):
    earth_radius = 6371
    lat = radians(w_lat - c_lat)
    long = radians(w_long - c_long)
    a = sin(lat / 2) * sin(lat / 2) + cos(radians(c_lat)) * cos(radians(w_lat)) * sin(long / 2) * sin(long / 2)
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    distance = (earth_radius * c) * 0.621371 
    return distance # miles 

In [0]:
rows = []
for station_key in station_to_lat_long.keys():
    lat, long = station_to_lat_long[station_key]
    row = (station_key, lat, long)
    rows.append(row)
    
station_df = spark.createDataFrame(rows, ["StationID", "Latitude", "Longitude"])
station_df = station_df.withColumn("Latitude", col("Latitude"))
station_df = station_df.withColumn("Longitude", col("Longitude"))
station_df.show()
                    

+--------------------+--------+---------+
|           StationID|Latitude|Longitude|
+--------------------+--------+---------+
|              Aurora|41.78038|-88.30925|
|          Barrington| 42.1153| -88.1639|
| Dunes National Park|41.63178|-87.08803|
|      Dupage Airport|41.89641|-88.25119|
|               Elgin|42.06278|-88.28617|
|Little Red School...| 41.7091| -87.8761|
|      Midway Airport|41.78412|-87.75514|
|           Mundelein|42.25515|-88.07758|
|      O'Hare Airport|41.96017|-87.93164|
|         Park Forest| 41.4947| -87.6802|
|   Stratton Lock Dam| 42.3091| -88.2533|
+--------------------+--------+---------+



In [0]:
joined_df = crime_df_unioned.crossJoin(station_df)
joined_df = joined_df.withColumn("dist", calc_distance(col("Crime Latitude"), col("Crime Longitude"), col("Latitude"), col("Longitude")))
grouped_df = joined_df.groupBy("ID").agg({"dist": "min", "StationID": "first"})
grouped_df = grouped_df.withColumnRenamed("min(dist)", "min_distance").withColumnRenamed("first(StationID)", "Station")
crime_df = crime_df_unioned.join(grouped_df, "ID", "left").drop("Crime Latitude", "Crime Longitude", "min_distance", "District", "Ward")
#crime_df.show(5)

In [0]:
weather_crime_df = weather_df_unioned.join(crime_df, ['Date', 'Station'], how='inner')
weather_crime_df.show()

+----------+-------+----+----+----+----+----+----+----------------+-----------------+--------+--------------------+-----------------+--------------------+--------------------+------+--------+----+----------+
|      Date|Station|TMAX|TMIN|TAVG|PRCP|SNOW|SNWD|Weather Latitude|Weather Longitude|      ID|               Block|     Primary Type|         Description|Location Description|Arrest|Domestic|Year|Population|
+----------+-------+----+----+----+----+----+----+----------------+-----------------+--------+--------------------+-----------------+--------------------+--------------------+------+--------+----+----------+
|2015-03-18| Aurora|46.0|28.0|37.0| 0.0| 0.0| 0.0|        41.78038|        -88.30925|10000036| 079XX S LOOMIS BLVD|        NARCOTICS|POSS: CANNABIS 30...|              STREET|     1|       0|2015|   8770000|
|2015-03-18| Aurora|45.0|23.0|34.0| 0.0| 0.0| 0.0|        41.78038|        -88.30925|10000036| 079XX S LOOMIS BLVD|        NARCOTICS|POSS: CANNABIS 30...|              

In [0]:
primary_Crime_Type_df = weather_crime_df.select("Primary Type").distinct().rdd.flatMap(lambda x: x).collect()
print(primary_Crime_Type_df)

In [0]:
'''
offense_involving_children = primary_Crime_Type_df[0]
criminal_sexual_assualt = primary_Crime_Type_df[1] #-
stalking = primary_Crime_Type_df[2]
public_peace_violation = primary_Crime_Type_df[3]
obscenity = primary_Crime_Type_df[4]
arson = primary_Crime_Type_df[5]
gambling = primary_Crime_Type_df[6]
criminal_trespass = primary_Crime_Type_df[7]
non_criminal = primary_Crime_Type_df[8] #--
liquor_law_violation = primary_Crime_Type_df[9]
motor_vehicle_theft = primary_Crime_Type_df[10]
theft = primary_Crime_Type_df[11]
battery = primary_Crime_Type_df[12]
robbery = primary_Crime_Type_df[13]
homicide = primary_Crime_Type_df[14]
ritualism = primary_Crime_Type_df[15]
public_indecency = primary_Crime_Type_df[16]
crim_sexual_assualt = primary_Crime_Type_df[17] #-
human_trafficking = primary_Crime_Type_df[18]
intimidation = primary_Crime_Type_df[19]
prostitution = primary_Crime_Type_df[20]
deceptive_practice = primary_Crime_Type_df[21]
concealed_carry_license_violation = primary_Crime_Type_df[22]
sex_offense = primary_Crime_Type_df[23]
criminal_damage = primary_Crime_Type_df[24]
narcotics = primary_Crime_Type_df[25]
other_offense = primary_Crime_Type_df[26]
kidnapping = primary_Crime_Type_df[27]
burglary = primary_Crime_Type_df[28]
weapons_violation = primary_Crime_Type_df[29]
other_narcotic_violation = primary_Crime_Type_df[30]
interference_with_public_officer = primary_Crime_Type_df[31]
non_criminal_subject_specified = primary_Crime_Type_df[32]
non__criminal = primary_Crime_Type_df[33] #--
domestic_violence = primary_Crime_Type_df[34]
'''

In [0]:
ohare_and_midway_weather = weather_df_unioned.filter(weather_df_unioned.Station.isin("O'Hare Airport"))
from pyspark.sql.functions import year

ohare_and_midway_weather = ohare_and_midway_weather.withColumn("Year", year(ohare_and_midway_weather.Date))
prcp_by_year = ohare_and_midway_weather.groupBy("Year").agg(max("PRCP").alias("Precipitation"))
prcp_by_year = prcp_by_year.orderBy("Year")

crime_count = crime_df_unioned.withColumn("Year", year(crime_df_unioned.Date))
crime_count_by_year = crime_count.groupBy("Year").agg(count("ID").alias("Crime Count"))
crime_count_by_year = crime_count_by_year.orderBy("Year")

x = crime_count_by_year.select("Crime Count").rdd.flatMap(lambda x: x).collect()
y = prcp_by_year.select("Precipitation").rdd.flatMap(lambda x: x).collect()
c = prcp_by_year.select("Year").rdd.flatMap(lambda x: x).collect()

fig, ax = plt.subplots(figsize=(10, 8))
sc = ax.scatter(c, y, c=x, cmap='coolwarm')

ax.set_xlabel('Year')
ax.set_ylabel('Precipitation')
ax.set_title('Precipitation vs. Crime Count over Years at OHare Airport')
fig.colorbar(sc)
plt.show()



In [0]:

weather_crime_df = weather_crime_df.na.drop()
theft_damages = when(col("Description").isin("$500 AND UNDER", "FROM BUILDING", "PURSE-SNATCHING"), 0).otherwise(1)
weather_crime_df = weather_crime_df.withColumn("Theft Damages", theft_damages)
theft_type = 'THEFT'
theft_type_df = weather_crime_df.filter(weather_crime_df['Primary Type'] == theft_type)
train_df, test_df = theft_type_df.randomSplit([0.7, 0.3], seed=42)
indexer = StringIndexer(inputCol="Station", outputCol="StationIndex")

train_data = indexer.fit(train_df).transform(train_df)
test_data = indexer.fit(test_df).transform(test_df)

encoder = OneHotEncoder(inputCols=["StationIndex"], outputCols=["StationVec"])
independent_features = VectorAssembler(inputCols=["TAVG", "TMAX", "TMIN", "PRCP", "SNOW", "StationVec"], outputCol="features")
encoder_model = encoder.fit(train_data)

train_data = encoder_model.transform(train_data)
test_data = encoder_model.transform(test_data)

train_data = independent_features.transform(train_data)
test_data = independent_features.transform(test_data)

train_data_sample = train_data.sample(fraction=0.4, seed=42)
test_data_sample = test_data.sample(fraction=0.4, seed=42)



In [0]:
train_data_sample = train_data.sample(fraction=0.4, seed=42)
test_data_sample = test_data.sample(fraction=0.4, seed=42)

lr = LinearRegression(featuresCol='features', labelCol='Theft Damages')
lrModel = lr.fit(train_data)

print(lrModel)
predictions = lrModel.transform(test_data)
predictions.show()

evalReg = RegressionEvaluator(labelCol="Theft Damages", predictionCol="prediction", metricName="mse")
mse = evalReg.evaluate(predictions)
print(mse)

accuracy = predictions.filter(predictions["Theft Damages"] == predictions["prediction"]).count() / predictions.count()
print(accuracy)

LinearRegressionModel: uid=LinearRegression_807b7ee80a6c, numFeatures=11
+----------+-------+----+-----+----+----+----+----+----------------+-----------------+-------+--------------------+------------+--------------------+--------------------+------+--------+----+----------+-------------+------------+-------------+--------------------+-------------------+
|      Date|Station|TMAX| TMIN|TAVG|PRCP|SNOW|SNWD|Weather Latitude|Weather Longitude|     ID|               Block|Primary Type|         Description|Location Description|Arrest|Domestic|Year|Population|Theft Damages|StationIndex|   StationVec|            features|         prediction|
+----------+-------+----+-----+----+----+----+----+----------------+-----------------+-------+--------------------+------------+--------------------+--------------------+------+--------+----+----------+-------------+------------+-------------+--------------------+-------------------+
|2001-01-01| Aurora|18.0|-10.0| 4.0| 0.0| 0.0| 0.0|        41.78038|    

In [0]:

evalPrecision = MulticlassClassificationEvaluator(labelCol="Theft Damages", predictionCol="prediction", metricName="weightedPrecision")
print(evalPrecision)
evalRecall = MulticlassClassificationEvaluator(labelCol="Theft Damages", predictionCol="prediction", metricName="weightedRecall")
recall = evalRecall.evaluate(predictions)
# print(evalRecall)
print(recall)
f1_eval = MulticlassClassificationEvaluator(labelCol="Theft Damages", predictionCol="prediction", metricName="f1")
f1 = f1_eval.evaluate(predictions)

print(f1)

MulticlassClassificationEvaluator_0bf3adb84d63
0.0
0.0


In [0]:
evalPrecision.show()

In [0]:
from pyspark.ml.classification import RandomForestClassifier

train_data_sample = train_data.sample(fraction=0.4, seed=42)
test_data_sample = test_data.sample(fraction=0.4, seed=42)

rf = RandomForestClassifier(labelCol="Theft Damages", featuresCol="features")
rfModel = rf.fit(train_data_sample)

# Make predictions on the test data
predictions_rf = rfModel.transform(test_data_sample)
predictions_rf.show()

# Evaluate the accuracy of the model using the MulticlassClassificationEvaluator
evaluator_rf = MulticlassClassificationEvaluator(labelCol="Theft Damages", predictionCol="prediction", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(predictions_rf)
print("Accuracy = %g" % accuracy_rf)

+----------+-------+----+-----+----+----+----+----+----------------+-----------------+-------+--------------------+------------+--------------------+--------------------+------+--------+----+----------+-------------+------------+-------------+--------------------+--------------------+--------------------+----------+
|      Date|Station|TMAX| TMIN|TAVG|PRCP|SNOW|SNWD|Weather Latitude|Weather Longitude|     ID|               Block|Primary Type|         Description|Location Description|Arrest|Domestic|Year|Population|Theft Damages|StationIndex|   StationVec|            features|       rawPrediction|         probability|prediction|
+----------+-------+----+-----+----+----+----+----+----------------+-----------------+-------+--------------------+------------+--------------------+--------------------+------+--------+----+----------+-------------+------------+-------------+--------------------+--------------------+--------------------+----------+
|2001-01-01| Aurora|18.0|-10.0| 4.0| 0.0| 0.0|

In [0]:
print(recall)

In [0]:
print(f1)

In [0]:

new_data = spark.createDataFrame([(25, 0.5)], ["TAVG", "TMAX", "TMIN", "PRCP", "SNOW", "StationVec"])
new_data = assembler.transform(new_data)

predicted_thefts = lrModel.transform(new_data).collect()[0]["prediction"]

In [0]:
predicted_thefts.show()

In [0]:
'''
SCRATCH

weather_crime_joined = weather_df_unioned.join(crime_df_unioned, on = ["Date", "StationID"])

y_train = weather_crime_joined.groupBy("StationID").sum("Primary Type" where PrimaryType == Theft)....calculate distrubiton of description of damages per STATIONID at dates 

Date     |  StationID  | NumTheft  | SeverityDistribution | LocationDistribution
1/1/2001 | Aurora      | 200       | >$500                | Apartment 

x_train -- weather_crime_joined.split

% Template
x_train = VectorAssembler(inputCols=["TMAX", "TMIN", "TAVG", "PRCP"], outputCol="weather feature")
y_train = assembler.transform(weather_crime_joined_df).select("StationID", "weather_features")

# y_train = assembler.transform(weather_crime_joined_df).select("StationID", "crime_features") 

train_data, test_data = data_with_features.randomSplit([0.8, 0.2])

# call import
# params for now
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_Model = lr.fit(train_data)

# predictions on the testing data
pred = lr_Model.transform(test_data)
predicted_theft = prediction.select("theft").collect()[0][0]
print("Predicted theft:", predicted_theft)

# root mean squared error (RMSE)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(pred)
print("Root Mean Squared Error (RMSE):", rmse)
'''