# Task 2: Data Preprocessing

In [1]:
from de_classes.preprocessor_classes.preprocessor import Preprocessor
from de_classes.preprocessor_classes.weather_preprocessor import WeatherPreprocessor
from de_classes.preprocessor_classes.carbon_preprocessor import CarbonPreprocessor
from de_classes.preprocessor_classes.data_preprocessing_config import DataPreprocessingConfig
from de_classes.utility_classes.spark_manager import SparkManager
from dotenv import load_dotenv
import os
import logging
# Configure the root logger at INFO level so INFO records from this notebook
# and from the imported de_classes modules are emitted.
logging.basicConfig(level=logging.INFO)
# Logger named after the current module; demo_process() below logs through it.
logger = logging.getLogger(__name__)

In [2]:
# Spark SQL settings handed to SparkManager(configs=...) when the session is
# created. All values are strings, as written in the original cell.
# NOTE: "spark.sql.adaptive.enabled" used to be listed twice in this literal;
# a duplicated key in a dict display is silently overwritten, so only one
# entry is kept here. The resulting mapping is unchanged.
spark_configs = {
    # Adaptive query execution and post-shuffle partition coalescing.
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Aggregation / join strategy preferences.
    "spark.sql.execution.useObjectHashAggregate": "false",
    "spark.sql.join.preferSortMergeJoin": "true",
    # Code generation: whole-stage codegen is turned off, with fallback enabled.
    "spark.sql.codegen.wholeStage": "false",
    "spark.sql.codegen.maxFields": "200",
    "spark.sql.codegen.methodSplitThreshold": "1024",
    "spark.sql.codegen.aggregate.splitAggregateFunc.enabled": "true",
    "spark.sql.codegen.fallback": "true",
}

In [3]:
# Populate os.environ from the project's .env file before reading the paths.
load_dotenv()


def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check
            ``os.getenv`` would return ``None`` and the failure would only
            surface later, when the value is used as a Spark path.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Required environment variable '{name}' is not set; "
            "define it in the environment or in the .env file."
        )
    return value


# Input / output locations for the two datasets processed in this notebook.
INPUT_FILE_WEATHER = _require_env('RAW_STREAMED_WEATHER')
INPUT_FILE_CO2 = _require_env('RAW_STREAMED_CO2')
OUTPUT_FILE_WEATHER = _require_env('CLEANED_WEATHER')
OUTPUT_FILE_CO2 = _require_env('CLEANED_CO2')
# Reference CSV files read with spark.read.csv and passed to validation.
REFERENCIAL_COUNTRY = _require_env('COUNTRY_CSV')
REFERENCIAL_CITY = _require_env('CITY_CSV')

In [4]:
# Column rules for the CO2 emissions dataset.
co2_cols_config = DataPreprocessingConfig(
    # Key columns; the run log reports duplicates removed "based on" these.
    primary_key_cols=['country', 'date', 'sector'],
    # Columns logged as cleaned "to contain only alphabetic characters".
    alpha_cols=['sector'],
    # Columns logged as "cleaned and casted" to DoubleType.
    double_cols=['MtCO2_per_day'],
    # Missing-value handling: presumably rows with nulls in these columns are
    # dropped, and the median columns are imputed -- confirm in the preprocessor.
    handle_remove_value_cols=['country', 'date', 'sector'],
    handle_median_cols=['MtCO2_per_day'],
)

# Column rules for the weather dataset.
weather_cols_config = DataPreprocessingConfig(
    # Key columns; the run log reports duplicates removed "based on" these.
    primary_key_cols=['station_id', 'date'],
    alpha_cols=['season'],
    double_cols=['avg_temp_c', 'min_temp_c', 'max_temp_c', 'precipitation_mm'],
    # Columns reported as removed by the drop_columns stage.
    remove_cols=[
        'snow_depth_mm',
        'avg_wind_dir_deg',
        'avg_wind_speed_kmh',
        'peak_wind_gust_kmh',
        'avg_sea_level_pres_hpa',
        'sunshine_total_min',
    ],
    # Missing-value handling per column group (drop row / mean / mode);
    # exact semantics live in the preprocessor classes -- TODO confirm.
    handle_remove_value_cols=['station_id', 'date', 'city_name', 'precipitation_mm'],
    handle_mean_cols=['avg_temp_c', 'min_temp_c', 'max_temp_c'],
    handle_mode_cols=['season'],
)

