# Transform raw datasets with AWS Glue Notebook Interactive Session


####  Set up and start the interactive session


In [2]:
%%configure
{
    'region': 'ap-southeast-2',
    'iam_role': 'arn:aws:iam::877030647703:role/nsw-transport-analytics',
    'idle_timeout' : 2880,
    'glue_version': '5.0',
    'worker_type': 'G.1X',
    'number_of_workers': 5
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
The following configurations have been updated: {'region': 'ap-southeast-2', 'iam_role': 'arn:aws:iam::877030647703:role/nsw-transport-analytics', 'idle_timeout': 2880, 'glue_version': '5.0', 'worker_type': 'G.1X', 'number_of_workers': 5}


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext (or create one) and wrap it in a GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Spark session exposed by the GlueContext; used for DataFrame work below
spark = glueContext.spark_session
# Job object bound to the same GlueContext
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: b8837427-c3c5-4d84-acff-b58511fa3919
Applying the following default arguments:
--glue_kernel_version 1.0.9
--enable-glue-datacatalog true
Waiting for session b8837427-c3c5-4d84-acff-b58511fa3919 to get into ready status...
Session b8837427-c3c5-4d84-acff-b58511fa3919 has been created.



#### Create a DynamicFrame for raw data tables from AWS Glue Data Catalog


In [2]:
# Read the raw table from the Glue Data Catalog into a DynamicFrame
# and print its schema
raw = glueContext.create_dynamic_frame.from_catalog(
    database='nsw-transport-raw',
    table_name='all_modes_csv',
)
raw.printSchema()

root
|-- year_month: string
|-- card type: string
|-- travel_mode: string
|-- trip: double


In [3]:
# Rename the 'card type' field of the raw dataset to 'card_type'
# (note: `raw` is rebound to the renamed DynamicFrame)
raw = raw.rename_field('card type','card_type')
raw.printSchema()

root
|-- year_month: string
|-- travel_mode: string
|-- trip: double
|-- card_type: string


In [4]:
# Read the legacy table from the Glue Data Catalog into a DynamicFrame
# and print its schema
raw_legacy = glueContext.create_dynamic_frame.from_catalog(
    database='nsw-transport-raw',
    table_name='all_modes_legacy_csv',
)
raw_legacy.printSchema()

root
|-- col0: string
|-- col1: string
|-- col2: string
|-- col3: string


In [5]:
# Give the legacy dataset's positional fields (col0..col3) descriptive names,
# applying the renames one after another in the listed order
legacy_field_names = [
    ('col0', 'year_month'),
    ('col1', 'card_type'),
    ('col2', 'travel_mode'),
    ('col3', 'trip'),
]
for old_name, new_name in legacy_field_names:
    raw_legacy = raw_legacy.rename_field(old_name, new_name)
raw_legacy.printSchema()

root
|-- year_month: string
|-- card_type: string
|-- travel_mode: string
|-- trip: string


#### Convert the DynamicFrame to a Spark DataFrame for transformation


In [6]:
# Convert the raw DynamicFrame to a Spark DataFrame and preview the first 5 rows
df_raw = raw.toDF()
df_raw.show(5)

+----------+-----------+---------+---------+
|year_month|travel_mode|     trip|card_type|
+----------+-----------+---------+---------+
|   2024-01|      Ferry| 504780.0|    Adult|
|   2024-01| Light Rail|1203585.0|    Adult|
|   2024-01|      Metro| 709103.0|    Adult|
|   2024-01|      Train|9987934.0|    Adult|
|   2024-01|unallocated|      5.0|    Adult|
+----------+-----------+---------+---------+
only showing top 5 rows



In [7]:
# Convert the legacy DynamicFrame to a Spark DataFrame and preview the first 5 rows.
# The preview shows the file's header line appearing as a data row; it is removed
# later with exclude_header().
df_legacy = raw_legacy.toDF()
df_legacy.show(5)

+----------+-----------+-----------+-------+
|year_month|  card_type|travel_mode|   trip|
+----------+-----------+-----------+-------+
|Year_Month|  Card Type|Travel_Mode|   Trip|
|    Feb-23|      Adult|        Bus|6556480|
|    Jan-23|   Employee|        Bus|  53044|
|    Dec-22|Child/Youth|        Bus| 779633|
|    Dec-22|      Adult|        Bus|6005060|
+----------+-----------+-----------+-------+
only showing top 5 rows


#### Define reusable transformation functions


In [8]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *




In [9]:
# Function to exclude the misread header as first row in DataFrame
def exclude_header(df):
    """Return ``df`` without the row that is numbered first.

    Rows are numbered with ``row_number()`` over a window ordered by
    ``monotonically_increasing_id()``; the row numbered 1 is filtered out and
    the temporary ``row_num`` column is dropped again.

    NOTE(review): this assumes the misread header is the row that receives the
    smallest generated id -- confirm for inputs read from several files or
    partitions.
    """
    id_order = Window.orderBy(F.monotonically_increasing_id())
    numbered = df.withColumn("row_num", F.row_number().over(id_order))
    without_first = numbered.filter("row_num > 1")
    return without_first.drop("row_num")

# Function to cast a single column
def cast_column_dtype(df, col_name, dtype):
    """Return ``df`` with column ``col_name`` replaced by itself cast to ``dtype``."""
    converted = F.col(col_name).cast(dtype)
    return df.withColumn(col_name, converted)

# Function to cast multiple columns
def cast_df(df, col_list, dtype_list):
    """Cast several columns of ``df``, pairing names and types by position.

    Args:
        df: DataFrame to start from; it is not modified, a new one is returned.
        col_list: Column names to cast.
        dtype_list: Target types, one per entry of ``col_list``.

    Returns:
        The DataFrame after every ``(column, dtype)`` pair has been applied
        through ``cast_column_dtype``.

    Raises:
        ValueError: If ``col_list`` and ``dtype_list`` differ in length.
    """
    columns = list(col_list)
    dtypes = list(dtype_list)
    # zip() would silently stop at the shorter list and leave columns uncast,
    # so a length mismatch is reported instead.
    if len(columns) != len(dtypes):
        raise ValueError(
            f"col_list has {len(columns)} entries but dtype_list has {len(dtypes)}"
        )

    df_casted = df  # start from original df
    for col_name, dtype in zip(columns, dtypes):
        df_casted = cast_column_dtype(df_casted, col_name, dtype)
    return df_casted

# Function to format year month
def format_year_month(df, col):
    """Rewrite column ``col`` as ``MMM-yyyy`` text.

    The existing value has ``'-01'`` appended, is parsed as a ``yyyy-MM-dd``
    date, and is then formatted back to a string with the ``MMM-yyyy`` pattern.
    """
    first_of_month = F.concat(F.col(col), F.lit('-01'))
    parsed = F.to_date(first_of_month, 'yyyy-MM-dd')
    reformatted = F.date_format(parsed, 'MMM-yyyy')
    return df.withColumn(col, reformatted)

# Function to format short year to full length year
def expand_year_month(df, col):
    """Rewrite column ``col`` from ``MMM-yy`` text to ``MMM-yyyy`` text.

    The value is parsed as a date with the ``MMM-yy`` pattern and formatted
    back to a string with the four-digit-year pattern ``MMM-yyyy``.
    """
    parsed = F.to_date(F.col(col), "MMM-yy")
    expanded = F.date_format(parsed, "MMM-yyyy")
    return df.withColumn(col, expanded)

# Function to add separated year and month columns
def add_date_columns(df):
    """Add ``year_num`` and ``month_num`` columns derived from ``year_month``.

    ``year_month`` is parsed with the ``MMM-yyyy`` pattern; ``year_num`` is that
    date formatted as ``yyyy`` and ``month_num`` is it formatted as ``MM``.
    Both new columns are strings produced by ``date_format``.
    """
    parsed = F.to_date(F.col("year_month"), "MMM-yyyy")
    with_year = df.withColumn("year_num", F.date_format(parsed, "yyyy"))
    return with_year.withColumn("month_num", F.date_format(parsed, "MM"))

# Function to union DataFrames
def union_df(df1, df2):
    """Stack ``df2`` under ``df1``, matching columns by name.

    ``allowMissingColumns`` is passed as ``False``.
    """
    combined = df1.unionByName(df2, allowMissingColumns=False)
    return combined

# Function to check and remove duplicates
def check_remove_duplicates(df):
    """Drop fully duplicated rows from ``df`` and print before/after row counts.

    Returns the de-duplicated DataFrame.
    """
    rows_before = df.count()
    deduplicated = df.dropDuplicates()
    rows_after = deduplicated.count()
    removed = rows_before - rows_after
    print(f"Count before: {rows_before}")
    print(f"Count after: {rows_after}")
    print(f"Total duplicates removed: {removed}")
    return deduplicated

# Function to check and remove rows with null values
def check_remove_nulls(df):
    """Report null counts per column, then drop every row containing a null.

    Prints a one-row table with the number of nulls in each column, followed
    by the row counts before and after the drop and their difference.

    Args:
        df: DataFrame to inspect.

    Returns:
        ``df`` with all rows that contain at least one null removed.
    """
    # One aggregate per column: 1 for each null value, 0 otherwise, summed.
    null_counts = df.select([
        F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ])
    print("Null counts per column:")
    null_counts.show()

    df_no_nulls = df.na.drop()  # Remove rows with any nulls

    # Count each DataFrame once and reuse the numbers; every count() call
    # runs a separate Spark job.
    count_before = df.count()
    count_after = df_no_nulls.count()
    print(f"Count before: {count_before}")
    print(f"Count after: {count_after}")
    print(f"Total rows removed: {count_before - count_after}")
    return df_no_nulls

# Function to add season column to DataFrame
def add_season(df):
    """Add a ``season`` column computed from ``month_num``.

    Mapping: 12, 1, 2 -> "Summer"; 3, 4, 5 -> "Autumn"; 6, 7, 8 -> "Winter";
    9, 10, 11 -> "Spring". There is no fallback branch, so any other value is
    left without a season.
    """
    month = F.col("month_num")
    is_summer = month.isin(12, 1, 2)    # Dec, Jan, Feb
    is_autumn = month.isin(3, 4, 5)     # Mar, Apr, May
    is_winter = month.isin(6, 7, 8)     # Jun, Jul, Aug
    is_spring = month.isin(9, 10, 11)   # Sep, Oct, Nov

    season = (
        F.when(is_summer, "Summer")
        .when(is_autumn, "Autumn")
        .when(is_winter, "Winter")
        .when(is_spring, "Spring")
    )
    return df.withColumn("season", season)

# Function to transform card type to more meaningful metrics
def transform_card_type(df):
    """Add ``trip_type`` and ``cardtype_new`` columns derived from ``card_type``.

    ``trip_type`` is "One Time" when ``card_type`` contains "Single" and
    "Card Tap" otherwise.

    ``cardtype_new`` is the text before "Single" (trimmed) for single-ticket
    card types, and the unchanged ``card_type`` for everything else.

    Args:
        df: DataFrame with a string ``card_type`` column.

    Returns:
        The DataFrame with both columns added.
    """
    card_type = F.col("card_type")
    is_single = card_type.contains("Single")

    df_transform = df.withColumn(
        "trip_type",
        F.when(is_single, "One Time").otherwise("Card Tap")
    )

    # NOTE(review): the previous expression for ``cardtype_new`` was unfinished
    # (``F.when(F.col())`` raises TypeError) and nothing was returned. The rule
    # below -- keep the passenger category that precedes "Single" -- is an
    # assumption based on values such as "Adult Single Bus ..."; confirm the
    # intended mapping.
    base_type = F.trim(F.split(card_type, "Single").getItem(0))
    df_transform = df_transform.withColumn(
        "cardtype_new",
        # Fall back to the original value when nothing precedes "Single".
        F.when(is_single & (F.length(base_type) > 0), base_type)
        .otherwise(card_type)
    )

    return df_transform




#### Transform raw datasets


In [11]:
# Remove the misread header row from the legacy dataset and preview the result
legacy_cleaned = exclude_header(df_legacy)
legacy_cleaned.show(5)

+----------+-----------+-----------+-------+
|year_month|  card_type|travel_mode|   trip|
+----------+-----------+-----------+-------+
|    Feb-23|      Adult|        Bus|6556480|
|    Jan-23|   Employee|        Bus|  53044|
|    Dec-22|Child/Youth|        Bus| 779633|
|    Dec-22|      Adult|        Bus|6005060|
|    Nov-22|Free Travel|        Bus|  29299|
+----------+-----------+-----------+-------+
only showing top 5 rows


In [12]:
# Expand the two-digit year in the legacy dataset's year_month column
# to four digits (e.g. Feb-23 -> Feb-2023)
legacy_formated = expand_year_month(legacy_cleaned, 'year_month')
legacy_formated.show(5)

+----------+-----------+-----------+-------+
|year_month|  card_type|travel_mode|   trip|
+----------+-----------+-----------+-------+
|  Feb-2023|      Adult|        Bus|6556480|
|  Jan-2023|   Employee|        Bus|  53044|
|  Dec-2022|Child/Youth|        Bus| 779633|
|  Dec-2022|      Adult|        Bus|6005060|
|  Nov-2022|Free Travel|        Bus|  29299|
+----------+-----------+-----------+-------+
only showing top 5 rows


In [13]:
# Reformat the year_month column of the raw dataset to the MMM-yyyy layout
# used by the legacy dataset (e.g. 2024-01 -> Jan-2024)
raw_cleaned = format_year_month(df_raw, 'year_month')
raw_cleaned.show(5)

+----------+-----------+---------+---------+
|year_month|travel_mode|     trip|card_type|
+----------+-----------+---------+---------+
|  Jan-2024|      Ferry| 504780.0|    Adult|
|  Jan-2024| Light Rail|1203585.0|    Adult|
|  Jan-2024|      Metro| 709103.0|    Adult|
|  Jan-2024|      Train|9987934.0|    Adult|
|  Jan-2024|unallocated|      5.0|    Adult|
+----------+-----------+---------+---------+
only showing top 5 rows


In [14]:
# Union the legacy and raw datasets (matched by column name) into the full dataset
df_union = union_df(legacy_formated, raw_cleaned)
df_union.show()

+----------+--------------------+-----------+-------+
|year_month|           card_type|travel_mode|   trip|
+----------+--------------------+-----------+-------+
|  Feb-2023|               Adult|        Bus|6556480|
|  Jan-2023|            Employee|        Bus|  53044|
|  Dec-2022|         Child/Youth|        Bus| 779633|
|  Dec-2022|               Adult|        Bus|6005060|
|  Nov-2022|         Free Travel|        Bus|  29299|
|  Jun-2022|            Employee|        Bus|  55746|
|  Dec-2021|Adult Single Bus ...|        Bus|    124|
|  Mar-2021|            Employee|        Bus|  62780|
|  Jul-2020|         Child/Youth|        Bus| 863848|
|  Mar-2020|               Adult|        Bus|9373131|
|  Feb-2020|         Free Travel|        Bus|  39465|
|  Oct-2019|Adult Single Bus ...|        Bus|  68763|
|  Sep-2019|Child/Youth Singl...|        Bus|   3586|
|  Feb-2019|Adult Single Bus ...|        Bus|  54290|
|  Mar-2018|Adult Single Bus ...|        Bus| 149812|
|  Oct-2017|Adult Single Bus

In [16]:
# Add year_num and month_num columns derived from year_month
df_extra = add_date_columns(df_union)
df_extra.show(5)

+----------+-----------+-----------+-------+--------+---------+
|year_month|  card_type|travel_mode|   trip|year_num|month_num|
+----------+-----------+-----------+-------+--------+---------+
|  Feb-2023|      Adult|        Bus|6556480|    2023|       02|
|  Jan-2023|   Employee|        Bus|  53044|    2023|       01|
|  Dec-2022|Child/Youth|        Bus| 779633|    2022|       12|
|  Dec-2022|      Adult|        Bus|6005060|    2022|       12|
|  Nov-2022|Free Travel|        Bus|  29299|    2022|       11|
+----------+-----------+-----------+-------+--------+---------+
only showing top 5 rows


In [17]:
# Cast the numeric columns to their proper types.
# col_list and dtype_list are paired by position: trip -> long,
# year_num -> integer, month_num -> integer.
col_list = ['trip', 'year_num', 'month_num']
dtype_list = [LongType(), IntegerType(), IntegerType()]
df_casted = cast_df(df_extra, col_list, dtype_list)
df_casted.printSchema()

root
 |-- year_month: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- travel_mode: string (nullable = true)
 |-- trip: long (nullable = true)
 |-- year_num: integer (nullable = true)
 |-- month_num: integer (nullable = true)


In [18]:
# Remove duplicate rows from the full dataset and report how many were dropped
df_no_dups = check_remove_duplicates(df_casted)

Count before: 6551
Count after: 6542
Total duplicates removed: 9


In [24]:
# Report null counts per column, then remove every row that contains a null
df_cleaned = check_remove_nulls(df_no_dups)

Null counts per column:
+----------+---------+-----------+----+--------+---------+
|year_month|card_type|travel_mode|trip|year_num|month_num|
+----------+---------+-----------+----+--------+---------+
|         0|        0|          0|  24|       0|        0|
+----------+---------+-----------+----+--------+---------+

Count before: 6542
Count after: 6518
Total rows removed: 24


In [None]:
# Add season column to dataset
# NOTE(review): df_season is not referenced by the later cells shown in this
# notebook (the final dataset is built from df_cleaned) -- confirm whether the
# season column is meant to reach the output.
df_season = add_season(df_cleaned)
df_season.show(5)

In [None]:
# 

In [25]:
# Reorder and rename the necessary columns to finalise the dataset
new_order = ['year_month', 'month_num', 'year_num', 'card_type', 'travel_mode', 'trip']
df_final = df_cleaned.select(new_order) # Reorder columns
df_final = (  # Rename columns
    df_final
    .withColumnRenamed('year_month', 'month_year')
    .withColumnRenamed('trip', 'trip_count')
)
df_final.show(5)

+----------+---------+--------+--------------------+-----------+----------+
|month_year|month_num|year_num|           card_type|travel_mode|trip_count|
+----------+---------+--------+--------------------+-----------+----------+
|  Aug-2019|        8|    2019|         Free Travel|        Bus|     42665|
|  May-2022|        5|    2022|         Child/Youth|        Bus|    550795|
|  Feb-2017|        2|    2017|Adult Single Bus ...|        Bus|     39559|
|  Jan-2019|        1|    2019|    Senior/Pensioner|        Bus|   3970989|
|  May-2019|        5|    2019|    Senior/Pensioner|        Bus|   4586037|
+----------+---------+--------+--------------------+-----------+----------+
only showing top 5 rows


#### Write transformed data in the DynamicFrame to S3 and update AWS Glue Data Catalog


In [28]:
# Make a DyF for final dataset
# DynamicFrame is imported here because this is its first use in the notebook
from awsglue.dynamicframe import DynamicFrame
# Wrap the final Spark DataFrame in a DynamicFrame named 'transformed_dataset'
DyF = DynamicFrame.fromDF(df_final, glueContext, 'transformed_dataset')
DyF.printSchema()

root
|-- month_year: string
|-- month_num: int
|-- year_num: int
|-- card_type: string
|-- travel_mode: string
|-- trip_count: long


In [29]:
# Write DyF to S3 bucket as parquet format
# Configure an S3 sink partitioned by year_num and month_num with snappy
# compression, then write the DynamicFrame using the "glueparquet" format.
# NOTE(review): no catalog-update options (e.g. enableUpdateCatalog /
# setCatalogInfo) are set on this sink, so this cell presumably only writes
# files -- confirm whether the Data Catalog update mentioned in the section
# heading still needs to be added.
s3output = glueContext.getSink(
  path="s3://nsw-transport-data/transformed",
  connection_type="s3",
  partitionKeys=['year_num', 'month_num'],
  compression="snappy",
  transformation_ctx="s3output",
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)

<awsglue.dynamicframe.DynamicFrame object at 0x7f39f541ead0>
