In [1]:
# imports

import os
from dotenv import load_dotenv
from openai import OpenAI
from IPython.display import Markdown, display, update_display

In [2]:
# Load environment variables in a file called .env
# Print the key prefixes to help with any debugging

load_dotenv()
openai_api_key = os.getenv('OPENAI_API_KEY')

# Echo only the first 8 characters: enough to tell which kind of key was loaded
# without putting the secret itself into the notebook output.
print(f"OpenAI API Key exists and begins {openai_api_key[:8]}" if openai_api_key else "OpenAI API Key not set")

OpenAI API Key exists and begins sk-proj-


In [3]:
# Initialize the OpenAI client and constants

load_dotenv(override=True)
api_key = os.getenv('OPENAI_API_KEY')

# Heuristic check only: a missing key, a non "sk-proj-" key, or a suspiciously
# short value just prints a warning; nothing is raised.
if not api_key or not api_key.startswith('sk-proj-') or len(api_key) <= 10:
    print("There might be a problem with your API key? Please visit the troubleshooting notebook!")
else:
    print("API key looks good so far")

MODEL = 'gpt-4o-mini'
# NOTE: the pipeline cells further down execute `import openai`, which rebinds this
# name to the openai module, so this client instance is shadowed once they run.
openai = OpenAI()

API key looks good so far


In [None]:
# In this section I load the required configuration and clean the raw data file using LLM-generated cleaning code.
import boto3
import pandas as pd
import openai
import io
import json
import os
from dotenv import load_dotenv

# Load credentials from .env
load_dotenv()

AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION = os.getenv("AWS_REGION")
S3_BUCKET = os.getenv("S3_BUCKET_NAME")

# Fail fast with a clear message: every S3 read/write below passes Bucket=S3_BUCKET,
# and a missing name would otherwise only surface later, inside the first boto3 call.
if not S3_BUCKET:
    raise RuntimeError("S3_BUCKET_NAME is not set; add it to your .env file.")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Key for the module-level `openai.chat.completions.create(...)` calls used below.
openai.api_key = OPENAI_API_KEY

# S3 key prefixes ("folders") for raw input and cleaned output.
RAW_FOLDER = "youtube-rawdata/"
CLEANED_FOLDER = "youtube-cleandata/"

# Initialize S3 client
s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

def read_csv_from_s3(key):
    """Download object ``key`` from S3_BUCKET and parse its bytes as CSV into a DataFrame."""
    payload = s3.get_object(Bucket=S3_BUCKET, Key=key)["Body"].read()
    return pd.read_csv(io.BytesIO(payload))

def read_json_from_s3(key):
    """Download object ``key`` from S3_BUCKET, decode it as UTF-8 and parse it as JSON."""
    raw_bytes = s3.get_object(Bucket=S3_BUCKET, Key=key)["Body"].read()
    text = raw_bytes.decode("utf-8")
    return json.loads(text)

