In [1]:
from pyspark.sql import SparkSession

# Build the notebook's SparkSession with Delta Lake's SQL extension and catalog enabled.
#
# A session still active from an earlier run is shut down first so that getOrCreate()
# below builds a new session from the settings listed here instead of reusing the old one.
# NOTE(review): stopping a session does not restart the driver JVM, so JVM-level settings
# such as spark.driver.memory likely only take effect on the first run in a fresh kernel.
# NOTE(review): with a local master the executor runs inside the driver JVM, so
# spark.executor.memory has no effect here. The Delta Lake jars (and hadoop-aws for s3a://)
# must already be on the classpath — nothing in this cell adds them.
previous_session = SparkSession.getActiveSession()
if previous_session:
    previous_session.stop()

SPARK_CONFIG = {
    "spark.driver.memory": "5g",
    "spark.executor.memory": "3g",
    # Keep shuffle parallelism small for a local[2] run (Spark's default is 200).
    "spark.sql.shuffle.partitions": "4",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

session_builder = SparkSession.builder.appName("Takumi ETL").master("local[2]")
for conf_key, conf_value in SPARK_CONFIG.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()


In [2]:
# Configure S3A access for this session without embedding secrets in the notebook.
# SECURITY: an AWS key pair used to be hard-coded in this cell, so it must be treated as
# leaked (version control, shared copies, rendered outputs) — revoke and rotate it in IAM.
import os

# Forward the keys only when they are actually set. If they are absent, the keys stay
# unset and S3A uses its credential provider chain (in Hadoop 3.x the default chain
# includes environment variables and EC2 instance profiles).
for conf_key, env_var in (
    ("fs.s3a.access.key", "AWS_ACCESS_KEY_ID"),
    ("fs.s3a.secret.key", "AWS_SECRET_ACCESS_KEY"),
):
    env_value = os.environ.get(env_var)
    if env_value:
        spark.conf.set(conf_key, env_value)

spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [0]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import *
import time

# Ingest raw CSV files from the bronze bucket into a Delta table partitioned by
# transaction date, then compact the small files the incremental write produces.
#
# NOTE(review): "cloudFiles" is Databricks Auto Loader and is not part of open-source
# Spark, so it will not resolve in the local[2] session built at the top of this
# notebook — confirm this cell is meant to run on a Databricks cluster.
from delta.tables import DeltaTable

start = time.perf_counter()  # monotonic clock: unaffected by wall-clock adjustments
source_s3_path = "s3a://input-data-bronze-layer-bucket/input_data/"
target_s3_path = "s3a://delta-lake-etl/input_delta_table/"
# Auto Loader's schema-tracking directory and the stream checkpoint share one location.
checkpoint_path = "/mnt/checkpoints/input_data"

# Incrementally discover new files. No schema is given, so Auto Loader infers one and
# records it under the schema location.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")  # switch to "parquet" for Parquet sources
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source_s3_path)
)

# Derive the partition column from the event timestamp.
# NOTE(review): values to_date() cannot parse become null (or raise under ANSI mode) —
# confirm transaction_timestamp matches Spark's default timestamp format.
df = df.withColumn("transaction_date", to_date(col("transaction_timestamp")))

# Append everything currently available, then stop. availableNow replaces the deprecated
# once=True: it also stops on its own, but may process a large backlog in several
# micro-batches instead of forcing it through a single one.
query = (
    df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)  # records which files were ingested
    .option("mergeSchema", "true")  # let new source columns be added to the table schema
    .partitionBy("transaction_date")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(target_s3_path)
)

# Blocks until the backlog is written; raises StreamingQueryException if the query failed.
query.awaitTermination()

# Bin-pack small files (no Z-ordering). Guarded so the cell does not crash when the
# target path never became a Delta table (e.g. nothing was committed).
if DeltaTable.isDeltaTable(spark, target_s3_path):
    delta_table = DeltaTable.forPath(spark, target_s3_path)
    delta_table.optimize().executeCompaction()

end = time.perf_counter()
print(f"Time taken: {end - start} seconds")

Time taken: 16.229655027389526 seconds


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

# Ingest reference market data CSVs from the bronze bucket into an unpartitioned Delta
# table. This is an incremental streaming query that stops once the files currently
# available have been written — not a batch job.
#
# NOTE(review): "cloudFiles" is Databricks Auto Loader and is not part of open-source
# Spark — confirm this cell runs on Databricks rather than the local session above.
start = time.perf_counter()  # monotonic clock: unaffected by wall-clock adjustments
source_s3_path = "s3a://input-data-bronze-layer-bucket/reference_market_data/"
target_s3_path = "s3a://delta-lake-etl/reference_market_data/"
# Auto Loader's schema-tracking directory and the stream checkpoint share one location.
checkpoint_path = "/mnt/checkpoints/reference_market_data"

# Incrementally discover new files; the inferred schema is recorded under the schema location.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")  # switch to "parquet" for Parquet sources
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source_s3_path)
)

# Append everything currently available to the Delta table, then stop. availableNow
# replaces the deprecated once=True and may split a large backlog into several micro-batches.
query = (
    df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)  # records which files were ingested
    .option("mergeSchema", "true")  # let new source columns be added to the table schema
    .outputMode("append")
    .trigger(availableNow=True)
    .start(target_s3_path)
)

# Blocks until the backlog is written; raises StreamingQueryException if the query failed.
query.awaitTermination()

end = time.perf_counter()
print(f"Time taken: {end - start} seconds")


Time taken: 15.884464502334595 seconds
