# NYC Job to Load S3 Data to Staging DB - RDS PostgreSQL


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, round, to_date
# Initialize the Glue/Spark entry points used by every later cell.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Configure the session only after `spark` has been assigned above, so this
# cell does not depend on a `spark` variable injected by the kernel.
# LEGACY time parsing is set for the to_date(..., 'MM/dd/yyyy') call in the
# payroll transformation.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Shared S3 client used for the config download and the archive step.
s3_client = boto3.client('s3')

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: ce15daf1-2a75-4e37-bbe4-dca24ed6303e
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session ce15daf1-2a75-4e37-bbe4-dca24ed6303e to get into ready status...
Session ce15daf1-2a75-4e37-bbe4-dca24ed6303e ha

#### Set up the necessary placeholders and configuration details


In [2]:
# Location of the JSON config object in S3 and the local file it is downloaded to
bucket_name = 'xxxx'
s3_file_path = 'env_file/config.json'
local_file_path = 'config.json'

# S3 prefixes to load. Order matters: later cells address the resulting
# DataFrames by position (1 = Payroll, 2 = Employee, 3 = Title, 4 = Agency).
source_files = [
    's3://xxxx/nyc_sourcedata/PayrollData/',
    's3://xxxx/nyc_sourcedata/EmployeeData/',
    's3://xxxx/nyc_sourcedata/TitleData/',
    's3://xxxx/nyc_sourcedata/AgencyData/',
]

# Archive prefixes, paired positionally with source_files by the archive step
archive_files = [
    's3://xxxx/nyc_archivedata/PayrollData/',
    's3://xxxx/nyc_archivedata/EmployeeData/',
    's3://xxxx/nyc_archivedata/TitleData/',
    's3://xxxx/nyc_archivedata/AgencyData/',
]

# RDS connection details: fetch the JSON config from S3, then parse it.
try:
    s3_client.download_file(bucket_name, s3_file_path, local_file_path)
except Exception as e:
    # Nothing below can work without the config, so report and abort the cell.
    print(f"Error downloading file from S3: {e}")
    raise
else:
    print(f"Downloaded {s3_file_path} from S3 bucket {bucket_name} to {local_file_path}")

with open(local_file_path) as config_handle:
    config = json.load(config_handle)

# Required settings: a missing key raises KeyError.
rds_user, rds_password, rds_host, rds_port, rds_db_name = (
    config[key]
    for key in ('rds_user', 'rds_password', 'rds_host', 'rds_port', 'rds_db_name')
)
# Optional setting: None when the key is absent from the config.
rds_schema = config.get('rds_schema')

# JDBC URL for PostgreSQL
jdbc_url = f'jdbc:postgresql://{rds_host}:{rds_port}/{rds_db_name}'


def _qualified_table_name(table_name):
    """Return `table_name` prefixed with `rds_schema` when a schema is configured.

    `rds_schema` is read with `config.get(...)` and is therefore None when the
    key is absent. Formatting it unconditionally would produce names such as
    "None.Employee", so the bare table name is returned in that case.
    """
    return f"{rds_schema}.{table_name}" if rds_schema else table_name


# Valid Table Details
employee_table_name = 'Employee'
payroll_table_name = 'Payroll'
title_table_name = 'Title'
agency_table_name = 'Agency'

full_employee_table_name = _qualified_table_name(employee_table_name)
full_payroll_table_name = _qualified_table_name(payroll_table_name)
full_title_table_name = _qualified_table_name(title_table_name)
full_agency_table_name = _qualified_table_name(agency_table_name)

# Invalid Table Details: target table for payroll rows that fail validation
payroll_issue = 'payroll_data_issues'
payroll_issues = _qualified_table_name(payroll_issue)

Downloaded env_file/config.json from S3 bucket cnytolubckt to config.json


#### Read the Data in S3 and convert to AWS DynamicFrame

In [3]:
# Both dictionaries are keyed by the 1-based position of the path in source_files.
dynamic_frames = {}
data_frames = {}

for idx, source_path in enumerate(source_files, start=1):
    # Read every CSV under the prefix, treating the first row as the header
    frame = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [source_path]},
        format="csv",
        format_options={"withHeader": True},
    )

    # Keep the Glue DynamicFrame and the DataFrame produced by toDF()
    dynamic_frames[f"dynamic_frame_{idx}"] = frame
    data_frames[f"dataframe_{idx}"] = frame.toDF()
    



#### Convert AWS DynamicFrame to Spark DataFrame

#### Configure the JDBC properties

In [4]:
# Connection properties passed to every DataFrame.write.jdbc call below
jdbc_properties = dict(
    user=rds_user,
    password=rds_password,
    driver="org.postgresql.Driver",
)




## Transformations

#### Payroll Transformation

In [None]:
# Payroll is the first entry of source_files, hence "dataframe_1".
payroll_df = data_frames["dataframe_1"]

