In [None]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DateType, BooleanType, DecimalType, DoubleType

In [None]:
# Expected layout of the source Parquet files, as (column name, Spark type, nullable).
# NOTE(review): Spark usually relaxes user-supplied schemas for file-based sources to
# nullable, so the False flags below are probably not enforced on read — confirm.
source_columns = [
    ('TPAccountName', StringType(), False),
    ('EnrollmentCustomerName', StringType(), True),
    ('SubscriptionGUID', StringType(), False),
    ('SubscriptionName', StringType(), False),
    ('FiscalMonth', DateType(), False),
    ('$ Organic ACR', DoubleType(), False),
    ('$ Average Daily Organic ACR', DoubleType(), False),
    ('ServiceLevel1', StringType(), False),
    ('ServiceLevel2', StringType(), False),
    ('ServiceLevel4', StringType(), False),
    ('FiscalYear', StringType(), False),
]

schema = StructType([
    StructField(column_name, data_type, nullable)
    for column_name, data_type, nullable in source_columns
])

In [None]:
# Load every Parquet file under Files/ with the explicit schema, then convert the two
# monetary ACR columns from double to fixed-precision DECIMAL(19, 4), in this order.
acr_amount_columns = ("$ Organic ACR", "$ Average Daily Organic ACR")

df = spark.read.format("parquet").load("Files/*.parquet", schema=schema)
for amount_column in acr_amount_columns:
    df = df.withColumn(amount_column, col(amount_column).cast(DecimalType(19, 4)))

In [None]:
# Rename columns to Delta Lake compatible names: snake_case, and no spaces (the two
# source ACR headers contain spaces). Renames are applied in the order listed.
column_renames = {
    'TPAccountName': 'tp_account_name',
    'EnrollmentCustomerName': 'enrollment_customer_name',
    'SubscriptionGUID': 'subscription_guid',
    'SubscriptionName': 'subscription_name',
    'FiscalMonth': 'fiscal_month',
    '$ Organic ACR': 'organic_acr_usd',
    '$ Average Daily Organic ACR': 'avg_daily_organic_acr_usd',
    'ServiceLevel1': 'service_level_1',
    'ServiceLevel2': 'service_level_2',
    'ServiceLevel4': 'service_level_4',
    'FiscalYear': 'fiscal_year',
}

df_renamed = df
for source_name, target_name in column_renames.items():
    df_renamed = df_renamed.withColumnRenamed(source_name, target_name)


In [None]:
# Upsert the renamed ACR rows into the Delta table at Tables/acr: MERGE into it when it
# already exists, otherwise create it from df_renamed.
target_table_path = "Tables/acr"

# Business key that identifies one ACR row (adjust if the grain of the source changes).
merge_key_columns = [
    "tp_account_name",
    "enrollment_customer_name",
    "service_level_1",
    "service_level_2",
    "service_level_4",
    "subscription_guid",
    "fiscal_month",
]

# Null-safe equality (<=>) instead of `=`: enrollment_customer_name is nullable in the
# source schema, and `NULL = NULL` evaluates to NULL (not true). With `=`, rows whose key
# contains NULL would never match and would be re-inserted as duplicates on every run.
# For non-NULL values, <=> behaves exactly like =.
merge_condition = " AND ".join(
    f"target.{key} <=> source.{key}" for key in merge_key_columns
)

if DeltaTable.isDeltaTable(spark, target_table_path):
    # Table exists - merge source rows into it.
    delta_table = DeltaTable.forPath(spark, target_table_path)

    # NOTE(review): Delta MERGE fails when several source rows match the same target
    # row, so df_renamed is assumed to be unique on merge_key_columns — confirm.
    (
        delta_table.alias("target")
        .merge(df_renamed.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print("Merged into existing Delta table")
else:
    # Table doesn't exist - create it.
    df_renamed.write.mode("overwrite").format("delta").save(target_table_path)
    print("Created new Delta table")