<a href="https://colab.research.google.com/github/bhuguvi26/Copy-of-Enhanced-ETL-Workflow-with-Python-AWS-S3-RDS-and-Glue-for-Data-Engineers/blob/main/Copy_of_Enhanced_ETL_Workflow_with_Python%2C_AWS_S3%2C_RDS%2C_and_Glue_for_Data_Engineers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================
# ETL: process all files in s3://bhuguvibucket/raw/
# Single Colab cell — edit CONFIG before running
# =========================

# Install dependencies (first run)
!pip install -q boto3 pandas sqlalchemy pymysql lxml

import os
import logging
from io import BytesIO, StringIO
from datetime import datetime, timezone

import boto3
import botocore
import pandas as pd
import xml.etree.ElementTree as ET
from sqlalchemy import create_engine

# -------------------------
# CONFIG - Edit these values BEFORE running
# -------------------------
AWS_REGION = "ap-southeast-2"                   # exact region code

# Secrets are read from the environment so they are never stored in the
# notebook. Leave the AWS variables unset to let boto3 resolve credentials
# on its own.
# NOTE(review): any credential that was previously committed in this cell
# must be treated as compromised and rotated.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
# Short aliases kept so code referring to either naming scheme keeps working.
AWS_ACCESS_KEY = AWS_ACCESS_KEY_ID
AWS_SECRET_KEY = AWS_SECRET_ACCESS_KEY

S3_BUCKET = "bhuguvibucket"                     # your bucket
RAW_PREFIX = "raw/"                             # folder containing sources
TRANSFORMED_PREFIX = "transformed/"             # where transformed CSV will be stored
LOGS_PREFIX = "logs/"

RDS_HOST = "bhuguvidb.cd6ku6emavna.ap-southeast-2.rds.amazonaws.com"
RDS_PORT = 3306
RDS_USER = "admin"
RDS_PASSWORD = os.environ.get("RDS_PASSWORD", "")   # set RDS_PASSWORD before running
RDS_DB = "bhuguvidb"
TARGET_TABLE = "etl_merged_data"

# local reference to uploaded file (from your session)
LOCAL_UPLOADED_FILE = "/mnt/data/df29f879-84a3-47a7-9da2-536545724c92.png"

# -------------------------
# Logging (ensure flushed before upload)
# -------------------------
LOG_FILE = "etl_run.log"

# Detach AND close handlers left over from an earlier run of this cell;
# removing a handler without closing it leaves its stream open.
for stale_handler in logging.root.handlers[:]:
    logging.root.removeHandler(stale_handler)
    stale_handler.close()

# File handler: formatted records, file truncated on every run.
logging.basicConfig(
    filename=LOG_FILE,
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger()
# Console handler: echoes the bare message so progress is visible in the cell output.
logger.addHandler(logging.StreamHandler())

logger.info("ETL job starting")

# -------------------------
# boto3 session & S3 client
# -------------------------
# Pass explicit keys only when both are configured; otherwise build the
# session without them and let boto3 locate credentials itself.
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION
    )
else:
    session = boto3.Session(region_name=AWS_REGION)

s3 = session.client("s3")

# quick S3 access check
try:
    s3.head_bucket(Bucket=S3_BUCKET)
    logger.info("Connected to S3 bucket: %s", S3_BUCKET)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
    # BotoCoreError covers client-side failures (e.g. no credentials found)
    # that never reach the service and therefore are not ClientError.
    logger.exception("S3 access failed: %s", e)
    raise SystemExit("S3 access failed - check bucket name and credentials") from e

# -------------------------
# Helpers: list, extract, parse
# -------------------------
def list_s3_objects(bucket, prefix):
    """Return the keys of all objects under ``prefix`` in ``bucket``.

    Every page of the ``list_objects_v2`` paginator is consumed. Keys ending
    in "/" (folder placeholders) are left out.
    """
    logger.info("Listing objects under s3://%s/%s", bucket, prefix)
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
    keys = [
        entry["Key"]
        for page in pages
        for entry in page.get("Contents", [])
        if not entry["Key"].endswith("/")
    ]
    logger.info("Found %d object(s) under prefix", len(keys))
    return keys

