In [None]:
# Notebook parameters (presumably the Synapse parameters cell, so a pipeline may override them).
# ContainerName: data lake container used as the root of SyntheaInput/ and SyntheaOutput/.
ContainerName = "claimsfs"
# LinkedServiceName: linked service whose properties JSON provides the "Endpoint" of the data lake.
LinkedServiceName = "CoreClaimsDataLake"

In [None]:
import random
import uuid
import json

from pyspark.sql.types import StringType, IntegerType, DateType, DoubleType
from pyspark.sql.functions import col, rand, concat, current_date, lit, expr, udf, date_add, row_number, collect_list, struct, sum, from_csv
from pyspark.sql.window import Window
from urllib.parse import urlparse;

# Resolve the data lake endpoint from the linked service's properties (a JSON string
# with an "Endpoint" key) and build the abfss:// root URL for the container.
adls_url = json.loads(mssparkutils.credentials.getPropertiesAll(LinkedServiceName))["Endpoint"]
adls_parsed_url = urlparse(adls_url)

adls_path = f'abfss://{ContainerName}@{adls_parsed_url.netloc}'


def input_path(file):
    """Return the abfss:// path of ``file`` in the SyntheaInput folder."""
    return f'{adls_path}/SyntheaInput/{file}'


def output_path(file):
    """Return the abfss:// path of ``file`` in the SyntheaOutput folder."""
    return f'{adls_path}/SyntheaOutput/{file}'


def _email_from_name(name):
    """Build a mock ``@contoso.com`` e-mail address from ``name``.

    Spaces are removed and the result is lower-cased. A null name (for example,
    when one of the inputs to ``concat`` is null) yields None instead of failing
    the Spark job with an AttributeError.
    """
    if name is None:
        return None
    return f'{name.replace(" ", "").lower()}@contoso.com'


# PySpark treats UDFs as deterministic by default, which allows the optimizer to
# eliminate or repeat their invocations. The generators below return random values,
# so they are explicitly marked nondeterministic.
random_member_type = udf(lambda: random.choice(['self', 'spouse', 'dependent']), StringType()).asNondeterministic()
random_adjudicator_type = udf(lambda: random.choice(['Adjudicator', 'Manager']), StringType()).asNondeterministic()

email_from_name = udf(_email_from_name, StringType())

# Mock phone number formatted as (AAA)-555-NNNN.
random_phone_number = udf(lambda: f'({random.randint(0, 999):03})-555-{random.randint(0, 9999):04}', StringType()).asNondeterministic()

make_guid = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()


# Adjudicators Transformation
- Uses the `payers.csv` data source, since Synthea doesn't generate adjudicator data
- Renames existing columns and adds missing columns

In [None]:
# Synthea does not generate adjudicators, so the payer records are reused as
# adjudicators, with a mock e-mail and a randomly chosen role.
payers_for_adjudicators = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('payers.csv'))
)

df_adjudicator = (
    payers_for_adjudicators
    .withColumn("email", email_from_name(col("NAME")))
    .withColumn("type", lit("Adjudicator"))
    .withColumn("role", random_adjudicator_type())
    .select(
        col("Id").alias("id"),
        col("Id").alias("adjudicatorId"),
        "type",
        col("NAME").alias("name"),
        "email",
        "role",
    )
)

(
    df_adjudicator.write
    .format("json")
    .mode("overwrite")
    .json(output_path('adjudicator.json'))
)

# Claim Procedures Transformation
- Load files `procedures.csv`, `careplans.csv`, `immunizations.csv`, `medications.csv`, and `supplies.csv`
- Select distinct values of the Code and Description fields from each
- Concatenate (union) all lists together

In [None]:
def read_procedures(filename):
    """Load the distinct procedure-like codes from one Synthea CSV file.

    Args:
        filename: base name (without ``.csv``) of a file in SyntheaInput; it is
            also stored in the ``category`` column.

    Returns:
        A DataFrame with columns id, code, description, category, type. The
        ``type`` column is kept so the output carries a type discriminator, as the
        adjudicator and member outputs do.
    """
    return (
        spark.read
        .option("multiline", True)
        .option("header", True)
        .csv(input_path(f'{filename}.csv'))
        .withColumn("category", lit(filename))
        .withColumn("type", lit("ClaimProcedure"))
        .select(
            col("CODE").alias("id"),
            col("CODE").alias("code"),
            col("DESCRIPTION").alias("description"),
            "category",
            "type",
        )
        # NOTE(review): distinct() is over all columns, so one CODE with several
        # descriptions or categories yields several rows sharing the same id.
        .distinct()
    )


