# Combined MySQL and Lakehouse Data Fetching with Dynamic Country Detection

This notebook combines data from:
1. MySQL database (country-specific schemas)
2. AWS Athena Lakehouse (meth_2012 and positions_us_v1 schemas)

**Enhanced with dynamic country detection** - automatically detects which countries to fetch from each source:
- Fetches from MySQL for countries with individual schemas
- Fetches remaining countries from lakehouse meth_2012 + US schema
- No manual country lists required!

The final output will be a merged dataset with consistent column names.

In [1]:
# Import required libraries
import mysql.connector
import pandas as pd
import re
from datetime import datetime
import time
import boto3
import warnings
warnings.filterwarnings('ignore')

print("All libraries imported successfully!")

All libraries imported successfully!


## Configuration and Date Range Input

In [2]:
# Get execution date for file naming
execution_date = datetime.now().strftime("%Y%m%d")
print(f"Execution date: {execution_date}")

def get_date_range_from_user():
    """
    Prompts the user to enter a start and end date and validates their format (YYYY-MM-DD).
    Returns a tuple of (start_date, end_date) or (None, None) if input is invalid.
    """
    while True:
        start_date_str = input("Enter the start date (YYYY-MM-DD) for the query: ")
        end_date_str = input("Enter the end date (YYYY-MM-DD) for the query: ")
        try:
            start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
            end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
            if start_date > end_date:
                print("Start date must be before or equal to end date. Please try again.")
                continue
            return start_date_str, end_date_str
        except ValueError:
            print("Invalid date format. Please use YYYY-MM-DD.")

def validate_date_range(start_date_str, end_date_str):
    """
    Alternative function for environments where input() is not supported.
    Validates the provided start and end date strings.
    """
    try:
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        if start_date > end_date:
            raise ValueError("Start date must be before or equal to end date.")
        return start_date_str, end_date_str
    except ValueError as e:
        print(f"Date validation error: {e}")
        return None, None

# Try to get dates from user input, fall back to manual entry if needed
try:
    start_date, end_date = get_date_range_from_user()
    print(f"‚úÖ Date range set: {start_date} to {end_date}")
except:
    print("‚ö†Ô∏è Interactive input not available. Please manually set your date range below:")
    # Modify these dates as needed
    start_date, end_date = validate_date_range('2025-01-01', '2025-12-31')
    if start_date and end_date:
        print(f"üìÖ Using default date range: {start_date} to {end_date}")
        print("üí° To use custom dates, modify the values in the validate_date_range() call above")
    else:
        print("‚ùå Invalid date range. Please check the date format.")

Execution date: 20250703


Enter the start date (YYYY-MM-DD) for the query:  2025-06-01
Enter the end date (YYYY-MM-DD) for the query:  2025-06-30


‚úÖ Date range set: 2025-06-01 to 2025-06-30


## MySQL Database Connection and Schema Discovery

In [3]:
def connect_to_mysql():
    """
    Establishes a connection to the MySQL database using provided credentials.
    Returns a connection object or None if connection fails.
    """
    try:
        host = 'lookup-unsigned.cluster-ro-cyrwvn3qibpy.us-east-1.rds.amazonaws.com'
        user = input("Enter your MySQL username: ")
        password = input("Enter your MySQL password: ")

        mydb = mysql.connector.connect(
            host=host,
            user=user,
            password=password
        )
        print("‚úÖ Successfully connected to MySQL database.")
        return mydb
    except mysql.connector.Error as err:
        print(f"‚ùå Error connecting to MySQL: {err}")
        return None
    except:
        print("‚ö†Ô∏è Interactive input not available for MySQL credentials.")
        print("üí° Please modify this cell to include your credentials directly if needed.")
        return None

# Connect to MySQL
mysql_conn = connect_to_mysql()

Enter your MySQL username:  hahhahaha
Enter your MySQL password:  hahahahahha


‚ö†Ô∏è Interactive input not available for MySQL credentials.
üí° Please modify this cell to include your credentials directly if needed.


In [4]:
def get_and_filter_schemas(db_connection):
    """
    Queries the database for all distinct schema names and filters them based on
    the pattern 'positions_{country_code}_v1', explicitly excluding 'positions_us_v1'.
    Returns a list of matching schema names.
    """
    if not db_connection:
        return []

    matching_schemas = []
    cursor = None
    try:
        cursor = db_connection.cursor()
        query = "SELECT DISTINCT schema_name FROM information_schema.schemata"
        cursor.execute(query)

        # Regex to match the pattern positions_{country_code}_v1
        schema_pattern = re.compile(r"^positions_([a-z]{2,})_v1$")

        for (schema_name,) in cursor:
            if schema_pattern.match(schema_name) and schema_name != "positions_us_v1":
                matching_schemas.append(schema_name)

        if matching_schemas:
            print(f"‚úÖ Found matching schemas (excluding 'positions_us_v1'): {', '.join(matching_schemas)}")
        else:
            print("‚ùå No schemas found matching the pattern 'positions_{country_code}_v1' (excluding 'positions_us_v1').")

    except mysql.connector.Error as err:
        print(f"‚ùå Error querying schemas: {err}")
    finally:
        if cursor:
            cursor.close()

    return matching_schemas

