In [0]:
secret_id = "cTJ8Q~z.RVTbbwjLOo~S-ckFmQ0vdHqnZHEzNasp"
tanent_id = "ad2e8e2e-301d-449d-86af-82be9b64f6ec"
client_id = "a2f26fec-2441-441b-83ec-9632c3e78203"

In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": client_id,
"fs.azure.account.oauth2.client.secret": secret_id,
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tanent_id}/oauth2/token"}

In [0]:
from pyspark.sql.functions import col, when, lit, datediff, count
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
mountpoint = "/mnt/point"
if any(mount.mountPoint == mountpoint for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mountpoint)
    print(f"Successfully unmounted: {mountpoint}")
else:
    print(f"Mount point created: {mountpoint}")
    dbutils.fs.mount(
    source="abfss://projectinfraud@eestr.dfs.core.windows.net",
    mount_point=mountpoint,
    extra_configs=configs,
    )

Mount point created: /mnt/point


In [0]:
%fs
ls "/mnt/point/raw"

path,name,size,modificationTime
dbfs:/mnt/point/raw/Test_Beneficiarydata-1542969243754.csv,Test_Beneficiarydata-1542969243754.csv,5325982,1736999556000
dbfs:/mnt/point/raw/Test_Inpatientdata-1542969243754.csv,Test_Inpatientdata-1542969243754.csv,2020712,1736999557000
dbfs:/mnt/point/raw/Test_Outpatientdata-1542969243754.csv,Test_Outpatientdata-1542969243754.csv,18806934,1736999555000


In [0]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    DateType,
)

inpatient_schema = StructType(
    [
        StructField("BeneID", StringType(), True),  # Beneficiary ID (String)
        StructField("ClaimID", StringType(), True),  # Claim ID (String)
        StructField("ClaimStartDt", DateType(), True),  # Claim Start Date (Date)
        StructField("ClaimEndDt", DateType(), True),  # Claim End Date (Date)
        StructField("Provider", StringType(), True),  # Provider ID (String)
        StructField(
            "InscClaimAmtReimbursed", DoubleType(), True
        ),  # Insurance Claim Amount Reimbursed (Double)
        StructField(
            "AttendingPhysician", StringType(), True
        ),  # Attending Physician ID (String)
        StructField(
            "OperatingPhysician", StringType(), True
        ),  # Operating Physician ID (String)
        StructField(
            "OtherPhysician", StringType(), True
        ),  # Other Physician ID (String)
        StructField("AdmissionDt", DateType(), True),  # Admission Date (Date)
        StructField(
            "ClmAdmitDiagnosisCode", StringType(), True
        ),  # Admission Diagnosis Code (String)
        StructField(
            "DeductibleAmtPaid", DoubleType(), True
        ),  # Deductible Amount Paid (Double)
        StructField("DischargeDt", DateType(), True),  # Discharge Date (Date)
        StructField(
            "DiagnosisGroupCode", StringType(), True
        ),  # Diagnosis Group Code (String)
        StructField(
            "ClmDiagnosisCode_1", StringType(), True
        ),  # Diagnosis Code 1 (String)
        StructField("ClmDiagnosisCode_2", StringType(), True),
        StructField("ClmDiagnosisCode_3", StringType(), True),
        StructField("ClmDiagnosisCode_4", StringType(), True),
        StructField("ClmDiagnosisCode_5", StringType(), True),
        StructField("ClmDiagnosisCode_6", StringType(), True),
        StructField("ClmDiagnosisCode_7", StringType(), True),
        StructField("ClmDiagnosisCode_8", StringType(), True),
        StructField("ClmDiagnosisCode_9", StringType(), True),
        StructField("ClmDiagnosisCode_10", StringType(), True),
        StructField(
            "ClmProcedureCode_1", StringType(), True
        ),  # Procedure Code 1 (String)
        StructField("ClmProcedureCode_2", StringType(), True),
        StructField("ClmProcedureCode_3", StringType(), True),
        StructField("ClmProcedureCode_4", StringType(), True),
        StructField("ClmProcedureCode_5", StringType(), True),
        StructField("ClmProcedureCode_6", StringType(), True),
    ]
)
outpatient_schema = StructType(
    [
        StructField("BeneID", StringType(), True),
        StructField("ClaimID", StringType(), True),
        StructField("ClaimStartDt", DateType(), True),
        StructField("ClaimEndDt", DateType(), True),
        StructField("Provider", StringType(), True),
        StructField("InscClaimAmtReimbursed", DoubleType(), True),
        StructField("AttendingPhysician", StringType(), True),
        StructField("OperatingPhysician", StringType(), True),
        StructField("OtherPhysician", StringType(), True),
        StructField("DeductibleAmtPaid", DoubleType(), True),
    ]
)

