In [48]:
import psycopg2
import os
import re
import io
import json
import hashlib
import logging
import tempfile
import datetime
from typing import Dict, List, Tuple

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
#import s3fs
#from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from dotenv import load_dotenv
#from google.oauth2 import service_account
#from googleapiclient.discovery import build

In [49]:
load_dotenv()
SECRET_ACCESS_KEY = os.getenv("SECRET_ACCESS_KEY")
ACCESS_KEY_ID = os.getenv("ACCESS_KEY_ID")

In [50]:
# SETTINGS
AWS_REGION = "eu-north-1"   # Stockholm Region
LOCAL_TMP = "./tmp"

# S3 PATHS
CUSTOMERS_S3_PATH = "s3://core-telecoms-data-lake/customers/"
CALL_LOGS_PREFIX = "s3://core-telecoms-data-lake/call logs/"  # contains daily CSV
SOCIAL_MEDIA_PREFIX = "s3://core-telecoms-data-lake/social_medias/"   # contains daily JSON

In [51]:
# GOOGLE SHEET SETTINGS
GOOGLE_SHEET_ID = "17IXo7TjDSSHaFobGG9hcqgbsNKTaqgyctWGnwDeNkIQ"
GOOGLE_SHEET_RANGE = "Agents!A1:D101"

# DAILY POSTGRES TABLES
POSTGRES_TABLES = [
    "Web_form_request_2025_11_20",
    "Web_form_request_2025_11_21",
    "Web_form_request_2025_11_22",
    "Web_form_request_2025_11_23",
]
POSTGRES_SSM_PARAMETER = "/coretelecomms/database/"  # JSON stored here


In [52]:
# LOGGING SETUP
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger()

In [53]:
# HELPERS
def ensure_tmp_folder():
    if not os.path.exists(LOCAL_TMP):
        os.makedirs(LOCAL_TMP)
        logging.info("Created local tmp folder.")
    else:
        logging.info("Local tmp folder already exists.")


In [54]:
# AWS HELPERS
def get_ssm_secret(name):
    """Retrieves DB credentials stored as JSON in SSM Parameter Store"""
    ssm = boto3.client("ssm", region_name=AWS_REGION,\
                    aws_access_key_id=ACCESS_KEY_ID,\
                    aws_secret_access_key=SECRET_ACCESS_KEY)
    param = ssm.get_parameter(Name=name, WithDecryption=True)
    return json.loads(param["Parameter"]["Value"])


def download_s3_file(s3_path, local_path):
    """Downloads a single object from S3."""
    s3 = boto3.client("s3", region_name=AWS_REGION,\
                    aws_access_key_id=ACCESS_KEY_ID,\
                    aws_secret_access_key=SECRET_ACCESS_KEY)

    bucket, key = s3_path.replace("s3://", "").split("/", 1)
    s3.download_file(bucket, key, local_path)

    logging.info(f"Downloaded S3 file → {local_path}")

In [55]:
def download_s3_prefix(prefix, local_dir, file_type="csv"):
    """Downloads all objects under a folder/prefix."""
    s3 = boto3.client("s3", region_name=AWS_REGION,\
                    aws_access_key_id=ACCESS_KEY_ID,\
                    aws_secret_access_key=SECRET_ACCESS_KEY)
    bucket, key_prefix = prefix.replace("s3://", "").split("/", 1)

    response = s3.list_objects_v2(Bucket=bucket, Prefix=key_prefix)

    if "Contents" not in response:
        logging.warning(f"No files found under prefix {prefix}")
        return []

    downloaded_files = []

    for obj in response["Contents"]:
        key = obj["Key"]
        if not key.endswith(file_type):
            continue

        filename = key.split("/")[-1]
        local_path = os.path.join(local_dir, filename)
        s3.download_file(bucket, key, local_path)
        downloaded_files.append(local_path)

        logging.info(f"Downloaded {file_type.upper()} from → {local_path}")

    return downloaded_files

In [56]:
# POSTGRES EXTRACTION
def extract_postgres_table(table_name, credentials):
    """Extracts a single Postgres table into a pandas DataFrame."""
    conn = psycopg2.connect(
        host=credentials["host"],
        user=credentials["user"],
        password=credentials["password"],
        database=credentials["database"],
        port=credentials["port"]
    )

    query = f"SELECT * FROM customer_complaints.{table_name} LIMIT 5;"
    df = pd.read_sql(query, conn)
    conn.close()

    logging.info(f"Extracted Postgres table → {table_name}")
    return df

