Carolinas Medical Center Cleaning

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length

session = Session.builder.getOrCreate()

df = session.table('"HEALTH_NAV"."STAGE"."CAROLINAS_MEDICAL_CENTER"')

df = df.filter(upper(col('"code|2|type"')) == "CPT")

columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    '"code|2"': 'CODE',
    '"code|2|type"': 'CODE_TYPE',
    'PAYER_NAME': 'PAYER_NAME',
    'PLAN_NAME': 'PLAN_NAME',
    '"standard_charge|negotiated_dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"standard_charge|negotiated_percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"standard_charge|min"': 'MINIMUM_CHARGE',
    '"standard_charge|max"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )


df = df.select([col(k).alias(v) for k, v in existing_columns.items()])


numeric_columns = [v for k, v in existing_columns.items() 
                  if any(x in k.lower() for x in ['charge', 'amount', 'percentage', 'min', 'max'])]


for column in existing_columns.values():
    
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        # For numeric columns: convert blank to 0, otherwise keep original value
        df = df.withColumn(column, 
                         when(is_blank, lit(0))
                         .otherwise(col(column)))
    else:
        # For non-numeric columns: convert blank to '0'
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))


if 'STANDARD_CHARGE_DOLLAR' in existing_columns.values() and 'STANDARD_CHARGE_PERCENTAGE' in existing_columns.values():
    df = df.filter(
        (col('STANDARD_CHARGE_DOLLAR') != 0) | 
        (col('STANDARD_CHARGE_PERCENTAGE') != 0))
else:
    print("Warning: Could not filter null charge rows - required columns not found")


column_defs = ",\n".join([f'"{col}" STRING' for col in existing_columns.values()])

create_stmt = f"""
    CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."CAROLINAS_MEDICAL_CENTER" (
        {column_defs}
    )
"""
session.sql(create_stmt).collect()

df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."CAROLINAS_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.CAROLINAS_MEDICAL_CENTER")

St Rose Dominican Rose De Lima

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."ST_ROSE_DOMINICAN_HOSPITAL_ROSE_DE_LIMA"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}

existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in numeric_columns and 'MAXIMUM_CHARGE' in numeric_columns:
    has_percentage = (col('STANDARD_CHARGE_DOLLAR') == 0) & (col('STANDARD_CHARGE_PERCENTAGE') > 0)
    calculated_dollar = (col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01)
    
    df = df.withColumn('STANDARD_CHARGE_DOLLAR',
                      when(has_percentage, calculated_dollar)
                      .otherwise(col('STANDARD_CHARGE_DOLLAR')))


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."ST_ROSE_DOMINICAN_HOSPITAL_ROSE_DE_LIMA" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."ST_ROSE_DOMINICAN_HOSPITAL_ROSE_DE_LIMA"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.ST_ROSE_DOMINICAN_HOSPITAL_ROSE_DE_LIMA")
print(f"Final columns: {df.columns}")

St Rose Dominican Siena Cleaning

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."ST_ROSE_DOMINICAN_HOSPITAL_SIENA"')


df = df.filter(upper(col('"Code Type"')) == "CPT")

columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}

existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in numeric_columns and 'MAXIMUM_CHARGE' in numeric_columns:
    has_percentage = (col('STANDARD_CHARGE_DOLLAR') == 0) & (col('STANDARD_CHARGE_PERCENTAGE') > 0)
    calculated_dollar = (col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01)
    
    df = df.withColumn('STANDARD_CHARGE_DOLLAR',
                      when(has_percentage, calculated_dollar)
                      .otherwise(col('STANDARD_CHARGE_DOLLAR')))


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."ST_ROSE_DOMINICAN_HOSPITAL_SIENA" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."ST_ROSE_DOMINICAN_HOSPITAL_SIENA"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.ST_ROSE_DOMINICAN_HOSPITAL_SIENA")
print(f"Final columns: {df.columns}")

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."LINDEN_OAKS_HOSPITAL"')