beneficiary_schema = StructType(
    [
        StructField("BeneID", StringType(), True),
        StructField("DOB", DateType(), True),
        StructField("DOD", DateType(), True),
        StructField("Gender", IntegerType(), True),
        StructField("Race", IntegerType(), True),
        StructField("RenalDiseaseIndicator", StringType(), True),
        StructField("NoOfMonths_PartACov", IntegerType(), True),
        StructField("NoOfMonths_PartBCov", IntegerType(), True),
    ]
)
inpatient_df = spark.read.csv("/mnt/point/raw/Test_Inpatientdata-1542969243754.csv",header=True, schema= inpatient_schema)
beneficiary_df = spark.read.csv("/mnt/point/raw/Test_Beneficiarydata-1542969243754.csv",header=True, schema= beneficiary_schema )
outpatient_df = spark.read.csv("/mnt/point/raw/Test_Outpatientdata-1542969243754.csv",header=True, schema=outpatient_schema)

 

In [0]:
# Replace missing values with placeholders
inpatient_df = inpatient_df.fillna(
    {
        "AttendingPhysician": "Unknown",
        "OperatingPhysician": "Unknown",
        "OtherPhysician": "Unknown",
        "DeductibleAmtPaid": 0,
        "InscClaimAmtReimbursed": 0,
    }
)
outpatient_df = outpatient_df.fillna(
    {
        "AttendingPhysician": "Unknown",
        "OperatingPhysician": "Unknown",
        "OtherPhysician": "Unknown",
        "DeductibleAmtPaid": 0,
        "InscClaimAmtReimbursed": 0,
    }
)
# Adding dummy value for DOB
beneficiary_df = beneficiary_df.fillna({"DOD": "1970-01-01"})

In [0]:
# Add missing columns to outpatient data to match inpatient schema
outpatient_df = (
    outpatient_df.withColumn("AdmissionDt", lit(None).cast(DateType()))
    .withColumn("DischargeDt", lit(None).cast(DateType()))
    .withColumn("ClmAdmitDiagnosisCode", lit(None).cast(StringType()))
    .withColumn("DiagnosisGroupCode", lit(None).cast(StringType()))
)
 

In [0]:
missing_columns = list(set(inpatient_df.columns) - set(outpatient_df.columns))



In [0]:
# Add missing columns to outpatient data with null values
for column in missing_columns:
    outpatient_df = outpatient_df.withColumn(column, lit(None).cast(inpatient_df.schema[column].dataType))

In [0]:
# Reorder columns in outpatient data to match inpatient schema
outpatient_df = outpatient_df.select(inpatient_df.columns)

In [0]:
combined_claims_df = inpatient_df.unionByName(outpatient_df)

In [0]:
combined_df = combined_claims_df.join(beneficiary_df, on="BeneID", how="inner")

In [0]:
# Calculate Length of Stay (LOS) for inpatient claims; set LOS to 0 for outpatient claims
combined_df = combined_df.withColumn(
    "LengthOfStay",
    when(
        col("AdmissionDt").isNotNull() & col("DischargeDt").isNotNull(),
        datediff(col("DischargeDt"), col("AdmissionDt")),
    ).otherwise(lit(0)),
)
# Add an age column (based on DOB)
combined_df = combined_df.withColumn(
    "Age",
    when(
        col("DOB").isNotNull(), (lit(2025) - col("DOB").substr(1, 4).cast(DoubleType()))
    ).otherwise(lit(None)),
)

