In [16]:
import gzip
import os
import shlex
from datetime import datetime
from io import BytesIO
from urllib.parse import quote_plus, urlparse

import boto3
import httpagentparser
import pytz
from dotenv import load_dotenv
from pytz import timezone, utc
from sqlalchemy import create_engine, text



In [17]:
load_dotenv()

db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASS")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")

# Fail fast with a clear message instead of building a URL that contains the
# literal string "None" and only failing later at connect time.
missing_db_vars = [
    name
    for name, value in (
        ("DB_USER", db_user),
        ("DB_PASS", db_pass),
        ("DB_HOST", db_host),
        ("DB_NAME", db_name),
    )
    if value is None
]
if missing_db_vars:
    raise RuntimeError(f"Missing database settings: {', '.join(missing_db_vars)}")

# Percent-encode the credentials: characters such as '@', ':' or '/' in a
# user name or password would otherwise be parsed as URL delimiters.
engine = create_engine(
    f"mysql+pymysql://{quote_plus(db_user)}:{quote_plus(db_pass)}@{db_host}/{db_name}"
)

s3 = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    region_name=os.getenv("AWS_DEFAULT_REGION")
)

bucket = os.getenv('S3_BUCKET')
prefix = ''

# list_objects_v2 returns at most 1,000 keys per call, so a single call silently
# misses the rest of a larger bucket. Page through the full listing and merge
# every page into one response-shaped dict so `response.get('Contents', [])`
# keeps working downstream.
paginator = s3.get_paginator('list_objects_v2')
response = {
    'Contents': [
        obj
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get('Contents', [])
    ]
}

# Extract

In [18]:
# All objects returned by the S3 listing (empty list if the listing had none).
objects = response.get('Contents', [])

# Keep only gzip-compressed log files; any other keys under the prefix are ignored.
gz_files = [obj['Key'] for obj in objects if obj['Key'].endswith('.gz')]


# Transform

