# Lambda Functions

## automate_support_logs_etl

This Lambda converts raw `.log` files into processed `.parquet` files.
The transformation (parsing and cleaning) of the log data is also performed inside this function.

In [None]:
import json
import boto3
import pandas as pd
from io import StringIO
import re
import pyarrow as pa
import pyarrow.parquet as pq
import io

def save_parquet_to_s3(df, bucket, key):
    """Serialize ``df`` to Parquet in memory and upload it to S3.

    Args:
        df: pandas DataFrame to write; its index is not stored in the file.
        bucket: Name of the destination S3 bucket.
        key: Object key to write inside ``bucket``.
    """
    # Build the Parquet bytes in an in-memory buffer (nothing touches disk).
    sink = io.BytesIO()
    pq.write_table(pa.Table.from_pandas(df, preserve_index=False), sink)
    payload = sink.getvalue()

    # Push the serialized bytes to the requested bucket/key.
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=payload)
    print(f"✅ Parquet saved to s3://{bucket}/{key}")
    

def read_log_from_s3(bucket, key):
    """Download an S3 object and return its content as text.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Key of the object to read.

    Returns:
        The object body decoded as a UTF-8 string.
    """
    client = boto3.client('s3')
    body = client.get_object(Bucket=bucket, Key=key)['Body']
    return body.read().decode('utf-8')


# Compiled once at import time so every invocation reuses the same pattern.
_LOG_PATTERN = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?P<log_level>[A-Za-z0-9_]+)\] '
    r'(?P<component>[^\s]+) - TicketID=(?P<ticket_id>[^\s]+) SessionID=(?P<session_id>[^\s]+)\s*'
    r'IP=(?P<ip>.*?) \| ResponseTime=(?P<response_time>-?\d+)ms \| CPU=(?P<cpu>[\d.]+)% \| EventType=(?P<event_type>.*?) \| Error=(?P<error>\w+)\s*'
    r'UserAgent="(?P<user_agent>.*?)"\s*'
    r'Message="(?P<message>.*?)"\s*'
    r'Debug="(?P<debug>.*?)"\s*'
    r'TraceID=(?P<trace_id>.*)'
)

# Misspelled log levels mapped to their canonical spelling.
_LOG_LEVEL_FIXES = {'INF0': 'INFO', 'DEBG': 'DEBUG', 'warnING': 'WARNING', 'EROR': 'ERROR'}


def _parse_log_entries(raw_logs):
    """Split raw text on '---' and return (list of field dicts, count of unmatched entries)."""
    entries = [entry.strip() for entry in raw_logs.split('---') if entry.strip()]
    matches = (_LOG_PATTERN.search(entry) for entry in entries)
    parsed = [match.groupdict() for match in matches if match]
    return parsed, len(entries) - len(parsed)


def _clean_log_frame(df):
    """Apply the cleaning rules (drop trace_id, filter, fix levels, dedupe, cast) to parsed logs."""
    # i) trace_id is not kept in the processed output.
    df = df.drop(columns='trace_id')

    # ii) Negative response times are discarded.
    df = df[df['response_time'].astype(int) >= 0]

    # iii) Fix known typos in log_level. ``assign`` returns a new frame, which
    #      avoids writing into a filtered slice of the original.
    df = df.assign(log_level=df['log_level'].replace(_LOG_LEVEL_FIXES))

    # iv) Remove duplicate rows (compared on the raw string values).
    df = df.drop_duplicates()

    # v) Cast columns to their proper types. Unparseable timestamps become NaT
    #    and error values other than true/false become NaN.
    return df.assign(
        response_time=df['response_time'].astype(int),
        cpu=df['cpu'].astype(float),
        error=df['error'].str.lower().map({'true': True, 'false': False}),
        timestamp=pd.to_datetime(
            df['timestamp'], format='%Y-%m-%d %H:%M:%S', errors='coerce'
        ).astype('datetime64[ms]'),
    )


def _processed_key_for(input_key):
    """Return the output key under support-logs/processed/ for a raw log object key."""
    # Use the last path component so keys of any depth work.
    file_name = input_key.rsplit('/', 1)[-1]
    # Only swap the trailing extension; '.log' elsewhere in the name is left alone.
    stem = file_name[:-len('.log')] if file_name.endswith('.log') else file_name
    return f'support-logs/processed/{stem}.parquet'