In [57]:
ensure_tmp_folder()

2025-12-04 22:31:25,096 | INFO | Local tmp folder already exists.


In [61]:
# Customers (Static CSV)
customers_path = os.path.join(LOCAL_TMP, "customers.csv")
download_s3_file(CUSTOMERS_S3_PATH, customers_path)

2025-12-04 22:38:23,213 | INFO | Downloaded S3 file → ./tmp\customers.csv


In [None]:
# Call Center Logs (Daily CSV)
call_logs = download_s3_prefix(CALL_LOGS_PREFIX, LOCAL_TMP, file_type="csv")

2025-12-04 11:40:22,978 | INFO | Downloaded CSV from → ./tmp\call_logs_day_2025-11-20.csv
2025-12-04 11:41:08,865 | INFO | Downloaded CSV from → ./tmp\call_logs_day_2025-11-21.csv
2025-12-04 11:41:58,258 | INFO | Downloaded CSV from → ./tmp\call_logs_day_2025-11-22.csv
2025-12-04 11:42:46,383 | INFO | Downloaded CSV from → ./tmp\call_logs_day_2025-11-23.csv


In [None]:
# Social Media Complaints (Daily JSON)
social_files = download_s3_prefix(SOCIAL_MEDIA_PREFIX, LOCAL_TMP, file_type="json")

2025-12-04 11:45:39,787 | INFO | Downloaded JSON from → ./tmp\media_complaint_day_2025-11-20.json
2025-12-04 11:46:04,397 | INFO | Downloaded JSON from → ./tmp\media_complaint_day_2025-11-21.json
2025-12-04 11:46:22,631 | INFO | Downloaded JSON from → ./tmp\media_complaint_day_2025-11-22.json
2025-12-04 11:46:43,751 | INFO | Downloaded JSON from → ./tmp\media_complaint_day_2025-11-23.json


In [None]:
# Website Complaints (Daily, from Postgres)
#db_credentials = get_ssm_secret(POSTGRES_SSM_PARAMETER)

#for table in POSTGRES_TABLES:
#    df = extract_postgres_table(table, db_credentials)
#    df.to_csv(os.path.join(LOCAL_TMP, f"{table}.csv"), index=False)

#logging.info("All extractions completed successfully!")

In [None]:
import boto3
from botocore.exceptions import ClientError

def get_ssm_parameter(name: str, decrypt: bool = True, region: str = "eu-north-1"):
    
    ssm = boto3.client("ssm", region_name=AWS_REGION,\
                        aws_access_key_id=ACCESS_KEY_ID,\
                    aws_secret_access_key=SECRET_ACCESS_KEY)

    try:
        response = ssm.get_parameter(
            Name=name,
            WithDecryption=decrypt
        )
        return response["Parameter"]["Value"]

    except ClientError as e:
        raise Exception(f"SSM error retrieving '{name}': {e}")

In [None]:
# get_ssm_secret("/coretelecomms/database/")
db_host = get_ssm_parameter("/coretelecomms/database/db_host")
db_name = get_ssm_parameter("/coretelecomms/database/db_name")
db_user = get_ssm_parameter("/coretelecomms/database/db_username")
db_pass = get_ssm_parameter("/coretelecomms/database/db_password")
db_port = get_ssm_parameter("/coretelecomms/database/db_port")

In [None]:
# Fetch database credentials from SSM
db_credentials = {
    "host": get_ssm_parameter("/coretelecomms/database/db_host"),
    "database": get_ssm_parameter("/coretelecomms/database/db_name"),
    "user": get_ssm_parameter("/coretelecomms/database/db_username"),
    "password": get_ssm_parameter("/coretelecomms/database/db_password"),
    "port": get_ssm_parameter("/coretelecomms/database/db_port"),
}

# Website Complaints (Daily, from Postgres)
for table in POSTGRES_TABLES:
    df = extract_postgres_table(table, db_credentials)
    df.to_csv(os.path.join(LOCAL_TMP, f"{table}.csv"), index=False)

logging.info("All extractions completed successfully!")

  df = pd.read_sql(query, conn)
2025-12-04 21:42:07,903 | INFO | Extracted Postgres table → Web_form_request_2025_11_20
2025-12-04 21:42:11,808 | INFO | Extracted Postgres table → Web_form_request_2025_11_21
2025-12-04 21:42:14,394 | INFO | Extracted Postgres table → Web_form_request_2025_11_22
2025-12-04 21:42:16,280 | INFO | Extracted Postgres table → Web_form_request_2025_11_23
2025-12-04 21:42:16,284 | INFO | All extractions completed successfully!


