# EDA on Gold Tables

In [None]:
# Make the project's ETL packages importable from this notebook.
# Guarded so that re-running the cell does not keep appending duplicate
# entries to sys.path.
import sys
for _extra_path in ('/workspace/etl', '/workspace/etl/utils'):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)
print(sys.path)

# Importing Modules
import os
import boto3
import logging
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
from extract import DataExtractor
from step2_load_to_postgres import DataLoader
from utils_connection import get_s3_parquet_file_key, get_connection_uri
import matplotlib.pyplot as plt

In [None]:
# Cell 2: Load environment variables and verify S3 access
load_dotenv()

# Fetch AWS credentials from environment variables
s3_access_key_id = os.getenv('S3_ACCESS_KEY_ID')
s3_secret_access_key = os.getenv('S3_SECRET_ACCESS_KEY')
s3_region = os.getenv('S3_REGION')
s3_bucket_name = os.getenv('S3_BUCKET_NAME')


def _redact(value, visible=4):
    """Return *value* with all but its last ``visible`` characters masked.

    Lets credentials be sanity-checked in notebook output without writing
    them in clear text into the output (and any saved copy of the notebook).
    """
    if not value:
        return "<not set>"
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * (len(value) - visible) + value[-visible:]


# Never print the secret itself -- only whether it is present.
print("S3_ACCESS_KEY_ID: ", _redact(s3_access_key_id))
print("S3_SECRET_ACCESS_KEY: ", "<set>" if s3_secret_access_key else "<not set>")
print("S3_BUCKET_NAME: ", s3_bucket_name)

# Initialize a session using boto3
session = boto3.Session(
    aws_access_key_id=s3_access_key_id,
    aws_secret_access_key=s3_secret_access_key,
    region_name=s3_region
)

# Initialize S3 client
s3_client = session.client('s3')

# List objects in the bucket to verify access.
# Note: a single list_objects_v2 call returns at most one page (up to 1000 keys).
if not s3_bucket_name:
    print("Error accessing bucket: S3_BUCKET_NAME is not set")
else:
    try:
        response = s3_client.list_objects_v2(Bucket=s3_bucket_name)
    except Exception as e:
        print(f"Error accessing bucket: {e}")
    else:
        # Print object keys if listing was successful
        print("Objects in bucket:")
        for obj in response.get('Contents', []):
            print(obj['Key'])