def lambda_handler(event, context):
    """Convert the raw log object named in an S3 event into a cleaned Parquet file.

    Reads the object referenced by the first record of ``event``, parses each
    '---'-delimited entry, cleans the result and writes it to
    ``support-logs/processed/<name>.parquet`` in the same bucket.

    Args:
        event: S3 event notification payload; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        None. If no entry in the file matches the expected log format, nothing
        is written.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import unquote_plus

    # Step 1: locate the object that triggered the function.
    record = event['Records'][0]
    bucket_name = record['s3']['bucket']['name']
    # S3 event notifications URL-encode the object key (e.g. spaces arrive as
    # '+'), so it must be decoded before being used in further S3 calls.
    input_key = unquote_plus(record['s3']['object']['key'])

    print(f"📥 Triggered by: s3://{bucket_name}/{input_key}")

    # Step 2: read and parse the log data.
    raw_logs = read_log_from_s3(bucket_name, input_key)
    parsed_entries, unmatched = _parse_log_entries(raw_logs)
    if unmatched:
        print(f"⚠️ {unmatched} log entries did not match the expected format and were skipped")
    if not parsed_entries:
        # An empty DataFrame has no columns, so cleaning would fail with KeyError.
        print(f"⚠️ No parsable log entries in s3://{bucket_name}/{input_key}; nothing written")
        return

    # Step 3: clean the data.
    df = _clean_log_frame(pd.DataFrame(parsed_entries))
    print(df.shape)
    print(df.head())

    # Step 4: upload the Parquet file to S3.
    save_parquet_to_s3(df, bucket_name, _processed_key_for(input_key))

## automate_support_tickets_etl

The transformation of the CSV files is done in AWS Glue (you can find the Glue script in the Glue file below); this code only automates triggering that transformation and load.
The goal is to convert raw `.csv` files into processed `.parquet` files.

In [None]:
import boto3

# Glue client created once at module scope and shared by every handler call.
# '<your region>' is a placeholder: replace it with the AWS region of the Glue job.
glue = boto3.client(
    'glue',
    region_name='<your region>',
)

def lambda_handler(event, context):
    """Start the support-tickets Glue job for the S3 object named in the event.

    Args:
        event: S3 event notification payload; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        None. The id of the started job run is written to the log.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import unquote_plus

    # Get the S3 file path.
    s3_info = event['Records'][0]['s3']
    bucket = s3_info['bucket']['name']
    # S3 event notifications URL-encode the object key (e.g. spaces arrive as
    # '+'); decode it so the Glue job receives the real object path.
    input_key = unquote_plus(s3_info['object']['key'])
    s3_input_path = f's3://{bucket}/{input_key}'

    print(f"Triggering Glue job with file: {s3_input_path}")

    response = glue.start_job_run(
        JobName='ETL_support_tickets-copy',
        Arguments={
            '--input_file_path': s3_input_path,
        },
    )
    # Log the run id so this invocation can be matched to a Glue job run.
    print(f"Started Glue job run: {response.get('JobRunId')}")

##  automation_to_datawarehouse_logs

The code below automatically loads the processed log data (i.e. the `.parquet` files) into Amazon Redshift.

In [None]:
import os

import psycopg2

# Redshift Serverless configuration.
# Settings are read from the Lambda environment so that no credential lives in
# source code. Non-secret settings fall back to their previous values when the
# corresponding variable is not set.
REDSHIFT_HOST = os.environ.get(
    'REDSHIFT_HOST',
    'default-workgroup.914654949253.eu-north-1.redshift-serverless.amazonaws.com',
)
REDSHIFT_PORT = os.environ.get('REDSHIFT_PORT', '5439')
REDSHIFT_DATABASE = os.environ.get('REDSHIFT_DATABASE', 'careplus_db')  # Redshift database name
REDSHIFT_USER = os.environ.get('REDSHIFT_USER', 'admin')  # Redshift username
# Required: deliberately has no default, so a missing REDSHIFT_PASSWORD raises
# KeyError at load time instead of shipping a password in the code.
# NOTE(review): the password that used to be hard-coded here should be rotated.
REDSHIFT_PASSWORD = os.environ['REDSHIFT_PASSWORD']
REDSHIFT_TABLE = os.environ.get('REDSHIFT_TABLE', 'public.support_logs')  # Target table
IAM_ROLE = os.environ.get(
    'IAM_ROLE',
    'arn:aws:iam::914654949253:role/service-role/AmazonRedshift-CommandsAccessRole-20250922T002127',
)  # IAM role ARN used by COPY

