In [None]:
# Schema Validator

import csv
import io
import os
from urllib.parse import unquote_plus

import boto3

# Module-level S3 client shared by lambda_handler and move_file.
s3 = boto3.client("s3")

# Expected CSV columns paired with a type tag ("int" or "str").
# Order matters: the validator compares the file's header row against these
# keys as an ordered list.
EXPECTED_SCHEMA = dict(
    [
        ("Employee_ID", "int"),
        ("Employee_Name", "str"),
        ("College_Degree", "str"),
        ("Department", "str"),
        ("Job_Role", "str"),
        ("DMC_Campus", "str"),
        ("Email", "str"),
        ("Phone_Number", "int"),
        ("Performance_Rating", "int"),
        ("Submission Date", "str"),
    ]
)  # <-- configure according to your schema


def _row_problem(row):
    """Return a label for the first schema violation found in ``row``, or None.

    A value of ``None`` means the CSV line had fewer fields than the header
    (``csv.DictReader`` fills the missing ones with ``None``). Blank values are
    accepted for every column; non-blank values in ``"int"`` columns must
    consist only of digits.
    """
    for col, dtype in EXPECTED_SCHEMA.items():
        raw_value = row.get(col)
        if raw_value is None:
            return "Missing column(s)"

        value = raw_value.strip()
        if value and dtype == "int" and not value.isdigit():
            return "Type error"
    return None


def lambda_handler(event, context):
    """Validate CSV uploads under ``raw-data/`` and sort them into ``valid/`` or ``invalid/``.

    For every S3 record in ``event["Records"]`` whose key starts with
    ``raw-data/`` the object is downloaded and checked:

    * the header row must equal ``list(EXPECTED_SCHEMA)`` (same names, same order);
    * every row must supply a value for each expected column;
    * every non-blank value in an ``"int"`` column must consist only of digits.

    Files that pass are moved to ``valid/``. Everything else, including files
    that raise while being read or parsed, is moved to ``invalid/``.

    Args:
        event: S3 event notification payload (dict with a ``"Records"`` list).
        context: Lambda context object; unused.

    Returns:
        None.
    """
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded (a space
        # arrives as "+"), so decode before using the key in API calls.
        key = unquote_plus(record["s3"]["object"]["key"])

        if not key.startswith("raw-data/"):
            print(f"Skipping file {key}")
            continue

        print(f"Validating file: s3://{bucket}/{key}")

        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            # "utf-8-sig" also drops a leading byte-order mark, which would
            # otherwise be glued to the first header name and cause a
            # spurious header mismatch.
            body = obj["Body"].read().decode("utf-8-sig").strip()
            reader = csv.DictReader(io.StringIO(body))
            headers = reader.fieldnames

            if not headers:
                print("❌ No headers found.")
                move_file(bucket, key, "invalid/")
                continue

            expected_headers = list(EXPECTED_SCHEMA.keys())
            if headers != expected_headers:
                print(f"❌ Header mismatch. Found: {headers}, Expected: {expected_headers}")
                move_file(bucket, key, "invalid/")
                continue

            bad_row_count = 0
            for row in reader:
                problem = _row_problem(row)
                if problem:
                    bad_row_count += 1
                    print(f"{problem} at row: {row}")

            if bad_row_count > 0:
                print(f"❌ Found {bad_row_count} rows with invalid data types.")
                move_file(bucket, key, "invalid/")
            else:
                print("✅ Schema and data types valid.")
                move_file(bucket, key, "valid/")

        except Exception as e:
            # Any read/decode/parse failure quarantines the file.
            print(f"⚠️ Exception while validating {key}: {e}")
            move_file(bucket, key, "invalid/")


def move_file(bucket, source_key, dest_prefix):
    """Move an object within ``bucket`` under ``dest_prefix``, keeping only its file name.

    The object is copied to ``<dest_prefix><basename of source_key>`` and the
    original is then deleted. Any directory part of ``source_key`` is dropped.

    Args:
        bucket: Name of the bucket that holds both source and destination.
        source_key: Key of the object to move.
        dest_prefix: Destination prefix, expected to end with ``/``
            (e.g. ``"valid/"``); it is concatenated with the file name as-is.
    """
    filename = os.path.basename(source_key)
    dest_key = f"{dest_prefix}{filename}"

    # Pass CopySource as a dict so the bucket and key stay separate values;
    # nothing inside the key can then be mis-read as part of the source spec.
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": source_key},
        Key=dest_key,
    )
    s3.delete_object(Bucket=bucket, Key=source_key)

    print(f"Moved {source_key} → {dest_key}")

