In [0]:
## connecting to the storage container

# Name of the Spark setting that carries the storage-account access key.
account_key_conf = "fs.azure.account.key.azdestabm001.dfs.core.windows.net"

# The key value is read from a Databricks secret scope instead of being written in the notebook.
spark.conf.set(
    account_key_conf,
    dbutils.secrets.get(scope="az-blob-storage-account_key", key="az-de-st-key-01"),
)

## Data loading and cleaning

In [0]:
# List what is stored under the container's raw folder; the bare name on the
# last line makes the notebook show the listing as the cell output.
raw_dir = "abfss://motorvehiclecollisiondata@azdestabm001.dfs.core.windows.net/raw"
raw_mv_file = dbutils.fs.ls(raw_dir)
raw_mv_file

In [0]:
from pyspark.sql.functions import (
    asc,
    col,
    count,
    dayofmonth,
    desc,
    lower,
    month,
    to_date,
    udf,
    when,
    year,
)
from pyspark.sql.types import IntegerType, StringType

In [0]:
### load the raw collision CSV into a Spark DataFrame

raw_csv_path = 'abfss://motorvehiclecollisiondata@azdestabm001.dfs.core.windows.net/raw/MotorVehicleCollisionData.csv'

# The first line of the file is treated as the header row.
csv_reader = spark.read.format('csv').option('header', True)
df = csv_reader.load(raw_csv_path)

# Quick look at the first five rows.
df.show(5)

In [0]:
# Render the DataFrame as loaded from the CSV file.
display(df)

In [0]:
## printing schema

# Show the column names and types as loaded, before any casting is applied.
df.printSchema()

In [0]:
# Target Spark type for each column that needs an explicit cast.
# withColumn replaces a column in place, so the column order is unaffected.
columns_by_type = {
    'int': [
        'ZIP CODE',
        'NUMBER OF PERSONS INJURED',
        'NUMBER OF PERSONS KILLED',
        'NUMBER OF CYCLIST INJURED',
        'NUMBER OF PEDESTRIANS INJURED',
        'NUMBER OF PEDESTRIANS KILLED',
        'NUMBER OF CYCLIST KILLED',
        'NUMBER OF MOTORIST INJURED',
        'NUMBER OF MOTORIST KILLED',
        'COLLISION_ID',
    ],
    'double': ['LATITUDE', 'LONGITUDE'],
}

for target_type, column_names in columns_by_type.items():
    for column_name in column_names:
        df = df.withColumn(column_name, col(column_name).cast(target_type))

# Crash dates are parsed from text using the MM/dd/yyyy pattern.
df = df.withColumn('CRASH DATE', to_date(col('CRASH DATE'), 'MM/dd/yyyy'))

# Confirm the new types.
df.printSchema()

In [0]:
# Render the DataFrame after the type casts.
display(df)

In [0]:
## number of null values in each column

# when(...) without an otherwise() yields null for rows that do not match, and
# count() skips nulls, so each aggregate is the number of null entries in that column.
null_count_exprs = [
    count(when(col(column_name).isNull(), True)).alias(column_name)
    for column_name in df.columns
]
display(df.select(null_count_exprs))

In [0]:
# Keep only rows that have both a crash date and a collision id.
has_crash_date = col('CRASH DATE').isNotNull()
has_collision_id = col('COLLISION_ID').isNotNull()
df = df.filter(has_crash_date & has_collision_id)

# Re-check the per-column null counts after filtering.
remaining_nulls = [
    count(when(col(column_name).isNull(), True)).alias(column_name)
    for column_name in df.columns
]
display(df.select(remaining_nulls))

In [0]:
## rename every column to lower case with underscores instead of spaces,
## which is better for storage in the target table

snake_case_names = [name.replace(' ', '_').lower() for name in df.columns]
# toDF assigns the new names positionally, one per existing column.
df = df.toDF(*snake_case_names)

display(df)

In [0]:
## add crash_month, crash_year and crash_day columns derived from crash_date

crash_date = col('crash_date')
df = (
    df.withColumn('crash_month', month(crash_date))
    .withColumn('crash_year', year(crash_date))
    .withColumn('crash_day', dayofmonth(crash_date))
)
display(df)

In [0]:
## drop the location column (it is a combination of the latitude and longitude
## columns) together with the vehicle-type 3-5 and contributing-factor 2-5 columns

unused_columns = [
    'location',
    'vehicle_type_code_3',
    'vehicle_type_code_4',
    'vehicle_type_code_5',
    'contributing_factor_vehicle_2',
    'contributing_factor_vehicle_3',
    'contributing_factor_vehicle_4',
    'contributing_factor_vehicle_5',
]
df = df.drop(*unused_columns)
display(df)

## Data Analysis

### 1. Count Of Total Collisions per year

In [0]:
# Number of collisions in each year, listed in chronological order.
collisions_per_year = df.groupBy(col('crash_year')).agg({'collision_id': 'count'})
collisions_per_year.sort('crash_year').show()

### 2. Count Of Total Collisions per month of year

