In [0]:
%sql
-- Make `nokia-assginment-catalog` the current catalog for the statements that follow.
USE CATALOG `nokia-assginment-catalog`;
-- Manual schema reset, intentionally left disabled: drop schema patent_data cascade;

In [0]:
# Expose a dropdown widget that decides whether the patent_data schema is dropped.
try:
    dbutils.widgets.dropdown("drop_patent_data_schema", "false", ["true", "false"], "Drop schema patent_data cascade")
    drop_patent_data_schema = dbutils.widgets.get("drop_patent_data_schema") == "true"
except Exception:
    # Widgets could not be created or read (e.g. job mode): fall back to the
    # non-destructive default. Catching Exception rather than using a bare
    # `except:` lets KeyboardInterrupt / SystemExit still stop the cell.
    drop_patent_data_schema = False

print(f"Drop patent_data schema setting: {drop_patent_data_schema}")

# Drop the schema only when explicitly requested through the widget.
if drop_patent_data_schema:
    try:
        print("Dropping schema patent_data cascade...")
        spark.sql("DROP SCHEMA IF EXISTS patent_data CASCADE")
        print("Schema patent_data successfully dropped")
    except Exception as e:
        # Best effort: report the failure and let the rest of the notebook run.
        print(f"Error dropping schema: {str(e)}")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat_ws, array_join, from_unixtime, to_date
from pyspark.sql.functions import regexp_replace, lpad, when, length, expr, explode_outer, coalesce, transform, flatten
from pyspark.sql.functions import collect_list, struct, array, current_timestamp
from pyspark.sql.types import StringType, TimestampType, StructType, StructField
from delta.tables import DeltaTable
import os
import traceback
import time
from datetime import datetime

def initialize_spark():
    """Build (or reuse) the SparkSession used by the gold-layer processor.

    The builder is named "Patent Gold Layer Processor" and is configured with
    the Delta Lake SQL extension and catalog plus driver/executor sizing.

    Returns:
        Whatever ``SparkSession.builder...getOrCreate()`` returns.
    """
    session_settings = (
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
        ("spark.driver.memory", "8g"),
        ("spark.executor.memory", "8g"),
        ("spark.executor.cores", "4"),
    )

    builder = SparkSession.builder.appName("Patent Gold Layer Processor")
    # Apply the settings in a fixed order, one .config() call each.
    for setting_name, setting_value in session_settings:
        builder = builder.config(setting_name, setting_value)
    return builder.getOrCreate()

def clean_date(col_obj):
    """Turn a compact ``yyyyMMdd`` value into a date column.

    The column is cast to string, every ``[`` and ``]`` character is removed,
    and the remaining text is parsed with the ``yyyyMMdd`` pattern.

    Args:
        col_obj: Column holding the raw date value.

    Returns:
        The ``to_date`` column expression.
    """
    as_text = col_obj.cast("string")
    without_brackets = regexp_replace(as_text, r"[\[\]]", "")
    return to_date(without_brackets, "yyyyMMdd")

def format_class(class_col):
    """Render the class component as a two-character string.

    The column is cast to string and passed to ``lpad`` with width 2 and
    pad character "0".
    """
    class_text = class_col.cast("string")
    return lpad(class_text, 2, "0")

def format_group(group_col):
    """Render the main-group component as a string, using "" when it is null."""
    has_value = group_col.isNotNull()
    group_text = group_col.cast("string")
    return when(has_value, group_text).otherwise(lit(""))

def format_subgroup(subgroup_col):
    """Render the subgroup component with a minimum width of two characters.

    Values whose string form is shorter than 2 characters are left-padded
    with "0"; everything else is returned as its string form unchanged.
    """
    subgroup_text = subgroup_col.cast("string")
    is_too_short = length(subgroup_text) < 2
    padded = lpad(subgroup_text, 2, "0")
    return when(is_too_short, padded).otherwise(subgroup_text)

def format_patent_class(section_col, class_col, subclass_col, main_group_col, subgroup_col):
    """Assemble the full classification code from its five components.

    The pieces are concatenated without a separator in this order: section,
    formatted class, subclass, formatted main group, a literal "/", and the
    formatted subgroup.
    """
    code_parts = [
        section_col,
        format_class(class_col),
        subclass_col,
        format_group(main_group_col),
        lit("/"),
        format_subgroup(subgroup_col),
    ]
    return concat_ws("", *code_parts)

def check_path_exists(path):
    """Report whether ``path`` can be listed with ``dbutils.fs.ls``.

    Args:
        path: Path to probe.

    Returns:
        True when the listing call succeeds, False when it raises any
        ordinary exception.
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        # Only ordinary errors mean "not there / not accessible"; a bare
        # `except:` would also swallow KeyboardInterrupt and SystemExit.
        return False
    return True

def is_delta_table(spark, path):
    """Report whether ``DeltaTable.forPath`` can open ``path``.

    Args:
        spark: SparkSession handed to ``DeltaTable.forPath``.
        path: Location to probe.

    Returns:
        True when ``DeltaTable.forPath`` succeeds, False when it raises any
        ordinary exception.
    """
    try:
        DeltaTable.forPath(spark, path)
    except Exception:
        # Ordinary failures mean "not a usable Delta table"; interrupts and
        # interpreter exits are deliberately not swallowed.
        return False
    return True

def check_checkpoint_exists(spark, file_name, checkpoint_location):
    """Report whether a checkpoint entry for ``file_name`` can be listed.

    Args:
        spark: Unused; kept so existing callers do not break.
        file_name: Name of the checkpoint entry under ``checkpoint_location``.
        checkpoint_location: Directory that holds the checkpoints.

    Returns:
        True when ``dbutils.fs.ls`` succeeds on the checkpoint path, False
        when it raises any ordinary exception.
    """
    # Strip trailing separators so a location ending in "/" does not yield "//".
    checkpoint_path = f"{str(checkpoint_location).rstrip('/')}/{file_name}"
    try:
        dbutils.fs.ls(checkpoint_path)
    except Exception:
        # A bare `except:` would also hide KeyboardInterrupt / SystemExit.
        return False
    return True

def create_checkpoint_file(spark, checkpoint_path, dir_name):
    """Write a one-row Delta checkpoint marker for ``dir_name``.

    The row holds ``dir_name`` as ``file_name`` and the current local time
    (``datetime.now()``) as ``processed_timestamp``. An explicit schema is
    supplied so Spark does not have to infer the column types. The marker is
    saved in overwrite mode.

    Args:
        spark: SparkSession used to build the marker DataFrame.
        checkpoint_path: Destination path of the Delta marker.
        dir_name: Value stored in the ``file_name`` column.

    Returns:
        True when the write succeeds; False after printing a warning and the
        traceback when any exception is raised.
    """
    try:
        marker_schema = StructType([
            StructField("file_name", StringType(), False),
            StructField("processed_timestamp", TimestampType(), False),
        ])
        marker_rows = [(dir_name, datetime.now())]
        marker_df = spark.createDataFrame(marker_rows, schema=marker_schema)

        writer = marker_df.write.format("delta").mode("overwrite")
        writer.save(checkpoint_path)
    except Exception as e:
        print(f"Warning: Could not create checkpoint file: {str(e)}")
        print(traceback.format_exc())
        return False
    return True

def update_processing_metadata(
    spark,
    new_data_processed,
    database_name="patent_data",
    checkpoint_location="/Volumes/nokia-assginment-catalog/checkpoints/checkpoints_data/gold_autoloader/",
):
    """Append one row to ``<database_name>.processing_metadata``.

    The database and the Delta metadata table are created when missing. The
    inserted row carries a fresh UUID, the current timestamp, the
    ``new_data_processed`` flag, a batch count and a fixed detail message.

    Args:
        spark: SparkSession used to run the SQL statements.
        new_data_processed: Whether new data was processed in this run.
        database_name: Database that holds the metadata table.
        checkpoint_location: Directory whose entries are counted as batches.
            Defaults to the gold autoloader checkpoint directory that was
            previously hard-coded.

    Returns:
        True when the row was inserted; False after printing a warning and
        the traceback when any exception is raised.
    """
    import uuid  # only needed here, so it is imported locally

    try:
        # Create database if it doesn't exist
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

        # Create metadata table if it doesn't exist
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {database_name}.processing_metadata (
            processing_id STRING,
            processing_timestamp TIMESTAMP,
            new_data_processed BOOLEAN,
            batch_count INT,
            detail STRING
        ) USING DELTA
        """)

        processing_id = str(uuid.uuid4())

        # Batch count: number of checkpoint entries whose name does not start
        # with "_"; 0 when the directory is absent, -1 when counting fails.
        try:
            if check_path_exists(checkpoint_location):
                checkpoints = dbutils.fs.ls(checkpoint_location)
                batch_count = sum(1 for c in checkpoints if not c.name.startswith('_'))
            else:
                batch_count = 0
        except Exception:
            # Narrowed from a bare `except:` so interrupts are not turned into -1.
            batch_count = -1

        # Insert record into metadata table
        spark.sql(f"""
        INSERT INTO {database_name}.processing_metadata VALUES (
            '{processing_id}',
            current_timestamp(),
            {str(new_data_processed).lower()},
            {batch_count},
            'Gold layer processing completed'
        )
        """)

        print(f"Updated processing metadata: new_data_processed={new_data_processed}, batch_count={batch_count}")
        return True
    except Exception as e:
        print(f"Warning: Could not update processing metadata: {str(e)}")
        print(traceback.format_exc())
        return False