In [None]:
# Python Transformation Logic

import pandas as pd

# Load the raw export and drop rows that are exact duplicates.
df = pd.read_csv(r"YOUR_FILE")
df = df.drop_duplicates()

# Remove honorifics and credentials BEFORE title-casing. `.str.title()` turns
# "PhD" / "MD" / "DVM" / "DDS" into "Phd" / "Md" / "Dvm" / "Dds", so running the
# case-sensitive pattern after it never stripped those suffixes.
# The pattern is case-insensitive so lower-case input ("dr. ana") is still
# handled, and a suffix must be preceded by a comma or whitespace so that
# ordinary names merely ending in those letters are left alone.
df['Employee_Name'] = (
    df['Employee_Name']
    .str.strip()
    .str.replace(
        r'(?i)^(Dr\.|Mr\.|Ms\.|Mrs\.)\s*|(?:,\s*|\s+)(DVM|MD|PhD|DDS|Esq\.|Jr\.|Sr\.)$',
        '',
        regex=True,
    )
    .str.strip()
    .str.title()
)

df['Job_Role'] = df['Job_Role'].str.title().str.strip()

# Swap every "example" (any case) for "gmail", then lower-case the domain part
# (everything after the first "@") while leaving the local part untouched.
df['Email'] = df['Email'].str.replace('(?i)example', 'gmail', regex=True)
df['Email'] = df['Email'].str.replace(r'(@)(.*)', lambda m: m.group(1) + m.group(2).lower(), regex=True)

# Prefix the phone number with the "+63 " country code.
df['Phone_Number'] = '+63 ' + df['Phone_Number'].astype(str)
df

# Converted to PySpark

import sys
import boto3
from datetime import datetime
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Glue job bootstrap: resolve the job name and build the Spark/Glue contexts.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 🧭 Step 1: Read valid data
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://YOUR_BUCKET_NAME/valid/"]},
    format="csv",
    format_options={"withHeader": True}
)
df = dyf.toDF().dropDuplicates()

