In [1]:
import psycopg2  # Importing PostgreSQL database adapter for Python
from psycopg2 import sql  # Importing SQL module for safely constructing SQL queries
import logging  # Importing logging module for event tracking and debugging
import os  # Importing OS module for interacting with the operating system
import warnings  # Importing warnings module to control warning messages
import pandas as pd  # Importing Pandas for data manipulation and analysis

from datetime import datetime, timedelta  # Importing datetime and timedelta for handling dates and time deltas

# Suppress all warnings
warnings.filterwarnings("ignore")


In [2]:

notebook_name = 'Incremental Load Testing'

# Path of the INFO log file written by this notebook
info_log_path = f'../Logs/info/{notebook_name}_info.log'

# Creating the log directory if it doesn't exist
os.makedirs(os.path.dirname(info_log_path), exist_ok=True)

# Clearing any previous handlers on the root logger if re-running this setup
logger = logging.getLogger()
while logger.handlers:
    logger.handlers.pop()

# Configuring the dedicated logger used for INFO-level progress messages
info_logger = logging.getLogger('info_logger')

# The file handler below is attached to 'info_logger', not to the root logger,
# so its old handlers must be removed here as well. Otherwise every re-run of
# this cell stacks one more FileHandler and each message is written to the log
# file several times. Closing the handler also releases its file descriptor.
for stale_handler in list(info_logger.handlers):
    info_logger.removeHandler(stale_handler)
    stale_handler.close()

info_handler = logging.FileHandler(info_log_path, mode='a')  # Append mode
info_handler.setLevel(logging.INFO)

# Timestamp - level - message layout for every record in the log file
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
info_handler.setFormatter(formatter)

# Attaching the single file handler to the logger
info_logger.addHandler(info_handler)
info_logger.setLevel(logging.INFO)


### Metadata check

In [3]:

# Loading the combined extraction output (Excel workbook) into a DataFrame
source_workbook = '../Data/Output/Data_Extraction_combined_df.xlsx'
Data_Extraction_test_df = pd.read_excel(source_workbook)

# Stamping every row with "now minus 10 days" so that the incremental filter
# in the metadata check below can be verified against deliberately old data
aged_timestamp = datetime.now() - timedelta(days=10)
Data_Extraction_test_df = Data_Extraction_test_df.assign(
    extraction_timestamp=aged_timestamp
)

info_logger.info("Old Sample data generation completed")

In [4]:

# Listing of target tables in the database to check for the last extracted date
target_tables = ['region_dimension', 'date_dimension', 'sales_transactions_fact',
                 'rental_dimension', 'vehicle_dimension', 'property_type_dimension',
                 'demographics_dimension', 'education_employment_dimension']

# Establishing a connection to the PostgreSQL database.
# Each setting can be overridden through the standard libpq environment
# variables; the literals are only local-development fallbacks.
# NOTE(review): the password fallback should be removed once PGPASSWORD is
# provisioned everywhere this notebook runs - credentials do not belong in a notebook.
connection = psycopg2.connect(
    dbname=os.environ.get('PGDATABASE', 'UK Real Estate DB'),
    user=os.environ.get('PGUSER', 'postgres'),
    password=os.environ.get('PGPASSWORD', '123!@*qweQWE'),
    host=os.environ.get('PGHOST', 'localhost'),
    port=os.environ.get('PGPORT', '5432')
)

# Initializing a variable to store the minimum last extracted date across all target tables
min_last_extracted_date = None

# Retrieving the minimum last extracted date across all specified target tables.
# The table names are passed as a bound parameter (psycopg2 adapts a Python list
# to a PostgreSQL ARRAY) instead of being spliced into the SQL text.
query_metadata = """
    SELECT MIN(last_extracted_date) AS min_last_extracted_date
    FROM metadata
    WHERE table_name = ANY(%s);
    """
try:
    with connection.cursor() as cursor:
        cursor.execute(query_metadata, (target_tables,))
        result = cursor.fetchone()
        min_last_extracted_date = result[0] if result else None
finally:
    # Closing the database connection even when the query fails
    connection.close()

# Filtering the incremental data based on the retrieved minimum last extraction date
if min_last_extracted_date is not None:
    # Keeping only data newer than the minimum last extraction date across all target tables
    filtered_data_for_processing = Data_Extraction_test_df[
        Data_Extraction_test_df['extraction_timestamp'] > min_last_extracted_date
    ]
    record_count = filtered_data_for_processing.shape[0]

    if record_count > 0:
        print(f"Processing {record_count} new records after filtering by the minimum last extraction date: {min_last_extracted_date}.")
    else:
        print("No new data to process after filtering.")
else:
    print("No previous extraction date found for any target table. Processing all available data.")
    # Nothing to filter against: the whole frame is the data to process.
    # The same variable name as in the branch above must be used here, since
    # record_count is computed from it on the next line.
    filtered_data_for_processing = Data_Extraction_test_df
    record_count = filtered_data_for_processing.shape[0]

    if record_count > 0:
        print(f"Processing {record_count} records.")
    else:
        print("No data available for processing.")

info_logger.info("Metadata test completed")