In [None]:
## TRANSFORM

In [None]:
# 
# import os
# import pandas as pd

TMP_FOLDER = LOCAL_TMP

def transform_media_complaint_jsons(tmp_folder: str = TMP_FOLDER):
    """
    Reads all JSON files that start with 'media_complaint' from the tmp folder,
    transforms column names (lowercase, replace spaces with underscore),
    converts date columns to datetime, and returns ONE combined DataFrame.
    """

    json_files = [
        f for f in os.listdir(tmp_folder)
        if f.startswith("media_complaint") and f.endswith(".json")
    ]

    if not json_files:
        print("No media_complaint JSON files found.")
        return pd.DataFrame()  # return empty dataframe

    df_list = []

    for file in json_files:
        file_path = os.path.join(tmp_folder, file)

        # Load JSON
        df = pd.read_json(file_path)

        # --- Transform Column Names ---
        df.columns = (
            df.columns
            .str.lower()
            .str.strip()
            .str.replace(" ", "_")
        )

        # --- Convert All Date Columns to datetime ---
        for col in df.columns:
            if "date" in col:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df_list.append(df)

    # Combine all dataframes into one
    final_df = pd.concat(df_list, ignore_index=True)

    return final_df


In [None]:
media_complains = transform_media_complaint_jsons()
media_complains.head()

In [76]:
media_complains.to_parquet("tmp/media_complains.parquet", compression="snappy")

In [None]:
TMP_FOLDER = LOCAL_TMP

def transform_web_form_csvs(tmp_folder: str = TMP_FOLDER):
    """
    Reads all CSV files starting with 'web_form' from the tmp folder,
    transforms column names (lowercase, underscores, no spaces),
    converts date columns to datetime format,
    and returns ONE combined DataFrame.
    """

    csv_files = [
        f for f in os.listdir(tmp_folder)
        if f.startswith("Web_form") and f.endswith(".csv")
    ]

    if not csv_files:
        print("No web_form CSV files found.")
        return pd.DataFrame()

    df_list = []

    for file in csv_files:
        file_path = os.path.join(tmp_folder, file)

        # Load CSV
        df = pd.read_csv(file_path)

        # --- Transform Column Names ---
        df.columns = (
            df.columns
            .str.lower()
            .str.strip()
            .str.replace(" ", "_")
        )

        # --- Convert Date Columns ---
        for col in df.columns:
            if "date" in col:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df_list.append(df)

    # Combine all transformed CSV files
    final_df = pd.concat(df_list, ignore_index=True)

    return final_df


In [None]:
web_form = transform_web_form_csvs()
web_form.head(3)

Unnamed: 0,column1,request_id,customer_id,complaint_category,agent_id,resolutionstatus,request_date,resolution_date,webformgenerationdate
0,1,nnc0ddcdX7446ddcd260d4520a53e7279611b32c637796...,7446ddcd-260d-4520-a53e-7279611b32c6,Payments,1071,In-Progress,2025-02-10 12:22:54,NaT,2025-11-20
1,3,in39383bKd1b6383b601f4f39a97f9c2a158d02d41492a...,d1b6383b-601f-4f39-a97f-9c2a158d02d4,Router Delivery,1080,In-Progress,2025-03-07 18:46:16,NaT,2025-11-20
2,9,ondd3cddQ012b3cdd17994d2db633c1f3845b3b7b95338...,012b3cdd-1799-4d2d-b633-c1f3845b3b7b,Router Delivery,1004,Resolved,2025-03-30 22:14:24,2025-04-12 12:52:24,2025-11-20


In [73]:
web_form.to_parquet("tmp/web_form.parquet", compression="snappy")

In [None]:
TMP_FOLDER = LOCAL_TMP

