### Importing libraries

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types

#### Configuration variables

In [3]:
# Path to the GCP service-account key file handed to the GCS connector.
# NOTE(review): the key path is hard-coded here; consider supplying it through
# an environment variable instead — confirm with the project owner.
credentials_location = "mage-oregon-crime/oregon-crime-e130b790c75d.json"

# Spark configuration: local master using all cores, the GCS connector jar on
# the classpath, and service-account authentication switched on.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

#### Creating the context with the credentials

In [4]:
# Reuse the running SparkContext when this cell is executed again in the same
# kernel; calling the SparkContext constructor a second time raises because
# only one context may be active at once.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop-level settings that register the gs:// filesystem implementations and
# point the connector at the service-account key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

your 131072x1 screen size is bogus. expect trouble
24/04/13 15:51:15 WARN Utils: Your hostname, dashel resolves to a loopback address: 127.0.1.1; using 172.31.179.36 instead (on interface eth0)
24/04/13 15:51:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/13 15:51:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Creating Spark session

In [5]:
# Build (or fetch) the SparkSession using the configuration of the context created above.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

#### Schema for the Oregon crime data

In [6]:
# Explicit schema for the Oregon crime CSV.
# - '_c0' is the unnamed leading column of the CSV; it is dropped after loading.
# - 'OccurDate' and 'OccurTime' are read as plain strings because they are
#   concatenated and parsed into a single timestamp later in the notebook.
#   ('OccurTime' previously used types.Type(), which does not exist in
#   pyspark.sql.types and raised AttributeError.)
schema = types.StructType([
    types.StructField('_c0', types.StringType(), True),
    types.StructField('Address', types.StringType(), True),
    types.StructField('CaseNumber', types.StringType(), True),
    types.StructField('CrimeAgainst', types.StringType(), True),
    types.StructField('Neighborhood', types.StringType(), True),
    types.StructField('OccurDate', types.StringType(), True),
    types.StructField('OccurTime', types.StringType(), True),
    types.StructField('OffenseCategory', types.StringType(), True),
    types.StructField('OffenseType', types.StringType(), True),
    types.StructField('OpenDataLat', types.StringType(), True),
    types.StructField('OpenDataLon', types.StringType(), True),
    types.StructField('ReportDate', types.TimestampType(), True),
    types.StructField('OffenseCount', types.IntegerType(), True),
])

#### Loading data for the Uniform Crime Stats

In [7]:
# Read the four Uniform Crime Reporting parquet files from the bucket, in the
# order: arrests, LEOKA, offenses, victims.
df_uni_arrests, df_uni_leoka, df_uni_offenses, df_uni_victims = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        'gs://oregon-crime-bucket/uniform_data/arrests_to_2024.parquet',
        'gs://oregon-crime-bucket/uniform_data/leoka_to_2024.parquet',
        'gs://oregon-crime-bucket/uniform_data/offences_to_2024.parquet',
        'gs://oregon-crime-bucket/uniform_data/victims_to_2024.parquet',
    )
)

                                                                                

#### Loading Oregon Crime data

In [8]:
# Read the crime CSV with its header row, applying the explicit schema defined above.
df_crime_data = spark.read.csv(
    'gs://oregon-crime-bucket/crime_data_csv/oregon_crime_2015_2023.csv',
    schema=schema,
    header=True,
)

#### Dropping the first column, which holds the row index carried over from the CSV file

In [9]:
# '_c0' is the unnamed leading column of the CSV; remove it.
df_crime_data = df_crime_data.drop('_c0')

In [10]:
df_crime_data.show(5)