No new data to process after filtering.


### Incremental logic check

In [5]:

# Database connection settings, passed as keyword arguments to psycopg2.connect().
# Each value can be overridden through the standard libpq environment variables
# (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT); the literals are only
# local-development fallbacks so the notebook keeps running unchanged.
# NOTE(review): the password fallback should be removed once PGPASSWORD is
# provisioned everywhere this notebook runs - credentials do not belong in a notebook.
DB_CONFIG = {
    'dbname': os.environ.get('PGDATABASE', 'UK Real Estate DB'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', '123!@*qweQWE'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432')
}

# Validation queries to execute (read-only SELECT statements, not DDL),
# keyed by a short description that is echoed in the results file.
#
# "record_accuracy" reconciles the flat incremental-load source table against
# the current rows of the star schema (fact table joined to its dimensions).
# Both column lists are defined once, as CTEs, and compared in both directions
# with EXCEPT. Each result label names the side the EXCEPT actually starts from:
#   source EXCEPT target -> rows present in the source but absent from the target
#   target EXCEPT source -> rows present in the target but absent from the source
# The column order of the two CTEs must stay identical, because EXCEPT matches
# columns by position.
queries = {
    "record_accuracy": """
-- Reconciliation of incremental_load_etl_source_data against the current
-- fact and dimension rows; a count of 0 on both lines means the sets match.
WITH target_rows AS (
    SELECT
        dis.district,
        r.region_code,
        r.region_name,
        st.local_authority_code,
        r.local_authority_name,
        d.transfer_month_year,
        d.date,
        d.month,
        d.quarter,
        d.year,
        dis.town_city,
        dis.county,
        demo.area_sq_km,
        demo.all_ages,
        demo.male_population,
        demo.female_population,
        demo.age_0_20,
        demo.age_20_40,
        demo.age_40_60,
        demo.age_60_plus,
        demo.age_dependency_ratio,
        p.property_type,
        p.duration,
        st.price,
        st.predicted_price_unscaled,
        st.average_price,
        st.average_price_log,
        st.average_price_pct_change,
        st.annual_change_percent,
        p.detached_price,
        p.semi_detached_price,
        p.terraced_price,
        p.flat_price,
        st.detached_price_log,
        st.semi_detached_price_log,
        st.terraced_price_log,
        st.flat_price_log,
        st.detached_price_pct_change,
        st.semi_detached_price_pct_change,
        st.terraced_price_pct_change,
        st.flat_price_pct_change,
        st.detached_semi_detached_ratio,
        st.detached_terraced_ratio,
        st.detached_flat_ratio,
        st.sales_volume,
        st.sales_volume_log,
        st.old_sales_volume,
        st.old_price,
        st.new_price,
        st.ftb_price,
        st.foo_price,
        st.cash_price,
        st.mortgage_price,
        st.index,
        rnt.rental_price,
        rnt.one_bedroom_rent,
        rnt.two_bedrooms_rent,
        rnt.three_bedrooms_rent,
        rnt.four_or_more_bedrooms_rent,
        rnt.all_categories_rent,
        ee.qualification_index_score,
        ee.qualification_index_rank,
        ee.no_qualifications,
        ee.level_1_and_entry_level_qualifications,
        ee.level_2_qualifications,
        ee.apprenticeship,
        ee.level_3_qualifications,
        ee.level_4_qualifications_and_above,
        ee.other_qualifications,
        ee.num_aged_16_plus_unemployed,
        ee.num_aged_16_plus_employed,
        ee.num_aged_16_plus_self_employed,
        demo.est_num_households_with_child,
        st.gdhi,
        st.gdhi_per_capita,
        ee.deprivation_average_score,
        st.deprivation_adjusted_gdhi,
        ee.deprivation_employment_ratio,
        ee.qualification_adjusted_employment_rate,
        st.deprivation_reduction_potential,
        st.housing_demand_indicator,
        v.buses_total,
        v.petrol_cars_total,
        v.petrol_lgv_total,
        v.hgv_total,
        v.lpg_lgv_total,
        v.hgv_motorways,
        v.personal_transport
    FROM sales_transactions_fact st
    JOIN region_dimension r ON st.local_authority_code = r.local_authority_code
    JOIN date_dimension d ON st.date = d.date
    JOIN district_dimension dis ON st.local_authority_code = dis.local_authority_code AND st.date = dis.date AND dis.is_current = TRUE
    JOIN demographics_dimension demo ON st.local_authority_code = demo.local_authority_code AND st.date = demo.date AND demo.is_current = TRUE
    JOIN property_type_dimension p ON st.local_authority_code = p.local_authority_code AND st.date = p.date AND p.is_current = TRUE
    JOIN education_employment_dimension ee ON st.local_authority_code = ee.local_authority_code AND st.date = ee.date AND ee.is_current = TRUE
    JOIN rental_dimension rnt ON st.local_authority_code = rnt.local_authority_code AND st.date = rnt.date AND rnt.is_current = TRUE
    JOIN vehicle_dimension v ON st.local_authority_code = v.local_authority_code AND v.region_id = r.region_id
    WHERE st.is_current = TRUE
),
source_rows AS (
    SELECT
        district,
        region_code,
        region_name,
        local_authority_code,
        local_authority_name,
        transfer_month_year,
        date,
        month,
        quarter,
        year,
        town_city,
        county,
        area_sq_km,
        all_ages,
        male_population,
        female_population,
        age_0_20,
        age_20_40,
        age_40_60,
        age_60_plus,
        age_dependency_ratio,
        property_type,
        duration,
        price,
        predicted_price_unscaled,
        average_price,
        average_price_log,
        average_price_pct_change,
        annual_change_percent,
        detached_price,
        semi_detached_price,
        terraced_price,
        flat_price,
        detached_price_log,
        semi_detached_price_log,
        terraced_price_log,
        flat_price_log,
        detached_price_pct_change,
        semi_detached_price_pct_change,
        terraced_price_pct_change,
        flat_price_pct_change,
        detached_semi_detached_ratio,
        detached_terraced_ratio,
        detached_flat_ratio,
        sales_volume,
        sales_volume_log,
        old_sales_volume,
        old_price,
        new_price,
        ftb_price,
        foo_price,
        cash_price,
        mortgage_price,
        index,
        rental_price,
        one_bedroom_rent,
        two_bedrooms_rent,
        three_bedrooms_rent,
        four_or_more_bedrooms_rent,
        all_categories_rent,
        qualification_index_score,
        qualification_index_rank,
        no_qualifications,
        level_1_and_entry_level_qualifications,
        level_2_qualifications,
        apprenticeship,
        level_3_qualifications,
        level_4_qualifications_and_above,
        other_qualifications,
        num_aged_16_plus_unemployed,
        num_aged_16_plus_employed,
        num_aged_16_plus_self_employed,
        est_num_households_with_child,
        gdhi,
        gdhi_per_capita,
        deprivation_average_score,
        deprivation_adjusted_gdhi,
        deprivation_employment_ratio,
        qualification_adjusted_employment_rate,
        deprivation_reduction_potential,
        housing_demand_indicator,
        buses_total,
        petrol_cars_total,
        petrol_lgv_total,
        hgv_total,
        lpg_lgv_total,
        hgv_motorways,
        personal_transport
    FROM incremental_load_etl_source_data
)
SELECT 'Records in source not in target' AS description, COUNT(*) AS count
FROM (
    SELECT * FROM source_rows
    EXCEPT
    SELECT * FROM target_rows
) AS missing_records

UNION ALL

SELECT 'Records in target not in source' AS description, COUNT(*) AS count
FROM (
    SELECT * FROM target_rows
    EXCEPT
    SELECT * FROM source_rows
) AS extra_records;
"""
}


