In [1]:
# Importing required libraries
import os
import sys
import logging
import pandas as pd
import psycopg2
from dotenv import load_dotenv
# Adding paths to access the custom utility functions
sys.path.append(os.path.abspath(os.path.join('../utils')))
from utils import run_sql_query, populate_dataframe_to_database, create_table_query

In [2]:
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [3]:
# Get the database connection parameters from environment variables
DB_HOST = os.getenv("POSTGRES_HOST")
DB_PORT = os.getenv("POSTGRES_PORT")
DB_NAME = os.getenv("POSTGRES_DATABASE")
DB_USER = os.getenv("POSTGRES_USERNAME")
DB_PASSWORD = os.getenv("POSTGRES_PASSWORD")
# Connect to the PostgreSQL database
try:
    connection_params = {
        'host': "localhost",
        'port': DB_PORT,
        'database': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD
    }
    connection = psycopg2.connect(**connection_params)
    logger.info("Successfully connected to the PostgreSQL database.")
except psycopg2.OperationalError as e:
    logger.error("Error connecting to PostgreSQL database: %s", e)
    raise Exception("Unable to connect to the database")

2025-02-23 23:15:36,207 - INFO - Successfully connected to the PostgreSQL database.


In [4]:
# Define the root directory
root_directory = '../data/'
# Initialize SQL queries for schema creation
sql_queries = ""
# Initialize a DataFrame to hold all Totals data
all_totals_data = pd.DataFrame()

In [5]:
def date_changer(df):
    """
    Converts parsable dates in a DataFrame's 'Date' column to datetime format and drops rows with non-parsable dates.
    Args:
        df (pandas.DataFrame): The DataFrame containing a 'Date' column.
    Returns:
        pandas.DataFrame: The DataFrame with the 'Date' column converted to datetime format (if successful).
    """
    # Setting up the logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()  # Logs to console
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    try:
        # Attempt to convert parsable dates to datetime format
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
        # Drop rows with non-parsable dates (converted to NaNs)
        df = df.dropna(subset=['Date'])
        logger.info("Successfully converted dates and dropped rows with non-parsable dates.")
        return df
    except pd.errors.ParserError as err:
        logger.error(f"Error parsing date column: {err}")
        raise

In [6]:
# Iterate through each folder in the root directory
for folder_name in os.listdir(root_directory):
    folder_path = os.path.join(root_directory, folder_name)
    logger.info(f"Processing folder: {folder_name}")
    # Check if the item is a directory
    if os.path.isdir(folder_path):
        chart_data_path = os.path.join(folder_path, 'Chart data.csv')
        table_data_path = os.path.join(folder_path, 'Table data.csv')
        totals_data_path = os.path.join(folder_path, 'Totals.csv')
        # Process "Chart data.csv"
        if os.path.exists(chart_data_path):
            try:
                chart_data_df = pd.read_csv(chart_data_path)
                table_name = f'{folder_name.lower().replace(" ","_")}_chart_data'
                table_query = create_table_query(chart_data_df, table_name)
                sql_queries += table_query
                logger.info(f"Generated table query for {table_name}: {table_query.strip()}")
                # Create the table and populate data
                run_sql_query(connection_params, table_query)
                chart_data_df.fillna(0, inplace=True)
                populate_dataframe_to_database(connection_params, chart_data_df, table_name)
            except Exception as e:
                logger.error(f"Error processing {chart_data_path}: {e}")
        # Process "Table data.csv"
        if os.path.exists(table_data_path):
            try:
                table_data_df = pd.read_csv(table_data_path)
                table_name = f'{folder_name.lower().replace(" ","_")}_table_data'
                table_query = create_table_query(table_data_df, table_name)
                sql_queries += table_query
                logger.info(f"Generated table query for {table_name}: {table_query.strip()}")
                # Create the table and populate data
                run_sql_query(connection_params, table_query)
                table_data_df.fillna(0, inplace=True)
                populate_dataframe_to_database(connection_params, table_data_df, table_name)
            except Exception as e:
                logger.error(f"Error processing {table_data_path}: {e}")
        # Process "Totals.csv"
        if os.path.exists(totals_data_path):
            try:
                totals_df = pd.read_csv(totals_data_path)
                all_totals_data = pd.concat([all_totals_data, totals_df], ignore_index=True)
            except Exception as e:
                logger.error(f"Error processing {totals_data_path}: {e}")

