In [1]:
!pip install dask_ml



In [2]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar(); pbar.register()

0. Initial Setup: Explore & Persist
Before transformations, persist the DataFrame:

In [3]:
dic = {
    'Provider': 'category',
    'Race_mode': 'category',
    'State_mode': 'category',
    'County_mode': 'category',
    'PotentialFraud': 'category',
    'Bene_Age_Sum': 'float64',
    'TotalClaims': 'float64',
    'AttendingPhysician_TotalClaims': 'float64',
    'Prv_Physician_Count': 'float64',
    'Provider_Insurance_Claim_Reimbursement_Amt': 'float64',
    'Provider_Total_Patients': 'float64',
    'Provider_Total_ChronicCond_Alzheimer_Patients': 'float64',
    'Provider_Total_ChronicCond_Heartfailure_Patients': 'float64',
    'Provider_Total_ChronicCond_KidneyDisease_Patients': 'float64',
    'Provider_Total_ChronicCond_Cancer_Patients': 'float64',
    'Provider_Total_ChronicCond_ObstrPulmonary_Patients': 'float64',
    'Provider_Total_ChronicCond_Depression_Patients': 'float64',
    'Provider_Total_ChronicCond_Diabetes_Patients': 'float64',
    'Provider_Total_ChronicCond_IschemicHeart_Patients': 'float64',
    'Provider_Total_ChronicCond_Osteoporasis_Patients': 'float64',
    'Provider_Total_ChronicCond_rheumatoidarthritis_Patients': 'float64',
    'Provider_Total_ChronicCond_stroke_Patients': 'float64',
    'ClmAdmitDiagnosisCode_Count': 'float64',
    'ClmDiagnosisCode_1_Count': 'float64',
    'ClmDiagnosisCode_2_Count': 'float64',
    'ClmDiagnosisCode_3_Count': 'float64',
    'ClmAdmitDiagnosisCode_Most_Frequent': 'category',
    'ClmDiagnosisCode_1_Most_Frequent': 'category',
    'ClmDiagnosisCode_2_Most_Frequent': 'category',
    'ClmDiagnosisCode_3_Most_Frequent': 'category',
    'AttendingPhysician_Most_Frequent': 'category',
    'OperatingPhysician_Most_Frequent': 'category',
    'OtherPhysician_Most_Frequent': 'category',
    'Avg_allocated_Amount_Per_Provider': 'float64',
    'Avg_Deductible_Amt_Paid_Per_Provider': 'float64',
    'Avg_InscClaimAmtReimbursed_Per_Provider': 'float64',
    'perc_allocated_used': 'float64',
    'prv_avg_claims': 'float64',
    'prv_avg_claim_cost_indicator': 'float64',
    'prv_avg_claims_indicator': 'float64'
}


In [4]:
merged_path = "s3://medicare-fraud-data-25-05-2025/merged_ready/train/*.csv"
df_train = dd.read_csv(merged_path,dtype=dic, assume_missing=True)


In [5]:
df_train = df_train.persist()