def extract_csv(bucket, key):
    """Download the object at ``key`` and parse it as CSV into a DataFrame."""
    logger.info("Extracting CSV: %s", key)
    payload = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    buffer = BytesIO(payload)
    return pd.read_csv(buffer)

def extract_jsonlines(bucket, key):
    """Download the object at ``key`` and parse it as JSON into a DataFrame.

    Both JSON-lines (one document per line) and a regular JSON document
    (typically an array of records) are supported. The format that matches
    the first significant character is tried first and the other one is used
    as a fallback if parsing raises ``ValueError``.
    """
    logger.info("Extracting JSON (lines or array): %s", key)
    resp = s3.get_object(Bucket=bucket, Key=key)
    txt = resp["Body"].read().decode()

    # A document that opens with "[" is most likely a plain JSON array. Trying
    # lines=True first on a single-line array does not fail; it yields one
    # malformed row, so the fallback would never run. Parse it as a regular
    # document first in that case.
    looks_like_array = txt.lstrip("\ufeff \t\r\n").startswith("[")
    first_as_lines = not looks_like_array
    try:
        return pd.read_json(StringIO(txt), lines=first_as_lines)
    except ValueError:
        return pd.read_json(StringIO(txt), lines=not first_as_lines)

def extract_xml(bucket, key, item_tag="person"):
    """Download the object at ``key`` and flatten its XML records into a DataFrame.

    Each ``item_tag`` element found anywhere in the document becomes one row
    whose columns are the tags of its direct children. If no such element
    exists, each immediate child of the root is treated as a record instead.
    """
    logger.info("Extracting XML: %s", key)
    resp = s3.get_object(Bucket=bucket, Key=key)
    # Hand the parser the raw bytes so it can honour the encoding named in the
    # XML declaration instead of assuming UTF-8 up front.
    root = ET.fromstring(resp["Body"].read())

    # first try repeated item_tag elements, then fall back to root's children
    records = root.findall('.//{}'.format(item_tag)) or list(root)
    rows = [{child.tag: child.text for child in elem} for elem in records]
    return pd.DataFrame(rows)

# -------------------------
# Normalize & Transform
# -------------------------
def normalize_and_transform(df):
    """Return a frame with exactly ``name``, ``height_m`` and ``weight_kg``.

    Column labels are stringified, stripped and lower-cased. ``height`` and
    ``weight`` are coerced to numbers (unparseable values become NaN) and
    scaled by 0.0254 and 0.453592 respectively; per the original comments the
    inputs are assumed to be inches and pounds. A missing ``name`` column is
    filled with synthetic ``name_<i>`` values, and a missing measurement
    column yields an all-NaN float column. The input frame is not modified.
    """
    logger.info("Normalizing columns and transforming units")
    df = df.copy()
    # Labels are not guaranteed to be strings (e.g. integer labels), so
    # stringify them before cleaning instead of calling .strip() directly.
    df.columns = [str(c).strip().lower() for c in df.columns]

    # ensure name exists
    if "name" not in df.columns:
        logger.warning("'name' not found; creating synthetic names")
        df["name"] = [f"name_{i}" for i in range(len(df))]

    # source column -> (output column, conversion factor)
    conversions = {
        "height": ("height_m", 0.0254),
        "weight": ("weight_kg", 0.453592),
    }
    for source_col, (target_col, factor) in conversions.items():
        if source_col in df.columns:
            df[source_col] = pd.to_numeric(df[source_col], errors="coerce")
            df[target_col] = df[source_col] * factor
        else:
            # NaN keeps the column numeric, so the output dtypes stay the same
            # whether or not the measurement was present in the input.
            df[target_col] = float("nan")

    out = df[["name", "height_m", "weight_kg"]].reset_index(drop=True)
    logger.info("Transformation result rows: %d", len(out))
    return out