def extract_mysql_countries(filtered_schemas):
    """
    Extracts country codes from MySQL schema names.
    Returns a list of country codes that will be fetched from MySQL.
    """
    mysql_countries = []
    schema_pattern = re.compile(r"^positions_([a-z]{2,})_v1$")
    
    for schema in filtered_schemas:
        match = schema_pattern.match(schema)
        if match:
            country_code = match.group(1).upper()
            if country_code == 'UK':
                country_code = 'GB'
            mysql_countries.append(country_code)
    
    mysql_countries_sorted = sorted(set(mysql_countries))  # Remove duplicates and sort
    print(f"üåç MySQL will fetch data for countries: {', '.join(mysql_countries_sorted)}")
    return mysql_countries_sorted

# Get filtered schemas and extract countries
if mysql_conn:
    filtered_schemas = get_and_filter_schemas(mysql_conn)
    mysql_countries = extract_mysql_countries(filtered_schemas)
    print(f"Found {len(filtered_schemas)} schemas covering {len(mysql_countries)} countries")
else:
    filtered_schemas = []
    mysql_countries = []
    print("‚ö†Ô∏è No MySQL connection available")

‚úÖ Found matching schemas (excluding 'positions_us_v1'): positions_au_v1, positions_ca_v1, positions_fr_v1, positions_ph_v1, positions_pt_v1, positions_uk_v1, positions_co_v1, positions_pa_v1, positions_hrf_v1, positions_hk_v1, positions_tw_v1, positions_gh_v1, positions_pe_v1, positions_cl_v1, positions_ec_v1, positions_ma_v1, positions_ng_v1, positions_ke_v1, positions_gt_v1, positions_sv_v1, positions_uy_v1, positions_do_v1, positions_bd_v1, positions_hn_v1, positions_ve_v1, positions_ni_v1, positions_gr_v1, positions_il_v1, positions_qa_v1, positions_bh_v1, positions_hr_v1, positions_ba_v1, positions_kw_v1, positions_bg_v1, positions_mt_v1, positions_pk_v1, positions_sk_v1, positions_ua_v1, positions_eg_v1, positions_om_v1, positions_rs_v1, positions_id_v1, positions_lt_v1, positions_lv_v1, positions_vn_v1
üåç MySQL will fetch data for countries: AU, BA, BD, BG, BH, CA, CL, CO, DO, EC, EG, FR, GB, GH, GR, GT, HK, HN, HR, HRF, ID, IL, KE, KW, LT, LV, MA, MT, NG, NI, OM, PA, PE, PH

## AWS Athena Connection and Dynamic Country Discovery

In [4]:
# Set up AWS credentials
print("üîê Setting up AWS credentials...")
try:
    aws_access_key_id = input("Enter your aws_access_key_id: ")
    aws_secret_access_key = input("Enter your aws_secret_access_key: ")
    aws_session_token = input("Enter your aws_session_token: ")

    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        region_name='us-east-1'
    )
    
    # Create Athena client
    athena_client = session.client('athena')
    print("‚úÖ AWS session and Athena client created successfully!")
except:
    print("‚ö†Ô∏è Interactive input not available for AWS credentials.")
    print("üí° Please modify this cell to include your AWS credentials directly if needed.")
    athena_client = None

üîê Setting up AWS credentials...


Enter your aws_access_key_id:  ASIA6EAB4CAJRQKRW2M6
Enter your aws_secret_access_key:  iEIreoiYbnnyyBSfRUbPPQsZFbGAMbC4HzfYE7rG
Enter your aws_session_token:  IQoJb3JpZ2luX2VjEPP//////////wEaCXVzLWVhc3QtMSJHMEUCIQD6eO59EM0YJa0g3QTkc/Mvhd99we7kKkicYdSNXbtBSgIgOfMzJzVwmIz+uQ9mfqxjSKQ4tSzKk6kbcIPnAueCB+gqogMI7P//////////ARADGgw5NzA2NjY1NDUxNzEiDJlEDpwea6LI3A800Sr2AlPT0LYtqbAh0cm5dj1TvRFv4/DnQXL/Fv51b5EP8mmHKhbkrPhWvTje3dw1j9rMoHIioIcW2HUZRvU62kgbTFO6MHGbaLNbzxqmOyCF424lI1Qzro59bdKVzsOei28qDlYlNyQY1aCRMrd2CABtzr6FPy7NBwY3CbaMxoIJuGcGG0TJ8zKJE/2Btr1Re4ijHodVhHz8yflkGAJPKM8H9bBHbrFqkcscwXDFD9HPeQW+5wbN9lzXt4uIp3EZS9uM6DCTFfxhw8nk++K6f8KGqGmjwbEy7HkkWz4EK18ZiguyKHSlYQjqOM84ES8wIfs9cOYZSZO8jD5/BFC3BAYghYAlc6lG0RVBIHqYdXjhCNMCfFzJJcnW/BG17US4RULPTgn5gY178ijiozUdp6VFnC4y4yVac2581Rc/F8L8tHEf2/XMvf36BV44u1MIn6nxoALkT5015O6D0G670OCMsDsr8hYtlMsDWLgbBGmVF/qTQyc4zXbvMLyWlMMGOqYBy3Vgrk2wwW6jiugfgmrpsm0pnswqce+LczMZJnm/XpTS0TFBHchsKennUsRCFqSu5cq+oEj4oIi1Lt/U7N5lgEQx9a9mcCCm8ZkE4a0XnMtolcAsHjC9xoIBCnrZZ

‚úÖ AWS session and Athena client created successfully!


