In [1]:
import dask.dataframe as dd

In [2]:
clean_path = "s3://medicare-fraud-data-25-05-2025/clean/"

# Provider IDs of the held-out test split (a single 'Provider' column).
df_test_labels = dd.read_csv(f"{clean_path}test_labels/*.csv")

In [3]:
# Schema check: the label table exposes just the 'Provider' column.
df_test_labels.columns

Index(['Provider'], dtype='object')

In [4]:
# Preview the first 10 provider IDs of the test split.
df_test_labels.head(10)

Unnamed: 0,Provider
0,PRV51002
1,PRV51006
2,PRV51009
3,PRV51010
4,PRV51018
5,PRV51019
6,PRV51020
7,PRV51022
8,PRV51028
9,PRV51033


In [5]:
def load_merged_data(clean_path="s3://medicare-fraud-data-25-05-2025/clean/"):
    """
    Load the merged train and test claim tables from S3 as Dask DataFrames.

    Parameters:
        clean_path (str): Prefix, ending in "/", that contains the
            ``train_full/`` and ``test_full/`` CSV folders. Defaults to the
            project's clean-data bucket.

    Returns:
        tuple: ``(df_train, df_test)`` Dask DataFrames. The four claim /
        admission date columns are passed to ``parse_dates``; the other listed
        columns are read with the explicit dtypes below.
    """
    # Each column is listed exactly once: duplicate keys in a dict literal are
    # silently collapsed (the last one wins), which hides schema typos.
    merged_dtypes = {
        'ClaimID': 'object',
        'ClaimStartDt': 'object',
        'ClaimEndDt': 'object',
        'Provider': 'object',
        'InscClaimAmtReimbursed': 'float64',
        'AttendingPhysician': 'object',
        'OperatingPhysician': 'object',
        'OtherPhysician': 'object',
        'AdmissionDt': 'object',
        'ClmAdmitDiagnosisCode': 'object',
        # Kept as float64 (as inferred) rather than int64 — presumably because
        # the column can contain missing values; TODO confirm.
        'DeductibleAmtPaid': 'float64',
        'DischargeDt': 'object',
        'DiagnosisGroupCode': 'object',
    }
    # The ten claim diagnosis slots all share the same dtype.
    merged_dtypes.update({f'ClmDiagnosisCode_{i}': 'object' for i in range(1, 11)})

    date_columns_in = ['ClaimStartDt', 'ClaimEndDt', 'AdmissionDt', 'DischargeDt']
    df_train = dd.read_csv(clean_path + "train_full/*.csv", parse_dates=date_columns_in, dtype=merged_dtypes)
    df_test = dd.read_csv(clean_path + "test_full/*.csv", parse_dates=date_columns_in, dtype=merged_dtypes)
    print("Data loaded successfully")

    return (df_train, df_test)

In [6]:
import dask.dataframe as dd
# Load both merged splits as Dask DataFrames.
df_train, df_test = load_merged_data()


Data loaded successfully


In [7]:

def convert_dates(df, date_columns=None):
    """
    Convert date columns to datetime format without modifying the input frame.

    Acts as a safety net after ``read_csv(parse_dates=...)``, which leaves a
    column as text if it cannot parse it; here unparseable values become NaT.

    Parameters:
        df (dask.dataframe.DataFrame): Frame containing the date columns.
        date_columns (list of str, optional): Columns to convert. Defaults to
            ['ClaimStartDt', 'ClaimEndDt', 'AdmissionDt', 'DischargeDt'].

    Returns:
        A new DataFrame in which each listed column is the result of
        ``dd.to_datetime(..., errors='coerce')``.

    Raises:
        KeyError: If a listed column is not present in ``df``.
    """
    if date_columns is None:
        date_columns = ['ClaimStartDt', 'ClaimEndDt', 'AdmissionDt', 'DischargeDt']
    # assign() returns a new frame instead of writing into the caller's object,
    # so the raw frame stays intact and re-running the cell is predictable.
    return df.assign(**{col: dd.to_datetime(df[col], errors='coerce') for col in date_columns})
    