# -------------------------
# Save transformed CSV to S3
# -------------------------
def upload_transformed(df, bucket, key):
    """Serialize ``df`` as CSV (no index) and store it at ``s3://bucket/key``."""
    logger.info("Uploading transformed CSV to s3://%s/%s", bucket, key)
    csv_text = df.to_csv(index=False)
    put_args = {
        "Bucket": bucket,
        "Key": key,
        "Body": csv_text,
        "ContentType": "text/csv",
    }
    s3.put_object(**put_args)
    logger.info("Uploaded transformed CSV")

# -------------------------
# Load to MySQL RDS
# -------------------------
def ensure_db_exists(user, pwd, host, port, db):
    """Best-effort ``CREATE DATABASE IF NOT EXISTS`` on the MySQL server.

    Any failure (for example missing privileges) is logged as a warning and
    swallowed on purpose, so the caller can still try to use an existing
    database.
    """
    from urllib.parse import quote_plus

    from sqlalchemy import text

    logger.info("Ensuring database exists (attempt)")
    try:
        # URL-escape the credentials so characters such as "@" or "/" in the
        # password cannot corrupt the connection URL.
        server_url = f"mysql+pymysql://{quote_plus(user)}:{quote_plus(pwd)}@{host}:{port}/"
        engine_tmp = create_engine(server_url)
        try:
            with engine_tmp.connect() as conn_tmp:
                # Double any backtick so the name stays inside the quoted identifier.
                quoted_db = db.replace("`", "``")
                # Wrap the statement in text(): SQLAlchemy 2.x no longer
                # accepts a plain string in Connection.execute().
                conn_tmp.execute(text(f"CREATE DATABASE IF NOT EXISTS `{quoted_db}`;"))
        finally:
            engine_tmp.dispose()
        logger.info("Database exists/created: %s", db)
    except Exception as e:
        logger.warning("Could not create DB (may lack privileges): %s", e)

def load_to_rds(df, user, pwd, host, port, db, table_name="etl_merged_data"):
    """Write ``df`` to ``db.table_name`` on the MySQL server, replacing the table.

    Raises:
        Exception: whatever ``DataFrame.to_sql`` raises; it is logged and
            re-raised unchanged.
    """
    from urllib.parse import quote_plus

    logger.info("Loading dataframe to RDS table %s.%s", db, table_name)
    # URL-escape the credentials so special characters in the password
    # cannot break the connection URL.
    engine = create_engine(
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(pwd)}@{host}:{port}/{db}"
    )
    try:
        df.to_sql(table_name, engine, if_exists="replace", index=False)
        logger.info("Pushed dataframe to RDS table %s.%s", db, table_name)
    except Exception as e:
        logger.exception("Failed to push to RDS: %s", e)
        raise
    finally:
        # Release pooled connections whether or not the load succeeded.
        engine.dispose()

# -------------------------
# Upload log to S3 (flush first)
# -------------------------
def upload_log_to_s3(bucket, prefix, local_log):
    """Upload ``local_log`` to ``s3://bucket/<prefix>/etl_run_<UTC timestamp>.log``.

    Handlers on the module logger are flushed first so the uploaded file
    contains everything logged so far.
    """
    import posixpath

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # S3 keys always use "/", so join with posixpath rather than os.path,
    # which would insert "\\" on Windows.
    key = posixpath.join(prefix.rstrip("/"), f"etl_run_{timestamp}.log")
    logger.info("Flushing log handlers before upload")
    for h in logger.handlers:
        try:
            h.flush()
        except Exception:
            # Deliberately best-effort: a handler that cannot flush must not
            # prevent the log upload.
            pass
    logger.info("Uploading log to s3://%s/%s", bucket, key)
    s3.upload_file(local_log, bucket, key)
    logger.info("Uploaded log")

