<a href="https://colab.research.google.com/github/SeanMuInCa/learn_python/blob/master/groupassignment2025retry2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Step 1: Data Preprocessing

import pandas as pd
import numpy as np

# Load the dataset (ensure 'train.csv' is in the working directory)
df = pd.read_csv('train.csv')

# Print the initial shape of the dataset
print("Initial shape of train data:", df.shape)

# Drop any rows with missing values
df = df.dropna()
print("Shape after dropping missing values:", df.shape)

# Abort early if the target contains zero or negative values.
if (df['force_meas'] <= 0).any():
    raise ValueError("There are non-positive values in force_meas!")
else:
    print("All force_meas values are positive.")

# Display basic information and the first 5 rows of the dataset.
print("Dataset info:")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# is called directly; wrapping it in print() would append a stray "None" line.
df.info()
print("First 5 rows:")
print(df.head())


Initial shape of train data: (64000, 35)
Shape after dropping missing values: (64000, 35)
All force_meas values are positive.
Dataset info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 64000 entries, 0 to 63999
Data columns (total 35 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   tappingsteelgrade     64000 non-null  object 
 1   force_meas            64000 non-null  float64
 2   speed                 64000 non-null  float64
 3   entrytemperature      64000 non-null  float64
 4   entrytemperaturebot   64000 non-null  float64
 5   entrytemperaturecore  64000 non-null  float64
 6   entrytemperaturetop   64000 non-null  float64
 7   entrythickness        64000 non-null  float64
 8   entrywidth            64000 non-null  float64
 9   exitthickness         64000 non-null  float64
 10  zeropoint             64000 non-null  float64
 11  radius                64000 non-null  float64
 12  pctal                 64000 non-

In [2]:
from sklearn.preprocessing import StandardScaler

# Separate the regression target from the predictor columns.
# 'force_pre' is the baseline prediction that the test-evaluation cell removes
# from the model inputs, so it is removed here too; otherwise the test pipeline
# would zero-fill a column the models were trained on. errors='ignore' makes
# this a no-op when the training file does not contain that column.
target = df['force_meas']
features = df.drop(columns=['force_meas']).drop(columns=['force_pre'], errors='ignore')

# Convert the categorical column 'tappingsteelgrade' using one-hot encoding.
# drop_first=True removes the first grade, which becomes the all-zeros baseline.
if 'tappingsteelgrade' in features.columns:
    features = pd.get_dummies(features, columns=['tappingsteelgrade'], drop_first=True)

print("Features shape after encoding:", features.shape)

# Standardize every feature column (zero mean, unit variance).
# NOTE(review): the scaler is fitted on all rows, i.e. before the later
# train/validation split, so validation rows influence the scaling statistics.
# Kept as-is because downstream cells consume `X_scaled` and `scaler` directly.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(features)

print("Feature scaling completed. Sample of scaled features:")
print(X_scaled[:5])


Features shape after encoding: (64000, 56)
Feature scaling completed. Sample of scaled features:
[[-1.20850089e+00  1.62955293e+00  1.10845858e+00  1.76774467e+00
   9.94195846e-01  1.46945502e+00  3.67688850e-01  1.29712105e+00
  -2.87615939e-01  1.71820834e+00 -8.44419107e-01 -1.95625464e+00
   1.27155016e+00  2.70135712e-01  0.00000000e+00 -5.03154244e-01
  -4.02691354e-01  3.55943097e-01  0.00000000e+00 -2.04975855e+00
  -8.97415370e-01 -8.22165014e-01 -1.19848532e+00 -3.88496414e-01
  -1.45744129e+00  1.43831684e+00  1.67807224e+00 -1.49884912e+00
  -7.10231470e-01 -9.64868769e-01 -1.07867137e+00 -1.65212260e+00
   4.41799983e-01 -4.35224796e-02  3.13835888e+00 -3.05675752e-01
  -6.28734061e-02 -1.60640647e-01 -4.24276982e-02 -1.42536405e-02
  -1.26881108e-01 -1.50472361e-01 -3.95399676e-01 -3.48975122e-01
  -3.08113970e-01 -2.82561008e-01 -2.24160195e-01 -3.21296526e-02
  -1.11937748e-01 -1.76804323e-02 -3.83524497e-02 -1.36943478e-02
  -2.72801027e-01 -1.42536405e-02 -3.95399676

In [None]:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, KFold, cross_val_score
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor

# Split the scaled features and target into training and validation sets.
# X_scaled is the standardized feature matrix produced by the preprocessing
# cell; the target is the 'force_meas' column of df.
X_train, X_val, y_train, y_val = train_test_split(X_scaled, df['force_meas'], test_size=0.2, random_state=42)

# Define 5-fold cross-validation (shared by every model below so the folds match)
cv = KFold(n_splits=5, shuffle=True, random_state=42)

# -------------------------------
# Linear Regression baseline
# -------------------------------
lr = LinearRegression()
lr_cv_scores = cross_val_score(lr, X_train, y_train, scoring='neg_mean_absolute_error', cv=cv)
print("Linear Regression CV MAE: {:.4f}".format(-np.mean(lr_cv_scores)))
# cross_val_score only fits clones of `lr`, so fit the estimator itself once
# on the full training split before it is used for validation predictions.
lr.fit(X_train, y_train)

# -------------------------------
# Decision Tree Regressor with Grid Search
# -------------------------------
dt = DecisionTreeRegressor(random_state=42)
dt_param_grid = {
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10]
}
dt_grid = GridSearchCV(dt, dt_param_grid, cv=cv, scoring='neg_mean_absolute_error', n_jobs=-1)
dt_grid.fit(X_train, y_train)
print("Decision Tree best params:", dt_grid.best_params_)
print("Decision Tree CV MAE: {:.4f}".format(-dt_grid.best_score_))

# -------------------------------
# Random Forest Regressor with Grid Search
# -------------------------------
rf = RandomForestRegressor(random_state=42, n_jobs=-1)
rf_param_grid = {
    'n_estimators': [50, 100],
    'max_depth': [None, 10, 20],
    'min_samples_split': [2, 5]
}
rf_grid = GridSearchCV(rf, rf_param_grid, cv=cv, scoring='neg_mean_absolute_error', n_jobs=-1)
rf_grid.fit(X_train, y_train)
print("Random Forest best params:", rf_grid.best_params_)
print("Random Forest CV MAE: {:.4f}".format(-rf_grid.best_score_))

# -------------------------------
# Gradient Boosting Regressor with Grid Search
# -------------------------------
gb = GradientBoostingRegressor(random_state=42)
gb_param_grid = {
    'n_estimators': [50, 100],
    'learning_rate': [0.01, 0.1],
    'max_depth': [3, 5]
}
gb_grid = GridSearchCV(gb, gb_param_grid, cv=cv, scoring='neg_mean_absolute_error', n_jobs=-1)
gb_grid.fit(X_train, y_train)
print("Gradient Boosting best params:", gb_grid.best_params_)
print("Gradient Boosting CV MAE: {:.4f}".format(-gb_grid.best_score_))

# -------------------------------
# Evaluate each model on the validation set
# -------------------------------
# Every model in this dict is already fitted on (X_train, y_train): `lr` was
# fitted above, and GridSearchCV (refit=True by default) refits best_estimator_
# on the whole training split. Fitting them again here would only repeat the
# most expensive step of the cell.
models = {
    'Linear Regression': lr,
    'Decision Tree': dt_grid.best_estimator_,
    'Random Forest': rf_grid.best_estimator_,
    'Gradient Boosting': gb_grid.best_estimator_
}

for name, model in models.items():
    y_pred = model.predict(X_val)
    mae = mean_absolute_error(y_val, y_pred)
    rmse = np.sqrt(mean_squared_error(y_val, y_pred))
    print(f"{name} Validation MAE: {mae:.4f}")
    print(f"{name} Validation RMSE: {rmse:.4f}")


Linear Regression CV MAE: 5531112.7093


In [None]:
import pickle
# Persist the fitted scaler and the two tuned ensemble models (the refit
# best estimators from their grid searches), one pickle file per object.
for artifact_path, artifact in (
    ('scaler.pkl', scaler),
    ('best_rf_model.pkl', rf_grid.best_estimator_),
    ('best_gb_model.pkl', gb_grid.best_estimator_),
):
    with open(artifact_path, 'wb') as f:
        pickle.dump(artifact, f)

# Persist the ordered feature names of `features` (the preprocessed DataFrame
# used for training) so later cells can rebuild the same column layout.
training_columns = features.columns.tolist()
with open('training_columns.pkl', 'wb') as f:
    pickle.dump(training_columns, f)
print("Training columns have been saved successfully.")

In [None]:
# ===============================
# Step 11: Final Evaluation on Test Data
# ===============================

import pandas as pd
import numpy as np
import pickle
import time
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Load test data (ensure test.csv is in your working directory)
test_df = pd.read_csv('test.csv')

# Drop missing values if any (you may adjust based on your preprocessing strategy)
test_df = test_df.dropna()

# The test file is expected to contain 'force_pre' (the baseline prediction,
# never used as a model input), 'force_meas' (the ground truth needed for
# evaluation) and the raw features including 'tappingsteelgrade'.

# Separate baseline predictions for later comparison
baseline_force_pre = test_df['force_pre']

# Separate target variable for evaluation (force_meas)
y_test = test_df['force_meas']

# ===============================
# Preprocessing: Feature Selection & Encoding
# ===============================
# Exclude the target 'force_meas' and baseline column 'force_pre'
features_test = test_df.drop(columns=['force_meas', 'force_pre'])

# One-hot encode 'tappingsteelgrade' WITHOUT drop_first. With drop_first=True
# pandas drops the first grade present in *this* frame, which is not
# necessarily the grade dropped during training; such rows would then be
# mis-encoded after the reindex below. Keeping every dummy here and letting
# the reindex select the training columns reproduces the training encoding:
# the training baseline grade has no training column and becomes all zeros.
features_test = pd.get_dummies(features_test, columns=['tappingsteelgrade'], drop_first=False)

# Load the fitted scaler and the training column order saved during training.
# NOTE: pickle.load can execute arbitrary code; only load files that were
# written by this notebook.
with open('scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

with open('training_columns.pkl', 'rb') as f:
    training_columns = pickle.load(f)

# A missing dummy column only means that grade does not occur in the test
# data. A missing non-dummy column would be silently replaced by zeros, which
# corrupts the model input, so report it.
zero_filled_columns = [
    col for col in training_columns
    if col not in features_test.columns and not str(col).startswith('tappingsteelgrade_')
]
if zero_filled_columns:
    print("WARNING: training columns missing from test data (filled with 0):", zero_filled_columns)

# Reindex the test features DataFrame to the training layout; missing columns
# are filled with zeros and columns unknown to the models are discarded.
features_test = features_test.reindex(columns=training_columns, fill_value=0)

# Scale the test features using the saved scaler
X_test_scaled = scaler.transform(features_test)

# ===============================
# Load the saved models
# ===============================
# Load best Random Forest model
with open('best_rf_model.pkl', 'rb') as f:
    best_rf_model = pickle.load(f)

# Load best Gradient Boosting model
with open('best_gb_model.pkl', 'rb') as f:
    best_gb_model = pickle.load(f)


# ===============================
# Evaluate Models on Test Data
# ===============================
def _timed_predict(model, X):
    """Return (predictions, average prediction time per sample in seconds)."""
    started = time.perf_counter()  # monotonic clock, suited to short intervals
    predictions = model.predict(X)
    return predictions, (time.perf_counter() - started) / len(X)


def _regression_metrics(y_true, y_pred):
    """Return (MAE, RMSE, MSE) of y_pred against y_true."""
    mse = mean_squared_error(y_true, y_pred)
    return mean_absolute_error(y_true, y_pred), np.sqrt(mse), mse


# Evaluate Random Forest model
rf_predictions, rf_runtime = _timed_predict(best_rf_model, X_test_scaled)
rf_mae, rf_rmse, rf_mse = _regression_metrics(y_test, rf_predictions)

# Evaluate Gradient Boosting model
gb_predictions, gb_runtime = _timed_predict(best_gb_model, X_test_scaled)
gb_mae, gb_rmse, gb_mse = _regression_metrics(y_test, gb_predictions)

# Print evaluation metrics
print("Final Evaluation Metrics:")
print("Random Forest - MAE: {:.4f}, RMSE: {:.4f}, MSE: {:.4f}, Avg Runtime per sample: {:.6f} sec".format(
    rf_mae, rf_rmse, rf_mse, rf_runtime))
print("Gradient Boosting - MAE: {:.4f}, RMSE: {:.4f}, MSE: {:.4f}, Avg Runtime per sample: {:.6f} sec".format(
    gb_mae, gb_rmse, gb_mse, gb_runtime))

# Compare with the baseline: score the 'force_pre' column with the same metrics
baseline_mae, baseline_rmse, baseline_mse = _regression_metrics(y_test, baseline_force_pre)

print("Baseline (force_pre) - MAE: {:.4f}, RMSE: {:.4f}, MSE: {:.4f}".format(
    baseline_mae, baseline_rmse, baseline_mse))


In [None]:
# Install Keras Tuner if not already installed
!pip install keras-tuner

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
import keras_tuner as kt
import numpy as np

# Define a model-building function for the tuner
def build_model(hp):
    """
    Assemble and compile a dense regression network from tuner hyperparameters.

    `hp` is the Keras Tuner hyperparameter object. It chooses the number of
    hidden blocks (1-3) and, per block, the unit count, dropout rate and L2
    factor, plus the Adam learning rate. Each hidden block is
    Dense(relu, L2) -> BatchNormalization -> Dropout, followed by a single
    linear output unit. The input width is read from the notebook-level
    `X_train`, which must exist before the tuner calls this function.

    Returns the model compiled with MSE loss and an MAE metric.
    """
    net = keras.Sequential()

    # Explicit Input layer; its width is the number of training feature columns.
    net.add(keras.Input(shape=(X_train.shape[1],)))

    # Number of hidden blocks (at least one).
    depth = hp.Int('num_layers', 1, 3, default=2)

    for layer_idx in range(depth):
        # Per-block hyperparameters: width, dropout rate, L2 penalty.
        width = hp.Int(f'units_{layer_idx}', min_value=32, max_value=256, step=32, default=64)
        drop_rate = hp.Float(f'dropout_{layer_idx}', 0.0, 0.5, step=0.1, default=0.2)
        weight_decay = hp.Float(f'l2_reg_{layer_idx}', 1e-5, 1e-3, sampling='LOG', default=1e-4)

        hidden_block = (
            layers.Dense(width, activation='relu', kernel_regularizer=regularizers.l2(weight_decay)),
            layers.BatchNormalization(),
            layers.Dropout(drop_rate),
        )
        for block_layer in hidden_block:
            net.add(block_layer)

    # Single linear unit for the regression output.
    net.add(layers.Dense(1))

    # Adam learning rate, sampled on a log scale.
    step_size = hp.Float('learning_rate', 1e-4, 1e-2, sampling='LOG', default=1e-3)
    adam = keras.optimizers.Adam(learning_rate=step_size)

    net.compile(optimizer=adam, loss='mse', metrics=['mae'])
    return net

# Set up the tuner: Here we use Hyperband search algorithm
tuner = kt.Hyperband(
    build_model,
    objective='val_mae',
    max_epochs=50,
    factor=3,
    directory='dnn_tuner_dir',
    project_name='force_prediction_tuning'
)

# Early stopping for the search trials: abandon a trial once val_loss has not
# improved for 5 epochs.
stop_early = keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# Perform hyperparameter search
tuner.search(X_train, y_train,
             epochs=50,
             validation_split=0.2,
             callbacks=[stop_early],
             verbose=1)

# Retrieve the best hyperparameters and build the best model
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best hyperparameters found:", best_hp.values)

# Build the best model and train it.
# A dedicated callback with restore_best_weights=True is used for the final
# fit: without it the model keeps the weights of the last epoch run (up to
# `patience` epochs past the best one), and those would be the weights saved.
final_stop_early = keras.callbacks.EarlyStopping(monitor='val_loss', patience=5,
                                                 restore_best_weights=True)
best_model = tuner.hypermodel.build(best_hp)
history = best_model.fit(X_train, y_train,
                         epochs=50,
                         validation_split=0.2,
                         callbacks=[final_stop_early],
                         verbose=1)

# Save the best model (make sure the scaler and the training columns were
# saved earlier, during training, so the model can be used later).
best_model.save('optimized_dnn_model.h5')
print("Optimized DNN model has been saved successfully.")

# To load the best model again later, use:
# loaded_best_model = tf.keras.models.load_model('optimized_dnn_model.h5', compile=False)


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
import keras_tuner as kt
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

# X_train, y_train, X_val and y_val must already exist (created by the
# train/validation split). The natural log of both targets is taken here, so
# they must not have been log-transformed beforehand.
y_train_log, y_val_log = np.log(y_train), np.log(y_val)

# Number of feature columns in X_train
input_dim = X_train.shape[1]

# Define the model-building function for the tuner
def build_advanced_model(hp):
    """
    Assemble and compile the deeper tunable network.

    `hp` is the Keras Tuner hyperparameter object. It selects 4-6 hidden
    blocks and, per block, the unit count (32-512), the activation
    (ReLU or LeakyReLU), the dropout rate, the L2 factor and whether a
    BatchNormalization layer follows the dropout. The input width comes from
    the notebook-level `input_dim`.

    Returns the model compiled with Adam, MSE loss and an MAE metric.
    """
    net = keras.Sequential()
    net.add(layers.Input(shape=(input_dim,)))

    # Between 4 and 6 hidden blocks.
    n_hidden = hp.Int('num_layers', 4, 6)
    for idx in range(n_hidden):
        # Per-block search space: width, activation, dropout, L2 (log-sampled).
        width = hp.Int(f'units_{idx}', min_value=32, max_value=512, step=32)
        act_name = hp.Choice(f'activation_{idx}', values=['relu', 'leaky_relu'])
        drop_rate = hp.Float(f'dropout_{idx}', min_value=0.0, max_value=0.5, step=0.1)
        weight_decay = hp.Float(f'l2_reg_{idx}', min_value=1e-6, max_value=1e-3, sampling='LOG')

        # Linear Dense layer; the non-linearity is added as its own layer so
        # LeakyReLU can be swapped in.
        net.add(layers.Dense(width, kernel_regularizer=regularizers.l2(weight_decay)))
        net.add(
            layers.LeakyReLU(alpha=0.1)
            if act_name == 'leaky_relu'
            else layers.Activation('relu')
        )

        net.add(layers.Dropout(drop_rate))

        # Batch normalization is itself a tunable on/off choice per block.
        if hp.Boolean(f'batchnorm_{idx}'):
            net.add(layers.BatchNormalization())

    # Single linear output unit.
    net.add(layers.Dense(1))

    # Log-sampled learning rate for Adam.
    step_size = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='LOG')
    adam = keras.optimizers.Adam(learning_rate=step_size)

    net.compile(optimizer=adam, loss='mse', metrics=['mae'])
    return net

# Random-search tuner over build_advanced_model, ranked by validation MAE
# (measured in log space, because the targets passed below are log-transformed).
tuner = kt.RandomSearch(
    build_advanced_model,
    objective='val_mae',
    max_trials=30,
    executions_per_trial=1,
    directory='advanced_dnn_tuning',
    project_name='rolling_force_advanced'
)

# Run the search against the explicit validation split.
tuner.search(
    X_train,
    y_train_log,
    epochs=50,
    validation_data=(X_val, y_val_log),
)

# Best hyperparameter set found by the search
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best hyperparameters found:", best_hp.values)

# Fresh, untrained model built from the best hyperparameters
best_advanced_model = tuner.hypermodel.build(best_hp)

# Callbacks for the final fit: halve the learning rate after 5 stagnant epochs,
# stop after 10 and roll back to the weights with the best val_loss.
lr_reducer = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=5, verbose=1
)
early_stopper = keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=10, restore_best_weights=True
)