+-------+-----------+------------+---------------+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+------------+
|Address| CaseNumber|CrimeAgainst|   Neighborhood| OccurDate|OccurTime|     OffenseCategory|         OffenseType|OpenDataLat|OpenDataLon|         ReportDate|OffenseCount|
+-------+-----------+------------+---------------+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+------------+
|   NULL| 15-X197430|      Person|       Piedmont|2015-05-12| 14:00:00|    Assault Offenses|        Intimidation|       NULL|       NULL|2015-05-12 00:00:00|           1|
|   NULL|15-X4282999|      Person|   Buckman West|2015-05-01| 21:43:00|    Assault Offenses|      Simple Assault|       NULL|       NULL|2015-05-01 00:00:00|           1|
|   NULL|15-X4283033|      Person|University Park|2015-05-01| 16:25:00|    Assault Offenses|      Simple Assault|       NULL|       NULL|2015-05-

#### Create a new column combining OccurDate and OccurTime

In [11]:
# Concatenate the date and time strings (no separator) into a single
# 'OccurDateTime' string column; it is parsed into a timestamp in a later cell.
df_crime_data = df_crime_data.withColumn(
    'OccurDateTime',
    F.concat(F.col('OccurDate'), F.col('OccurTime')),
)

In [12]:
df_crime_data.printSchema()

root
 |-- Address: string (nullable = true)
 |-- CaseNumber: string (nullable = true)
 |-- CrimeAgainst: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- OccurDate: string (nullable = true)
 |-- OccurTime: string (nullable = true)
 |-- OffenseCategory: string (nullable = true)
 |-- OffenseType: string (nullable = true)
 |-- OpenDataLat: string (nullable = true)
 |-- OpenDataLon: string (nullable = true)
 |-- ReportDate: timestamp (nullable = true)
 |-- OffenseCount: integer (nullable = true)
 |-- OccurDateTime: string (nullable = true)



In [13]:
# Parse the concatenated string into a timestamp. The pattern has no separator
# between the date and time parts because the concatenation above adds none.
df_crime_data = df_crime_data.withColumn(
    'OccurDateTime',
    F.to_timestamp(F.col("OccurDateTime"), format="yyyy-MM-ddHH:mm:ss"),
)

In [14]:
# Drop the OccurDate and OccurTime columns; their information now lives in OccurDateTime.
df_crime_data = df_crime_data.drop('OccurDate', 'OccurTime')

In [15]:
df_crime_data.show(3)

+-------+-----------+------------+---------------+----------------+--------------+-----------+-----------+-------------------+------------+-------------------+
|Address| CaseNumber|CrimeAgainst|   Neighborhood| OffenseCategory|   OffenseType|OpenDataLat|OpenDataLon|         ReportDate|OffenseCount|      OccurDateTime|
+-------+-----------+------------+---------------+----------------+--------------+-----------+-----------+-------------------+------------+-------------------+
|   NULL| 15-X197430|      Person|       Piedmont|Assault Offenses|  Intimidation|       NULL|       NULL|2015-05-12 00:00:00|           1|2015-05-12 14:00:00|
|   NULL|15-X4282999|      Person|   Buckman West|Assault Offenses|Simple Assault|       NULL|       NULL|2015-05-01 00:00:00|           1|2015-05-01 21:43:00|
|   NULL|15-X4283033|      Person|University Park|Assault Offenses|Simple Assault|       NULL|       NULL|2015-05-01 00:00:00|           1|2015-05-01 16:25:00|
+-------+-----------+------------+------

### Checking the datasets

In [16]:
df_crime_data.printSchema()

root
 |-- Address: string (nullable = true)
 |-- CaseNumber: string (nullable = true)
 |-- CrimeAgainst: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- OffenseCategory: string (nullable = true)
 |-- OffenseType: string (nullable = true)
 |-- OpenDataLat: string (nullable = true)
 |-- OpenDataLon: string (nullable = true)
 |-- ReportDate: timestamp (nullable = true)
 |-- OffenseCount: integer (nullable = true)
 |-- OccurDateTime: timestamp (nullable = true)



In [17]:
df_crime_data.show(5)