In [5]:
def get_lakehouse_countries(athena_client, mysql_countries):
    """
    Queries Athena to get all distinct country codes from meth_2012 schema,
    then filters out countries already covered by MySQL schemas.
    Returns a list of country codes to fetch from lakehouse.
    """
    if not athena_client:
        print("‚ö†Ô∏è No Athena client available for country discovery")
        return []
    
    # Query to get distinct countries from meth_2012 schema
    discovery_query = """
    SELECT DISTINCT country_code
    FROM "demand"."positions"
    WHERE position_schema = 'meth_2012'
      AND country_code IS NOT NULL
      AND country_code != ''
    ORDER BY country_code
    """
    
    try:
        print("üîç Discovering available countries in lakehouse meth_2012 schema...")
        start_time = time.time()
        
        response = athena_client.start_query_execution(
            QueryString=discovery_query,
            QueryExecutionContext={
                'Database': 'demand'
            },
            ResultConfiguration={
                'OutputLocation': 's3://naman.kansal/Local Lakehouse Queries/'
            }
        )
        
        query_execution_id = response['QueryExecutionId']
        print(f"Country discovery query ID: {query_execution_id}")
        
        # Wait for query to complete
        while True:
            status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            print("‚è≥ Country discovery query running...")
            time.sleep(5)
        
        end_time = time.time()
        execution_time = round(end_time - start_time, 2)
        
        if status == 'SUCCEEDED':
            print(f"‚úÖ Country discovery completed in {execution_time} seconds")
            
            # Fetch results
            results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
            
            # Extract country codes
            all_lakehouse_countries = []
            for row in results['ResultSet']['Rows'][1:]:  # Skip header
                country_code = row['Data'][0].get('VarCharValue', '').strip()
                if country_code:
                    all_lakehouse_countries.append(country_code)
            
            print(f"üåç Found {len(all_lakehouse_countries)} countries in lakehouse meth_2012 schema")
            print(f"üìù All lakehouse countries: {', '.join(sorted(all_lakehouse_countries))}")
            
            # Filter out countries already covered by MySQL
            countries_to_fetch = [c for c in all_lakehouse_countries if c not in mysql_countries]
            countries_to_fetch_sorted = sorted(set(countries_to_fetch))
            
            print(f"\nüéØ Lakehouse will fetch data for countries: {', '.join(countries_to_fetch_sorted)}")
            print(f"üö´ Excluded (already in MySQL): {', '.join(sorted(set(mysql_countries)))}")
            
            return countries_to_fetch_sorted
        else:
            print(f"‚ùå Country discovery query failed with status: {status}")
            return []
            
    except Exception as e:
        print(f"‚ùå Error during country discovery: {e}")
        return []

# Discover countries to fetch from lakehouse
if athena_client:
    lakehouse_countries = get_lakehouse_countries(athena_client, mysql_countries)
    print(f"\nüìä Data fetching strategy:")
    print(f"   ‚Ä¢ MySQL: {len(mysql_countries)} countries via individual schemas")
    print(f"   ‚Ä¢ Lakehouse: {len(lakehouse_countries)} countries via meth_2012 + US via positions_us_v1")
    print(f"   ‚Ä¢ Total unique countries: {len(set(mysql_countries + lakehouse_countries + ['US']))}")
else:
    lakehouse_countries = []
    print("‚ö†Ô∏è Skipping lakehouse country discovery - no client available")

NameError: name 'mysql_countries' is not defined

## MySQL Data Fetching

In [7]:
def fetch_mysql_data(db_connection, schema_list, start_date_str, end_date_str):
    """
    For each schema in the list, constructs and executes a SQL query
    with date range filtering to fetch job and position data.
    Returns a pandas DataFrame containing all results and a timing report DataFrame.
    """
    if not db_connection or not schema_list:
        return pd.DataFrame(), pd.DataFrame()

    all_data_frames = []
    timing_records = []
    schema_pattern_for_country_code = re.compile(r"^positions_([a-z]{2,})_v1$")

    for schema_name in schema_list:
        match = schema_pattern_for_country_code.match(schema_name)
        if not match:
            print(
                f"‚ö†Ô∏è Skipping schema '{schema_name}' as it doesn't match expected pattern "
                "for country code extraction."
            )
            continue

        country_code_lower = match.group(1)
        if country_code_lower == "uk":
            country_code_upper = "GB"
        else:
            country_code_upper = country_code_lower.upper()

        # Timing start
        start_time = datetime.now()
        start_time_unix = time.time()

        # Updated SQL query with date range filtering
        sql_query = f"""
        SELECT
            '{country_code_upper}' AS country_code,
            DATE_FORMAT(b.date, '%b-%y') AS position_month,
            a.covering_source_id as source_id,
            jl.`language` as `language`, 
            COUNT(DISTINCT a.job_id) AS total_jobs,
            COUNT(DISTINCT a.position_id) AS total_positions,
            'mysql' as data_source
        FROM
            {schema_name}.long_term_jobs_m2012_new a
            LEFT JOIN norm_prod.dates b ON (a.ad_first_seen_date_id = b.id)
            LEFT JOIN jobs.tbl_JobLanguage jl ON (a.job_id = jl.jobid)
            LEFT JOIN acqnotes.sources c ON (a.covering_source_id = c.id)
        WHERE 1=1
            AND b.year = 2025
            AND b.date >= '{start_date_str}'
            AND b.date <= '{end_date_str}'
        GROUP BY
            1, 2, 3, 4;
        """

        print(f"\nüîÑ Executing query for schema: {schema_name} (Country: {country_code_upper})")
        try:
            df_schema = pd.read_sql_query(sql_query, db_connection)
            end_time = datetime.now()
            end_time_unix = time.time()
            total_processing_time = end_time_unix - start_time_unix

            timing_records.append({
                "schema": schema_name,
                "country_code": country_code_upper,
                "start_time": start_time.strftime('%Y-%m-%d %H:%M:%S'),
                "end_time": end_time.strftime('%Y-%m-%d %H:%M:%S'),
                "total_processing_seconds": round(total_processing_time, 2),
                "data_source": "mysql",
                "date_range": f"{start_date_str} to {end_date_str}"
            })

            if not df_schema.empty:
                print(f"‚úÖ Successfully fetched {len(df_schema)} rows from {schema_name}.")
                all_data_frames.append(df_schema)
            else:
                print(f"‚ö†Ô∏è No data returned from {schema_name} for the given date range.")
        except Exception as e:
            end_time = datetime.now()
            end_time_unix = time.time()
            total_processing_time = end_time_unix - start_time_unix

            timing_records.append({
                "schema": schema_name,
                "country_code": country_code_upper,
                "start_time": start_time.strftime('%Y-%m-%d %H:%M:%S'),
                "end_time": end_time.strftime('%Y-%m-%d %H:%M:%S'),
                "total_processing_seconds": round(total_processing_time, 2),
                "error": str(e),
                "data_source": "mysql",
                "date_range": f"{start_date_str} to {end_date_str}"
            })

            print(f"‚ùå Error executing query for schema {schema_name}: {e}")

    if not all_data_frames:
        print("‚ö†Ô∏è No data collected from any MySQL schemas.")
        return pd.DataFrame(), pd.DataFrame(timing_records)

    final_df = pd.concat(all_data_frames, ignore_index=True)
    timing_df = pd.DataFrame(timing_records)
    return final_df, timing_df