# Final training run
history = best_advanced_model.fit(
    X_train,
    y_train_log,
    epochs=100,
    validation_data=(X_val, y_val_log),
    callbacks=[lr_reducer, early_stopper],
)

# Save the trained model in HDF5 format
best_advanced_model.save("optimized_dnn_model_advanced_log.h5")

# -------------------------------
# Evaluate the optimized model on the validation set
# -------------------------------
# The network predicts log(force_meas); exponentiate both the predictions and
# the log targets so the metrics are reported on the original scale.
y_val_pred_log = best_advanced_model.predict(X_val).flatten()
y_val_orig = np.exp(y_val_log)
y_val_pred = np.exp(y_val_pred_log)

val_mae = mean_absolute_error(y_val_orig, y_val_pred)
val_rmse = np.sqrt(mean_squared_error(y_val_orig, y_val_pred))
print("Advanced Optimized DNN (with log transformation) Validation MAE: {:.4f}".format(val_mae))
print("Advanced Optimized DNN (with log transformation) Validation RMSE: {:.4f}".format(val_rmse))


In [None]:
import time
import pickle
import numpy as np
import tensorflow as tf
from sklearn.metrics import mean_absolute_error, mean_squared_error

import pandas as pd  # needed below to one-hot encode the raw test frame

# ---------------------------------------
# Step 1: Load saved preprocessing artifacts
# ---------------------------------------
# The advanced DNN was trained on features standardized by the scaler that the
# training cells save as 'scaler.pkl'; the notebook never writes a separate
# DNN-specific scaler file, so that is the one to load.
# NOTE: pickle.load can execute arbitrary code; only load files that were
# written by this notebook.
with open('scaler.pkl', 'rb') as f:
    scaler_dnn_log = pickle.load(f)