def generate_cleaning_prompt(df, row_count, col_info, file_name):
    """Build the user prompt asking the LLM for a pandas cleaning function.

    The prompt embeds the file name, ``row_count``, the pre-formatted ``col_info``
    listing, the first 10 rows, ``df.describe(include='all')`` and per-column null
    counts, followed by fixed data-quality rules and a request for a function named
    ``clean_youtube_data(df)`` that returns the cleaned DataFrame.
    """
    sample_rows = df.head(10).to_string(index=False)
    summary_stats = df.describe(include='all').to_string()
    missing_counts = df.isnull().sum().to_string()

    prompt_lines = [
        "",
        f"    You are tasked with cleaning data from the file: {file_name}. This data is part of a dynamic ETL pipeline and needs to undergo basic but essential data cleaning. ",
        "    Here’s key information about the dataset:",
        "",
        f"    - File Name: {file_name}",
        f"    - Total Rows: {row_count}",
        "    - Columns and Data Types:",
        f"    {col_info}",
        "",
        "    - Sample Rows (first 10):",
        f"    {sample_rows}",
        "",
        "    - Summary Statistics:",
        f"    {summary_stats}",
        "",
        "    - Missing Values Per Column:",
        f"    {missing_counts}",
        "",
        "    - Known Data Quality Rules:",
        "        1. Timestamps (like publish_time) must be valid and standardized (ISO 8601 format preferred).",
        "        2. Numeric columns (views, likes, dislikes) must be non-negative.",
        "        3. Duplicate rows should be removed.",
        "        4. Tags should be normalized by removing special characters.",
        "        5. Category IDs must be present and can later be mapped to category names.",
        "        6. String columns (title, description) should have leading/trailing whitespace removed.",
        "        7. Handle all missing values appropriately (for example, missing tags should be 'No Tags').",
        "",
        "    Please generate Python code (using pandas) that:",
        "    - Cleans and validates the data according to the rules above.",
        "    - Removes unnecessary columns if they appear redundant.",
        "    - Adds appropriate comments to the code so the logic is clear.",
        "    - Only return Python code (no explanations).",
        "",
        "    Your output must be a **complete, executable pandas function called 'clean_youtube_data(df)'** which returns the cleaned DataFrame.",
        "    ",
    ]
    return "\n".join(prompt_lines)

def call_openai_for_cleaning_code(prompt, model="gpt-4o-mini"):
    """Ask an OpenAI chat model for pandas cleaning code.

    Args:
        prompt: User message describing the dataset (built by generate_cleaning_prompt).
        model: Chat model name; defaults to the previously hard-coded "gpt-4o-mini".

    Returns:
        The text content of the first choice. It may still be wrapped in a markdown
        code fence, and may be None if the API returned no content.

    Raises:
        ValueError: if the API response has no choices.
    """
    systemprompt = """
You are a highly skilled data engineer with expertise in data quality, data profiling, and data cleaning. 
You specialize in writing Python code (using pandas) to clean, enrich, and validate data in ETL pipelines. 
You follow best practices for:
- Handling missing values
- Standardizing timestamps
- Removing duplicates
- Fixing schema inconsistencies
- Validating numeric columns
- Normalizing text columns (like tags)
- Mapping columns where needed (like category_id to category_name)
Your job is to always return clean, well-commented Python code that works directly on a pandas DataFrame called 'df'. You are not allowed to return explanations, only Python code.
"""
    print(systemprompt)
    print(prompt)
    response = openai.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": systemprompt},
            {"role": "user", "content": prompt}
        ]
    )
    # Guard before indexing: an empty choices list would otherwise surface as a bare IndexError.
    # (The raw response object is no longer dumped; the extracted code is printed below.)
    if not response or not response.choices:
        raise ValueError("OpenAI returned an empty or invalid response.")
    resultcode = response.choices[0].message.content
    print(resultcode)
    return resultcode

def clean_openai_code_response(response_text):
    """Extract executable Python from an LLM reply that may wrap it in a markdown fence.

    Handles a fenced block with a language tag (```python), surrounding whitespace or
    trailing newlines after the closing fence, and prose before or after the block.
    A reply without any fence is returned with surrounding whitespace stripped.

    Raises:
        ValueError: if ``response_text`` is None or contains no code.
    """
    if response_text is None:
        raise ValueError("Received no cleaning code from OpenAI.")

    fence = "```"
    code = response_text.strip()
    opening = code.find(fence)
    if opening != -1:
        # Drop everything through the end of the opening fence line (including "python").
        first_newline = code.find("\n", opening)
        code = "" if first_newline == -1 else code[first_newline + 1:]
        # Cut at the last fence so any trailing prose after the closing ``` is dropped too.
        closing = code.rfind(fence)
        if closing != -1:
            code = code[:closing]
        code = code.lstrip("\n").rstrip()

    if not code.strip():
        raise ValueError("Received no cleaning code from OpenAI.")
    return code

