In [1]:
import json
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct, lit, count, sum as spark_sum, max as spark_max, lower, trim, collect_set, coalesce
from datetime import datetime
import json
from datetime import datetime, timedelta

# Initialize Spark session
spark = SparkSession.builder.appName("CoffeeLotLineage").getOrCreate()

def norm(column_name):
    return lower(trim(col(column_name)))

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 3, Finished, Available, Finished)

In [2]:
start_time = time.time()

df_production = spark.table("traceability.`ACOM Production Consumption`")
df_purchase = spark.table("traceability.`ACOM Navision Purchase`")
df_sale = spark.table("traceability.`ACOM Navision Sale`")
df_eacl = spark.table("`EACL Navision`")  
df_transform = spark.table("traceability.`ACOM Nav Transform`")
df_bridge = spark.table("traceability.`ACOM Nav Bridge`")
df_results = spark.table("traceability.`ACOM Production Results`")

load_time = time.time() - start_time
print(f"‚úì Loaded all tables in {load_time:.2f} seconds")
print(f"  - Production/Consumption records: {df_production.count():,}")
print(f"  - Purchase records: {df_purchase.count():,}")
print(f"  - Sale records: {df_sale.count():,}")
print(f"  - EACL records: {df_eacl.count():,}")

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 4, Finished, Available, Finished)

‚úì Loaded all tables in 39.99 seconds
  - Production/Consumption records: 2,846,425
  - Purchase records: 19,795
  - Sale records: 21
  - EACL records: 6


In [3]:
df_eacl_filtered = df_eacl.filter(col("Lot Number").isNotNull() & (trim(col("Lot Number")) != ""))
df_eacl_filtered = df_eacl_filtered.withColumn("lot_norm", norm("Lot Number"))
df_sale = df_sale.withColumn("sale_contract_norm", norm("Sale Contract"))

step1 = df_eacl_filtered.alias("eacl").join(
    df_sale.alias("sale"),
    col("eacl.lot_norm") == col("sale.sale_contract_norm"),
    "inner"
).select(
    col("eacl.Lot Number").alias("lotNumber"),
    col("sale.Sale Contract").alias("saleContract"),
    col("sale.Lot #").alias("saleLot"),
    col("eacl.Sale Contract #").alias("saleContractHash")
)

step1_count = step1.count()
print(f"Step 1: EACL Navision [Lot Number] -> ACOM Sale: {step1_count} matches")

# Optional: List available unique saleContractHash for reference if changing x later
available_hashes = [row["saleContractHash"] for row in step1.select("saleContractHash").distinct().collect()]
print("Available sale contract hashes (for reference):")
print(available_hashes)

# Set fixed x value (replace with your desired hash)
x = "4297250543"  # Example fixed value from sample data

# Validate x exists in data
if x not in available_hashes:
    raise ValueError(f"Fixed contract '{x}' not found in available hashes.")

print(f"Selected sales contract for tracing: {x}")

# Filter step1 for this contract
step1_filtered = step1.filter(col("saleContractHash") == x)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 5, Finished, Available, Finished)

Step 1: EACL Navision [Lot Number] -> ACOM Sale: 26 matches
Available sale contract hashes (for reference):
['4297250523', '4197250084', '4297250223', '4297250543', '4297250568']
Selected sales contract for tracing: 4297250543


In [4]:
step1_filtered.head(10)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 6, Finished, Available, Finished)

[Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-1', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-2', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-3', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-4', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-5', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-6', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-7', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-8', saleContractHash='4297250543'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-9

In [5]:
# Step 2: ACOM Navision Sale ["Lot #"] -> ACOM Nav Transform ["Sale Lot"]
step1_filtered = step1_filtered.withColumn("sale_lot_norm", norm("saleLot"))
df_transform = df_transform.withColumn("transform_sale_lot_norm", norm("Sale Lot"))

step2 = step1_filtered.join(
    df_transform,
    col("sale_lot_norm") == col("transform_sale_lot_norm"),
    "inner"
).select(
    "lotNumber",
    "saleContract",
    "saleLot",
    "saleContractHash",
    col("Production Lot").alias("productionLot")
)

step2_count = step2.count()
print(f"Step 2 (filtered): ACOM Sale -> ACOM Transform: {step2_count} matches")

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 7, Finished, Available, Finished)

