<a href="https://colab.research.google.com/github/ShikharV010/gist_daily_runs/blob/main/Page_Performance.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install psycopg2-binary sqlalchemy pandas
!pip install pymongo

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Collecting pymongo
  Downloading pymongo-4.15.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.8.0-py3-none-any.whl.metadata (5.7 kB)
Downloading pymongo-4.15.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m17.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.8.0-py3-none-a

In [9]:
import pymongo
from pymongo import MongoClient
import pandas as pd
import re
from sqlalchemy import create_engine, text
import pytz
from datetime import datetime, timedelta
import uuid
import time
import math
from urllib.parse import urlparse  # needed for improved normalize_url

# MongoDB connection parameters
mongo_connection_string = "mongodb+srv://readonlyPAT:readonlyusermongopassword@serverlessinstance1.e4u1z9c.mongodb.net/gw_seo_pat"

# PostgreSQL connection parameters
pg_params = {
    'host': 'gw-rds-prod.celzx4qnlkfp.us-east-1.rds.amazonaws.com',
    'database': 'gw_prod',
    'user': 'airbyte_user',
    'password': 'airbyte_user_password',
    'port': '5432'
}

# Function to clean domain names
def clean_domain(domain):
    if domain is None:
        return None

    # Remove 'sc-domain:' prefix
    if 'sc-domain:' in domain:
        domain = domain.replace('sc-domain:', '')

    # Remove https://, http://, www., blog., blogs., trailing slashes, etc.
    domain = re.sub(r'^https?://', '', domain)
    domain = re.sub(r'^www\.', '', domain)
    domain = re.sub(r'^blog\.', '', domain)
    domain = re.sub(r'^blogs\.', '', domain)
    domain = domain.strip('/')

    # Remove all TLDs (.com, .org, etc.) and any following content
    domain = re.sub(r'\.[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)*($|/.*$)', '', domain)

    # Remove any remaining periods
    domain = domain.replace('.', '')

    return domain

# Improved: normalize URLs using urlparse
def normalize_url(url):
    if not url:
        return None

    url = url.strip().lower()
    # ensure urlparse sees a scheme
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url

    parsed = urlparse(url)
    # remove www./blog./blogs. from the host
    host = parsed.netloc.replace('www.', '').replace('blog.', '').replace('blogs.', '')
    # rebuild as host + path, drop query/fragment, trim trailing slash
    return (host + parsed.path).rstrip('/')


# PostgreSQL optimization - create engine with pooling and timeout settings
def create_optimized_engine(pg_params):
    pg_conn_string = f"postgresql://{pg_params['user']}:{pg_params['password']}@{pg_params['host']}:{pg_params['port']}/{pg_params['database']}"
    return create_engine(
        pg_conn_string,
        pool_size=10,              # Increased number of connections for larger batches
        max_overflow=20,           # Allow more additional connections
        pool_timeout=60,           # Longer wait for a connection
        pool_recycle=1800,         # Recycle connections after 30 minutes
        connect_args={"options": "-c statement_timeout=600000"}  # 10 minute default timeout
    )