def save_cleaning_code_to_file(cleaning_code, filename):
    """Save the generated cleaning code locally as ``<filename stem>_cleaning_code.txt`` for audit.

    ``os.path.splitext`` is used so a name without a ``.csv`` suffix still gets a
    distinct ``_cleaning_code.txt`` file. The old ``str.replace('.csv', ...)`` left such
    names unchanged, so it would overwrite a local file named like the input.
    """
    base_name, _extension = os.path.splitext(filename)
    code_filename = f"{base_name}_cleaning_code.txt"
    with open(code_filename, 'w', encoding='utf-8') as file:
        file.write(cleaning_code)
    print(f"📝 Cleaning code saved to {code_filename}")


def execute_cleaning_code(df, cleaning_code, entry_point="clean_youtube_data"):
    """Run LLM-generated cleaning code against a copy of ``df`` and return the cleaned frame.

    Two shapes of generated code are supported:
    * a function named ``entry_point`` (what generate_cleaning_prompt asks for). It is
      called with the working copy, and its return value is the result.
    * top-level statements that rebind or mutate ``df``. The final ``df`` is the result.
    If the code already rebinds ``df`` at top level (e.g. it ends with
    ``df = clean_youtube_data(df)``), the function is not applied a second time.

    SECURITY: ``exec`` runs arbitrary model-generated code with this process's full
    privileges. Review the saved cleaning-code file before trusting the output.

    Args:
        df: Raw DataFrame. It is copied first, so in-place edits never touch the caller's frame.
        cleaning_code: Python source to execute.
        entry_point: Name of the cleaning function to look for in the executed code.

    Returns:
        The cleaned DataFrame.

    Raises:
        ValueError: if the code does not produce a DataFrame.
    """
    working_copy = df.copy()
    namespace = {"pd": pd, "df": working_copy}
    exec(cleaning_code, namespace)

    result = namespace.get("df")
    cleaner = namespace.get(entry_point)
    # Only call the function if top-level code did not already produce a new frame.
    if result is working_copy and callable(cleaner):
        result = cleaner(working_copy)

    if not isinstance(result, pd.DataFrame):
        raise ValueError(
            f"The cleaning code did not produce a DataFrame (expected '{entry_point}(df)' or a top-level 'df')."
        )
    return result

def save_cleaned_data_to_s3(df, cleaned_filename):
    """Serialize ``df`` to CSV (index omitted) and upload it to S3_BUCKET under CLEANED_FOLDER."""
    target_key = f"{CLEANED_FOLDER}{cleaned_filename}"
    csv_text = df.to_csv(index=False)
    s3.put_object(Bucket=S3_BUCKET, Key=target_key, Body=csv_text)
    print(f"✅ Cleaned data saved to s3://{S3_BUCKET}/{target_key}")

def main(video_data_filename):
    """Run the LLM-driven cleaning stage end to end for one raw CSV file.

    The steps are:
    1. Read RAW_FOLDER/<file> from S3.
    2. Profile it into a prompt and ask OpenAI for cleaning code.
    3. Strip markdown fences from the reply and archive the code to a local file.
    4. Execute the code.
    5. Upload the result to CLEANED_FOLDER under the ``_cleaned`` file name.
    """
    print(f"📥 Reading raw data file: {video_data_filename} from S3...")
    raw_key = f"{RAW_FOLDER}{video_data_filename}"
    video_df = read_csv_from_s3(raw_key)

    # Dataset profile handed to the prompt builder: row count plus one line per column.
    row_count = len(video_df)
    col_info = "\n".join(f"    - {column} ({column_dtype})" for column, column_dtype in video_df.dtypes.items())
    print(row_count)
    print(col_info)
    cleaning_prompt = generate_cleaning_prompt(video_df, row_count, col_info, video_data_filename)

    print("\n🧠 Sending data to OpenAI for cleaning code generation...")
    llm_reply = call_openai_for_cleaning_code(cleaning_prompt)

    print("\n Removing the starting markdown text")
    cleaning_code = clean_openai_code_response(llm_reply)

    print("\n Saving the cleaning code to a txt file")
    save_cleaning_code_to_file(cleaning_code, video_data_filename)
    print("\n🔹 Received Cleaning Code:\n")
    print(cleaning_code)

    print("\n🚀 Applying cleaning logic...")
    cleaned_df = execute_cleaning_code(video_df, cleaning_code)

    print("\n Saving the cleaning file to other s3 folder")
    cleaned_filename = video_data_filename.replace('.csv', '_cleaned.csv')
    save_cleaned_data_to_s3(cleaned_df, cleaned_filename)

    print(f"\n✅ Data cleaning complete and saved to {CLEANED_FOLDER}{cleaned_filename}")