# Fetch MySQL data
if mysql_conn and filtered_schemas and start_date and end_date:
    print("\nüöÄ Starting MySQL data fetching process...")
    mysql_results_df, mysql_timing_df = fetch_mysql_data(mysql_conn, filtered_schemas, start_date, end_date)
    print(f"\nüìä MySQL Results: {len(mysql_results_df)} total rows")
    if not mysql_results_df.empty:
        print("MySQL Data Preview:")
        print(mysql_results_df.head())
else:
    mysql_results_df = pd.DataFrame()
    mysql_timing_df = pd.DataFrame()
    print("‚ö†Ô∏è Skipping MySQL data fetch - no connection, schemas, or date range available")


üöÄ Starting MySQL data fetching process...

üîÑ Executing query for schema: positions_au_v1 (Country: AU)
‚úÖ Successfully fetched 1429 rows from positions_au_v1.

üîÑ Executing query for schema: positions_ca_v1 (Country: CA)
‚úÖ Successfully fetched 4070 rows from positions_ca_v1.

üîÑ Executing query for schema: positions_fr_v1 (Country: FR)
‚úÖ Successfully fetched 2231 rows from positions_fr_v1.

üîÑ Executing query for schema: positions_ph_v1 (Country: PH)
‚úÖ Successfully fetched 826 rows from positions_ph_v1.

üîÑ Executing query for schema: positions_pt_v1 (Country: PT)
‚úÖ Successfully fetched 586 rows from positions_pt_v1.

üîÑ Executing query for schema: positions_uk_v1 (Country: GB)
‚úÖ Successfully fetched 3161 rows from positions_uk_v1.

üîÑ Executing query for schema: positions_co_v1 (Country: CO)
‚úÖ Successfully fetched 555 rows from positions_co_v1.

üîÑ Executing query for schema: positions_pa_v1 (Country: PA)
‚úÖ Successfully fetched 277 rows from positio

## Lakehouse Data Fetching with Dynamic Country List

In [8]:
def build_dynamic_athena_query(start_date_str, end_date_str, lakehouse_countries):
    """
    Builds the Athena query with dynamically determined country list.
    """
    # Convert lakehouse_countries list to SQL format
    if lakehouse_countries:
        country_list_sql = "', '".join(lakehouse_countries)
        country_filter = f"'{ country_list_sql }'"
    else:
        # If no countries to fetch from meth_2012, use empty condition that will be false
        country_filter = "'__NO_COUNTRIES__'"
    
    query = f"""
    WITH filtered_positions AS (
      SELECT
        p.position_id,
        p.preferred_job_id,
        p.position_schema,
        p.country_code,
        p.job_id_list
      FROM "demand"."positions" p
      WHERE p.year = 2025
        AND p.position_schema IN ('meth_2012', 'positions_us_v1')
    ),
    exploded_jobs AS (
      SELECT
        fp.position_id,
        fp.position_schema,
        fp.country_code,
        fp.preferred_job_id,
        job_id
      FROM filtered_positions AS fp
      CROSS JOIN UNNEST(fp.job_id_list) AS t(job_id)
    )
    SELECT
      e.country_code,
      date_format(
        date_trunc('month', j.first_seen_date),
        '%b-%y'
      ) AS position_month,
      j.source_id,
      j.source_name,
      j.source_country,
      jl.language,
      COUNT(DISTINCT e.job_id) AS total_jobs,
      COUNT(DISTINCT e.position_id) AS total_positions,
      'athena' as data_source
    FROM exploded_jobs AS e
    JOIN "demand"."jobs" j
      ON e.job_id = j.job_id
    JOIN "demand"."job_languages" jl ON (jl.job_id = j.job_id)
    WHERE
      j.first_seen_date >= date '{start_date_str}'
      AND j.first_seen_date <= date '{end_date_str}'
      AND (
        (
          e.position_schema = 'meth_2012'
            AND e.country_code IN ({country_filter})
        )
        OR
        e.position_schema = 'positions_us_v1'
      )
    GROUP BY
      e.country_code,
      date_trunc('month', j.first_seen_date),
      j.source_id,
      j.source_name,
      j.source_country,
      jl.language
    ORDER BY
      e.country_code,
      date_trunc('month', j.first_seen_date);
    """
    
    return query