# Create schema and table structure separately (avoid timeouts) - WITH TRUNCATE OPTION
def setup_database_schema(engine):
    schema_created = False
    table_created = False

    with engine.connect() as connection:
        # Enable autocommit to ensure schema creation is persisted even if later operations fail
        connection.execution_options(isolation_level="AUTOCOMMIT")

        try:
            # Try setting a very long timeout for schema operations
            connection.execute(text("SET statement_timeout = 1200000;"))  # 20 minutes

            # Create schema if not exists
            connection.execute(text("CREATE SCHEMA IF NOT EXISTS gist;"))
            schema_created = True
            print("Ensured schema 'gist' exists")

            # Check if table exists
            table_exists_query = """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = 'gist' AND table_name = 'gist_pageperformance'
            );
            """
            table_exists = connection.execute(text(table_exists_query)).scalar()

            if not table_exists:
                # Create table without indexes first for speed
                create_table_sql = """
                CREATE TABLE gist.gist_pageperformance (
                    composite_key TEXT PRIMARY KEY,
                    cluster_id TEXT,
                    writer TEXT,
                    page TEXT,
                    domain TEXT,
                    cleaned_domain TEXT,
                    normalized_page TEXT,
                    normalized_post_link TEXT,
                    start_date TIMESTAMP WITH TIME ZONE,
                    end_date TIMESTAMP WITH TIME ZONE,
                    clicks INTEGER,
                    impressions INTEGER,
                    ctr FLOAT,
                    position FLOAT,
                    post_link TEXT,
                    delivery_date TIMESTAMP WITH TIME ZONE,
                    process_run_id TEXT,
                    processed_at TIMESTAMP WITH TIME ZONE
                );
                """
                connection.execute(text(create_table_sql))
                table_created = True
                print("Created table without indexes")
            else:
                print("Table already exists - truncating it for fresh data")

                # First drop any views that might depend on this table
                try:
                    connection.execute(text("DROP VIEW IF EXISTS gist.writer_performance_view;"))
                    connection.execute(text("DROP VIEW IF EXISTS gist.domain_performance_view;"))
                    print("Dropped dependent views")
                except Exception as view_error:
                    print(f"Warning: Could not drop dependent views: {view_error}")
                    print("Proceeding with truncate anyway...")

                # Truncate the table (deletes all rows but keeps table structure and indexes)
                truncate_sql = "TRUNCATE TABLE gist.gist_pageperformance;"
                connection.execute(text(truncate_sql))
                print("Successfully truncated the table")
                table_created = True  # Consider table ready

            # Return true in both cases: new table or truncated existing table
            return schema_created and table_created

        except Exception as e:
            print(f"Error in schema/table setup: {e}")
            # Return partial success status
            return schema_created and table_created

# Create indexes separately to avoid timeout
def create_table_indexes(engine):
    with engine.connect() as connection:
        # Enable autocommit for index operations
        connection.execution_options(isolation_level="AUTOCOMMIT")

        try:
            # Set long timeout for index creation
            connection.execute(text("SET statement_timeout = 600000;"))  # 10 minutes

            # Check if table exists
            table_exists_query = """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = 'gist' AND table_name = 'gist_pageperformance'
            );
            """
            table_exists = connection.execute(text(table_exists_query)).scalar()

            if not table_exists:
                print("Table doesn't exist, cannot create indexes")
                return False

            # Create each index separately with individual transactions
            index_statements = [
                """CREATE INDEX IF NOT EXISTS idx_pageperformance_page
                   ON gist.gist_pageperformance(page);""",
                """CREATE INDEX IF NOT EXISTS idx_pageperformance_domain
                   ON gist.gist_pageperformance(cleaned_domain);""",
                """CREATE INDEX IF NOT EXISTS idx_pageperformance_start_date
                   ON gist.gist_pageperformance(start_date);""",
                """CREATE INDEX IF NOT EXISTS idx_pageperformance_normalized_page
                   ON gist.gist_pageperformance(normalized_page);""",
                """CREATE INDEX IF NOT EXISTS idx_pageperformance_normalized_post_link
                   ON gist.gist_pageperformance(normalized_post_link);"""
            ]

            success_count = 0
            for i, stmt in enumerate(index_statements):
                try:
                    # Create each index in a separate transaction
                    with engine.connect() as idx_conn:
                        idx_conn.execution_options(isolation_level="AUTOCOMMIT")
                        idx_conn.execute(text("SET statement_timeout = 300000;"))  # 5 minutes per index
                        idx_conn.execute(text(stmt))
                    success_count += 1
                    print(f"Created index {i+1}/{len(index_statements)}")
                except Exception as e:
                    print(f"Warning: Could not create index {i+1}: {e}")
                    print("Will continue without this index")

            print(f"Created {success_count}/{len(index_statements)} indexes")
            return success_count > 0

        except Exception as e:
            print(f"Error creating indexes: {e}")
            return False

