# Amazon S3 Integration

Read JSON files from S3, process them one by one, then save the results back to S3 as Parquet.

In [0]:
# Alternative (disabled): set AWS credentials directly in the Spark configuration
# aws_access_key = dbutils.secrets.get(scope="aws-credentials", key="aws-access-key")
# aws_secret_key = dbutils.secrets.get(scope="aws-credentials", key="aws-secret-key")
# spark.conf.set("fs.s3a.access.key", aws_access_key)
# spark.conf.set("fs.s3a.secret.key", aws_secret_key)
# spark.conf.set("fs.s3a.endpoint", "s3.ap-southeast-2.amazonaws.com")  # Sydney region

In [0]:
# Alternative (disabled): mount the S3 bucket to DBFS

# aws_access_key = dbutils.secrets.get(scope="aws-credentials", key="aws-access-key")
# aws_secret_key = dbutils.secrets.get(scope="aws-credentials", key="aws-secret-key")
# bucket_name = "ernest-aws-bucket"
# mount_point = "/mnt/stock-data"
# try:
#     dbutils.fs.mount(
#         source=f"s3a://{bucket_name}",
#         mount_point=mount_point,
#         extra_configs={
#             "fs.s3a.access.key": aws_access_key,
#             "fs.s3a.secret.key": aws_secret_key,
#             "fs.s3a.endpoint": "s3.ap-southeast-2.amazonaws.com"
#         }
#     )
#     print(f"Mounted {bucket_name} to {mount_point}")
# except Exception as e:
#     print(f"Mount failed: {e}")

In [0]:
# Access S3 via Catalog
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Obtain the Spark session for this job (reuses the active one if present)
spark = (
    SparkSession.builder
    .appName("StockDataTransform")
    .getOrCreate()
)

# S3 locations, reached through the external location on the bucket
bucket_name = "ernest-aws-bucket"
s3_raw_data_path = f"s3://{bucket_name}/raw/data"
s3_archive_path = f"s3://{bucket_name}/raw/archive"

# Collect the full path of every .json object directly under raw/data/
file_list = dbutils.fs.ls(s3_raw_data_path)
json_files = [
    entry.path
    for entry in file_list
    if entry.name.endswith(".json")
]
print("Found JSON files:", json_files)

Found JSON files: ['s3://ernest-aws-bucket/raw/data/AAPL_raw_20250324_004550.json', 's3://ernest-aws-bucket/raw/data/GOOGL_raw_20250324_004551.json', 's3://ernest-aws-bucket/raw/data/MSFT_raw_20250324_004552.json', 's3://ernest-aws-bucket/raw/data/TSLA_raw_20250324_004553.json']


In [0]:
from datetime import datetime, timedelta

from pyspark.sql.functions import (
    array,
    col,
    explode,
    input_file_name,
    lit,
    regexp_extract,
    struct,
)

# Process each file
#
# For every raw JSON file: read it, skip-and-archive it when its data is stale,
# otherwise flatten the daily time series into one row per date, write the
# result to S3 as Parquet, show a sample of what was written, and archive the
# raw file.

# Files whose "Last Refreshed" date is older than this are archived unprocessed.
MAX_DATA_AGE = timedelta(days=3)

# Top-level JSON fields every raw file must contain to be processable.
REQUIRED_FIELDS = {"Meta Data", "Time Series (Daily)"}


def archive_raw_file(source_path, filename):
    """Move a raw JSON file into the archive folder (best effort).

    A failed move is reported but not raised, so one stuck file does not stop
    the remaining files from being processed. Because the Parquet output is
    written with mode("overwrite"), re-processing a file that was left behind
    is harmless.

    Args:
        source_path: Full path of the raw file to move.
        filename: Bare file name, reused as the name inside the archive folder.
    """
    target_path = f"{s3_archive_path}/{filename}"
    try:
        dbutils.fs.mv(source_path, target_path)
        print(f"Moved {source_path} to {target_path}")
    except Exception as e:
        print(f"Error moving {source_path}: {e}")


# Take "now" once so every file in this run is judged against the same cutoff.
current_date = datetime.now()