2025-02-23 23:15:36,802 - INFO - Processing folder: content_type
2025-02-23 23:15:36,820 - INFO - Generated table query for content_type_chart_data: CREATE TABLE IF NOT EXISTS content_type_chart_data (
                "Date" TEXT, "Content type" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Content type")
            );


Log success


2025-02-23 23:15:37,675 - INFO - Generated table query for content_type_table_data: CREATE TABLE IF NOT EXISTS content_type_table_data (
                "Content type" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Content type")
            );
2025-02-23 23:15:37,854 - INFO - Processing folder: geography


Inserted 2558 rows into the database table content_type_chart_data.
Log success
Inserted 3 rows into the database table content_type_table_data.


2025-02-23 23:15:37,883 - INFO - Generated table query for geography_chart_data: CREATE TABLE IF NOT EXISTS geography_chart_data (
                "Date" TEXT, "Geography" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Geography")
            );


Log success


2025-02-23 23:15:38,671 - INFO - Generated table query for geography_table_data: CREATE TABLE IF NOT EXISTS geography_table_data (
                "Geography" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Geography")
            );
2025-02-23 23:15:38,844 - INFO - Processing folder: viewership_by_date
2025-02-23 23:15:38,847 - INFO - Generated table query for viewership_by_date_table_data: CREATE TABLE IF NOT EXISTS viewership_by_date_table_data (
                "Date" TEXT, "Views" DOUBLE PRECISION, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Date")
            );


Inserted 37091 rows into the database table geography_chart_data.
Log success
Inserted 30 rows into the database table geography_table_data.
Log success


2025-02-23 23:15:39,080 - INFO - Processing folder: cities
2025-02-23 23:15:39,104 - INFO - Generated table query for cities_chart_data: CREATE TABLE IF NOT EXISTS cities_chart_data (
                "Date" TEXT, "Cities" TEXT, "City name" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Cities")
            );


Inserted 502 rows into the database table viewership_by_date_table_data.
Log success


2025-02-23 23:15:39,372 - INFO - Generated table query for cities_table_data: CREATE TABLE IF NOT EXISTS cities_table_data (
                "Cities" TEXT, "City name" TEXT, "Geography" TEXT, "Geography.1" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Cities")
            );
2025-02-23 23:15:39,530 - INFO - Processing folder: traffic_source
2025-02-23 23:15:39,554 - INFO - Generated table query for traffic_source_chart_data: CREATE TABLE IF NOT EXISTS traffic_source_chart_data (
                "Date" TEXT, "Traffic source" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Traffic source")
            );


Inserted 6395 rows into the database table cities_chart_data.
Log success
Inserted 9 rows into the database table cities_table_data.
Log success


2025-02-23 23:15:39,862 - INFO - Generated table query for traffic_source_table_data: CREATE TABLE IF NOT EXISTS traffic_source_table_data (
                "Traffic source" TEXT, "Views" DOUBLE PRECISION, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT, "Impressions" DOUBLE PRECISION, "Impressions click-through rate (%)" DOUBLE PRECISION,
                PRIMARY KEY ("Traffic source")
            );


Inserted 11511 rows into the database table traffic_source_chart_data.
Log success


2025-02-23 23:15:40,070 - INFO - Processing folder: subscription_source
2025-02-23 23:15:40,090 - INFO - Generated table query for subscription_source_chart_data: CREATE TABLE IF NOT EXISTS subscription_source_chart_data (
                "Date" TEXT, "Subscription source" TEXT, "Subscribers" INTEGER,
                PRIMARY KEY ("Date", "Subscription source")
            );