# union() matches columns by position; every part comes from read_procedures, so
# the column order is identical.
df_procedures = read_procedures('procedures')
for procedure_source in ('careplans', 'immunizations', 'medications', 'supplies'):
    df_procedures = df_procedures.union(read_procedures(procedure_source))

(
    df_procedures.write
    .format("json")
    .mode("overwrite")
    .json(output_path('claimprocedure.json'))
)

# Payer Transformation
- Reads the `payers.csv` data source
- Renames existing columns and adds missing columns

In [None]:
# Payers come from payers.csv; phone number and e-mail are mocked and the country is
# fixed to "US". The "type" discriminator is kept in the output, as the adjudicator
# and member outputs do.
df_payer = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('payers.csv'))
    .withColumn("phoneNumber", random_phone_number())
    .withColumn("email", email_from_name(col("NAME")))
    .withColumn("country", lit("US"))
    .withColumn("type", lit("Payer"))
    .select(
        col("Id").alias("id"),
        col("Id").alias("payerId"),
        "type",
        col("NAME").alias("name"),
        "email",
        "phoneNumber",
        col("ADDRESS").alias("address"),
        col("CITY").alias("city"),
        col("STATE_HEADQUARTERED").alias("state"),
        "country",
    )
)

(
    df_payer.write
    .format("json")
    .mode("overwrite")
    .json(output_path('payers.json'))
)

# Member Transformation
- Reads the `patients.csv` data source
- Renames existing columns and adds missing columns

In [None]:
# Synthea patients become members. E-mail is derived from first + last name;
# phone number and member type are mocked, and the country is fixed to "US".
patients_raw = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('patients.csv'))
)

full_name = concat(col("FIRST"), col("LAST"))

df_member = (
    patients_raw
    .withColumn("email", email_from_name(full_name))
    .withColumn("phoneNumber", random_phone_number())
    .withColumn("type", lit("Member"))
    .withColumn("country", lit("US"))
    .withColumn("memberType", random_member_type())
    .select(
        col("Id").alias("id"),
        col("Id").alias("memberId"),
        "memberType",
        "type",
        col("PREFIX").alias("title"),
        col("FIRST").alias("firstName"),
        col("LAST").alias("lastName"),
        "email",
        "phoneNumber",
        col("ADDRESS").alias("address"),
        col("CITY").alias("city"),
        col("STATE").alias("state"),
        "country",
        col("ZIP").alias("zipCode"),
    )
)

(
    df_member.write
    .format("json")
    .mode("overwrite")
    .json(output_path('patients.json'))
)


# Member Coverage
- Gets the payer IDs from the payer DataFrame, randomly assigns one to each member, and generates mock coverage data

In [None]:
# All payer ids, collected to the driver; the list travels to executors inside the
# random_payer UDF closure.
payer_ids = [row["payerId"] for row in df_payer.select("payerId").collect()]


def _pick_payer():
    """Return one payer id chosen at random, or None when there are no payers."""
    return random.choice(payer_ids) if payer_ids else None


# Both UDFs return random values. They are marked nondeterministic so the optimizer
# does not eliminate or repeat their calls (endDate below is derived from startDate).
random_payer = udf(_pick_payer, StringType()).asNondeterministic()
random_start_offset = udf(lambda: -random.randint(0, 700), IntegerType()).asNondeterministic()

## Make up coverage for members: each member gets one randomly chosen payer and a
## 365-day coverage window starting 0-700 days before today.
## TODO: Potentially pull this from payer_transactions.csv
df_member_coverage = (
    df_member
    .select("memberId")
    .withColumn("payerId", random_payer())
    .withColumn("type", lit("Coverage"))
    .withColumn("coverageId", make_guid())
    .withColumn("id", concat(lit("coverage:"), col("coverageId")))
    .withColumn("startDate", date_add(current_date(), random_start_offset()))
    .withColumn("endDate", date_add(col("startDate"), 365))
)

(
    df_member_coverage.write
    .format("json")
    .mode("overwrite")
    .json(output_path('coverage.json'))
)

# Provider Transformation
- Reads the `organizations.csv` data source
- Renames existing columns and adds missing columns

