In [1]:
# Import Libraries and Setup
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from dask_ml.wrappers import ParallelPostFit
import time
from dask.diagnostics import ProgressBar

In [2]:
# Start Dask client
cluster = LocalCluster(n_workers=4,threads_per_worker=2)
client = Client(cluster)
print(f"Dask dashboard available at: {client.dashboard_link}")

# 📌 Load CSV Data (Replaces Synthetic Data)
file_path = "ai4i2020.csv"
df = pd.read_csv(file_path)

# Drop unnecessary columns (UDI and Product ID are unique identifiers)
df.drop(columns=["UDI", "Product ID"], inplace=True)


Dask dashboard available at: http://127.0.0.1:8787/status


In [3]:
# Rename columns to simpler names for ease of use
df.rename(columns={
    "Air temperature [K]": "air_temp",
    "Process temperature [K]": "process_temp",
    "Rotational speed [rpm]": "rpm",
    "Torque [Nm]": "torque",
    "Tool wear [min]": "tool_wear",
    "Machine failure": "failure_risk"
}, inplace=True)
# Convert to Dask DataFrame
ddf = dd.from_pandas(df, npartitions=8)

# Step 4: Preprocess Data
def preprocess_data(df):
    df['temp_diff'] = df['process_temp'] - df['air_temp']  # Process Temp vs Air Temp
    df['torque_rpm_ratio'] = df['torque'] / df['rpm']  # Torque-to-RPM ratio
    return df

In [4]:
# Non-Dask Preprocessing
start_time = time.time()
processed_data_non_dask = preprocess_data(df)
end_time = time.time()
non_dask_time = end_time - start_time
print(f"Non-Dask Preprocessing Time: {non_dask_time:.3f} seconds")

# Dask Preprocessing
start_time = time.time()
ddf_processed = ddf.map_partitions(preprocess_data)
ddf_processed = ddf_processed.persist()
end_time = time.time()
dask_time = end_time - start_time
print(f"Dask Preprocessing Time: {dask_time:.3f} seconds")
# Prepare Features and Scale
features = ["air_temp", "process_temp", "rpm", "torque", "tool_wear", "temp_diff", "torque_rpm_ratio"]
target = "failure_risk"

Non-Dask Preprocessing Time: 0.003 seconds
Dask Preprocessing Time: 2.484 seconds


In [5]:
# Non-Dask Scaling
start_time = time.time()
X_non_dask = processed_data_non_dask[features]
y_non_dask = processed_data_non_dask[target]
scaler_non_dask = StandardScaler()
X_scaled_non_dask = scaler_non_dask.fit_transform(X_non_dask)
end_time = time.time()
non_dask_scaling_time = end_time - start_time
print(f"Non-Dask Scaling Time: {non_dask_scaling_time:.3f} seconds")

# Dask Scaling
start_time = time.time()
X_dask = ddf_processed[features]
y_dask = ddf_processed[target]
scaler_dask = StandardScaler()
X_scaled_dask = scaler_dask.fit_transform(X_dask)
X_scaled_dask = X_scaled_dask.persist()  
end_time = time.time()
dask_scaling_time = end_time - start_time
print(f"Dask Scaling Time: {dask_scaling_time:.3f} seconds")



Non-Dask Scaling Time: 25.116 seconds
Dask Scaling Time: 0.087 seconds




In [6]:

# Train Model
rf = RandomForestClassifier(n_estimators=100, random_state=42)
model = ParallelPostFit(estimator=rf)

# Non-Dask Model Training
start_time = time.time()
X_train_non_dask, X_test_non_dask, y_train_non_dask, y_test_non_dask = train_test_split(
    X_scaled_non_dask, y_non_dask, test_size=0.2, random_state=42, shuffle=True
)
model.fit(X_train_non_dask, y_train_non_dask)
end_time = time.time()
non_dask_training_time = end_time - start_time
print(f"Non-Dask Model Training Time: {non_dask_training_time:.3f} seconds")

# Dask Model Training
start_time = time.time()
X_train_dask, X_test_dask, y_train_dask, y_test_dask = train_test_split(
    X_scaled_dask, y_dask, test_size=0.2, random_state=42,shuffle=True
)
with ProgressBar():
    model.fit(X_train_dask.compute(), y_train_dask.compute())
end_time = time.time()
dask_training_time = end_time - start_time
print(f"Dask Model Training Time: {dask_training_time:.3f} seconds")


Non-Dask Model Training Time: 108.112 seconds
Dask Model Training Time: 34.898 seconds


In [7]:
# Evaluate Model
# Non-Dask Prediction
start_time = time.time()
y_pred_non_dask = model.predict(X_test_non_dask)
end_time = time.time()
non_dask_prediction_time = end_time - start_time
print(f"Non-Dask Prediction Time: {non_dask_prediction_time:.3f} seconds")

# Dask Prediction
start_time = time.time()
y_pred_dask = model.predict(X_test_dask.compute())
end_time = time.time()
dask_prediction_time = end_time - start_time
print(f"Dask Prediction Time: {dask_prediction_time:.3f} seconds")

Non-Dask Prediction Time: 7.549 seconds
Dask Prediction Time: 0.051 seconds


In [8]:
client.close()
cluster.close()



In [9]:
total_no_dask_time =  non_dask_time + non_dask_scaling_time + non_dask_training_time + non_dask_prediction_time
total_dask_time = dask_time + dask_scaling_time + dask_training_time + dask_prediction_time

print("The total time without parallel processing : ",total_no_dask_time)
print("The total time with parallel processing : ",total_dask_time)

The total time without parallel processing :  140.77893042564392
The total time with parallel processing :  37.52039980888367