In [0]:
# Rule 1: High reimbursement amounts (e.g., > $20,000)
combined_df = combined_df.withColumn(
    "HighReimbursement",
    when(col("InscClaimAmtReimbursed") > 20000, lit(1)).otherwise(lit(0)),
)
# Rule 2: Short stays with high costs (e.g., LOS <= 2 and reimbursement > $5,000)
combined_df = combined_df.withColumn(
    "ShortStayHighCost",
    when(
        (col("LengthOfStay") <= 2) & (col("InscClaimAmtReimbursed") > 5000), lit(1)
    ).otherwise(lit(0)),
)

# Rule 3: Duplicate claims for the same BeneID and overlapping dates
duplicate_claims = (
    combined_df.groupBy("BeneID", "ClaimStartDt", "ClaimEndDt")
    .count()
    .filter(col("count") > 1)
)
# Rule 4: Excessive claims by a single provider
provider_claims = (
    combined_df.groupBy("Provider")
    .agg({"*": "count"})
    .withColumnRenamed("count(1)", "TotalClaims")
    .filter(col("TotalClaims") > 100)
)

In [0]:
fraud_cases = combined_df.filter(
        (col("HighReimbursement") == 1) |
            (col("ShortStayHighCost") == 1)
            )


In [0]:
# Add unique IDs to each table
duplicate_claims = duplicate_claims.withColumn("unique_id", monotonically_increasing_id())
provider_claims = provider_claims.withColumn("unique_id", monotonically_increasing_id())
fraud_cases = fraud_cases.withColumn("unique_id", monotonically_increasing_id())

In [0]:
from pyspark.sql.functions import current_timestamp
duplicate_claims = duplicate_claims.withColumn("ingestion_timestamp", current_timestamp())
provider_claims = provider_claims.withColumn("ingestion_timestamp", current_timestamp())
fraud_cases = fraud_cases.withColumn("ingestion_timestamp", current_timestamp())

In [0]:
duplicate_claims.write.format("delta").mode("overwrite").save("/mnt/point/gold/delta/duplicate_claims")
provider_claims.write.format("delta").mode("overwrite").save("/mnt/point/gold/delta/provider_claims")
fraud_cases.write.format("delta").mode("overwrite").save("/mnt/point/gold/delta/fraud_cases")

In [0]:
from delta.tables import DeltaTable

existing_data_dc = DeltaTable.forPath(spark, "/mnt/point/gold/delta/duplicate_claims")
existing_data_pc = DeltaTable.forPath(spark, "/mnt/point/gold/delta/provider_claims")
existing_data_fc = DeltaTable.forPath(spark, "/mnt/point/gold/delta/fraud_cases")

 

In [0]:
display(duplicate_claims)


BeneID,ClaimStartDt,ClaimEndDt,count,ingestion_timestamp
BENE132510,2009-01-26,2009-01-27,2,2025-01-16T06:17:24.407Z
BENE23214,2009-03-12,2009-03-15,2,2025-01-16T06:17:24.407Z
BENE29226,2009-09-04,2009-09-04,2,2025-01-16T06:17:24.407Z
BENE55465,2009-05-12,2009-05-12,2,2025-01-16T06:17:24.407Z
BENE12650,2009-02-06,2009-02-06,2,2025-01-16T06:17:24.407Z
BENE15255,2009-04-08,2009-04-08,2,2025-01-16T06:17:24.407Z
BENE45069,2009-06-08,2009-06-08,2,2025-01-16T06:17:24.407Z
BENE48443,2009-11-06,2009-11-06,2,2025-01-16T06:17:24.407Z
BENE49838,2009-02-22,2009-02-22,2,2025-01-16T06:17:24.407Z
BENE37842,2009-06-20,2009-06-20,2,2025-01-16T06:17:24.407Z