# Normalise the agency identifier: inputs may carry `agencyid`, `agencycode`, or both.
if "agencycode" in payroll_df.columns and "agencyid" in payroll_df.columns:
    # Both exist: prefer agencyid, fall back to agencycode, then drop the duplicate
    payroll_df = payroll_df.withColumn('agencyid', coalesce(col("agencyid"), col("agencycode"))).drop("agencycode")
elif "agencycode" in payroll_df.columns:
    # Only agencycode exists: rename to agencyid
    payroll_df = payroll_df.withColumn("agencyid", col("agencycode")).drop("agencycode")

# Cast every column to its target type. Monetary columns are rounded to 2 decimals.
# `agencyname` is cast from its own column as a string; it was previously
# overwritten with `agencyid` cast to int, which discarded the agency name.
payroll_df = (
    payroll_df
    .withColumn('fiscalyear', col('fiscalyear').cast('int'))
    .withColumn('payrollnumber', col('payrollnumber').cast('int'))
    .withColumn('agencyid', col('agencyid').cast('int'))
    .withColumn('agencyname', col('agencyname').cast('string'))
    .withColumn('employeeid', col('employeeid').cast('string'))
    .withColumn('lastname', col('lastname').cast('string'))
    .withColumn('firstname', col('firstname').cast('string'))
    .withColumn('agencystartdate', to_date(col('agencystartdate'), 'MM/dd/yyyy'))
    .withColumn('titlecode', col('titlecode').cast('string'))
    .withColumn('titledescription', col('titledescription').cast('string'))
    .withColumn('leavestatusasofjune30', col('leavestatusasofjune30').cast('string'))
    .withColumn('basesalary', round(col('basesalary').cast('double'), 2))
    .withColumn('paybasis', col('paybasis').cast('string'))
    .withColumn('regularhours', col('regularhours').cast('double'))
    .withColumn('regulargrosspaid', round(col('regulargrosspaid').cast('double'), 2))
    .withColumn('othours', col('othours').cast('double'))
    .withColumn('totalotpaid', round(col('totalotpaid').cast('double'), 2))
    .withColumn('totalotherpay', round(col('totalotherpay').cast('double'), 2))
    .withColumn('worklocationborough', col('worklocationborough').cast('string'))
)

# A row is invalid when RegularGrossPaid or RegularHours is NULL or 0.
# Each isNull() check is OR-ed with its comparison, so the predicate is never
# NULL and its negation selects exactly the remaining rows.
payroll_invalid_condition = (
    (col("RegularGrossPaid").isNull() | (col("RegularGrossPaid") == 0)) |
    (col("RegularHours").isNull() | (col("RegularHours") == 0))
)

# Rows routed to the payroll issues table
payroll_invalid_df = payroll_df.filter(payroll_invalid_condition)

# Rows loaded into the Payroll table
payroll_valid_df = payroll_df.filter(~payroll_invalid_condition)

#### Employee Transformation

In [5]:
# Employee is the second entry of source_files, hence "dataframe_2".
employee_df = data_frames["dataframe_2"]

# Cast each employee column to string, one column at a time
for _employee_column in ("EmployeeID", "LastName", "FirstName"):
    employee_df = employee_df.withColumn(_employee_column, col(_employee_column).cast('string'))




#### Title Transformation

In [None]:
# Title is the third entry of source_files, hence "dataframe_3".
title_source_df = data_frames["dataframe_3"]

# Both title columns are stored as strings
title_df = (
    title_source_df
    .withColumn("TitleCode", col("TitleCode").cast('string'))
    .withColumn("TitleDescription", col("TitleDescription").cast('string'))
)

#### Agency Transformation

In [None]:
# Agency is the fourth entry of source_files, hence "dataframe_4".
agency_source_df = data_frames["dataframe_4"]

# AgencyID becomes an integer; AgencyName stays a string
agency_id_column = col("AgencyID").cast('int')
agency_name_column = col("AgencyName").cast('string')
agency_df = agency_source_df.withColumn("AgencyID", agency_id_column).withColumn("AgencyName", agency_name_column)

#### Write DataFrame to RDS using the JDBC connection

In [None]:
# Write DataFrame to RDS - Employee Table - using the JDBC connection
try:
    print(f"Reading data from {source_files[1]}")
    print(f"Connecting to RDS: {rds_host}")
    # Skip the write entirely when the source produced no rows
    if not employee_df.rdd.isEmpty():
        employee_df.write.jdbc(
            url=jdbc_url,
            table=full_employee_table_name,
            mode="append",
            properties=jdbc_properties
        )
        print(f"Valid Data successfully written to RDS table {full_employee_table_name}")
except Exception as e:
    print(f"Error: {str(e)}")
    # Re-raise so a failed load stops the run. Swallowing the error would let
    # the archive step move source files that were never written to RDS.
    raise

In [None]:
# Write DataFrame to RDS - Title Table - using the JDBC connection
try:
    print(f"Reading data from {source_files[2]}")
    # Skip the write entirely when the source produced no rows
    if not title_df.rdd.isEmpty():
        title_df.write.jdbc(
            url=jdbc_url,
            table=full_title_table_name,
            mode="append",
            properties=jdbc_properties
        )
        print(f"Valid Data successfully written to RDS table {full_title_table_name}")