# -------------------------
# Main pipeline
# -------------------------
def run_pipeline():
    """Run the full ETL: extract from S3, transform, store to S3 and RDS.

    Steps:
      1. List every object under ``RAW_PREFIX`` and extract the .csv, .json
         and .xml ones; other files are skipped and per-file extraction
         errors are logged without aborting the run.
      2. Concatenate the extracted frames and normalize/convert them.
      3. Write the result locally, upload it to ``TRANSFORMED_PREFIX`` and
         load it into the RDS table.

    The run log is uploaded to ``LOGS_PREFIX`` however the run ends (success,
    early exit or failure).

    Raises:
        Exception: any error from the transform, upload or RDS load steps is
            propagated after the log upload has been attempted.
    """
    import posixpath

    logger.info("Pipeline started")
    # S3 keys always use "/"; os.path.join would produce "\\" on Windows.
    transformed_key = posixpath.join(TRANSFORMED_PREFIX.rstrip("/"), "transformed_data.csv")

    try:
        keys = list_s3_objects(S3_BUCKET, RAW_PREFIX)
        if not keys:
            logger.error("No files found under s3://%s/%s. Exiting.", S3_BUCKET, RAW_PREFIX)
            return

        dfs = []
        for key in keys:
            k = key.lower()
            try:
                if k.endswith(".csv"):
                    df = extract_csv(S3_BUCKET, key)
                elif k.endswith(".json"):
                    df = extract_jsonlines(S3_BUCKET, key)
                elif k.endswith(".xml"):
                    df = extract_xml(S3_BUCKET, key, item_tag="person")
                else:
                    logger.warning("Skipping unsupported file: %s", key)
                    continue
                logger.info("Extracted %s (rows=%d)", key, len(df))
                df["_source_s3_key"] = key
                dfs.append(df)
            except Exception as e:
                # One unreadable file must not abort the whole run.
                logger.exception("Failed to extract %s: %s", key, e)

        if not dfs:
            logger.error("No data extracted successfully. Exiting.")
            return

        combined = pd.concat(dfs, ignore_index=True, sort=False)
        logger.info("Combined dataframe rows: %d", len(combined))

        transformed = normalize_and_transform(combined)

        # Save locally then upload transformed CSV
        transformed.to_csv("transformed_data.csv", index=False)
        upload_transformed(transformed, S3_BUCKET, transformed_key)

        # Ensure DB exists (best-effort)
        ensure_db_exists(RDS_USER, RDS_PASSWORD, RDS_HOST, RDS_PORT, RDS_DB)

        # Load into RDS
        try:
            load_to_rds(transformed, RDS_USER, RDS_PASSWORD, RDS_HOST, RDS_PORT, RDS_DB, TARGET_TABLE)
        except Exception as e:
            logger.exception("Loading to RDS failed: %s", e)
            raise

        # Logged before the upload below so the stored log records the outcome.
        logger.info("Pipeline completed successfully")
    finally:
        # Upload logs on every exit path, not only after the RDS step.
        upload_log_to_s3(S3_BUCKET, LOGS_PREFIX, LOG_FILE)

    print("Transformed CSV: s3://%s/%s" % (S3_BUCKET, transformed_key))
    print("RDS table: %s.%s" % (RDS_DB, TARGET_TABLE))

# Execute pipeline
if __name__ == "__main__":
    run_pipeline()


