### Data Transformation and Storage Script

#### Overview
This script processes and stores the latest customer churn data by performing the following steps:

#### Steps

1. Fetch Latest Data from S3
- Identifies the most recent **processed train** and **API data** files in S3.
- Downloads them to a local temporary folder.

2. Apply Transformations
- **Normalization & Scaling:** 
  - `CreditScore` is min-max normalized to the [0, 1] range; `Age`, `Balance`, and `EstimatedSalary` are standardized (z-score: subtract the mean, divide by the standard deviation).
- **Feature Engineering:** 
  - `BalancePerProduct` = `Balance` / `NumOfProducts`, computed from the already standardized `Balance` column.
  - Missing values in `BalancePerProduct` are set to 0.

3. Save and Upload
- Saves transformed files locally in the **transformed_data/** folder.
- Uploads the transformed files back to **S3 (transformed_data/ folder).**

4. Store in PostgreSQL with Versioning
- Appends transformed data to:
  - **`transformed_train_data`** (for training).
  - **`transformed_api_data`** (for inference).
- Adds a **`version`** column containing the run timestamp (format `YYYYMMDD_HHMMSS`) so each load can be tracked.

#### Final Output
- **Transformed data is stored in PostgreSQL, versioned, and uploaded to S3.**  
**Process completed successfully!**


In [3]:
# uploads data in Postgres and Transformed folder in S3
import os
import pandas as pd
import boto3
import sqlalchemy
from sqlalchemy import create_engine
from urllib.parse import quote_plus
from datetime import datetime

# AWS S3 Configuration
S3_BUCKET = "dmml-bank-churn-data"
S3_PROCESSED_FOLDER = "processed_data/"      # key prefix the prepared inputs are read from
S3_TRANSFORMED_FOLDER = "transformed_data/"  # key prefix the transformed outputs are written to
s3_client = boto3.client("s3")

# PostgreSQL Configuration
# SECURITY: a database password is committed in this file. Set the
# BANK_CHURN_DB_PASSWORD environment variable to supply the real credential;
# the literal below is kept only as a backward-compatible fallback and should
# be rotated and removed from source control.
PASSWORD = quote_plus(os.environ.get("BANK_CHURN_DB_PASSWORD", "131412aA@"))  # URL-encode special characters such as '@'
DB_URI = f"postgresql+psycopg2://postgres:{PASSWORD}@localhost:5432/bank_churn"
engine = create_engine(DB_URI)

# Local storage paths
LOCAL_TRANSFORMED_FOLDER = "transformed_data/"             # relative to the current working directory
LOCAL_TEMP_FOLDER = os.path.join(os.getcwd(), "temp")      # downloads from S3 land here
# Create both folders up front so later reads/writes cannot fail on a missing directory.
os.makedirs(LOCAL_TRANSFORMED_FOLDER, exist_ok=True)
os.makedirs(LOCAL_TEMP_FOLDER, exist_ok=True)

# Function to get latest file from S3
def get_latest_s3_file(prefix):
    """Return the key of the newest object under *prefix* in ``S3_BUCKET``.

    "Newest" is decided by the object key itself: every listed key starts
    with *prefix*, so comparing whole keys compares whatever follows the
    prefix. For keys that end in a fixed-width timestamp (for example
    ``<prefix>YYYYMMDD_HHMMSS.csv``) the lexicographically greatest key is
    the most recent one.

    Args:
        prefix: Key prefix (folder plus file-name stem) to search under.

    Returns:
        The greatest matching key, or ``None`` when nothing matches.
    """
    # A single list_objects_v2 call returns only one page of results, so walk
    # every page to make sure the newest key is not missed in large folders.
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    # Compare the full key rather than only the text after the last "_":
    # with a "date_time" suffix the last piece is the time of day alone, which
    # would ignore the date and could select an older file.
    return max(keys, default=None)

# Resolve the newest prepared train and API objects under the processed folder
latest_train_file = get_latest_s3_file(f"{S3_PROCESSED_FOLDER}prepared_train_data_")
latest_api_file = get_latest_s3_file(f"{S3_PROCESSED_FOLDER}prepared_api_data_")

# Download and load data
def load_s3_csv(file_key):
    """Download the S3 object *file_key* into the temp folder and parse it as CSV.

    Args:
        file_key: Key of the object inside ``S3_BUCKET``; a falsy value
            (``None`` or an empty string) means no file was found.

    Returns:
        A ``pandas.DataFrame`` read from the downloaded file.

    Raises:
        FileNotFoundError: If *file_key* is falsy.
    """
    if file_key:
        # Keep only the file name so the download lands directly in the temp folder.
        destination = os.path.join(LOCAL_TEMP_FOLDER, os.path.basename(file_key))
        s3_client.download_file(S3_BUCKET, file_key, destination)
        frame = pd.read_csv(destination)
        return frame
    raise FileNotFoundError("No matching file found on S3.")

# Pull both datasets into memory (train first, then API)
df_train = load_s3_csv(file_key=latest_train_file)
df_api = load_s3_csv(file_key=latest_api_file)

# Apply transformations (excluding 'Exited' for API data)
def _min_max_scale(series):
    """Rescale *series* linearly so its minimum maps to 0 and its maximum to 1."""
    low = series.min()
    span = series.max() - low
    if pd.isna(span) or span == 0:
        # Degenerate column (empty, all-missing or constant): dividing by the
        # span would give NaN/inf, so map every present value to 0.0 instead
        # while leaving missing values missing.
        return (series - low) * 0.0
    return (series - low) / span


def _standardize(series):
    """Return the z-score of *series* using its mean and sample standard deviation."""
    centered = series - series.mean()
    spread = series.std()
    if pd.isna(spread) or spread == 0:
        # A single row (std is NaN) or a constant column (std is 0) has no
        # spread to divide by; report 0.0 for every present value.
        return centered * 0.0
    return centered / spread


def transform_data(df, is_api=False):
    """Scale the numeric columns and add the ``BalancePerProduct`` feature.

    The input frame is left untouched; a transformed copy is returned.
    All scaling statistics are computed from the frame that is passed in, so
    each call scales its data by its own min/max/mean/std.

    Args:
        df: Frame containing ``CreditScore``, ``Age``, ``Balance``,
            ``EstimatedSalary`` and ``NumOfProducts`` columns.
        is_api: When true, the ``Exited`` column is dropped if present.

    Returns:
        A new ``pandas.DataFrame`` with the scaled columns and the derived
        ``BalancePerProduct`` column.
    """
    out = df.copy()

    # Vectorized scaling: min/max/mean/std are computed once per column
    # instead of once per row.
    out["CreditScore"] = _min_max_scale(out["CreditScore"])
    for column in ("Age", "Balance", "EstimatedSalary"):
        out[column] = _standardize(out[column])

    # Derived feature: Balance per Product
    # NOTE(review): this uses the already standardized Balance, as the
    # original code did - confirm that raw balance was not intended.
    ratio = out["Balance"] / out["NumOfProducts"]
    # Division by zero products yields +/-inf, which fillna() alone would keep;
    # treat every undefined ratio as 0.
    undefined = ratio.isin([float("inf"), float("-inf")])
    out["BalancePerProduct"] = ratio.mask(undefined).fillna(0)

    if is_api:
        out = out.drop(columns=["Exited"], errors="ignore")

    return out

# Run the transformation on both datasets; only the API frame drops 'Exited'
df_train_transformed = transform_data(df_train, is_api=False)
df_api_transformed = transform_data(df_api, is_api=True)

# Write timestamped CSV copies into the local output folder.
# The same timestamp is shared by both files.
timestamp = f"{datetime.now():%Y%m%d_%H%M%S}"
train_local_file = os.path.join(
    LOCAL_TRANSFORMED_FOLDER, "transformed_train_data_" + timestamp + ".csv"
)
api_local_file = os.path.join(
    LOCAL_TRANSFORMED_FOLDER, "transformed_api_data_" + timestamp + ".csv"
)
df_train_transformed.to_csv(train_local_file, index=False)
df_api_transformed.to_csv(api_local_file, index=False)

# Upload back to S3 with correct folder
def upload_to_s3(file_path, folder):
    """Upload the local file *file_path* to ``S3_BUCKET`` under the *folder* prefix.

    The object key is *folder* followed by the file's base name. A missing
    local file is reported with a printed warning and otherwise ignored.

    Args:
        file_path: Path of the local file to upload.
        folder: Key prefix to place the object under (expected to end in "/").
    """
    if not os.path.exists(file_path):
        print(f"⚠️ File not found for upload: {file_path}")
        return
    object_key = folder + os.path.basename(file_path)
    s3_client.upload_file(file_path, S3_BUCKET, object_key)

# Push both freshly written CSVs to the transformed prefix (train first, then API)
upload_to_s3(file_path=train_local_file, folder=S3_TRANSFORMED_FOLDER)
upload_to_s3(file_path=api_local_file, folder=S3_TRANSFORMED_FOLDER)

# Store in PostgreSQL with versioning
def store_in_sql(df, table_name):
    """Append *df* to *table_name*, tagged with the current run's version.

    A ``version`` column holding the module-level ``timestamp`` is added to
    the rows that are written. The caller's frame is not modified.

    Args:
        df: Frame to persist.
        table_name: Destination table; rows are appended to it.
    """
    # assign() returns a new frame, so the version column does not leak back
    # into the DataFrame the caller still holds.
    versioned = df.assign(version=timestamp)
    versioned.to_sql(table_name, engine, if_exists="append", index=False)

# Persist both transformed frames (train first, then API) and report completion
store_in_sql(df=df_train_transformed, table_name="transformed_train_data")
store_in_sql(df=df_api_transformed, table_name="transformed_api_data")

print("✅ Data transformation, storage, and versioning completed successfully!")


✅ Data transformation, storage, and versioning completed successfully!


In [7]:
# Query to get unique versions from both tables
# (UNION removes duplicate table/version pairs, leaving one row per load.)
query = """
SELECT 'transformed_train_data' AS table_name, version FROM transformed_train_data
UNION
SELECT 'transformed_api_data' AS table_name, version FROM transformed_api_data
ORDER BY version DESC;
"""

# Hold the connection only for the duration of the query; the context manager
# closes it even if reading the results raises.
with engine.connect() as conn:
    df_versions = pd.read_sql(query, conn)

# Display the versions
print(df_versions)


               table_name          version
0    transformed_api_data  20250311_121740
1  transformed_train_data  20250311_121740
2    transformed_api_data  20250310_232359
3  transformed_train_data  20250310_232359
4  transformed_train_data  20250310_231953
5    transformed_api_data  20250310_231953
6  transformed_train_data  20250310_144945
7    transformed_api_data  20250310_144945
8  transformed_train_data  20250310_143250
9    transformed_api_data  20250310_143250