In [0]:
existing_data_dc.alias("existingdc").merge(duplicate_claims.alias("newdc"),"existingdc.unique_id=newdc.unique_id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
 

In [0]:
existing_data_pc.alias("existingpc").merge(provider_claims.alias("newpc"),"existingpc.Provider=newpc.Provider").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
existing_data_fc.alias("existingfc").merge(fraud_cases.alias("newfc"),"existingfc.unique_id=newfc.unique_id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
print("\nPotential Fraud Cases:")
display(fraud_cases) 

print("\nDuplicate Claims:")
display(duplicate_claims)

print("\nProviders with Excessive Claims:")
display(provider_claims)


Potential Fraud Cases:


BeneID,ClaimID,ClaimStartDt,ClaimEndDt,Provider,InscClaimAmtReimbursed,AttendingPhysician,OperatingPhysician,OtherPhysician,AdmissionDt,ClmAdmitDiagnosisCode,DeductibleAmtPaid,DischargeDt,DiagnosisGroupCode,ClmDiagnosisCode_1,ClmDiagnosisCode_2,ClmDiagnosisCode_3,ClmDiagnosisCode_4,ClmDiagnosisCode_5,ClmDiagnosisCode_6,ClmDiagnosisCode_7,ClmDiagnosisCode_8,ClmDiagnosisCode_9,ClmDiagnosisCode_10,ClmProcedureCode_1,ClmProcedureCode_2,ClmProcedureCode_3,ClmProcedureCode_4,ClmProcedureCode_5,ClmProcedureCode_6,DOB,DOD,Gender,Race,RenalDiseaseIndicator,NoOfMonths_PartACov,NoOfMonths_PartBCov,LengthOfStay,Age,HighReimbursement,ShortStayHighCost,ingestion_timestamp,unique_id
BENE11176,CLM55569,2009-06-14,2009-06-14,PRV57214,15000.0,PHY405200,PHY432053,,2009-06-14,0389,1068.0,2009-06-14,853.0,0389,51881,2752,5849,42731,53540,2760,486,99591,5856,4513.0,,,,,,1940-09-01,1970-01-01,1,1,Y,49,820,0,85.0,0,1,2025-01-16T06:47:11.823Z,0
BENE11298,CLM40803,2009-03-02,2009-03-04,PRV54853,11000.0,PHY343909,PHY343909,,2009-03-02,99649,1068.0,2009-03-04,479.0,7140,25000,,,,,,,,,8154.0,,,,,,1934-09-01,1970-01-01,1,1,Y,32,25,2,91.0,0,1,2025-01-16T06:47:11.823Z,1
BENE11358,CLM59268,2009-07-11,2009-07-12,PRV53275,13000.0,PHY425441,PHY389026,,2009-07-11,V454,1068.0,2009-07-12,557.0,7210,42789,V8531,53081,99649,3051,3899,72610,32723,,8102.0,,,,,,1940-09-01,1970-01-01,2,1,0,18,460,1,85.0,0,1,2025-01-16T06:47:11.823Z,2
BENE11391,CLM53392,2009-05-30,2009-05-31,PRV52091,16000.0,PHY368847,PHY413798,,2009-05-30,0389,1068.0,2009-05-31,856.0,0389,42731,7863,42821,27651,5990,4271,59010,V4581,,3893.0,,,,,,1927-07-01,1970-01-01,2,1,Y,31,270,1,98.0,0,1,2025-01-16T06:47:11.823Z,3
BENE11430,CLM45661,2009-04-05,2009-04-14,PRV54731,25000.0,PHY427189,PHY352136,,2009-04-05,5761,1068.0,2009-04-14,417.0,1977,5849,03849,99590,4778,5722,2762,5768,5715,,5411.0,,,,,,1929-06-01,1970-01-01,1,1,0,31,370,9,96.0,1,0,2025-01-16T06:47:11.823Z,4
BENE11460,CLM78388,2009-12-04,2009-12-05,PRV56230,7000.0,PHY422086,PHY433453,PHY390380,2009-12-04,4131,1068.0,2009-12-05,234.0,41401,2724,71590,42832,2449,34690,,,,,3722.0,4242.0,,,,,1933-05-01,1970-01-01,2,1,0,42,310,1,92.0,0,1,2025-01-16T06:47:11.823Z,5
BENE11897,CLM67666,2009-09-11,2009-10-02,PRV51520,57000.0,PHY352236,PHY343937,,2009-09-11,1532,1068.0,2009-10-02,7.0,56081,2761,0389,V462,8054,5570,9587,5070,,,8417.0,5185.0,,,,,1940-08-01,1970-01-01,1,3,0,5,80,21,85.0,1,0,2025-01-16T06:47:11.823Z,6
BENE11961,CLM38196,2009-02-13,2009-02-14,PRV52117,17000.0,PHY378885,PHY369576,,2009-02-13,4414,1068.0,2009-02-14,246.0,4414,41400,2851,V1083,E8782,,,,,,3712.0,,,,,,1927-08-01,1970-01-01,1,1,Y,10,580,1,98.0,0,1,2025-01-16T06:47:11.823Z,7
BENE12083,CLM69229,2009-09-23,2009-09-24,PRV57178,6000.0,PHY432904,PHY382087,,2009-09-23,27651,1068.0,2009-09-24,822.0,V5811,4019,78609,71535,3051,E9353,2724,,,,9925.0,,,,,,1925-01-01,1970-01-01,2,1,0,49,430,1,100.0,0,1,2025-01-16T06:47:11.823Z,8
BENE12261,CLM55017,2009-06-10,2009-06-13,PRV51589,44000.0,PHY406035,PHY387588,,2009-06-10,7802,1068.0,2009-06-13,234.0,4148,V433,2724,41400,412,42840,V4581,4019,42731,,51.0,,,,,,1933-03-01,1970-01-01,2,1,0,5,330,3,92.0,1,0,2025-01-16T06:47:11.823Z,9



Duplicate Claims:


BeneID,ClaimStartDt,ClaimEndDt,count,ingestion_timestamp,unique_id
BENE132510,2009-01-26,2009-01-27,2,2025-01-16T06:47:14.952Z,0
BENE23214,2009-03-12,2009-03-15,2,2025-01-16T06:47:14.952Z,1
BENE29226,2009-09-04,2009-09-04,2,2025-01-16T06:47:14.952Z,2
BENE55465,2009-05-12,2009-05-12,2,2025-01-16T06:47:14.952Z,3
BENE12650,2009-02-06,2009-02-06,2,2025-01-16T06:47:14.952Z,4
BENE15255,2009-04-08,2009-04-08,2,2025-01-16T06:47:14.952Z,5
BENE45069,2009-06-08,2009-06-08,2,2025-01-16T06:47:14.952Z,6
BENE48443,2009-11-06,2009-11-06,2,2025-01-16T06:47:14.952Z,7
BENE49838,2009-02-22,2009-02-22,2,2025-01-16T06:47:14.952Z,8
BENE37842,2009-06-20,2009-06-20,2,2025-01-16T06:47:14.952Z,9



Providers with Excessive Claims:


Provider,TotalClaims,ingestion_timestamp,unique_id
PRV52338,1551,2025-01-16T06:47:16.09Z,0
PRV57229,205,2025-01-16T06:47:16.09Z,1
PRV51051,362,2025-01-16T06:47:16.09Z,2
PRV54957,556,2025-01-16T06:47:16.09Z,3
PRV52134,172,2025-01-16T06:47:16.09Z,4
PRV54512,142,2025-01-16T06:47:16.09Z,5
PRV52819,268,2025-01-16T06:47:16.09Z,6
PRV52840,759,2025-01-16T06:47:16.09Z,7
PRV52329,219,2025-01-16T06:47:16.09Z,8
PRV55478,160,2025-01-16T06:47:16.09Z,9