def fetch_athena_data(athena_client, start_date_str, end_date_str, lakehouse_countries):
    """
    Executes Athena query with dynamically determined country list and returns results as DataFrame
    """
    if not athena_client:
        return pd.DataFrame()
    
    # Build dynamic query
    query = build_dynamic_athena_query(start_date_str, end_date_str, lakehouse_countries)
    
    # Print query info
    print(f"\nüéØ Athena query will fetch:")
    print(f"   ‚Ä¢ meth_2012 countries: {', '.join(lakehouse_countries) if lakehouse_countries else 'None'}")
    print(f"   ‚Ä¢ positions_us_v1: US data")
    print(f"   ‚Ä¢ Date range: {start_date_str} to {end_date_str}")
    
    try:
        print("\nüöÄ Starting Athena query execution...")
        start_time = time.time()
        
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': 'demand'
            },
            ResultConfiguration={
                'OutputLocation': 's3://naman.kansal/Local Lakehouse Queries/'
            }
        )
        
        # Get query execution ID
        query_execution_id = response['QueryExecutionId']
        print(f"Query execution ID: {query_execution_id}")
        
        # Wait for query to complete
        while True:
            status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            print("‚è≥ Query is still running...")
            time.sleep(15)
        
        end_time = time.time()
        execution_time = round(end_time - start_time, 2)
        
        # Check if the query succeeded
        if status == 'SUCCEEDED':
            print(f"‚úÖ Query completed successfully in {execution_time} seconds")
            
            # Fetch results
            results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
            
            # Extract data from results
            columns = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
            rows = [row['Data'] for row in results['ResultSet']['Rows'][1:]]  # Skip header row
            data = [[col['VarCharValue'] if 'VarCharValue' in col else None for col in row] for row in rows]
            
            # Convert to DataFrame
            df = pd.DataFrame(data, columns=columns)
            print(f"üìä Athena Results: {len(df)} total rows")
            
            if not df.empty:
                countries_fetched = sorted(df['country_code'].unique())
                print(f"üåç Countries successfully fetched: {', '.join(countries_fetched)}")
            
            return df
        else:
            print(f"‚ùå Query failed with status: {status}")
            return pd.DataFrame()
            
    except Exception as e:
        print(f"‚ùå Error executing Athena query: {e}")
        return pd.DataFrame()

# Fetch Athena data with dynamic country list
if athena_client and start_date and end_date:
    athena_results_df = fetch_athena_data(athena_client, start_date, end_date, lakehouse_countries)
    if not athena_results_df.empty:
        print("\nAthena Data Preview:")
        print(athena_results_df.head())
else:
    athena_results_df = pd.DataFrame()
    print("‚ö†Ô∏è Skipping Athena data fetch - no client or date range available")


üéØ Athena query will fetch:
   ‚Ä¢ meth_2012 countries: AD, AE, AF, AG, AI, AL, AM, AO, AQ, AR, AS, AT, AW, AX, AZ, BB, BE, BF, BI, BJ, BL, BM, BN, BO, BQ, BR, BS, BT, BV, BW, BY, BZ, CC, CD, CF, CG, CH, CI, CK, CM, CN, CP, CR, CU, CV, CW, CX, CY, CZ, DE, DJ, DK, DM, DZ, EE, EH, ER, ES, ET, FI, FJ, FK, FM, FO, GA, GD, GE, GF, GG, GI, GL, GM, GN, GP, GQ, GS, GU, GW, GY, HM, HT, HU, IE, IM, IN, IO, IQ, IR, IS, IT, JE, JM, JO, JP, KG, KH, KI, KM, KN, KP, KR, KY, KZ, LA, LB, LC, LI, LK, LR, LS, LU, LY, MC, MD, ME, MF, MG, MH, MK, ML, MM, MN, MP, MQ, MR, MS, MU, MV, MW, MX, MY, MZ, NA, NC, NE, NF, NL, NO, NP, NR, NU, NZ, PF, PG, PL, PM, PN, PS, PW, PY, RE, RO, RU, RW, SA, SB, SC, SD, SE, SG, SH, SI, SJ, SL, SM, SN, SO, SR, SS, ST, SX, SY, SZ, TC, TD, TF, TG, TH, TJ, TK, TL, TM, TN, TO, TR, TT, TV, TZ, UG, UM, US, UZ, VA, VC, VG, VI, VU, WF, WS, YE, YT, ZA, ZM, ZW
   ‚Ä¢ positions_us_v1: US data
   ‚Ä¢ Date range: 2025-06-01 to 2025-06-30

üöÄ Starting Athena query execution...
Query exe

