In [32]:
from pathlib import Path
import pandas as pd
import psycopg2

BASE_DIR = Path.cwd()

DATA_DIR = BASE_DIR / 'data'
RAW_DATA_DIR = DATA_DIR / 'raw'
PROCESSED_DATA_DIR = DATA_DIR / 'processed'

def read_df(data_path: str) -> pd.DataFrame:
    """
        Reads a given file from the given path,
        Determine the data type from the name,
        returns a pandas DataFrame.
    """
    try:
        if data_path.endswith('.csv'):
            return pd.read_csv(data_path)
        elif data_path.endswith('.xlsx') or data_path.endswith('.xls'):
            return pd.read_excel(data_path)
        elif data_path.endswith('.json'):
            return pd.read_json(data_path, lines=True)
        else:
            raise ValueError("Unsupported file format. Please provide a CSV, Excel, or JSON file.")
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        return pd.DataFrame()  # Return an empty DataFrame in case of error

In [33]:
paths_to_data = [
    "partner_roaming.xlsx",
    "raw_usage_2025_01.csv",
    "sessions.json"
]

def extract_all_data(file_paths): # Note: file_paths is a list of file paths and only csv, json, and excel files are supported
    if not file_paths or len(file_paths) == 0:
        raise ValueError("The list of file paths is empty.")
    data_frames = {}
    for path in file_paths:
        df = read_df(f"{RAW_DATA_DIR}/{path}")
        df_key = Path(path).stem.split('/')[-1]  # Use the file name without extension
        # get the key word roaming, usage, sessions as the key for the file that contains the word and assign it as the key.
        df_key = 'roaming' if 'roaming' in df_key else 'usage' if 'usage' in df_key else 'sessions' if 'sessions' in df_key else df_key
        data_frames[df_key] = df
        print(f"Extracted {df_key} data with {len(df)} records.")
    return data_frames

# call the function to extract data
all_data_frames = extract_all_data(paths_to_data)
data_frames = all_data_frames.copy()
#print(extracted)

Extracted roaming data with 200 records.
Extracted usage data with 500 records.
Extracted sessions data with 300 records.


In [34]:
data_frames['roaming'].columns

Index(['msisdn', 'country', 'roaming_data_mb', 'roaming_cost_usd', 'date'], dtype='object')

In [35]:
def clean_dfs(dfs):
    cleaned = {}
    for key, df in dfs.items():
        if df.empty:
            cleaned[key] = df
            continue  # Skip empty DataFrames
        
        # Make a copy to avoid modifying the original
        df = df.copy()
        
        df.columns = df.columns.str.lower().str.strip().str.replace(" ", "_", regex=False) #format the columns to lower_case and remove space

        if 'download_mb' in df.columns and 'upload_mb' in df.columns:
            df = df.rename(columns={
                'download_mb' : 'Download MB',
                'upload_mb' : 'Upload MB',
            })
        
        if 'avg_throughput' in df.columns:
            median_throughput = df['avg_throughput'].median()   #calculate median of avg_throughput
            df['avg_throughput'] = df['avg_throughput'].fillna(median_throughput)
            
        if 'session_id' in df.columns:    
            df = df.drop_duplicates(subset=['session_id'], keep='first') 
        
        if 'Download MB' in df.columns and 'Upload MB' in df.columns:
            df['total_usage_mb'] = df['Download MB'].fillna(0) + df['Upload MB'].fillna(0)

        # drop -ve values for duration_ms
        if 'duration_ms' in df.columns:
            df = df[df['duration_ms'] >= 0]
            
        #parse date columns to datetime format if any
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
            
        #replace missing values in 'app_category' with 'unknown'
        if 'app_category' in df.columns:
            df['app_category'] = df['app_category'].fillna('unknown')
        
        cleaned[key] = df
            
    return cleaned

In [36]:
cleaned_dfs = clean_dfs(data_frames)

In [37]:
cleaned_dfs['usage'].columns

Index(['msisdn', 'session_id', 'timestamp', 'Download MB', 'Upload MB',
       'avg_throughput', 'latency_ms', 'duration_ms', 'region',
       'total_usage_mb'],
      dtype='object')

In [38]:
def aggregate_daily_usage(df: pd.DataFrame) -> pd.DataFrame:
    
    if df.empty or 'msisdn' not in df.columns or 'timestamp' not in df.columns:
        return pd.DataFrame()  # Return empty DataFrame if its invalid
    #GroupBy
    df['date'] = df['timestamp'].dt.date
    agg = (
        df.groupby(['msisdn', 'date'])
        .agg(
            total_usage_mb = ('total_usage_mb', 'sum'),
            avg_throughput = ('avg_throughput', 'mean'),
            sessions = ('session_id', 'nunique'),
            latensy_ms = ('latency_ms', 'mean') if 'latency_ms' in df.columns else ('timestamp', 'count')
        ).reset_index()
    )
    
    agg['total_usage_mb'] = agg['total_usage_mb'].astype(float)
    agg['sessions'] = agg['sessions'].astype(int)
    agg['avg_throughput'] = agg['avg_throughput'].astype(float)
    return agg

daily_usage_agg = aggregate_daily_usage(cleaned_dfs['usage'])
        

In [39]:
output_data = {
    'cleaned_data': cleaned_dfs,
    'daily_usage_aggregation': daily_usage_agg
}

In [40]:
def make_db_connection():
    """Create a database connection to the PostgreSQL database."""
    conn = None
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="skylinkDB",
            user="postgres",
            password="@PGAdmin2025",
            port="5433"
        )
        print("Connection successful")
    except Exception as e:
        print(f"Error: {e}")
    return conn


In [42]:
# function to execute SQL queries
def execute_query(connection, query):
    if connection:
        cur = connection.cursor()
    else:
        cur = None
    if cur:
        try:
            cur.execute(query)
            connection.commit()
            print("Query executed successfully")
        except Exception as e:
            print(f"Error executing query: {e}")
            connection.rollback()
        finally:
            cur.close()
            connection.close()
    else:
        print("No database connection available")

In [45]:
output_data['cleaned_data']['usage'].columns

Index(['msisdn', 'session_id', 'timestamp', 'Download MB', 'Upload MB',
       'avg_throughput', 'latency_ms', 'duration_ms', 'region',
       'total_usage_mb', 'date'],
      dtype='object')

In [None]:
CREATE_TABLE_USAGE = """
    CREATE TABLE IF NOT EXISTS USAGE (
        msisdn VARCHAR(15) NOT NULL,
        session_id INT,
        timestamp DATE NOT NULL,
        download_mb FLOAT,
        upload_mb FLOAT,
        avg_throughput FLOAT,
        latency_ms FLOAT,
        duration_ms INT,
        region VARCHAR(50),
    );
"""
conn = make_db_connection()

execute_query(conn, CREATE_TABLE_USAGE)

Connection successful
Query executed successfully


In [None]:
INSERT_INTO_USAGE = """
    INSERT INTO USAGE (msisdn, date, total_usage_mb, avg_throughput, sessions, latensy_ms)
    