for json_file in json_files:
    filename = json_file.split("/")[-1]
    raw_df = spark.read.json(json_file)

    # A file without the expected top-level fields cannot be processed; leave
    # it in raw/data for inspection instead of failing the whole run.
    missing_fields = REQUIRED_FIELDS - set(raw_df.columns)
    if missing_fields:
        print(f"Skipping {filename}: missing top-level fields {sorted(missing_fields)}")
        continue

    # Fetch both metadata values with a single Spark job.
    meta_row = raw_df.select(
        col("Meta Data.`2. Symbol`").alias("symbol"),
        col("Meta Data.`3. Last Refreshed`").alias("last_refreshed"),
    ).first()
    if meta_row is None or not meta_row["symbol"] or not meta_row["last_refreshed"]:
        print(f"Skipping {filename}: metadata is empty")
        continue

    stock_symbol = meta_row["symbol"]
    last_refreshed = meta_row["last_refreshed"]
    print(f"Processing {stock_symbol}")
    print(f"Last refreshed: {last_refreshed}")

    # Check if last refreshed is within 3 days.
    # Only the leading YYYY-MM-DD part is parsed, so a value that carries a
    # trailing time-of-day component is still accepted.
    try:
        last_refreshed_date = datetime.strptime(last_refreshed[:10], "%Y-%m-%d")
    except ValueError:
        print(f"Skipping {filename}: cannot parse last refreshed value {last_refreshed!r}")
        continue

    if current_date - last_refreshed_date > MAX_DATA_AGE:
        print(f"Skipping {stock_symbol} as it was last refreshed on {last_refreshed}")
        # Move file to archive
        archive_raw_file(json_file, filename)
        continue

    # Flatten Time Series (Daily): each date is a separate struct column.
    time_series_df = raw_df.select("Time Series (Daily).*")
    date_columns = time_series_df.columns
    if not date_columns:
        print(f"Skipping {filename}: time series contains no dates")
        continue

    # Unpivot in a single pass: pack one (date, values) struct per date column
    # into an array and explode it into rows. This replaces chaining one
    # union() per date, which builds a query plan with one branch per date.
    entries = array(*[
        struct(lit(date).alias("date"), col(date).alias("values"))
        for date in date_columns
    ])
    flattened_df = (
        time_series_df
        .select(explode(entries).alias("entry"))
        .select("entry.date", "entry.values")
    )

    # Flatten values struct
    # NOTE(review): "float" is 32-bit, which is why daily_range shows values
    # such as 3.649994 instead of 3.65. Casting to "double" would fix that but
    # changes the stored Parquet schema, so it is left unchanged here.
    clean_df = flattened_df.select(
        lit(stock_symbol).alias("stock_symbol"),
        col("date"),
        col("values.`1. open`").cast("float").alias("open"),
        col("values.`2. high`").cast("float").alias("high"),
        col("values.`3. low`").cast("float").alias("low"),
        col("values.`4. close`").cast("float").alias("close"),
        col("values.`5. volume`").cast("long").alias("volume")
    )

    # Add transformations
    clean_df = clean_df.withColumn("daily_range", col("high") - col("low"))

    # Save to S3 (overwrites any previous output for this symbol)
    output_s3_path = f"s3://{bucket_name}/processed/data/{stock_symbol}.parquet"
    clean_df.write.mode("overwrite").parquet(output_s3_path)
    print(f"Saved {stock_symbol} data to {output_s3_path}")

    # Verify by reading the written data back and showing a sample
    verify_df = spark.read.parquet(output_s3_path)
    verify_df.show(5, truncate=False)

    # Move file to archive only after the output has been written
    archive_raw_file(json_file, filename)

print("All files processed.")

Processing AAPL
Last refreshed: 2025-03-21
Saved AAPL data to s3://ernest-aws-bucket/processed/data/AAPL.parquet
+------------+----------+-------+-------+------+------+--------+-----------+
|stock_symbol|date      |open   |high   |low   |close |volume  |daily_range|
+------------+----------+-------+-------+------+------+--------+-----------+
|AAPL        |2024-10-25|229.74 |233.22 |229.57|231.41|38802304|3.649994   |
|AAPL        |2024-10-28|233.32 |234.73 |232.55|233.4 |36087134|2.1799927  |
|AAPL        |2024-10-29|233.1  |234.325|232.32|233.67|35417247|2.0049896  |
|AAPL        |2024-10-31|229.34 |229.83 |225.37|225.91|64370086|4.4600067  |
|AAPL        |2024-11-01|220.965|225.35 |220.27|222.91|65276741|5.080002   |
+------------+----------+-------+-------+------+------+--------+-----------+
only showing top 5 rows
Moved s3://ernest-aws-bucket/raw/data/AAPL_raw_20250324_004550.json to s3://ernest-aws-bucket/raw/archive/AAPL_raw_20250324_004550.json
Processing GOOGL
Last refreshed: 2