def merge_or_write_delta(spark, df, output_path, batch_mode="append", merge_keys=["publication_number"]):
    """Upsert ``df`` into the Delta table at ``output_path``, or write it plainly.

    When ``output_path`` already holds a Delta table and ``batch_mode`` is not
    "overwrite", the data is merged: rows matching on every key in
    ``merge_keys`` are updated, the rest are inserted. Otherwise ``df`` is
    written with ``batch_mode``.

    If anything in that first attempt raises, the error is printed and a plain
    write with ``batch_mode`` is attempted once.
    NOTE(review): after a failed merge this fallback writes with ``batch_mode``
    (append by default), which may duplicate rows — confirm this is intended.

    Args:
        spark: SparkSession used to open the target table.
        df: DataFrame to persist.
        output_path: Location of the target Delta table.
        batch_mode: Write mode used for plain writes.
        merge_keys: Column names that must match for a row to be updated.

    Returns:
        True when the merge, the write, or the fallback write succeeds.

    Raises:
        Exception: Whatever the fallback write raises, re-raised unchanged.
    """
    try:
        target_is_delta = is_delta_table(spark, output_path)
        should_merge = target_is_delta and batch_mode != "overwrite"

        if not should_merge:
            # Missing/invalid table or explicit overwrite: a plain write is enough.
            print(f"Writing data to {output_path} using mode {batch_mode}")
            df.write.format("delta").mode(batch_mode).save(output_path)
            print(f"Write operation completed for {output_path}")
            return True

        print(f"Valid Delta table exists at {output_path}, performing merge operation")
        existing_table = DeltaTable.forPath(spark, output_path)

        # One equality predicate per key, all of which must hold.
        key_predicates = [f"target.{key} = source.{key}" for key in merge_keys]
        merge_condition = " AND ".join(key_predicates)

        merge_builder = existing_table.alias("target").merge(df.alias("source"), merge_condition)
        merge_builder = merge_builder.whenMatchedUpdateAll()
        merge_builder = merge_builder.whenNotMatchedInsertAll()
        merge_builder.execute()

        print(f"Merge operation completed for {output_path}")
        return True
    except Exception as e:
        print(f"Error in merge/write operation: {str(e)}")
        print(traceback.format_exc())

        # Second chance: a plain write using the requested mode.
        try:
            print(f"Falling back to regular write operation for {output_path}")
            df.write.format("delta").mode(batch_mode).save(output_path)
            print(f"Fallback write operation completed for {output_path}")
            return True
        except Exception as write_error:
            print(f"Error in fallback write operation: {str(write_error)}")
            print(traceback.format_exc())
            raise

def extract_claim_text(claims_df):
    """Pull the ``_VALUE`` entries out of each claim's ``claim-text`` field.

    Works on a DataFrame with ``publication_number`` and ``claim`` columns.
    The ``claim.claim-text`` field is exploded and its ``_VALUE`` is exposed as
    ``value_array``; when exploded entries carry their own ``claim-text``,
    those are exploded one level further and appended by union. Diagnostic
    counts are printed along the way (each count triggers a Spark action).

    Args:
        claims_df: DataFrame with one row per claim.

    Returns:
        DataFrame with ``publication_number`` and ``value_array``. When no
        ``claim-text`` is present, or when extraction raises, ``value_array``
        is a null literal for every input row. When only the nested step
        fails, the first-level values alone are returned.
    """
    def _null_value_frame():
        # Fallback result: publication numbers with a null placeholder.
        return claims_df.select(
            "publication_number",
            lit(None).alias("value_array")
        )

    try:
        print(f"Starting claim text extraction for {claims_df.count()} claims")

        # One sample row to make the claim structure visible in the output.
        print("Claim structure sample:")
        claims_df.select("publication_number", "claim").limit(1).show(truncate=False)

        claim_text_flags = claims_df.select(
            col("claim.claim-text").isNotNull().alias("has_claim_text")
        )
        has_claim_text_count = claim_text_flags.filter("has_claim_text = true").count()
        print(f"Records with claim-text: {has_claim_text_count}")

        if has_claim_text_count == 0:
            print("WARNING: No claim-text fields found, using fallback extraction")
            return _null_value_frame()

        # Level 1: one row per claim-text entry.
        exploded_df = claims_df.select(
            "publication_number",
            explode_outer(col("claim.claim-text")).alias("claim_text_obj")
        )
        print(f"First level extraction produced {exploded_df.count()} records")

        value_flags = exploded_df.select(
            col("claim_text_obj._VALUE").isNotNull().alias("has_value")
        )
        has_value_count = value_flags.filter("has_value = true").count()
        print(f"Records with _VALUE: {has_value_count}")

        direct_values_df = exploded_df.select(
            "publication_number",
            col("claim_text_obj._VALUE").alias("value_array")
        )

        # Level 2 is only needed when some entry nests another claim-text.
        with_nested_df = exploded_df.filter(col("claim_text_obj.claim-text").isNotNull())
        nested_count = with_nested_df.count()
        print(f"Records with nested claim-text: {nested_count}")

        if nested_count <= 0:
            return direct_values_df

        print("Processing nested claims")
        try:
            nested_exploded_df = with_nested_df.select(
                "publication_number",
                explode_outer(col("claim_text_obj.claim-text")).alias("nested_claim_text")
            )
            nested_values_df = nested_exploded_df.select(
                "publication_number",
                col("nested_claim_text._VALUE").alias("value_array")
            )

            combined_df = direct_values_df.union(nested_values_df)
            print(f"Final extracted records: {combined_df.count()}")
            return combined_df
        except Exception as nested_error:
            print(f"Error processing nested claims: {str(nested_error)}")
            print(traceback.format_exc())
            # Nested step failed: the first-level values are still usable.
            return direct_values_df

    except Exception as e:
        print(f"Error in claim text extraction: {str(e)}")
        print(traceback.format_exc())
        return _null_value_frame()

