In [7]:
import json
from striprtf.striprtf import rtf_to_text
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, classification_report
import importlib

# The configuration file is stored as Rich Text Format (RTF), so its markup
# has to be stripped before the content can be parsed as JSON.
rtf_file = 'algoparams_from_ui.json.rtf'

with open(rtf_file, 'r') as rtf_handle:
    rtf_content = rtf_handle.read()

# Convert the RTF document to plain text and decode that text as JSON.
plain_text = rtf_to_text(rtf_content)
config = json.loads(plain_text)

# Read the dataset from disk.
iris = pd.read_csv('iris.csv')

# Step 1: read the target column name and the prediction type from the config.
target_settings = config["design_state_data"]["target"]
target = target_settings["target"]
prediction_type = target_settings["type"]

# Step 2: Feature imputation for numerical features.
# Only features that are selected, numerical and marked "Impute" in the config
# are modified; every other column is left exactly as it was loaded.
feature_handling = config["design_state_data"]["feature_handling"]
for feature, details in feature_handling.items():
    if not details["is_selected"] or details["feature_variable_type"] != "numerical":
        continue
    feature_details = details["feature_details"]
    if feature_details["missing_values"] != "Impute":
        continue

    # "Average of values" maps to mean imputation; any other setting is
    # treated as filling with the configured constant.
    impute_strategy = "mean" if feature_details["impute_with"] == "Average of values" else "constant"
    impute_value = feature_details.get("impute_value", None)
    if impute_strategy == "constant":
        if impute_value is None:
            # fillna(None) would raise a ValueError anyway; fail with a
            # message that names the offending feature instead.
            raise ValueError(
                f"Feature {feature!r} is configured for constant imputation "
                "but has no 'impute_value'."
            )
        # Assign the filled column back instead of calling fillna(inplace=True)
        # on the column selection: the chained in-place form is deprecated and
        # does not update the frame when pandas Copy-on-Write is enabled.
        iris[feature] = iris[feature].fillna(impute_value)
    else:
        imputer = SimpleImputer(strategy=impute_strategy)
        iris[feature] = imputer.fit_transform(iris[[feature]])

# One-hot encode the non-numeric columns (the species variable);
# drop_first=True removes the first level of each encoded column.
data = pd.get_dummies(iris, drop_first=True)

# Step 3: Feature reduction - read the configured method, then separate the
# target column from the predictor columns.
feature_reduction_settings = config["design_state_data"]["feature_reduction"]
reduction_method = feature_reduction_settings["feature_reduction_method"]
y = data[target]
X = data.drop(labels=[target], axis=1)

def feature_reduction(X, reduction_method, config, y=None):
    """Reduce the feature matrix with the method named in the configuration.

    Parameters
    ----------
    X : pandas.DataFrame
        Predictor columns (target already removed).
    reduction_method : str
        "PCA", "Tree-based" or "Correlation with target". Any other value
        returns ``X`` unchanged without reading the reduction settings.
    config : dict
        Parsed configuration; settings are read from
        ``config["design_state_data"]["feature_reduction"]``.
    y : pandas.Series, optional
        Target values, required by "Tree-based" and "Correlation with target".
        When omitted, the module-level ``y`` is used so that existing
        three-argument calls keep working.

    Returns
    -------
    numpy.ndarray for "PCA" (the projected components); otherwise a
    DataFrame holding the selected columns of ``X`` (or ``X`` itself when no
    reduction applies).

    Raises
    ------
    ValueError
        If ``num_of_features_to_keep`` is below 1, or if a supervised method
        is requested and no target is available.
    """
    if reduction_method not in ("PCA", "Tree-based", "Correlation with target"):
        return X

    settings = config["design_state_data"]["feature_reduction"]
    num_of_features = int(settings["num_of_features_to_keep"])
    if num_of_features < 1:
        # Without this guard the tree-based slice [-0:] would silently keep
        # every feature instead of none.
        raise ValueError(
            f"num_of_features_to_keep must be at least 1, got {num_of_features}."
        )

    if reduction_method == "PCA":
        return PCA(n_components=num_of_features).fit_transform(X)

    # The remaining methods are supervised and need the target.
    if y is None:
        y = globals().get("y")
        if y is None:
            raise ValueError(
                f"Reduction method {reduction_method!r} requires target values; pass y."
            )

    if reduction_method == "Tree-based":
        forest = RandomForestRegressor(
            n_estimators=int(settings["num_of_trees"]),
            max_depth=int(settings["depth_of_trees"]),
            random_state=42,
        )
        forest.fit(X, y)
        # Column positions ordered from most to least important.
        ranked_positions = forest.feature_importances_.argsort()[::-1]
        return X.iloc[:, ranked_positions[:num_of_features]]

    # "Correlation with target": keep the columns with the largest absolute
    # correlation with y.
    correlation_with_target = X.corrwith(y).abs()
    top_features = correlation_with_target.nlargest(num_of_features).index
    return X[top_features]

