## Clean & Pipe Recent Crime Events


---
Obj: Identify valid crimes for our analysis and pipe them into our web app.

In [1]:
import os
import sys

try:
    import pyspark
except ImportError:
    import findspark
    findspark.init()
    import pyspark
    
    
# give notebook access to crymepipelines app modules
CRYMEPIPELINES_PATH = '/home/ben/.envs/cc/CrymeClarity/crymepipelines/src'
sys.path.insert(0, CRYMEPIPELINES_PATH)

#build spark session
APP_NAME = 'CRYME_PIPELINE_DEV'
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()


In [2]:
# import cyrmepipelines app modules
import csv
from datetime import datetime, timedelta
import os
import pickle as p
import shutil

from shared.objects.samples import SamplesManager
from shared.settings import CF_TRUST_DELAY, START_DATE, cf_conn, cp_conn, TMP_DIR, BIN_DIR
from tasks.base import SparkCrymeTask, NativeCrymeTask
from utils import crime_occ_udf, ts_to_minutes_in_day_udf, ts_to_hour_of_day_udf, ts_to_day_of_week_udf, ts_conv, safety_rel_crimes
from tasks.mixins import SearchForCrimesMixin

In [3]:
raw_crime_incidents = SparkCrymeTask(spark).load_df_from_crymefeeder("incidents")

In [4]:
raw_crime_incidents.count()

1959459

In [5]:
raw_crime_incidents.show(1)

+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+--------------------+------------------+--------------------+-----------------+------------------+-------+---------+------+--------+--------+--------+--------------------+------------+--------------------+--------------------+---------+--------------------+--------------------+-------+---------+--------------------+--------------------+-----------+------+-----------+--------+--------+------------+--------+--------------------+--------------+
|:@computed_region_2dna_qi2s|:@computed_region_k96s_3jcv|:@computed_region_kqwf_mjcx|:@computed_region_qz3q_ghft|:@computed_region_tatf_ua23|:@computed_region_ur2y_g4cx|         :created_at|               :id|         :updated_at|         :version|               _id|area_id|area_name|crm_cd|crm_cd_1|crm_cd_2|crm_cd_3|         crm_cd_desc|cross_street|            date_occ|           

Thats a lotta data, lets just look at rows with "date occ" within the last 30 days.

In [6]:
crime_incidents = raw_crime_incidents.withColumn('date_occ', ts_conv(raw_crime_incidents.date_occ))
crime_incidents = crime_incidents.filter(crime_incidents.date_occ > datetime.now().date() - timedelta(days=30))

#### Crime Types

In [7]:
by_type = crime_incidents.groupBy('crm_cd_desc').agg({'_id': 'count', 'crm_cd': 'first'}).orderBy("count(_id)")
by_type.show(by_type.count(), False)

+--------------------------------------------------------+----------+-------------+
|crm_cd_desc                                             |count(_id)|first(crm_cd)|
+--------------------------------------------------------+----------+-------------+
|DISHONEST EMPLOYEE - GRAND THEFT                        |1         |345          |
|EMBEZZLEMENT, PETTY THEFT ($950 & UNDER)                |1         |670          |
|ILLEGAL DUMPING                                         |1         |949          |
|DRIVING WITHOUT OWNER CONSENT (DWOC)                    |1         |433          |
|SEX,UNLAWFUL(INC MUTUAL CONSENT, PENETRATION W/ FRGN OBJ|1         |810          |
|TILL TAP - PETTY ($950 & UNDER)                         |1         |471          |
|PIMPING                                                 |1         |805          |
|CHILD STEALING                                          |1         |922          |
|BIKE - ATTEMPTED STOLEN                                 |1         |485    

So not all of these seem like real "safety" threats, for example "Theft of Identity". I think I should create a datastructure containing all crime_cd_descs or crime_cds that could be considered violent or threatening. (Added to utils.py)

In [15]:
crime_incidents = crime_incidents.filter(crime_incidents.crm_cd.isin(list(safety_rel_crimes.keys())))

Finally, there are a bunch of columns that dont seem too interesting so lets drop those and flatten the coordinates.

In [22]:
crime_incidents = crime_incidents.withColumn('Latitude', crime_incidents.location_1.coordinates[0])
crime_incidents = crime_incidents.withColumn('Longitude', crime_incidents.location_1.coordinates[1])

In [23]:
crime_incidents.first()

Row(:@computed_region_2dna_qi2s='1', :@computed_region_k96s_3jcv='492', :@computed_region_kqwf_mjcx='11', :@computed_region_qz3q_ghft='23448', :@computed_region_tatf_ua23='943', :@computed_region_ur2y_g4cx='1', :created_at='2019-05-01T13:41:45.118Z', :id='row-223w_rk24-nys2', :updated_at='2019-05-01T13:48:47.496Z', :version='rv-ewhu.6nd5_2v5a', _id='row-223w_rk24-nys2', area_id='04', area_name='Hollenbeck', crm_cd='510', crm_cd_1='510', crm_cd_2=None, crm_cd_3=None, crm_cd_desc='VEHICLE - STOLEN', cross_street=None, date_occ=datetime.datetime(2019, 4, 13, 0, 0), date_rptd='2019-04-14T00:00:00.000', dr_no='190408547', location='600    MOULTON                      AV', location_1=Row(type='Point', coordinates=[-118.2189, 34.066]), mocodes=None, premis_cd='108', premis_desc='PARKING LOT', record_inserted_at=datetime.datetime(2019, 5, 6, 7, 49, 53, 400000), rpt_dist_no='0421', status='IC', status_desc='Invest Cont', time_occ='2330', vict_age='0', vict_descent=None, vict_sex=None, weapon_de

In [25]:
crime_incidents = crime_incidents.select(['_id', 'crm_cd', 'crm_cd_desc', 'date_occ', 'time_occ', 'premis_desc', 'longitude', 'latitude'])

Row(_id='row-223w_rk24-nys2', crm_cd='510', crm_cd_desc='VEHICLE - STOLEN', date_occ=datetime.datetime(2019, 4, 13, 0, 0), time_occ='2330', longitude=34.066, latitude=-118.2189)