# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: fe962335-b012-40d6-ba2b-0dca0ffd7157
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session fe962335-b012-40d6-ba2b-0dca0ffd7157 to get into ready status...
Session fe962335-b012-40d6-ba2b-0dca0ffd7157 ha

In [15]:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, when, floor, trim, concat, lit, desc, lag, sum, to_date, concat_ws, date_format
from pyspark.sql import functions as F




In [16]:
s3_path = "s3://live-flight-schedule-data/live_flight_schedule_data.json"

try:
    # Step 1: Load the JSON file using Spark with multiLine option
    # This handles JSON files that are not line-delimited (i.e., single JSON array or object)
    dataframe = spark.read.option("multiLine", "true").json(s3_path)
    
    # Check if DataFrame has data
    if dataframe.rdd.isEmpty():
        print("DataFrame is empty. Please verify the JSON structure and S3 path.")
    else:
        # Step 2: Convert Spark DataFrame to Glue DynamicFrame
        dynamic_frame = DynamicFrame.fromDF(dataframe, glueContext, "dynamic_frame")
        
        # Step 3: Flatten the nested fields using 'apply_mapping'
        flattened_dynamic_frame = dynamic_frame.apply_mapping([
            # Flattening 'airline' nested structure
            ('airline.iataCode', 'string', 'airline_iataCode', 'string'),
            ('airline.icaoCode', 'string', 'airline_icaoCode', 'string'),
            ('airline.name', 'string', 'airline_name', 'string'),

            # Flattening 'arrival' nested structure
            ('arrival.actualRunway', 'string', 'arrival_actualRunway', 'string'),
            ('arrival.actualTime', 'string', 'arrival_actualTime', 'string'),
            ('arrival.baggage', 'string', 'arrival_baggage', 'string'),
            ('arrival.delay', 'string', 'arrival_delay', 'string'),
            ('arrival.estimatedRunway', 'string', 'arrival_estimatedRunway', 'string'),
            ('arrival.estimatedTime', 'string', 'arrival_estimatedTime', 'string'),
            ('arrival.gate', 'string', 'arrival_gate', 'string'),
            ('arrival.iataCode', 'string', 'arrival_iataCode', 'string'),
            ('arrival.icaoCode', 'string', 'arrival_icaoCode', 'string'),
            ('arrival.scheduledTime', 'string', 'arrival_scheduledTime', 'string'),
            ('arrival.terminal', 'string', 'arrival_terminal', 'string'),

            # Flattening 'departure' nested structure
            ('departure.actualRunway', 'string', 'departure_actualRunway', 'string'),
            ('departure.actualTime', 'string', 'departure_actualTime', 'string'),
            ('departure.baggage', 'string', 'departure_baggage', 'string'),
            ('departure.delay', 'string', 'departure_delay', 'string'),
            ('departure.estimatedRunway', 'string', 'departure_estimatedRunway', 'string'),
            ('departure.estimatedTime', 'string', 'departure_estimatedTime', 'string'),
            ('departure.gate', 'string', 'departure_gate', 'string'),
            ('departure.iataCode', 'string', 'departure_iataCode', 'string'),
            ('departure.icaoCode', 'string', 'departure_icaoCode', 'string'),
            ('departure.scheduledTime', 'string', 'departure_scheduledTime', 'string'),
            ('departure.terminal', 'string', 'departure_terminal', 'string'),

            # Flattening 'flight' nested structure
            ('flight.iataNumber', 'string', 'flight_iataNumber', 'string'),
            ('flight.icaoNumber', 'string', 'flight_icaoNumber', 'string'),
            ('flight.number', 'string', 'flight_number', 'string'),

            # Top-level fields
            ('status', 'string', 'status', 'string'),
            ('type', 'string', 'type', 'string'),
        ])
        
        # Step 4: Convert the flattened DynamicFrame back to DataFrame
        flattened_df = flattened_dynamic_frame.toDF()
        
        # Step 5: Show the DataFrame content to ensure data is loaded correctly
        print("First 5 records in the DataFrame:")
        flattened_df.show(5)
        
except Exception as e:
    print(f"Error loading data: {str(e)}")