Inserted 11 rows into the database table traffic_source_table_data.
Log success


2025-02-23 23:15:40,482 - INFO - Generated table query for subscription_source_table_data: CREATE TABLE IF NOT EXISTS subscription_source_table_data (
                "Subscription source" TEXT, "Subscribers" INTEGER, "Subscribers gained" INTEGER, "Subscribers lost" INTEGER,
                PRIMARY KEY ("Subscription source")
            );
2025-02-23 23:15:40,648 - INFO - Processing folder: viewer_gender
2025-02-23 23:15:40,651 - INFO - Generated table query for viewer_gender_table_data: CREATE TABLE IF NOT EXISTS viewer_gender_table_data (
                "Viewer gender" TEXT, "Views (%)" DOUBLE PRECISION, "Average view duration" TEXT, "Average percentage viewed (%)" DOUBLE PRECISION, "Watch time (hours) (%)" DOUBLE PRECISION,
                PRIMARY KEY ("Viewer gender")
            );


Inserted 10232 rows into the database table subscription_source_chart_data.
Log success
Inserted 9 rows into the database table subscription_source_table_data.
Log success


2025-02-23 23:15:40,851 - INFO - Processing folder: device_type
2025-02-23 23:15:40,859 - INFO - Generated table query for device_type_chart_data: CREATE TABLE IF NOT EXISTS device_type_chart_data (
                "Date" TEXT, "Device type" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Device type")
            );


Inserted 2 rows into the database table viewer_gender_table_data.
Log success


2025-02-23 23:15:41,103 - INFO - Generated table query for device_type_table_data: CREATE TABLE IF NOT EXISTS device_type_table_data (
                "Device type" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Device type")
            );
2025-02-23 23:15:41,271 - INFO - Processing folder: subtitles_and_cc
2025-02-23 23:15:41,283 - INFO - Generated table query for subtitles_and_cc_chart_data: CREATE TABLE IF NOT EXISTS subtitles_and_cc_chart_data (
                "Date" TEXT, "Subtitles and CC" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Subtitles and CC")
            );


Inserted 5116 rows into the database table device_type_chart_data.
Log success
Inserted 5 rows into the database table device_type_table_data.
Log success


2025-02-23 23:15:41,641 - INFO - Generated table query for subtitles_and_cc_table_data: CREATE TABLE IF NOT EXISTS subtitles_and_cc_table_data (
                "Subtitles and CC" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Subtitles and CC")
            );
2025-02-23 23:15:41,798 - INFO - Processing folder: sharing_service
2025-02-23 23:15:41,810 - INFO - Generated table query for sharing_service_chart_data: CREATE TABLE IF NOT EXISTS sharing_service_chart_data (
                "Date" TEXT, "Sharing service" TEXT, "Shares" INTEGER,
                PRIMARY KEY ("Date", "Sharing service")
            );


Inserted 10232 rows into the database table subtitles_and_cc_chart_data.
Log success
Inserted 9 rows into the database table subtitles_and_cc_table_data.
Log success


2025-02-23 23:15:42,125 - INFO - Generated table query for sharing_service_table_data: CREATE TABLE IF NOT EXISTS sharing_service_table_data (
                "Sharing service" TEXT, "Shares" INTEGER,
                PRIMARY KEY ("Sharing service")
            );
2025-02-23 23:15:42,310 - INFO - Processing folder: operating_system


Inserted 11511 rows into the database table sharing_service_chart_data.
Log success
Inserted 10 rows into the database table sharing_service_table_data.


2025-02-23 23:15:42,330 - INFO - Generated table query for operating_system_chart_data: CREATE TABLE IF NOT EXISTS operating_system_chart_data (
                "Date" TEXT, "Operating system" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Operating system")
            );


Log success


2025-02-23 23:15:42,865 - INFO - Generated table query for operating_system_table_data: CREATE TABLE IF NOT EXISTS operating_system_table_data (
                "Operating system" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Operating system")
            );