def process_patents_gold(renamed_df, output_path, batch_mode="append"):
    """Build the patent gold DataFrame and persist it via merge_or_write_delta.

    Selects the bibliographic fields, publication and application references,
    the individual CPC components, the CPC metadata and three derived CPC
    codes (full code, class level, subclass level), then stamps the rows with
    ``gold_ingestion_date``.

    Args:
        renamed_df: Source DataFrame carrying the columns referenced below.
        output_path: Delta location of the patent gold table.
        batch_mode: Write mode forwarded to merge_or_write_delta.

    Returns:
        The patent gold DataFrame that was handed to merge_or_write_delta.

    Raises:
        Exception: Any failure is printed with its traceback and re-raised.
    """
    try:
        # Bibliographic fields; the text arrays are joined with single spaces.
        basic_columns = [
            col("publication_number"),
            col("invention_title"),
            array_join(col("abstract_text"), " ").alias("abstract"),
            array_join(col("description_text"), " ").alias("description"),
        ]

        publication_columns = [
            col("publication_country"),
            clean_date(col("publication_date")).alias("publication_date"),
            col("publication_kind"),
        ]

        application_columns = [
            col("application_country"),
            clean_date(col("application_date")).alias("application_date"),
            col("application_number"),
            col("application_series_code"),
        ]

        # CPC components kept as separate columns for later analysis.
        cpc_component_columns = [
            col("cpc_section").alias("cpc_section"),
            format_class(col("cpc_class")).alias("cpc_class"),
            col("cpc_subclass").alias("cpc_subclass"),
            format_group(col("cpc_main_group")).alias("cpc_group"),
            format_subgroup(col("cpc_subgroup")).alias("cpc_subgroup"),
        ]

        cpc_metadata_columns = [
            clean_date(col("cpc_action_date")).alias("cpc_action_date"),
            col("cpc_data_source"),
            col("cpc_status"),
            col("cpc_value"),
            clean_date(col("cpc_version_date")).alias("cpc_version_date"),
            col("cpc_office_country"),
            col("cpc_scheme_origin"),
            col("cpc_symbol_position"),
        ]

        # Derived codes: the full code (e.g., A01B33/00) and two coarser levels.
        cpc_full_code = format_patent_class(
            col("cpc_section"),
            col("cpc_class"),
            col("cpc_subclass"),
            col("cpc_main_group"),
            col("cpc_subgroup")
        )
        cpc_class_level = concat_ws("", col("cpc_section"), format_class(col("cpc_class")))
        cpc_subclass_level = concat_ws(
            "", col("cpc_section"), format_class(col("cpc_class")), col("cpc_subclass")
        )
        cpc_code_columns = [
            cpc_full_code.alias("cpc_main"),
            cpc_class_level.alias("cpc_class_level"),
            cpc_subclass_level.alias("cpc_subclass_level"),
        ]

        selected_df = renamed_df.select(
            *basic_columns,
            *publication_columns,
            *application_columns,
            *cpc_component_columns,
            *cpc_metadata_columns,
            *cpc_code_columns
        )
        patent_df = selected_df.withColumn("gold_ingestion_date", current_timestamp())

        # Upsert into the gold table (or create it on first run).
        merge_or_write_delta(renamed_df.sparkSession, patent_df, output_path, batch_mode)

        return patent_df
    except Exception as e:
        print(f"Error processing patents gold: {str(e)}")
        print(traceback.format_exc())
        raise

def process_ipc_gold(renamed_df, output_path, individual_output_path, batch_mode="append"):
    """Produce and persist the IPC gold tables (per-entry and per-patent).

    ``ipc_classification`` is exploded into one row per entry, the entry's
    fields are flattened into ``ipc_*`` columns and a formatted ``ipc_code``
    is added. The per-entry rows go to ``individual_output_path``; a
    per-publication aggregate (list of codes plus list of detail structs)
    goes to ``output_path``. Both carry ``gold_ingestion_date``.

    NOTE(review): both writes rely on merge_or_write_delta's default merge
    key; confirm that key is appropriate for the per-entry table.

    Args:
        renamed_df: Source DataFrame with ``publication_number`` and
            ``ipc_classification``.
        output_path: Delta location of the grouped table.
        individual_output_path: Delta location of the per-entry table.
        batch_mode: Write mode forwarded to merge_or_write_delta.

    Returns:
        Tuple ``(grouped_df, individual_df)``.

    Raises:
        Exception: Any failure is printed with its traceback and re-raised.
    """
    try:
        # One row per IPC entry.
        exploded_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("ipc_classification")).alias("ipc")
        )

        # Flatten the nested entry into plain columns.
        ipc_field_columns = [
            col("publication_number"),
            col("ipc.section").alias("ipc_section"),
            col("ipc.class").alias("ipc_class"),
            col("ipc.subclass").alias("ipc_subclass"),
            col("ipc.main-group").alias("ipc_main_group"),
            col("ipc.subgroup").alias("ipc_subgroup"),
            col("ipc.classification-value").alias("ipc_value"),
            clean_date(col("ipc.action-date.date")).alias("ipc_action_date"),
            col("ipc.classification-status").alias("ipc_status"),
            col("ipc.classification-level").alias("ipc_level"),
            col("ipc.classification-data-source").alias("ipc_data_source"),
            col("ipc.generating-office.country").alias("ipc_office_country"),
            clean_date(col("ipc.ipc-version-indicator.date")).alias("ipc_version_date"),
            col("ipc.symbol-position").alias("ipc_symbol_position"),
        ]
        flattened_df = exploded_df.select(*ipc_field_columns)

        ipc_code_expr = format_patent_class(
            col("ipc_section"),
            col("ipc_class"),
            col("ipc_subclass"),
            col("ipc_main_group"),
            col("ipc_subgroup")
        )
        ipc_df = flattened_df.withColumn("ipc_code", ipc_code_expr)

        # Per-entry table.
        ipc_individual_df = ipc_df.withColumn("gold_ingestion_date", current_timestamp())
        spark = renamed_df.sparkSession
        merge_or_write_delta(spark, ipc_individual_df, individual_output_path, batch_mode)

        # Per-publication aggregate.
        detail_struct = struct(
            "ipc_code", "ipc_section", "ipc_class", "ipc_subclass",
            "ipc_main_group", "ipc_subgroup", "ipc_value", "ipc_action_date",
            "ipc_status", "ipc_level", "ipc_data_source"
        )
        aggregated_df = ipc_df.groupBy("publication_number").agg(
            collect_list("ipc_code").alias("ipc_codes"),
            collect_list(detail_struct).alias("ipc_details")
        )
        ipc_grouped_df = aggregated_df.withColumn("gold_ingestion_date", current_timestamp())

        merge_or_write_delta(spark, ipc_grouped_df, output_path, batch_mode)

        return ipc_grouped_df, ipc_individual_df
    except Exception as e:
        print(f"Error processing IPC gold: {str(e)}")
        print(traceback.format_exc())
        raise

