# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 2e9a42bc-8fbe-4759-afe1-042059b7a985
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 2e9a42bc-8fbe-4759-afe1-042059b7a985 to get into ready status...
Session 2e9a42bc-8fbe-4759-afe1-042059b7a985 ha

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://aws-ddf-pi-2025/processed/warehouse/")





In [3]:
df = spark.read.format("iceberg").load("glue_catalog.proyecto1db.stock_iceberg")
df.show(10)

+------+-------------------+--------------------+--------------------+--------------------+--------------------+-------+-----------+--------------------+-------------------+
|symbol|          timestamp|                open|                high|                 low|               close| volume|trade_count|                vwap|    local_timestamp|
+------+-------------------+--------------------+--------------------+--------------------+--------------------+-------+-----------+--------------------+-------------------+
|  MCHB|2020-12-08 19:00:00|1.135864975467302...|1.138232065323887...|1.135864975467302...|1.138232065323887...| 2085.0|       66.0|1.137048520395594...|2020-12-08 14:00:00|
|  MCHB|2021-07-07 17:30:00|1.328335264283393...|1.330730510767160...|1.325255661661406...|1.327993086214283...| 2552.0|       53.0|1.327993086214283...|2021-07-07 13:30:00|
|  MCHB|2019-06-26 18:00:00|9.586508265835406...|9.606376676749054...|9.583196864016464...|9.603065274930113...| 1501.0|       22.

In [5]:
from pyspark.sql import functions as F
# Get 10 random symbols
symbols = (
    df.select("symbol")
      .distinct()
      .orderBy(F.rand())
      .limit(10)
      .collect()
)
symbol_list = [r["symbol"] for r in symbols]
print(symbol_list)

['NMRK', 'DEC', 'PLTK', 'CORZ', 'ADTN', 'ACTU', 'FLEX', 'EMN', 'JXN', 'MKC']


In [6]:
# Filter main data for those symbols
df_sample = df.filter(F.col("symbol").isin(symbol_list))




In [8]:
# Write sample table via the Glue catalog
(
    df_sample.writeTo("glue_catalog.proyecto1db.stock_iceberg_sample")
    .tableProperty("format-version", "2")
    .createOrReplace()
)




In [19]:
df_sample = spark.read.format("iceberg").load("glue_catalog.proyecto1db.stock_iceberg_sample")




In [20]:
from pyspark.sql import functions as F
import datetime

symbol_ranges = (
    df_sample.groupBy("symbol")
    .agg(
        F.min("local_timestamp").alias("start_time"),
        F.max("local_timestamp").alias("end_time")
    )
)

start_date = datetime.date(2016, 1, 1)
end_date = datetime.date(2025, 12, 31)
time_start = datetime.time(4, 0)
time_end = datetime.time(19, 30)  # 7:30 p.m.
step = datetime.timedelta(minutes=30)

timestamps = []

current_date = start_date
while current_date <= end_date:
    if current_date.weekday() < 5:  # Mondayâ€“Friday only
        t = datetime.datetime.combine(current_date, time_start)
        while t.time() <= time_end:
            timestamps.append((t,))
            t += step
    current_date += datetime.timedelta(days=1)

grid_df = spark.createDataFrame(timestamps, ["local_timestamp"])


symbols_df = df_sample.select("symbol").distinct()
symbol_time_grid = symbols_df.crossJoin(grid_df)

symbol_time_grid = (
    symbol_time_grid.join(symbol_ranges, "symbol")
    .filter(
        (F.col("local_timestamp") >= F.col("start_time")) &
        (F.col("local_timestamp") <= F.col("end_time"))
    )
    .select("symbol", "local_timestamp")
)




In [21]:
symbol_time_grid = symbol_time_grid.orderBy("symbol", "local_timestamp")
symbol_time_grid.cache()  # optional but speeds up next operations

DataFrame[symbol: string, local_timestamp: timestamp]


#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [22]:
df_filled = (
    symbol_time_grid
    .join(df_sample, ["symbol", "local_timestamp"], how="left")
    .orderBy("symbol", "local_timestamp")
)

df_filled.show(50)

+------+-------------------+-------------------+------+------+------+------+-------+-----------+--------+
|symbol|    local_timestamp|          timestamp|  open|  high|   low| close| volume|trade_count|    vwap|
+------+-------------------+-------------------+------+------+------+------+-------+-----------+--------+
|  ACTU|2024-08-13 10:30:00|2024-08-13 14:30:00|  9.88|  9.88|  8.17|9.2125|20649.0|      150.0|9.386387|
|  ACTU|2024-08-13 11:00:00|2024-08-13 15:00:00|   9.0|   9.0|   7.5|   7.5|16030.0|       78.0|8.039615|
|  ACTU|2024-08-13 11:30:00|2024-08-13 15:30:00| 7.775|   8.4|   7.5|   8.4|10838.0|       57.0|8.057806|
|  ACTU|2024-08-13 12:00:00|2024-08-13 16:00:00|   8.4|   8.4|8.2165|  8.35| 6774.0|       22.0|8.288284|
|  ACTU|2024-08-13 12:30:00|2024-08-13 16:30:00| 8.625|  9.18|  8.35|  8.35| 9178.0|       65.0|8.732419|
|  ACTU|2024-08-13 13:00:00|               NULL|  NULL|  NULL|  NULL|  NULL|   NULL|       NULL|    NULL|
|  ACTU|2024-08-13 13:30:00|2024-08-13 17:30:0

