In [0]:
# Notebook parameter: the target environment. It is used further down as the
# prefix of the catalog name (f'{env}_catalog').
dbutils.widgets.text(name="env", defaultValue="", label="Enter environment")
env = dbutils.widgets.get("env").strip()
# The widget defaults to "", which would silently resolve to the catalog
# "_catalog" and only fail later at table lookup; stop here with a clear
# message instead.
if not env:
    raise ValueError(
        'Widget "env" is empty: set it to the target environment (e.g. "dev") and re-run.'
    )
env

'dev'

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

In [0]:
%run "./paths"

('abfss://landing@dlsunitycat.dfs.core.windows.net/',
 'abfss://medallion@dlsunitycat.dfs.core.windows.net/bronze',
 'abfss://medallion@dlsunitycat.dfs.core.windows.net/silver',
 'abfss://medallion@dlsunitycat.dfs.core.windows.net/gold')

In [0]:
# Build the name of the bronze source table: "<env>_catalog" followed by the
# suffix stored under this table's key in `bronze_tables`.
# NOTE(review): `bronze_tables` is not defined in this notebook; presumably it
# is provided by the "./paths" notebook that is %run above - confirm.
table = 'plans'
catalog = '{}_catalog'.format(env)
plans_table = '{}{}'.format(catalog, bronze_tables[table])
plans_table


'dev_catalog.bronze.plans_bronze'

In [0]:
# Load the bronze plans table as a DataFrame (batch read by table name).
plans = spark.read.table(plans_table)

In [0]:
# First cleaning pass over the bronze plans:
#   * keep only rows that are not dental-only, have a metal level and are not
#     of plan type "Indemnity";
#   * lower-case `new_plan`;
#   * fold "Expanded Bronze" into "Bronze", then lower-case `metal_level`;
#   * drop the filter flag and the bronze ingestion timestamp.
# NOTE(review): a row whose `dental_only_plan` or `plan_type` is null does not
# satisfy the `!=` comparison and is therefore filtered out as well - confirm
# that this is intended.
plans_1 = (
    plans.where(
        (F.col("dental_only_plan") != "Yes")
        & F.col("metal_level").isNotNull()
        & (F.col("plan_type") != "Indemnity")
    )
    .withColumn("new_plan", F.lower(F.col("new_plan")))
    .withColumn(
        "metal_level",
        F.lower(F.regexp_replace(F.col("metal_level"), "Expanded Bronze", "Bronze")),
    )
    .drop("dental_only_plan", "date_ingested")
)

def find_bad_plans(df, col):
    """Find the plan_ids that are associated with more than one value of ``col``.

    Args:
        df: DataFrame containing a ``plan_id`` column and the column named by
            ``col``.
        col: Name of the column that is expected to be constant per plan_id.

    Returns:
        A single-column DataFrame (``bad_plans``) with every plan_id that
        appears with two or more distinct ``col`` values. A null in ``col``
        counts as a value of its own, because distinct (plan_id, col) rows
        are counted.
    """
    # One row per distinct (plan_id, col) combination.
    distinct_pairs = df.select("plan_id", col).distinct()
    # Number of distinct `col` values seen for each plan_id.
    values_per_plan = distinct_pairs.groupBy("plan_id").count()
    # A plan is inconsistent as soon as it has more than one value.
    inconsistent = values_per_plan.where(F.col("count") > 1)
    return inconsistent.select(F.col("plan_id").alias("bad_plans"))


def combine_bad_plans(df_1, df_2):
    """Stack two DataFrames on top of each other, matching columns by name.

    Args:
        df_1: First DataFrame.
        df_2: Second DataFrame; it is passed to ``df_1.unionByName``.

    Returns:
        The result of ``df_1.unionByName(df_2)``. Rows present in both inputs
        are not de-duplicated.
    """
    combined = df_1.unionByName(df_2)
    return combined

# plan_ids that carry conflicting values for `metal_level` or for `plan_type`.
# A plan_id that conflicts on both columns appears twice in the result.
bad_plans_df = combine_bad_plans(
    df_1=find_bad_plans(df=plans_1, col="metal_level"),
    df_2=find_bad_plans(df=plans_1, col="plan_type"),
)

# Remove every row whose plan_id was flagged in `bad_plans_df`.
# A left anti join keeps exactly the rows of `plans_1` that have no match on
# the right side and returns only the columns of `plans_1`, so there is no
# helper column to filter on and drop afterwards. It also cannot multiply
# rows when the same plan_id is listed more than once in `bad_plans_df`.
# The list of flagged plans is broadcast, as before.
plans_2 = plans_1.join(
    F.broadcast(bad_plans_df),
    on=plans_1["plan_id"] == bad_plans_df["bad_plans"],
    how="left_anti",
)

# Final silver shape: remove exact duplicate rows, label each row with its
# pandemic era based on `business_year`, and stamp the ingestion time.
# NOTE(review): business years before 2021 are labelled "pre-covid"; a null
# `business_year` does not satisfy the comparison and therefore falls into
# the "post-covid" branch - confirm that this is intended.
plans_3 = (
    plans_2.distinct()
    .withColumn(
        "pandemic_era",
        F.when(F.col("business_year") < 2021, F.lit("pre-covid")).otherwise(
            F.lit("post-covid")
        ),
    )
    .withColumn("date_ingested", F.current_timestamp())
)

# Persist the cleaned plans as a Delta table at the silver location for this
# table, replacing whatever was written there before.
# NOTE(review): `silver_paths` is not defined in this notebook; presumably it
# is provided by the "./paths" notebook that is %run above - confirm.
plans_3.write.save(path=silver_paths[table], format="delta", mode="overwrite")