# Apply the configured feature-reduction step to the predictors.
X_red = feature_reduction(X, reduction_method, config)

# Step 4: Model selection - keep only the algorithms flagged as selected.
algorithms = config["design_state_data"]["algorithms"]
selected_models = dict(
    (algo_name, algo_params)
    for algo_name, algo_params in algorithms.items()
    if algo_params["is_selected"]
)

if len(selected_models) == 0:
    raise ValueError("No model selected in the configuration!")

# Only the first selected algorithm (in configuration order) is used.
model_name, model_params = next(iter(selected_models.items()))

#import the selected model

# Lookup table: estimator class name -> name of the module it is imported from.
# Entries are grouped per module so each module path is written only once.
model_module_map = {
    **dict.fromkeys(
        [
            "RandomForestClassifier",
            "RandomForestRegressor",
            "GradientBoostingClassifier",
            "GradientBoostingRegressor",
            "ExtraTreesRegressor",
        ],
        "sklearn.ensemble",
    ),
    **dict.fromkeys(
        [
            "LinearRegression",
            "LogisticRegression",
            "Ridge",
            "Lasso",
            "ElasticNet",
            "SGDRegressor",
        ],
        "sklearn.linear_model",
    ),
    **dict.fromkeys(["DecisionTreeClassifier", "DecisionTreeRegressor"], "sklearn.tree"),
    "XGBRegressor": "xgboost",  # xgboost library
    "LinearSVC": "sklearn.svm",
    "KNeighborsRegressor": "sklearn.neighbors",
    "MLPRegressor": "sklearn.neural_network",
}

def update_model_name(model_name):
    """Translate a configuration algorithm name into an estimator class name.

    Parameters
    ----------
    model_name : str
        Algorithm name as it appears in the configuration.

    Returns
    -------
    str
        The estimator class name for known aliases; any other name is
        returned unchanged, so names that already are class names (e.g.
        "RandomForestRegressor") pass straight through.
    """
    # Only genuine renames are listed; identity entries are unnecessary
    # because unknown names fall through to the default below.
    model_name_updates = {
        "GBTClassifier": "GradientBoostingClassifier",
        "GBTRegressor": "GradientBoostingRegressor",
        "RidgeRegression": "Ridge",
        "LassoRegression": "Lasso",
        "ElasticNetRegression": "ElasticNet",
        "xg_boost": "XGBRegressor",
        "SVM": "LinearSVC",
        "SGD": "SGDRegressor",
        "KNN": "KNeighborsRegressor",
        "neural_network": "MLPRegressor",
        # ExtraTreesRegressor is registered in the module map but previously
        # had no alias pointing at it.
        # NOTE(review): "extra_random_trees" is assumed to be the key used by
        # the configuration file - confirm against the actual config.
        "extra_random_trees": "ExtraTreesRegressor",
    }
    return model_name_updates.get(model_name, model_name)

#Fetch the module based on model name
def fetch_model_class(model_name):
    """Return the estimator class for a configuration algorithm name.

    The name is first passed through ``update_model_name``; the result is
    looked up in ``model_module_map`` to find the module to import, and the
    class is then read from that module.

    Raises
    ------
    ValueError
        If the translated name has no entry in ``model_module_map``.
    ImportError
        If the imported module has no attribute with the translated name.
    """
    resolved_name = update_model_name(model_name)
    if resolved_name not in model_module_map:
        raise ValueError(f"Model {resolved_name} is not recognized.")

    module_name = model_module_map[resolved_name]
    try:
        return getattr(importlib.import_module(module_name), resolved_name)
    except AttributeError:
        raise ImportError(f"Model {resolved_name} not found in module {module_name}.")


