## Business Central merge data notebook
In this part the files in the delta folder will be merged into the Lakehouse table.
- It iterates first on the folders to append to the existing table.
- After that it will remove all duplicates by sorting the table.
- Finally, it will remove all records from the table that have been deleted in Business Central.

Please change the parameters in the first part.

In [None]:
%%pyspark
# settings: Spark session configuration, applied in the order listed below
_spark_settings = {
    "spark.sql.parquet.vorder.enabled": "true",
    "spark.microsoft.delta.optimizewrite.enabled": "true",
    "spark.sql.parquet.filterPushdown": "true",
    "spark.sql.parquet.mergeSchema": "false",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
    "spark.sql.delta.commitProtocol.enabled": "true",
    "spark.sql.analyzer.maxIterations": "999",
    "spark.sql.caseSensitive": "true",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}
for _setting_key, _setting_value in _spark_settings.items():
    spark.conf.set(_setting_key, _setting_value)

# file paths (these are mostly the defaults)
folder_path_spark = 'Files/deltas/'  # delta folder as passed to the Spark reader
folder_path_json = '/lakehouse/default/Files/'  # folder holding the *.cdm.json schema files
folder_path_reset = '/lakehouse/default/Files/reset/'  # folder holding the reset marker files
folder_path = '/lakehouse/default/Files/deltas/'  # delta folder as seen by os/glob

# parameters
workspace = 'businessCentral'  # can also be a GUID
Lakehouse = 'businessCentral'  # can also be a GUID - if so, wrap it in back-quotes, for example: '`GUID`'
Remove_delta = True  # remove the delta files once everything is processed
Drop_table_if_mismatch = False  # drop the table when the json file has a different number of columns than the table
no_Partition = 258  # number of partitions used for the dataframe; 2-4 partitions per CPU core of the Spark cluster is a good starting point
DecimalFormat = 'float'  # decimal number format: 'float' or 'decimal(10,3)'. Changing this is a breaking change for the table
DateTimeFormat = 'timestamp'  # datetime format: 'timestamp' or 'date'. Changing this is a breaking change for the table
schema_name = ""  # only needed when the lakehouse is schema-based
max_parallel_tables = 4  # number of tables processed in parallel (adjust to the cluster size)
use_delta_merge = False  # use Delta MERGE instead of union+overwrite (EXPERIMENTAL - set to False if errors occur)

In [None]:
%%pyspark
import os
import json
from pyspark.sql.types import *

def is_string_empty(s):
    """Return True when *s* is None, empty, or contains only whitespace.

    None is treated as empty so that an unset parameter (for example
    ``schema_name = None``) behaves like ``""`` instead of raising
    AttributeError.
    """
    return s is None or not s.strip()

# Optional pre-step: drop every Lakehouse table whose column count no longer
# matches the number of attributes in its *.cdm.json definition.
if Drop_table_if_mismatch:

    def count_keys(obj):
        """Recursively count the dictionary keys contained in *obj*.

        Lists are traversed, every other value contributes 0.
        NOTE(review): not called anywhere in this cell - kept in case another
        cell relies on it.
        """
        if isinstance(obj, dict):
            return len(obj) + sum(count_keys(v) for v in obj.values())
        if isinstance(obj, list):
            return sum(count_keys(v) for v in obj)
        return 0

    # Query the catalog once instead of once per schema file.
    existing_tables = {t.name for t in spark.catalog.listTables()}

    for filename in os.listdir(folder_path_json):
        # Only entity definitions are of interest; manifest files are excluded.
        if "manifest" in filename or not filename.endswith(".cdm.json"):
            continue

        table_name = filename.replace("-", "").replace(".cdm.json", "")
        if table_name not in existing_tables:
            continue

        if is_string_empty(schema_name):
            qualified_name = Lakehouse + "." + table_name
        else:
            qualified_name = Lakehouse + "." + schema_name + "." + table_name

        # count number of columns in the existing table
        num_cols_table = len(spark.sql("SELECT * FROM " + qualified_name + " LIMIT 1").columns)

        # count number of columns in the json file; the context manager closes the handle
        with open(folder_path_json + filename) as f:
            schema = json.load(f)
        num_names = len(schema["definitions"][0]["hasAttributes"])

        if num_cols_table != num_names:
            spark.sql("DROP TABLE IF EXISTS " + qualified_name)
            # Keep the snapshot in sync so a second file that maps to the same
            # table name does not query a table that was just dropped.
            existing_tables.discard(table_name)

In [None]:
%%pyspark
import os
import glob
from pyspark.sql.types import *

def is_string_empty(s):
    """Return True when *s* is None, empty, or contains only whitespace.

    None is treated as empty so that an unset parameter (for example
    ``schema_name = None``) behaves like ``""`` instead of raising
    AttributeError.
    """
    return s is None or not s.strip()