df = df.filter(upper(col('"code|2|type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    '"code|2"': 'CODE',
    '"code|2|type"': 'CODE_TYPE',
    'PAYER_NAME': 'PAYER_NAME',
    'PLAN_NAME': 'PLAN_NAME',
    '"standard_charge|negotiated_dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"standard_charge|negotiated_percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"standard_charge|min"': 'MINIMUM_CHARGE',
    '"standard_charge|max"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )

df = df.select([col(k).alias(v) for k, v in existing_columns.items()])

numeric_columns = [v for k, v in existing_columns.items() 
                  if any(x in k.lower() for x in ['charge', 'amount', 'percentage', 'min', 'max'])]

for column in existing_columns.values():
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(0))
                         .otherwise(col(column)))
    else:
       
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))


if 'STANDARD_CHARGE_DOLLAR' in existing_columns.values() and 'STANDARD_CHARGE_PERCENTAGE' in existing_columns.values():
    df = df.filter(
        (col('STANDARD_CHARGE_DOLLAR') != 0) | 
        (col('STANDARD_CHARGE_PERCENTAGE') != 0))
else:
    print("Warning: Could not filter null charge rows - required columns not found")


column_defs = ",\n".join([f'"{col}" STRING' for col in existing_columns.values()])

create_stmt = f"""
    CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."LINDEN_OAKS_HOSPITAL" (
        {column_defs}
    )
"""
session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."LINDEN_OAKS_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.LINDEN_OAKS_HOSPITAL")

NOVANT_HEALTH_MINT_HILL_MEDICAL_CENTER

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_MINT_HILL_MEDICAL_CENTER"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))

for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_MINT_HILL_MEDICAL_CENTER" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_MINT_HILL_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_MINT_HILL_MEDICAL_CENTER")
print(f"Final columns: {df.columns}")

Novant_health_charlotte_orthopedic_hospital

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_CHARLOTTE_ORTHOPEDIC_HOSPITAL"')


df = df.filter(upper(col('"Code Type"')) == "CPT")

columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))

for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_CHARLOTTE_ORTHOPEDIC_HOSPITAL" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_CHARLOTTE_ORTHOPEDIC_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_CHARLOTTE_ORTHOPEDIC_HOSPITAL")
print(f"Final columns: {df.columns}")

NOVANT_HEALTH_MATTHEWS_MEDICAL_CENTER

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal

session = Session.builder.getOrCreate()

df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_MATTHEWS_MEDICAL_CENTER"')

df = df.filter(upper(col('"Code Type"')) == "CPT")

columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]

#
for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))

for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))

#
column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_MATTHEWS_MEDICAL_CENTER" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_MATTHEWS_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_MATTHEWS_MEDICAL_CENTER")
print(f"Final columns: {df.columns}")

NOVANT_HEALTH_HUNTERSVILLE_MEDICAL_CENTER

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_HUNTERSVILLE_MEDICAL_CENTER"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]

for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))

for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))
for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))
for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))

#
column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_HUNTERSVILLE_MEDICAL_CENTER" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_HUNTERSVILLE_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_HUNTERSVILLE_MEDICAL_CENTER")
print(f"Final columns: {df.columns}")

NOVANT_HEALTH_BALLANTYNE_MEDICAL_CENTER

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_BALLANTYNE_MEDICAL_CENTER"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))

for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))

#
column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_BALLANTYNE_MEDICAL_CENTER" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_BALLANTYNE_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_BALLANTYNE_MEDICAL_CENTER")
print(f"Final columns: {df.columns}")

NOVANT_HEALTH_PRESBYTERIAN_MEDICAL_CENTER

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()

df = session.table('"HEALTH_NAV"."STAGE"."NOVANT_HEALTH_PRESBYTERIAN_MEDICAL_CENTER"')

df = df.filter(upper(col('"Code Type"')) == "CPT")

columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}
existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}

if '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'

if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )
df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v) 
               for k, v in columns_mapping.items()])
df.limit(5).show()

numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE', 
                  'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [col for col in numeric_columns if col in df.columns]


for column in df.columns:
    df = df.withColumn(column, 
                      when(trim(upper(col(column))) == 'N/A', lit(None))
                      .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        df = df.withColumn(column, 
                         when(is_blank, lit(None))
                         .otherwise(col(column)))
    else:
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))

for column in numeric_columns:
    df = df.withColumn(column, 
                      to_decimal(col(column), 38, 2).cast("FLOAT"))

for column in numeric_columns:
    df = df.withColumn(column,
                      when(col(column).isNull(), lit(0.0))
                      .otherwise(col(column)))