# Direct batch insertion with larger batch size (1000 rows)
def insert_data_in_batches(engine, dataframe, batch_size=50000):
    if dataframe.empty:
        print("No data to insert")
        return 0

    # Remove duplicates before insertion to avoid primary key violations
    print("Checking for duplicate composite keys in the dataset...")
    duplicates = dataframe.duplicated(subset=['composite_key'], keep='first')
    if duplicates.any():
        dup_count = duplicates.sum()
        print(f"Found {dup_count} duplicate composite keys. Keeping only the first occurrence of each.")
        # Keep only the first occurrence of each composite_key
        dataframe = dataframe.drop_duplicates(subset=['composite_key'], keep='first')

    total_rows = len(dataframe)
    total_batches = math.ceil(total_rows / batch_size)
    rows_inserted = 0

    print(f"Inserting {total_rows} rows in {total_batches} batches (size {batch_size})")

    for i in range(0, total_rows, batch_size):
        batch_num = i // batch_size + 1
        end_idx = min(i + batch_size, total_rows)
        batch_df = dataframe.iloc[i:end_idx].copy()

        print(f"Processing batch {batch_num}/{total_batches} ({len(batch_df)} records)")

        try:
            with engine.connect() as conn:
                # Enable autocommit for this connection
                conn.execution_options(isolation_level="AUTOCOMMIT")

                # Set reasonable timeout
                conn.execute(text("SET statement_timeout = 600000;"))  # 10 minutes for larger batches

                # Handle NULL values
                batch_df = batch_df.where(pd.notnull(batch_df), None)

                # Use to_sql with method='multi' for better performance
                batch_df.to_sql(
                    'gist_pageperformance',
                    conn,
                    schema='gist',
                    if_exists='append',
                    index=False,
                    method='multi',
                    chunksize=100  # Smaller internal chunks
                )

                rows_inserted += len(batch_df)
                print(f"Batch {batch_num} processed, total progress: {rows_inserted}/{total_rows} rows")

        except Exception as batch_error:
            print(f"Error processing batch {batch_num}: {str(batch_error)[:200]}...")
            print("Falling back to smaller batches for this chunk")

            # Try smaller batches as fallback
            small_batch_size = 50
            small_total_batches = math.ceil(len(batch_df) / small_batch_size)

            for j in range(0, len(batch_df), small_batch_size):
                small_batch_num = j // small_batch_size + 1
                small_end_idx = min(j + small_batch_size, len(batch_df))
                small_batch_df = batch_df.iloc[j:small_end_idx].copy()

                print(f"Processing small batch {small_batch_num}/{small_total_batches}")

                try:
                    with engine.connect() as small_conn:
                        small_conn.execution_options(isolation_level="AUTOCOMMIT")
                        small_conn.execute(text("SET statement_timeout = 300000;"))  # 5 minutes

                        small_batch_df = small_batch_df.where(pd.notnull(small_batch_df), None)

                        small_batch_df.to_sql(
                            'gist_pageperformance',
                            small_conn,
                            schema='gist',
                            if_exists='append',
                            index=False,
                            method='multi',
                            chunksize=10
                        )

                        rows_inserted += len(small_batch_df)
                        print(f"Small batch processed, total progress: {rows_inserted}/{total_rows} rows")

                except Exception as small_batch_error:
                    print(f"Error processing small batch: {str(small_batch_error)[:200]}...")
                    print("Trying row-by-row for this batch")

                    # Try individual row insertion as final fallback
                    for idx, row in small_batch_df.iterrows():
                        try:
                            # Handle NaN values
                            row = row.where(pd.notnull(row), None)

                            # Convert to DataFrame with single row
                            single_row_df = pd.DataFrame([row.to_dict()])

                            with engine.connect() as single_conn:
                                single_conn.execution_options(isolation_level="AUTOCOMMIT")
                                single_row_df.to_sql(
                                    'gist_pageperformance',
                                    single_conn,
                                    schema='gist',
                                    if_exists='append',
                                    index=False
                                )
                                rows_inserted += 1

                        except Exception as row_error:
                            if "duplicate key" in str(row_error).lower():
                                print(f"Skipping duplicate key for row {idx}")
                            else:
                                print(f"Error inserting row {idx}: {str(row_error)[:100]}...")

        # Small pause between batches
        time.sleep(0.3)

    # Add an explicit final commit to ensure all data is committed
    try:
        with engine.connect() as final_conn:
            final_conn.execution_options(isolation_level="AUTOCOMMIT")
            # Execute a dummy query to ensure connection is good
            final_conn.execute(text("SELECT 1;"))
            print("Final connection verified and committed")
    except Exception as e:
        print(f"Final commit attempt error: {e}")

    return rows_inserted

