In [None]:

from pyspark.sql.functions import monotonically_increasing_id, month, year, dayofweek,when, to_date, col

import pandas as pd

In [None]:

# Storage settings: replace the placeholder strings with your actual values.
storage_account_name = "<account name>"
container_name = "<container name>"
# Do not paste the SAS token into the notebook (it ends up in version control and
# exported outputs); read it from a Databricks secret scope instead.
sas_token = dbutils.secrets.get(scope="<secret scope>", key="<SAS token key>")

mount_point = f"/mnt/{container_name}"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print("Directory already mounted")
else:
    try:
        dbutils.fs.mount(
            # The SAS token is supplied only through extra_configs; appending it to the
            # source URI would store the secret in the mount's source string.
            source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
            mount_point=mount_point,
            extra_configs={
                f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net": sas_token
            },
        )
        print("Mount successful")
    except Exception as e:
        print(f"Error: {e}")
        # Every later cell reads or writes under the mount point, so stop here
        # instead of failing later with a less obvious path error.
        raise


In [None]:
# List the contents of the mounted container (Python form of `%fs ls`).
display(dbutils.fs.ls("/mnt/<container name>"))

In [None]:

# Load the raw arrest extract: all files in the raw_arrest_data folder, first row
# used as the header. No schema inference is requested, so every column is a string.
raw_arrest_dir = "/mnt/<container name>/raw_arrest_data"

# Directory listing of the raw folder (a list of FileInfo entries, kept for inspection).
file_path = dbutils.fs.ls(raw_arrest_dir)

df = spark.read.csv(raw_arrest_dir, header=True)
display(df.limit(1))


In [None]:
# Drop 'New Georeferenced Column' before dropna(): otherwise a row whose only
# missing value sits in this discarded column would be thrown away.
df = df.drop('New Georeferenced Column')
df = df.dropDuplicates()
# dropna() with no subset removes every row containing any null, which already
# covers rows with a null ARREST_DATE.
df = df.dropna()

In [None]:
df = (
    df
    # Keep only rows whose categorical codes fall in the expected value sets.
    .filter(col('LAW_CAT_CD').isin(['M', 'F', 'V']))
    .filter(col('ARREST_BORO').isin(['B', 'K', 'M', 'Q', 'S']))
    .filter(col('PERP_SEX').isin(['M', 'F']))
    # Parse the MM/dd/yyyy strings. In Spark's non-ANSI mode, unparseable values
    # become null; drop them so arrest_dim never gets null year/month/weekday.
    .withColumn('ARREST_DATE', to_date('ARREST_DATE', 'MM/dd/yyyy'))
    .filter(col('ARREST_DATE').isNotNull())
    .withColumn('CASE_ID', monotonically_increasing_id())
    # monotonically_increasing_id() depends on partition layout, and this lineage
    # includes a shuffle (dropDuplicates). Without caching, each dimension/fact
    # evaluation recomputes df and may assign different CASE_IDs to the same row.
    # NOTE(review): a cache can be evicted; use df.checkpoint() if a hard guarantee is needed.
    .cache()
)
display(df.limit(3))
# Full count: also materializes the cache so every downstream frame sees the same CASE_IDs.
df.count()

In [None]:
# Arrest dimension: one row per case with the arrest date broken into calendar
# parts and the borough letter expanded to its name.
boro_labels = {
    'B': 'Bronx',
    'K': 'Brooklyn',
    'M': 'Manhattan',
    'Q': 'Queens',
    'S': 'Staten Island',
}

# Build when(B).when(K)... in dict order; codes not listed map to null.
boro_label_expr = None
for boro_code, boro_label in boro_labels.items():
    is_boro = col('ARREST_BORO') == boro_code
    if boro_label_expr is None:
        boro_label_expr = when(is_boro, boro_label)
    else:
        boro_label_expr = boro_label_expr.when(is_boro, boro_label)

arrest_date = col('ARREST_DATE')
arrest_dim = df.select(
    monotonically_increasing_id().alias('ARREST_ID'),
    'CASE_ID',
    'ARREST_KEY',
    'ARREST_DATE',
    month(arrest_date).alias('ARREST_MONTH'),
    year(arrest_date).alias('ARREST_YEAR'),
    dayofweek(arrest_date).alias('ARREST_WEEKDAY'),  # Spark: 1 = Sunday ... 7 = Saturday
    'ARREST_BORO',
    boro_label_expr.alias('ARREST_BORO_NAME'),
    'ARREST_PRECINCT',
)

display(arrest_dim.limit(10))
arrest_dim.count()

