# Motor Vehicle Theft
## Raw(JSON) to Staging(DELTA TABLE)
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


In [1]:
# Glue Interactive Session settings: stop the session after 10 idle minutes,
# run on Glue 5.0, and use two G.1X workers.
%idle_timeout 10
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from pyspark.sql.types import * 
from awsglue.dynamicframe import DynamicFrame
import random 
from datetime import datetime
from pyspark.sql import SparkSession

# Reuse the active SparkContext (or create one) and wrap it in a GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# SparkSession owned by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle built from the context; it is not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 10 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 10
Session ID: 9f0136e1-8909-424b-ba36-ed32797f9d96
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 9f0136e1-8909-424b-ba36-ed32797f9d96 to get into ready status...
Session 9f0136e1-8909-424b-ba36-ed32797f9d96 has be

In [2]:
# Current date as an ISO 'YYYY-MM-DD' string, used to select the partition to load.
todays_date = datetime.today().date().isoformat()




In [14]:
# Read the raw table from the Glue Data Catalog, restricted to today's report_date
# via a push-down predicate (presumably report_date is a partition column of the
# raw table - verify in the catalog).
partition_filter = f"report_date = '{todays_date}'"
df = glueContext.create_data_frame_from_catalog(
    database='motor_theft_vehicles',
    table_name='raw_motor_theft_vehicles',
    push_down_predicate=partition_filter,
)
df.printSchema()

root
 |-- incident_id: string (nullable = true)
 |-- report_number: string (nullable = true)
 |-- report_datetime: string (nullable = true)
 |-- occurrence_datetime: string (nullable = true)
 |-- addressline: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- vehicle_id: string (nullable = true)
 |-- vehicle_year: string (nullable = true)
 |-- vehicle_color: string (nullable = true)
 |-- licenseplate: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- recovery_status: string (nullable = true)
 |-- recovery_date: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- data_source: string (nullable = true)
 |-- incident_status: string (nullable = true)
 |-- method_of_entry: string (nullable = true)
 |-- source_file_name: string (nullable = true)
 |-- load_timestamp: string (nullable = true)
 |-- report_date: string (nullable = true)


In [15]:
# Convert the all-string raw columns to their staging types in one pass.
# Columns are grouped by target type; `borough` is left untouched.
text_columns = [
    'addressline', 'city', 'incident_id', 'incident_status', 'licenseplate',
    'method_of_entry', 'recovery_status', 'state', 'vehicle_color', 'vin',
    'source_file_name', 'data_source',
]
timestamp_columns = ['occurrence_datetime', 'recovery_date', 'report_datetime', 'load_timestamp']
# NOTE(review): casting zipcode to an integer drops any leading zeros - confirm this is intended.
integer_columns = ['report_number', 'vehicle_id', 'vehicle_year', 'zipcode']

# Text columns are also trimmed of surrounding whitespace.
typed_columns = {name: trim(col(name).cast(StringType())) for name in text_columns}
typed_columns.update({name: col(name).cast(TimestampType()) for name in timestamp_columns})
typed_columns.update({name: col(name).cast(IntegerType()) for name in integer_columns})
typed_columns['report_date'] = col('report_date').cast(DateType())

# Replacing existing columns keeps their original position in the schema.
df = df.withColumns(typed_columns)




In [16]:
# Keep only rows whose report_date year lies between 2000 and the current year (inclusive).
report_year = year(col('report_date'))
upper_bound_exclusive = year(current_date()) + 1
df = df.where((report_year >= 2000) & (report_year < upper_bound_exclusive))




In [17]:
# Map raw column names to the names used in the staging table.
col_renamed_required = dict(
    addressline='address',
    licenseplate='vehicle_license_plate',
    zipcode='zip_code',
    vin='vehicle_identification_number',
)
df = df.withColumnsRenamed(col_renamed_required)




In [18]:
# Upper-case the incident id and status, and prefix the report number with 'LON'.
# concat() yields NULL when report_number is NULL, so missing numbers stay missing.
df = df.withColumns({
    'incident_id': upper(col('incident_id')),
    'incident_status': upper(col('incident_status')),
    'report_number': concat(lit('LON'), col('report_number')),
})




In [19]:
# Render these timestamp columns as 'dd-MM-yyyy HH:mm:ss' strings.
# recovery_date is not included here and keeps its timestamp type.
display_pattern = 'dd-MM-yyyy HH:mm:ss'
for ts_column in ('occurrence_datetime', 'report_datetime', 'load_timestamp'):
    df = df.withColumn(ts_column, date_format(col(ts_column), display_pattern))




In [20]:
# Keep one row per (incident_id, report_number, report_datetime) combination.
dedupe_keys = ['incident_id', 'report_number', 'report_datetime']
df = df.dropDuplicates(dedupe_keys)




In [21]:
# Replace NULL incident_status / method_of_entry with 'Unknown', and map the
# literal 'NA' in recovery_status to 'Unknown'.
# NOTE(review): incident_status is upper-cased in an earlier cell, so 'Unknown'
# will be its only mixed-case value - confirm that is acceptable downstream.
df = df.na.fill('Unknown', subset=['incident_status', 'method_of_entry'])
df = df.na.replace({'NA': 'Unknown'}, subset=['recovery_status'])




In [22]:
# Request a SparkSession carrying the Delta Lake settings and rebind `spark` to it.
# NOTE(review): a session already exists from the first cell, so getOrCreate()
# presumably returns that session and static settings such as spark.sql.extensions
# may not take effect - confirm, and consider supplying them before the session
# starts (e.g. %%configure with --datalake-formats delta and --conf).
delta_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.delta.logStore.class": "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore",
}

session_builder = SparkSession.builder
for setting_key, setting_value in delta_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()




In [24]:
# Persist the cleaned rows to the Delta staging table, partitioned by report_date.
# Dynamic partition overwrite replaces only the report_date partitions present in
# `df`, so re-running the notebook for the same day refreshes that day's data
# instead of appending a second copy of it (which plain mode("append") did).
# Partitions for other dates are left untouched.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("report_date")
    .save("s3://motor-theft-vehicles-bucket/staging/")
)