def process_inventors_gold(renamed_df, output_path, individual_output_path, batch_mode="append"):
    """Produce and persist the inventor gold tables (per-inventor and per-patent).

    ``inventors`` is exploded into one row per inventor with name and address
    columns; ``inventor_name`` is first and last name joined by a space. The
    per-inventor rows go to ``individual_output_path``; a per-publication
    aggregate (names list plus detail structs) goes to ``output_path``. Both
    carry ``gold_ingestion_date``.

    NOTE(review): both writes rely on merge_or_write_delta's default merge
    key; confirm that key is appropriate for the per-inventor table.

    Args:
        renamed_df: Source DataFrame with ``publication_number`` and ``inventors``.
        output_path: Delta location of the grouped table.
        individual_output_path: Delta location of the per-inventor table.
        batch_mode: Write mode forwarded to merge_or_write_delta.

    Returns:
        Tuple ``(grouped_df, individual_df)``.

    Raises:
        Exception: Any failure is printed with its traceback and re-raised.
    """
    try:
        # One row per inventor.
        exploded_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("inventors")).alias("inventor")
        )

        full_name = concat_ws(
            " ",
            col("inventor.addressbook.first-name"),
            col("inventor.addressbook.last-name")
        )
        inventor_columns = [
            col("publication_number"),
            col("inventor.addressbook.first-name").alias("inventor_first_name"),
            col("inventor.addressbook.last-name").alias("inventor_last_name"),
            full_name.alias("inventor_name"),
            col("inventor.addressbook.address.city").alias("inventor_city"),
            col("inventor.addressbook.address.state").alias("inventor_state"),
            col("inventor.addressbook.address.country").alias("inventor_country"),
        ]
        inventors_df = exploded_df.select(*inventor_columns)

        # Per-inventor table.
        inventors_individual_df = inventors_df.withColumn("gold_ingestion_date", current_timestamp())
        spark = renamed_df.sparkSession
        merge_or_write_delta(spark, inventors_individual_df, individual_output_path, batch_mode)

        # Per-publication aggregate.
        detail_struct = struct(
            "inventor_name", "inventor_city", "inventor_state", "inventor_country"
        )
        aggregated_df = inventors_df.groupBy("publication_number").agg(
            collect_list("inventor_name").alias("inventor_names"),
            collect_list(detail_struct).alias("inventor_details")
        )
        inventors_grouped_df = aggregated_df.withColumn("gold_ingestion_date", current_timestamp())

        merge_or_write_delta(spark, inventors_grouped_df, output_path, batch_mode)

        return inventors_grouped_df, inventors_individual_df
    except Exception as e:
        print(f"Error processing inventors gold: {str(e)}")
        print(traceback.format_exc())
        raise

def process_applicants_gold(renamed_df, output_path, individual_output_path, batch_mode="append"):
    """Produce and persist the applicant gold tables (per-applicant and per-patent).

    ``applicants`` is exploded into one row per applicant. ``applicant_name``
    is the organisation name when present, otherwise first and last name
    joined by a space. The per-applicant rows go to ``individual_output_path``;
    a per-publication aggregate (names list plus detail structs) goes to
    ``output_path``. Both carry ``gold_ingestion_date``.

    NOTE(review): both writes rely on merge_or_write_delta's default merge
    key; confirm that key is appropriate for the per-applicant table.

    Args:
        renamed_df: Source DataFrame with ``publication_number`` and ``applicants``.
        output_path: Delta location of the grouped table.
        individual_output_path: Delta location of the per-applicant table.
        batch_mode: Write mode forwarded to merge_or_write_delta.

    Returns:
        Tuple ``(grouped_df, individual_df)``.

    Raises:
        Exception: Any failure is printed with its traceback and re-raised.
    """
    try:
        # One row per applicant.
        exploded_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("applicants")).alias("applicant")
        )

        # Prefer the organisation name; fall back to the person's name.
        person_name = concat_ws(
            " ",
            col("applicant.addressbook.first-name"),
            col("applicant.addressbook.last-name")
        )
        display_name = when(
            col("applicant.addressbook.orgname").isNotNull(),
            col("applicant.addressbook.orgname")
        ).otherwise(person_name)

        applicant_columns = [
            col("publication_number"),
            col("applicant.addressbook.first-name").alias("applicant_first_name"),
            col("applicant.addressbook.last-name").alias("applicant_last_name"),
            col("applicant.addressbook.orgname").alias("applicant_orgname"),
            display_name.alias("applicant_name"),
            col("applicant.addressbook.address.city").alias("applicant_city"),
            col("applicant.addressbook.address.state").alias("applicant_state"),
            col("applicant.addressbook.address.country").alias("applicant_country"),
        ]
        applicants_df = exploded_df.select(*applicant_columns)

        # Per-applicant table.
        applicants_individual_df = applicants_df.withColumn("gold_ingestion_date", current_timestamp())
        spark = renamed_df.sparkSession
        merge_or_write_delta(spark, applicants_individual_df, individual_output_path, batch_mode)

        # Per-publication aggregate.
        detail_struct = struct(
            "applicant_name", "applicant_city", "applicant_state", "applicant_country"
        )
        aggregated_df = applicants_df.groupBy("publication_number").agg(
            collect_list("applicant_name").alias("applicant_names"),
            collect_list(detail_struct).alias("applicant_details")
        )
        applicants_grouped_df = aggregated_df.withColumn("gold_ingestion_date", current_timestamp())

        merge_or_write_delta(spark, applicants_grouped_df, output_path, batch_mode)

        return applicants_grouped_df, applicants_individual_df
    except Exception as e:
        print(f"Error processing applicants gold: {str(e)}")
        print(traceback.format_exc())
        raise