if __name__ == "__main__":
    # Example: python youtube_cleaning_stage.py youtube_video_data.csv
    import sys

    DEFAULT_VIDEO_DATA_FILENAME = "INvideos.csv"

    if "ipykernel" in sys.modules:
        # Inside a Jupyter kernel, sys.argv holds the kernel launcher's own arguments
        # (typically a connection-file flag), not ours, so use the default file.
        video_data_filename = DEFAULT_VIDEO_DATA_FILENAME
    elif len(sys.argv) == 2:
        # From the command line, honour the filename that was validated here
        # (previously it was checked and then ignored in favour of a hard-coded name).
        video_data_filename = sys.argv[1]
    else:
        print("Usage: python youtube_cleaning_stage.py <video_data_filename>")
        sys.exit(1)

    main(video_data_filename)


📥 Reading raw data file: INvideos.csv from S3...
37352
    - video_id (object)
    - trending_date (object)
    - title (object)
    - channel_title (object)
    - category_id (int64)
    - publish_time (object)
    - tags (object)
    - views (int64)
    - likes (int64)
    - dislikes (int64)
    - comment_count (int64)
    - thumbnail_link (object)
    - comments_disabled (bool)
    - ratings_disabled (bool)
    - video_error_or_removed (bool)
    - description (object)

🧠 Sending data to OpenAI for cleaning code generation...