2025-02-23 23:15:43,030 - INFO - Processing folder: subscription_status
2025-02-23 23:15:43,034 - INFO - Generated table query for subscription_status_chart_data: CREATE TABLE IF NOT EXISTS subscription_status_chart_data (
                "Date" TEXT, "Subscription status" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "Subscription status")
            );


Inserted 20464 rows into the database table operating_system_chart_data.
Log success
Inserted 17 rows into the database table operating_system_table_data.
Log success


2025-02-23 23:15:43,257 - INFO - Generated table query for subscription_status_table_data: CREATE TABLE IF NOT EXISTS subscription_status_table_data (
                "Subscription status" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("Subscription status")
            );
2025-02-23 23:15:43,412 - INFO - Processing folder: new_and_returning_viewers
2025-02-23 23:15:43,418 - INFO - Generated table query for new_and_returning_viewers_chart_data: CREATE TABLE IF NOT EXISTS new_and_returning_viewers_chart_data (
                "Date" TEXT, "New and returning viewers" TEXT, "Views" INTEGER,
                PRIMARY KEY ("Date", "New and returning viewers")
            );


Inserted 2558 rows into the database table subscription_status_chart_data.
Log success
Inserted 3 rows into the database table subscription_status_table_data.
Log success


2025-02-23 23:15:43,667 - INFO - Generated table query for new_and_returning_viewers_table_data: CREATE TABLE IF NOT EXISTS new_and_returning_viewers_table_data (
                "New and returning viewers" TEXT, "Views" INTEGER, "Watch time (hours)" DOUBLE PRECISION, "Average view duration" TEXT,
                PRIMARY KEY ("New and returning viewers")
            );
2025-02-23 23:15:43,855 - INFO - Processing folder: viewer_age
2025-02-23 23:15:43,859 - INFO - Generated table query for viewer_age_table_data: CREATE TABLE IF NOT EXISTS viewer_age_table_data (
                "Viewer age" TEXT, "Views (%)" DOUBLE PRECISION, "Average view duration" TEXT, "Average percentage viewed (%)" DOUBLE PRECISION, "Watch time (hours) (%)" DOUBLE PRECISION,
                PRIMARY KEY ("Viewer age")
            );


Inserted 3837 rows into the database table new_and_returning_viewers_chart_data.
Log success
Inserted 4 rows into the database table new_and_returning_viewers_table_data.
Log success
Inserted 4 rows into the database table viewer_age_table_data.


In [7]:
# Create the CREATE TABLE query for totals_table_data
if not all_totals_data.empty:
    totals_table_query = create_table_query(all_totals_data, 'totals_table_data')
    sql_queries += totals_table_query
    logger.info(f"Generated table query for totals_table_data: {totals_table_query.strip()}")
    # Execute the query to create the table
    run_sql_query(connection_params, totals_table_query)
    # Populate the combined Totals data into totals_table_data
    populate_dataframe_to_database(connection_params, all_totals_data, 'totals_table_data')

2025-02-23 23:15:44,030 - INFO - Generated table query for totals_table_data: CREATE TABLE IF NOT EXISTS totals_table_data (
                "Date" TEXT, "Views" DOUBLE PRECISION, "Subscribers" DOUBLE PRECISION, "Shares" DOUBLE PRECISION,
                PRIMARY KEY ("Date")
            );


Log success
Inserted 15348 rows into the database table totals_table_data.


In [8]:
# Specify the file path for the SQL file
sql_file_path = '../database/db_schema.sql'

# Write the SQL queries to the file
os.makedirs(os.path.dirname(sql_file_path), exist_ok=True)
with open(sql_file_path, 'w') as sql_file:
    sql_file.write(sql_queries)

logger.info(f"SQL queries saved to {sql_file_path}")


2025-02-23 23:15:44,618 - INFO - SQL queries saved to ../database/db_schema.sql
