# Traffic Data ETL Demo

This is a demo ETL process that extracts traffic data stored as CSV in S3, transforms it, and loads it into Redshift.

## Boilerplate

In [72]:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Build the Glue context on top of the already-running SparkContext,
# creating one if none exists yet.
glueContext = GlueContext(SparkContext.getOrCreate())
# Smoke test that the kernel executes Python. `println` is not a Python
# builtin and raised NameError, so use `print`.
print('test')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load the data source

In [100]:
# Read the "traffic" table from the "traffic" catalog database into a
# dynamic frame; every later cell rebinds and transforms `traffic`.
traffic = glueContext.create_dynamic_frame.from_catalog(
    database = "traffic",
    table_name = "traffic"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Drop unwanted fields

In [101]:
# Remove fields that are not carried through to the output table.
# `traffic` is rebound to the frame returned by drop_fields.
# NOTE(review): the ':@computed_region_...' name looks like an
# export-generated column -- confirm it matches the catalog schema exactly.
traffic = traffic.drop_fields([
    'location',
    'boundaries - zip codes',
    'community areas',
    'census tracts',
    'wards',
    ':@computed_region_awaf_s7ux'
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Map remaining fields

In [102]:
def reformat_date(record):
    """Rewrite the record's 'date of count' from mm/dd/yyyy to yyyy-mm-dd.

    The record is modified in place and also returned, so the function can
    be used directly as a per-record map callback. A value that does not
    consist of exactly three '/'-separated parts raises ValueError.
    """
    mm, dd, yyyy = record['date of count'].split('/')
    record['date of count'] = '-'.join((yyyy, mm, dd))
    return record

def split_vehicle_volume(record):
    """Break the combined per-direction volume field into one field each.

    Reads 'vehicle volume by each direction of traffic', which holds
    '/'-separated "<direction>: <volume>" entries, and stores each volume
    as an int under 'vehicle volume north', '... south', '... west' or
    '... east'. Entries without a ':' are skipped; entries whose direction
    mentions none of the four compass points are parsed but not stored.
    The record is modified in place and returned.
    """
    combined = record['vehicle volume by each direction of traffic']
    for entry in combined.split('/'):
        if ':' in entry:
            label, count = entry.split(':')
            # Convert before matching so a non-numeric volume always raises.
            count = int(count.strip())
            label = label.lower()
            # First match wins, tested in this fixed order.
            for compass_point in ('north', 'south', 'west', 'east'):
                if compass_point in label:
                    record['vehicle volume ' + compass_point] = count
                    break
    return record

# Per-record transforms, applied in order: normalize the date format first,
# then split the combined vehicle volume field into one field per direction.
traffic = (
    traffic
    .map(reformat_date)
    .map(split_vehicle_volume)
)

# Map field names and types.
# Each tuple is (source field, source type, target field, target type).
# Target names are the snake_case column names used for the output table.
traffic = traffic.apply_mapping([
    ('id', 'long', 'id', 'long'),
    # NOTE(review): the double space in this source name is as written here;
    # presumably it matches the CSV header exactly -- verify before "fixing".
    ('traffic volume count location  address', 'string', 'address', 'string'),
    ('street', 'string', 'street', 'string'),
    # Already rewritten to yyyy-mm-dd by reformat_date, then cast to date.
    ('date of count', 'string', 'date_of_count', 'date'),
    ('total passing vehicle volume', 'long', 'total_passing_vehicle_volume', 'long'),
    # Per-direction volumes are set as Python ints by split_vehicle_volume
    # and widened to long here.
    ('vehicle volume north', 'int', 'vehicle_volume_north', 'long'),
    ('vehicle volume south', 'int', 'vehicle_volume_south', 'long'),
    ('vehicle volume west', 'int', 'vehicle_volume_west', 'long'),
    ('vehicle volume east', 'int', 'vehicle_volume_east', 'long'),
    ('latitude', 'double', 'latitude', 'double'),
    ('longitude', 'double', 'longitude', 'double'),
    ('zip codes', 'long', 'zip_codes', 'long'),
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write data to Redshift

In [104]:
# Write the transformed frame to the 'traffic' table in the 'dev' database
# through the catalog connection named 'redshift'.
# redshift_tmp_dir is an S3 path handed to Glue for temporary files.
# NOTE(review): the bucket name is hard-coded -- confirm it exists and is
# writable in the target environment.
glueContext.write_dynamic_frame.from_jdbc_conf(frame=traffic,
                                               catalog_connection='redshift',
                                               connection_options={'dbtable': 'traffic', 'database': 'dev'},
                                               redshift_tmp_dir='s3://rk-analytics-sandbox/temp-dir/'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…