# Resolve the configured algorithm name to its estimator class.
model_class = fetch_model_class(model_name)
print(f"Model Class: {model_class}")


# Define hyperparameters: only list/range-valued settings become grid axes.
# The "model__" prefix addresses the pipeline step named 'model'.
param_grid = {
    f"model__{k}": v for k, v in model_params.items() if isinstance(v, (list, range))
}

# Step 5: Pipeline creation
# Seed the estimator when it exposes a random_state parameter, so repeated
# runs give the same scores (the split below is already seeded with 42).
estimator = model_class()
if "random_state" in estimator.get_params():
    estimator.set_params(random_state=42)
pipeline_steps = [('scaler', StandardScaler()), ('model', estimator)]
pipeline = Pipeline(pipeline_steps)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X_red, y, test_size=0.2, random_state=42)

# Hyperparameter tuning (3-fold cross-validation over param_grid)
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=3, verbose=2)

# Fit on the training split, then predict the held-out split with the best pipeline.
grid_search.fit(X_train, y_train)
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

# Step 6: Log metrics
# Reject unknown prediction types first; the best-parameter line is common
# to both supported types.
if prediction_type not in ("regression", "classification"):
    raise ValueError(f"Prediction type: {prediction_type} is not supported.")

print(f"Best Parameters: {grid_search.best_params_}")
if prediction_type == "regression":
    print(f"MSE: {mean_squared_error(y_test, y_pred):.4f}")
    print(f"R^2 Score: {r2_score(y_test, y_pred):.4f}")
else:
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(f"Classification Report:\n{classification_report(y_test, y_pred)}")

Model Class: <class 'sklearn.ensemble._forest.RandomForestRegressor'>
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[CV] END .................................................... total time=   0.2s
[CV] END .................................................... total time=   0.2s
[CV] END .................................................... total time=   0.2s
Best Parameters: {}
MSE: 0.0487
R^2 Score: 0.9235


In [2]:
reduction_method

'Tree-based'

In [3]:
model_name

'RandomForestRegressor'

## Example where JSON file is edited to test with reduction_method = 'PCA' and model = 'LinearRegression' 

In [4]:
import json
from striprtf.striprtf import rtf_to_text
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, classification_report
import importlib

# The configuration file is stored as Rich Text Format (RTF), so its markup
# has to be stripped before the content can be parsed as JSON.
rtf_file = 'algoparams_from_ui.json - Copy.rtf'

with open(rtf_file, 'r') as rtf_handle:
    rtf_content = rtf_handle.read()

# Convert the RTF document to plain text and decode that text as JSON.
plain_text = rtf_to_text(rtf_content)
config = json.loads(plain_text)

# Read the dataset from disk.
iris = pd.read_csv('iris.csv')

# Step 1: read the target column name and the prediction type from the config.
target_settings = config["design_state_data"]["target"]
target = target_settings["target"]
prediction_type = target_settings["type"]

# Step 2: Feature imputation for numerical features.
# Only features that are selected, numerical and marked "Impute" in the config
# are modified; every other column is left exactly as it was loaded.
feature_handling = config["design_state_data"]["feature_handling"]
for feature, details in feature_handling.items():
    if not details["is_selected"] or details["feature_variable_type"] != "numerical":
        continue
    feature_details = details["feature_details"]
    if feature_details["missing_values"] != "Impute":
        continue

    # "Average of values" maps to mean imputation; any other setting is
    # treated as filling with the configured constant.
    impute_strategy = "mean" if feature_details["impute_with"] == "Average of values" else "constant"
    impute_value = feature_details.get("impute_value", None)
    if impute_strategy == "constant":
        if impute_value is None:
            # fillna(None) would raise a ValueError anyway; fail with a
            # message that names the offending feature instead.
            raise ValueError(
                f"Feature {feature!r} is configured for constant imputation "
                "but has no 'impute_value'."
            )
        # Assign the filled column back instead of calling fillna(inplace=True)
        # on the column selection: the chained in-place form is deprecated and
        # does not update the frame when pandas Copy-on-Write is enabled.
        iris[feature] = iris[feature].fillna(impute_value)
    else:
        imputer = SimpleImputer(strategy=impute_strategy)
        iris[feature] = imputer.fit_transform(iris[[feature]])