First 5 records in the DataFrame:
+----------------+----------------+--------------+--------------------+------------------+---------------+-------------+-----------------------+---------------------+------------+----------------+----------------+---------------------+----------------+----------------------+--------------------+-----------------+---------------+-------------------------+-----------------------+--------------+------------------+------------------+-----------------------+------------------+-----------------+-----------------+-------------+------+---------+
|airline_iataCode|airline_icaoCode|  airline_name|arrival_actualRunway|arrival_actualTime|arrival_baggage|arrival_delay|arrival_estimatedRunway|arrival_estimatedTime|arrival_gate|arrival_iataCode|arrival_icaoCode|arrival_scheduledTime|arrival_terminal|departure_actualRunway|departure_actualTime|departure_baggage|departure_delay|departure_estimatedRunway|departure_estimatedTime|departure_gate|departure_iataCode|departur

***Drop rows with arrival time***

In [17]:
# Count the number of rows before dropping rows
initial_count = flattened_df.count()
print(f"Number of rows before dropping: {initial_count}")

# Filter to keep only rows where "arrival_actualTime" is null
filtered_df = flattened_df.filter(flattened_df["arrival_actualTime"].isNull())

# Count the number of rows after dropping
final_count = filtered_df.count()
print(f"Number of rows after dropping: {final_count}")

Number of rows before dropping: 5000
Number of rows after dropping: 3368


***ONLY GET DOMESTIC FLIGHTS***

In [18]:
domestic = ['MLU', 'ILE', 'HSV', 'OMA', 'YAK', 'PBI', 'SDF', 'PNS', 'CMH', 'DCA', 'EAU', 'LBB', 'MSN', 'BHM', 'VIS', 'MKG', 'CLL', 'RNO', 'BFL', 'MAZ', 'LNY', 'LGB', 'MKK', 'MHT', 'CLT', 'ELP', 'DRO', 'RKS', 'SGF', 'COD', 'WYS', 'SLC', 'OAK', 'ATW', 'BGR', 'AMA', 'LFT', 'PDX', 'DAL', 'MEM', 'EUG', 'SAN', 'BPT', 'FLL', 'OAJ', 'CHO', 'MRY', 'SCE', 'OTH', 'ORH', 'ANC', 'PHX', 'CHS', 'RIC', 'CRP', 'AEX', 'FNT', 'BOI', 'OKC', 'BMI', 'GSO', 'MOD', 'EGE', 'KOA', 'CPR', 'ALB', 'SAT', 'PSE', 'LIH', 'TUS', 'SNA', 'SPI', 'PIR', 'KTN', 'AKN', 'ISP', 'MQT', 'JNU', 'TWF', 'BDL', 'RST', 'ELM', 'FAI', 'PSG', 'GSP', 'CHA', 'RAP', 'SBP', 'OGG', 'SIT', 'PSP', 'LWS', 'MCI', 'BIL', 'CDV', 'ITH', 'MFE', 'MIA', 'SRQ', 'SMX', 'MDW', 'PSC', 'GRB', 'PMD', 'TPA', 'GST', 'FAT', 'BET', 'ACT', 'CKB', 'GFK', 'FSM', 'EVV', 'HPN', 'ONT', 'PIH', 'DAY', 'FCA', 'PHF', 'RHI', 'LMT', 'BGM', 'TUL', 'IYK', 'DLG', 'AVL', 'PWM', 'XNA', 'IPL', 'MAF', 'MOB', 'WRG', 'GCC', 'CAE', 'CWA', 'HNL', 'SAV', 'SBA', 'STL', 'KSM', 'VCT', 'BWI', 'BUR', 'CMX', 'MLI', 'FAY', 'GUC', 'SJU', 'LAN', 'SGU', 'EKO', 'ABI', 'MCN', 'ADK', 'ACK', 'MLB', 'LEX', 'IND', 'EWR', 'PVD', 'MFR', 'HLN', 'TXK', 'HTS', 'CIC', 'AUS', 'STT', 'DAB', 'FSD', 'ICT', 'TTN', 'PIT', 'MSP', 'LIT', 'ATL', 'DBQ', 'RDU', 'AGS', 'OME', 'SJT', 'ILM', 'ITO', 'SWF', 'ALO', 'OGD', 'RDD', 'TYR', 'ADQ', 'PUB', 'IAH', 'TVC', 'MBS', 'BTV', 'ABY', 'LAX', 'CLE', 'TLH', 'YUM', 'PLN', 'HVN', 'MSY', 'PFN', 'BUF', 'SEA', 'MGM', 'BLI', 'OTZ', 'TEX', 'ACV', 'CAK', 'MKE', 'BQN', 'JAC', 'CSG', 'TYS', 'IAD', 'JAX', 'DFW', 'MYR', 'SUX', 'CRW', 'ACY', 'GRR', 'SUN', 'SMF', 'AZO', 'MDT', 'CYS', 'STX', 'CID', 'SBN', 'ERI', 'TRI', 'EYW', 'ABE', 'SHV', 'BNA', 'BZN', 'SFO', 'BOS', 'HOU', 'GRK', 'FWA', 'CDC', 'JFK', 'TOL', 'LRD', 'GJT', 'ROW', 'SLE', 'OXR', 'SPS', 'MTJ', 'AVP', 'PHL', 'SPN', 'DSM', 'DTW', 'BIS', 'BJI', 'LAW', 'ORD', 'BTM', 'LNK', 'MCO', 'INL', 'SYR', 'FAR', 'DLH', 'GEG', 'GPT', 'LAS', 'RSW', 'HRL', 'GUM', 'RDM', 'CVG', 'ROA', 'GTF', 'ABQ', 'CMI', 'ASE', 'ROC', 'GGG', 'VPS', 'CEC', 'RFD', 'DEN', 'COS', 'DUT', 'GNV', 'SCC', 'IDA', 'CLD', 'ORF', 'PIA', 'YKM', 'HDN', 'JAN', 'ANI', 'SJC', 'BRW', 'FLG', 'LGA', 'BTR', 'MSO', 'APF', 'LSE']