+-------+-----------+------------+---------------+--------------------+--------------------+-----------+-----------+-------------------+------------+-------------------+
|Address| CaseNumber|CrimeAgainst|   Neighborhood|     OffenseCategory|         OffenseType|OpenDataLat|OpenDataLon|         ReportDate|OffenseCount|      OccurDateTime|
+-------+-----------+------------+---------------+--------------------+--------------------+-----------+-----------+-------------------+------------+-------------------+
|   NULL| 15-X197430|      Person|       Piedmont|    Assault Offenses|        Intimidation|       NULL|       NULL|2015-05-12 00:00:00|           1|2015-05-12 14:00:00|
|   NULL|15-X4282999|      Person|   Buckman West|    Assault Offenses|      Simple Assault|       NULL|       NULL|2015-05-01 00:00:00|           1|2015-05-01 21:43:00|
|   NULL|15-X4283033|      Person|University Park|    Assault Offenses|      Simple Assault|       NULL|       NULL|2015-05-01 00:00:00|           1|2

In [18]:
df_uni_arrests.show(5)

[Stage 7:>                                                          (0 + 1) / 1]

+----------------+------------------+---------------+--------------------+----------+----------+-------------------+-----------------------+------------------+------------------------+
|     agency_name|arrestee_age_group|arrestee_gender|       arrestee_race|    county|      date|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|distinct_arrest_offenses|
+----------------+------------------+---------------+--------------------+----------+----------+-------------------+-----------------------+------------------+------------------------+
|Beaverton PD MIP|              0-10|           Male|Black or African ...|Washington|2021-10-14|             Person|         Simple Assault|    Simple Assault|                       1|
|     Bend PD MIP|              0-10|         Female|             Unknown| Deschutes|2020-08-04|              Other|   Oregon Specific C...|            Part 3|                       1|
|     Bend PD MIP|              0-10|         Female|               White| 

                                                                                

In [19]:
df_uni_leoka.show(5)

+--------------------+---------+------------+---------------------------+--------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|         agency_name|   county|incidentdate|leoka_assignment_type_nibrs|  leoka_circumstance|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|distinct_offense_victims|
+--------------------+---------+------------+---------------------------+--------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|        Clackamas SO|Clackamas|  2023-06-10|       One Man Vehicle -...|Robberies in Prog...|         Simple Assault|    Simple Assault|              20|       Female|      White|                       1|
|      Gresham PD MIP|Multnomah|  2021-10-24|           Other - Assisted|           All Other|     Aggravated Assault|Aggravated Assault|              20|         Male|    Unkn

                                                                                

In [20]:
df_uni_victims.show(5)