In [8]:
# Coerce the claim/admission date columns in both splits.
df_train, df_test = (convert_dates(frame) for frame in (df_train, df_test))

In [10]:
# Check that the date columns are datetime64 after convert_dates.
df_train.dtypes

BeneID                             string[pyarrow]
ClaimID                            string[pyarrow]
ClaimStartDt                        datetime64[ns]
ClaimEndDt                          datetime64[ns]
Provider                           string[pyarrow]
InscClaimAmtReimbursed                     float64
AttendingPhysician                 string[pyarrow]
OperatingPhysician                 string[pyarrow]
OtherPhysician                     string[pyarrow]
AdmissionDt                         datetime64[ns]
ClmAdmitDiagnosisCode              string[pyarrow]
DeductibleAmtPaid                          float64
DischargeDt                         datetime64[ns]
DiagnosisGroupCode                 string[pyarrow]
ClmDiagnosisCode_1                 string[pyarrow]
ClmDiagnosisCode_2                 string[pyarrow]
ClmDiagnosisCode_3                 string[pyarrow]
ClmDiagnosisCode_4                 string[pyarrow]
ClmDiagnosisCode_5                 string[pyarrow]
ClmDiagnosisCode_6             

In [11]:
# Same dtype check for the test split.
df_test.dtypes

BeneID                             string[pyarrow]
ClaimID                            string[pyarrow]
ClaimStartDt                        datetime64[ns]
ClaimEndDt                          datetime64[ns]
Provider                           string[pyarrow]
InscClaimAmtReimbursed                     float64
AttendingPhysician                 string[pyarrow]
OperatingPhysician                 string[pyarrow]
OtherPhysician                     string[pyarrow]
AdmissionDt                         datetime64[ns]
ClmAdmitDiagnosisCode              string[pyarrow]
DeductibleAmtPaid                          float64
DischargeDt                         datetime64[ns]
DiagnosisGroupCode                 string[pyarrow]
ClmDiagnosisCode_1                 string[pyarrow]
ClmDiagnosisCode_2                 string[pyarrow]
ClmDiagnosisCode_3                 string[pyarrow]
ClmDiagnosisCode_4                 string[pyarrow]
ClmDiagnosisCode_5                 string[pyarrow]
ClmDiagnosisCode_6             

In [12]:
# Reproducible 10% sample of the training split: a fixed random_state means a
# re-run shows the same rows. .compute() materialises the whole sample in memory.
df_train.sample(frac=0.1, random_state=42).compute()

Unnamed: 0,BeneID,ClaimID,ClaimStartDt,ClaimEndDt,Provider,InscClaimAmtReimbursed,AttendingPhysician,OperatingPhysician,OtherPhysician,AdmissionDt,...,ChronicCond_Osteoporasis,ChronicCond_rheumatoidarthritis,ChronicCond_stroke,IPAnnualReimbursementAmt,IPAnnualDeductibleAmt,OPAnnualReimbursementAmt,OPAnnualDeductibleAmt,Bene_Age,Bene_Alive,PotentialFraud
361415,BENE47604,CLM265636,2009-03-25,2009-03-25,PRV51346,10.0,PHY400945,,,NaT,...,0,0,0,57000,1068,40,0,71,1,0
114291,BENE128301,CLM39411,2009-02-21,2009-02-28,PRV52118,10000.0,PHY413491,PHY413491,,2009-02-21,...,0,1,1,17000,2136,4120,1070,84,1,0
10184,BENE102644,CLM292913,2009-04-08,2009-04-08,PRV57321,20.0,PHY420356,,,NaT,...,1,0,0,0,0,2530,40,57,1,0
93140,BENE12325,CLM80162,2009-12-20,2009-12-25,PRV52342,4000.0,PHY376710,PHY362496,,2009-12-20,...,0,0,0,4000,1068,0,0,61,1,0
30522,BENE108070,CLM241066,2009-03-11,2009-03-11,PRV56104,60.0,PHY335092,,,NaT,...,1,1,0,0,0,550,970,82,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
439941,BENE68528,CLM287109,2009-04-05,2009-04-05,PRV56997,30.0,PHY375624,PHY375624,PHY375624,NaT,...,0,0,0,0,0,90,0,67,1,0
337217,BENE41081,CLM533111,2009-08-19,2009-08-19,PRV51459,200.0,PHY341578,,PHY341578,NaT,...,0,0,0,0,0,1410,620,91,1,1
159802,BENE139245,CLM112045,2008-12-29,2009-01-18,PRV54333,300.0,PHY403255,,PHY344406,NaT,...,1,1,1,11000,1068,1320,250,83,1,1
493035,BENE82615,CLM346348,2009-05-07,2009-05-07,PRV54837,80.0,PHY384055,,PHY350997,NaT,...,0,1,0,0,0,730,330,96,1,1