# One-hot encode the non-numeric columns (the species variable);
# drop_first=True removes the first level of each encoded column.
data = pd.get_dummies(iris, drop_first=True)

# Step 3: Feature reduction - read the configured method, then separate the
# target column from the predictor columns.
feature_reduction_settings = config["design_state_data"]["feature_reduction"]
reduction_method = feature_reduction_settings["feature_reduction_method"]
y = data[target]
X = data.drop(labels=[target], axis=1)

def feature_reduction(X, reduction_method, config, y=None):
    """Reduce the feature matrix with the method named in the configuration.

    Parameters
    ----------
    X : pandas.DataFrame
        Predictor columns (target already removed).
    reduction_method : str
        "PCA", "Tree-based" or "Correlation with target". Any other value
        returns ``X`` unchanged without reading the reduction settings.
    config : dict
        Parsed configuration; settings are read from
        ``config["design_state_data"]["feature_reduction"]``.
    y : pandas.Series, optional
        Target values, required by "Tree-based" and "Correlation with target".
        When omitted, the module-level ``y`` is used so that existing
        three-argument calls keep working.

    Returns
    -------
    numpy.ndarray for "PCA" (the projected components); otherwise a
    DataFrame holding the selected columns of ``X`` (or ``X`` itself when no
    reduction applies).

    Raises
    ------
    ValueError
        If ``num_of_features_to_keep`` is below 1, or if a supervised method
        is requested and no target is available.
    """
    if reduction_method not in ("PCA", "Tree-based", "Correlation with target"):
        return X

    settings = config["design_state_data"]["feature_reduction"]
    num_of_features = int(settings["num_of_features_to_keep"])
    if num_of_features < 1:
        # Without this guard the tree-based slice [-0:] would silently keep
        # every feature instead of none.
        raise ValueError(
            f"num_of_features_to_keep must be at least 1, got {num_of_features}."
        )

    if reduction_method == "PCA":
        return PCA(n_components=num_of_features).fit_transform(X)

    # The remaining methods are supervised and need the target.
    if y is None:
        y = globals().get("y")
        if y is None:
            raise ValueError(
                f"Reduction method {reduction_method!r} requires target values; pass y."
            )

    if reduction_method == "Tree-based":
        forest = RandomForestRegressor(
            n_estimators=int(settings["num_of_trees"]),
            max_depth=int(settings["depth_of_trees"]),
            random_state=42,
        )
        forest.fit(X, y)
        # Column positions ordered from most to least important.
        ranked_positions = forest.feature_importances_.argsort()[::-1]
        return X.iloc[:, ranked_positions[:num_of_features]]

    # "Correlation with target": keep the columns with the largest absolute
    # correlation with y.
    correlation_with_target = X.corrwith(y).abs()
    top_features = correlation_with_target.nlargest(num_of_features).index
    return X[top_features]

# Apply the configured feature-reduction step to the predictors.
X_red = feature_reduction(X, reduction_method, config)

# Step 4: Model selection - keep only the algorithms flagged as selected.
algorithms = config["design_state_data"]["algorithms"]
selected_models = dict(
    (algo_name, algo_params)
    for algo_name, algo_params in algorithms.items()
    if algo_params["is_selected"]
)

if len(selected_models) == 0:
    raise ValueError("No model selected in the configuration!")

# Only the first selected algorithm (in configuration order) is used.
model_name, model_params = next(iter(selected_models.items()))

#import the selected model

# Lookup table: estimator class name -> name of the module it is imported from.
# Entries are grouped per module so each module path is written only once.
model_module_map = {
    **dict.fromkeys(
        [
            "RandomForestClassifier",
            "RandomForestRegressor",
            "GradientBoostingClassifier",
            "GradientBoostingRegressor",
            "ExtraTreesRegressor",
        ],
        "sklearn.ensemble",
    ),
    **dict.fromkeys(
        [
            "LinearRegression",
            "LogisticRegression",
            "Ridge",
            "Lasso",
            "ElasticNet",
            "SGDRegressor",
        ],
        "sklearn.linear_model",
    ),
    **dict.fromkeys(["DecisionTreeClassifier", "DecisionTreeRegressor"], "sklearn.tree"),
    "XGBRegressor": "xgboost",  # xgboost library
    "LinearSVC": "sklearn.svm",
    "KNeighborsRegressor": "sklearn.neighbors",
    "MLPRegressor": "sklearn.neural_network",
}