In [None]:
# Providers here are the organizations from organizations.csv. Phone number and
# e-mail are mocked and the country is fixed to "US". The "type" discriminator is
# kept in the output, as the adjudicator and member outputs do.
df_provider = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('organizations.csv'))
    .withColumn("phoneNumber", random_phone_number())
    .withColumn("email", email_from_name(col("NAME")))
    .withColumn("country", lit("US"))
    .withColumn("type", lit("Provider"))
    .select(
        col("Id").alias("id"),
        col("Id").alias("providerId"),
        "type",
        col("NAME").alias("name"),
        "email",
        "phoneNumber",
        col("ADDRESS").alias("address"),
        col("CITY").alias("city"),
        col("STATE").alias("state"),
        "country",
        # NOTE(review): members use "zipCode"; kept as "zip_code" here because
        # downstream consumers may depend on this field name.
        col("ZIP").alias("zip_code"),
    )
)

(
    df_provider.write
    .format("json")
    .mode("overwrite")
    .json(output_path('providers.json'))
)


# Claim Header and Claim Detail
- Reads the `providers.csv`, `claims_transactions.csv`, and `claims.csv` data sources
- Renames existing columns and adds missing columns
- Transforms the data by aggregating the `claims_transaction.csv` to get the lineItems
- Joins everything to a single file to be separated in the Copy Data part of the pipeline

In [None]:
# Numbers the charge lines of each claim. Ordering by the partition key alone would
# leave row_number() arbitrary, so lines are ordered by service date, then procedure
# code. Rows tying on both columns can still be numbered in either order.
# (Only CHARGE rows reach this window, so partitioning by TYPE is unnecessary.)
claim_window = Window.partitionBy("CLAIMID").orderBy("FROMDATE", "PROCEDURECODE")

# TODO: Load Providers (above providers are actually orgs), map provider to org

# Practitioners from providers.csv, joined to the organization records in df_provider
# to get the organization's id and name.
df_actual_providers = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('providers.csv'))
    .select(
        col("ORGANIZATION").alias("providerId"),
        col("Id").alias("practitionerId"),
    )
    .join(
        df_provider.select("providerId", col("name").alias("providerName")),
        on="providerId",
        how="inner",
    )
)

# One row per claim: its CHARGE transactions as numbered line items plus their total.
df_transactions = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('claims_transactions.csv'))
    .filter(col("TYPE") == "CHARGE")
    .withColumn("lineItemNo", row_number().over(claim_window))
    .withColumn("discount", lit(0.0))
    .withColumn("amount", col("AMOUNT").cast(DoubleType()))
    .select(
        col("CLAIMID").alias("claimId"),
        "lineItemNo",
        col("FROMDATE").alias("serviceDate"),
        col("PROVIDERID").alias("providerId"),
        col("PROCEDURECODE").alias("procedureCode"),
        col("NOTES").alias("description"),
        "amount",
        "discount",
    )
    .groupBy("claimId")
    .agg(
        collect_list(struct(
            "lineItemNo",
            "procedureCode",
            "description",
            "amount",
            "discount",
            "serviceDate",
            "providerId",
        )).alias("lineItems"),
        sum("amount").alias("totalAmount"),
    )
    # collect_list gives no ordering guarantee. Structs compare field by field and
    # lineItemNo is the first field, so this sorts the items by line number.
    .withColumn("lineItems", expr("array_sort(lineItems)"))
)

# Claim header/detail rows: only claims that have charge lines are kept (inner join);
# the practitioner's organization is attached when known (left outer join).
df_claims = (
    spark.read
    .option("multiline", True)
    .option("header", True)
    .csv(input_path('claims.csv'))
    .withColumn("adjustmentId", lit(0))
    .withColumn("claimStatus", lit("Initial"))
    .withColumn("detailId", concat(lit("claim:"), col("Id"), lit(":0")))
    .withColumn("headerId", concat(lit("claim:"), col("Id")))
    .select(
        col("Id").alias("claimId"),
        col("PATIENTID").alias("memberId"),
        col("SERVICEDATE").alias("filingDate"),
        col("PROVIDERID").alias("practitionerId"),
        "adjustmentId",
        "claimStatus",
        "headerId",
        "detailId",
    )
    .join(df_transactions, on="claimId", how="inner")
    .join(
        df_actual_providers.select("practitionerId", "providerId", "providerName"),
        on="practitionerId",
        how="left_outer",
    )
)

(
    df_claims.write
    .format("json")
    .mode("overwrite")
    .json(output_path('claims.json'))
)