In [9]:
# Every column whose name mentions "Physician".
physician_cols = list(filter(lambda name: "Physician" in name, df_test.columns))
print(physician_cols)



['AttendingPhysician', 'OperatingPhysician', 'OtherPhysician']


In [14]:
# Full column list of the merged test split.
print([*df_test.columns])


['BeneID', 'ClaimID', 'ClaimStartDt', 'ClaimEndDt', 'Provider', 'InscClaimAmtReimbursed', 'AttendingPhysician', 'OperatingPhysician', 'OtherPhysician', 'AdmissionDt', 'ClmAdmitDiagnosisCode', 'DeductibleAmtPaid', 'DischargeDt', 'DiagnosisGroupCode', 'ClmDiagnosisCode_1', 'ClmDiagnosisCode_2', 'ClmDiagnosisCode_3', 'ClmDiagnosisCode_4', 'ClmDiagnosisCode_5', 'ClmDiagnosisCode_6', 'ClmDiagnosisCode_7', 'ClmDiagnosisCode_8', 'ClmDiagnosisCode_9', 'ClmDiagnosisCode_10', 'ClmProcedureCode_1', 'ClmProcedureCode_2', 'ClmProcedureCode_3', 'ClmProcedureCode_4', 'ClmProcedureCode_5', 'ClmProcedureCode_6', 'ClaimDuration', 'HospitalDuration', 'DOB', 'DOD', 'Gender', 'Race', 'RenalDiseaseIndicator', 'State', 'County', 'ChronicCond_Alzheimer', 'ChronicCond_Heartfailure', 'ChronicCond_KidneyDisease', 'ChronicCond_Cancer', 'ChronicCond_ObstrPulmonary', 'ChronicCond_Depression', 'ChronicCond_Diabetes', 'ChronicCond_IschemicHeart', 'ChronicCond_Osteoporasis', 'ChronicCond_rheumatoidarthritis', 'Chronic

In [10]:
# 1. Replace missing values in all physician columns with the placeholder "0".
# These columns are string[pyarrow] (see the dtypes output), so the fill value
# must be a string: pandas rejects non-string scalars for Arrow-backed string
# columns, and because Dask is lazy that error would only surface at compute time.
# NOTE(review): after this fill, missing physicians are no longer NaN, so later
# groupby / nunique / dropna steps treat "0" as a real physician ID — confirm
# that is intended.
cols_to_fill = ['AttendingPhysician', 'OperatingPhysician', 'OtherPhysician']
df_test[cols_to_fill] = df_test[cols_to_fill].fillna("0")
df_train[cols_to_fill] = df_train[cols_to_fill].fillna("0")


In [11]:
# 2. Sum of the Beneficiary Age for every Provider
bene_age_sum_per_prv = df_test.groupby("Provider")["Bene_Age"].sum().reset_index()
bene_age_sum_per_prv = bene_age_sum_per_prv.rename(columns={"Bene_Age": "Bene_Age_Sum"})


In [12]:
# Shows the lazy Dask structure (column names and dtypes); values appear only after .compute().
bene_age_sum_per_prv

Unnamed: 0_level_0,Provider,Bene_Age_Sum
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,string,int64
,...,...


In [13]:
# 3. Total number of claims per provider. The original idea was to identify the
# total number of false claims per provider by subtracting the number of
# fraudulent claims from the total number of claims.
# TODO(review): total − fraudulent yields the non-fraudulent count — clarify the intent.
total_claims_per_prv = (
    df_test.groupby("Provider")["ClaimID"]
    .count()
    .reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)
total_claims_per_prv

Unnamed: 0_level_0,Provider,TotalClaims
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,string,int64
,...,...


In [14]:
# 4./5. Total claims per attending, operating and other physician
total_claims_per_Attphysician = (
    df_test.groupby("AttendingPhysician")["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)
total_claims_per_OPphysician = (
    df_test.groupby(["OperatingPhysician"])["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)
total_claims_per_other_Otphysician = (
    df_test.groupby(["OtherPhysician"])["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)

In [15]:
# 6.–8. Total claims per (Provider, physician) pair, one frame per physician role
total_claims_per_prv_Attphysician = (
    df_test.groupby(["Provider", "AttendingPhysician"])["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)
total_claims_per_prv_OPphysician = (
    df_test.groupby(["Provider", "OperatingPhysician"])["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)
total_claims_per_prv_Otphysician = (
    df_test.groupby(["Provider", "OtherPhysician"])["ClaimID"].count().reset_index()
    .rename(columns={"ClaimID": "TotalClaims"})
)

In [16]:
# 7. Prv_Physician_Count
def count_unique_physicians(df, physician_col, missing_value=None):
    """
    Count distinct physicians per provider.

    Parameters:
        df: Dask (or, for a single column, pandas) DataFrame with a "Provider"
            column and the physician column(s).
        physician_col (str or list of str): A single physician column, or
            several columns whose IDs are pooled before counting. The list form
            stacks the columns with ``dd.concat``. A physician appearing in
            several roles for the same provider is counted once.
        missing_value (optional): Placeholder treated as "no physician" in
            addition to NaN, e.g. the value used to fill missing physician IDs.
            The default ``None`` ignores only NaN, as before.

    Returns:
        DataFrame with columns ``["Provider", <count>]``. The count column is
        ``"Prv_Physician_Count"`` for a list input and
        ``f"{physician_col}_Count"`` for a single column.
    """
    def _without_placeholder(series):
        # Turn the placeholder into NaN so nunique()/dropna() skip it as well.
        if missing_value is None:
            return series
        return series.mask(series == missing_value)

    if isinstance(physician_col, list):
        # Stack (Provider, physician) pairs from every listed role into one frame.
        pieces = []
        for col in physician_col:
            piece = df[["Provider", col]].rename(columns={col: "Physician"})
            if missing_value is not None:
                piece = piece.assign(Physician=_without_placeholder(piece["Physician"]))
            pieces.append(piece.dropna())
        # nunique() already ignores repeated IDs, so a separate
        # drop_duplicates() pass (an extra full pass over the data) is not needed.
        return (
            dd.concat(pieces)
            .groupby("Provider")["Physician"]
            .nunique()
            .reset_index()
            .rename(columns={"Physician": "Prv_Physician_Count"})
        )

    counted = df if missing_value is None else df.assign(
        **{physician_col: _without_placeholder(df[physician_col])}
    )
    return (
        counted.groupby("Provider")[physician_col]
        .nunique()
        .reset_index()
        .rename(columns={physician_col: f"{physician_col}_Count"})
    )


In [17]:
# Pass each single role as a plain string so every per-role result is named
# "<column>_Count"; a one-element list would take the pooled branch and yield
# the generic "Prv_Physician_Count" column instead.
pr_Attphysician_count = count_unique_physicians(df_test, "AttendingPhysician")
pr_OPphysician_count = count_unique_physicians(df_test, "OperatingPhysician")
pr_Otphysician_count = count_unique_physicians(df_test, "OtherPhysician")
pr_allphysician_count = count_unique_physicians(df_test, ["AttendingPhysician", "OperatingPhysician", "OtherPhysician"])


In [18]:
# Lazy schema preview of the pooled (all-roles) physician count per provider.
pr_allphysician_count

Unnamed: 0_level_0,Provider,Prv_Physician_Count
npartitions=3,Unnamed: 1_level_1,Unnamed: 2_level_1
,string,int64
,...,...
,...,...
,...,...


In [None]:
# 8. Is the Date of the Claim identical to the Date of the Admission to the Hospital?

In [None]:
# 9. Fake or invalid codes — check diagnosis codes against ICD-9 / ICD-10

In [19]:
# 10. Provider_Insurance_Claim_Reimbursement_Amt
def calculate_provider_insurance_reimbursement(df):
    """
    Sum ``InscClaimAmtReimbursed`` for each provider.

    Returns a frame with columns ``Provider`` and
    ``Provider_Insurance_Claim_Reimbursement_Amt``.
    """
    reimbursed = df.groupby("Provider")["InscClaimAmtReimbursed"].sum()
    reimbursed = reimbursed.reset_index()
    return reimbursed.rename(
        columns={"InscClaimAmtReimbursed": "Provider_Insurance_Claim_Reimbursement_Amt"}
    )
# Per-provider reimbursement totals for the test split (lazy Dask frame).
provider_insurance_reimbursement = calculate_provider_insurance_reimbursement(df_test)
provider_insurance_reimbursement

Unnamed: 0_level_0,Provider,Provider_Insurance_Claim_Reimbursement_Amt
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,string,float64
,...,...


In [20]:
# 11. Provider_Total_Patients
def calculate_provider_total_patients(df):
    """
    Count the distinct beneficiaries (``BeneID``) seen by each provider.

    Returns a frame with columns ``Provider`` and ``Provider_Total_Patients``.
    """
    distinct_benes = df.groupby("Provider")["BeneID"].nunique()
    return distinct_benes.reset_index().rename(columns={"BeneID": "Provider_Total_Patients"})
# Distinct beneficiaries per provider for the test split (lazy Dask frame).
provider_total_patients = calculate_provider_total_patients(df_test)
provider_total_patients

Unnamed: 0_level_0,Provider,Provider_Total_Patients
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,string,int64
,...,...


In [21]:
# 12. Provider_Total_<ChronicCondition>_Patients

def calculate_provider_total_chronic_patients(df, chronic_cols, patient_col="BeneID"):
    """
    Count, per provider, the distinct patients flagged with each chronic condition.

    Parameters:
        df (Dask or Pandas DataFrame): Input frame with a "Provider" column and
            the chronic-condition flag columns. It may be claim-level (several
            rows per patient) as long as ``patient_col`` is present.
        chronic_cols (list of str): Chronic condition columns (values should be 0 or 1).
        patient_col (str or None): Column identifying a patient. When it is
            present in ``df``, each (provider, patient) pair is first collapsed
            to one row, taking the max of each flag. A patient with many claims
            is then counted once, not once per claim. When ``None`` or absent,
            every row is treated as one patient.

    Returns:
        DataFrame with one row per provider and a
        ``Provider_Total_<col>_Patients`` column for each chronic condition.

    Raises:
        ValueError: If any of ``chronic_cols`` is missing from ``df``.
    """
    # Check if all columns exist
    missing = [col for col in chronic_cols if col not in df.columns]
    if missing:
        raise ValueError(f"The following columns are missing: {missing}")

    per_patient = df
    if patient_col is not None and patient_col in df.columns:
        # One row per (provider, patient): max() marks the patient as having the
        # condition if any of their rows carries the flag.
        per_patient = (
            df.groupby(["Provider", patient_col])[chronic_cols]
            .max()
            .reset_index()
        )

    # Group and sum per provider
    agg_df = per_patient.groupby("Provider")[chronic_cols].sum().reset_index()

    # Rename columns
    return agg_df.rename(columns={col: f"Provider_Total_{col}_Patients" for col in chronic_cols})
# Chronic-condition flag columns, all sharing the "ChronicCond_" prefix.
chronic_cols = [
    f"ChronicCond_{condition}"
    for condition in (
        "Alzheimer",
        "Heartfailure",
        "KidneyDisease",
        "Cancer",
        "ObstrPulmonary",
        "Depression",
        "Diabetes",
        "IschemicHeart",
        "Osteoporasis",
        "rheumatoidarthritis",
        "stroke",
    )
]

# Per-provider chronic-condition totals for the test split (lazy Dask frame).
provider_total_chronic_patients = calculate_provider_total_chronic_patients(df_test, chronic_cols)
provider_total_chronic_patients


Unnamed: 0_level_0,Provider,Provider_Total_ChronicCond_Alzheimer_Patients,Provider_Total_ChronicCond_Heartfailure_Patients,Provider_Total_ChronicCond_KidneyDisease_Patients,Provider_Total_ChronicCond_Cancer_Patients,Provider_Total_ChronicCond_ObstrPulmonary_Patients,Provider_Total_ChronicCond_Depression_Patients,Provider_Total_ChronicCond_Diabetes_Patients,Provider_Total_ChronicCond_IschemicHeart_Patients,Provider_Total_ChronicCond_Osteoporasis_Patients,Provider_Total_ChronicCond_rheumatoidarthritis_Patients,Provider_Total_ChronicCond_stroke_Patients
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
,string,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...


In [23]:
# 14. Number of filled diagnosis slots for every Provider
def count_diagnosis_per_provider(df, diagnosis_cols):
    """
    Count, per provider, the rows with a non-null value in each diagnosis column.

    This counts filled diagnosis *slots* (one count per column), not
    occurrences of individual diagnosis codes. ``groupby(...).count()`` ignores
    the code value and only skips missing entries.

    Parameters:
        df: Dask or pandas DataFrame with a "Provider" column.
        diagnosis_cols (list of str): Diagnosis columns to count.

    Returns:
        dict mapping each column name to a DataFrame with columns
        ``["Provider", f"{col}_Count"]``.
    """
    return {
        col: (
            df.groupby("Provider")[col]
            .count()
            .reset_index()
            .rename(columns={col: f"{col}_Count"})
        )
        for col in diagnosis_cols
    }
# Admission diagnosis plus the ten numbered claim diagnosis slots.
diagnosis_cols = ["ClmAdmitDiagnosisCode"] + [f"ClmDiagnosisCode_{i}" for i in range(1, 11)]
# Dict of per-column Dask frames, keyed by diagnosis column name.
diagnosis_counts = count_diagnosis_per_provider(df_test, diagnosis_cols)



In [24]:
# Displays the dict of lazy Dask frames (structure only, no computed values).
diagnosis_counts

{'ClmAdmitDiagnosisCode': Dask DataFrame Structure:
               Provider ClmAdmitDiagnosisCode_Count
 npartitions=1                                     
                 string                       int64
                    ...                         ...
 Dask Name: operation, 23 expressions
 Expr=RenameFrame(frame=ResetIndex(frame=Count(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=ReadCSV(f9034f9))))))[['Provider', 'ClmAdmitDiagnosisCode']], observed=False, _slice='ClmAdmitDiagnosisCode')), columns={'ClmAdmitDiagnosisCode': 'ClmAdmitDiagnosisCode_Count'}),
 'ClmDiagnosisCode_1': Dask DataFrame Structure:
               Provider ClmDiagnosisCode_1_Count
 npartitions=1                                  
                 string                    int64
                    ...                      ...
 Dask Name: operation, 23 expressions
 Expr=RenameFrame(frame=ResetIndex(frame=Count(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Read

In [None]:
# 15. Average of Claims for every Provider

In [None]:
# 16. Average claim cost for every Provider

In [None]:
# 17. Median claim cost for every Provider

In [None]:
# 18. Most frequent claim codes for every Provider

In [None]:
# 19. Most frequent diagnosis codes (ClmDiagnosisCode_1 to ClmDiagnosisCode_10) for every Provider

In [None]:
# 20. Most frequent physician for every Provider