# Step 4: Validate insertions and run consistency checks
def validate_data(engine, original_df):
    print("\n===== STEP 4: Validating inserted data =====")
    try:
        with engine.connect() as conn:
            # Enable autocommit
            conn.execution_options(isolation_level="AUTOCOMMIT")

            # Set timeout
            conn.execute(text("SET statement_timeout = 300000;"))  # 5 minutes

            # Check if the table exists
            table_exists_query = """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = 'gist' AND table_name = 'gist_pageperformance'
            );
            """
            table_exists = conn.execute(text(table_exists_query)).scalar()

            if not table_exists:
                print("VALIDATION FAILED: Table does not exist!")
                return False

            # Count records
            count_query = "SELECT COUNT(*) FROM gist.gist_pageperformance;"
            db_count = conn.execute(text(count_query)).scalar()

            # Compare with expected count
            expected_count = len(original_df.drop_duplicates(subset=['composite_key'], keep='first'))

            print(f"Records in database: {db_count}")
            print(f"Expected records: {expected_count}")

            if db_count == 0:
                print("VALIDATION FAILED: No records in database!")
                return False

            # Sample some records to verify content
            sample_query = """
            SELECT composite_key, cluster_id, page, post_link
            FROM gist.gist_pageperformance
            LIMIT 5;
            """

            samples = conn.execute(text(sample_query)).fetchall()
            if samples:
                print("Sample records from database:")
                for sample in samples:
                    print(f"  - {sample}")

            # Check for data distribution
            stats_query = """
            SELECT
                COUNT(*) as total_records,
                COUNT(DISTINCT writer) as distinct_writers,
                COUNT(DISTINCT cleaned_domain) as distinct_domains,
                MIN(start_date) as earliest_date,
                MAX(start_date) as latest_date
            FROM gist.gist_pageperformance;
            """

            stats = conn.execute(text(stats_query)).fetchone()
            if stats:
                print("\nData statistics:")
                print(f"Total records: {stats[0]}")
                print(f"Distinct writers: {stats[1]}")
                print(f"Distinct domains: {stats[2]}")
                print(f"Date range: {stats[3]} to {stats[4]}")

            # Additional count verification
            try:
                # Count with different methods
                count1 = conn.execute(text("SELECT COUNT(*) FROM gist.gist_pageperformance;")).scalar()
                count2 = conn.execute(text("SELECT COUNT(1) FROM gist.gist_pageperformance;")).scalar()

                # Count by specific columns
                count_by_key = conn.execute(text("SELECT COUNT(DISTINCT composite_key) FROM gist.gist_pageperformance;")).scalar()
                count_by_page = conn.execute(text("SELECT COUNT(DISTINCT page) FROM gist.gist_pageperformance;")).scalar()

                print("\nDetailed count verification:")
                print(f"COUNT(*): {count1}")
                print(f"COUNT(1): {count2}")
                print(f"COUNT(DISTINCT composite_key): {count_by_key}")
                print(f"COUNT(DISTINCT page): {count_by_page}")

                # Check if all records from the current run are present
                current_run_count = conn.execute(text(f"SELECT COUNT(*) FROM gist.gist_pageperformance WHERE process_run_id = '{run_id}';")).scalar()
                print(f"Records from current run (process_run_id = {run_id}): {current_run_count}")

            except Exception as e:
                print(f"Additional verification failed: {e}")

            return db_count > 0

    except Exception as e:
        print(f"Validation failed with error: {e}")
        return False

# Main script execution starts here
print("Connecting to PostgreSQL...")
engine = create_optimized_engine(pg_params)

# <<< CHANGED FILTER: only require a real post_link; status ignored >>>
clusters_query = """
SELECT id, writer, post_link, post_date as delivery_date
FROM public.clusters
WHERE post_link IS NOT NULL
  AND post_link <> ''
"""
pg_clusters_df = pd.read_sql(clusters_query, engine)
print(f"Fetched {len(pg_clusters_df)} clusters from PostgreSQL")

# Ensure dates are timezone-aware
if 'delivery_date' in pg_clusters_df.columns:
    pg_clusters_df['delivery_date'] = pd.to_datetime(pg_clusters_df['delivery_date'])
    if pg_clusters_df['delivery_date'].dt.tz is None:
        pg_clusters_df['delivery_date'] = pg_clusters_df['delivery_date'].dt.tz_localize('UTC')
    else:
        pg_clusters_df['delivery_date'] = pg_clusters_df['delivery_date'].dt.tz_convert('UTC')