def update_model_name(model_name):
    """Translate a configuration algorithm name into an estimator class name.

    Parameters
    ----------
    model_name : str
        Algorithm name as it appears in the configuration.

    Returns
    -------
    str
        The estimator class name for known aliases; any other name is
        returned unchanged, so names that already are class names (e.g.
        "LinearRegression") pass straight through.
    """
    # Only genuine renames are listed; identity entries are unnecessary
    # because unknown names fall through to the default below.
    model_name_updates = {
        "GBTClassifier": "GradientBoostingClassifier",
        "GBTRegressor": "GradientBoostingRegressor",
        "RidgeRegression": "Ridge",
        "LassoRegression": "Lasso",
        "ElasticNetRegression": "ElasticNet",
        "xg_boost": "XGBRegressor",
        "SVM": "LinearSVC",
        "SGD": "SGDRegressor",
        "KNN": "KNeighborsRegressor",
        "neural_network": "MLPRegressor",
        # ExtraTreesRegressor is registered in the module map but previously
        # had no alias pointing at it.
        # NOTE(review): "extra_random_trees" is assumed to be the key used by
        # the configuration file - confirm against the actual config.
        "extra_random_trees": "ExtraTreesRegressor",
    }
    return model_name_updates.get(model_name, model_name)

#Fetch the module based on model name
def fetch_model_class(model_name):
    """Return the estimator class for a configuration algorithm name.

    The name is first passed through ``update_model_name``; the result is
    looked up in ``model_module_map`` to find the module to import, and the
    class is then read from that module.

    Raises
    ------
    ValueError
        If the translated name has no entry in ``model_module_map``.
    ImportError
        If the imported module has no attribute with the translated name.
    """
    resolved_name = update_model_name(model_name)
    if resolved_name not in model_module_map:
        raise ValueError(f"Model {resolved_name} is not recognized or supported.")

    module_name = model_module_map[resolved_name]
    try:
        return getattr(importlib.import_module(module_name), resolved_name)
    except AttributeError:
        raise ImportError(f"Model {resolved_name} not found in module {module_name}.")


# Resolve the configured algorithm name to its estimator class.
model_class = fetch_model_class(model_name)
print(f"Model Class: {model_class}")


# Define hyperparameters: only list/range-valued settings become grid axes.
# The "model__" prefix addresses the pipeline step named 'model'.
param_grid = {
    f"model__{k}": v for k, v in model_params.items() if isinstance(v, (list, range))
}

# Step 5: Pipeline creation
# Seed the estimator when it exposes a random_state parameter, so repeated
# runs give the same scores (the split below is already seeded with 42).
estimator = model_class()
if "random_state" in estimator.get_params():
    estimator.set_params(random_state=42)
pipeline_steps = [('scaler', StandardScaler()), ('model', estimator)]
pipeline = Pipeline(pipeline_steps)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X_red, y, test_size=0.2, random_state=42)

# Hyperparameter tuning (3-fold cross-validation over param_grid)
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, cv=3, verbose=2)

# Fit on the training split, then predict the held-out split with the best pipeline.
grid_search.fit(X_train, y_train)
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)

# Step 6: Log metrics
# Reject unknown prediction types first; the best-parameter line is common
# to both supported types.
if prediction_type not in ("regression", "classification"):
    raise ValueError(f"Unsupported prediction type: {prediction_type}")

print(f"Best Parameters: {grid_search.best_params_}")
if prediction_type == "regression":
    print(f"Mean Squared Error: {mean_squared_error(y_test, y_pred):.4f}")
    print(f"R^2 Score: {r2_score(y_test, y_pred):.4f}")
else:
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(f"Classification Report:\n{classification_report(y_test, y_pred)}")

Model Class: <class 'sklearn.linear_model._base.LinearRegression'>
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.0s
Best Parameters: {}
Mean Squared Error: 0.0391
R^2 Score: 0.9385


In [5]:
reduction_method

'PCA'

In [6]:
model_name

'LinearRegression'