#
column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."NOVANT_HEALTH_PRESBYTERIAN_MEDICAL_CENTER" (
""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."NOVANT_HEALTH_PRESBYTERIAN_MEDICAL_CENTER"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.NOVANT_HEALTH_PRESBYTERIAN_MEDICAL_CENTER")
print(f"Final columns: {df.columns}")

UT_HEALTH_HENDERSON

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length


session = Session.builder.getOrCreate()

df = session.table('"HEALTH_NAV"."STAGE"."UT_HEALTH_HENDERSON"')

df = df.filter(upper(col('"code|2|type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    '"code|2"': 'CODE',
    '"code|2|type"': 'CODE_TYPE',
    'PAYER_NAME': 'PAYER_NAME',
    'PLAN_NAME': 'PLAN_NAME',
    '"standard_charge|negotiated_dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"standard_charge|negotiated_percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"standard_charge|min"': 'MINIMUM_CHARGE',
    '"standard_charge|max"': 'MAXIMUM_CHARGE'
}

actual_cols_normalized = {col_name.lower().replace('"', ''): col_name for col_name in df.schema.names}


existing_columns = {}
for k, v in columns_mapping.items():
    normalized_key = k.lower().replace('"', '')
    if normalized_key in actual_cols_normalized:
        actual_col_name = actual_cols_normalized[normalized_key]
        existing_columns[actual_col_name] = v


if not existing_columns:
    raise ValueError(
        "None of the expected columns were found in the source table.\n"
        f"Available columns are:\n{existing_cols}"
    )

df = df.select([col(k).alias(v) for k, v in existing_columns.items()])


numeric_columns = [v for k, v in existing_columns.items() 
                  if any(x in k.lower() for x in ['charge', 'amount', 'percentage', 'min', 'max'])]


for column in existing_columns.values():
    
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    
    if column in numeric_columns:
        
        df = df.withColumn(column, 
                         when(is_blank, lit(0))
                         .otherwise(col(column)))
    else:
        
        df = df.withColumn(column,
                          when(is_blank, lit('0'))
                          .otherwise(col(column)))


if 'STANDARD_CHARGE_DOLLAR' in existing_columns.values() and 'STANDARD_CHARGE_PERCENTAGE' in existing_columns.values():
    df = df.filter(
        (col('STANDARD_CHARGE_DOLLAR') != 0) | 
        (col('STANDARD_CHARGE_PERCENTAGE') != 0))
else:
    print("Warning: Could not filter null charge rows - required columns not found")


if 'STANDARD_CHARGE_PERCENTAGE' in numeric_columns and 'MAXIMUM_CHARGE' in numeric_columns:
    has_percentage = (col('STANDARD_CHARGE_DOLLAR') == 0) & (col('STANDARD_CHARGE_PERCENTAGE') > 0)
    calculated_dollar = (col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01)
    
    df = df.withColumn('STANDARD_CHARGE_DOLLAR',
                      when(has_percentage, calculated_dollar)
                      .otherwise(col('STANDARD_CHARGE_DOLLAR')))
    

# Step 9: Prepare CREATE TABLE statement with cleaned column names
column_defs = ",\n".join([f'"{col}" STRING' for col in existing_columns.values()])

create_stmt = f"""
    CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."UT_HEALTH_HENDERSON" (
        {column_defs}
    )
"""
session.sql(create_stmt).collect()

# Step 10: Write to the destination table
df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."UT_HEALTH_HENDERSON"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.UT_HEALTH_HENDERSON")

Elmhurst memorial hospital

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."ELMHURST_MEMORIAL_HOSPITAL"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."ELMHURST_MEMORIAL_HOSPITAL" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."ELMHURST_MEMORIAL_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.ELMHURST_MEMORIAL_HOSPITAL")
print(f"Final columns: {df.columns}")


Edward Hospital

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."EDWARD_HOSPITAL"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."EDWARD_HOSPITAL" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()

# Step 15: Write to destination
df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."EDWARD_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.EDWARD_HOSPITAL")
print(f"Final columns: {df.columns}")


Linden Oaks Hospital

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."LINDEN_OAKS_HOSPITAL"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."LINDEN_OAKS_HOSPITAL" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."LINDEN_OAKS_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.LINDEN_OAKS_HOSPITAL")
print(f"Final columns: {df.columns}")