# Clean NA-ish post_link values before normalization (defensive)
pg_clusters_df['post_link'] = (
    pg_clusters_df['post_link']
      .astype(str)
      .str.strip()
      .replace({'': None, 'nan': None, 'None': None}, regex=False)
)

# Apply normalization to post_link
print("Normalizing post_link values...")
pg_clusters_df['normalized_post_link'] = pg_clusters_df['post_link'].apply(normalize_url)

# Connect to MongoDB
print("Connecting to MongoDB...")
mongo_client = MongoClient(mongo_connection_string)
db_name = "gw_seo_pat"
mongo_db = mongo_client[db_name]
collection = mongo_db["page_performance"]

# Updated: Remove date filter to include all data
query = {}
print("Fetching all documents from MongoDB (no date filter)...")

# Get all documents
all_docs = list(collection.find(query))

for doc in all_docs:
    if '_id' in doc:
        doc['_id'] = str(doc['_id'])
    if 'domain' in doc:
        doc['cleaned_domain'] = clean_domain(doc['domain'])

# Convert to DataFrame
mongo_df = pd.DataFrame(all_docs)
print(f"Fetched {len(mongo_df)} records from MongoDB (filtered for last 4 months)")  # message text left as-is per your original

# Apply normalization to page URLs
print("Normalizing page URLs...")
mongo_df['normalized_page'] = mongo_df['page'].apply(normalize_url)

# Convert MongoDB date fields to pandas datetime with UTC timezone
if 'startDate' in mongo_df.columns:
    mongo_df['startDate'] = pd.to_datetime(mongo_df['startDate']).dt.tz_localize('UTC')
if 'endDate' in mongo_df.columns:
    mongo_df['endDate'] = pd.to_datetime(mongo_df['endDate']).dt.tz_localize('UTC')

# Join MongoDB data with clusters based on normalized URLs
print("Joining MongoDB data with clusters using normalized URLs...")
final_df = pd.merge(
    mongo_df,
    pg_clusters_df,
    left_on='normalized_page',
    right_on='normalized_post_link',
    how='left'
)
print(f"After joining MongoDB with clusters: {len(final_df)} records")
print(f"Records lost in this join: {len(mongo_df) - len(final_df)}")

# Sample some new matches to validate normalization
if len(final_df) > 0:
    new_matches = final_df[final_df['page'] != final_df['post_link']]
    if not new_matches.empty:
        print(f"\nNew matches found through normalization: {len(new_matches)}")
        print("Sample of new matches (showing original URLs):")
        print(new_matches[['page', 'post_link']].head(5).to_string())

# Generate run ID and timestamp for tracking
run_id = datetime.now().strftime("%Y%m%d%H%M%S")
current_timestamp = datetime.now()