[Stage 9:>                                                          (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|victimtype|distinct_offense_victims|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|  Marion SO|Marion|  2020-01-02|              Other|        OUCR Only Crime|            Part 3|            NULL|         NULL|       NULL|      NULL|                       0|
|  Marion SO|Marion|  2020-01-03|              Other|        OUCR Only Crime|            Part 3|            NULL|         NULL|       NULL|      NULL|                       0|
|  Marion SO|Marion|  2020-01-08|              Other|        OUCR Only Crime|            Part 3|            NULL|       

                                                                                

## Data transformation

### Victims dataset

In [21]:
# Keep only the rows that report at least one victim (non-zero victim count).
df_uni_victims = df_uni_victims.where(F.col("distinct_offense_victims") != 0)

In [22]:
df_uni_victims.show(5)

[Stage 10:>                                                         (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|         victim_race|victimtype|distinct_offense_victims|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|  Albany PD|  Linn|  2023-08-15|             Person|     Aggravated Assault|Aggravated Assault|            0-10|       Female|Black or African ...|Individual|                       1|
|  Albany PD|  Linn|  2021-01-06|             Person|     Aggravated Assault|Aggravated Assault|            0-10|         Male|  Hispanic or Latino|Individual|                       1|
|  Albany PD|  Linn|  2022-05-18|             Person|     Aggravated Assaul

                                                                                

In [23]:
df_uni_victims.filter(df_uni_victims["distinct_offense_victims"] == 0).count()

0

- After filtering, no rows have `distinct_offense_victims` equal to 0.

#### Checking for Null values

In [24]:
df_uni_victims.groupBy('victim_gender').count().show()

[Stage 13:>                                                         (0 + 1) / 1]

+-------------+------+
|victim_gender| count|
+-------------+------+
|         NULL|    16|
|       Female|321499|
|      Unknown|  7977|
|         Male|347796|
|         Null|432735|
+-------------+------+



                                                                                

#### Cleaning Null values

In [25]:
df_uni_victims.filter(df_uni_victims['victim_gender'] == 'Null').count()

                                                                                

432735

In [27]:
# Normalise missing genders: first map the literal string 'Null' to 'Unknown',
# then fill the real NULLs with 'Unknown' as well.
df_uni_victims = (
    df_uni_victims
    .withColumn(
        'victim_gender',
        F.when(F.col('victim_gender') == 'Null', 'Unknown').otherwise(F.col('victim_gender')),
    )
    .fillna(value='Unknown', subset=['victim_gender'])
)

In [28]:
df_uni_victims.show(5)

[Stage 19:>                                                         (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|         victim_race|victimtype|distinct_offense_victims|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|  Albany PD|  Linn|  2023-08-15|             Person|     Aggravated Assault|Aggravated Assault|            0-10|       Female|Black or African ...|Individual|                       1|
|  Albany PD|  Linn|  2021-01-06|             Person|     Aggravated Assault|Aggravated Assault|            0-10|         Male|  Hispanic or Latino|Individual|                       1|
|  Albany PD|  Linn|  2022-05-18|             Person|     Aggravated Assaul

                                                                                

In [29]:
df_uni_victims.groupBy('victim_gender').count().show()

[Stage 20:>                                                         (0 + 1) / 1]

+-------------+------+
|victim_gender| count|
+-------------+------+
|       Female|321499|
|      Unknown|440728|
|         Male|347796|
+-------------+------+



                                                                                

In [30]:
df_uni_victims.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_uni_victims.columns]).show()

[Stage 23:>                                                         (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|victimtype|distinct_offense_victims|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|          0|     0|           0|                  0|                      0|                 0|               0|            0|         16|         0|                       0|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+



                                                                                

In [31]:
df_uni_victims.groupBy('victim_race').count().show()

[Stage 26:>                                                         (0 + 1) / 1]

+--------------------+------+
|         victim_race| count|
+--------------------+------+
|American Indian/A...|  4417|
|  Hispanic or Latino| 54629|
|                NULL|    16|
|Native Hawaiian/O...|  2441|
|             Unknown| 86936|
|               White|478980|
|               Asian| 20115|
|Black or African ...| 29754|
|                Null|432735|
+--------------------+------+



                                                                                

In [32]:
# Normalise missing races: first map the literal string 'Null' to 'Unknown',
# then fill the real NULLs with 'Unknown' as well.
df_uni_victims = (
    df_uni_victims
    .withColumn(
        'victim_race',
        F.when(F.col('victim_race') == 'Null', 'Unknown').otherwise(F.col('victim_race')),
    )
    .fillna(value='Unknown', subset=['victim_race'])
)

In [33]:
df_uni_victims.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_uni_victims.columns]).show()

[Stage 29:>                                                         (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|victimtype|distinct_offense_victims|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+
|          0|     0|           0|                  0|                      0|                 0|               0|            0|          0|         0|                       0|
+-----------+------+------------+-------------------+-----------------------+------------------+----------------+-------------+-----------+----------+------------------------+



                                                                                

In [34]:
df_uni_victims.groupBy('victim_race').count().show()

[Stage 32:>                                                         (0 + 1) / 1]

+--------------------+------+
|         victim_race| count|
+--------------------+------+
|American Indian/A...|  4417|
|  Hispanic or Latino| 54629|
|Native Hawaiian/O...|  2441|
|             Unknown|519687|
|               White|478980|
|               Asian| 20115|
|Black or African ...| 29754|
+--------------------+------+



                                                                                

#### Change the `incidentdate` data type to timestamp

In [35]:
# Convert 'incidentdate' to a timestamp using the yyyy-MM-dd pattern.
df_uni_victims = df_uni_victims.withColumn(
    'incidentdate',
    F.to_timestamp(F.col("incidentdate"), format="yyyy-MM-dd"),
)

In [36]:
df_uni_victims.show(3)

[Stage 35:>                                                         (0 + 1) / 1]

+-----------+------+-------------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|agency_name|county|       incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|         victim_race|victimtype|distinct_offense_victims|
+-----------+------+-------------------+-------------------+-----------------------+------------------+----------------+-------------+--------------------+----------+------------------------+
|  Albany PD|  Linn|2023-08-15 00:00:00|             Person|     Aggravated Assault|Aggravated Assault|            0-10|       Female|Black or African ...|Individual|                       1|
|  Albany PD|  Linn|2021-01-06 00:00:00|             Person|     Aggravated Assault|Aggravated Assault|            0-10|         Male|  Hispanic or Latino|Individual|                       1|
|  Albany PD|  Linn|2022-05-18 00:00:00|

                                                                                

### Arrests data null value check

In [37]:
df_uni_arrests.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_uni_arrests.columns]).show()

[Stage 36:>                                                         (0 + 1) / 1]

+-----------+------------------+---------------+-------------+------+----+-------------------+-----------------------+------------------+------------------------+
|agency_name|arrestee_age_group|arrestee_gender|arrestee_race|county|date|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|distinct_arrest_offenses|
+-----------+------------------+---------------+-------------+------+----+-------------------+-----------------------+------------------+------------------------+
|          0|                 0|              0|            0|     0|   0|                  0|                      0|                 0|                       0|
+-----------+------------------+---------------+-------------+------+----+-------------------+-----------------------+------------------+------------------------+



                                                                                

In [38]:
# Convert the 'date' column to a timestamp using the yyyy-MM-dd pattern.
df_uni_arrests = df_uni_arrests.withColumn(
    'date',
    F.to_timestamp(F.col("date"), format="yyyy-MM-dd"),
)

In [39]:
df_uni_arrests.show(5)

[Stage 39:>                                                         (0 + 1) / 1]

+----------------+------------------+---------------+--------------------+----------+-------------------+-------------------+-----------------------+------------------+------------------------+
|     agency_name|arrestee_age_group|arrestee_gender|       arrestee_race|    county|               date|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|distinct_arrest_offenses|
+----------------+------------------+---------------+--------------------+----------+-------------------+-------------------+-----------------------+------------------+------------------------+
|Beaverton PD MIP|              0-10|           Male|Black or African ...|Washington|2021-10-14 00:00:00|             Person|         Simple Assault|    Simple Assault|                       1|
|     Bend PD MIP|              0-10|         Female|             Unknown| Deschutes|2020-08-04 00:00:00|              Other|   Oregon Specific C...|            Part 3|                       1|
|     Bend PD MIP|            

                                                                                

### LEOKA data null value check

In [40]:
df_uni_leoka.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_uni_leoka.columns]).show()

+-----------+------+------------+---------------------------+------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|agency_name|county|incidentdate|leoka_assignment_type_nibrs|leoka_circumstance|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|distinct_offense_victims|
+-----------+------+------------+---------------------------+------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|          0|     0|           0|                          0|                 0|                      0|                 0|               0|           16|         16|                       0|
+-----------+------+------------+---------------------------+------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+



                                                                                

In [41]:
df_uni_leoka.groupBy(['victim_gender', 'victim_race']).count().show()

+-------------+--------------------+-----+
|victim_gender|         victim_race|count|
+-------------+--------------------+-----+
|         Male|               White| 2323|
|       Female|             Unknown|   27|
|         Male|             Unknown|  307|
|       Female|Black or African ...|   15|
|         NULL|                NULL|   16|
|         Male|Native Hawaiian/O...|   15|
|       Female|               White|  373|
|         Male|Black or African ...|   69|
|         Male|American Indian/A...|    8|
|      Unknown|               White|    6|
|       Female|  Hispanic or Latino|   17|
|         Male|               Asian|   73|
|         Male|  Hispanic or Latino|  152|
|      Unknown|             Unknown|   55|
|       Female|               Asian|   13|
+-------------+--------------------+-----+



                                                                                

In [42]:
# Replace NULL gender and race values with 'Unknown' in a single pass.
df_uni_leoka = df_uni_leoka.fillna(
    value='Unknown',
    subset=['victim_gender', 'victim_race'],
)

In [43]:
df_uni_leoka.groupBy(['victim_gender', 'victim_race']).count().show()

[Stage 46:>                                                         (0 + 1) / 1]

+-------------+--------------------+-----+
|victim_gender|         victim_race|count|
+-------------+--------------------+-----+
|         Male|               White| 2323|
|       Female|             Unknown|   27|
|         Male|             Unknown|  307|
|       Female|Black or African ...|   15|
|         Male|Native Hawaiian/O...|   15|
|       Female|               White|  373|
|         Male|Black or African ...|   69|
|         Male|American Indian/A...|    8|
|      Unknown|               White|    6|
|       Female|  Hispanic or Latino|   17|
|         Male|               Asian|   73|
|         Male|  Hispanic or Latino|  152|
|      Unknown|             Unknown|   71|
|       Female|               Asian|   13|
+-------------+--------------------+-----+



                                                                                

In [44]:
# Convert the 'incidentdate' column to a timestamp using the yyyy-MM-dd pattern.
df_uni_leoka = df_uni_leoka.withColumn(
    'incidentdate',
    F.to_timestamp(F.col("incidentdate"), format="yyyy-MM-dd"),
)

In [45]:
df_uni_leoka.show(3)

+--------------------+---------+-------------------+---------------------------+--------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|         agency_name|   county|       incidentdate|leoka_assignment_type_nibrs|  leoka_circumstance|nibrs_crime_description|nibrs_report_title|victim_age_group|victim_gender|victim_race|distinct_offense_victims|
+--------------------+---------+-------------------+---------------------------+--------------------+-----------------------+------------------+----------------+-------------+-----------+------------------------+
|        Clackamas SO|Clackamas|2023-06-10 00:00:00|       One Man Vehicle -...|Robberies in Prog...|         Simple Assault|    Simple Assault|              20|       Female|      White|                       1|
|      Gresham PD MIP|Multnomah|2021-10-24 00:00:00|           Other - Assisted|           All Other|     Aggravated Assault|Aggravated Assault|    

### Offenses data null value check

In [46]:
df_uni_offenses.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_uni_offenses.columns]).show()