In [19]:
new_df = filtered_df.filter(
    filtered_df["arrival_iataCode"].isin(domestic) & filtered_df["departure_iataCode"].isin(domestic)
)




***NEXT***

In [20]:
# Step 5: Select the specific columns you want to keep
columns_to_select = [
    'departure_scheduledTime',
    'departure_iataCode',
    'arrival_iataCode',
    'airline_iataCode',
    'flight_iataNumber',
    'arrival_scheduledTime'
]

# Create a new DataFrame 'df' by selecting specific columns
df = new_df.select(*columns_to_select)

# Show the first few records of the new DataFrame
print("First 5 records in the new DataFrame:")
df.show(5)

First 5 records in the new DataFrame:
+-----------------------+------------------+----------------+----------------+-----------------+---------------------+
|departure_scheduledTime|departure_iataCode|arrival_iataCode|airline_iataCode|flight_iataNumber|arrival_scheduledTime|
+-----------------------+------------------+----------------+----------------+-----------------+---------------------+
|   2024-11-03T18:51:...|               DFW|             SPS|              AS|           AS6185| 2024-11-03T19:53:...|
|   2024-11-03T18:51:...|               DFW|             SPS|              QR|           QR2355| 2024-11-03T19:53:...|
|   2024-11-03T18:51:...|               DFW|             SPS|              QR|           QR2328| 2024-11-03T19:53:...|
|   2024-11-03T18:51:...|               DFW|             SPS|              AA|           AA4921| 2024-11-03T19:53:...|
|   2024-11-03T18:15:...|               PUB|             CLD|                |                 | 2024-11-03T19:28:...|
+---------

In [21]:
from pyspark.sql.functions import split, regexp_replace, year, month, dayofmonth

# Step 1: Split 'departure_scheduledTime' into 'flightdate' and 'departure_time'
df = df.withColumn("flightdate", split(col("departure_scheduledTime"), "T")[0])
df = df.withColumn("departure_time", split(col("departure_scheduledTime"), "T")[1])

# Step 2: Create 'deptime' column by converting 'departure_time' to an integer format "HHMM"
# Extract "HHMM" portion by replacing ":" and truncating milliseconds
df = df.withColumn("deptime", regexp_replace(col("departure_time"), ":", "").substr(0, 4).cast("int"))
df = df.withColumn("year", year(col("flightdate").cast("date")))
df = df.withColumn("month", month(col("flightdate").cast("date")))
df = df.withColumn("dateofmonth", dayofmonth(col("flightdate").cast("date")))