touchette

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."TOUCHETTE_REGIONAL_HOSPITAL"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."TOUCHETTE_REGIONAL_HOSPITAL" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."TOUCHETTE_REGIONAL_HOSPITAL"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.TOUCHETTE_REGIONAL_HOSPITAL")
print(f"Final columns: {df.columns}")


In [None]:
df.limit(5).show()

caromont

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."CAROMONT_PART1"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."CAROMONT_PART1" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."CAROMONT_PART1"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.CAROMONT_PART1")
print(f"Final columns: {df.columns}")


In [None]:
df.limit(5).show()

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper, when, lit, trim, length, to_decimal


session = Session.builder.getOrCreate()


df = session.table('"HEALTH_NAV"."STAGE"."CAROMONT_PART2"')


df = df.filter(upper(col('"Code Type"')) == "CPT")


columns_mapping = {
    'DESCRIPTION': 'DESCRIPTION',
    'CODE': 'CODE',
    '"Code Type"': 'CODE_TYPE',
    '"Payer Name"': 'PAYER_NAME',
    '"Plan Name"': 'PLAN_NAME',
    '"Standard Charge Dollar"': 'STANDARD_CHARGE_DOLLAR',
    '"Standard Charge Percentage"': 'STANDARD_CHARGE_PERCENTAGE',
    '"Minimum Charge"': 'MINIMUM_CHARGE',
    '"Maximum Charge"': 'MAXIMUM_CHARGE'
}


existing_cols = df.schema.names
existing_columns = {k: v for k, v in columns_mapping.items() if k in existing_cols}


if '"Standard Charge Dollar"' not in existing_cols and '"Standard Charge"' not in existing_cols:
    df = df.withColumn('STANDARD_CHARGE_DOLLAR', lit(0.0))
    existing_columns['STANDARD_CHARGE_DOLLAR'] = 'STANDARD_CHARGE_DOLLAR'


df = df.select([col(k).alias(v) if k in existing_cols else lit(None).alias(v)
               for k, v in columns_mapping.items()])


numeric_columns = ['STANDARD_CHARGE_DOLLAR', 'STANDARD_CHARGE_PERCENTAGE',
                   'MINIMUM_CHARGE', 'MAXIMUM_CHARGE']
numeric_columns = [c for c in numeric_columns if c in df.columns]


for column in df.columns:
    df = df.withColumn(column,
                       when(trim(upper(col(column))) == 'N/A', lit(None))
                       .otherwise(col(column)))


for column in df.columns:
    is_blank = (col(column).isNull()) | (length(trim(col(column))) == 0)
    if column in numeric_columns:
        df = df.withColumn(column, when(is_blank, lit(None)).otherwise(col(column)))
    else:
        df = df.withColumn(column, when(is_blank, lit('0')).otherwise(col(column)))


for column in numeric_columns:
    df = df.withColumn(column, to_decimal(col(column), 38, 2).cast("FLOAT"))


for column in numeric_columns:
    df = df.withColumn(column, when(col(column).isNull(), lit(0.0)).otherwise(col(column)))


if 'STANDARD_CHARGE_PERCENTAGE' in df.columns and 'MAXIMUM_CHARGE' in df.columns:
    df = df.withColumn(
        'STANDARD_CHARGE_DOLLAR',
        when(
            (col('STANDARD_CHARGE_DOLLAR') == 0) &
            (col('STANDARD_CHARGE_PERCENTAGE') > 0) &
            (col('MAXIMUM_CHARGE') > 0),
            col('STANDARD_CHARGE_PERCENTAGE') * col('MAXIMUM_CHARGE') * 0.01
        ).otherwise(col('STANDARD_CHARGE_DOLLAR'))
    )


column_defs = []
for col_name in columns_mapping.values():
    if col_name in numeric_columns:
        column_defs.append(f'"{col_name}" FLOAT')
    else:
        column_defs.append(f'"{col_name}" STRING')

create_stmt = """CREATE OR REPLACE TABLE "HEALTH_NAV"."CORE"."CAROMONT_PART2" (\n""" + ",\n".join(column_defs) + "\n)"

session.sql(create_stmt).collect()


df.write.save_as_table(
    '"HEALTH_NAV"."CORE"."CAROMONT_PART2"',
    mode='overwrite'
)

print("Data successfully written to HEALTH_NAV.CORE.CAROMONT_PART2")
print(f"Final columns: {df.columns}")


In [None]:
df.limit(5).show()