In [1]:
#import findspark
#findspark.init()
import pyspark
from matplotlib import pyplot as plt
from utils import *
from pyspark.sql.functions import collect_list
APP_NAME = 'Exploration-Notebook'

# Create (or reuse) the session through the builder so that APP_NAME is set
# before the SparkContext exists. Constructing `pyspark.SparkContext()` first
# meant the app name was never applied, and re-running this cell failed with
# "Cannot run multiple SparkContexts at once"; getOrCreate() is safe to re-run.
spark = pyspark.sql.SparkSession.builder.appName(APP_NAME).getOrCreate()
sc = spark.sparkContext  # kept for any code that expects a bare SparkContext

In [None]:
# Load the raw incident records from MongoDB through the mongo-spark connector.
INCIDENTS_URI = "mongodb://localhost/crymeclarity.incidents"
mongo_reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
ds = mongo_reader.option("uri", INCIDENTS_URI).load()

In [None]:
from pyspark.sql.types import ArrayType, StructType, IntegerType, StructField, StringType, FloatType, TimestampType, DecimalType
from pyspark.sql.functions import udf
import pyspark.sql.functions as psf
from pyspark.sql.functions import col
import mpu

# Edge length of one bounding box, in degrees. Box indices are plain integers,
# so neighbouring boxes differ by exactly 1 (the grid-search join relies on it).
LAT_BOX_DEGREES = .008726950000000073
LON_BOX_DEGREES = 0.007254180000003885


def _coordinate_to_box(value, box_degrees):
    """Map one coordinate onto the integer index of its bounding box.

    The index is ``abs(int(value / box_degrees))``: truncation toward zero,
    then the sign is dropped.

    :param value: coordinate in degrees; anything ``float()`` accepts
        (float, int, ``Decimal``, numeric string) or None.
    :param box_degrees: box edge length in degrees.
    :returns: None when ``value`` is None, so a Spark null stays null and the
        row cannot join to any box. 0 when the value is not convertible to a
        finite number (NaN, inf, non-numeric text), mirroring the original
        ``ValueError`` fallback. Otherwise the non-negative box index.
    """
    if value is None:
        return None
    try:
        # float() first so Decimal (e.g. DECIMAL DB columns) and numeric strings
        # do not raise TypeError on division and kill the Spark task.
        return abs(int(float(value) / box_degrees))
    except (ValueError, OverflowError):  # NaN -> ValueError, +/-inf -> OverflowError
        return 0


def assign_coordinate_to_lat_box(latitude):
    """Return the latitude bounding-box index of ``latitude`` (see ``_coordinate_to_box``)."""
    return _coordinate_to_box(latitude, LAT_BOX_DEGREES)


def assign_coordinate_to_lon_box(longitude):
    """Return the longitude bounding-box index of ``longitude`` (see ``_coordinate_to_box``)."""
    return _coordinate_to_box(longitude, LON_BOX_DEGREES)

def time_occ_to_seconds(time_occ):
    """Convert an HHMM time of day into seconds after midnight.

    The value is zero-padded to four digits before splitting, so inputs that
    lost their leading zero ("930" or the integer 930) are read as 09:30
    instead of 93 hours.

    :param time_occ: time of day as an HHMM string or integer.
    :returns: seconds after midnight in [0, 86340], or the sentinel -99 when
        the value is missing, is not made of decimal digits, has more than four
        digits, or lies outside 00:00-23:59. The cleaning cell drops rows with
        negative values.
    """
    if time_occ is None:
        return -99
    digits = str(time_occ).strip()
    # isdecimal() matches exactly what int() can parse digit-by-digit
    if not digits.isdecimal() or len(digits) > 4:
        return -99
    digits = digits.zfill(4)
    hours, minutes = int(digits[:2]), int(digits[2:])
    if hours > 23 or minutes > 59:
        return -99
    return hours * 60**2 + minutes * 60

# Statute miles per kilometre; mpu.haversine_distance returns kilometres.
KM_TO_MILES = 0.621371