# Load the training columns saved during training (to ensure correct feature ordering)
with open('training_columns.pkl', 'rb') as f:
    training_columns = pickle.load(f)

# ---------------------------------------
# Step 2: Load the Advanced Optimized DNN Model
# ---------------------------------------
# The file name must match the one used when the advanced model was saved.
# compile=False: the model is only used for inference here, so the training
# configuration (loss/optimizer) does not need to be restored.
custom_objects = {'leaky_relu': tf.keras.layers.LeakyReLU}
advanced_dnn = tf.keras.models.load_model('optimized_dnn_model_advanced_log.h5',
                                          custom_objects=custom_objects,
                                          compile=False)

print("Advanced optimized DNN model loaded successfully.")

# ---------------------------------------
# Step 3: Preprocess the Test Data
# ---------------------------------------
# test_df (loaded by the earlier test-evaluation cell) still holds the raw
# columns, so selecting test_df[training_columns] directly would fail on the
# one-hot columns. Rebuild the training layout instead: drop the target and
# the baseline column, one-hot encode the steel grade if it is still raw, then
# reindex to the training columns (absent dummies become 0, extras are dropped).
features_test = test_df.drop(columns=['force_meas', 'force_pre'], errors='ignore')
if 'tappingsteelgrade' in features_test.columns:
    features_test = pd.get_dummies(features_test, columns=['tappingsteelgrade'])