def process_us_applicants_gold(renamed_df, output_path, individual_output_path, batch_mode="append"):
    """Produce and persist the gold tables for applicants whose country is "US".

    Same extraction as the general applicant tables (one row per applicant,
    ``applicant_name`` preferring the organisation name over first + last
    name), followed by a filter on ``applicant_country == "US"``. The
    per-applicant rows go to ``individual_output_path``; a per-publication
    aggregate with ``us_applicant_names`` / ``us_applicant_details`` goes to
    ``output_path``. Both carry ``gold_ingestion_date``.

    NOTE(review): both writes rely on merge_or_write_delta's default merge
    key; confirm that key is appropriate for the per-applicant table.

    Args:
        renamed_df: Source DataFrame with ``publication_number`` and ``applicants``.
        output_path: Delta location of the grouped table.
        individual_output_path: Delta location of the per-applicant table.
        batch_mode: Write mode forwarded to merge_or_write_delta.

    Returns:
        Tuple ``(grouped_df, individual_df)``.

    Raises:
        Exception: Any failure is printed with its traceback and re-raised.
    """
    try:
        # One row per applicant.
        exploded_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("applicants")).alias("applicant")
        )

        # Prefer the organisation name; fall back to the person's name.
        person_name = concat_ws(
            " ",
            col("applicant.addressbook.first-name"),
            col("applicant.addressbook.last-name")
        )
        display_name = when(
            col("applicant.addressbook.orgname").isNotNull(),
            col("applicant.addressbook.orgname")
        ).otherwise(person_name)

        applicant_columns = [
            col("publication_number"),
            col("applicant.addressbook.first-name").alias("applicant_first_name"),
            col("applicant.addressbook.last-name").alias("applicant_last_name"),
            col("applicant.addressbook.orgname").alias("applicant_orgname"),
            display_name.alias("applicant_name"),
            col("applicant.addressbook.address.city").alias("applicant_city"),
            col("applicant.addressbook.address.state").alias("applicant_state"),
            col("applicant.addressbook.address.country").alias("applicant_country"),
        ]
        all_applicants_df = exploded_df.select(*applicant_columns)

        # Keep US applicants only.
        us_applicants_df = all_applicants_df.filter(col("applicant_country") == "US")

        # Per-applicant table.
        us_applicants_individual_df = us_applicants_df.withColumn("gold_ingestion_date", current_timestamp())
        spark = renamed_df.sparkSession
        merge_or_write_delta(spark, us_applicants_individual_df, individual_output_path, batch_mode)

        # Per-publication aggregate.
        detail_struct = struct(
            "applicant_name", "applicant_city", "applicant_state", "applicant_country"
        )
        aggregated_df = us_applicants_df.groupBy("publication_number").agg(
            collect_list("applicant_name").alias("us_applicant_names"),
            collect_list(detail_struct).alias("us_applicant_details")
        )
        us_applicants_grouped_df = aggregated_df.withColumn("gold_ingestion_date", current_timestamp())

        merge_or_write_delta(spark, us_applicants_grouped_df, output_path, batch_mode)

        return us_applicants_grouped_df, us_applicants_individual_df
    except Exception as e:
        print(f"Error processing US applicants gold: {str(e)}")
        print(traceback.format_exc())
        raise

def process_claims_gold(renamed_df, output_path, individual_output_path, batch_mode="append"):
    """Produce and persist the claims gold tables (per-text row and per-patent).

    ``claims`` is exploded, the text values are obtained through
    extract_claim_text and joined into ``claim_text`` (or the placeholder
    "No claim text available" when the value array is null). Each text row
    receives a ``claim_number`` and a ``claim_id`` of the form
    ``<publication_number>_<claim_number>``. The rows are written to
    ``individual_output_path``; a per-publication concatenation of all texts
    is written to ``output_path``. Both writes use ``batch_mode`` directly.

    NOTE(review): ``claim_number`` is assigned by ordering on ``claim_text``,
    i.e. alphabetically by text rather than by position in the source
    document — confirm this is intended.
    NOTE(review): every exception is printed and converted into
    ``(None, None)``, so callers cannot distinguish a failure from "no claims".

    Args:
        renamed_df: Source DataFrame with ``publication_number`` and ``claims``.
        output_path: Delta location of the grouped claims table.
        individual_output_path: Delta location of the per-text claims table.
        batch_mode: Spark write mode used for both tables.

    Returns:
        Tuple ``(grouped_df, individual_df)``, or ``(None, None)`` when there
        are no non-null claims or when processing raises.
    """
    try:
        print("Starting claims processing")

        # Nothing to do when no record carries claims.
        claims_exist_count = renamed_df.filter(col("claims").isNotNull()).count()
        print(f"Records with non-null claims: {claims_exist_count}")

        if claims_exist_count == 0:
            print("WARNING: No claims data found")
            return None, None

        # One row per claim.
        claims_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("claims")).alias("claim")
        )
        print(f"Extracted {claims_df.count()} claim rows from patents")

        claim_values_df = extract_claim_text(claims_df)

        # Only needed in this function, hence the local imports.
        from pyspark.sql.window import Window
        from pyspark.sql.functions import row_number

        # Join each value array into one string, with a placeholder for nulls.
        claim_text_expr = when(
            col("value_array").isNotNull(),
            array_join(col("value_array"), " ")
        ).otherwise(lit("No claim text available"))
        df_with_text = claim_values_df.withColumn("claim_text", claim_text_expr)

        # Collect the texts per publication, then explode back to one row per text.
        claims_by_patent = df_with_text.groupBy("publication_number").agg(
            collect_list("claim_text").alias("claim_texts")
        )
        claims_exploded = claims_by_patent.select(
            col("publication_number"),
            explode_outer(col("claim_texts")).alias("claim_text")
        )

        # Number the rows within each publication and derive the claim id.
        numbering_window = Window.partitionBy("publication_number").orderBy("claim_text")
        claim_number_expr = row_number().over(numbering_window)
        claim_id_expr = concat_ws("_", col("publication_number"), col("claim_number").cast("string"))

        numbered_df = claims_exploded.withColumn("claim_number", claim_number_expr)
        identified_df = numbered_df.withColumn("claim_id", claim_id_expr)
        claims_individual_df = identified_df.withColumn("gold_ingestion_date", current_timestamp())

        print("Individual claims schema:")
        claims_individual_df.printSchema()

        # Per-text table.
        individual_count = claims_individual_df.count()
        print(f"Writing {individual_count} individual claim records to {individual_output_path}")
        claims_individual_df.write.format("delta").mode(batch_mode).save(individual_output_path)

        # Per-publication table: all texts joined into one string.
        all_text_expr = array_join(collect_list("claim_text"), " ").alias("all_claims_text")
        claims_grouped_df = (
            claims_individual_df
            .groupBy("publication_number")
            .agg(all_text_expr)
            .withColumn("gold_ingestion_date", current_timestamp())
        )

        grouped_count = claims_grouped_df.count()
        print(f"Writing {grouped_count} grouped claim records to {output_path}")
        claims_grouped_df.write.format("delta").mode(batch_mode).save(output_path)

        # Best-effort report of what ended up in the two output directories.
        try:
            ind_files = dbutils.fs.ls(individual_output_path)
            grp_files = dbutils.fs.ls(output_path)
            ind_data = [entry for entry in ind_files if not entry.name.startswith('_')]
            grp_data = [entry for entry in grp_files if not entry.name.startswith('_')]
            print(f"Individual claims directory contains {len(ind_data)} data files")
            print(f"Grouped claims directory contains {len(grp_data)} data files")
        except Exception as ls_error:
            print(f"Error checking output: {str(ls_error)}")

        return claims_grouped_df, claims_individual_df

    except Exception as e:
        print(f"Error in claims processing: {str(e)}")
        print(traceback.format_exc())
        return None, None
    

