# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%extra_py_files s3://aws-glue-assets-992382490096-us-east-1/jar/delta-core_2.12-1.0.0.jar
%extra_jars s3://aws-glue-assets-992382490096-us-east-1/jar/delta-core_2.12-1.0.0.jar
%spark_conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, col
from awsglue.dynamicframe import DynamicFrame
import boto3
import pandas as pd
import os
import shutil
from tqdm import tqdm
from datetime import datetime, timedelta

s3 = boto3.client('s3')
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Extra py files to be included:
s3://aws-glue-assets-992382490096-us-east-1/jar/delta-core_2.12-1.0.0.jar
Extra jars to be included:
s3://aws-glue-assets-992382490096-us-east-1/jar/delta-core_2.12-1.0.0.jar
Previous Spark configuration: None
Setting new Spark configuration to: spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

#### Reset the Bronze Table So All Silver Flags Are 0


Run this step only when you want to restart the silver-table process from scratch. It sets `silver` back to 0 for every bronze file path currently marked 1, so the next run reprocesses those files.

In [2]:
# # Initialize the Spark session
# spark = SparkSession.builder \
#     .appName("Delta Lake Upsert Data Aggregations") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
#     .getOrCreate()

# # Define S3 bucket paths
# bronzetable = "s3://aws-glue-assets-992382490096-us-east-1/glue-logs/cleaned_filepaths_delta/"

# delta_table = DeltaTable.forPath(spark, bronzetable)
# spark_df = delta_table.toDF().filter("silver = 1")
# pandas_df = spark_df.toPandas()




In [3]:
# pandas_df.shape

(13280, 2)


In [4]:
# test_list = list(pandas_df.file_path)
# len(test_list)

13280


In [5]:
# silvertable = "s3://aws-glue-assets-992382490096-us-east-1/glue-logs/silver_table/"
# delta_table_exists = DeltaTable.isDeltaTable(spark, silvertable)

# bronze_delta_table = DeltaTable.forPath(spark, bronzetable) 
# silver_delta_table = DeltaTable.forPath(spark, silvertable)

# test_df = spark.createDataFrame([(path,) for path in test_list], ["file_path"])
# test_df = test_df.withColumn("silver", lit(0))




In [6]:
# bronze_delta_table.alias("target").merge(
#     test_df.alias("source"),
#     "target.file_path = source.file_path"
#     ).whenMatchedUpdateAll().execute()

# #Stop the Spark session
# spark.stop()




#### Process MBTA Signal Data Files and Save Them to the Silver Table

In [2]:
target_date = datetime.now().strftime('%Y_%m_%d')

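# Get the Spark session. In an interactive session one already exists, so getOrCreate() returns it;
# static settings such as spark.sql.extensions only take effect through %spark_conf (set in the setup cell).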
spark = SparkSession.builder \
    .appName("Delta Lake Upsert Data Aggregations") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .getOrCreate()

# Define S3 bucket paths
bronzetable = "s3://aws-glue-assets-992382490096-us-east-1/glue-logs/cleaned_filepaths_delta/"

delta_table = DeltaTable.forPath(spark, bronzetable)
spark_df = delta_table.toDF().filter("silver = 0").filter(
    (col("file_path").contains(target_date)) & (~col("file_path").contains("_detail"))
)
# spark_df_filtered = spark_df.filter(col("file_path").contains("2025_03_27"))
pandas_df = spark_df.toPandas()




In [3]:
pandas_df.shape

(205, 2)
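Both the bronze filter above and the S3 listing below key on today's date formatted as `YYYY_MM_DD`, and both drop `_detail` files. The pure-Python sketch below mirrors that predicate so it can be checked without Spark; the helper names and sample keys are hypothetical. `timedelta` is imported in the setup cell but never used; if the intent is to process the previous day's complete set of files, subtracting one day is the usual approach, though that is an assumption about the workflow.

```python
from datetime import date, timedelta


def target_date_str(day: date) -> str:
    """Format a date the way the notebook matches file paths: YYYY_MM_DD."""
    return day.strftime('%Y_%m_%d')


def is_candidate(file_path: str, target_date: str) -> bool:
    """Mirror of the bronze filter: path contains the target date and is not a *_detail file."""
    return target_date in file_path and "_detail" not in file_path


today = date(2025, 3, 27)
print(target_date_str(today))                      # 2025_03_27
print(target_date_str(today - timedelta(days=1)))  # 2025_03_26

# Hypothetical keys; real MBTA file names may differ
print(is_candidate("csv/ARL0127_a_b_2025_03_27_0000.csv", "2025_03_27"))         # True
print(is_candidate("csv/ARL0127_a_b_2025_03_27_0000_detail.csv", "2025_03_27"))  # False
```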


In [None]:
# test_list = list(pandas_df.file_path)
# len(test_list)

In [4]:
# Define the patterns you're looking for
patterns = [
"ARL0127","ARL0156","ARL0219","ARL0562","ARL2620","CAM0636",
"CAM4873","MAL0001","MAL0002","MAL0003","MAL0004","MAL0005",
"MAL0006","MAL0007","MALD0001","MALD0002","MALD0003","MALD0004",
"MALD0005","MALD0006","MALD0007","SOM0315","SOM0603","SOM0827",
"ARL004","ARL001","ARL002","ARL003","ARL005","CAM015",
"CAM018","MAL001","MAL002","MAL003","MAL004","MAL006",
"MAL007","MAL009","MAL001","MAL002","MAL003","MAL004",
"MAL006","MAL007","MAL009","SOM001","SOM002","SOM006"
]

# Function to handle pagination and filter files
def list_and_filter_files(bucket_name, prefix, target_date):
    paginator = s3.get_paginator('list_objects_v2')
    filtered_file_paths = []
    
    # Iterate over paginated results
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                file_path = obj['Key']
                
                # Filter for files containing the target date, matching patterns, and NOT containing "_detail"
                if (target_date in file_path) and \
                   any(pattern in file_path for pattern in patterns) and \
                   ("_detail" not in file_path):
                    filtered_file_paths.append(f"s3://{bucket_name}/{file_path}")
    
    return filtered_file_paths

# Specify the S3 bucket and prefix (if any)
bucket_name = 'mbta-tsp-signal'
prefix = 'csv/'

# Get the filtered file paths for the target date
file_paths = list_and_filter_files(bucket_name, prefix, target_date)
print(f"Filtered File Paths: {len(file_paths)} found")

Filtered File Paths: 106 found
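The S3 filter uses substring matching, so a pattern such as `ARL001` matches any key that merely contains it, and the list repeats seven `MAL00x` entries (harmless for `any()`). If the patterns are complete intersection IDs, i.e. the first `_`-separated token that `process_files` stores as `intersection_id`, comparing that token against a set is stricter. The sketch below contrasts the two rules; the key `ARL0015` is hypothetical, and the stricter rule only fits if the patterns are not meant as prefixes.

```python
import posixpath


def matches_substring(key, patterns):
    """The notebook's rule: any pattern appears anywhere in the key."""
    return any(pattern in key for pattern in patterns)


def matches_intersection_id(key, pattern_set):
    """Hypothetical stricter rule: the file name's first '_'-separated token must be in the set."""
    return posixpath.basename(key).split('_')[0] in pattern_set


patterns = ["ARL001", "MAL001", "MAL001"]  # repeated entries do not change any()
pattern_set = set(patterns)                 # a set drops repeats and gives O(1) lookups

key = "csv/ARL0015_a_b_2025_03_27_0000.csv"       # hypothetical key
print(matches_substring(key, patterns))           # True: "ARL001" is a prefix of "ARL0015"
print(matches_intersection_id(key, pattern_set))  # False
```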


In [None]:
def find_common_items(my_list, spark_df, column_name):
    list_df = spark.createDataFrame([(item,) for item in my_list], ["item"])
    common_df = list_df.join(spark_df, list_df.item == col(column_name), "inner")
    common_items = [row.item for row in common_df.select("item").distinct().collect()]
    return common_items

common_items = find_common_items(file_paths, spark_df, "file_path")
# print(common_items)
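Both inputs are small here (106 listed paths and 205 bronze rows in the run above), and `pandas_df` already holds the filtered bronze rows on the driver, so the join can also be done as a plain set lookup without another Spark job. A minimal sketch; `find_common_items_local` is a hypothetical name. Unlike the Spark `distinct().collect()` version, it returns paths in listing order.

```python
def find_common_items_local(listed_paths, bronze_paths):
    """Paths present both in the S3 listing and among the unprocessed bronze rows.

    Drops repeats (like distinct()) and keeps the order of listed_paths.
    """
    bronze = set(bronze_paths)
    seen = set()
    common = []
    for path in listed_paths:
        if path in bronze and path not in seen:
            seen.add(path)
            common.append(path)
    return common


# Usage in the notebook (assumes pandas_df from the bronze cell):
# common_items = find_common_items_local(file_paths, pandas_df.file_path)
```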

In [8]:
def download_files(file_paths, local_dir):
    """Download each s3://bucket/key path into local_dir, keeping only the key's base name."""
    os.makedirs(local_dir, exist_ok=True)
    for file_path in tqdm(file_paths, desc="Downloading files"):
        # 's3://bucket/csv/name.csv' -> ('bucket', 'csv/name.csv')
        bucket_name, _, key = file_path[len("s3://"):].partition('/')
        # Files land flat in local_dir, so keys sharing a base name would overwrite each other
        local_filename = os.path.join(local_dir, os.path.basename(key))
        # download_file writes the object to disk in parts instead of reading the whole body into memory
        s3.download_file(bucket_name, key, local_filename)
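`download_files` derives the bucket and key by splitting the `s3://` path. A small stdlib sketch of that step with validation; `split_s3_uri` is a hypothetical helper. It uses `str.partition` rather than `urllib.parse.urlparse` because S3 keys may contain `?` or `#`, which `urlparse` would treat as a query string or fragment.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key), equivalent to the notebook's split('/') logic."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri!r}")
    return bucket, key


print(split_s3_uri("s3://mbta-tsp-signal/csv/ARL0127_a.csv"))
```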




In [9]:
def process_files(file_paths, local_directory='/tmp/downloaded_files/'):
    """Download a chunk of signal CSVs, parse them into one pandas DataFrame, then clear the download dir."""
    df_list = []
    try:
        download_files(file_paths, local_directory)
        for file in tqdm(sorted(os.listdir(local_directory)), desc="Processing files"):
            file_path = os.path.join(local_directory, file)
            try:
                df = pd.read_csv(file_path, header=None, names=['time', 'event', 'param'])
                # File names are assumed to look like <intersection_id>_<...>_<...>_<YYYY>_<MM>_<DD>_...
                # (inferred from the token positions used below)
                parts = file.split('_')
                df['intersection_id'] = parts[0]
                df['date'] = datetime.strptime('_'.join(parts[3:6]), '%Y_%m_%d')
                df['time'] = pd.to_datetime(df['time'])
            except pd.errors.EmptyDataError:
                # Must precede the generic handler: EmptyDataError subclasses Exception,
                # so listed second it would never be reached
                print(f"Skipping empty file: {file_path}")
                continue
            except Exception as e:
                print(f"Skipping file {file_path}: could not parse. Error: {e}")
                continue
            df_list.append(df)
    finally:
        # Remove this chunk's downloads so they cannot leak into the next chunk
        shutil.rmtree(local_directory, ignore_errors=True)

    # pd.concat([]) raises ValueError, so return an empty frame when nothing was readable
    return pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()
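`process_files` takes `intersection_id` from the first `_`-separated token of the file name and the date from tokens 3 to 5. The sketch below isolates that parsing; the function name and sample names are hypothetical. It only works if another `_`-separated token follows the day: for a name ending `..._27.csv`, token 5 is `27.csv` and `strptime` raises `ValueError`. Inside `process_files` that error is caught by the generic handler, so such a file would be skipped with a message rather than stop the chunk.

```python
from datetime import datetime


def parse_signal_filename(name):
    """Return (intersection_id, date) using the same token positions as process_files()."""
    parts = name.split('_')
    intersection_id = parts[0]
    day = datetime.strptime('_'.join(parts[3:6]), '%Y_%m_%d')
    return intersection_id, day


print(parse_signal_filename("ARL0127_a_b_2025_03_27_0000.csv"))  # hypothetical file name
```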




In [None]:
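# --- Process in Chunks ---
chunk_size = 10
silvertable = "s3://aws-glue-assets-992382490096-us-east-1/glue-logs/silver_table/"

# DeltaTable.forPath fails if the path holds no Delta table, so check first and fail with a clear message
if not DeltaTable.isDeltaTable(spark, silvertable):
    raise ValueError(f"No Delta table found at {silvertable}; create the silver table before running this cell.")

bronze_delta_table = DeltaTable.forPath(spark, bronzetable)
silver_delta_table = DeltaTable.forPath(spark, silvertable)

num_chunks = -(-len(common_items) // chunk_size)  # ceiling division
for i in range(0, len(common_items), chunk_size):
    chunk = common_items[i:i + chunk_size]
    print(f"Processing chunk {i // chunk_size + 1} of {num_chunks}")

    # Download and parse the chunk's CSVs with pandas, then hand the result to Spark
    combined_pdf = process_files(chunk)
    if combined_pdf.empty:
        print("No readable rows in this chunk; its bronze rows stay at silver = 0")
        continue
    combined_df = spark.createDataFrame(combined_pdf)

    # Insert-only MERGE: rows whose (intersection_id, time, event) already exist in silver are skipped
    silver_delta_table.alias("target").merge(
        combined_df.alias("source"),
        "target.intersection_id = source.intersection_id AND target.time = source.time AND target.event = source.event"
    ).whenNotMatchedInsertAll().execute()

    # Mark this chunk's files as processed (silver = 1) in the bronze table
    processed_df = spark.createDataFrame([(path,) for path in chunk], ["file_path"]).withColumn("silver", lit(1))
    bronze_delta_table.alias("target").merge(
        processed_df.alias("source"),
        "target.file_path = source.file_path"
    ).whenMatchedUpdateAll().execute()

# Stop the Spark session; run this only when the notebook's work is done
spark.stop()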
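Computing the chunk total as `len(common_items) // chunk_size + 1` over-counts by one whenever the list length is an exact multiple of `chunk_size` (100 items in chunks of 10 would print "of 11"). Ceiling division gives the exact total. The sketch below isolates that arithmetic and the slicing the chunk loops use; the function names are hypothetical.

```python
def chunk_count(n_items, chunk_size):
    """Number of chunks needed for n_items, i.e. ceil(n_items / chunk_size), using integer math."""
    return -(-n_items // chunk_size)


def iter_chunks(items, chunk_size):
    """Yield consecutive slices of at most chunk_size items."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]


print(chunk_count(100, 10), 100 // 10 + 1)  # 10 11
print(chunk_count(106, 10))                 # 11
```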

In [10]:
# # --- Process in Chunks ---
# chunk_size = 100
# silvertable = "s3://aws-glue-assets-992382490096-us-east-1/glue-logs/silver_table/"
# delta_table_exists = DeltaTable.isDeltaTable(spark, silvertable)

# bronze_delta_table = DeltaTable.forPath(spark, bronzetable) 
# silver_delta_table = DeltaTable.forPath(spark, silvertable)

# for i in range(0, len(common_items), chunk_size):
#     chunk = common_items[i:i + chunk_size]
#     print(f"Processing chunk {i // chunk_size + 1} of {len(common_items) // chunk_size + 1}")
    
#     # Process the chunk
#     combined_df = process_files(chunk)
#     combined_df = spark.createDataFrame(combined_df)

#     # Perform UPSERT (MERGE)
#     silver_delta_table.alias("target").merge(
#         combined_df.alias("source"),
#         "target.intersection_id = source.intersection_id AND target.time = source.time AND target.event = source.event"
#         ).whenNotMatchedInsertAll().execute()
#     # combined_df.write.format("delta").mode("overwrite").save(silvertable)
    
#     test_df = spark.createDataFrame([(path,) for path in chunk], ["file_path"])
#     test_df = test_df.withColumn("silver", lit(1))
        
#     bronze_delta_table.alias("target").merge(
#         test_df.alias("source"),
#         "target.file_path = source.file_path"
#         ).whenMatchedUpdateAll().execute()

# #Stop the Spark session
# spark.stop()

Processing chunk 1 of 2
Downloading files: 100%|##########| 100/100 [00:06<00:00, 14.99it/s]
Processing files: 100%|##########| 100/100 [00:06<00:00, 15.28it/s]
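Both chunk cells write to the silver table with an insert-only MERGE keyed on `(intersection_id, time, event)`. Re-running a chunk therefore inserts nothing new. However, the Delta Lake documentation notes that duplicates *within* one source batch are all inserted, and a row that differs from an existing one only in `param` counts as already present. The sketch below is a small pure-Python model of those semantics, not Delta's implementation; the rows and event codes are hypothetical.

```python
def insert_only_merge(target_rows, source_rows, key=("intersection_id", "time", "event")):
    """Model of MERGE ... WHEN NOT MATCHED THEN INSERT *: append source rows whose key is absent from target."""
    existing = {tuple(row[k] for k in key) for row in target_rows}
    inserted = [row for row in source_rows if tuple(row[k] for k in key) not in existing]
    return target_rows + inserted


target = [{"intersection_id": "ARL0127", "time": "2025-03-27 00:00:00", "event": 1, "param": 2}]
source = [
    {"intersection_id": "ARL0127", "time": "2025-03-27 00:00:00", "event": 1, "param": 2},   # already in target
    {"intersection_id": "ARL0127", "time": "2025-03-27 00:00:01", "event": 82, "param": 5},
    {"intersection_id": "ARL0127", "time": "2025-03-27 00:00:01", "event": 82, "param": 5},  # duplicate within source
]
merged = insert_only_merge(target, source)
print(len(merged))  # 3: both in-batch duplicates were inserted
```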