def execute_query(query, output_file):
    """Run one SQL query and echo every result row to a file and to stdout.

    Args:
        query: SQL text to execute; it is sent as-is, without bound parameters.
        output_file: Writable text file object; each row is written as
            ``str(row)`` followed by a newline.

    Raises:
        psycopg2.Error: If connecting or executing the query fails. The
            connection is closed before the exception propagates.
    """
    # Establishing a connection to the database using the provided configuration
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # `with conn` only commits (or rolls back) the transaction; psycopg2
        # does NOT close the connection on exit, hence the explicit finally.
        with conn:
            with conn.cursor() as cur:
                # Executing the provided SQL query and fetching all result rows
                cur.execute(query)
                results = cur.fetchall()
    finally:
        conn.close()

    # Writing each row to the output file and printing it
    for row in results:
        line = str(row) + '\n'
        output_file.write(line)
        print(line.strip())  # Print each row

def main(results_path='../Results/Incremental_load_query_results.txt'):
    """Run every query in ``queries`` and save the output to a results file.

    For each entry a "Running ..." line, the result rows and a
    "Completed ..." line are written to the file and printed.

    Args:
        results_path: Destination text file; it is overwritten on every run.
            Defaults to the notebook's standard results location.
    """
    # Creating the results directory if it doesn't exist, so that a fresh
    # checkout does not fail on open(); a bare filename has no directory part.
    results_dir = os.path.dirname(results_path)
    if results_dir:
        os.makedirs(results_dir, exist_ok=True)

    # Opening the output file in write mode
    with open(results_path, 'w') as output_file:

        # Iterating through each query and running it
        for description, query in queries.items():

            # Writing and printing the message for the running query
            message = f"Running {description}...\n"
            output_file.write(message)
            print(message.strip())  # Print the running message

            # Executing the SQL query
            execute_query(query, output_file)

            # Writing and printing the completion message after the query execution
            completed_message = f"Completed {description}\n\n"
            output_file.write(completed_message)
            print(completed_message.strip())  # Print the completed message

# Calling the main function to run all validation queries.
# The guard is kept so that main() is not triggered if this code is ever
# exported to a module and imported; when the cell is executed directly the
# condition holds and main() runs.
if __name__ == "__main__":
    main()
   
# Recording in the info log that the incremental-load checks have finished
info_logger.info("Testing completed for Incremental load")

Running record_accuracy...


('Records in source not in target', 0)
('Records in target not in source', 0)
Completed record_accuracy