In [23]:
from pyspark.sql import Window, functions as F






In [24]:
w = Window.partitionBy("symbol").orderBy("local_timestamp")
# Forward-fill last known close
df_filled = df_filled.withColumn(
    "last_close_ffill",
    F.last("close", ignorenulls=True).over(w)
)




In [26]:
# Get next non-null open (for initial fill)
w_reverse = Window.partitionBy("symbol").orderBy(F.col("local_timestamp").desc())

df_filled = (
    df_filled
    .withColumn("next_open_bfill", F.last("open", ignorenulls=True).over(w_reverse))
    .withColumn(
        "fill_value",
        F.when(F.col("last_close_ffill").isNotNull(), F.col("last_close_ffill"))
         .otherwise(F.col("next_open_bfill"))
    )
    .withColumn("open", F.coalesce(F.col("open"), F.col("fill_value")))
    .withColumn("high", F.coalesce(F.col("high"), F.col("fill_value")))
    .withColumn("low", F.coalesce(F.col("low"), F.col("fill_value")))
    .withColumn("close", F.coalesce(F.col("close"), F.col("fill_value")))
    .withColumn("vwap", F.coalesce(F.col("vwap"), F.col("fill_value")))
)




In [27]:
df_filled = (
    df_filled
    .withColumn("volume", F.coalesce(F.col("volume"), F.lit(0)))
    .withColumn("trade_count", F.coalesce(F.col("trade_count"), F.lit(0)))
)




In [32]:
df_filled = df_filled.drop("last_close_ffill", "next_open_bfill", "fill_value", "timestamp")
df_filled = df_filled.orderBy(
    F.col("symbol").asc(),
    F.col("local_timestamp").asc()
)
df_filled = df_filled.withColumn("timestamp", F.lit(None).cast("timestamp"))
df_filled.cache()  # optional but speeds up next operations
df_filled.show(20)

+------+-------------------+------+----+------+------+-------+-----------+--------+---------+
|symbol|    local_timestamp|  open|high|   low| close| volume|trade_count|    vwap|timestamp|
+------+-------------------+------+----+------+------+-------+-----------+--------+---------+
|  ACTU|2024-08-13 10:30:00|  9.88|9.88|  8.17|9.2125|20649.0|      150.0|9.386387|     NULL|
|  ACTU|2024-08-13 11:00:00|   9.0| 9.0|   7.5|   7.5|16030.0|       78.0|8.039615|     NULL|
|  ACTU|2024-08-13 11:30:00| 7.775| 8.4|   7.5|   8.4|10838.0|       57.0|8.057806|     NULL|
|  ACTU|2024-08-13 12:00:00|   8.4| 8.4|8.2165|  8.35| 6774.0|       22.0|8.288284|     NULL|
|  ACTU|2024-08-13 12:30:00| 8.625|9.18|  8.35|  8.35| 9178.0|       65.0|8.732419|     NULL|
|  ACTU|2024-08-13 13:00:00|  8.35|8.35|  8.35|  8.35|    0.0|        0.0|    8.35|     NULL|
|  ACTU|2024-08-13 13:30:00|8.5275|8.59|8.5275|  8.59|  408.0|        4.0|8.545357|     NULL|
|  ACTU|2024-08-13 14:00:00|   8.5| 8.5|   8.5|   8.5|  100.

In [33]:
df_filled.writeTo("glue_catalog.proyecto1db.stock_iceberg_sample").overwritePartitions()




In [34]:
df_filled_sample = df_sample = spark.read.format("iceberg").load("glue_catalog.proyecto1db.stock_iceberg_sample")




In [35]:
df_filled_sample.show(20)

+------+---------+------+----+------+------+-------+-----------+--------+-------------------+
|symbol|timestamp|  open|high|   low| close| volume|trade_count|    vwap|    local_timestamp|
+------+---------+------+----+------+------+-------+-----------+--------+-------------------+
|  ACTU|     NULL|  9.88|9.88|  8.17|9.2125|20649.0|      150.0|9.386387|2024-08-13 10:30:00|
|  ACTU|     NULL|   9.0| 9.0|   7.5|   7.5|16030.0|       78.0|8.039615|2024-08-13 11:00:00|
|  ACTU|     NULL| 7.775| 8.4|   7.5|   8.4|10838.0|       57.0|8.057806|2024-08-13 11:30:00|
|  ACTU|     NULL|   8.4| 8.4|8.2165|  8.35| 6774.0|       22.0|8.288284|2024-08-13 12:00:00|
|  ACTU|     NULL| 8.625|9.18|  8.35|  8.35| 9178.0|       65.0|8.732419|2024-08-13 12:30:00|
|  ACTU|     NULL|  8.35|8.35|  8.35|  8.35|    0.0|        0.0|    8.35|2024-08-13 13:00:00|
|  ACTU|     NULL|8.5275|8.59|8.5275|  8.59|  408.0|        4.0|8.545357|2024-08-13 13:30:00|
|  ACTU|     NULL|   8.5| 8.5|   8.5|   8.5|  100.0|        

In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)