def transform_call_logs_csv(tmp_folder: str = TMP_FOLDER):
    """
    Reads all CSV files starting with 'call_logs' from tmp folder, 
    performs required transformations and returns ONE combined DataFrame.
    """

    csv_files = [
        f for f in os.listdir(tmp_folder)
        if f.startswith("call_logs") and f.endswith(".csv")
    ]

    if not csv_files:
        print("No call_logs CSV files found.")
        return pd.DataFrame()

    df_list = []

    for file in csv_files:
        file_path = os.path.join(tmp_folder, file)

        # Load CSV
        df = pd.read_csv(file_path)

        # --- Fix broken column name ---
        # Handles cases like "COMPLAINT_catego ry" with space in between
        bad_cols = ["COMPLAINT_catego ry", "COMPLAINT_catego  ry", "COMPLAINT_category "]
        for bad_col in bad_cols:
            if bad_col in df.columns:
                df = df.rename(columns={bad_col: "complaint_category"})

        # --- Generic column name cleaning ---
        df.columns = (
            df.columns
            .str.strip()
            .str.lower()
            .str.replace(" ", "_")
        )

        # --- Convert date-like columns to datetime ---
        for col in df.columns:
            if "date" in col:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df_list.append(df)

    # Combine all transformed dfs into one
    final_df = pd.concat(df_list, ignore_index=True)

    return final_df


In [None]:
call_logs = transform_call_logs_csv()
call_logs.head(3)

Unnamed: 0,unnamed:_0,call_id,customer_id,complaint_category,agent_id,call_start_time,call_end_time,resolutionstatus,calllogsgenerationdate
0,0,Mar0979K65d7920296c349eab5435e5b1aeb39ce1476b1...,65d79202-96c3-49ea-b543-5e5b1aeb39ce,Technician Support,1035,2025-03-07 18:40:43,2025-03-07 18:52:43,Blocked,2025-11-20
1,2,CorcbaaQaf3aacc510074bd39f0ee5f0ca1bc9f47760ca...,af3aacc5-1007-4bd3-9f0e-e5f0ca1bc9f4,Payments,1021,2025-01-29 02:58:12,2025-01-29 03:03:12,In-Progress,2025-11-20
2,3,Sar4328K5f528143241b4333ba13681987e4e0f7923987...,5f528143-241b-4333-ba13-681987e4e0f7,Payments,1031,2025-05-14 13:42:52,2025-05-14 13:55:52,Resolved,2025-11-20


In [74]:
call_logs.to_parquet("tmp/call_logs.parquet", compression="snappy")

In [63]:

def download_customer_csvs(
    bucket: str = "core-telecoms-data-lake",
    prefix: str = "customers/",
    local_dir: str = LOCAL_TMP
):
    """
    Downloads all CSV files from a specific S3 prefix into a local directory
    without listing the entire bucket (only objects under the prefix).
    """

    # Ensure local folder exists
    os.makedirs(local_dir, exist_ok=True)

    s3 = boto3.client("s3", region_name=AWS_REGION,\
                        aws_access_key_id=ACCESS_KEY_ID,\
                    aws_secret_access_key=SECRET_ACCESS_KEY)

    # List *only* objects under the prefix
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)

    if "Contents" not in response:
        print("No files found under the prefix.")
        return

    for obj in response["Contents"]:
        key = obj["Key"]

        # Only download CSV files
        if key.lower().endswith(".csv"):
            filename = os.path.basename(key)
            local_path = os.path.join(local_dir, filename)

            print(f"Downloading {key} → {local_path}")
            s3.download_file(bucket, key, local_path)

    print("All CSV files downloaded successfully!")


In [64]:
download_customer_csvs()

Downloading customers/customers_dataset.csv → ./tmp\customers_dataset.csv
All CSV files downloaded successfully!


In [67]:
df = pd.read_csv("tmp\customers_dataset.csv")
df.head(2)

  df = pd.read_csv("tmp\customers_dataset.csv")


Unnamed: 0,customer_id,name,Gender,DATE of biRTH,signup_date,email,address
0,d6966473-6fb0-4045-b33d-1f3b555e2762,Brianna Schultz,F,1930-09-10,2017-08-22,2772brianna_schultz525@GmaiL.om,"317 Paul Turnpike\nDarrenland, NY 13297"
1,c3431b44-ebcd-4a36-ad0a-11a7cafbd0fd,April Morgan,F,1969-12-05,2017-12-08,2143april.morgane38@gmail.com,63925 Matthew Crescent Suite 075\nNew Tylerbur...


In [68]:
df.columns

Index(['customer_id', 'name', 'Gender', 'DATE of biRTH', 'signup_date',
       'email', 'address'],
      dtype='object')

In [69]:
TMP_FOLDER = LOCAL_TMP