## Data Harmonization and Merging

In [9]:
def harmonize_and_merge_data(mysql_df, athena_df):
    """
    Harmonizes column names and data types, then merges the two DataFrames
    """
    combined_dfs = []
    
    # Process MySQL data
    if not mysql_df.empty:
        mysql_harmonized = mysql_df.copy()
        # Add missing columns with None values
        mysql_harmonized['source_name'] = None
        mysql_harmonized['source_country'] = None
        print(f"‚úÖ MySQL data harmonized: {len(mysql_harmonized)} rows")
        combined_dfs.append(mysql_harmonized)
    
    # Process Athena data  
    if not athena_df.empty:
        athena_harmonized = athena_df.copy()
        print(f"‚úÖ Athena data harmonized: {len(athena_harmonized)} rows")
        combined_dfs.append(athena_harmonized)
    
    # Combine all DataFrames
    if combined_dfs:
        final_combined_df = pd.concat(combined_dfs, ignore_index=True)
        
        # Ensure consistent column order
        column_order = [
            'country_code', 'position_month', 'source_id', 'source_name', 
            'source_country', 'language', 'total_jobs', 'total_positions', 'data_source'
        ]
        
        final_combined_df = final_combined_df.reindex(columns=column_order)
        
        # Convert numeric columns
        numeric_columns = ['source_id', 'total_jobs', 'total_positions']
        for col in numeric_columns:
            if col in final_combined_df.columns:
                final_combined_df[col] = pd.to_numeric(final_combined_df[col], errors='coerce')
        
        print(f"\nüéâ Successfully combined datasets!")
        print(f"üìä Total combined rows: {len(final_combined_df)}")
        print(f"üìä MySQL rows: {len(mysql_df) if not mysql_df.empty else 0}")
        print(f"üìä Athena rows: {len(athena_df) if not athena_df.empty else 0}")
        print(f"üìÖ Date range analyzed: {start_date} to {end_date}")
        
        # Check for data completeness
        all_countries_in_data = sorted(final_combined_df['country_code'].unique())
        expected_countries = sorted(set(mysql_countries + lakehouse_countries + ['US']))
        
        print(f"\nüîç Data completeness check:")
        print(f"   Expected countries: {', '.join(expected_countries)}")
        print(f"   Actual countries in data: {', '.join(all_countries_in_data)}")
        
        missing_countries = set(expected_countries) - set(all_countries_in_data)
        if missing_countries:
            print(f"   ‚ö†Ô∏è Missing countries: {', '.join(sorted(missing_countries))}")
        else:
            print(f"   ‚úÖ All expected countries present in data")
        
        return final_combined_df
    else:
        print("‚ö†Ô∏è No data available to combine")
        return pd.DataFrame()

# Harmonize and merge data
print("\nüîÑ Harmonizing and merging datasets...")
combined_final_df = harmonize_and_merge_data(mysql_results_df, athena_results_df)

if not combined_final_df.empty:
    print("\nüìã Combined Dataset Info:")
    print(combined_final_df.info())
    print("\nüìä Combined Data Preview:")
    print(combined_final_df.head(10))
    print("\nüìà Data Source Distribution:")
    print(combined_final_df['data_source'].value_counts())
    print("\nüìà Date Range Coverage:")
    print(combined_final_df['position_month'].value_counts().sort_index())


üîÑ Harmonizing and merging datasets...
‚úÖ MySQL data harmonized: 21393 rows
‚úÖ Athena data harmonized: 999 rows

üéâ Successfully combined datasets!
üìä Total combined rows: 22392
üìä MySQL rows: 21393
üìä Athena rows: 999
üìÖ Date range analyzed: 2025-06-01 to 2025-06-30

üîç Data completeness check:
   Expected countries: AD, AE, AF, AG, AI, AL, AM, AO, AQ, AR, AS, AT, AU, AW, AX, AZ, BA, BB, BD, BE, BF, BG, BH, BI, BJ, BL, BM, BN, BO, BQ, BR, BS, BT, BV, BW, BY, BZ, CA, CC, CD, CF, CG, CH, CI, CK, CL, CM, CN, CO, CP, CR, CU, CV, CW, CX, CY, CZ, DE, DJ, DK, DM, DO, DZ, EC, EE, EG, EH, ER, ES, ET, FI, FJ, FK, FM, FO, FR, GA, GB, GD, GE, GF, GG, GH, GI, GL, GM, GN, GP, GQ, GR, GS, GT, GU, GW, GY, HK, HM, HN, HR, HRF, HT, HU, ID, IE, IL, IM, IN, IO, IQ, IR, IS, IT, JE, JM, JO, JP, KE, KG, KH, KI, KM, KN, KP, KR, KW, KY, KZ, LA, LB, LC, LI, LK, LR, LS, LT, LU, LV, LY, MA, MC, MD, ME, MF, MG, MH, MK, ML, MM, MN, MP, MQ, MR, MS, MT, MU, MV, MW, MX, MY, MZ, NA, NC, NE, NF, NG, NI

## Export Results

In [10]:
# Generate date range suffix for filenames
date_range_suffix = f"{start_date.replace('-', '')}_{end_date.replace('-', '')}" if start_date and end_date else execution_date