[########################################] | 100% Completed | 420.06 ms


In [6]:
dtypes_long = df_train.dtypes.reset_index()
dtypes_long.columns = ['column_name', 'dtype']

# Ganze Tabelle anzeigen, unabhängig von Länge
print(dtypes_long.to_string(index=False))

                                            column_name    dtype
                                               Provider category
                                      ClaimDuration_sum  float64
                                     ClaimDuration_mean  float64
                                      ClaimDuration_std  float64
                                      ClaimDuration_max  float64
                                      ClaimDuration_min  float64
                                   HospitalDuration_sum  float64
                                  HospitalDuration_mean  float64
                                   HospitalDuration_std  float64
                                   HospitalDuration_max  float64
                                   HospitalDuration_min  float64
                                  DeductibleAmtPaid_sum  float64
                                 DeductibleAmtPaid_mean  float64
                                  DeductibleAmtPaid_std  float64
                         

In [7]:

print(df_train.shape)         # returns a (delayed) tuple, e.g. (n_rows, n_cols)
print(df_train.head(3)) 

(<dask_expr.expr.Scalar: expr=FromGraph(d7267b5).size() // 95, dtype=int64>, 95)
[                                        ] | 0% Completed | 154.91 us

[########################################] | 100% Completed | 108.11 ms
   Provider  ClaimDuration_sum  ClaimDuration_mean  ClaimDuration_std  \
0  PRV52145              335.0            1.367347           4.708267   
1  PRV55104               72.0            1.384615           4.822959   
2  PRV54894             1024.0            4.471616           6.659338   

   ClaimDuration_max  ClaimDuration_min  HospitalDuration_sum  \
0               20.0                0.0                   0.0   
1               20.0                0.0                   0.0   
2               35.0                0.0                 918.0   

   HospitalDuration_mean  HospitalDuration_std  HospitalDuration_max  ...  \
0                    NaN                   NaN                   NaN  ...   
1                    NaN                   NaN                   NaN  ...   
2               6.652174              7.147714                  35.0  ...   

   AttendingPhysician_Most_Frequent  OperatingPhysician_Most_Freq

In [29]:
missing = df_train.isnull().sum().compute()
print(missing[missing > 0])


Series([], dtype: int64)


In [30]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask_ml.preprocessing import StandardScaler

class MedicarePreprocessor:
    def __init__(self, df: dd.DataFrame):
        self.df = df

        self.cat_cols = [
            "Provider",
            "Race_mode",
            "State_mode",
            "County_mode",
            "PotentialFraud",
            "ClmAdmitDiagnosisCode_Most_Frequent",
            "ClmDiagnosisCode_1_Most_Frequent",
            "ClmDiagnosisCode_2_Most_Frequent",
            "ClmDiagnosisCode_3_Most_Frequent",
            "AttendingPhysician_Most_Frequent",
            "OperatingPhysician_Most_Frequent",
            "OtherPhysician_Most_Frequent"
        ]

        self.num_cols = [
            col
            for col, dtype in df.dtypes.items()
            if (dtype == "float64" and col != "PotentialFraud")
        ]

        self.target_col = "PotentialFraud"

    def drop_unused_columns(self, drop_cols: list = None):
        if drop_cols is None:
            drop_cols = [
                "ClmAdmitDiagnosisCode_Most_Frequent",
                "ClmDiagnosisCode_2_Most_Frequent",
                "ClmDiagnosisCode_3_Most_Frequent",
                "AttendingPhysician_Most_Frequent"
            ]
        to_drop = [c for c in drop_cols if c in self.df.columns]
        if to_drop:
            self.df = self.df.drop(columns=to_drop)
        self.cat_cols = [c for c in self.cat_cols if c not in to_drop]
        return self

    def fill_missing(self):
        # 1) Compute all numeric means in one shot
        means = self.df[self.num_cols].mean().compute()  # pandas.Series of means
        means_dict = {col: float(means[col]) for col in self.num_cols}

        # 2) Cast cat columns to string, then fill all missing with "MISSING"
        for col in self.cat_cols:
            self.df[col] = self.df[col].astype("string")

        fill_dict = means_dict.copy()
        for col in self.cat_cols:
            fill_dict[col] = "MISSING"

        # 3) One fillna call for everything:
        self.df = self.df.fillna(fill_dict)
        return self

    def encode_categoricals(self):
        # 1) Cast all cat columns to category dtype
        for col in self.cat_cols:
            self.df[col] = self.df[col].astype("category")

        # 2) One global categorize/shuffle
        self.df = self.df.categorize(columns=self.cat_cols)

        # 3) One pass to convert each cat to codes
        def _apply_cat_codes(pdf):
            pdf2 = pdf.copy()
            for c in self.cat_cols:
                pdf2[c] = pdf2[c].cat.codes.astype("int64")
            return pdf2

        self.df = self.df.map_partitions(_apply_cat_codes)
        return self

    def clip_outliers(self, lower_quantile=0.01, upper_quantile=0.99):
        for col in self.num_cols:
            q_low = self.df[col].quantile(lower_quantile).compute()
            q_high = self.df[col].quantile(upper_quantile).compute()
            self.df[col] = self.df[col].clip(lower=q_low, upper=q_high)
        return self

    def feature_engineering(self):
        def _safe_divide_avg_cost(pdf):
            num = pdf["Provider_Insurance_Claim_Reimbursement_Amt"]
            den = pdf["TotalClaims"]
            return pd.Series(
                np.where(den == 0, 0.0, num / den),
                index=pdf.index,
                name="avg_cost_per_claim"
            )

        self.df["avg_cost_per_claim"] = self.df.map_partitions(_safe_divide_avg_cost)
        self.num_cols.append("avg_cost_per_claim")

        def _pct_chronic_alz(pdf):
            num = pdf["Provider_Total_ChronicCond_Alzheimer_Patients"]
            den = pdf["Provider_Total_Patients"]
            return pd.Series(
                np.where(den == 0, 0.0, num / den),
                index=pdf.index,
                name="perc_chronic_alz"
            )

        self.df["perc_chronic_alz"] = self.df.map_partitions(_pct_chronic_alz)
        self.num_cols.append("perc_chronic_alz")

        return self

    def scale_numeric_features(self):
        exclude = {self.target_col, "Provider"}
        to_scale = [c for c in self.num_cols if c not in exclude]

        if to_scale:
            scaler = StandardScaler()
            scaled_df = scaler.fit_transform(self.df[to_scale])
            self.df = dd.concat(
                [self.df.drop(columns=to_scale), scaled_df],
                axis=1
            )
        return self

    def get_processed_df(self) -> dd.DataFrame:
        self.df = self.df.persist()
        return self.df


In [31]:
# 2.3) Instantiate the preprocessor
preprocessor = MedicarePreprocessor(df_train)

# 2.4) Chain the preprocessing steps in order
df_train_processed = (
    preprocessor
      .drop_unused_columns()      # drop columns you don’t plan to use
      .fill_missing()             # impute numeric → mean, category → mode
      .encode_categoricals()      # convert each category to integer codes
      .feature_engineering()      # OPTIONAL: add new ratio features
      .scale_numeric_features()   # standard‐scale every numeric column
      .get_processed_df()         # persist & retrieve final Dask DataFrame
)


KeyError: 'ClmAdmitDiagnosisCode_Most_Frequent'

In [11]:
print(df_train_processed.dtypes)
print(df_train_processed.head(3))


Provider                          int64
Race_mode                         int64
State_mode                        int64
County_mode                       int64
PotentialFraud                    int64
                                 ...   
prv_avg_claims                  float64
prv_avg_claim_cost_indicator    float64
prv_avg_claims_indicator        float64
avg_cost_per_claim              float64
perc_chronic_alz                float64
Length: 93, dtype: object
[########################################] | 100% Completed | 102.01 ms
   Provider  Race_mode  State_mode  County_mode  PotentialFraud  \
0       914          0           1            9               0   
1      3273          0          26           16               0   
2      3112          0          26           58               1   

   ClmDiagnosisCode_1_Most_Frequent  OperatingPhysician_Most_Frequent  \
0                               277                                 0   
1                               277            

In [12]:
def save_dask_to_s3(df, path, file_format="csv", single_file=False, index=False):
    """
    Save a Dask DataFrame to S3 in CSV or Parquet format.
    
    Parameters:
        df (dask.DataFrame): The Dask DataFrame to save
        path (str): S3 path (e.g. s3://bucket/folder/)
        file_format (str): 'csv' or 'parquet'
        single_file (bool): Save as single file (only for small data)
        index (bool): Whether to save the index
    """
    if file_format == "csv":
        if single_file:
            df.compute().to_csv(path, index=index)
        else:
            df.to_csv(path + "part-*.csv", index=index)
    elif file_format == "parquet":
        df.to_parquet(path, write_index=index)
    else:
        raise ValueError("Unsupported file_format: choose 'csv' or 'parquet'")


In [13]:
prosessed_path = "s3://medicare-fraud-data-25-05-2025/processed/"
# Save the merged DataFrames to S3
save_dask_to_s3(df_train_processed, prosessed_path + "train/")
#save_dask_to_s3(df_train_processed, prosessed_path + "test/")

[                                        ] | 0% Completed | 215.26 us

[########################################] | 100% Completed | 1.21 sms


In [28]:
df_train_processed.head(3)

Unnamed: 0,Provider,Race_mode,State_mode,County_mode,PotentialFraud,ClmDiagnosisCode_1_Most_Frequent,OperatingPhysician_Most_Frequent,OtherPhysician_Most_Frequent,ClaimDuration_sum,ClaimDuration_mean,...,ClmDiagnosisCode_3_Count,Avg_allocated_Amount_Per_Provider,Avg_Deductible_Amt_Paid_Per_Provider,Avg_InscClaimAmtReimbursed_Per_Provider,perc_allocated_used,prv_avg_claims,prv_avg_claim_cost_indicator,prv_avg_claims_indicator,avg_cost_per_claim,perc_chronic_alz
0,914,0,1,9,0,277,0,0,0.364006,-0.31428,...,0.509379,-0.477602,-0.375682,-0.42312,0.132287,-0.539095,-0.156164,-0.539095,-0.428244,-0.487604
1,3273,0,26,16,0,277,0,0,-0.246895,-0.305887,...,-0.177849,0.042061,0.065578,-0.133897,-0.713648,-0.366773,-0.327462,-0.366773,-0.417469,0.2292
2,3112,0,26,58,1,11,0,0,1.964424,1.194455,...,1.116283,1.41015,1.397284,1.513495,0.589595,-0.431634,6.155988,-0.431634,1.702587,0.036966


In [27]:
import fsspec

fs = fsspec.filesystem("s3")
fs.ls("medicare-fraud-data-25-05-2025/processed/train/")

['medicare-fraud-data-25-05-2025/processed/train/part-0.csv',
 'medicare-fraud-data-25-05-2025/processed/train/part-1.csv',
 'medicare-fraud-data-25-05-2025/processed/train/part-2.csv']

In [14]:

# 1.1) Separate X and y
X = df_train_processed.drop(columns=["Provider", "PotentialFraud"])
y = df_train_processed["PotentialFraud"]

# 1.2) Optional: Persist X and y separately for downstream reuse
X = X.persist()
y = y.persist()


[########################################] | 100% Completed | 106.59 ms
[########################################] | 100% Completed | 103.51 ms


In [15]:
import dask.dataframe as dd

# 1) Suppose df_train_processed has columns ["Provider", …, "PotentialFraud"].
#    We’ll split it into two subsets: non-fraud (0) and fraud (1).

df_zero = df_train_processed[df_train_processed.PotentialFraud == 0]
df_one  = df_train_processed[df_train_processed.PotentialFraud == 1]

# 2) For each subset, call `.random_split([0.8, 0.2], random_state=42)` 
#    to get an 80/20 split within that class.

#    Note: random_split returns a list [train_part, test_part].
train_zero, test_zero = df_zero.random_split([0.8, 0.2], random_state=42)
train_one,  test_one  = df_one.random_split([0.8, 0.2], random_state=42)

# 3) Concatenate the “zero” and “one” pieces to form overall train/test sets:
df_train = dd.concat([train_zero, train_one])
df_test  = dd.concat([test_zero,  test_one])

# 4) (Optional) Shuffle each combined DataFrame so the 0/1 examples are interleaved
df_train = df_train.shuffle(on="Provider", random_state=42)
df_test  = df_test.shuffle(on="Provider", random_state=42)

# 5) Finally, separate X_train/y_train and X_test/y_test:
X_train = df_train.drop(columns=["Provider", "PotentialFraud"])
y_train = df_train["PotentialFraud"]

X_test  = df_test.drop(columns=["Provider", "PotentialFraud"])
y_test  = df_test["PotentialFraud"]

# 6) Persist each:
X_train = X_train.persist()
y_train = y_train.persist()
X_test  = X_test.persist()
y_test  = y_test.persist()


[########################################] | 100% Completed | 205.57 ms
[########################################] | 100% Completed | 205.55 ms
[########################################] | 100% Completed | 207.77 ms
[########################################] | 100% Completed | 205.97 ms


In [16]:
from dask.distributed import Client

# If you already have a scheduler address, you can pass it here. 
# Otherwise, this will start a local cluster with several threads/processes.
client = Client()


Perhaps you already have a cluster running?
Hosting the HTTP server on port 34649 instead




In [17]:
pip install xgboost[dask]


Note: you may need to restart the kernel to use updated packages.


In [18]:
import xgboost as xgb

# 3.2.1) Create DaskDMatrix for train and test
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest  = xgb.dask.DaskDMatrix(client, X_test, y_test)


In [19]:
# Count negatives and positives in the training set
neg_count = (y_train == 0).sum().compute()
pos_count = (y_train == 1).sum().compute()

scale_pos_weight = neg_count / pos_count


In [20]:
params = {
    "objective": "binary:logistic",
    "eval_metric": "auc",            # area under ROC
    "tree_method": "hist",           # “hist” is faster on large data
    "scale_pos_weight": scale_pos_weight,
    "learning_rate": 0.1,            # you can tune this later
    "max_depth": 5,                  # also tunable
    "subsample": 0.8,
    "colsample_bytree": 0.8
}
num_boost_round = 100


In [21]:
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=num_boost_round,
    evals=[(dtest, "validation")],
    verbose_eval=False
)
bst = output["booster"]   # the trained Booster object
history = output["history"]  # dictionary of evaluation metrics


[16:33:43] Task [xgboost.dask-0]:tcp://127.0.0.1:36239 got rank 0
[16:33:43] Task [xgboost.dask-1]:tcp://127.0.0.1:39019 got rank 1


In [None]:
import optuna
from xgboost import dask as dxgb