[Stage 50:>                                                         (0 + 1) / 1]

+-----------+------+------------+-------------------+-----------------------+------------------+-----------------+
|agency_name|county|incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|distinct_offenses|
+-----------+------+------------+-------------------+-----------------------+------------------+-----------------+
|          0|     0|           0|                  0|                      0|                 0|                0|
+-----------+------+------------+-------------------+-----------------------+------------------+-----------------+



                                                                                

In [47]:
# Convert the 'incidentdate' column to a timestamp using the yyyy-MM-dd pattern.
df_uni_offenses = df_uni_offenses.withColumn(
    'incidentdate',
    F.to_timestamp(F.col("incidentdate"), format="yyyy-MM-dd"),
)

In [48]:
df_uni_offenses.show(5)

[Stage 53:>                                                         (0 + 1) / 1]

+-----------+------+-------------------+-------------------+-----------------------+------------------+-----------------+
|agency_name|county|       incidentdate|nibrs_crime_against|nibrs_crime_description|nibrs_report_title|distinct_offenses|
+-----------+------+-------------------+-------------------+-----------------------+------------------+-----------------+
|  Albany PD|  Linn|2020-01-01 00:00:00|              Other|   Oregon Specific C...|            Part 3|                1|
|  Albany PD|  Linn|2020-01-02 00:00:00|              Other|   Oregon Specific C...|            Part 3|                5|
|  Albany PD|  Linn|2020-01-03 00:00:00|              Other|   Oregon Specific C...|            Part 3|                1|
|  Albany PD|  Linn|2020-01-04 00:00:00|              Other|   Oregon Specific C...|            Part 3|                3|
|  Albany PD|  Linn|2020-01-05 00:00:00|              Other|   Oregon Specific C...|            Part 3|                5|
+-----------+------+----

                                                                                

