# Testing the Spark Batch File

This Spark batch script was tested locally against a smaller sample of the crash data; both the sample and the full dataset are stored in the S3 bucket. The notebook was run in the public Docker image `jupyter/pyspark-notebook`.


In [1]:
from pathlib import Path
import os
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Make the project root importable so the `src.spark_clustering` and `config`
# imports below resolve. The project root is taken to be the PARENT of the
# current working directory (presumably this notebook lives one level below
# the root — confirm if the notebook is moved). The membership check keeps
# re-runs of this cell from appending duplicate sys.path entries.
_project_root = str(Path.cwd().parent)
if _project_root not in sys.path:
    sys.path.append(_project_root)

# Import the function from the updated script
from src.spark_clustering import load_and_process_crash_data
import config

# Read AWS credentials from the environment. Fail fast if either is missing:
# SparkConf.set() converts values with str(), so a missing variable would be
# handed to S3A as the literal string "None" and only fail at the first S3
# request with a far less helpful error.
# NOTE: never print these values — printed output is saved into the notebook.
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

_missing_aws_vars = [
    var_name
    for var_name, var_value in (
        ("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID),
        ("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY),
    )
    if not var_value
]
if _missing_aws_vars:
    # Report only the variable NAMES, never their values.
    raise RuntimeError(
        "Missing AWS credential environment variable(s): "
        + ", ".join(_missing_aws_vars)
    )

# Spark configuration for reading/writing S3 through Hadoop's S3A filesystem,
# so S3 paths passed to Spark must use the s3a:// scheme.
# NOTE(review): the hadoop-aws version should match the Hadoop version bundled
# with this Spark build — confirm when changing the Docker image.
conf = (
    SparkConf()
    .setAppName("CrashDataProcessor")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
)

# Build (or reuse) the session directly from `conf`. Constructing a
# SparkContext by hand first makes this cell raise "Cannot run multiple
# SparkContexts at once" on re-run; getOrCreate() returns the existing session
# instead (static settings such as spark.jars.packages are not re-applied to
# an already-running session).
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Set logging level to reduce verbosity
spark.sparkContext.setLogLevel("ERROR")

################################## run batch file on data ##################################

def run():
    #'s3a://public-crash-data/clean-data/'
    S3_OUTPUT_PATH = config.S3_OUTPUT_DIR 
    S3_OUTPUT_PATH_EDIT = S3_OUTPUT_PATH[:2] + 'a' + S3_OUTPUT_PATH[2:] # add an 'a' to the s3 url
    #'s3a://public-crash-data/raw-data/combined_cleaned_group_crash.csv'
    S3_INPUT_URL = config.S3_RAW_DATA_URL 
    S3_INPUT_URL_EDIT = S3_INPUT_URL[:2] + 'a' + S3_INPUT_URL[2:] # add an 'a' to the s3 url
    
    # load and process kmeans model
    crash_data_object = load_and_process_crash_data(spark, S3_INPUT_URL_EDIT)
    crash_data_object.assemble_features()
    # Run KMeans clustering
    crash_data_clustered = crash_data_object.KMeans_model()
    
    # Compute fatality rate and save results to S3
    crash_data_clustered.compute_fatality_rate(
        cluster_col='kmeans_cluster',
        save_to_s3=True,
        s3_path=S3_OUTPUT_PATH_EDIT
    )

run()

AWS_ACCESS_KEY_ID: [REDACTED]
+--------+---+----------+-----------+------------+---+---+---+---+---+---+---+--------------------+
|       0|  1|         2|          3|           4|  5|  6|  7|  8|  9| 10| 11|            features|
+--------+---+----------+-----------+------------+---+---+---+---+---+---+---+--------------------+
|15657177|  N|06/02/2019|32.28023063|-97.74661628|  0|  0|  0|  0|  1|  0|  0|[32.28023063,-97....|
|16406486|  N|05/09/2019|33.46381977|-94.41472753|  0|  0|  0|  1|  1|  0|  0|[33.46381977,-94....|
|16473665|  N|06/15/2019|30.66068326|-93.89387604|  0|  1|  0|  1|  0|  1|  0|[30.66068326,-93....|
|16871051|  N|06/12/2019|33.20390036|-96.59665891|  0|  0|  0|  2|  0|  0|  0|[33.20390036,-96....|
|16995273|  N|05/01/2019|29.78455155| -95.5627439|  0|  0|  1|  6|  0|  1|  0|[29.78455155,-95....|
|17028441|  N|04/24/2019|32.78551402|-96.91866541|  0|  0|  0|  3|  0|  0|  0|[32.78551402,-96....|
|17028486|  N|04/24/2019| 32.9076668|-96.57670095|  0|  0|  