In [None]:
# === Spark + Glue basic cheat sheet ===

from pyspark.context import SparkContext
# GlueContext — part of the AWS Glue SDK
from awsglue.context import GlueContext 
from typing import Optional
from awsglue.context import GlueContext
from pyspark.sql import DataFrame


# Create SparkContext and GlueContext.
# getOrCreate() hands back the SparkContext that is already running, if any,
# so re-running this cell does not try to start a second one.
sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)

# SparkSession is the main entry point for the DataFrame and SQL APIs.
# It is taken from the GlueContext here rather than built separately.
spark = glue_context.spark_session

In [None]:

# === Wrapper initialization ===
class SparkWrapper:
    """Small convenience layer over the SparkSession owned by a GlueContext.

    Instance fields:
        spark: the session read from ``glue_context.spark_session``.
        database_name: optional database used to qualify table names in
            ``read_table``; ``None`` means names are used as given.
    """

    def __init__(
        self,
        glue_context: GlueContext,
        database_name: Optional[str] = None,
    ):
        """Bind the wrapper to a Glue context and, optionally, a database.

        Args:
            glue_context: Context whose ``spark_session`` serves every call.
            database_name: When truthy, ``USE <database_name>`` is executed
                right away through ``spark.sql``.
        """
        self.spark = glue_context.spark_session
        self.database_name = database_name

        # None and "" both mean: leave the session's current database alone.
        if not self.database_name:
            return

        self.spark.sql(f"USE {self.database_name}")

    def sql(self, query: str) -> DataFrame:
        """Pass ``query`` to ``spark.sql`` unchanged and return its DataFrame.

        The DataFrame is lazy: nothing is computed until an action runs.
        """
        frame = self.spark.sql(query)
        return frame

    def read_table(self, table_name: str, condition: Optional[str] = None) -> DataFrame:
        """Load a catalog table as a DataFrame, optionally filtered.

        Args:
            table_name: Table to read. It is prefixed with
                ``<database_name>.`` when the wrapper has a database set.
            condition: Optional SQL predicate. It is inserted verbatim after
                ``WHERE``, so it must be trusted, well-formed SQL.

        Returns:
            ``spark.read.table(...)`` when no condition is given, otherwise
            the result of a ``SELECT * ... WHERE`` query run via ``self.sql``.
        """
        if self.database_name:
            full_name = f"{self.database_name}.{table_name}"
        else:
            full_name = table_name

        # Unfiltered reads go through the reader API, not through SQL text.
        if not condition:
            return self.spark.read.table(full_name)

        return self.sql(f"SELECT * FROM {full_name} WHERE {condition}")
    

# Custom wrapper around Spark/Glue logic.
# Passing database_name makes the constructor run `USE my_database` and makes
# read_table() prefix table names with "my_database.".
spark_wrapper = SparkWrapper(
    glue_context=glue_context,
    database_name="my_database"
)

In [None]:
# === Read table from Glue Catalog ===
df_table = spark_wrapper.read_table("contacts")
# No condition is passed, so the wrapper calls
# spark.read.table("my_database.contacts"); the database prefix comes from
# the database_name given to spark_wrapper.


# === Read table with filter condition ===
df_filtered = spark_wrapper.read_table(
    "contacts",
    condition="is_deleted = false"
)
# With a condition the wrapper builds and runs Spark SQL instead:
#   SELECT * FROM my_database.contacts WHERE is_deleted = false
# The condition text is inserted into the query verbatim.

In [None]:
# === Execute Spark SQL ===
# sql() hands the text to spark.sql() unchanged. Unlike read_table() it adds
# no database prefix, so `contacts` is resolved against the session's current
# database (the wrapper ran `USE my_database` when it was constructed).
df_sql = spark_wrapper.sql("""
    SELECT id, email
    FROM contacts
    WHERE email IS NOT NULL
""")
# Runs spark.sql() and returns a DataFrame (lazy execution: nothing is
# computed until an action is called on df_sql)

In [None]:
# === Read Athena view via JDBC ===
# NOTE(review): read_athena_view is not among the SparkWrapper methods defined
# in this notebook (only sql and read_table are) — confirm the full wrapper
# is loaded before running this cell.
# NOTE(review): the S3 location and KMS key look like placeholders — replace
# them with real values.
df_athena = spark_wrapper.read_athena_view(
    view_name="v_contacts",
    athena_output="s3://athena-results/",
    kms_key="kms-key-id"
)
# Intended flow (implementation not shown in this notebook):
# Athena executes the query and stores the result in S3,
# then Spark reads the result via JDBC


# === Register Athena DataFrame as temp view ===
df_athena.createOrReplaceTempView("v_contacts")
# The DataFrame is now registered under the name v_contacts (replacing any
# temp view that already had that name) and can be referenced by that name
# in Spark SQL, e.g. spark_wrapper.sql("SELECT * FROM v_contacts")


In [None]:
# === Read data from Aurora (PostgreSQL) ===
# NOTE(review): `creds` is not defined in any cell shown here, and
# read_aurora_table_to_df is not among the SparkWrapper methods defined in
# this notebook — confirm both exist before running this cell.
df_aurora = spark_wrapper.read_aurora_table_to_df(
    credentials=creds,
    table_name="contacts",
    database_name="app_db"
)
# Intended result: JDBC read from Aurora into a Spark DataFrame


# === Update table data using join logic ===
# NOTE(review): `new_df` is not defined in any cell shown here, and
# update_table_data_from_df is not among the SparkWrapper methods defined in
# this notebook — confirm both exist before running this cell.
result_df = spark_wrapper.update_table_data_from_df(
    new_data_df=new_df,
    table_name="contacts",
    columns=["id", "email", "record_hash", "is_deleted"],
    id_column_names=["id"]
)
# Intended behavior (implementation not shown in this notebook):
# - Left join new vs old data, matched on id_column_names
# - New records are inserted
# - Old records are kept if record_hash is the same
# - is_deleted is updated from new data


# === Action (triggers Spark execution) ===
# NOTE(review): mode("overwrite") replaces the existing table contents. If
# result_df is lazily derived from my_database.contacts itself, Spark may
# refuse to overwrite a table it is still reading from — confirm, and
# materialize result_df elsewhere first if needed.
result_df.write.mode("overwrite").saveAsTable("my_database.contacts")
# Actions like write/count trigger Spark job execution; everything before
# this line only built a lazy query plan