### Null values in the crime dataset

In [49]:
df_crime_data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_crime_data.columns]).show()



+-------+----------+------------+------------+---------------+-----------+-----------+-----------+----------+------------+-------------+
|Address|CaseNumber|CrimeAgainst|Neighborhood|OffenseCategory|OffenseType|OpenDataLat|OpenDataLon|ReportDate|OffenseCount|OccurDateTime|
+-------+----------+------------+------------+---------------+-----------+-----------+-----------+----------+------------+-------------+
|  44967|         0|           0|       17547|              0|          0|      56486|      56486|         0|           0|            0|
+-------+----------+------------+------------+---------------+-----------+-----------+-----------+----------+------------+-------------+



                                                                                

- The null values in Address, Neighborhood and Lat-Lon are due to privacy restrictions, so we keep all the data.

In [50]:
# printSchema() writes the schema to stdout itself and returns None, so it is
# called directly; wrapping it in print() emitted a stray "None" after each schema.
df_uni_offenses.printSchema()
df_uni_leoka.printSchema()
df_uni_arrests.printSchema()
df_uni_victims.printSchema()

root
 |-- agency_name: string (nullable = true)
 |-- county: string (nullable = true)
 |-- incidentdate: timestamp (nullable = true)
 |-- nibrs_crime_against: string (nullable = true)
 |-- nibrs_crime_description: string (nullable = true)
 |-- nibrs_report_title: string (nullable = true)
 |-- distinct_offenses: long (nullable = true)