In [3]:
class DataGoldEDA:
    """Exploratory queries against the ``gold.lead_quality_matching`` table.

    Every metric method sends SQL through ``self.engine`` and returns the
    result as a pandas DataFrame.
    """

    def __init__(self):
        """Create the SQLAlchemy engine from the project's connection URI."""
        self.engine = create_engine(get_connection_uri())

    def get_data_from_postgres_to_pd(self, schema_name: str, table_name: str) -> pd.DataFrame:
        """Load every row of ``schema_name.table_name`` into a DataFrame.

        Args:
            schema_name: Schema that contains the table.
            table_name: Table to read.

        Returns:
            The table contents, or ``None`` if the query fails. The error is
            printed rather than raised.
        """
        # Identifiers are interpolated directly into the SQL text (they cannot
        # be bound as query parameters), so only pass trusted names here.
        query = f"SELECT * FROM {schema_name}.{table_name}"
        try:
            df = pd.read_sql(query, self.engine)
        except Exception as e:
            print(f"Error loading data from {schema_name}.{table_name}: {e}")
            return None
        print(f"Data loaded successfully from {schema_name}.{table_name}")
        return df

    def _count_rows(self, alias: str, condition: str) -> pd.DataFrame:
        """Count rows of the gold matching table that satisfy ``condition``.

        Returns a one-row DataFrame whose single column is named ``alias``.
        Both arguments are spliced into the SQL text, so they must be
        hard-coded, trusted strings.
        """
        query = f"""
        SELECT COUNT(*) AS {alias}
        FROM gold.lead_quality_matching
        WHERE {condition};
        """
        return pd.read_sql(query, self.engine)

    # Add the methods for the metrics here
    def matching_email_only(self) -> pd.DataFrame:
        """Count rows where ``email_match`` is TRUE and ``phone_match`` is FALSE.

        Rows where either flag is NULL are not counted. Returns a one-row
        DataFrame with column ``email_match_only``.
        """
        return self._count_rows("email_match_only", "email_match = TRUE AND phone_match = FALSE")

    def matching_phone_only(self) -> pd.DataFrame:
        """Count rows where ``phone_match`` is TRUE and ``email_match`` is FALSE.

        Rows where either flag is NULL are not counted. Returns a one-row
        DataFrame with column ``phone_match_only``.
        """
        return self._count_rows("phone_match_only", "phone_match = TRUE AND email_match = FALSE")

    def matching_both(self) -> pd.DataFrame:
        """Count rows where both ``email_match`` and ``phone_match`` are TRUE.

        Returns a one-row DataFrame with column ``both_match``.
        """
        return self._count_rows("both_match", "email_match = TRUE AND phone_match = TRUE")

    def matching_all(self) -> pd.DataFrame:
        """Count rows whose ``lead_uuid`` is NULL.

        Despite its name, this does not count all rows. Returns a one-row
        DataFrame with column ``null_leads``.
        """
        return self._count_rows("null_leads", "lead_uuid IS NULL")

    def total_leads_per_partition(self) -> pd.DataFrame:
        """Count distinct non-NULL ``lead_uuid`` values per ``_partition_date``.

        Returns columns ``_partition_date`` and ``total_leads``, ordered by date.
        """
        query = """
        SELECT _partition_date, COUNT(DISTINCT lead_uuid) AS total_leads
        FROM gold.lead_quality_matching
        GROUP BY _partition_date
        ORDER BY _partition_date;
        """
        return pd.read_sql(query, self.engine)

    def get_total_and_new_leads_per_partition(self) -> pd.DataFrame:
        """
        Calculate the total and new leads for each ``_partition_date``.

        A new lead is one whose ``lead_uuid`` has not appeared in any earlier
        ``_partition_date``. Partitions with no new leads report 0, not NULL.

        Returns:
            Columns ``_partition_date``, ``total_leads``, ``new_leads``, ordered
            by date. Returns ``None`` if the query fails (the error is printed).
        """
        query = """
        WITH sorted_leads AS (
            SELECT
                _partition_date,
                lead_uuid,
                -- row_num = 1 marks the earliest partition in which each lead appears
                ROW_NUMBER() OVER (PARTITION BY lead_uuid ORDER BY _partition_date) AS row_num
            FROM gold.lead_quality_matching
        ),
        new_leads AS (
            SELECT
                _partition_date,
                COUNT(DISTINCT lead_uuid) AS new_leads
            FROM sorted_leads
            WHERE row_num = 1
            GROUP BY _partition_date
        ),
        total_leads AS (
            SELECT
                _partition_date,
                COUNT(DISTINCT lead_uuid) AS total_leads
            FROM gold.lead_quality_matching
            GROUP BY _partition_date
        )
        SELECT
            t._partition_date,
            t.total_leads,
            -- A partition without any first-seen lead has no row in new_leads;
            -- report 0 instead of NULL (which pandas would turn into NaN).
            COALESCE(n.new_leads, 0) AS new_leads
        FROM total_leads t
        LEFT JOIN new_leads n
        ON t._partition_date = n._partition_date
        ORDER BY t._partition_date;
        """
        try:
            df = pd.read_sql(query, self.engine)
        except Exception as e:
            print(f"Error running query: {e}")
            return None
        print("Total and new leads per partition date calculated successfully.")
        return df

    def overall_lead_quality_distribution(self) -> pd.DataFrame:
        """Count distinct non-NULL leads per ``lead_quality_flag`` across all partitions.

        Returns columns ``lead_quality_flag`` and ``count``. No ordering is applied.
        """
        query = """
        SELECT lead_quality_flag, COUNT(DISTINCT lead_uuid) AS count
        FROM gold.lead_quality_matching
        GROUP BY lead_quality_flag;
        """
        return pd.read_sql(query, self.engine)

    def lead_quality_distribution_by_partition(self) -> pd.DataFrame:
        """
        Get the distribution of lead quality flags per ``_partition_date``.

        Counts unique (non-NULL) ``lead_uuid`` values for every
        (``_partition_date``, ``lead_quality_flag``) pair. This supports
        analysing how lead quality changes over time.

        Considerations:
        - The flag values themselves come from the upstream gold table. Only
          'High Quality' is referenced in this class; other labels (presumably
          e.g. Medium/Low Quality) are assumed, so verify against the pipeline.
        - New vs. existing leads: a lead present on several dates is counted on
          each of them; use ``get_total_and_new_leads_per_partition`` to
          separate first appearances.

        Returns:
            pd.DataFrame: Columns ``_partition_date``, ``lead_quality_flag`` and
            ``count``, ordered by date then flag.
        """
        query = """
        SELECT
            _partition_date,
            lead_quality_flag,
            COUNT(DISTINCT lead_uuid) AS count
        FROM gold.lead_quality_matching
        GROUP BY _partition_date, lead_quality_flag
        ORDER BY _partition_date, lead_quality_flag;
        """
        return pd.read_sql(query, self.engine)

    def new_high_quality_leads_daily_count(self) -> pd.DataFrame:
        """
        Get the daily count of leads newly flagged 'High Quality'.

        A lead is counted on the earliest ``_partition_date`` on which it has
        ``lead_quality_flag = 'High Quality'``. A lead that appeared earlier
        with another flag is counted on the first date it is High Quality,
        not on its first appearance. Dates with no newly High-Quality leads are
        absent from the result rather than reported as 0.

        Returns:
            pd.DataFrame: Columns ``_partition_date`` and
            ``new_high_quality_count``, ordered by date.
        """
        query = """
        WITH high_quality_leads AS (
            SELECT
                lead_uuid,
                _partition_date,
                ROW_NUMBER() OVER (PARTITION BY lead_uuid ORDER BY _partition_date) AS high_quality_rank
            FROM gold.lead_quality_matching
            WHERE lead_quality_flag = 'High Quality'
        ),
        daily_new_high_quality_counts AS (
            SELECT
                _partition_date,
                COUNT(lead_uuid) AS new_high_quality_count
            FROM high_quality_leads
            WHERE high_quality_rank = 1  -- first High Quality occurrence of each lead
            GROUP BY _partition_date
        )
        SELECT
            _partition_date,
            new_high_quality_count
        FROM daily_new_high_quality_counts
        ORDER BY _partition_date;
        """
        return pd.read_sql(query, self.engine)

    def plot_new_high_quality_leads_trend(self):
        """
        Plot the trend of newly High-Quality leads over time as a line chart.

        The first returned date is dropped before plotting. Every High-Quality
        lead already present on that date counts as "new" there, which produces
        a peak that dwarfs the rest of the series. Prints a message and returns
        without plotting when there is nothing to draw.
        """
        # Retrieve the daily counts of new high-quality leads
        new_high_quality_leads = self.new_high_quality_leads_daily_count()

        if new_high_quality_leads.empty:
            print("No data available to plot.")
            return

        # Convert _partition_date to datetime if necessary
        new_high_quality_leads['_partition_date'] = pd.to_datetime(new_high_quality_leads['_partition_date'])

        # Exclude the first day to avoid the peak
        new_high_quality_leads = new_high_quality_leads.iloc[1:]
        if new_high_quality_leads.empty:
            # Only one date was available; plotting would produce an empty figure.
            print("No data available to plot after excluding the first day.")
            return

        # Plotting the trend
        plt.figure(figsize=(12, 6))
        plt.plot(new_high_quality_leads['_partition_date'], new_high_quality_leads['new_high_quality_count'], marker='o')

        # Adding title and labels
        plt.title('Trend of New High-Quality Leads Over Time (Excluding First Day)')
        plt.xlabel('Date')
        plt.ylabel('Number of New High-Quality Leads')
        plt.xticks(rotation=45)
        plt.grid()

        # Show the plot
        plt.tight_layout()
        plt.show()