In [5]:
def _log_step(index: int, title: str, total: int = 12) -> None:
    """Log a numbered stage header followed by a separator rule."""
    logger.info(f"\n[{index}/{total}] {title}")
    logger.info("-" * 40)


def _show_sample(processor, debug: bool, rows: int = 10) -> None:
    """Print the first ``rows`` rows of ``processor.df`` when ``debug`` is set."""
    if debug:
        processor.df.show(rows, truncate=False)


def _call_optional_hook(processor, hook_name: str, missing_message: str) -> None:
    """Call ``processor.<hook_name>()`` if present, else log ``missing_message``."""
    if hasattr(processor, hook_name):
        getattr(processor, hook_name)()
    else:
        logger.info(missing_message)


def demo_process(processor: Preprocessor, referencial_table=None, validation_params=None, debug=False):
    """Run the full twelve-stage preprocessing pipeline on ``processor``.

    Stages, in order: read data, add processing timestamp, drop columns,
    capture before-state (optional hook), clean and de-duplicate, handle
    missing values, standardize, enrich, validate, capture after-state
    (optional hook), print comparison report (optional hook), write output
    with ``write_mode="overwrite"``.

    Args:
        processor: Object providing the stage methods called below
            (``read_data``, ``add_processing_timestamp``, ``drop_columns``,
            ``cleaning_data``, ``remove_duplicates``, ``handle_missing_values``,
            ``standardize_data``, ``enrich_data``, ``validate_data``,
            ``write_data``) and a ``df`` attribute with a ``show`` method.
        referencial_table: Passed as the first positional argument to
            ``processor.validate_data``.
        validation_params: Optional mapping of extra keyword arguments for
            ``processor.validate_data``; ``None`` or empty means none.
        debug: When true, show the first 10 rows of ``processor.df`` after
            each transforming stage.

    Returns:
        Whatever ``processor.validate_data`` returns.

    Raises:
        Exception: Any exception raised by a stage is logged and re-raised.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("STARTING COMPLETE PREPROCESSING PIPELINE")
    logger.info(banner)

    try:
        _log_step(1, "READING DATA")
        processor.read_data()
        _show_sample(processor, debug)

        _log_step(2, "ADD PROCESSING TIMESTAMP")
        processor.add_processing_timestamp()
        _show_sample(processor, debug)

        # Header fixed: it was misspelled ("Droping") and was the only stage
        # without the leading newline / separator rule used by the others.
        _log_step(3, "DROPPING COLUMNS")
        processor.drop_columns()
        _show_sample(processor, debug)

        _log_step(4, "CAPTURING BEFORE STATE")
        _call_optional_hook(
            processor,
            'capture_before_state',
            "Before state capture not implemented for this preprocessor",
        )

        _log_step(5, "DATA CLEANING")
        processor.cleaning_data()
        processor.remove_duplicates()
        _show_sample(processor, debug)

        _log_step(6, "HANDLING MISSING VALUES")
        processor.handle_missing_values()
        _show_sample(processor, debug)

        _log_step(7, "DATA STANDARDIZATION")
        processor.standardize_data()
        _show_sample(processor, debug)

        _log_step(8, "DATA ENRICHMENT")
        processor.enrich_data()
        _show_sample(processor, debug)

        _log_step(9, "DATA VALIDATION")
        # A falsy validation_params (None or {}) means "no extra keyword arguments".
        validation_result = processor.validate_data(
            referencial_table, **(validation_params or {})
        )
        _show_sample(processor, debug)

        _log_step(10, "CAPTURING AFTER STATE")
        _call_optional_hook(
            processor,
            'capture_after_state',
            "After state capture not implemented for this preprocessor",
        )

        _log_step(11, "GENERATING COMPARISON REPORT")
        _call_optional_hook(
            processor,
            'print_preprocessing_comparison',
            "Comparison report not implemented for this preprocessor",
        )

        _log_step(12, "WRITING PROCESSED DATA")
        processor.write_data(write_mode="overwrite")

        logger.info("\n" + banner)
        logger.info("PREPROCESSING PIPELINE COMPLETED SUCCESSFULLY")
        logger.info(banner)

        return validation_result

    except Exception as e:
        # Log a visible failure banner, then let the caller see the original error.
        logger.error(f"Preprocessing pipeline failed: {e}")
        logger.error(banner)
        logger.error("PREPROCESSING PIPELINE FAILED")
        logger.error(banner)
        raise

In [6]:
# Run both preprocessing pipelines inside a single SparkManager context, then
# read the written Parquet outputs back and display them.
with SparkManager(app_name="Preprocessing Spark", configs=spark_configs) as manager:
    spark = manager.spark

    # Reference tables (CSV with a header row) passed to each pipeline's validation step.
    df_city = spark.read.csv(REFERENCIAL_CITY, header=True)
    df_country = spark.read.csv(REFERENCIAL_COUNTRY, header=True)

    # Weather pipeline, validated against the city reference table.
    weather_processor = WeatherPreprocessor(
        spark,
        INPUT_FILE_WEATHER,
        OUTPUT_FILE_WEATHER,
        weather_cols_config,
    )
    demo_process(weather_processor, referencial_table=df_city, debug=True)

    # Carbon pipeline, validated against the country reference table.
    carbon_processor = CarbonPreprocessor(
        spark,
        INPUT_FILE_CO2,
        OUTPUT_FILE_CO2,
        co2_cols_config,
    )
    demo_process(carbon_processor, referencial_table=df_country, debug=True)

    # Read both outputs back from storage to show what was actually written.
    print("Show After Cleanning")

    print("Cleaned Weather")
    df_clean_weather = spark.read.parquet(OUTPUT_FILE_WEATHER)
    df_clean_weather.show()

    print("Cleaned Carbon")
    df_clean_co2 = spark.read.parquet(OUTPUT_FILE_CO2)
    df_clean_co2.show()

25/08/29 19:32:59 WARN Utils: Your hostname, fz.localdomain resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/29 19:32:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/29 19:33:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO:__main__:STARTING COMPLETE PREPROCESSING PIPELINE
INFO:__main__:
[1/12] READING DATA
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Data read successfully from hdfs://localhost:9000/user/student/data_store/processed_data/streamed_raw_data/raw_streamed_weather_parquet
INFO:de_classes.preprocessor_classes.preprocessor:Initial row count: 112666     
INFO:de_classes.preprocessor_classes.preprocessor:Schema: None


root
 |-- station_id: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- season: string (nullable = true)
 |-- avg_temp_c: double (nullable = true)
 |-- min_temp_c: double (nullable = true)
 |-- max_temp_c: double (nullable = true)
 |-- precipitation_mm: double (nullable = true)
 |-- snow_depth_mm: double (nullable = true)
 |-- avg_wind_dir_deg: double (nullable = true)
 |-- avg_wind_speed_kmh: double (nullable = true)
 |-- peak_wind_gust_kmh: double (nullable = true)
 |-- avg_sea_level_pres_hpa: double (nullable = true)
 |-- sunshine_total_min: double (nullable = true)



INFO:__main__:                                                                  
[2/12] ADD PROCESSING TIMESTAMP
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Added 'processing_time' column with current timestamp


+----------+---------------+----------+------+----------+----------+----------+----------------+-------------+----------------+------------------+------------------+----------------------+------------------+
|station_id|city_name      |date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|snow_depth_mm|avg_wind_dir_deg|avg_wind_speed_kmh|peak_wind_gust_kmh|avg_sea_level_pres_hpa|sunshine_total_min|
+----------+---------------+----------+------+----------+----------+----------+----------------+-------------+----------------+------------------+------------------+----------------------+------------------+
|43295     |bangalore      |2022-02-16|Winter|22.6      |15.3      |28.8      |0.0             |NULL         |89.0            |8.4               |NULL              |1010.1                |NULL              |
|26603     |kaliningrad    |2022-02-16|Winter|2.8       |1.8       |4.6       |NULL            |NULL         |178.0           |25.3              |61.1              |999

INFO:__main__:[3/12] Droping columns...
INFO:de_classes.preprocessor_classes.preprocessor:Column removed: ['snow_depth_mm', 'avg_wind_dir_deg', 'avg_wind_speed_kmh', 'peak_wind_gust_kmh', 'avg_sea_level_pres_hpa', 'sunshine_total_min']


+----------+---------------+----------+------+----------+----------+----------+----------------+-------------+----------------+------------------+------------------+----------------------+------------------+-----------------------+
|station_id|city_name      |date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|snow_depth_mm|avg_wind_dir_deg|avg_wind_speed_kmh|peak_wind_gust_kmh|avg_sea_level_pres_hpa|sunshine_total_min|processing_time        |
+----------+---------------+----------+------+----------+----------+----------+----------------+-------------+----------------+------------------+------------------+----------------------+------------------+-----------------------+
|43295     |bangalore      |2022-02-16|Winter|22.6      |15.3      |28.8      |0.0             |NULL         |89.0            |8.4               |NULL              |1010.1                |NULL              |2025-08-29 19:33:21.789|
|26603     |kaliningrad    |2022-02-16|Winter|2.8       |1.8       |4.6 

INFO:__main__:
[4/12] CAPTURING BEFORE STATE
INFO:__main__:----------------------------------------
INFO:__main__:Before state capture not implemented for this preprocessor
INFO:__main__:
[5/12] DATA CLEANING
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Starting data cleaning process...
INFO:de_classes.preprocessor_classes.preprocessor:Cleaned column 'season' to contain only alphabetic characters.
INFO:de_classes.preprocessor_classes.preprocessor:Cleaned and casted column 'avg_temp_c' to DoubleType.


+----------+---------------+----------+------+----------+----------+----------+----------------+-----------------------+
|station_id|city_name      |date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |
+----------+---------------+----------+------+----------+----------+----------+----------------+-----------------------+
|43295     |bangalore      |2022-02-16|Winter|22.6      |15.3      |28.8      |0.0             |2025-08-29 19:33:22.213|
|26603     |kaliningrad    |2022-02-16|Winter|2.8       |1.8       |4.6       |NULL            |2025-08-29 19:33:22.213|
|D6217     |saarbrücken    |2022-02-16|Winter|8.7       |6.1       |12.9      |9.6             |2025-08-29 19:33:22.213|
|72483     |sacramento     |2022-02-16|Winter|12.6      |5.6       |21.7      |0.0             |2025-08-29 19:33:22.213|
|59287     |guangzhou      |2022-02-16|Winter|17.2      |11.9      |22.1      |1.8             |2025-08-29 19:33:22.213|
|52866     |xining         |2022

INFO:de_classes.preprocessor_classes.preprocessor:Cleaned and casted column 'min_temp_c' to DoubleType.
INFO:de_classes.preprocessor_classes.preprocessor:Cleaned and casted column 'max_temp_c' to DoubleType.
INFO:de_classes.preprocessor_classes.preprocessor:Cleaned and casted column 'precipitation_mm' to DoubleType.
INFO:de_classes.preprocessor_classes.weather_preprocessor:Start remove invalid temperature relationships...
INFO:de_classes.preprocessor_classes.preprocessor:Duplicates removed: 56023 rows (based on ['station_id', 'date'])
INFO:__main__:                                                                  
[6/12] HANDLING MISSING VALUES
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Dropping rows with missing values for specified columns.
INFO:de_classes.preprocessor_classes.weather_preprocessor:Starting temperature missing value imputation...
INFO:de_classes.preprocessor_classes.weather_preprocessor:Temperature columns 

+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|station_id|city_name|date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |
+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|10046     |kiel     |2021-12-02|Winter|0.8       |-0.9      |2.0       |1.0             |2025-08-29 19:33:27.408|
|10046     |kiel     |2021-12-05|Winter|2.4       |0.8       |4.5       |4.9             |2025-08-29 19:33:27.408|
|10046     |kiel     |2021-12-06|Winter|2.6       |0.8       |3.9       |0.0             |2025-08-29 19:33:27.408|
|10046     |kiel     |2021-12-10|Winter|2.1       |1.3       |3.2       |0.0             |2025-08-29 19:33:27.408|
|10046     |kiel     |2021-12-31|Winter|11.0      |9.9       |12.3      |8.0             |2025-08-29 19:33:27.408|
|10046     |kiel     |2022-01-05|Winter|3.2       |0.8       |5.6       |6.0    

INFO:de_classes.preprocessor_classes.weather_preprocessor:Imputation values available for: ['avg_temp_c', 'min_temp_c', 'max_temp_c']
INFO:de_classes.preprocessor_classes.weather_preprocessor:Applied avg_temp_c imputation with logic constraints
INFO:de_classes.preprocessor_classes.weather_preprocessor:Applied min_temp_c imputation with logic constraints
INFO:de_classes.preprocessor_classes.weather_preprocessor:Applied max_temp_c imputation with logic constraints
INFO:de_classes.preprocessor_classes.weather_preprocessor:Cleaned up temporary columns: ['__avg_temp_c_mean', '__min_temp_c_mean', '__max_temp_c_mean']
INFO:de_classes.preprocessor_classes.weather_preprocessor:Temperature missing value imputation completed with logical constraints
INFO:__main__:                                                                  
[7/12] DATA STANDARDIZATION
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.weather_preprocessor:Rounding double columns to 1 

+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|station_id|city_name|date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |
+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|10046     |kiel     |2021-12-02|Winter|0.8       |-0.9      |2.0       |1.0             |2025-08-29 19:33:31.431|
|10046     |kiel     |2021-12-05|Winter|2.4       |0.8       |4.5       |4.9             |2025-08-29 19:33:31.431|
|10046     |kiel     |2021-12-06|Winter|2.6       |0.8       |3.9       |0.0             |2025-08-29 19:33:31.431|
|10046     |kiel     |2021-12-10|Winter|2.1       |1.3       |3.2       |0.0             |2025-08-29 19:33:31.431|
|10046     |kiel     |2021-12-31|Winter|11.0      |9.9       |12.3      |8.0             |2025-08-29 19:33:31.431|
|10046     |kiel     |2022-01-05|Winter|3.2       |0.8       |5.6       |6.0    

INFO:de_classes.preprocessor_classes.weather_preprocessor:Weather data standardized
INFO:__main__:                                                                  
[8/12] DATA ENRICHMENT
INFO:__main__:----------------------------------------


+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|station_id|city_name|date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |
+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+
|10046     |Kiel     |2021-12-02|Winter|0.8       |-0.9      |2.0       |1.0             |2025-08-29 19:33:37.423|
|10046     |Kiel     |2021-12-05|Winter|2.4       |0.8       |4.5       |4.9             |2025-08-29 19:33:37.423|
|10046     |Kiel     |2021-12-06|Winter|2.6       |0.8       |3.9       |0.0             |2025-08-29 19:33:37.423|
|10046     |Kiel     |2021-12-10|Winter|2.1       |1.3       |3.2       |0.0             |2025-08-29 19:33:37.423|
|10046     |Kiel     |2021-12-31|Winter|11.0      |9.9       |12.3      |8.0             |2025-08-29 19:33:37.423|
|10046     |Kiel     |2022-01-05|Winter|3.2       |0.8       |5.6       |6.0    

INFO:de_classes.preprocessor_classes.weather_preprocessor:Adding climate risk scoring to weather data
INFO:de_classes.preprocessor_classes.weather_preprocessor:Weather data enriched with additional context
INFO:__main__:                                                                  
[9/12] DATA VALIDATION
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.weather_preprocessor:Starting Data Validation Pipeline


+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+------------+-------------+----------------------+---------+------------------+------------------+
|station_id|city_name|date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |temp_range_c|temp_category|precipitation_category|spi_index|climate_risk_score|climate_risk_level|
+----------+---------+----------+------+----------+----------+----------+----------------+-----------------------+------------+-------------+----------------------+---------+------------------+------------------+
|10046     |Kiel     |2022-01-05|Winter|3.2       |0.8       |5.6       |6.0             |2025-08-29 19:33:42.635|4.8         |Cold         |Moderate Rain         |-0.5     |0.0               |LOW               |
|10046     |Kiel     |2022-01-06|Winter|1.2       |-3.1      |3.7       |0.2             |2025-08-29 19:33:42.635|6.8         |Cold         |Light R

INFO:de_classes.preprocessor_classes.weather_preprocessor:DEBUG: Initial DataFrame count: 51027
INFO:de_classes.preprocessor_classes.weather_preprocessor:
[1/4] SCHEMA VALIDATION
INFO:de_classes.preprocessor_classes.weather_preprocessor:----------------------------------------
INFO:de_classes.preprocessor_classes.weather_preprocessor:Schema validation passed
INFO:de_classes.preprocessor_classes.weather_preprocessor:
[2/4] NULL VALUE CHECK
INFO:de_classes.preprocessor_classes.weather_preprocessor:----------------------------------------
INFO:de_classes.preprocessor_classes.weather_preprocessor:  station_id: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.weather_preprocessor:  city_name: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.weather_preprocessor:  date: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.weather_preprocessor:  season: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.weather_preprocessor:  avg_temp_c: 100.0

+----------+-----------+----------+------+----------+----------+----------+----------------+-----------------------+------------+-------------+----------------------+---------+------------------+------------------+
|station_id|city_name  |date      |season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|processing_time        |temp_range_c|temp_category|precipitation_category|spi_index|climate_risk_score|climate_risk_level|
+----------+-----------+----------+------+----------+----------+----------+----------------+-----------------------+------------+-------------+----------------------+---------+------------------+------------------+
|31713     |Birobidzhan|2021-12-01|Winter|-13.1     |-24.2     |-9.3      |0.0             |2025-08-29 19:34:24.792|14.9        |Freezing     |No Rain               |-0.9     |1.3               |MODERATE          |
|31713     |Birobidzhan|2021-12-02|Winter|-18.3     |-26.0     |-11.1     |0.8             |2025-08-29 19:34:24.792|14.9        |Freezing   

25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 88.92% for 4 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 71.13% for 5 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 59.28% for 6 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 50.81% for 7 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 44.46% for 8 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 39.52% for 9 writers
25/08/29 19:34:34 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224

root
 |-- country: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- MtCO2_per_day: double (nullable = true)

+--------------+----------+----------------------+-------------+
|country       |date      |sector                |MtCO2_per_day|
+--------------+----------+----------------------+-------------+
|Russia        |2022-01-09|Industry              |0.855182     |
|Russia        |2022-01-09|Ground Transport      |0.670885     |
|Russia        |2022-01-09|Domestic Aviation     |0.0650072    |
|Japan         |2022-01-09|Residential           |0.117101     |
|Japan         |2022-01-09|Power                 |1.7333       |
|Japan         |2022-01-09|International Aviation|0.0143209    |
|Japan         |2022-01-09|Industry              |0.925614     |
|United Kingdom|2022-01-09|Domestic Aviation     |0.00771844   |
|Russia        |2022-01-10|International Aviation|0.00609377   |
|Russia        |2022-01-10|Industry              |0.8430

INFO:__main__:[3/12] Droping columns...
INFO:de_classes.preprocessor_classes.preprocessor:Column removed: []
INFO:__main__:
[4/12] CAPTURING BEFORE STATE
INFO:__main__:----------------------------------------
INFO:__main__:Before state capture not implemented for this preprocessor
INFO:__main__:
[5/12] DATA CLEANING
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Starting data cleaning process...
INFO:de_classes.preprocessor_classes.preprocessor:Cleaned column 'sector' to contain only alphabetic characters.


+--------------+----------+----------------------+-------------+----------------------+
|country       |date      |sector                |MtCO2_per_day|processing_time       |
+--------------+----------+----------------------+-------------+----------------------+
|Russia        |2022-01-09|Industry              |0.855182     |2025-08-29 19:34:38.14|
|Russia        |2022-01-09|Ground Transport      |0.670885     |2025-08-29 19:34:38.14|
|Russia        |2022-01-09|Domestic Aviation     |0.0650072    |2025-08-29 19:34:38.14|
|Japan         |2022-01-09|Residential           |0.117101     |2025-08-29 19:34:38.14|
|Japan         |2022-01-09|Power                 |1.7333       |2025-08-29 19:34:38.14|
|Japan         |2022-01-09|International Aviation|0.0143209    |2025-08-29 19:34:38.14|
|Japan         |2022-01-09|Industry              |0.925614     |2025-08-29 19:34:38.14|
|United Kingdom|2022-01-09|Domestic Aviation     |0.00771844   |2025-08-29 19:34:38.14|
|Russia        |2022-01-10|Inter

INFO:de_classes.preprocessor_classes.preprocessor:Cleaned and casted column 'MtCO2_per_day' to DoubleType.
INFO:de_classes.preprocessor_classes.preprocessor:Duplicates removed: 4608 rows (based on ['country', 'date', 'sector'])
INFO:__main__:
[6/12] HANDLING MISSING VALUES
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.preprocessor:Dropping rows with missing values for specified columns.
INFO:de_classes.preprocessor_classes.carbon_preprocessor:Calculating approximate median for columns: ['MtCO2_per_day']


+--------------+----------+----------------------+-------------+-----------------------+
|country       |date      |sector                |MtCO2_per_day|processing_time        |
+--------------+----------+----------------------+-------------+-----------------------+
|United States |2022-02-01|Power                 |3.94541      |2025-08-29 19:34:39.791|
|Russia        |2022-02-03|Residential           |0.725518     |2025-08-29 19:34:39.791|
|Japan         |2022-02-06|Ground Transport      |0.490519     |2025-08-29 19:34:39.791|
|Germany       |2022-02-07|Ground Transport      |0.387667     |2025-08-29 19:34:39.791|
|Germany       |2022-02-07|International Aviation|0.0311924    |2025-08-29 19:34:39.791|
|China         |2022-03-01|International Aviation|0.0194904    |2025-08-29 19:34:39.791|
|France        |2022-03-03|Power                 |0.133933     |2025-08-29 19:34:39.791|
|United Kingdom|2022-01-12|Residential           |0.334114     |2025-08-29 19:34:39.791|
|India         |2022-

INFO:__main__:
[7/12] DATA STANDARDIZATION
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.carbon_preprocessor:Rounding double columns to 6 decimal place: ['MtCO2_per_day']
INFO:de_classes.preprocessor_classes.carbon_preprocessor:Carbon data standardized


+--------------+----------+----------------------+-------------+-----------------------+
|country       |date      |sector                |MtCO2_per_day|processing_time        |
+--------------+----------+----------------------+-------------+-----------------------+
|United States |2022-02-01|Power                 |3.94541      |2025-08-29 19:34:41.684|
|Russia        |2022-02-03|Residential           |0.725518     |2025-08-29 19:34:41.684|
|Japan         |2022-02-06|Ground Transport      |0.490519     |2025-08-29 19:34:41.684|
|Germany       |2022-02-07|Ground Transport      |0.387667     |2025-08-29 19:34:41.684|
|Germany       |2022-02-07|International Aviation|0.0311924    |2025-08-29 19:34:41.684|
|China         |2022-03-01|International Aviation|0.0194904    |2025-08-29 19:34:41.684|
|France        |2022-03-03|Power                 |0.133933     |2025-08-29 19:34:41.684|
|United Kingdom|2022-01-12|Residential           |0.334114     |2025-08-29 19:34:41.684|
|India         |2022-

INFO:__main__:
[8/12] DATA ENRICHMENT
INFO:__main__:----------------------------------------


+--------------+----------+----------------------+-------------+-----------------------+
|country       |date      |sector                |MtCO2_per_day|processing_time        |
+--------------+----------+----------------------+-------------+-----------------------+
|United States |2022-02-01|Power                 |3.94541      |2025-08-29 19:34:42.594|
|Russia        |2022-02-03|Residential           |0.725518     |2025-08-29 19:34:42.594|
|Japan         |2022-02-06|Ground Transport      |0.490519     |2025-08-29 19:34:42.594|
|Germany       |2022-02-07|Ground Transport      |0.387667     |2025-08-29 19:34:42.594|
|Germany       |2022-02-07|International Aviation|0.031192     |2025-08-29 19:34:42.594|
|China         |2022-03-01|International Aviation|0.01949      |2025-08-29 19:34:42.594|
|France        |2022-03-03|Power                 |0.133933     |2025-08-29 19:34:42.594|
|United Kingdom|2022-01-12|Residential           |0.334114     |2025-08-29 19:34:42.594|
|India         |2022-

INFO:de_classes.preprocessor_classes.carbon_preprocessor:Carbon data enriched with additional context
INFO:__main__:                                                                  
[9/12] DATA VALIDATION
INFO:__main__:----------------------------------------
INFO:de_classes.preprocessor_classes.carbon_preprocessor:Starting Carbon Data Validation Pipeline


+--------------+----------+----------------------+-------------+-----------------------+--------------+---------------+
|country       |date      |sector                |MtCO2_per_day|processing_time        |emission_level|sector_category|
+--------------+----------+----------------------+-------------+-----------------------+--------------+---------------+
|United States |2022-02-01|Power                 |3.94541      |2025-08-29 19:34:43.727|Medium        |Other          |
|Russia        |2022-02-03|Residential           |0.725518     |2025-08-29 19:34:43.727|Low           |Other          |
|Japan         |2022-02-06|Ground Transport      |0.490519     |2025-08-29 19:34:43.727|Low           |Other          |
|Germany       |2022-02-07|Ground Transport      |0.387667     |2025-08-29 19:34:43.727|Low           |Other          |
|Germany       |2022-02-07|International Aviation|0.031192     |2025-08-29 19:34:43.727|Very Low      |Other          |
|China         |2022-03-01|International

INFO:de_classes.preprocessor_classes.carbon_preprocessor:DEBUG: Initial DataFrame count: 4608
INFO:de_classes.preprocessor_classes.carbon_preprocessor:
[1/5] SCHEMA VALIDATION
INFO:de_classes.preprocessor_classes.carbon_preprocessor:----------------------------------------
INFO:de_classes.preprocessor_classes.carbon_preprocessor:Schema validation passed
INFO:de_classes.preprocessor_classes.carbon_preprocessor:
[2/5] NULL VALUE CHECK
INFO:de_classes.preprocessor_classes.carbon_preprocessor:----------------------------------------
INFO:de_classes.preprocessor_classes.carbon_preprocessor:  country: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.carbon_preprocessor:  date: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.carbon_preprocessor:  sector: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.carbon_preprocessor:  MtCO2_per_day: 100.0% complete (0 nulls)
INFO:de_classes.preprocessor_classes.carbon_preprocessor:  processing_time: 100.0% comp

+-------+----------+----------------------+-------------+-----------------------+--------------+---------------+
|country|date      |sector                |MtCO2_per_day|processing_time        |emission_level|sector_category|
+-------+----------+----------------------+-------------+-----------------------+--------------+---------------+
|China  |2022-01-07|Ground Transport      |2.48657      |2025-08-29 19:35:15.275|Medium        |Other          |
|China  |2021-12-10|Residential           |1.75808      |2025-08-29 19:35:15.275|Medium        |Other          |
|France |2022-01-06|International Aviation|0.019309     |2025-08-29 19:35:15.275|Very Low      |Other          |
|Germany|2022-05-02|Power                 |0.372776     |2025-08-29 19:35:15.275|Low           |Other          |
|Germany|2021-12-04|Residential           |0.655291     |2025-08-29 19:35:15.275|Low           |Other          |
|Germany|2022-01-12|Residential           |0.666526     |2025-08-29 19:35:15.275|Low           |

25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 88.92% for 4 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 71.13% for 5 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 59.28% for 6 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 50.81% for 7 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 44.46% for 8 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224 bytes) of heap memory
Scaling row group sizes to 39.52% for 9 writers
25/08/29 19:35:17 WARN MemoryManager: Total allocation exceeds 50.00% (477,364,224

Show After Cleanning
Cleaned Weather
+----------+---------+----------+------+----------+----------+----------+----------------+--------------------+------------+-------------+----------------------+---------+------------------+------------------+
|station_id|city_name|      date|season|avg_temp_c|min_temp_c|max_temp_c|precipitation_mm|     processing_time|temp_range_c|temp_category|precipitation_category|spi_index|climate_risk_score|climate_risk_level|
+----------+---------+----------+------+----------+----------+----------+----------------+--------------------+------------+-------------+----------------------+---------+------------------+------------------+
|     42724| Agartala|2021-12-01|Winter|      22.3|      15.0|      30.0|             0.0|2025-08-29 19:34:...|        15.0|         Warm|               No Rain|     -0.4|               0.3|               LOW|
|     42724| Agartala|2021-12-02|Winter|      22.2|      16.0|      30.0|             0.0|2025-08-29 19:34:...|        14.0