# ✅ Sanity check: verify the SOURCE columns before selecting them.
# The select below references every one of these columns, so a missing column
# would fail there anyway; checking first gives a readable error message.
expected_columns = [
    "Employee_ID", "Employee_Name", "College_Degree", "Department", "Job_Role",
    "DMC_Campus", "Email", "Phone_Number", "Performance_Rating", "Submission Date"
]
missing = [c for c in expected_columns if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns in input data: {missing}")

# 🧩 Step 2: Enforce consistent datatypes
# ("Submission Date" is renamed to "Submission_Date" here.)
df = df.select(
    F.col("Employee_ID").cast("string").alias("Employee_ID"),
    F.col("Employee_Name").cast("string").alias("Employee_Name"),
    F.col("College_Degree").cast("string").alias("College_Degree"),
    F.col("Department").cast("string").alias("Department"),
    F.col("Job_Role").cast("string").alias("Job_Role"),
    F.col("DMC_Campus").cast("string").alias("DMC_Campus"),
    F.col("Email").cast("string").alias("Email"),
    F.col("Phone_Number").cast("string").alias("Phone_Number"),
    F.col("Performance_Rating").cast("int").alias("Performance_Rating"),
    F.col("Submission Date").cast("timestamp").alias("Submission_Date")
)

# 🧹 Step 3: Clean string columns
# Honorifics/credentials are removed BEFORE initcap: initcap turns "PhD"/"MD"/
# "DVM"/"DDS" into "Phd"/"Md"/"Dvm"/"Dds", so the case-sensitive pattern never
# matched those suffixes when it ran afterwards. The pattern is case-insensitive
# and requires a comma or whitespace before a suffix so that names merely
# ending in those letters are not truncated.
name_noise_pattern = (
    r'(?i)^(Dr\.|Mr\.|Ms\.|Mrs\.)\s*|(?:,\s*|\s+)(DVM|MD|PhD|DDS|Esq\.|Jr\.|Sr\.)$'
)
df = df.withColumn(
    "Employee_Name",
    F.initcap(
        F.trim(
            F.regexp_replace(F.trim(F.col("Employee_Name")), name_noise_pattern, '')
        )
    )
).withColumn(
    "Job_Role", F.initcap(F.trim(F.col("Job_Role")))
).withColumn(
    "Submission_Date", F.to_date(F.col("Submission_Date"), "yyyy-MM-dd")
).withColumn(
    "Email", F.regexp_replace(F.col("Email"), '(?i)example', 'gmail')
).withColumn(
    # Rebuild the address as <local part>@<lower-cased domain>.
    "Email",
    F.concat(
        F.regexp_extract(F.col("Email"), r'^[^@]+', 0),
        F.lit('@'),
        F.lower(F.regexp_extract(F.col("Email"), r'@(.+)', 1))
    )
).withColumn(
    "Phone_Number", F.concat(F.lit("+63 "), F.col("Phone_Number").cast("string"))
).filter(
    # concat() yields NULL when any input is NULL, so this drops rows whose
    # original Email or Phone_Number was NULL.
    F.col("Email").isNotNull() & F.col("Phone_Number").isNotNull()
)

# 🪄 Step 4: Write cleaned data to S3 (Parquet)
cleaned_dyf = DynamicFrame.fromDF(df, glueContext, "cleaned_dyf")
sink = glueContext.getSink(
    path="s3://YOUR_BUCKET_NAME/transformed/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="sink"
)
sink.setCatalogInfo(catalogDatabase="YOUR_DATABASE_NAME", catalogTableName="YOUR_CLEANED_TABLE_NAME")
sink.setFormat("glueparquet")
sink.writeFrame(cleaned_dyf)

# 📦 Step 5: Copy everything under transformed/ to a dated archive prefix
# (DEEP_ARCHIVE). The listing runs after the write above, so it includes the
# files this run just produced as well as earlier ones. Originals are kept.
s3 = boto3.client("s3")
bucket = "YOUR_BUCKET_NAME"
source_prefix = "transformed/"
today = datetime.now().strftime("%Y-%m-%d")
archive_prefix = f"archive/{today}/"

# list_objects_v2 returns at most 1000 keys per call; paginate so that larger
# prefixes are archived completely.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=source_prefix):
    for obj in page.get("Contents", []):
        source_key = obj["Key"]
        # Swap only the leading prefix; a plain str.replace would also rewrite
        # any later "transformed/" segment inside the key.
        archive_key = archive_prefix + source_key[len(source_prefix):]
        s3.copy_object(
            CopySource={"Bucket": bucket, "Key": source_key},
            Bucket=bucket,
            Key=archive_key,
            StorageClass="DEEP_ARCHIVE"
        )

print("✅ Glue transformation, type enforcement, and archiving complete.")
job.commit()

In [None]:
# LAMBDA-ATHENA GLOBAL DEDUPLICATION

import boto3

def lambda_handler(event, context):
    """Submit an Athena query that (re)creates the deduplicated employee view.

    The view keeps one row per ``employee_id`` from the clean table. Rows are
    ranked by ``submission_date`` descending so the most recent submission is
    the one kept; without an ORDER BY the surviving row would be arbitrary.

    ``start_query_execution`` only submits the query. This handler does not
    wait for it to finish, so a returned query ID does not mean it succeeded.

    Args:
        event: Lambda event payload; unused.
        context: Lambda context object; unused.

    Returns:
        None.
    """
    athena = boto3.client('athena')

    # NOTE(review): assumes the clean table exposes a `submission_date` column
    # (the Glue job in this notebook writes `Submission_Date`) — confirm
    # against your catalog.
    query = """
    CREATE OR REPLACE VIEW "YOUR_DATABASE_NAME"."YOUR_DEDUPLICATED_NAME" AS
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY employee_id
                   ORDER BY submission_date DESC
               ) AS rn
        FROM "YOUR_DATABASE_NAME"."YOUR_CLEAN_TABLE_NAME"
    )
    WHERE rn = 1;
    """

    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Catalog': 'AwsDataCatalog',
            'Database': 'YOUR_DATABASE_NAME'
        },
        ResultConfiguration={
            'OutputLocation': 's3://YOUR_ATHENA_QUERY_LOCATION/'
        }
    )

    # The query has only been queued at this point, so report it as submitted.
    print(f"Athena deduplication view query submitted. Query ID: {response['QueryExecutionId']}")