You are a highly skilled data engineer with expertise in data quality, data profiling, and data cleaning. 
You specialize in writing Python code (using pandas) to clean, enrich, and validate data in ETL pipelines. 
You follow best practices for:
- Handling missing values
- Standardizing timestamps
- Removing duplicates
- Fixing schema inconsistencies
- Validating numeric columns
- Normalizing text columns (like tags)
- Mapping columns where needed (like catego



✅ Cleaned data saved to s3://eltpipelinellmautomation/youtube-cleandata/INvideos_cleaned.csv

✅ Data cleaning complete and saved to youtube-cleandata/INvideos_cleaned.csv


In [12]:
# In this section I load the required configuration and clean the raw data file using LLM-generated cleaning code, with error handling added.
import boto3
import pandas as pd
import openai
import io
import json
import os
from dotenv import load_dotenv

# Load credentials from .env
load_dotenv()

AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION = os.getenv("AWS_REGION")
S3_BUCKET = os.getenv("S3_BUCKET_NAME")

# Fail fast with a clear message: every S3 read/write below passes Bucket=S3_BUCKET,
# and a missing name would otherwise only surface later, inside the first boto3 call.
if not S3_BUCKET:
    raise RuntimeError("❌ S3_BUCKET_NAME is not set; add it to your .env file.")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Key for the module-level `openai.chat.completions.create(...)` calls used below.
openai.api_key = OPENAI_API_KEY

# S3 key prefixes ("folders") for raw input and cleaned output.
RAW_FOLDER = "youtube-rawdata/"
CLEANED_FOLDER = "youtube-cleandata/"

# Initialize S3 client
s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

def read_csv_from_s3(key):
    """Fetch ``key`` from S3_BUCKET and parse it as CSV; any failure is re-raised as RuntimeError."""
    try:
        body_bytes = s3.get_object(Bucket=S3_BUCKET, Key=key)["Body"].read()
        return pd.read_csv(io.BytesIO(body_bytes))
    except Exception as e:
        raise RuntimeError(f"❌ Failed to read {key} from S3: {e}")

def read_json_from_s3(key):
    """Fetch ``key`` from S3_BUCKET and parse it as UTF-8 JSON; any failure becomes RuntimeError."""
    try:
        raw_bytes = s3.get_object(Bucket=S3_BUCKET, Key=key)["Body"].read()
        text = raw_bytes.decode("utf-8")
        return json.loads(text)
    except Exception as e:
        raise RuntimeError(f"❌ Failed to read {key} from S3: {e}")

def generate_cleaning_prompt(df, row_count, col_info, file_name):
    """Build the user prompt asking the LLM for a pandas cleaning function.

    The prompt embeds the file name, ``row_count``, the pre-formatted ``col_info``
    listing, the first 10 rows, ``df.describe(include='all')`` and per-column null
    counts, followed by fixed data-quality rules and a request for a function named
    ``clean_youtube_data(df)`` that returns the cleaned DataFrame.
    """
    sample_rows = df.head(10).to_string(index=False)
    summary_stats = df.describe(include='all').to_string()
    missing_counts = df.isnull().sum().to_string()

    prompt_lines = [
        "",
        f"    You are tasked with cleaning data from the file: {file_name}. This data is part of a dynamic ETL pipeline and needs to undergo basic but essential data cleaning. ",
        "    Here’s key information about the dataset:",
        "",
        f"    - File Name: {file_name}",
        f"    - Total Rows: {row_count}",
        "    - Columns and Data Types:",
        f"    {col_info}",
        "",
        "    - Sample Rows (first 10):",
        f"    {sample_rows}",
        "",
        "    - Summary Statistics:",
        f"    {summary_stats}",
        "",
        "    - Missing Values Per Column:",
        f"    {missing_counts}",
        "",
        "    - Known Data Quality Rules:",
        "        1. Timestamps (like publish_time) must be valid and standardized (ISO 8601 format preferred).",
        "        2. Numeric columns (views, likes, dislikes) must be non-negative.",
        "        3. Duplicate rows should be removed.",
        "        4. Tags should be normalized by removing special characters.",
        "        5. Category IDs must be present and can later be mapped to category names.",
        "        6. String columns (title, description) should have leading/trailing whitespace removed.",
        "        7. Handle all missing values appropriately (for example, missing tags should be 'No Tags').",
        "",
        "    Please generate Python code (using pandas) that:",
        "    - Cleans and validates the data according to the rules above.",
        "    - Removes unnecessary columns if they appear redundant.",
        "    - Adds appropriate comments to the code so the logic is clear.",
        "    - Only return Python code (no explanations).",
        "",
        "    Your output must be a **complete, executable pandas function called 'clean_youtube_data(df)'** which returns the cleaned DataFrame.",
        "    ",
    ]
    return "\n".join(prompt_lines)

def call_openai_for_cleaning_code(prompt, model="gpt-4o-mini"):
    """Ask an OpenAI chat model for pandas cleaning code, with error handling.

    Args:
        prompt: User message describing the dataset (built by generate_cleaning_prompt).
        model: Chat model name; defaults to the previously hard-coded "gpt-4o-mini".

    Returns:
        The text content of the first choice. It may still be wrapped in a markdown
        code fence, and may be None if the API returned no content.

    Raises:
        RuntimeError: if the API call fails (original exception chained as __cause__)
            or the response contains no choices.
    """
    systemprompt = """
You are a highly skilled data engineer with expertise in data quality, data profiling, and data cleaning. 
You specialize in writing Python code (using pandas) to clean, enrich, and validate data in ETL pipelines. 
You follow best practices for:
- Handling missing values
- Standardizing timestamps
- Removing duplicates
- Fixing schema inconsistencies
- Validating numeric columns
- Normalizing text columns (like tags)
- Mapping columns where needed (like category_id to category_name)
Your job is to always return clean, well-commented Python code that works directly on a pandas DataFrame called 'df'. You are not allowed to return explanations, only Python code.
"""

    print(systemprompt)
    print(prompt)

    try:
        response = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": systemprompt},
                {"role": "user", "content": prompt}
            ]
        )
    except Exception as e:
        raise RuntimeError(f"❌ Error calling OpenAI API: {e}") from e

    # Checked outside the try so an empty response is not re-labelled as an API call error.
    if not response or not response.choices:
        raise RuntimeError("❌ OpenAI returned an empty or invalid response.")
    return response.choices[0].message.content