In [None]:
# Set up logging
logging.basicConfig(level=logging.INFO)

# Example schema and table names
schema_names = ['gold']
gold_table_names = ['lead_quality_matching']

# Instantiate the DataGoldEDA
eda = DataGoldEDA()

# Get Gold Schema
gold_schema = schema_names[0]

# Load each gold table into memory; `gold_data` holds the last one loaded
# so later cells can inspect it.
for table_name in gold_table_names:
    gold_data = eda.get_data_from_postgres_to_pd(gold_schema, table_name)
    if gold_data is None:
        # get_data_from_postgres_to_pd already printed the underlying error.
        logging.warning(f"Could not load {gold_schema}.{table_name}; skipping it.")
        continue
    logging.info(f"\nPerforming EDA on table: {table_name}")

# Perform various analyses using the defined methods
try:
    # Get counts for various matching scenarios
    email_match_only = eda.matching_email_only()
    logging.info(f"Email Match Only: {email_match_only.iloc[0]['email_match_only']}")

    phone_match_only = eda.matching_phone_only()
    logging.info(f"Phone Match Only: {phone_match_only.iloc[0]['phone_match_only']}")

    both_match = eda.matching_both()
    logging.info(f"Both Match: {both_match.iloc[0]['both_match']}")

    null_leads = eda.matching_all()
    logging.info(f"Null Leads: {null_leads.iloc[0]['null_leads']}")

    # Get total leads per partition
    total_leads_per_partition = eda.total_leads_per_partition()
    logging.info(f"Total Leads Per Partition:\n{total_leads_per_partition}")

    # Get total and new leads per partition (None if the query failed)
    total_and_new_leads = eda.get_total_and_new_leads_per_partition()
    logging.info(f"Total and New Leads Per Partition:\n{total_and_new_leads}")

    # Get overall lead quality distribution
    lead_quality_distribution = eda.overall_lead_quality_distribution()
    logging.info(f"Overall Lead Quality Distribution:\n{lead_quality_distribution}")

    # Get lead quality distribution by partition
    lead_quality_distribution_by_partition = eda.lead_quality_distribution_by_partition()
    logging.info(f"Lead Quality Distribution by Partition:\n{lead_quality_distribution_by_partition}")

    # Get daily increase of high-quality leads
    new_daily_high_quality_increase = eda.new_high_quality_leads_daily_count()
    logging.info(f"Daily High Quality Leads Increase:\n{new_daily_high_quality_increase}")

    # Plot the trend of new high-quality leads
    eda.plot_new_high_quality_leads_trend()

except Exception as e:
    # logging.exception also records the traceback, so the failing step is identifiable.
    logging.exception(f"An error occurred during EDA: {e}")