In [None]:
# Perpetrator dimension: demographic attributes of each case, keyed by PERP_ID.
perp_dim = df.select(
    monotonically_increasing_id().alias('PERP_ID'),
    'CASE_ID',
    'PERP_SEX',
    'PERP_RACE',
    'AGE_GROUP',
)
display(perp_dim.limit(10))
# Last expression, so the count is actually shown. It is a pure projection of df,
# so it should equal df.count().
perp_dim.count()

In [None]:
# Offence dimension: offence/description codes for each case, keyed by OFFENCE_ID.
offence_dim = df.select(
    monotonically_increasing_id().alias('OFFENCE_ID'),
    'CASE_ID',
    'OFNS_DESC',
    'PD_DESC',
    'PD_CD',
    'KY_CD',
)
display(offence_dim.limit(10))

In [None]:
# Location dimension: coordinates recorded for each case, keyed by LOCATION_ID.
location_dim = df.select(
    monotonically_increasing_id().alias('LOCATION_ID'),
    'CASE_ID',
    'X_COORD_CD',
    'Y_COORD_CD',
    'Latitude',
    'Longitude',
)
display(location_dim.limit(10))

In [None]:

# Law dimension: statute code and category per case. LAW_CODE_DESC spells out the
# LAW_CAT_CD letter; letters not in the mapping would get null (no otherwise()).
law_category_labels = {'F': 'Felony', 'M': 'Misdemeanor', 'V': 'Violation'}

law_category_expr = None
for category_code, category_label in law_category_labels.items():
    is_category = col('LAW_CAT_CD') == category_code
    if law_category_expr is None:
        law_category_expr = when(is_category, category_label)
    else:
        law_category_expr = law_category_expr.when(is_category, category_label)

law_dim = df.select(
    monotonically_increasing_id().alias('LAW_ID'),
    'CASE_ID',
    'LAW_CODE',
    'LAW_CAT_CD',
    law_category_expr.alias('LAW_CODE_DESC'),
)
display(law_dim.limit(3))

In [None]:
# Jurisdiction dimension: codes 0/1/2 get named labels; any other code is
# labelled 'Non NYPD'.
jurisdiction_labels = {0: 'Patrol', 1: 'Transit', 2: 'Housing'}

jurisdiction_expr = None
for juris_code, juris_label in jurisdiction_labels.items():
    is_juris = col('JURISDICTION_CODE') == juris_code
    if jurisdiction_expr is None:
        jurisdiction_expr = when(is_juris, juris_label)
    else:
        jurisdiction_expr = jurisdiction_expr.when(is_juris, juris_label)
jurisdiction_expr = jurisdiction_expr.otherwise('Non NYPD')

juris_dim = df.select(
    monotonically_increasing_id().alias('JURISDICTION_ID'),
    'CASE_ID',
    'JURISDICTION_CODE',
    jurisdiction_expr.alias('JURISDICTION_CODE_DESC'),
)
display(juris_dim.limit(3))

In [None]:


# Fact table: one row per case holding the surrogate key of every dimension.
# Every dimension carries CASE_ID, so all joins use it. Joining arrest_dim on
# ARREST_KEY instead is inconsistent and would multiply rows if an ARREST_KEY
# appeared on more than one case.
dimension_keys = [
    (arrest_dim, 'ARREST_ID'),
    (perp_dim, 'PERP_ID'),
    (offence_dim, 'OFFENCE_ID'),
    (law_dim, 'LAW_ID'),
    (location_dim, 'LOCATION_ID'),
    (juris_dim, 'JURISDICTION_ID'),
]

fact_table = df.select('CASE_ID')
for dim_frame, dim_key in dimension_keys:
    # Joining on the column name keeps a single CASE_ID column. The output columns
    # end up as CASE_ID followed by each dimension key in list order.
    fact_table = fact_table.join(
        dim_frame.select('CASE_ID', dim_key), on='CASE_ID', how='inner'
    )

display(fact_table.limit(10))


In [None]:
# Create the output folder for the star-schema tables.
transformed_data_dir = '/mnt/<container name>/transformed_data/'
dbutils.fs.mkdirs(transformed_data_dir)

In [None]:

# Write each DataFrame to its respective CSV file
# Write every star-schema table as CSV with a header row, overwriting earlier runs.
# Spark writes each target as a directory of part-files; the ".csv" suffix is only
# part of that directory's name.
tables_to_write = {
    'arrest_dim': arrest_dim,
    'perp_dim': perp_dim,
    'offence_dim': offence_dim,
    'law_dim': law_dim,
    'location_dim': location_dim,
    'juris_dim': juris_dim,
    'fact_table': fact_table,
}

for table_name, table_frame in tables_to_write.items():
    output_path = f'/mnt/<container name>/transformed_data/{table_name}.csv'
    table_frame.write.mode('overwrite').csv(output_path, header=True)