except Exception as e:
    print(f"Error: {str(e)}")
    # Re-raise so a failed load stops the run. Swallowing the error would let
    # the archive step move source files that were never written to RDS.
    raise

In [None]:
# Write DataFrame to RDS - Agency Table - using the JDBC connection
try:
    print(f"Reading data from {source_files[3]}")
    # Skip the write entirely when the source produced no rows
    if not agency_df.rdd.isEmpty():
        agency_df.write.jdbc(
            url=jdbc_url,
            table=full_agency_table_name,
            mode="append",
            properties=jdbc_properties
        )
        print(f"Valid Data successfully written to RDS table {full_agency_table_name}")
except Exception as e:
    print(f"Error: {str(e)}")
    # Re-raise so a failed load stops the run. Swallowing the error would let
    # the archive step move source files that were never written to RDS.
    raise

In [None]:
# Write DataFrame to RDS - Payroll Table - using the JDBC connection
# NOTE(review): both payroll writes use mode="overwrite" while the Employee,
# Title and Agency writes use "append" - confirm that replacing the existing
# payroll tables on every run is intended.
try:
    print(f"Reading data from {source_files[0]}")
    print(f"Connecting to RDS: {rds_host}")
    # Rows that passed validation go to the Payroll table
    if not payroll_valid_df.rdd.isEmpty():
        payroll_valid_df.write.jdbc(
            url=jdbc_url,
            table=full_payroll_table_name,
            mode="overwrite",
            properties=jdbc_properties
        )
        print(f"Valid Data successfully written to RDS table {full_payroll_table_name}")

    # Rows that failed validation go to the payroll issues table
    if not payroll_invalid_df.rdd.isEmpty():
        payroll_invalid_df.write.jdbc(
            url=jdbc_url,
            table=payroll_issues,
            mode="overwrite",
            properties=jdbc_properties
        )
        print(f"Invalid Payroll Data successfully written to RDS table {payroll_issues}")

except Exception as e:
    print(f"Error: {str(e)}")
    # Re-raise so a failed load stops the run. Swallowing the error would let
    # the archive step move source files that were never written to RDS.
    raise

#### Housekeeping - Move processed files to the Archive folder

In [None]:
# Function to move files between folders
def move_s3_files(source_folder, archive_folder):
    """Move every object under `source_folder` into `archive_folder`.

    Each object is copied to the archive location and then deleted from the
    source. Keys ending in '/' (S3 "folder" markers) are skipped. Only the
    base file name is kept, so objects in nested sub-prefixes are flattened
    into the archive prefix.

    Args:
        source_folder: S3 URI of the form 's3://bucket/prefix/'.
        archive_folder: S3 URI of the form 's3://bucket/prefix/'. Its bucket
            is used as the copy destination, so it may differ from the source
            bucket.

    Failures for individual objects are printed and do not stop the loop; an
    object whose copy fails is not deleted because the delete follows the copy
    inside the same try block.
    """
    # 's3://bucket/a/b/'.split('/') -> ['s3:', '', 'bucket', 'a', 'b', '']
    source_parts = source_folder.split('/')
    archive_parts = archive_folder.split('/')
    bucket_name = source_parts[2]
    archive_bucket = archive_parts[2]
    source_prefix = '/'.join(source_parts[3:])
    archive_prefix = '/'.join(archive_parts[3:])

    # The destination key is built by plain concatenation, so make sure the
    # archive prefix ends with '/' even if the caller omitted it.
    if archive_prefix and not archive_prefix.endswith('/'):
        archive_prefix += '/'

    # list_objects_v2 returns at most 1000 keys per call; paginate so that
    # larger folders are moved completely.
    paginator = s3_client.get_paginator('list_objects_v2')
    found_objects = False
    for page in paginator.paginate(Bucket=bucket_name, Prefix=source_prefix):
        for obj in page.get('Contents', []):
            found_objects = True
            source_key = obj['Key']

            # Skip folders (S3 "folders" are keys that end with a '/')
            if source_key.endswith('/'):
                continue

            file_name = os.path.basename(source_key)  # Get the file name
            destination_key = f"{archive_prefix}{file_name}"

            try:
                # Copy the file to the archive folder
                s3_client.copy_object(
                    Bucket=archive_bucket,
                    CopySource={'Bucket': bucket_name, 'Key': source_key},
                    Key=destination_key
                )

                # Delete the file from the source folder only after the copy succeeded
                s3_client.delete_object(Bucket=bucket_name, Key=source_key)
                print(f"Moved {source_key} to {destination_key}")

            except Exception as e:
                print(f"Error moving {source_key}: {str(e)}")

    if not found_objects:
        print(f"No files found in {source_folder}")

In [None]:
# Archive each source prefix into the archive prefix at the same list position
for folder_pair in zip(source_files, archive_files):
    move_s3_files(*folder_pair)