# Save combined results
if not combined_final_df.empty:
    # Main combined dataset
    combined_csv_filename = f"Combined_MoM_Country_Source_Volume_Dynamic_{date_range_suffix}.csv"
    try:
        combined_final_df.to_csv(combined_csv_filename, index=False)
        print(f"‚úÖ Combined dataset saved to: {combined_csv_filename}")
    except Exception as e:
        print(f"‚ùå Error saving combined dataset: {e}")
    
    # Country mapping report
    country_mapping = pd.DataFrame({
        'country_code': sorted(set(mysql_countries + lakehouse_countries + ['US'])),
    })
    country_mapping['data_source'] = country_mapping['country_code'].apply(
        lambda x: 'mysql' if x in mysql_countries else 'athena'
    )
    country_mapping['schema_type'] = country_mapping.apply(
        lambda row: f"positions_{row['country_code'].lower()}_v1" if row['data_source'] == 'mysql' 
                    else 'positions_us_v1' if row['country_code'] == 'US'
                    else 'meth_2012', axis=1
    )
    
    mapping_filename = f"Country_Source_Mapping_{date_range_suffix}.csv"
    try:
        country_mapping.to_csv(mapping_filename, index=False)
        print(f"‚úÖ Country mapping saved to: {mapping_filename}")
    except Exception as e:
        print(f"‚ùå Error saving country mapping: {e}")
    
    # Summary statistics by date range
    summary_stats = combined_final_df.groupby(['data_source', 'country_code', 'position_month']).agg({
        'total_jobs': 'sum',
        'total_positions': 'sum',
        'source_id': 'nunique'
    }).reset_index()
    summary_stats.columns = ['data_source', 'country_code', 'position_month', 'total_jobs_sum', 'total_positions_sum', 'unique_sources']
    
    summary_filename = f"Summary_Stats_Dynamic_{date_range_suffix}.csv"
    try:
        summary_stats.to_csv(summary_filename, index=False)
        print(f"‚úÖ Summary statistics saved to: {summary_filename}")
    except Exception as e:
        print(f"‚ùå Error saving summary statistics: {e}")
    
    # Monthly aggregation for trend analysis
    monthly_trends = combined_final_df.groupby(['position_month', 'data_source']).agg({
        'total_jobs': 'sum',
        'total_positions': 'sum',
        'country_code': 'nunique'
    }).reset_index()
    monthly_trends.columns = ['position_month', 'data_source', 'total_jobs', 'total_positions', 'unique_countries']
    
    trends_filename = f"Monthly_Trends_Dynamic_{date_range_suffix}.csv"
    try:
        monthly_trends.to_csv(trends_filename, index=False)
        print(f"‚úÖ Monthly trends saved to: {trends_filename}")
    except Exception as e:
        print(f"‚ùå Error saving monthly trends: {e}")
else:
    print("‚ö†Ô∏è No combined data to save")

# Save timing report if available
if not mysql_timing_df.empty:
    timing_filename = f"MySQL_Timing_Report_Dynamic_{date_range_suffix}.csv"
    try:
        mysql_timing_df.to_csv(timing_filename, index=False)
        print(f"‚úÖ MySQL timing report saved to: {timing_filename}")
    except Exception as e:
        print(f"‚ùå Error saving timing report: {e}")

‚úÖ Combined dataset saved to: Combined_MoM_Country_Source_Volume_Dynamic_20250601_20250630.csv
‚úÖ Country mapping saved to: Country_Source_Mapping_20250601_20250630.csv
‚úÖ Summary statistics saved to: Summary_Stats_Dynamic_20250601_20250630.csv
‚úÖ Monthly trends saved to: Monthly_Trends_Dynamic_20250601_20250630.csv
‚úÖ MySQL timing report saved to: MySQL_Timing_Report_Dynamic_20250601_20250630.csv


## Final Analysis Summary