In [0]:
# Number of collisions for each (year, month) pair, in chronological order.
year_and_month = [col('crash_year'), col('crash_month')]
collisions_per_month = df.groupBy(*year_and_month).agg({'collision_id': 'count'})
collisions_per_month.sort(['crash_year', 'crash_month']).show()

### 3. Most accident-prone borough/district

In [0]:
# Rank boroughs by their number of collisions, highest first, so that the most
# accident-prone borough is the top row. Rows without a borough are excluded.
# (Sorting by borough name first, as before, made the count sort key irrelevant
# and simply listed the boroughs alphabetically.)
(
    df.filter(col('borough').isNotNull())
    .groupBy(col('borough'))
    .agg({'collision_id': 'count'})
    .sort(desc(col('count(collision_id)')))
    .show()
)

### 4. Count of total people injured per year

In [0]:
# Total persons injured per year; rows with a null injured count are removed before summing.
injured_known = df.filter(col('number_of_persons_injured').isNotNull())
injured_per_year = injured_known.groupBy(col('crash_year')).agg({'number_of_persons_injured': 'sum'})
injured_per_year.sort('crash_year').show()

### 5. Number of people killed per year in the crashes

In [0]:
# Total persons killed per year; rows with a null killed count are removed before summing.
killed_known = df.filter(col('number_of_persons_killed').isNotNull())
killed_per_year = killed_known.groupBy(col('crash_year')).agg({'number_of_persons_killed': 'sum'})
killed_per_year.sort('crash_year').show()

### 6. Number of pedestrians killed per year in the crashes

In [0]:
# Total pedestrians killed per year; rows with a null pedestrian count are removed before summing.
pedestrians_known = df.filter(col('number_of_pedestrians_killed').isNotNull())
pedestrians_killed_per_year = pedestrians_known.groupBy(col('crash_year')).agg(
    {'number_of_pedestrians_killed': 'sum'}
)
pedestrians_killed_per_year.sort('crash_year').show()

### 7. Analyze vehicle types involved in the crashes

In [0]:
# Collisions per vehicle_type_code_1 value, most frequent first.
# Missing vehicle types are grouped under 'Unspecified'.
vehicle_type_counts = (
    df.fillna({'vehicle_type_code_1': 'Unspecified'})
    .groupBy('vehicle_type_code_1')
    .agg({'collision_id': 'count'})
)
display(vehicle_type_counts.sort(desc(col('count(collision_id)'))))

One thing we noticed from the above analysis is that, because of inconsistent casing, values such as `taxi` and `TAXI` are counted as different even though they are the same. Let's convert all string data to lower case to tackle this problem.

In [0]:
# Normalise the case of the string columns so that values such as "taxi" and
# "TAXI" fall into the same group.
#
# Spark's built-in `lower` replaces the earlier Python UDF: it is evaluated by
# Spark itself rather than by calling back into Python for every value, it
# passes nulls through as null, and it removes the leftover debug print that
# ran once per value.
str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
print(str_cols)

for col_name in str_cols:
    # crash_time is deliberately left untouched.
    if col_name != 'crash_time':
        df = df.withColumn(col_name, lower(col(col_name)))
display(df)

In [0]:
# Collisions per vehicle_type_code_1 value, most frequent first.
# Missing vehicle types are grouped under 'unspecified'.
vehicle_type_counts = (
    df.fillna({'vehicle_type_code_1': 'unspecified'})
    .groupBy('vehicle_type_code_1')
    .agg({'collision_id': 'count'})
)
display(vehicle_type_counts.sort(desc(col('count(collision_id)'))))

### 8. Top reasons for crashes

In [0]:
# Collisions per contributing_factor_vehicle_1 value, most frequent first.
# Missing factors are grouped under 'unspecified'.
factor_counts = (
    df.fillna({'contributing_factor_vehicle_1': 'unspecified'})
    .groupBy(col('contributing_factor_vehicle_1'))
    .agg({'collision_id': 'count'})
)
factor_counts.sort(desc(col('count(collision_id)'))).show()

## Clean data storage into Azure blob storage

In [0]:
### Before export: the following cells fill null string values with 'unspecified'
### and drop columns that are unnecessary for downstream analysis in Power BI.

# Render the current DataFrame.
display(df)

In [0]:
# Replace nulls with 'unspecified' in every string column except crash_time.
str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
fill_targets = [name for name in str_cols if name != 'crash_time']
df = df.fillna('unspecified', subset=fill_targets)
display(df)

In [0]:
# Remove the crash time and the two street-name columns before the data is written out.
df = df.drop('crash_time', 'cross_street_name', 'off_street_name')
display(df)

In [0]:
# Write the cleaned data to the container's clean folder as CSV with a header row,
# replacing whatever an earlier run left there. The DataFrame is first
# repartitioned to a single partition.
clean_dir = 'abfss://motorvehiclecollisiondata@azdestabm001.dfs.core.windows.net/clean'
single_partition_df = df.repartition(1)
csv_writer = (
    single_partition_df.write.format('com.databricks.spark.csv')
    .option('header', 'true')
    .mode('overwrite')
)
csv_writer.save(clean_dir)