None
root
 |-- agency_name: string (nullable = true)
 |-- county: string (nullable = true)
 |-- incidentdate: timestamp (nullable = true)
 |-- leoka_assignment_type_nibrs: string (nullable = true)
 |-- leoka_circumstance: string (nullable = true)
 |-- nibrs_crime_description: string (nullable = true)
 |-- nibrs_report_title: string (nullable = true)
 |-- victim_age_group: string (nullable = true)
 |-- victim_gender: string (nullable = false)
 |-- victim_race: string (nullable = false)
 |-- distinct_offense_victims: long (nullable = true)

None
root
 |-- agency_name: string (nullable = true)
 |-- arrestee_age_group: string (nullable = true)
 |-- arrestee_g

### Upload clean data to GCS

In [51]:
# Write the cleaned crime data to the GCS bucket as parquet, coalesced into
# 4 partitions and replacing any existing output at that path.
(
    df_crime_data
    .coalesce(4)
    .write
    .mode('overwrite')
    .parquet('gs://oregon-crime-bucket/crime_data_parquet/')
)

                                                                                

In [52]:
# Write each cleaned Uniform Crime dataset to its own parquet folder in the
# bucket, replacing any existing output.
# NOTE(review): 'arrets/' looks like a typo for 'arrests/'; the path is left
# unchanged because downstream readers may depend on it — confirm before renaming.
df_uni_arrests.write.mode('overwrite').parquet('gs://oregon-crime-bucket/clean_uniform_data_parquet/arrets/')
df_uni_leoka.write.mode('overwrite').parquet('gs://oregon-crime-bucket/clean_uniform_data_parquet/leoka/')
df_uni_offenses.write.mode('overwrite').parquet('gs://oregon-crime-bucket/clean_uniform_data_parquet/offenses/')
df_uni_victims.write.mode('overwrite').parquet('gs://oregon-crime-bucket/clean_uniform_data_parquet/victims')

                                                                                