ETL job starting
Connected to S3 bucket: bhuguvibucket
Pipeline started
Listing objects under s3://bhuguvibucket/raw/
Found 9 object(s) under prefix
Extracting CSV: raw/source1.csv
Extracted raw/source1.csv (rows=5)
Extracting JSON (lines or array): raw/source1.json
Extracted raw/source1.json (rows=4)
Extracting XML: raw/source1.xml
Extracted raw/source1.xml (rows=4)
Extracting CSV: raw/source2.csv
Extracted raw/source2.csv (rows=5)
Extracting JSON (lines or array): raw/source2.json
Extracted raw/source2.json (rows=4)
Extracting XML: raw/source2.xml
Extracted raw/source2.xml (rows=4)
Extracting CSV: raw/source3.csv
Extracted raw/source3.csv (rows=5)
Extracting JSON (lines or array): raw/source3.json
Extracted raw/source3.json (rows=4)
Extracting XML: raw/source3.xml
Extracted raw/source3.xml (rows=4)
Combined dataframe rows: 39
Normalizing columns and transforming units
Transformation result rows: 39
Uploading transformed CSV to s3://bhuguvibucket/transformed/transformed_data.csv
Uploa

Transformed CSV: s3://bhuguvibucket/transformed/transformed_data.csv
RDS table: bhuguvidb.etl_merged_data


In [None]:
Readme file

1. Introduction

This project implements a production-grade ETL (Extract → Transform → Load) pipeline using AWS cloud services.
The workflow extracts CSV, JSON, and XML data, performs data transformation, and loads the transformed output into:

AWS S3 (raw + transformed zones)

AWS RDS (MySQL/PostgreSQL table)

The pipeline includes secure credential handling, automatic multi-file processing, and centralized logging stored in S3.

This project simulates a real-world data engineering scenario and demonstrates cloud-native ETL execution.

🎯 2. Objectives

By completing this project, you achieve the following:

✔ Extract Data

Processes multiple file formats: CSV, JSON, XML

Reads all files dynamically from S3 under /raw/ prefix

✔ Transform Data

Data cleaning

Standardizes schema

Converts:

Height: inches → meters

Weight: pounds → kilograms

Combines 9 input files into 1 unified dataset

✔ Load Data

Uploads transformed CSV to S3 (/transformed/)

Inserts transformed records into AWS RDS table using SQLAlchemy

✔ Logging

Every step of ETL is logged

Logs stored in S3 (/logs/)

✔ Infrastructure

AWS S3 bucket

AWS RDS instance

Optional AWS Glue for schema inference (not mandatory)

🏗 3. Architecture
            ┌─────────────┐
            │   Raw Data  │
            │ CSV/JSON/XML│
            └──────┬──────┘
                   │
                   ▼
            ┌─────────────┐
            │ AWS  S3     │
            │  raw/       │
            └──────┬──────┘
                   │ (Extract)
                   ▼
          ┌──────────────────┐
          │  ETL Python Job  │
          │  (local/EC2)     │
          └──────┬───────────┘
                 │ Transform
                 ▼
     ┌──────────────────────────┐
     │ Cleaned + Converted Data │
     └──────────┬───────────────┘
                │ Load
        ┌───────┴──────────────┐
        ▼                      ▼
┌────────────┐        ┌────────────────┐
│ S3 bucket  │        │ AWS RDS        │
│ transformed│        │ etl_merged_data│
└────────────┘        └────────────────┘

Logs → s3://bucket/logs/

🧰 4. Tools & Technologies Used
Tool	Purpose
Python 3	ETL development
Boto3	AWS SDK for S3 interactions
Pandas	Data manipulation
SQLAlchemy	RDS database connector
AWS S3	Raw + transformed data lake
AWS RDS (MySQL/PostgreSQL)	Persistent table storage
AWS IAM	Secure access control
AWS Glue	Optional schema inference
Logging module	End-to-end ETL logs
🔒 5. Secure Coding Practices

This project follows secure coding guidelines:

No credentials inside code

AWS keys stored in:

~/.aws/credentials

OR environment variables

SQL injection prevention using SQLAlchemy ORM

PEP-8 compliant code

Modular functions for maintainability

Clear error handling with logger exceptions

📁 6. Project Folder Structure
├── etl_pipeline.py        # Main modular Python script (single-cell ready)
├── README.md              # Documentation
└── assets/
      └── screenshots/     # S3, RDS, Architecture