# Add processing metadata
if not final_df.empty:
    final_df['process_run_id'] = run_id
    final_df['processed_at'] = current_timestamp

    # Create a composite key for identifying records
    final_df['composite_key'] = final_df.apply(
        lambda row: f"{row.get('id', '')}_{row.get('page', '')}_{row.get('startDate', '').strftime('%Y%m%d')}",
        axis=1
    )

    # IMPORTANT: Check for and handle duplicate composite keys before proceeding
    duplicate_keys = final_df.duplicated(subset=['composite_key'], keep=False)
    if duplicate_keys.any():
        print(f"\nWARNING: Found {duplicate_keys.sum()} rows with duplicate composite keys")
        # Keep only the first occurrence of each composite_key
        print("Removing duplicate composite keys (keeping first occurrence only)")
        final_df = final_df.drop_duplicates(subset=['composite_key'], keep='first')
        print(f"Dataset after removing duplicates: {len(final_df)} records")

    print(f"Final dataset prepared with {len(final_df)} records")

    # Column mapping for database
    column_mapping = {
        'composite_key': 'composite_key',
        'id': 'cluster_id',
        'writer': 'writer',
        'page': 'page',
        'domain': 'domain',
        'cleaned_domain': 'cleaned_domain',
        'normalized_page': 'normalized_page',
        'normalized_post_link': 'normalized_post_link',
        'startDate': 'start_date',
        'endDate': 'end_date',
        'clicks': 'clicks',
        'impressions': 'impressions',
        'ctr': 'ctr',
        'position': 'position',
        'post_link': 'post_link',
        'delivery_date': 'delivery_date',
        'process_run_id': 'process_run_id',
        'processed_at': 'processed_at'
    }

    # Only keep the columns we want
    columns_to_keep = list(column_mapping.keys())
    columns_to_keep = [col for col in columns_to_keep if col in final_df.columns]
    subset_df = final_df[columns_to_keep].copy()

    # Rename columns according to the mapping
    renamed_df = subset_df.rename(columns={k: v for k, v in column_mapping.items() if k in subset_df.columns})

    # Save original dataframe for validation
    original_df = renamed_df.copy()

    # STEP 1: Setup schema and table structure (separate step)
    print("\n===== STEP 1: Setting up database schema =====")
    setup_success = setup_database_schema(engine)

    if setup_success:
        # STEP 2: Create database indexes (separate step)
        print("\n===== STEP 2: Creating table indexes =====")
        index_success = create_table_indexes(engine)

        # STEP 3: Insert data using direct batch insertion with duplicate handling
        print("\n===== STEP 3: Inserting data =====")
        # Use larger batch size (2000 rows)
        rows_inserted = insert_data_in_batches(engine, renamed_df, batch_size=2000)

        # STEP 4: Validate data insertion
        validation_success = validate_data(engine, original_df)

        # No dependent views creation - removed as requested
    else:
        print("Failed to set up database schema")

else:
    print("No data to insert into PostgreSQL")

# Final verification
print("\n===== FINAL VERIFICATION =====")
try:
    with engine.connect() as final_verify:
        final_verify.execution_options(isolation_level="AUTOCOMMIT")
        final_count = final_verify.execute(text("SELECT COUNT(*) FROM gist.gist_pageperformance;")).scalar()
        print(f"Final count in database: {final_count}")
        print(f"Expected count: {len(final_df.drop_duplicates(subset=['composite_key'], keep='first'))}")

        # Check for this run's records
        run_count = final_verify.execute(text(f"SELECT COUNT(*) FROM gist.gist_pageperformance WHERE process_run_id = '{run_id}';")).scalar()
        print(f"Records with current run_id ({run_id}): {run_count}")

        # Check if there are older records
        other_runs = final_verify.execute(text(f"SELECT process_run_id, COUNT(*) FROM gist.gist_pageperformance WHERE process_run_id != '{run_id}' GROUP BY process_run_id;")).fetchall()
        if other_runs:
            print("Found records from other runs:")
            for other_run in other_runs:
                print(f"  - Run ID {other_run[0]}: {other_run[1]} records")

except Exception as e:
    print(f"Final verification failed: {e}")

# Output summary
print("\n===== SUMMARY =====")
print(f"MongoDB records (filtered for last 4 months): {len(mongo_df)}")
print(f"Clusters records: {len(pg_clusters_df)}")
print(f"Final dataset after joining: {len(final_df)}")
print(f"Database: {pg_params['database']}")
print(f"Table: gist.gist_pageperformance")
print(f"Process run ID: {run_id}")
print(f"Process completed at: {datetime.now()}")

# Close connections
mongo_client.close()


Connecting to PostgreSQL...
Fetched 32652 clusters from PostgreSQL
Normalizing post_link values...
Connecting to MongoDB...
Fetching all documents from MongoDB (no date filter)...
Fetched 2052756 records from MongoDB (filtered for last 4 months)
Normalizing page URLs...
Joining MongoDB data with clusters using normalized URLs...
After joining MongoDB with clusters: 2084673 records
Records lost in this join: -31917

New matches found through normalization: 781973
Sample of new matches (showing original URLs):
                                                                                                    page                                                                                      post_link
0            https://www.thesoundartist.com/blog/guitar-lessons-in-nassau-county-find-your-perfect-match            www.thesoundartist.com/blog/guitar-lessons-in-nassau-county-find-your-perfect-match
1  https://www.thesoundartist.com/blog/handpan-lessons-in-huntington-with-monthly-soun