# Step 3: Drop the original 'departure_scheduledTime' column if it’s no longer needed
df = df.drop("departure_scheduledTime", "departure_time")

df = df.withColumnRenamed('departure_iataCode', 'origin') \
        .withColumnRenamed('arrival_iataCode', 'dest') \
        .withColumnRenamed('airline_iataCode', 'uniquecarrier') \
        .withColumnRenamed('flight_iataNumber', 'flightnum') \
        .withColumnRenamed('arrival_scheduledTime', 'crsarrival') 

# Step 2: Show the updated DataFrame with the new columns
df.show(5)

+------+----+-------------+---------+--------------------+----------+-------+----+-----+-----------+
|origin|dest|uniquecarrier|flightnum|          crsarrival|flightdate|deptime|year|month|dateofmonth|
+------+----+-------------+---------+--------------------+----------+-------+----+-----+-----------+
|   DFW| SPS|           AS|   AS6185|2024-11-03T19:53:...|2024-11-03|   1851|2024|   11|          3|
|   DFW| SPS|           QR|   QR2355|2024-11-03T19:53:...|2024-11-03|   1851|2024|   11|          3|
|   DFW| SPS|           QR|   QR2328|2024-11-03T19:53:...|2024-11-03|   1851|2024|   11|          3|
|   DFW| SPS|           AA|   AA4921|2024-11-03T19:53:...|2024-11-03|   1851|2024|   11|          3|
|   PUB| CLD|             |         |2024-11-03T19:28:...|2024-11-03|   1815|2024|   11|          3|
+------+----+-------------+---------+--------------------+----------+-------+----+-----+-----------+
only showing top 5 rows


***STORING***

In [22]:
# s3 = "s3://live-flight-schedule-data/clean-live-data.json"

# df.write \
#     .mode("overwrite") \
#     .json(s3)
import json
import boto3

# AWS S3 Configuration
s3_client = boto3.client('s3')
bucket_name = 'live-flight-schedule-data'
json_filename = 'clean-live-data/new.json'

# Collect data as a list of dictionaries
data = df.toPandas().to_dict(orient='records')

# Convert list of dictionaries to a JSON array string
json_array = json.dumps(data)

# Upload the JSON array directly to S3
try:
    s3_client.put_object(Bucket=bucket_name, Key=json_filename, Body=json_array)
    print(f'Successfully written JSON array to s3://{bucket_name}/{output_key}')
except Exception as e:
    print(f"Error uploading JSON to S3: {e}")

{'ResponseMetadata': {'RequestId': 'KQYX4JQEHBPKHA0R', 'HostId': '3mTBXGSPtbcxbU5XPLlpdzwQuZGpYKnqBXGGURk4aZVwnOmUzZXx6b4OvrS946TN+zcdcVcTuvk=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '3mTBXGSPtbcxbU5XPLlpdzwQuZGpYKnqBXGGURk4aZVwnOmUzZXx6b4OvrS946TN+zcdcVcTuvk=', 'x-amz-request-id': 'KQYX4JQEHBPKHA0R', 'date': 'Sun, 10 Nov 2024 08:52:00 GMT', 'x-amz-server-side-encryption': 'AES256', 'etag': '"459989d6eff4010f8a04a3b7641e22c8"', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ETag': '"459989d6eff4010f8a04a3b7641e22c8"', 'ServerSideEncryption': 'AES256'}
Error uploading JSON to S3: name 'output_key' is not defined


In [10]:
print(type(df)) 

<class 'pyspark.sql.dataframe.DataFrame'>


#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [None]:
# dyf = glueContext.create_dynamic_frame.from_catalog(database='database_name', table_name='table_name')
# dyf.printSchema()

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [None]:
# df = dyf.toDF()
# df.show()

#### Example: Visualize data with matplotlib


In [None]:
# import matplotlib.pyplot as plt

# # Set X-axis and Y-axis values
# x = [5, 2, 8, 4, 9]
# y = [10, 4, 8, 5, 2]
  
# # Create a bar chart 
# plt.bar(x, y)
  
# # Show the plot
# %matplot plt

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)