def process_complete_patents_gold(renamed_df, output_path, batch_mode="append"):
    """Assemble the denormalised "complete patents" gold dataset and persist it.

    A core patent projection (bibliographic fields plus CPC fields) is taken
    from ``renamed_df`` and left-joined on ``publication_number`` with four
    per-publication aggregates built from the same input: concatenated claim
    text, IPC codes/details, inventor names/details and applicant
    names/details. ``gold_ingestion_date`` is stamped once, after the joins,
    and the result is handed to ``merge_or_write_delta``.

    Args:
        renamed_df: Input DataFrame; it must expose every column referenced in
            the projections below (e.g. ``claims``, ``ipc_classification``,
            ``inventors``, ``applicants`` and the ``cpc_*`` columns).
        output_path: Destination forwarded to ``merge_or_write_delta``.
        batch_mode: Write mode forwarded to ``merge_or_write_delta``.

    Returns:
        The joined DataFrame, including the ``gold_ingestion_date`` column.

    Raises:
        Exception: Any error is printed together with its traceback and then
            re-raised unchanged.
    """
    try:
        # ------------------------------------------------------------------
        # Core patent projection (gold_ingestion_date is added after joining)
        # ------------------------------------------------------------------
        bibliographic_columns = [
            col("publication_number"),
            col("invention_title"),
            array_join(col("abstract_text"), " ").alias("abstract"),
            array_join(col("description_text"), " ").alias("description"),
            # Publication reference
            col("publication_country"),
            clean_date(col("publication_date")).alias("publication_date"),
            col("publication_kind"),
            # Application reference
            col("application_country"),
            clean_date(col("application_date")).alias("application_date"),
            col("application_number"),
            col("application_series_code"),
        ]

        # Individual CPC components, kept separate for later analysis
        cpc_component_columns = [
            col("cpc_section").alias("cpc_section"),
            format_class(col("cpc_class")).alias("cpc_class"),
            col("cpc_subclass").alias("cpc_subclass"),
            format_group(col("cpc_main_group")).alias("cpc_group"),
            format_subgroup(col("cpc_subgroup")).alias("cpc_subgroup"),
        ]

        cpc_metadata_columns = [
            clean_date(col("cpc_action_date")).alias("cpc_action_date"),
            col("cpc_data_source"),
            col("cpc_status"),
            col("cpc_value"),
            clean_date(col("cpc_version_date")).alias("cpc_version_date"),
            col("cpc_office_country"),
            col("cpc_scheme_origin"),
            col("cpc_symbol_position"),
        ]

        # Full CPC code (e.g., A01B33/00) followed by the two hierarchy levels
        full_cpc_code = format_patent_class(
            col("cpc_section"),
            col("cpc_class"),
            col("cpc_subclass"),
            col("cpc_main_group"),
            col("cpc_subgroup")
        ).alias("cpc_main")
        cpc_class_level = concat_ws(
            "", col("cpc_section"), format_class(col("cpc_class"))
        ).alias("cpc_class_level")
        cpc_subclass_level = concat_ws(
            "", col("cpc_section"), format_class(col("cpc_class")), col("cpc_subclass")
        ).alias("cpc_subclass_level")

        core_patent_df = renamed_df.select(
            *bibliographic_columns,
            *cpc_component_columns,
            *cpc_metadata_columns,
            full_cpc_code,
            cpc_class_level,
            cpc_subclass_level
        )

        # ------------------------------------------------------------------
        # Claims: one row per claim, then all claim text per publication
        # ------------------------------------------------------------------
        exploded_claims_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("claims")).alias("claim")
        )
        joined_claim_text = when(
            col("value_array").isNotNull(),
            array_join(col("value_array"), " ")
        ).otherwise(lit(None))
        claim_text_df = extract_claim_text(exploded_claims_df).withColumn(
            "claim_text", joined_claim_text
        )
        claims_per_publication_df = claim_text_df.groupBy("publication_number").agg(
            array_join(collect_list("claim_text"), " ").alias("all_claims_text")
        )

        # ------------------------------------------------------------------
        # IPC: flatten each classification entry, derive the code, aggregate
        # ------------------------------------------------------------------
        exploded_ipc_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("ipc_classification")).alias("ipc")
        )
        flattened_ipc_df = exploded_ipc_df.select(
            col("publication_number"),
            col("ipc.section").alias("ipc_section"),
            col("ipc.class").alias("ipc_class"),
            col("ipc.subclass").alias("ipc_subclass"),
            col("ipc.main-group").alias("ipc_main_group"),
            col("ipc.subgroup").alias("ipc_subgroup"),
            col("ipc.classification-value").alias("ipc_value"),
            clean_date(col("ipc.action-date.date")).alias("ipc_action_date"),
            col("ipc.classification-status").alias("ipc_status"),
            col("ipc.classification-level").alias("ipc_level"),
            col("ipc.classification-data-source").alias("ipc_data_source"),
            col("ipc.generating-office.country").alias("ipc_office_country"),
            clean_date(col("ipc.ipc-version-indicator.date")).alias("ipc_version_date"),
            col("ipc.symbol-position").alias("ipc_symbol_position")
        )
        ipc_with_code_df = flattened_ipc_df.withColumn(
            "ipc_code",
            format_patent_class(
                col("ipc_section"),
                col("ipc_class"),
                col("ipc_subclass"),
                col("ipc_main_group"),
                col("ipc_subgroup")
            )
        )
        ipc_detail_fields = [
            "ipc_code", "ipc_section", "ipc_class", "ipc_subclass",
            "ipc_main_group", "ipc_subgroup", "ipc_value", "ipc_action_date",
            "ipc_status", "ipc_level", "ipc_data_source",
        ]
        ipc_per_publication_df = ipc_with_code_df.groupBy("publication_number").agg(
            collect_list("ipc_code").alias("ipc_codes"),
            collect_list(struct(*ipc_detail_fields)).alias("ipc_details")
        )

        # ------------------------------------------------------------------
        # Inventors: flatten the address book, then aggregate names/details
        # ------------------------------------------------------------------
        exploded_inventors_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("inventors")).alias("inventor")
        )
        flattened_inventors_df = exploded_inventors_df.select(
            col("publication_number"),
            col("inventor.addressbook.first-name").alias("inventor_first_name"),
            col("inventor.addressbook.last-name").alias("inventor_last_name"),
            concat_ws(
                " ",
                col("inventor.addressbook.first-name"),
                col("inventor.addressbook.last-name")
            ).alias("inventor_name"),
            col("inventor.addressbook.address.city").alias("inventor_city"),
            col("inventor.addressbook.address.state").alias("inventor_state"),
            col("inventor.addressbook.address.country").alias("inventor_country")
        )
        inventor_detail_fields = [
            "inventor_name", "inventor_city", "inventor_state", "inventor_country",
        ]
        inventors_per_publication_df = flattened_inventors_df.groupBy("publication_number").agg(
            collect_list("inventor_name").alias("inventor_names"),
            collect_list(struct(*inventor_detail_fields)).alias("inventor_details")
        )

        # ------------------------------------------------------------------
        # Applicants: organisation name wins; otherwise "first last"
        # ------------------------------------------------------------------
        exploded_applicants_df = renamed_df.select(
            col("publication_number"),
            explode_outer(col("applicants")).alias("applicant")
        )
        person_name = concat_ws(
            " ",
            col("applicant.addressbook.first-name"),
            col("applicant.addressbook.last-name")
        )
        display_name = when(
            col("applicant.addressbook.orgname").isNotNull(),
            col("applicant.addressbook.orgname")
        ).otherwise(person_name)
        flattened_applicants_df = exploded_applicants_df.select(
            col("publication_number"),
            col("applicant.addressbook.first-name").alias("applicant_first_name"),
            col("applicant.addressbook.last-name").alias("applicant_last_name"),
            col("applicant.addressbook.orgname").alias("applicant_orgname"),
            display_name.alias("applicant_name"),
            col("applicant.addressbook.address.city").alias("applicant_city"),
            col("applicant.addressbook.address.state").alias("applicant_state"),
            col("applicant.addressbook.address.country").alias("applicant_country")
        )
        applicant_detail_fields = [
            "applicant_name", "applicant_city", "applicant_state", "applicant_country",
        ]
        applicants_per_publication_df = flattened_applicants_df.groupBy("publication_number").agg(
            collect_list("applicant_name").alias("applicant_names"),
            collect_list(struct(*applicant_detail_fields)).alias("applicant_details")
        )

        # ------------------------------------------------------------------
        # Left-join every aggregate onto the core projection, in fixed order
        # (claims, IPC, inventors, applicants) so the column layout is stable
        # ------------------------------------------------------------------
        complete_patent_df = core_patent_df
        for component_df in (
            claims_per_publication_df,
            ipc_per_publication_df,
            inventors_per_publication_df,
            applicants_per_publication_df,
        ):
            complete_patent_df = complete_patent_df.join(
                component_df, on="publication_number", how="left"
            )

        # Gold ingestion metadata is added exactly once, after joining
        complete_patent_df = complete_patent_df.withColumn(
            "gold_ingestion_date", current_timestamp()
        )

        # Persist through the shared helper, reusing the input frame's session
        session = renamed_df.sparkSession
        merge_or_write_delta(session, complete_patent_df, output_path, batch_mode)

        return complete_patent_df
    except Exception as e:
        print(f"Error processing complete patents gold: {str(e)}")
        print(traceback.format_exc())
        raise