def lambda_handler(event, context):
    """COPY the Parquet object named in an S3 event into the Redshift table.

    Args:
        event: S3 event notification payload; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        Exception: any error raised while running COPY is logged, the
            transaction is rolled back, and the error is re-raised.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import unquote_plus

    # 1: Get bucket and object key from the S3 event trigger.
    record = event['Records'][0]
    bucket_name = record['s3']['bucket']['name']
    # S3 event notifications URL-encode the object key (e.g. spaces arrive as
    # '+'); decode it so COPY is pointed at the real object.
    input_key = unquote_plus(record['s3']['object']['key'])
    s3_input_path = f's3://{bucket_name}/{input_key}'

    print(f"📥 Triggered by: {s3_input_path}")

    # COPY statement. The S3 path comes from the event, so it is passed as a
    # query parameter and quoted by psycopg2 rather than pasted into the SQL.
    # The table name is an identifier taken from module configuration.
    copy_sql = f"""
        COPY {REDSHIFT_TABLE}
        FROM %s
        IAM_ROLE %s
        FORMAT AS PARQUET
        REGION 'eu-north-1';
        """

    # Connect to Redshift Serverless using psycopg2.
    conn = psycopg2.connect(
        host=REDSHIFT_HOST,
        port=REDSHIFT_PORT,
        dbname=REDSHIFT_DATABASE,
        user=REDSHIFT_USER,
        password=REDSHIFT_PASSWORD,
    )
    try:
        # The cursor context manager closes the cursor even if execute() raises.
        with conn.cursor() as cursor:
            cursor.execute(copy_sql, (s3_input_path, IAM_ROLE))
        # Commit the changes (important for COPY operations).
        conn.commit()
        print(f"Data successfully copied from {s3_input_path} to {REDSHIFT_TABLE}")
    except Exception as exc:
        conn.rollback()
        print(f"❌ Error loading data: {exc}")
        raise
    finally:
        # Always release the connection, on success or failure.
        conn.close()

## automation_to_datawarehouse_tickets

This code loads the processed ticket data (`.csv` files) into Amazon Redshift (the data warehouse).

In [None]:
import psycopg2
import os

# --- Redshift settings, supplied through environment variables ---
# Entries read with os.environ[...] are required: a missing variable raises
# KeyError when this module is loaded. Entries read with os.getenv fall back
# to the given default when the variable is unset.
REDSHIFT_HOST = os.environ['REDSHIFT_HOST']
REDSHIFT_PORT = int(os.getenv('REDSHIFT_PORT', 5439))  # always an int
REDSHIFT_DATABASE = os.environ['REDSHIFT_DATABASE']
REDSHIFT_USER = os.environ['REDSHIFT_USER']
REDSHIFT_PASSWORD = os.environ['REDSHIFT_PASSWORD']
REDSHIFT_TABLE = os.environ['REDSHIFT_TABLE']  # target table of the COPY
IAM_ROLE = os.environ['IAM_ROLE']  # role ARN placed in the COPY statement
# '<your region>' is a placeholder used only when AWS_REGION is not set.
REGION = os.getenv('AWS_REGION', '<your region>')


def lambda_handler(event, context):
    """COPY a processed support-tickets CSV named in an S3 event into Redshift.

    Objects whose key is not under ``support-tickets/processed/`` or does not
    end in ``.csv`` are skipped.

    Args:
        event: S3 event notification payload; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        ``{"status": "skipped"}`` when the object is ignored, otherwise
        ``{"status": "success", "file": <s3 path>}``.

    Raises:
        Exception: any error raised while running COPY is logged, the
            transaction is rolled back, and the error is re-raised.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import unquote_plus

    # 1. Get S3 event details.
    record = event['Records'][0]
    bucket_name = record['s3']['bucket']['name']
    # S3 event notifications URL-encode the object key (e.g. spaces arrive as
    # '+'); decode it so the filter and COPY see the real object key.
    object_key = unquote_plus(record['s3']['object']['key'])

    # Only process files that match our folder + CSV extension.
    is_processed_csv = (
        object_key.startswith("support-tickets/processed/")
        and object_key.endswith(".csv")
    )
    if not is_processed_csv:
        print(f"Skipping file: s3://{bucket_name}/{object_key}")
        return {"status": "skipped"}

    s3_path = f"s3://{bucket_name}/{object_key}"
    print(f"📥 New file detected: {s3_path}")

    # 2. Build COPY command (CSV input). String literals are passed as query
    #    parameters so psycopg2 quotes them; this matters for the S3 path,
    #    which comes from the event. The table name is an identifier taken
    #    from configuration.
    copy_sql = f"""
        COPY {REDSHIFT_TABLE}
        FROM %s
        IAM_ROLE %s
        FORMAT AS CSV
        IGNOREHEADER 1
        REGION %s
        TIMEFORMAT 'auto';
    """

    # 3. Connect to Redshift and run the load.
    conn = psycopg2.connect(
        host=REDSHIFT_HOST,
        port=REDSHIFT_PORT,
        dbname=REDSHIFT_DATABASE,
        user=REDSHIFT_USER,
        password=REDSHIFT_PASSWORD,
    )
    try:
        # Cursor is opened inside the try so the connection is closed even if
        # cursor creation fails; the context manager closes the cursor.
        with conn.cursor() as cursor:
            cursor.execute(copy_sql, (s3_path, IAM_ROLE, REGION))
        conn.commit()
        print(f"✅ Data loaded successfully from {s3_path} into {REDSHIFT_TABLE}")
    except Exception as e:
        conn.rollback()
        print(f"❌ Error loading data: {e}")
        raise
    finally:
        conn.close()

    return {"status": "success", "file": s3_path}