# Every file in the reset folder names a table that must be rebuilt from
# scratch: drop that table, then remove the marker file.
if os.path.exists(folder_path_reset):
    for filename in os.listdir(folder_path_reset):
        # Derive the table name from the marker file name
        table_name = filename.replace("-", "").replace(".txt", "")

        if is_string_empty(schema_name):
            qualified_name = Lakehouse + "." + table_name
        else:
            qualified_name = Lakehouse + "." + schema_name + "." + table_name

        # Remove the table
        spark.sql("DROP TABLE IF EXISTS " + qualified_name)

        try:
            # os.path.join avoids the doubled separator that plain
            # concatenation produces when folder_path_reset ends with '/'.
            os.remove(os.path.join(folder_path_reset, filename))
        except OSError as e:  # best effort: report the failure and continue with the next file
            print(f"Error: {filename} : {e.strerror}")

In [None]:
%%pyspark
import json
import os
import glob
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import col, desc, expr
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

def is_string_empty(s):
    """Return True when *s* is None, empty, or contains only whitespace.

    None is treated as empty so that an unset parameter (for example
    ``schema_name = None``) behaves like ``""`` instead of raising
    AttributeError.
    """
    return s is None or not s.strip()

def get_table_name(schema_name_val, lakehouse_val, table_val):
    """Build the fully qualified table name.

    Returns ``lakehouse.schema.table`` when a schema name is given and
    ``lakehouse.table`` when the schema name is empty.
    """
    if not is_string_empty(schema_name_val):
        return f"{lakehouse_val}.{schema_name_val}.{table_val}"
    return f"{lakehouse_val}.{table_val}"

def convert_datatype(col_type, col_name):
    """Map a CDM ``dataFormat`` value to the Spark type name used for casting.

    Args:
        col_type: CDM data format of the column (e.g. "String", "Decimal").
        col_name: column name; the two system audit columns are always
            returned as "timestamp", whatever their CDM format is.

    Returns:
        The Spark type name; formats that are not listed fall back to "string".
    """
    # Formats that are kept as plain strings
    spark_type_by_format = dict.fromkeys(
        ("String", "Guid", "Code", "Option", "Time", "Duration"), "string"
    )
    # NOTE(review): "Int64" is mapped to the 32-bit "int" like the other integer
    # formats - confirm that no value exceeds the int range before relying on it.
    spark_type_by_format.update(dict.fromkeys(("Integer", "Int64", "Int32"), "int"))
    # Date/time and decimal formats follow the notebook parameters
    spark_type_by_format.update({
        "Date": "date",
        "DateTime": DateTimeFormat,
        "Decimal": DecimalFormat,
        "Boolean": "boolean",
    })

    # Audit fields take precedence over the mapping
    audit_columns = ('SystemModifiedAt-2000000003', 'SystemCreatedAt-2000000001')
    if col_name in audit_columns:
        return "timestamp"

    return spark_type_by_format.get(col_type, "string")