def clean_openai_code_response(response_text):
    """Extract executable Python from an LLM reply that may wrap it in a markdown fence.

    Handles a fenced block with a language tag (```python), surrounding whitespace or
    trailing newlines after the closing fence, and prose before or after the block.
    A reply without any fence is returned with surrounding whitespace stripped.

    Raises:
        ValueError: if ``response_text`` is None or contains no code.
    """
    if response_text is None:
        raise ValueError("❌ Received no cleaning code from OpenAI.")

    fence = "```"
    code = response_text.strip()
    opening = code.find(fence)
    if opening != -1:
        # Drop everything through the end of the opening fence line (including "python").
        first_newline = code.find("\n", opening)
        code = "" if first_newline == -1 else code[first_newline + 1:]
        # Cut at the last fence so any trailing prose after the closing ``` is dropped too.
        closing = code.rfind(fence)
        if closing != -1:
            code = code[:closing]
        code = code.lstrip("\n").rstrip()

    if not code.strip():
        raise ValueError("❌ Received no cleaning code from OpenAI.")
    return code

def save_cleaning_code_to_file(cleaning_code, filename):
    """Save the generated cleaning code locally as ``<filename stem>_cleaning_code.txt`` for audit.

    ``os.path.splitext`` is used so a name without a ``.csv`` suffix still gets a
    distinct ``_cleaning_code.txt`` file. The old ``str.replace('.csv', ...)`` left such
    names unchanged, so it would overwrite a local file named like the input.
    """
    base_name, _extension = os.path.splitext(filename)
    code_filename = f"{base_name}_cleaning_code.txt"
    with open(code_filename, 'w', encoding='utf-8') as file:
        file.write(cleaning_code)
    print(f"📝 Cleaning code saved to {code_filename}")

def execute_cleaning_code(df, cleaning_code, entry_point="clean_youtube_data"):
    """Run LLM-generated cleaning code against a copy of ``df``, with error handling.

    Two shapes of generated code are supported:
    * a function named ``entry_point`` (what generate_cleaning_prompt asks for). It is
      called with the working copy, and its return value is the result.
    * top-level statements that rebind or mutate ``df``. The final ``df`` is the result.
    If the code already rebinds ``df`` at top level (e.g. it ends with
    ``df = clean_youtube_data(df)``), the function is not applied a second time.

    SECURITY: ``exec`` runs arbitrary model-generated code with this process's full
    privileges. Review the saved cleaning-code file before trusting the output.

    Args:
        df: Raw DataFrame. It is copied first, so in-place edits never touch the caller's frame.
        cleaning_code: Python source to execute.
        entry_point: Name of the cleaning function to look for in the executed code.

    Returns:
        The cleaned DataFrame.

    Raises:
        RuntimeError: if the code fails or does not produce a DataFrame (cause chained).
    """
    working_copy = df.copy()
    namespace = {"pd": pd, "df": working_copy}
    try:
        exec(cleaning_code, namespace)
        result = namespace.get("df")
        cleaner = namespace.get(entry_point)
        # Only call the function if top-level code did not already produce a new frame.
        if result is working_copy and callable(cleaner):
            result = cleaner(working_copy)
        if not isinstance(result, pd.DataFrame):
            raise ValueError(
                f"❌ The cleaning code did not produce a DataFrame (expected '{entry_point}(df)' or a top-level 'df')."
            )
        return result
    except Exception as e:
        raise RuntimeError(f"❌ Error executing cleaning code: {e}") from e