def gold_layer_processing():
    """Process silver Delta batch directories into the gold Delta tables.

    Steps, in order:
      1. Read the ``force_reprocess`` widget (falls back to ``False`` when the
         widget call fails, e.g. when widgets are unavailable).
      2. Verify the silver path exists and list its batch directories.
      3. Prepare the checkpoint directory and every gold entity directory;
         with ``force_reprocess`` they are removed and recreated.
      4. For each batch directory that has no checkpoint (or for all of them
         when ``force_reprocess`` is set): load it as Delta, run every
         ``process_*_gold`` step, then write a checkpoint marker.
      5. Record whether new data was handled via ``update_processing_metadata``
         and a small Delta "status" table.

    Returns:
        tuple: ``(True, stats)`` where ``stats`` is a dict with the keys
        ``processed``, ``errors`` and ``new_data_processed``; or
        ``(False, message)`` where ``message`` is a string describing why the
        run could not proceed. Note that per-batch failures are counted in
        ``errors`` and do not turn the overall result into a failure.
    """
    try:
        # Try to create a widget to control reprocessing (only works in interactive mode).
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still stop the run instead of being
        # mistaken for "widgets are unavailable".
        try:
            dbutils.widgets.dropdown("force_reprocess", "false", ["true", "false"], "Force Reprocessing")
            force_reprocess = dbutils.widgets.get("force_reprocess") == "true"
        except Exception:
            # Default to incremental processing in job mode
            force_reprocess = False
        
        print(f"Force reprocessing mode: {force_reprocess}")
        
        spark = initialize_spark()
        
        # Fixed paths - silver data is directly in patent_data
        silver_path = "/Volumes/nokia-assginment-catalog/silver/patent_data"
        gold_path = "/Volumes/nokia-assginment-catalog/gold"
        checkpoint_location = "/Volumes/nokia-assginment-catalog/checkpoints/checkpoints_data/gold_autoloader/"
        
        # Define gold entity paths - add paths for ungrouped data
        entity_paths = {
            "patents": f"{gold_path}/patents",
            "ipc_classifications": f"{gold_path}/ipc_classifications", 
            "ipc_individual": f"{gold_path}/ipc_individual",  # Added path for individual IPCs
            "inventors": f"{gold_path}/inventors",
            "inventors_individual": f"{gold_path}/inventors_individual",  # Added path for individual inventors
            "applicants": f"{gold_path}/applicants",
            "applicants_individual": f"{gold_path}/applicants_individual",  # Added path for individual applicants
            "us_applicants": f"{gold_path}/us_applicants",
            "us_applicants_individual": f"{gold_path}/us_applicants_individual",  # Added path for individual US applicants
            "claims": f"{gold_path}/claims",
            "claims_individual": f"{gold_path}/claims_individual",  # Added path for individual claims
            "complete_patents": f"{gold_path}/complete_patents"
        }
        
        # Check silver patent_data directory
        try:
            if not check_path_exists(silver_path):
                return False, f"Silver patent_data path does not exist: {silver_path}"
                
            # List batch directories in the patent_data folder
            silver_items = dbutils.fs.ls(silver_path)
            
            # Get the batch directories (batch1, batch2, etc.)
            batch_dirs = [d for d in silver_items if d.isDir() and not d.name.startswith('_')]
            
            if len(batch_dirs) == 0:
                return False, "No batch directories found in silver/patent_data"
                
            # We'll process each batch directory directly
            delta_dirs = batch_dirs
            
        except Exception as e:
            return False, f"Error listing silver patent_data files: {str(e)}"
        
        # Handle checkpoint directory based on force_reprocess flag
        if force_reprocess:
            try:
                dbutils.fs.rm(checkpoint_location, True)
            except Exception as rm_error:
                # Removal stays best-effort (mkdirs below still runs), but the
                # failure is reported instead of being silently discarded.
                print(f"Warning: Could not remove checkpoint directory {checkpoint_location}: {str(rm_error)}")
            
            try:
                dbutils.fs.mkdirs(checkpoint_location)
            except Exception as e:
                return False, f"Error creating checkpoint directory: {str(e)}"
        else:
            # Just ensure the directory exists
            try:
                if not check_path_exists(checkpoint_location):
                    dbutils.fs.mkdirs(checkpoint_location)
            except Exception as e:
                return False, f"Error checking/creating checkpoint directory: {str(e)}"
        
        # Handle output directories based on force_reprocess flag
        write_mode = "overwrite" if force_reprocess else "append"
        
        for entity, path in entity_paths.items():
            if force_reprocess:
                try:
                    dbutils.fs.rm(path, True)
                except Exception as rm_error:
                    # Best-effort cleanup, same policy as the checkpoint
                    # directory above: report and carry on.
                    print(f"Warning: Could not remove output directory for {entity} ({path}): {str(rm_error)}")
                
                try:
                    dbutils.fs.mkdirs(path)
                except Exception as e:
                    return False, f"Error creating output directory for {entity}: {str(e)}"
            else:
                # Just ensure the directory exists
                try:
                    if not check_path_exists(path):
                        dbutils.fs.mkdirs(path)
                except Exception as e:
                    return False, f"Error with output directory for {entity}: {str(e)}"
        
        # Process each batch directory incrementally
        total_processed = 0
        total_errors = 0
        new_data_processed = False  # Track if any new data was processed
        
        for dir_index, dir_item in enumerate(delta_dirs):
            dir_path = dir_item.path
            dir_name = dir_item.name.rstrip('/')  # Remove trailing slash if present
            
            # Use checkpoint location for tracking progress
            file_checkpoint_path = f"{checkpoint_location}/{dir_name}"
            
            print(f"Processing batch directory {dir_index+1}/{len(delta_dirs)}: {dir_name}")
            
            # Check if directory was already processed using checkpoint
            if not force_reprocess and check_checkpoint_exists(spark, dir_name, checkpoint_location):
                print(f"Skipping already processed batch directory (checkpoint found): {dir_name}")
                continue
            
            # If we get here, we're processing new data.
            # NOTE(review): the flag is raised before the batch is read, so it
            # stays True even when this batch later fails - confirm that the
            # downstream step expects "attempted" rather than "succeeded".
            new_data_processed = True
            
            try:
                # Read the Delta files from silver layer
                start_time = time.time()
                try:
                    silver_df = spark.read.format("delta").load(dir_path)
                    record_count = silver_df.count()
                    end_time = time.time()
                    print(f"Loaded {record_count} records from batch directory in {end_time - start_time:.2f} seconds")
                except Exception as e:
                    print(f"Error reading Delta files from batch directory {dir_path}: {str(e)}")
                    print(traceback.format_exc())
                    total_errors += 1
                    continue
                
                # Process for each gold entity
                try:
                    # Use append mode for all but the first batch if it's first run 
                    # or use overwrite for all if force_reprocess is True
                    current_batch_mode = write_mode
                    
                    # Process patents gold entity
                    process_patents_gold(silver_df, entity_paths["patents"], current_batch_mode)
                    
                    # Process IPC classifications gold entity - both grouped and individual
                    process_ipc_gold(silver_df, entity_paths["ipc_classifications"], 
                                     entity_paths["ipc_individual"], current_batch_mode)
                    
                    # Process inventors gold entity - both grouped and individual
                    process_inventors_gold(silver_df, entity_paths["inventors"], 
                                          entity_paths["inventors_individual"], current_batch_mode)
                    
                    # Process applicants gold entity - both grouped and individual
                    process_applicants_gold(silver_df, entity_paths["applicants"], 
                                           entity_paths["applicants_individual"], current_batch_mode)
                    
                    # Process US applicants gold entity - both grouped and individual
                    process_us_applicants_gold(silver_df, entity_paths["us_applicants"], 
                                              entity_paths["us_applicants_individual"], current_batch_mode)
                    
                    # Process claims gold entity - both grouped and individual
                    process_claims_gold(silver_df, entity_paths["claims"], 
                                       entity_paths["claims_individual"], current_batch_mode)
                    
                    # Process complete patents gold entity
                    process_complete_patents_gold(silver_df, entity_paths["complete_patents"], current_batch_mode)
                    
                    # If first batch was processed successfully, switch to append mode for remaining batches
                    if write_mode == "overwrite":
                        write_mode = "append"
                    
                except Exception as e:
                    print(f"Error during entity processing for {dir_name}: {str(e)}")
                    print(traceback.format_exc())
                    total_errors += 1
                    continue
                
                # Create checkpoint file to mark successful processing with fixed schema
                try:
                    # Ensure checkpoint directory exists
                    parent_dir = os.path.dirname(file_checkpoint_path)
                    if not check_path_exists(parent_dir):
                        dbutils.fs.mkdirs(parent_dir)
                    
                    # Create checkpoint with proper schema
                    create_checkpoint_file(spark, file_checkpoint_path, dir_name)
                except Exception as checkpoint_error:
                    # A missing checkpoint only produces a warning here; the
                    # batch is still counted as processed below.
                    print(f"Warning: Could not create checkpoint file: {str(checkpoint_error)}")
                    print(traceback.format_exc())
                
                total_processed += 1
                print(f"Successfully processed {dir_name} to all gold entities")
                
            except Exception as e:
                print(f"Error processing {dir_name}: {str(e)}")
                print(traceback.format_exc())
                total_errors += 1
        
        print(f"Completed gold layer processing. Directories processed: {total_processed}, Errors: {total_errors}")
        
        # Update processing metadata to let the next step know if new data was processed
        update_processing_metadata(spark, new_data_processed)
        
        # Store new_data_processed in a special table to communicate with the next step
        try:
            processing_status_path = "/Volumes/nokia-assginment-catalog/processing_status"
            if not check_path_exists(processing_status_path):
                dbutils.fs.mkdirs(processing_status_path)
                
            # Create a simple DataFrame with the processing status
            status_df = spark.createDataFrame([(new_data_processed,)], ["new_data_processed"])
            status_df.write.format("delta").mode("overwrite").save(f"{processing_status_path}/status")
            
            print(f"Stored processing status: new_data_processed={new_data_processed}")
        except Exception as e:
            print(f"Warning: Could not store processing status: {str(e)}")
            print(traceback.format_exc())
        
        # Return success with stats including whether new data was processed
        return True, {"processed": total_processed, "errors": total_errors, "new_data_processed": new_data_processed}
    
    except Exception as e:
        print(f"Error in gold layer processing: {str(e)}")
        print(traceback.format_exc())
        return False, str(e)