def transform_customers_csv(tmp_folder: str = TMP_FOLDER):
    """
    Reads all CSV files starting with 'call_logs' from tmp folder, 
    performs required transformations and returns ONE combined DataFrame.
    """

    csv_files = [
        f for f in os.listdir(tmp_folder)
        if f.startswith("customers") and f.endswith(".csv")
    ]

    if not csv_files:
        print("No call_logs CSV files found.")
        return pd.DataFrame()

    df_list = []

    for file in csv_files:
        file_path = os.path.join(tmp_folder, file)

        # Load CSV
        df = pd.read_csv(file_path)

        # --- Fix broken column name ---
        # Handles cases like "COMPLAINT_catego ry" with space in between
        bad_cols = ["COMPLAINT_catego ry", "COMPLAINT_catego  ry", "COMPLAINT_category "]
        for bad_col in bad_cols:
            if bad_col in df.columns:
                df = df.rename(columns={bad_col: "complaint_category"})

        # --- Generic column name cleaning ---
        df.columns = (
            df.columns
            .str.strip()
            .str.lower()
            .str.replace(" ", "_")
        )

        # --- Convert date-like columns to datetime ---
        for col in df.columns:
            if "date" in col:
                df[col] = pd.to_datetime(df[col], errors="coerce")

        df_list.append(df)

    # Combine all transformed dfs into one
    final_df = pd.concat(df_list, ignore_index=True)

    return final_df

In [70]:
customers=transform_customers_csv()
customers.head(2)

In [75]:
customers.to_parquet("tmp/customers.parquet", compression="snappy")

In [81]:
# LOAD DATA
def ensure_bucket_exists(bucket_name, region=AWS_REGION):
    s3 = boto3.client("s3", region_name=AWS_REGION,\
        aws_access_key_id=ACCESS_KEY_ID,\
        aws_secret_access_key=SECRET_ACCESS_KEY)

    # Check if bucket exists
    try:
        s3.head_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' already exists.")
        return
    except ClientError as e:
        error_code = int(e.response['Error']['Code'])
        if error_code != 404:
            raise e

    # If not exists → create it
    print(f"Creating bucket '{bucket_name}'...")
    if region == "us-east-1":
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": region}
        )
    print("Bucket created.")

In [82]:
def upload_parquet_files(local_dir, bucket_name, s3_prefix=""):
    s3 = boto3.client("s3", region_name=AWS_REGION,\
        aws_access_key_id=ACCESS_KEY_ID,\
        aws_secret_access_key=SECRET_ACCESS_KEY)

    # Ensure the bucket exists
    ensure_bucket_exists(bucket_name)

    # Loop through local parquet files
    for file_name in os.listdir(local_dir):
        if file_name.endswith(".parquet"):
            local_path = os.path.join(local_dir, file_name)
            s3_path = f"{s3_prefix}/{file_name}" if s3_prefix else file_name

            try:
                print(f"Uploading {local_path} -> s3://{bucket_name}/{s3_path}")
                s3.upload_file(local_path, bucket_name, s3_path)
            except Exception as e:
                print(f"ERROR uploading {file_name}: {e}")

    print("Upload complete!")

In [83]:
upload_parquet_files(
        local_dir=LOCAL_TMP,
        bucket_name="supabase-bucket-2025",
        s3_prefix="raw")

Creating bucket 'supabase-bucket-2025'...
Bucket created.
Uploading ./tmp\call_logs.parquet -> s3://supabase-bucket-2025/raw/call_logs.parquet
ERROR uploading call_logs.parquet: SSL validation failed for https://supabase-bucket-2025.s3.eu-north-1.amazonaws.com/raw/call_logs.parquet?uploadId=RDFwEOaV48ii1dbvcca5Qo8PKx4XpRHjCQhIZfsjDvnuXKJuojSlKzH5wYCgKl9asAcoFWqn6bto_NdAI3OO_q06OiWTdX.wV3n1bdsqjoSQai._ka29xO3XEbENKXBS&partNumber=5 EOF occurred in violation of protocol (_ssl.c:2417)
Uploading ./tmp\customers.parquet -> s3://supabase-bucket-2025/raw/customers.parquet
ERROR uploading customers.parquet: Could not connect to the endpoint URL: "https://supabase-bucket-2025.s3.eu-north-1.amazonaws.com/raw/customers.parquet?uploadId=3jUJr7igcbxAnFfez4thelM33EjX6pX6MBu2fUZfk8uqGYeU65DpeKGYrscO0jd54nHo.sc3ubXis3f5Qo6B2WjPBx5IiM1_.9jE4Ro93tWXBVGN6_umchCRyAA5ngS6&partNumber=11"
Uploading ./tmp\media_complains.parquet -> s3://supabase-bucket-2025/raw/media_complains.parquet
Uploading ./tmp\web_form.