# Emissions 02: Full Pipeline Testing

**Purpose**: Test the complete Emissions pipeline (Bronze → Transform → Gold) step by step. The Gold write in section 6 is optional and commented out.

**Outputs**:
- primes_emises_{vision}_pol_garp (by guarantee)
- primes_emises_{vision}_pol (aggregated by policy)

---

In [1]:
import sys
from pathlib import Path

project_root = Path.cwd().parent.parent
sys.path.insert(0, str(project_root))
print(f"Project root: {project_root}")

Project root: /workspace/new_python


In [2]:
from pyspark.sql import SparkSession
# from azfr_fsspec_utils import fspath
# import azfr_fsspec_abfs

# azfr_fsspec_abfs.use()

spark = SparkSession.builder \
    .appName("Emissions_Pipeline_Testing") \
    .getOrCreate()

print(f"✓ Spark {spark.version}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/18 21:50:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✓ Spark 3.4.4


## 1. Initialize Processor

In [3]:
from utils.loaders.config_loader import ConfigLoader
from utils.logger import PipelineLogger
# EmissionsProcessor is defined in src/processors/emissions_processors/emissions_processor.py
from src.processors.emissions_processors.emissions_processor import EmissionsProcessor

config = ConfigLoader(str(project_root / "config" / "config.yml"))
logger = PipelineLogger("emissions_test")

VISION = "202509"
print(f"Testing pipeline for vision: {VISION}")

Testing pipeline for vision: 202509


## 2. Run Emissions Processor (Manual Steps)

In [4]:
try:
    emissions_processor = EmissionsProcessor(spark, config, logger)
    
    # Call read() and transform() separately rather than run():
    # run() writes directly to gold and returns None, leaving nothing to inspect.
    
    # Step 1: Read bronze data
    print("Step 1: Reading bronze data...")
    df = emissions_processor.read(VISION)
    print(f"✓ Read: {df.count():,} rows")
    
    # Step 2: Transform (returns tuple of 2 DataFrames)
    print("\nStep 2: Transforming data...")
    df_pol_garp, df_pol = emissions_processor.transform(df, VISION)
    
    print(f"✓ POL_GARP (by guarantee): {df_pol_garp.count():,} rows")
    print(f"✓ POL (aggregated): {df_pol.count():,} rows")
    
except Exception as e:
    print(f"✗ Error: {e}")
    import traceback
    traceback.print_exc()
    df_pol_garp = df_pol = None

2025-12-18 21:50:34 - emissions_test - INFO - Emissions Processor initialized
Step 1: Reading bronze data...
2025-12-18 21:50:34 - emissions_test - INFO - Reading One BI emissions data for vision 202509
2025-12-18 21:50:38 - emissions_test - INFO - ✓ SUCCESS: Read 20,000 records from One BI (bronze)
✓ Read: 20,000 rows

Step 2: Transforming data...
2025-12-18 21:50:38 - emissions_test - INFO - Starting Emissions transformations
2025-12-18 21:50:38 - emissions_test - INFO - STEP 1: Lowercasing all columns
2025-12-18 21:50:38 - emissions_test - INFO - STEP 2: Applying business filters
2025-12-18 21:50:38 - emissions_test - INFO - After market filter (cd_marche='6'): 20,000 records
2025-12-18 21:50:38 - emissions_test - INFO - After date filter (dt_cpta_cts <= 202509): 20,000 records
2025-12-18 21:50:39 - emissions_test - INFO - After intermediary filter (22 excluded): 20,000 records
2025-12-18 21:50:39 - emissions_test - INFO - ✓ SUCCESS: Filters applied: 0 records filtered out, 20,000 r
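The transform log names three business filters applied after the column names are lowercased. The following is a minimal pure-Python sketch of that sequence over plain dict rows. It is not the processor's code, but it can help build tiny fixtures in which each row is designed to fail exactly one rule. Assumptions: the intermediary code sits in a column named `noint`, `dt_cpta_cts` and the vision are `YYYYMM` values that compare correctly as integers, and `EXCLUDED_INTERMEDIARIES` is a hypothetical placeholder, because the log only says that 22 codes are excluded.

```python
# Hypothetical placeholder: the log says 22 intermediaries are excluded but
# does not list them, so these codes are NOT the real exclusion list.
EXCLUDED_INTERMEDIARIES = frozenset({"INT00001", "INT00002"})


def apply_business_filters(rows, vision, excluded=EXCLUDED_INTERMEDIARIES):
    """Apply the filters named in the log, in order, recording counts.

    Assumptions (not confirmed by the processor source):
      * each row is a plain dict of column name -> value;
      * the intermediary code is stored in a column named 'noint';
      * dt_cpta_cts and vision are YYYYMM values, compared as integers.
    """
    # STEP 1: lowercase every column name
    rows = [{k.lower(): v for k, v in r.items()} for r in rows]
    counts = {"input": len(rows)}

    # Market filter: keep cd_marche == '6'
    rows = [r for r in rows if r["cd_marche"] == "6"]
    counts["market"] = len(rows)

    # Date filter: keep accounting periods up to and including the vision
    rows = [r for r in rows if int(r["dt_cpta_cts"]) <= int(vision)]
    counts["date"] = len(rows)

    # Intermediary filter: drop excluded intermediary codes
    rows = [r for r in rows if r["noint"] not in excluded]
    counts["intermediary"] = len(rows)
    return rows, counts


# Example: the last three rows are each built to fail one filter
sample = [
    {"CD_MARCHE": "6", "DT_CPTA_CTS": "202508", "NOINT": "A1"},
    {"CD_MARCHE": "5", "DT_CPTA_CTS": "202508", "NOINT": "A1"},
    {"CD_MARCHE": "6", "DT_CPTA_CTS": "202510", "NOINT": "A1"},
    {"CD_MARCHE": "6", "DT_CPTA_CTS": "202509", "NOINT": "INT00001"},
]
kept, counts = apply_business_filters(sample, "202509")
print(counts)  # {'input': 4, 'market': 3, 'date': 2, 'intermediary': 1}
```

On this run every step kept all 20,000 records, so the test data never exercised any of the filters. In PySpark, the same sequence would be successive `df.filter(...)` calls (for example `F.col("cd_marche") == "6"` and `~F.col("noint").isin(...)`), with a `count()` after each step.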

## 3. Verify Output Columns

In [5]:
if df_pol_garp is not None:
    print("POL_GARP Schema:")
    print(f"  Columns: {df_pol_garp.columns}")
    print("\nExpected: vision, dircom, cdpole, nopol, cdprod, noint, cgarp, cmarch, cseg, cssseg, cd_cat_min, primes_x, primes_n, mtcom_x")

POL_GARP Schema:
  Columns: ['vision', 'dircom', 'cdpole', 'nopol', 'cdprod', 'noint', 'cgarp', 'cmarch', 'cseg', 'cssseg', 'cd_cat_min', 'primes_x', 'primes_n', 'mtcom_x']

Expected: vision, dircom, cdpole, nopol, cdprod, noint, cgarp, cmarch, cseg, cssseg, cd_cat_min, primes_x, primes_n, mtcom_x


In [6]:
if df_pol is not None:
    print("POL Schema:")
    print(f"  Columns: {df_pol.columns}")
    print("\nExpected: vision, dircom, cdpole, nopol, cdprod, noint, cmarch, cseg, cssseg, primes_x, primes_n, mtcom_x")

POL Schema:
  Columns: ['vision', 'dircom', 'nopol', 'noint', 'cdpole', 'cdprod', 'cmarch', 'cseg', 'cssseg', 'primes_x', 'primes_n', 'mtcom_x']

Expected: vision, dircom, cdpole, nopol, cdprod, noint, cmarch, cseg, cssseg, primes_x, primes_n, mtcom_x
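Both schema cells print the actual and expected column lists side by side and leave the reader to compare them. For POL, the twelve columns match as a set, but the order differs: `nopol` and `noint` come before `cdpole`. A small helper, sketched here in plain Python, makes the check explicit. In the notebook, `actual` would be `df_pol.columns` or `df_pol_garp.columns`.

```python
def compare_columns(actual, expected):
    """Compare a DataFrame's column list (df.columns) with an expected list."""
    actual, expected = list(actual), list(expected)
    return {
        "missing": sorted(set(expected) - set(actual)),
        "unexpected": sorted(set(actual) - set(expected)),
        "duplicates": sorted({c for c in actual if actual.count(c) > 1}),
        "same_order": actual == expected,
    }


expected_pol = ["vision", "dircom", "cdpole", "nopol", "cdprod", "noint",
                "cmarch", "cseg", "cssseg", "primes_x", "primes_n", "mtcom_x"]
# Column list as printed for POL in the output above
actual_pol = ["vision", "dircom", "nopol", "noint", "cdpole", "cdprod",
              "cmarch", "cseg", "cssseg", "primes_x", "primes_n", "mtcom_x"]

result = compare_columns(actual_pol, expected_pol)
print(result)
# {'missing': [], 'unexpected': [], 'duplicates': [], 'same_order': False}
```

Whether the order matters depends on how gold is written. Position-based writes such as `DataFrameWriter.insertInto` match columns by position rather than by name, and in that case `df_pol.select(*expected_pol)` would restore the expected order. The processor's write path is not shown here, so treat this as a precaution rather than a known issue.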


## 4. Sample Data

In [7]:
if df_pol_garp is not None:
    print("POL_GARP output (by guarantee):")
    # Per-guarantee output: cgarp identifies the guarantee; measures are primes_x and primes_n
    df_pol_garp.select('nopol', 'cdprod', 'cgarp', 'primes_x', 'primes_n').show(5)
    
    print("\nPOL output (aggregated by policy):")
    # POL has no cgarp column: it is aggregated by policy
    df_pol.select('nopol', 'cdprod', 'primes_x', 'primes_n').show(5)
else:
    print("⚠ No data to display")

POL_GARP output (by guarantee):
+-----------+------+-----+--------+--------+
|      nopol|cdprod|cgarp|primes_x|primes_n|
+-----------+------+-----+--------+--------+
|POL00009453| 01030|310YY|15010.72|15010.72|
|POL00009911| 01056|300YY|15103.07|     0.0|
|POL00011327| 01082|260YY|35916.96|35916.96|
|POL00011176| 01141|310YY| 25051.7|     0.0|
|POL00003479| 01081|240YY|14007.18|14007.18|
+-----------+------+-----+--------+--------+
only showing top 5 rows


POL output (aggregated by policy):
+-----------+------+--------+--------+
|      nopol|cdprod|primes_x|primes_n|
+-----------+------+--------+--------+
|POL00001661| 01109|30698.44|     0.0|
|POL00000926| 01090|24266.67|24266.67|
|POL00011082| 01085|20295.41|     0.0|
|POL00013129| 01080|29919.31|29919.31|
|POL00007736| 01098|43718.11|43718.11|
+-----------+------+--------+--------+
only showing top 5 rows



## 5. Validate Aggregation Logic

In [8]:
if df_pol_garp is not None and df_pol is not None:
    # Display POL_GARP and POL for one policy (visual check only; nothing is asserted)
    print("Validation: POL should be aggregation of POL_GARP")
    
    # Pick one policy
    sample_nopol = df_pol.select('nopol').first()['nopol']
    
    print(f"\nSample policy: {sample_nopol}")
    
    # POL_GARP for this policy (multiple guarantees)
    garp_data = df_pol_garp.filter(df_pol_garp.nopol == sample_nopol).select('cgarp', 'primes_x')
    print("\nPOL_GARP (by guarantee):")
    garp_data.show()
    
    # POL for this policy (aggregated)
    pol_data = df_pol.filter(df_pol.nopol == sample_nopol).select('primes_x')
    print("POL (aggregated):")
    pol_data.show()
    
    print("✓ Validation complete")

Validation: POL should be aggregation of POL_GARP

Sample policy: POL00010331

POL_GARP (by guarantee):
+-----+--------+
|cgarp|primes_x|
+-----+--------+
|220YY|15493.65|
|240YY|29407.58|
+-----+--------+

POL (aggregated):
+--------+
|primes_x|
+--------+
|15493.65|
|29407.58|
+--------+

✓ Validation complete
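The cell above only displays rows, and its output does not show an aggregation. For `POL00010331`, POL returns two rows whose `primes_x` values (15493.65 and 29407.58) are identical to the two guarantee rows, rather than a single row of 15493.65 + 29407.58 = 44901.23. That result is consistent with two explanations: POL may group on a key that still differs between these guarantees, or no aggregation may be taking place at all. The sketch below makes the check explicit in plain Python over collected rows. It sums POL_GARP measures over the POL dimension columns (taken from the POL schema in section 3), compares the sums with POL, and lists policies that still have several POL rows. It assumes POL is meant to equal that sum. The key list, the 0.01 tolerance, and the example dimension values are illustrative assumptions, and only the `primes_x` figures come from the output above.

```python
import math
from collections import Counter, defaultdict

# POL dimension and measure columns, as printed in the POL schema above.
POL_KEYS = ["vision", "dircom", "cdpole", "nopol", "cdprod", "noint",
            "cmarch", "cseg", "cssseg"]
MEASURES = ["primes_x", "primes_n", "mtcom_x"]


def reconcile(garp_rows, pol_rows, keys=POL_KEYS, measures=MEASURES, tol=0.01):
    """Check that POL equals POL_GARP summed over `keys`.

    Rows are plain dicts, e.g. [r.asDict() for r in df.collect()].
    Returns (mismatches, policies_with_several_pol_rows).
    """
    # Sum each POL_GARP measure over the POL key; nulls are treated as 0.
    expected = defaultdict(lambda: [0.0] * len(measures))
    for r in garp_rows:
        sums = expected[tuple(r[k] for k in keys)]
        for i, m in enumerate(measures):
            sums[i] += r[m] or 0.0

    mismatches, seen = [], set()
    for r in pol_rows:
        key = tuple(r[k] for k in keys)
        if key in seen:
            mismatches.append(("duplicate POL key", key))
            continue
        seen.add(key)
        if key not in expected:
            mismatches.append(("POL key absent from POL_GARP", key))
            continue
        for i, m in enumerate(measures):
            if not math.isclose(r[m] or 0.0, expected[key][i], abs_tol=tol):
                mismatches.append((f"{m} differs", key))

    for key in expected.keys() - seen:
        mismatches.append(("POL_GARP key absent from POL", key))

    # If POL is one row per policy, this list should be empty.
    rows_per_nopol = Counter(r["nopol"] for r in pol_rows)
    several = sorted(n for n, c in rows_per_nopol.items() if c > 1)
    return mismatches, several


# Illustrative dimension values; only primes_x comes from the notebook output.
base = {"vision": "202509", "dircom": "D1", "cdpole": "1", "nopol": "POL00010331",
        "cdprod": "01080", "noint": "I1", "cmarch": "6", "cseg": "S1", "cssseg": "SS1"}
garp = [dict(base, cgarp="220YY", primes_x=15493.65, primes_n=0.0, mtcom_x=0.0),
        dict(base, cgarp="240YY", primes_x=29407.58, primes_n=0.0, mtcom_x=0.0)]
pol_one_row = [dict(base, primes_x=44901.23, primes_n=0.0, mtcom_x=0.0)]
pol_as_shown = [dict(base, primes_x=15493.65, primes_n=0.0, mtcom_x=0.0),
                dict(base, primes_x=29407.58, primes_n=0.0, mtcom_x=0.0)]

print(reconcile(garp, pol_one_row))  # ([], [])
mismatches, several = reconcile(garp, pol_as_shown)
print(several)  # ['POL00010331']
```

To check one policy, collect both sides as in the cell above, for example `reconcile([r.asDict() for r in df_pol_garp.filter(df_pol_garp.nopol == sample_nopol).collect()], [r.asDict() for r in df_pol.filter(df_pol.nopol == sample_nopol).collect()])`. Collecting this test set to the driver is workable, but at production volume the same comparison belongs in Spark: run `groupBy(*POL_KEYS).agg(F.sum(...))` on POL_GARP, join the result to POL, and compare the measures within a tolerance.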


## 6. Optional: Write to Gold (Manual)

In [None]:
# Uncomment to write outputs manually
# if df_pol_garp is not None and df_pol is not None:
#     emissions_processor.write((df_pol_garp, df_pol), VISION)
#     print("✓ Data written to gold layer")

## Summary

In [None]:
print("="*60)
print("EMISSIONS PIPELINE TESTING COMPLETE")
print("="*60)
print(f"\nVision: {VISION}")
print(f"POL_GARP: {'✓' if df_pol_garp is not None else '✗'}")
print(f"POL:      {'✓' if df_pol is not None else '✗'}")
print("\nKey learnings:")
print("  1. Use read() + transform() for testing (run() writes directly)")
print("  2. transform() returns tuple of (df_pol_garp, df_pol)")
print("  3. Column names: primes_x, primes_n, cgarp, mtcom_x")
print(f"\n→ Run production: python main.py --vision {VISION} --component emissions")