Step 2 (filtered): ACOM Sale -> ACOM Transform: 10 matches


In [6]:
step2.head(10)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 8, Finished, Available, Finished)

[Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-1', saleContractHash='4297250543', productionLot='4297258337-LB-1'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-2', saleContractHash='4297250543', productionLot='4297258337-LB-2'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-3', saleContractHash='4297250543', productionLot='4297258337-LB-3'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-4', saleContractHash='4297250543', productionLot='4297258337-LB-4'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-5', saleContractHash='4297250543', productionLot='4297258337-LB-5'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-6', saleContractHash='4297250543', productionLot='4297258337-LB-6'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-7', saleContractHash='4297250543'

In [7]:
# Step 3: ACOM Nav Transform ["Production Lot"] -> ACOM Nav Bridge ["Lot No_(O)"]
step2 = step2.withColumn("production_lot_norm", norm("productionLot"))
df_bridge = df_bridge.withColumn("lot_no_o_norm", norm("Lot No_(O)"))

step3 = step2.join(
    df_bridge,
    col("production_lot_norm") == col("lot_no_o_norm"),
    "inner"
).select(
    "lotNumber",
    "saleContract",
    "saleLot",
    "productionLot",
    "saleContractHash",
    col("Lot No_(D)").alias("bridgeDestLot")
)

step3_count = step3.count()
print(f"Step 3 (filtered): ACOM Transform -> ACOM Bridge: {step3_count} matches")

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 9, Finished, Available, Finished)

Step 3 (filtered): ACOM Transform -> ACOM Bridge: 10 matches


In [8]:
step3.head(10)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 10, Finished, Available, Finished)

[Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-1', productionLot='4297258337-LB-1', saleContractHash='4297250543', bridgeDestLot='4297258337-LB-1'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-2', productionLot='4297258337-LB-2', saleContractHash='4297250543', bridgeDestLot='4297258337-LB-2'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-3', productionLot='4297258337-LB-3', saleContractHash='4297250543', bridgeDestLot='4297258337-LB-3'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-4', productionLot='4297258337-LB-4', saleContractHash='4297250543', bridgeDestLot='4297258337-LB-4'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-5', productionLot='4297258337-LB-5', saleContractHash='4297250543', bridgeDestLot='4297258337-LB-5'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-6', productionLo

In [9]:
# Step 4: ACOM Nav Bridge ["Lot No_(D)"] -> ACOM Production Results ["Lot No_"]
step3 = step3.withColumn("bridge_dest_lot_norm", norm("bridgeDestLot"))
df_results = df_results.withColumn("results_lot_no_norm", norm("Lot No"))

step4 = step3.join(
    df_results,
    col("bridge_dest_lot_norm") == col("results_lot_no_norm"),
    "inner"
).select(
    "lotNumber",
    "saleContract",
    "saleLot",
    "productionLot",
    "bridgeDestLot",
    "saleContractHash",
    col("Prod_ Order No_").alias("prodOrder")
)

step4_count = step4.count()
print(f"Step 4 (filtered): ACOM Bridge -> ACOM Production Results: {step4_count} matches")

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 11, Finished, Available, Finished)

Step 4 (filtered): ACOM Bridge -> ACOM Production Results: 10 matches


In [10]:
step4.head(10)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 12, Finished, Available, Finished)

[Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-1', productionLot='4297258337-LB-1', bridgeDestLot='4297258337-LB-1', saleContractHash='4297250543', prodOrder='PRDL2500172'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-2', productionLot='4297258337-LB-2', bridgeDestLot='4297258337-LB-2', saleContractHash='4297250543', prodOrder='PRDL2500172'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-3', productionLot='4297258337-LB-3', bridgeDestLot='4297258337-LB-3', saleContractHash='4297250543', prodOrder='PRDL2500172'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-4', productionLot='4297258337-LB-4', bridgeDestLot='4297258337-LB-4', saleContractHash='4297250543', prodOrder='PRDL2500172'),
 Row(lotNumber='4297258337', saleContract='4297258337', saleLot='4297258337-LB-5', productionLot='4297258337-LB-5', bridgeDestLot='4297258337-LB-5', saleContractHash='429725054

In [11]:
print("\n[STEP 5] Performing join in correct order...")
start_time = time.time()

# STEP 5a: First, filter df_production to only the relevant prodOrders from step4
print("  5a: Filtering df_production to relevant production orders from step4...")
relevant_prod_orders = step4.select("prodOrder").distinct()
df_production_filtered = df_production.join(
    relevant_prod_orders,
    df_production["Prod Ord No"] == relevant_prod_orders["prodOrder"],
    "inner"
)

filtered_count = df_production_filtered.count()
print(f"  ‚úì Filtered to {filtered_count:,} production records")

# STEP 5b: Now join the filtered production with purchase data
print("  5b: Joining filtered production with df_purchase...")
joined_production_purchase = df_production_filtered.alias("prod").join(
    df_purchase.alias("purch"),
    norm("prod.Lot No_") == norm("purch.Lots"),
    "left"
).select(
    col("prod.Prod Ord No").alias("prodOrder"),
    col("prod.Lot No_").alias("consumptionLot"),
    col("purch.Item Number").alias("VLOOKUP_Col2"),
    col("purch.Description").alias("VLOOKUP_Description"),
    col("purch.Quantity").alias("VLOOKUP_Quantity"),
    col("purch.UoM").alias("VLOOKUP_Unit_of_Measure"),
    col("purch.Contract").alias("VLOOKUP_Contract"),
    col("purch.Season").alias("VLOOKUP_Season"),
    col("purch.Date").alias("VLOOKUP_Date_of_delivery"),
    col("purch.Location Code").alias("Location Code"),
    col("purch.Counterparty").alias("Counterparty"),
    col("purch.Certified").alias("Certified")
)

vlookup_time = time.time() - start_time
joined_count = joined_production_purchase.count()
print(f"  ‚úì Join completed in {vlookup_time:.2f} seconds")
print(f"    - Total records: {joined_count:,}")

# STEP 5c: Finally, join step4 with the enriched production-purchase data
print("  5c: Joining step4 with enriched production-purchase data...")
step5 = step4.alias("s4").join(
    joined_production_purchase.alias("jp"),
    col("s4.prodOrder") == col("jp.prodOrder"),
    "left"
).select(
    col("s4.lotNumber"),
    col("s4.prodOrder"),
    col("jp.consumptionLot"),
    col("s4.saleContractHash"),
    col("jp.VLOOKUP_Col2"),
    col("jp.VLOOKUP_Description"),
    col("jp.VLOOKUP_Quantity"),
    col("jp.VLOOKUP_Unit_of_Measure"),
    col("jp.VLOOKUP_Contract"),
    col("jp.VLOOKUP_Season"),
    col("jp.VLOOKUP_Date_of_delivery"),
    col("jp.Location Code"),
    col("jp.Counterparty"),
    col("jp.Certified")
)

step5_count = step5.count()
print(f"Step 5: Final join completed: {step5_count} matches")

# Build purchase_lot_map as before
purchase_lot_map = step5.groupBy("saleContractHash").agg(
    collect_list(
        struct(
            "consumptionLot",
            "VLOOKUP_Col2",
            "VLOOKUP_Description",
            "VLOOKUP_Quantity",
            "VLOOKUP_Unit_of_Measure",
            "VLOOKUP_Contract",
            "VLOOKUP_Season",
            "VLOOKUP_Date_of_delivery",
            "Location Code",
            "Counterparty",
            "Certified"
        )
    ).alias("consumptionDetails")
)

map_size = purchase_lot_map.count()
print(f"Purchase lot mapping built for selected contract: {map_size} sale contract(s)")

total_step5_time = time.time() - start_time
print(f"‚úì Step 5 completed in {total_step5_time:.2f} seconds")

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 13, Finished, Available, Finished)