def process_table(entry_name, files_in_dir):
    """Process a single table - designed to run in parallel"""
    try:
        start_time = datetime.now()
        table_name = entry_name.replace("-","")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Processing table: {table_name}")
        
        # Read new data with schema inference disabled for performance
        df_new = spark.read.option("minPartitions", no_Partition).format("csv").option("header","true").option("inferSchema","false").load(folder_path_spark + entry_name +"/*")
        
        # Check if dataframe has data
        if df_new.rdd.isEmpty():
            print(f"[{datetime.now().strftime('%H:%M:%S')}] ⚠ Skipping {table_name} - no data found")
            return {"table": table_name, "status": "skipped", "reason": "no data"}
        
        # Load and parse schema
        with open(folder_path_json + entry_name +".cdm.json") as f:
            schema = json.load(f)
        
        column_names = [attr["name"] for attr in schema["definitions"][0]["hasAttributes"]] 
        column_types = [attr['dataFormat'] for attr in schema["definitions"][0]["hasAttributes"]]
        
        ContainsCompany = '$Company' in column_names
        
        # Apply schema transformations
        for col_name, col_type in zip(column_names, column_types):
            spark_type = convert_datatype(col_type, col_name)
            df_new = df_new.withColumn(col_name, col(col_name).cast(spark_type))
        
        # Create temp view for new data
        temp_view = f"temp_{table_name}_{os.getpid()}"
        df_new.createOrReplaceTempView(temp_view)
        
        full_table_name = get_table_name(schema_name, Lakehouse, table_name)
        
        # Check if table exists using try/except instead of catalog listing (thread-safe)
        table_exists = False
        try:
            spark.sql(f"DESCRIBE TABLE {full_table_name}").collect()
            table_exists = True
        except:
            table_exists = False
        
        if table_exists and use_delta_merge:
            # Use DELTA MERGE - much faster than union + overwrite
            if ContainsCompany:
                key_condition = "target.`$Company` = source.`$Company` AND target.`systemId-2000000000` = source.`systemId-2000000000`"
            else:
                key_condition = "target.`systemId-2000000000` = source.`systemId-2000000000`"
            
            # Build MERGE statement with proper formatting
            merge_sql = f"MERGE INTO {full_table_name} as target USING {temp_view} as source ON {key_condition} "
            merge_sql += "WHEN MATCHED AND source.`SystemCreatedAt-2000000001` IS NULL THEN DELETE "
            merge_sql += "WHEN MATCHED AND source.`SystemModifiedAt-2000000003` > target.`SystemModifiedAt-2000000003` THEN UPDATE SET * "
            merge_sql += "WHEN NOT MATCHED AND source.`SystemCreatedAt-2000000001` IS NOT NULL THEN INSERT *"
            
            try:
                spark.sql(merge_sql)
            except Exception as merge_error:
                print(f"MERGE SQL failed for {table_name}.")
                print(f"Full table name: {full_table_name}")
                print(f"Temp view: {temp_view}")
                print(f"Key condition: {key_condition}")
                print(f"SQL length: {len(merge_sql)}")
                print(f"SQL: {merge_sql}")
                raise merge_error
            
        elif table_exists:
            # Fallback to union approach if merge is disabled
            df_old = spark.sql(f"SELECT * FROM {full_table_name}")
            df_combined = df_new.union(df_old)
            
            # Filter out deletes
            df_deletes = df_combined.filter(col('SystemCreatedAt-2000000001').isNull())
            if ContainsCompany:
                df_combined = df_combined.join(df_deletes, ['$Company','systemId-2000000000'], 'leftanti')
            else:
                df_combined = df_combined.join(df_deletes, ['systemId-2000000000'], 'leftanti')
            
            # Remove duplicates
            if ContainsCompany:
                df_combined = df_combined.orderBy('$Company','systemId-2000000000', desc('SystemModifiedAt-2000000003'))
                df_combined = df_combined.dropDuplicates(['$Company','systemId-2000000000'])
            else:
                df_combined = df_combined.orderBy('systemId-2000000000', desc('SystemModifiedAt-2000000003'))
                df_combined = df_combined.dropDuplicates(['systemId-2000000000'])
            
            # Coalesce instead of repartition for better performance
            df_combined = df_combined.coalesce(no_Partition // 2)
            df_combined.write.mode("overwrite").format("delta").saveAsTable(full_table_name)
        else:
            # New table - just write
            df_new.coalesce(no_Partition // 4).write.mode("overwrite").format("delta").saveAsTable(full_table_name)
        
        # Clean up temp view
        spark.catalog.dropTempView(temp_view)
        
        # Delete delta files if configured
        if Remove_delta:
            for filename in files_in_dir:
                try:  
                    os.remove(filename)  
                except OSError as e:  
                    print(f"Error deleting {filename}: {e.strerror}")
        
        duration = (datetime.now() - start_time).total_seconds()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ✓ Completed {table_name} in {duration:.1f}s")
        return {"table": table_name, "status": "success", "duration": duration}
        
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ✗ Error processing {table_name}: {str(e)}")
        print(f"Full traceback:\n{error_details}")
        return {"table": table_name, "status": "error", "error": str(e)}

# Collect all tables to process: every sub-folder of the deltas folder that
# contains at least one file becomes one (folder name, file list) work item.
tables_to_process = []
if os.path.isdir(folder_path):
    for entry in os.scandir(folder_path):
        if entry.is_dir():
            files_in_dir = glob.glob(folder_path + entry.name + '/*')
            if files_in_dir:
                tables_to_process.append((entry.name, files_in_dir))
else:
    # Nothing was exported yet; report it instead of failing on os.scandir
    print(f"Deltas folder not found: {folder_path}")

print(f"Found {len(tables_to_process)} tables to process")
print(f"Processing with max {max_parallel_tables} tables in parallel")
print("=" * 60)

# Process tables in parallel
start_time = datetime.now()
results = []

# ThreadPoolExecutor rejects max_workers below 1, so clamp the parameter
with ThreadPoolExecutor(max_workers=max(1, max_parallel_tables)) as executor:
    future_to_table = {executor.submit(process_table, entry_name, files): entry_name
                       for entry_name, files in tables_to_process}

    for future in as_completed(future_to_table):
        results.append(future.result())

# Summary: skipped tables (no data) are reported separately, not as errors
total_duration = (datetime.now() - start_time).total_seconds()
success_count = sum(1 for r in results if r["status"] == "success")
skipped_count = sum(1 for r in results if r["status"] == "skipped")
error_count = sum(1 for r in results if r["status"] == "error")

print("=" * 60)
print(f"Processing completed in {total_duration:.1f}s")
print(f"Success: {success_count} | Skipped: {skipped_count} | Errors: {error_count}")
if error_count > 0:
    print("\nFailed tables:")
    for r in results:
        if r["status"] == "error":
            print(f"  - {r['table']}: {r['error']}")