In [19]:
def safe_float(value):
    """Convert ``value`` to ``float``, falling back to ``0.0`` when it can't be parsed.

    Numeric strings convert normally, including negatives (``'-1'`` -> ``-1.0``).
    Values that ``float()`` rejects, such as ``''``, ``'-'`` or ``None``, yield ``0.0``.

    Only ``TypeError``/``ValueError`` (the errors ``float()`` raises for bad
    input) are caught. ``KeyboardInterrupt``, ``SystemExit`` and unrelated bugs
    are no longer swallowed.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0

# Function to parse a log line
def parse_log_line(line, log_key):
    """Parse one ALB access-log line into a row dict for ``elb_log_data``.

    Args:
        line: One decoded, stripped log line. Fields are split with
            ``shlex.split`` so quoted fields (request, user agent) stay whole.
        log_key: S3 key of the file the line came from. It is stored as
            ``log_source_file`` and used in skip/failure messages.

    Returns:
        A dict keyed by the ``elb_log_data`` column names, or ``None`` when the
        line is filtered out or fails to parse. A message is printed in both
        cases; exceptions are caught, never raised.
    """
    try:
        parts = shlex.split(line)

        # FILTER
        # Filter out log lines that don't have enough parts
        if len(parts) < 14:
            print(f"❌ Skipping line from {log_key}: Not enough parts")
            return None

        # Filter out ELB log lines that don't have valid parts
        if parts[2] == '-' or parts[2].strip() == '':
            print(f"❌ Skipping line from {log_key}: Invalid ELB log format")
            return None

        if parts[8] == '-' or not parts[8].isdigit():
            print(f"❌ Skipping line from {log_key}: Invalid status code")
            return None

        # TRANSFORM
        # Timestamp: parsed as UTC, converted to US/Eastern, then stored naive
        # (tzinfo dropped) with microseconds truncated.
        timestamp = utc.localize(datetime.strptime(parts[1], '%Y-%m-%dT%H:%M:%S.%fZ')) \
            .astimezone(timezone('US/Eastern')) \
            .replace(microsecond=0, tzinfo=None)

        # Client IP: drop the trailing ":port". rsplit keeps IPv6 addresses
        # (which contain ':' themselves) intact; brackets, if present, are removed.
        client_ip = parts[3].rsplit(':', 1)[0].strip('[]')

        # Request: whitespace-split; token 0 is the HTTP method, token 1 the URL.
        # NOTE: token 1 is the full URL as logged (scheme://host:port/path...),
        # not just the path, and is stored as-is in requested_path.
        request_parts = parts[12].split()
        http_method = request_parts[0]
        requested_path = request_parts[1]

        # Status: the ELB code was validated as all-digits above. The backend
        # code is None when not numeric (e.g. '-').
        elb_status_code = int(parts[8])
        backend_status_code = int(parts[9]) if parts[9].isdigit() else None

        # Bytes received and sent (0 when not numeric)
        received_bytes = int(parts[10]) if parts[10].isdigit() else 0
        sent_bytes = int(parts[11]) if parts[11].isdigit() else 0

        # Processing time in ms: sum of fields 5-7 (seconds) * 1000. ALB writes
        # -1 in these fields when a timing is unavailable, so negatives are
        # clamped to 0 and the placeholder can't drive the total below zero.
        total_processing_time_ms = sum(
            max(safe_float(parts[i]), 0.0) for i in (5, 6, 7)
        ) * 1000

        # User agent: simple_detect's first element is used as the OS and the
        # second as the browser. Only the first word of each is kept, with
        # 'Other' when it is empty.
        user_agent_full = parts[13]
        browser_os = httpagentparser.simple_detect(user_agent_full)
        ua_os_family = browser_os[0].split()[0] if browser_os[0] else 'Other'
        ua_browser_family = browser_os[1].split()[0] if browser_os[1] else 'Other'

        return {
            'log_timestamp': timestamp,
            'client_ip': client_ip,
            'http_method': http_method,
            'requested_path': requested_path,
            'elb_status_code': elb_status_code,
            'backend_status_code': backend_status_code,
            'total_processing_time_ms': total_processing_time_ms,
            'received_bytes': received_bytes,
            'sent_bytes': sent_bytes,
            'user_agent_full': user_agent_full,
            'ua_browser_family': ua_browser_family,
            'ua_os_family': ua_os_family,
            'log_source_file': log_key
        }

    except Exception as e:
        # Best-effort parsing: report and skip the line rather than abort the run.
        print(f"❌ Failed to parse line from {log_key}: {e}")
        return None
    
# Stream every .gz log from S3, parse each line, and collect the successful rows.
parsed_logs = []

for log_key in gz_files:
    obj = s3.get_object(Bucket=bucket, Key=log_key)
    body = obj['Body']
    try:
        with gzip.GzipFile(fileobj=body) as gz:
            for raw_line in gz:
                # An undecodable byte would otherwise raise UnicodeDecodeError here,
                # outside parse_log_line's try/except, and abort the whole run.
                line = raw_line.decode('utf-8', errors='replace').strip()
                if not line:
                    continue  # nothing to parse on a blank line
                parsed = parse_log_line(line, log_key)
                if parsed is not None:
                    parsed_logs.append(parsed)
    finally:
        # GzipFile.close() does not close a caller-supplied fileobj; release
        # the S3 stream explicitly.
        body.close()

# Preview the first few parsed rows.
for log in parsed_logs[:3]:
    print("Log Entry:")
    for key, value in log.items():
        print(f"  {key}: {value}")
    print("---------------")

print(f"Total parsed logs: {len(parsed_logs)}")

Log Entry:
  log_timestamp: 2025-05-26 19:55:00
  client_ip: 172.56.35.11
  http_method: OPTIONS
  requested_path: https://members.erank.com:443/api/account/user-preferences/competitor_sales_columns
  elb_status_code: 204
  backend_status_code: 204
  total_processing_time_ms: 20.0
  received_bytes: 66
  sent_bytes: 391
  user_agent_full: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:128.0) Gecko/20100101 Firefox/128.0
  ua_browser_family: Firefox
  ua_os_family: MacOS
  log_source_file: 848357551741_elasticloadbalancing_us-west-2_app.erank-app.88dfa9dc536560af_20250527T0000Z_44.237.58.174_13mkyazt.log.gz
---------------
Log Entry:
  log_timestamp: 2025-05-26 19:55:01
  client_ip: 172.56.35.11
  http_method: GET
  requested_path: https://members.erank.com:443/api/account/user-preferences/competitor_sales_columns
  elb_status_code: 404
  backend_status_code: 404
  total_processing_time_ms: 57.0
  received_bytes: 105
  sent_bytes: 307
  user_agent_full: Mozilla/5.0 (Macintosh; Intel Ma

# Load

In [None]:
# Parameterized INSERT into elb_log_data; each :name placeholder is bound from
# the same-named key of a parse_log_line() row dict.
insert_stmt = text("""
    INSERT INTO elb_log_data (
        log_timestamp,
        client_ip,
        http_method,
        requested_path,
        elb_status_code,
        backend_status_code,
        total_processing_time_ms,
        received_bytes,
        sent_bytes,
        user_agent_full,
        ua_browser_family,
        ua_os_family,
        log_source_file
    ) VALUES (
        :log_timestamp,
        :client_ip,
        :http_method,
        :requested_path,
        :elb_status_code,
        :backend_status_code,
        :total_processing_time_ms,
        :received_bytes,
        :sent_bytes,
        :user_agent_full,
        :ua_browser_family,
        :ua_os_family,
        :log_source_file
    )
""")

# Test insert of the first 3 parsed rows. Each row is attempted separately so
# a failure is reported and the loop moves on; commit() runs once after the loop.
sample_rows = parsed_logs[:3]
with engine.connect() as conn:
    for row_number, row in enumerate(sample_rows, start=1):
        print("Inserting:", row)
        try:
            conn.execute(insert_stmt, row)
        except Exception as exc:
            print(f"❌ Failed to insert row {row_number}: {exc}")
        else:
            print(f"✅ Inserted row {row_number}")

    conn.commit()



Inserting: {'log_timestamp': datetime.datetime(2025, 5, 26, 19, 55), 'client_ip': '172.56.35.11', 'http_method': 'OPTIONS', 'requested_path': 'https://members.erank.com:443/api/account/user-preferences/competitor_sales_columns', 'elb_status_code': 204, 'backend_status_code': 204, 'total_processing_time_ms': 20.0, 'received_bytes': 66, 'sent_bytes': 391, 'user_agent_full': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:128.0) Gecko/20100101 Firefox/128.0', 'ua_browser_family': 'Firefox', 'ua_os_family': 'MacOS', 'log_source_file': '848357551741_elasticloadbalancing_us-west-2_app.erank-app.88dfa9dc536560af_20250527T0000Z_44.237.58.174_13mkyazt.log.gz'}
✅ Inserted row 1
Inserting: {'log_timestamp': datetime.datetime(2025, 5, 26, 19, 55, 1), 'client_ip': '172.56.35.11', 'http_method': 'GET', 'requested_path': 'https://members.erank.com:443/api/account/user-preferences/competitor_sales_columns', 'elb_status_code': 404, 'backend_status_code': 404, 'total_processing_time_ms': 57.0, 'receiv