In [None]:
# Python File Exporter with Checkpoints

import pandas as pd
import boto3
import io
from datetime import datetime

import os

# Credentials are read from the environment rather than hardcoded in the
# notebook, where they would leak through version control or sharing.
# When a variable is unset, None is passed and boto3 falls back to its default
# credential chain (shared config/credentials files, instance role, ...).
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
REGION = "YOUR_REGION"
BUCKET_NAME = "YOUR_BUCKET_NAME"
CHECKPOINT_KEY = "checkpoints/last_submission_datetime.txt"
SHEET_URL = "YOUR_FORM_LINK/export?format=csv"

s3 = boto3.client(
    "s3",
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION
)

# 🧭 Step 1: Load last checkpoint (datetime-aware)
try:
    checkpoint_obj = s3.get_object(Bucket=BUCKET_NAME, Key=CHECKPOINT_KEY)
    last_checkpoint = checkpoint_obj["Body"].read().decode("utf-8").strip()
    last_checkpoint = pd.to_datetime(last_checkpoint)
    if pd.isna(last_checkpoint):
        # An empty checkpoint file parses to NaT; comparing against NaT would
        # filter out every row, so treat it like a missing checkpoint.
        last_checkpoint = None
        print("⚠️ No checkpoint found, exporting all data...")
    else:
        print(f"📍 Last checkpoint: {last_checkpoint}")
except s3.exceptions.ClientError as err:
    # Only a missing object means "first run". Any other failure (permissions,
    # throttling, ...) must not silently trigger a full re-export.
    if err.response.get("Error", {}).get("Code") not in ("NoSuchKey", "404"):
        raise
    last_checkpoint = None
    print("⚠️ No checkpoint found, exporting all data...")

# 🧭 Step 2: Load sheet and parse datetime
df = pd.read_csv(SHEET_URL)
# Unparseable dates become NaT instead of raising.
df["Submission Date"] = pd.to_datetime(df["Submission Date"], errors="coerce")

if "Submission ID" in df.columns:
    df = df.drop(columns=["Submission ID"])

# 🧭 Step 3: Filter new rows based on datetime
if last_checkpoint is not None:
    df = df[df["Submission Date"] > last_checkpoint]

if df.empty:
    print("✅ No new submissions since last checkpoint.")
else:
    # ✅ For export readability — show only date in CSV
    df_export = df.copy()
    df_export["Submission Date"] = df_export["Submission Date"].dt.date

    # 🧭 Step 4: Export filtered data to S3
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    file_name = f"google_sheet_responses_{timestamp}.csv"
    s3_key = f"raw-data/{file_name}"

    csv_buffer = io.StringIO()
    df_export.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=BUCKET_NAME, Key=s3_key, Body=csv_buffer.getvalue())
    print(f"✅ Uploaded new data to s3://{BUCKET_NAME}/{s3_key}")

    # 🧭 Step 5: Update checkpoint with full datetime precision
    new_checkpoint = df["Submission Date"].max()
    if pd.isna(new_checkpoint):
        # Every date in this batch failed to parse; NaT cannot be formatted,
        # so leave the stored checkpoint as it is.
        print("⚠️ No parseable submission dates in this batch; checkpoint left unchanged.")
    else:
        # isoformat keeps sub-second digits when present, so a row at the
        # maximum timestamp is not re-exported on the next run.
        new_checkpoint_str = new_checkpoint.isoformat(sep=" ")
        s3.put_object(Bucket=BUCKET_NAME, Key=CHECKPOINT_KEY, Body=new_checkpoint_str)
        print(f"🕒 Updated checkpoint to {new_checkpoint_str}")