In [11]:
# Generate comprehensive analysis summary
if not combined_final_df.empty and start_date and end_date:
    print("\n" + "="*80)
    print("üìä DYNAMIC DATA ANALYSIS SUMMARY")
    print("="*80)
    print(f"üìÖ Analysis Period: {start_date} to {end_date}")
    print(f"üìä Total Records: {len(combined_final_df):,}")
    print(f"üåç Countries Covered: {combined_final_df['country_code'].nunique()}")
    print(f"üìö Languages: {combined_final_df['language'].nunique()}")
    print(f"üè¢ Unique Sources: {combined_final_df['source_id'].nunique()}")
    
    total_jobs = combined_final_df['total_jobs'].sum()
    total_positions = combined_final_df['total_positions'].sum()
    print(f"üíº Total Jobs: {total_jobs:,}")
    print(f"üìç Total Positions: {total_positions:,}")
    
    print("\nüéØ DATA SOURCE STRATEGY EXECUTED:")
    print(f"   MySQL Individual Schemas: {len(mysql_countries)} countries")
    print(f"   Lakehouse meth_2012: {len(lakehouse_countries)} countries")
    print(f"   Lakehouse positions_us_v1: US")
    
    print("\nüìä MYSQL COUNTRIES:")
    mysql_data = combined_final_df[combined_final_df['data_source'] == 'mysql']
    if not mysql_data.empty:
        mysql_country_stats = mysql_data.groupby('country_code')['total_jobs'].sum().sort_values(ascending=False)
        for country, jobs in mysql_country_stats.items():
            print(f"   {country}: {jobs:,} jobs (via positions_{country.lower()}_v1)")
    else:
        print("   No MySQL data available")
    
    print("\nüè¢ LAKEHOUSE COUNTRIES:")
    athena_data = combined_final_df[combined_final_df['data_source'] == 'athena']
    if not athena_data.empty:
        athena_country_stats = athena_data.groupby('country_code')['total_jobs'].sum().sort_values(ascending=False)
        for country, jobs in athena_country_stats.items():
            schema_type = 'positions_us_v1' if country == 'US' else 'meth_2012'
            print(f"   {country}: {jobs:,} jobs (via {schema_type})")
    else:
        print("   No lakehouse data available")
    
    print("\nüîù TOP 10 COUNTRIES BY TOTAL JOBS:")
    top_countries = combined_final_df.groupby('country_code')['total_jobs'].sum().sort_values(ascending=False).head(10)
    for country, jobs in top_countries.items():
        source = 'MySQL' if country in mysql_countries else 'Lakehouse'
        print(f"   {country}: {jobs:,} jobs ({source})")
    
    print("\nüìà DATA SOURCE BREAKDOWN:")
    source_breakdown = combined_final_df.groupby('data_source').agg({
        'total_jobs': 'sum',
        'total_positions': 'sum',
        'country_code': 'nunique'
    })
    for source in source_breakdown.index:
        jobs = source_breakdown.loc[source, 'total_jobs']
        positions = source_breakdown.loc[source, 'total_positions']
        countries = source_breakdown.loc[source, 'country_code']
        print(f"   {source.upper()}: {jobs:,} jobs, {positions:,} positions, {countries} countries")
    
    print("\nüìÖ MONTHLY DISTRIBUTION:")
    monthly_dist = combined_final_df.groupby('position_month')['total_jobs'].sum().sort_index()
    for month, jobs in monthly_dist.items():
        print(f"   {month}: {jobs:,} jobs")
    
    print("\n" + "="*80)
    print(f"üìÅ Files Generated:")
    print(f"   ‚Ä¢ Combined_MoM_Country_Source_Volume_Dynamic_{date_range_suffix}.csv")
    print(f"   ‚Ä¢ Country_Source_Mapping_{date_range_suffix}.csv")
    print(f"   ‚Ä¢ Summary_Stats_Dynamic_{date_range_suffix}.csv")
    print(f"   ‚Ä¢ Monthly_Trends_Dynamic_{date_range_suffix}.csv")
    if not mysql_timing_df.empty:
        print(f"   ‚Ä¢ MySQL_Timing_Report_Dynamic_{date_range_suffix}.csv")
    print("\nüéâ DYNAMIC COUNTRY DETECTION SUCCESSFUL!")
    print("   No manual country lists required - all countries automatically detected and optimally sourced.")
    print("="*80)
else:
    print("‚ö†Ô∏è No data available for analysis summary")


üìä DYNAMIC DATA ANALYSIS SUMMARY
üìÖ Analysis Period: 2025-06-01 to 2025-06-30
üìä Total Records: 22,392
üåç Countries Covered: 47
üìö Languages: 43
üè¢ Unique Sources: 8151
üíº Total Jobs: 6,502,419
üìç Total Positions: 4,738,223

üéØ DATA SOURCE STRATEGY EXECUTED:
   MySQL Individual Schemas: 45 countries
   Lakehouse meth_2012: 204 countries
   Lakehouse positions_us_v1: US

üìä MYSQL COUNTRIES:
   FR: 1,675,495 jobs (via positions_fr_v1)
   GB: 1,219,272 jobs (via positions_gb_v1)
   CO: 871,312 jobs (via positions_co_v1)
   AU: 544,950 jobs (via positions_au_v1)
   CA: 499,760 jobs (via positions_ca_v1)
   PE: 371,406 jobs (via positions_pe_v1)
   CL: 138,527 jobs (via positions_cl_v1)
   ID: 120,941 jobs (via positions_id_v1)
   PT: 92,852 jobs (via positions_pt_v1)
   UA: 89,651 jobs (via positions_ua_v1)
   TW: 72,929 jobs (via positions_tw_v1)
   PH: 66,970 jobs (via positions_ph_v1)
   HK: 59,588 jobs (via positions_hk_v1)
   VN: 54,373 jobs (via positions_vn_v1)


## Cleanup

In [12]:
# Close connections
if mysql_conn and mysql_conn.is_connected():
    mysql_conn.close()
    print("‚úÖ MySQL connection closed.")

print("\nüéâ Script execution completed!")
if not combined_final_df.empty and start_date and end_date:
    print(f"üìä Final dataset contains {len(combined_final_df):,} rows across {combined_final_df['country_code'].nunique()} countries")
    print(f"üìÖ Date range analyzed: {start_date} to {end_date}")
    print(f"üóÇÔ∏è Files saved with date range suffix: {date_range_suffix}")
    
    # Calculate date range span
    from datetime import datetime
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
    days_span = (end_dt - start_dt).days + 1
    print(f"‚è±Ô∏è Analysis covers {days_span} days")
    print(f"ü§ñ Dynamic country detection: {len(mysql_countries)} from MySQL + {len(lakehouse_countries)} from Lakehouse + US")

‚úÖ MySQL connection closed.

üéâ Script execution completed!
üìä Final dataset contains 22,392 rows across 47 countries
üìÖ Date range analyzed: 2025-06-01 to 2025-06-30
üóÇÔ∏è Files saved with date range suffix: 20250601_20250630
‚è±Ô∏è Analysis covers 30 days
ü§ñ Dynamic country detection: 45 from MySQL + 204 from Lakehouse + US