🔧 7. Steps to Run the Project
Step 1 — Clone the GitHub Repository
git clone <your-repo-url>
cd etl-cloud-project

Step 2 — Install Dependencies
pip install boto3 pandas sqlalchemy pymysql lxml

Step 3 — Configure AWS Credentials

Make sure your AWS credentials are set under:

Option A — Environment Variables:
export AWS_ACCESS_KEY_ID="your_key"
export AWS_SECRET_ACCESS_KEY="your_secret"
export AWS_DEFAULT_REGION="ap-southeast-2"

Option B — AWS CLI:
aws configure

⛏ 8. Running the ETL Pipeline

Run with:

python etl_pipeline.py


The script performs:

Connect to S3

Read all files from raw/

Extract CSV, JSON, XML

Transform dataset

Upload combined CSV to /transformed/

Insert data into AWS RDS

Upload logs to /logs/

🧪 9. Sample Output (Expected Log Snippet)
INFO | ETL job starting
INFO | Connected to S3 bucket
INFO | Extracting CSV: raw/source1.csv
INFO | Extracting JSON: raw/source1.json
INFO | Extracting XML: raw/source1.xml
INFO | Combined dataframe rows: 39
INFO | Transformation complete
INFO | Uploaded transformed CSV to S3
INFO | Loaded 39 rows into RDS table
INFO | Logs uploaded to s3://bucket/logs/


This is the expected output for successful execution.

📊 10. Final Deliverables

You must provide:

✔ Python ETL script
✔ README.md (this file)
✔ GitHub public repository
✔ Project presentation with:

Problem statement

Architecture

Tools used

Approach

Insights/Challenges

Demo screenshots

Future improvements

In [None]:
import os

import pymysql

# One-off helper cell: create the target database on the RDS instance.
# The password is taken from the RDS_PASSWORD environment variable so it is
# never stored in the notebook.
conn = pymysql.connect(
    host="bhuguvidb.cd6ku6emavna.ap-southeast-2.rds.amazonaws.com",
    user="admin",
    password=os.environ["RDS_PASSWORD"],
    port=3306
)

try:
    cursor = conn.cursor()
    cursor.execute("CREATE DATABASE IF NOT EXISTS bhuguvidb;")
    conn.commit()

    print("Database 'bhuguvidb' created successfully!")
finally:
    # Close the connection even if the statement fails.
    conn.close()


Database 'bhuguvidb' created successfully!


In [None]:
import os

import pymysql

# Connectivity check against the RDS database. The password is taken from the
# RDS_PASSWORD environment variable so it is never stored in the notebook.
try:
    conn = pymysql.connect(
        host="bhuguvidb.cd6ku6emavna.ap-southeast-2.rds.amazonaws.com",
        user="admin",
        password=os.environ["RDS_PASSWORD"],
        database="bhuguvidb",
        port=3306,
        connect_timeout=10
    )
    print("SUCCESS — Connected to RDS!")
    # This cell only probes connectivity, so release the connection right away.
    conn.close()
except Exception as e:
    print("FAILED:", e)


In [None]:
import os

import pymysql

# Connectivity check against the RDS database. The region part of the host
# name is "ap-southeast-2"; the earlier spelling "ap-south-east-2" does not
# resolve. The password is taken from the RDS_PASSWORD environment variable.
try:
    conn = pymysql.connect(
        host="bhuguvidb.cd6ku6emavna.ap-southeast-2.rds.amazonaws.com",
        user="admin",
        password=os.environ["RDS_PASSWORD"],
        database="bhuguvidb",
        port=3306,
        connect_timeout=10
    )
    print("SUCCESS — Connected to RDS!")
    # This cell only probes connectivity, so release the connection right away.
    conn.close()
except Exception as e:
    print("FAILED:", e)


FAILED: (2003, "Can't connect to MySQL server on 'bhuguvidb.cd6ku6emavna.ap-south-east-2.rds.amazonaws.com' ([Errno -2] Name or service not known)")