def _haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between (lat1, lon1) and (lat2, lon2).

    Arguments are positional in (lat, lon) order because that is the tuple
    order mpu.haversine_distance expects for each point.

    :returns: the distance as a float, or None (Spark null) when any coordinate
        is missing, instead of letting mpu raise inside the executor and fail
        the whole job.
    """
    if any(v is None for v in (lat1, lon1, lat2, lon2)):
        return None
    return mpu.haversine_distance((lat1, lon1), (lat2, lon2)) * KM_TO_MILES


actb_lat = udf(assign_coordinate_to_lat_box, IntegerType())
actb_lon = udf(assign_coordinate_to_lon_box, IntegerType())
ts_conv = udf(cla_timestamp_to_datetime, TimestampType())
t_occ_conv = udf(time_occ_to_seconds, IntegerType())
space_dist = udf(_haversine_miles, FloatType())



In [None]:
# clean data
import datetime  # explicit, rather than relying on `from utils import *` to provide it

# Incidents must have date_occ strictly after this instant (2018-03-27 00:00).
DATE_OCC_CUTOFF = datetime.datetime(year=2018, month=3, day=27)

ds = ds.withColumn('date_occ', ts_conv(ds.date_occ))  # convert timestamp strings to datetime
# Keep only incidents after the cutoff. A null date_occ also fails this
# comparison and is dropped (presumably cla_timestamp_to_datetime yields null
# for unparseable strings — confirm in utils).
ds = ds.filter(ds['date_occ'] > DATE_OCC_CUTOFF)
ds = ds.withColumn('time_occ_seconds', t_occ_conv(ds.time_occ))  # time of day -> seconds after midnight
ds = ds.filter(ds.time_occ_seconds >= 0)  # drop rows whose time_occ could not be parsed (negative sentinel)
ds = ds.withColumn('date_occ_unix', psf.unix_timestamp(ds.date_occ))  # convert datetime to unix timestamp

# Drop incidents without both coordinates: the bounding-box UDFs cannot place
# a missing value, and the original box functions raise TypeError on None.
# NOTE(review): coordinates[0] is treated as latitude and coordinates[1] as
# longitude; GeoJSON points are conventionally [lon, lat] — confirm against
# the incidents collection.
ds = ds.filter(
    ds.location_1.coordinates[0].isNotNull() & ds.location_1.coordinates[1].isNotNull()
)
ds = ds.withColumn('lat_bb_c', actb_lat(ds.location_1.coordinates[0]))  # assign coordinates to bounding box
ds = ds.withColumn('lon_bb_c', actb_lon(ds.location_1.coordinates[1]))  # assign coordinates to bounding box
ds = ds.withColumn('ts_occ_unix', ds.date_occ_unix + ds.time_occ_seconds)  # engineer timestamp in unix feature

In [None]:
#  A should require no cleaning as all the data should be pre-vetted by the generation script
import os

# Connection settings come from the environment so credentials are not
# committed with the notebook; the defaults reproduce the original local setup.
A = spark.read.format("jdbc").options(
    url=os.environ.get("CRYMEWEB_JDBC_URL", "jdbc:mysql://localhost/crymeweb?serverTimezone=UTC"),
    driver="com.mysql.jdbc.Driver",
    dbtable="safety_safetyanalysisrequest",
    user=os.environ.get("CRYMEWEB_DB_USER", "root"),
    password=os.environ.get("CRYMEWEB_DB_PASSWORD", ""),
).load()

# engineer features
# Cast to double so the Python UDFs always receive floats: a DECIMAL column
# would arrive as decimal.Decimal, and Decimal / float raises TypeError.
# (No-op if the columns are already double — NOTE(review): confirm column types.)
A = A.withColumn('lat_bb', actb_lat(A.latitude.cast('double')))  # assign coordinates to bounding box
A = A.withColumn('lon_bb', actb_lon(A.longitude.cast('double')))  # assign coordinates to bounding box
A = A.withColumn('timestamp_unix', psf.unix_timestamp(A.timestamp))  # convert datetime to unix timestamp

In [None]:
#  begin grid search and merge
#
# Bounding boxes are only a coarse pre-filter; the exact criterion is the
# haversine distance filter below. A true match is found only if its box lies
# inside the searched neighbourhood of the request's box, so the neighbourhood
# must extend at least MAX_DISTANCE_MILES in every direction:
#   * latitude boxes are 0.00872695 deg tall (~0.60 mi), so +/-1 box suffices;
#   * longitude boxes are 0.00725418 deg wide, ~0.50 mi only at the equator and
#     shrinking with cos(latitude), so +/-1 box misses matches anywhere else.
#     +/-2 boxes cover 0.5 mi up to roughly |latitude| = 60 deg.
MAX_DISTANCE_MILES = .5
MAX_TIME_DELTA_SECONDS = 3600
LAT_BOX_RADIUS = 1
LON_BOX_RADIUS = 2

results = None
for i in range(-LAT_BOX_RADIUS, LAT_BOX_RADIUS + 1):
    for j in range(-LON_BOX_RADIUS, LON_BOX_RADIUS + 1):
        # Shift the request's box by (i, j) so a plain equi-join finds incidents
        # in that neighbouring box. Each (request, incident) pair can match at
        # most one offset, so the union below never double-counts.
        B = A.withColumn('lat_bb', A.lat_bb + i)
        B = B.withColumn('lon_bb', B.lon_bb + j)

        res = B.join(ds, (B.lat_bb == ds.lat_bb_c) & (B.lon_bb == ds.lon_bb_c))

        # keep incidents that occurred strictly within the hour after the request
        delta = res.ts_occ_unix - res.timestamp_unix
        res = res.filter((delta > 0) & (delta < MAX_TIME_DELTA_SECONDS))

        # space_dist wraps mpu.haversine_distance, which takes (lat, lon) per
        # point; the original call passed (lon, lat) for both points. The
        # incident point uses coordinates[0] as latitude, matching how the
        # cleaning cell builds lat_bb_c/lon_bb_c. NOTE(review): GeoJSON points
        # are conventionally [lon, lat] — confirm against the incidents data.
        res = res.withColumn('distance', space_dist(
                res.latitude.cast('double'),
                res.longitude.cast('double'),
                res.location_1.coordinates[0],
                res.location_1.coordinates[1],
            ))
        res = res.filter(res.distance < MAX_DISTANCE_MILES)
        results = res if results is None else results.union(res)

results = results.groupBy(col('id')).count()



In [None]:
# Attach the nearby-incident count to every request. The left outer join keeps
# requests with no matching incidents, but their `count` is null after the
# join; absence of a match means zero incidents, so fill it with 0.
fin = A.join(results, "id", "left_outer").fillna(0, subset=["count"])

In [None]:
# Collect the final Spark DataFrame onto the driver as a pandas DataFrame.
# toPandas() pulls every row into driver memory; presumably fine here because
# `fin` should have one row per analysis request — confirm for large tables.
exp = fin.toPandas()

In [None]:
import os
from pathlib import Path

# Output location: override with the CRIME_DATA_CSV environment variable.
# The default is crime_data.csv on the current user's Desktop, instead of a
# hardcoded absolute path that only exists on one machine.
output_csv = Path(os.environ.get("CRIME_DATA_CSV", Path.home() / "Desktop" / "crime_data.csv"))
output_csv.parent.mkdir(parents=True, exist_ok=True)
exp.to_csv(output_csv, index=False)