def save_cleaned_data_to_s3(df, cleaned_filename):
    """Upload ``df`` as an index-free CSV under CLEANED_FOLDER; any failure becomes RuntimeError."""
    try:
        target_key = f"{CLEANED_FOLDER}{cleaned_filename}"
        csv_text = df.to_csv(index=False)
        s3.put_object(Bucket=S3_BUCKET, Key=target_key, Body=csv_text)
        print(f"✅ Cleaned data saved to s3://{S3_BUCKET}/{target_key}")
    except Exception as e:
        raise RuntimeError(f"❌ Failed to save cleaned data to S3: {e}")

def main(video_data_filename):
    """Run the LLM-driven cleaning stage end to end for one raw CSV file.

    The steps are:
    1. Read RAW_FOLDER/<file> from S3.
    2. Profile it into a prompt and ask OpenAI for cleaning code.
    3. Strip markdown fences from the reply and archive the code to a local file.
    4. Execute the code.
    5. Upload the result to CLEANED_FOLDER under the ``_cleaned`` file name.

    Errors from the helpers propagate.
    """
    print(f"📥 Reading raw data file: {video_data_filename} from S3...")

    raw_key = f"{RAW_FOLDER}{video_data_filename}"
    video_df = read_csv_from_s3(raw_key)

    # Dataset profile handed to the prompt builder: row count plus one line per column.
    row_count = len(video_df)
    col_info = "\n".join(f"    - {column} ({column_dtype})" for column, column_dtype in video_df.dtypes.items())
    print(row_count)
    print(col_info)

    cleaning_prompt = generate_cleaning_prompt(video_df, row_count, col_info, video_data_filename)

    print("\n🧠 Sending data to OpenAI for cleaning code generation...")
    llm_reply = call_openai_for_cleaning_code(cleaning_prompt)
    cleaning_code = clean_openai_code_response(llm_reply)
    save_cleaning_code_to_file(cleaning_code, video_data_filename)

    cleaned_df = execute_cleaning_code(video_df, cleaning_code)

    cleaned_filename = video_data_filename.replace('.csv', '_cleaned.csv')
    save_cleaned_data_to_s3(cleaned_df, cleaned_filename)

    print(f"\n✅ Data cleaning complete and saved to {CLEANED_FOLDER}{cleaned_filename}")

if __name__ == "__main__":
    import sys

    DEFAULT_VIDEO_DATA_FILENAME = "INvideos.csv"

    # Inside a Jupyter kernel, sys.argv holds the kernel launcher's own arguments, so
    # only a real command-line run may override the default file name.
    cli_args = [] if "ipykernel" in sys.modules else sys.argv[1:]
    video_data_filename = cli_args[0] if cli_args else DEFAULT_VIDEO_DATA_FILENAME
    main(video_data_filename)


📥 Reading raw data file: INvideos.csv from S3...
37352
    - video_id (object)
    - trending_date (object)
    - title (object)
    - channel_title (object)
    - category_id (int64)
    - publish_time (object)
    - tags (object)
    - views (int64)
    - likes (int64)
    - dislikes (int64)
    - comment_count (int64)
    - thumbnail_link (object)
    - comments_disabled (bool)
    - ratings_disabled (bool)
    - video_error_or_removed (bool)
    - description (object)

🧠 Sending data to OpenAI for cleaning code generation...

You are a highly skilled data engineer with expertise in data quality, data profiling, and data cleaning. 
You specialize in writing Python code (using pandas) to clean, enrich, and validate data in ETL pipelines. 
You follow best practices for:
- Handling missing values
- Standardizing timestamps
- Removing duplicates
- Fixing schema inconsistencies
- Validating numeric columns
- Normalizing text columns (like tags)
- Mapping columns where needed (like catego