[STEP 5] Performing join in correct order...
  5a: Filtering df_production to relevant production orders from step4...
  ‚úì Filtered to 9 production records
  5b: Joining filtered production with df_purchase...
  ‚úì Join completed in 5.29 seconds
    - Total records: 9
  5c: Joining step4 with enriched production-purchase data...
Step 5: Final join completed: 90 matches
Purchase lot mapping built for selected contract: 1 sale contract(s)
‚úì Step 5 completed in 14.12 seconds


In [12]:
step5.head(10)

StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 14, Finished, Available, Finished)

[Row(lotNumber='4297258337', prodOrder='PRDL2500172', consumptionLot='4297258196-GL-6', saleContractHash='4297250543', VLOOKUP_Col2=None, VLOOKUP_Description=None, VLOOKUP_Quantity=None, VLOOKUP_Unit_of_Measure=None, VLOOKUP_Contract=None, VLOOKUP_Season=None, VLOOKUP_Date_of_delivery=None, Location Code=None, Counterparty=None, Certified=None),
 Row(lotNumber='4297258337', prodOrder='PRDL2500172', consumptionLot='4297258034-GL-5', saleContractHash='4297250543', VLOOKUP_Col2=None, VLOOKUP_Description=None, VLOOKUP_Quantity=None, VLOOKUP_Unit_of_Measure=None, VLOOKUP_Contract=None, VLOOKUP_Season=None, VLOOKUP_Date_of_delivery=None, Location Code=None, Counterparty=None, Certified=None),
 Row(lotNumber='4297258337', prodOrder='PRDL2500172', consumptionLot='4297258367-GL', saleContractHash='4297250543', VLOOKUP_Col2=None, VLOOKUP_Description=None, VLOOKUP_Quantity=None, VLOOKUP_Unit_of_Measure=None, VLOOKUP_Contract=None, VLOOKUP_Season=None, VLOOKUP_Date_of_delivery=None, Location Code=

In [16]:
# ========== CELL 1: Fixed Lineage Tracker Class ==========
import json
from datetime import datetime, timedelta

class LotLineageTracker:
    """Recursive lineage tracker for coffee lots with complete bidirectional tracing."""
    
    def __init__(self, df_production):
        self.df_production = df_production
        self.lot_records_cache = {}
        self.prod_order_cache = {}
        print("‚úì LotLineageTracker initialized")
    
    def parse_excel_date(self, date_value):
        if date_value is None or date_value == '':
            return ''
        try:
            if isinstance(date_value, (int, float)):
                base_date = datetime(1899, 12, 30)
                parsed_date = base_date + timedelta(days=date_value)
                return parsed_date.strftime('%Y-%m-%d')
            else:
                parsed_date = datetime.fromisoformat(str(date_value).split('T')[0])
                return parsed_date.strftime('%Y-%m-%d')
        except:
            return str(date_value)
    
    def get_lot_records(self, lot_no):
        if lot_no in self.lot_records_cache:
            return self.lot_records_cache[lot_no]
        records = self.df_production.filter(self.df_production["Lot No_"] == lot_no).collect()
        result = [row.asDict() for row in records]
        self.lot_records_cache[lot_no] = result
        return result
    
    def get_prod_order_records(self, prod_order):
        if prod_order in self.prod_order_cache:
            return self.prod_order_cache[prod_order]
        records = self.df_production.filter(self.df_production["Prod_ Order No_"] == prod_order).collect()
        result = [row.asDict() for row in records]
        self.prod_order_cache[prod_order] = result
        return result
    
    def get_process_types_for_lot(self, lot):
        lot_data = self.get_lot_records(lot)
        if not lot_data:
            return ['Not Found']
        types = set()
        for record in lot_data:
            process_type = record.get('Process Type', 'Unknown')
            if process_type:
                types.add(str(process_type).strip())
        return list(types) if types else ['Unknown']
    
    def get_lot_lineage(self, lot_no, max_depth=100):
        visited = set()
        
        def trace_lot_origin(lot, depth=0):
            # Check termination conditions
            if lot in visited or depth >= max_depth:
                return {
                    'lot_no': lot,
                    'warning': 'Max depth reached' if depth >= max_depth else 'Already visited',
                    'process_types': self.get_process_types_for_lot(lot),
                    'sources': [],
                    'destinations': [],
                    'details': {}
                }
            
            visited.add(lot)
            lot_data = self.get_lot_records(lot)
            
            # Handle not found
            if not lot_data:
                return {
                    'lot_no': lot,
                    'process_types': ['Not Found'],
                    'sources': [],
                    'destinations': [],
                    'details': {}
                }
            
            # Group records by process type
            processes = {}
            for record in lot_data:
                process_type = record.get('Process Type') or 'Unknown'
                if process_type not in processes:
                    processes[process_type] = []
                processes[process_type].append(record)
            
            # Initialize node structure
            node = {
                'lot_no': lot,
                'process_types': self.get_process_types_for_lot(lot),
                'sources': [],
                'destinations': [],
                'details': {}
            }
            
            # Add basic details
            if lot_data:
                first = lot_data[0]
                node['details'] = {
                    'item_no': first.get('Item No_', ''),
                    'description': first.get('Description', ''),
                    'certified': first.get('Certified', ''),
                    'unit_of_measure': first.get('Unit of Measure', 'KG'),
                    'location_code': first.get('Location Code', ''),
                    'counterparty': first.get('Counterparty', '')
                }
            
            # ===== HANDLE OUTPUT PROCESS =====
            # This lot was produced (Output) by consuming other lots
            # Find SOURCES: What was consumed to make this lot?
            if 'Output' in processes:
                for output_record in processes['Output']:
                    prod_order = output_record.get('Prod_ Order No_')
                    if prod_order:
                        node['details']['production_order'] = prod_order
                        node['details']['output_quantity'] = output_record.get('Quantity (Inv_UoM)', 0)
                        node['details']['output_date'] = self.parse_excel_date(output_record.get('Date'))
                        
                        # Get all records in this production order
                        prod_records = self.get_prod_order_records(prod_order)
                        consumption_lots = set()
                        
                        # Find all CONSUMPTION lots in the same production order
                        for record in prod_records:
                            if record.get('Process Type') == 'Consumption' and record.get('Lot No_') != lot:
                                consumption_lots.add(record.get('Lot No_'))
                        
                        # Recursively trace each consumed lot as a SOURCE
                        for consumed_lot in consumption_lots:
                            if consumed_lot:
                                source_node = trace_lot_origin(consumed_lot, depth + 1)
                                source_node['relationship'] = 'Consumed to produce this lot'
                                node['sources'].append(source_node)
            
            # ===== HANDLE CONSUMPTION PROCESS =====
            # This lot was consumed (Consumption) to produce other lots
            # Find DESTINATIONS: What was produced by consuming this lot?
            if 'Consumption' in processes:
                for consumption_record in processes['Consumption']:
                    prod_order = consumption_record.get('Prod_ Order No_')
                    if prod_order:
                        node['details']['consumption_quantity'] = consumption_record.get('Quantity (Inv_UoM)', 0)
                        node['details']['consumption_date'] = self.parse_excel_date(consumption_record.get('Date'))
                        
                        # Get all records in this production order
                        prod_records = self.get_prod_order_records(prod_order)
                        output_lots = set()
                        
                        # Find all OUTPUT lots in the same production order
                        for record in prod_records:
                            if record.get('Process Type') == 'Output' and record.get('Lot No_') != lot:
                                output_lots.add(record.get('Lot No_'))
                        
                        # Recursively trace each output lot as a DESTINATION
                        for output_lot in output_lots:
                            if output_lot:
                                dest_node = trace_lot_origin(output_lot, depth + 1)
                                dest_node['relationship'] = 'Produced by consuming this lot'
                                node['destinations'].append(dest_node)
            
            # ===== HANDLE TRANSFER PROCESS =====
            # This lot was transferred
            if 'Transfer' in processes:
                for transfer_record in processes['Transfer']:
                    dest_lot = transfer_record.get('Lot Dest')
                    node['details']['transfer'] = {
                        'transfer_quantity': transfer_record.get('Quantity (Inv_UoM)', 0),
                        'transfer_date': self.parse_excel_date(transfer_record.get('Date')),
                        'transferred_to': dest_lot
                    }
                    
                    # Find DESTINATION: Where this lot was transferred TO
                    if dest_lot and dest_lot != lot:
                        dest_node = trace_lot_origin(dest_lot, depth + 1)
                        dest_node['relationship'] = 'Transferred to'
                        node['destinations'].append(dest_node)
                    
                    # Find SOURCE: Lots that were transferred to create this lot
                    source_transfers = self.df_production.filter(
                        (self.df_production["Process Type"] == "Transfer") & 
                        (self.df_production["Lot Dest"] == lot) &
                        (self.df_production["Lot No_"] != lot)
                    ).collect()
                    
                    for src_transfer in source_transfers:
                        source_lot = src_transfer['Lot No_']
                        if source_lot and source_lot != lot:
                            source_node = trace_lot_origin(source_lot, depth + 1)
                            source_node['relationship'] = 'Transferred from'
                            node['sources'].append(source_node)
            
            # ===== HANDLE PURCHASE PROCESS =====
            # This lot was purchased (origin point)
            if 'Purchase' in processes:
                purchase_record = processes['Purchase'][0]
                node['details']['purchase'] = {
                    'quantity': purchase_record.get('Quantity (Inv_UoM)', 0),
                    'date': self.parse_excel_date(purchase_record.get('Date'))
                }
                node['is_origin'] = True  # Mark as origin point
            
            return node
        
        # Start the recursive trace
        lineage_tree = trace_lot_origin(lot_no)
        
        return {
            'query_lot': lot_no,
            'total_lots_traced': len(visited),
            'lineage_tree': lineage_tree
        }
    
    def get_lineage_as_json(self, lot_no, max_depth=100, pretty=True):
        """Get lineage as JSON string"""
        lineage = self.get_lot_lineage(lot_no, max_depth)
        if pretty:
            return json.dumps(lineage, indent=2, ensure_ascii=False)
        return json.dumps(lineage, ensure_ascii=False)
    
    def trace_multiple_lots(self, lot_numbers, max_depth=100):
        """Trace lineage for multiple lots with progress tracking"""
        results = {}
        total = len(lot_numbers)
        for i, lot_no in enumerate(lot_numbers, 1):
            if i % 10 == 0 or i == total:
                print(f"[{i}/{total}] Tracing lot: {lot_no}")
            results[lot_no] = self.get_lot_lineage(lot_no, max_depth)
        return results
    
    def clear_cache(self):
        """Clear all caches"""
        self.lot_records_cache.clear()
        self.prod_order_cache.clear()
        print("‚úì Cache cleared")


# ========== CELL 2: Initialize Tracker ==========
tracker = LotLineageTracker(df_production)


# ========== CELL 3: Get Consumption Lots from Step 5 ==========
consumption_lots_df = step5.select("consumptionLot").distinct()
consumption_lots = [row['consumptionLot'] for row in consumption_lots_df.collect()]

print(f"‚úì Found {len(consumption_lots)} unique consumption lots")
print(f"Sample lots: {consumption_lots[:5]}")


# ========== CELL 4: Test with Single Lot ==========
test_lot = consumption_lots[0]
print(f"\n=== Testing with lot: {test_lot} ===")

test_lineage = tracker.get_lot_lineage(test_lot, max_depth=150)
print(f"‚úì Successfully traced {test_lineage['total_lots_traced']} lots")
print(f"‚úì Sources found: {len(test_lineage['lineage_tree'].get('sources', []))}")
print(f"‚úì Destinations found: {len(test_lineage['lineage_tree'].get('destinations', []))}")
print(f"\nSample output (first 2000 chars):")
print(json.dumps(test_lineage, indent=2)[:2000])


# ========== CELL 5: Trace All Consumption Lots ==========
print(f"\n=== Tracing ALL {len(consumption_lots)} consumption lots ===")
print("‚è≥ This may take several minutes...\n")

import time
start_time = time.time()

all_lineages = tracker.trace_multiple_lots(consumption_lots, max_depth=150)

elapsed = time.time() - start_time
print(f"\n‚úì Completed in {elapsed:.2f} seconds ({elapsed/60:.1f} minutes)")
print(f"  - Consumption lots traced: {len(all_lineages)}")
print(f"  - Total related lots found: {sum(l['total_lots_traced'] for l in all_lineages.values())}")


# ========== CELL 6: Export Complete JSON ==========
total_lots = sum(l['total_lots_traced'] for l in all_lineages.values())
avg_lots = total_lots / len(all_lineages) if all_lineages else 0

final_output = {
    'sale_contract': 'SALE_CONTRACT_ID',  # Replace with actual contract
    'trace_timestamp': datetime.now().isoformat(),
    'summary': {
        'consumption_lots_found': len(consumption_lots),
        'total_related_lots_traced': total_lots,
        'average_depth': round(avg_lots, 1),
        'max_depth_used': 150
    },
    'consumption_lots': consumption_lots,
    'lineage_traces': all_lineages
}

json_output = json.dumps(final_output, indent=2, ensure_ascii=False)

print("\n" + "="*80)
print("üéâ COMPLETE - BIDIRECTIONAL LINEAGE TRACING")
print("="*80)
print(f"\n‚úì Total consumption lots: {len(all_lineages)}")
print(f"‚úì Total related lots: {total_lots}")
print(f"‚úì JSON size: {len(json_output.encode('utf-8')) / 1024:.2f} KB")
print("\n" + "="*80)
print("üì• COPY JSON BELOW")
print("="*80 + "\n")

print(json_output)

print("\n" + "="*80)
print("üì• END OF JSON")
print("="*80)


StatementMeta(, 4dd3d300-86c6-497c-aa5f-e88bc5d3160d, 18, Finished, Available, Finished)

‚úì LotLineageTracker initialized
‚úì Found 9 unique consumption lots
Sample lots: ['4297258337-GL1-5', '4297258337-GL1-1', '4297258196-GL-6', '4297258367-GL', '4297258034-GL-5']

=== Testing with lot: 4297258337-GL1-5 ===
‚úì Successfully traced 5 lots
‚úì Sources found: 0
‚úì Destinations found: 4

Sample output (first 2000 chars):
{
  "query_lot": "4297258337-GL1-5",
  "total_lots_traced": 5,
  "lineage_tree": {
    "lot_no": "4297258337-GL1-5",
    "process_types": [
      "Consumption",
      "Transfer"
    ],
    "sources": [],
    "destinations": [
      {
        "lot_no": "00-4297258337-18",
        "process_types": [
          "Output"
        ],
        "sources": [],
        "destinations": [],
        "details": {
          "item_no": "R-16 STD WP",
          "description": "R-S16 wet polish",
          "certified": null,
          "unit_of_measure": "KG",
          "location_code": "",
          "counterparty": ""
        },
        "relationship": "Transferred to"
      