# Main function for running the processing standalone
def main():
    """Run the gold-layer job and report its outcome to the notebook caller.

    Calls ``gold_layer_processing()``, prints a human-readable summary and
    always finishes by calling ``dbutils.notebook.exit`` with a JSON string:

    * success with a stats dict -> the dict itself, serialised;
    * success with any other result ->
      ``{"success": true, "message": ..., "new_data_processed": false}``;
    * failure -> ``{"success": false, "error": ...}``.
    """
    # Imported before any branching. The import used to live only inside the
    # dict-success branch, which made ``json`` a function-local name that was
    # unbound in the other two branches (UnboundLocalError on failure).
    import json

    success, result = gold_layer_processing()

    if not success:
        print(f"Gold layer processing failed: {result}")
        dbutils.notebook.exit(json.dumps({"success": False, "error": str(result)}))
    elif isinstance(result, dict):
        print("Gold layer processing completed successfully.")
        print(f"Processed: {result.get('processed', 0)} batches")
        print(f"Errors: {result.get('errors', 0)}")
        print(f"New data processed: {result.get('new_data_processed', False)}")

        # Also return this as a proper JSON result
        result_json = json.dumps(result)
        dbutils.notebook.exit(result_json)
    else:
        print(f"Gold layer processing completed with message: {result}")
        dbutils.notebook.exit(json.dumps({"success": True, "message": str(result), "new_data_processed": False}))

# Notebook entry point: run the gold-layer job as soon as this cell executes.
main()