features_test = features_test.reindex(columns=training_columns, fill_value=0)

# Scale the test features using the loaded scaler
X_test_dnn = scaler_dnn_log.transform(features_test)

# True force_meas values for the test set, on the original scale. The model
# was trained on log(force_meas), so its predictions are inverse-transformed
# before they are compared against these values.
y_test = test_df['force_meas']

# ---------------------------------------
# Step 4: Evaluate the Model on Test Data
# ---------------------------------------
start_time = time.perf_counter()
# Obtain predictions in log-scale
dnn_predictions_log = advanced_dnn.predict(X_test_dnn)
# Reverse the natural-log transformation and flatten (n, 1) -> (n,)
dnn_predictions = np.exp(dnn_predictions_log).ravel()
dnn_runtime = (time.perf_counter() - start_time) / X_test_dnn.shape[0]  # average runtime per sample

# Calculate evaluation metrics
mae_dnn = mean_absolute_error(y_test, dnn_predictions)
rmse_dnn = np.sqrt(mean_squared_error(y_test, dnn_predictions))

print("Advanced Optimized DNN (with log transformation) Test MAE: {:.4f}".format(mae_dnn))
print("Advanced Optimized DNN (with log transformation) Test RMSE: {:.4f}".format(rmse_dnn))
print("Advanced Optimized DNN (with log transformation) Average prediction runtime per sample: {:.6f} seconds".format(dnn_runtime))
