In [10]:
# Q1) Read the target and type of regression to be run

from docx import Document
import json

# Step 2: Load the Word file (ensure the file path is correct)
doc_path = r"C:\Users\pavan Anbhule\Downloads\DA_Assessment\algoparams_from_ui1.docx"  # Replace with the correct file path
doc = Document(doc_path)

# Step 3: Extract the JSON text from the Word file
json_text = ""
for paragraph in doc.paragraphs:
    json_text += paragraph.text

# Step 4: Parse the JSON text
try:
    # Parse the JSON content
    data = json.loads(json_text)

    # Step 5: Navigate to the target variable and regression type
    target = data.get("design_state_data", {}).get("target", {}).get("target", "Not found")
    regression_type = data.get("design_state_data", {}).get("target", {}).get("type", "Not found")

    # Step 6: Print results
    print(f"Target Variable: {target}")
    print(f"Type of Regression: {regression_type}")

except json.JSONDecodeError as e:
    # Handle JSON parsing errors
    print("The content in the Word file is not valid JSON.")
    print(f"Error Details: {e}")

Target Variable: petal_width
Type of Regression: regression


In [3]:
# 3) Compute feature reduction based on input. See the screenshot below where there can be No Reduction, Corr with Target, Tree-based, PCA. 
#    Please make sure you write code so that all options can work. If we rerun your code with a different Json it should work if we switch 
#    No Reduction to say PCA.

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.model_selection import train_test_split

# Function to apply feature reduction based on JSON configuration
def apply_feature_reduction(data, target_column, feature_reduction_config):
    # Extract feature reduction method from the config
    method = feature_reduction_config.get("feature_reduction_method")
    
    # Check if "No Reduction" is selected
    if feature_reduction_config["No Reduction"]["is_selected"]:
        num_features_to_keep = feature_reduction_config["No Reduction"]["num_of_features_to_keep"]
        print(f"No Reduction selected. Keeping top {num_features_to_keep} features.")
        return data.iloc[:, :num_features_to_keep]
    
    # Check if "Correlation with Target" is selected
    elif feature_reduction_config["Correlation with target"]["is_selected"]:
        num_features_to_keep = feature_reduction_config["Correlation with target"]["num_of_features_to_keep"]
        print(f"Correlation with Target selected. Keeping top {num_features_to_keep} correlated features.")
        return correlation_with_target(data, target_column, num_features_to_keep)
    
    # Check if "Tree-based" method is selected
    elif feature_reduction_config["Tree_based"]["is_selected"]:
        num_features_to_keep = feature_reduction_config["Tree_based"]["num_of_features_to_keep"]
        print(f"Tree-based selection method selected. Keeping top {num_features_to_keep} features based on importance.")
        return tree_based_selection(data, target_column, num_features_to_keep)
    
    # Check if "Principal Component Analysis" (PCA) is selected
    elif feature_reduction_config["Principal Component Analysis"]["is_selected"]:
        num_features_to_keep = feature_reduction_config["Principal Component Analysis"]["num_of_features_to_keep"]
        print(f"PCA selected. Keeping top {num_features_to_keep} principal components.")
        return pca_reduction(data, num_features_to_keep)
    
    # Return original data if no valid method is selected
    print("No valid feature reduction method selected. Returning original data.")
    return data

# Function for correlation-based feature selection
def correlation_with_target(data, target_column, num_features_to_keep):
    corr = data.corrwith(data[target_column]).abs()
    top_features = corr.sort_values(ascending=False).head(num_features_to_keep).index
    return data[top_features]

# Function for tree-based feature selection (using Random Forest)
def tree_based_selection(data, target_column, num_features_to_keep):
    X = data.drop(columns=[target_column])
    y = data[target_column]
    
    # Train a RandomForest model to rank features
    model = RandomForestClassifier()
    model.fit(X, y)
    
    feature_importances = pd.Series(model.feature_importances_, index=X.columns)
    top_features = feature_importances.sort_values(ascending=False).head(num_features_to_keep).index
    return data[top_features]

# Function for PCA-based feature reduction
def pca_reduction(data, num_features_to_keep):
    X = data.values
    pca = PCA(n_components=num_features_to_keep)
    principal_components = pca.fit_transform(X)
    
    # Get the reduced data frame
    reduced_data = pd.DataFrame(principal_components)
    return reduced_data

# Sample data for testing
# Assuming `df` is your input data frame, and 'target' is the target column in your dataset
# data = pd.read_csv('your_data.csv')  # Load your dataset

# Example of the JSON input provided
feature_reduction_config = {
    "feature_reduction_method": "Correlation with target",
    "No Reduction": {
        "is_selected": True,
        "num_of_features_to_keep": 5
    },
    "Correlation with target": {
        "is_selected": False,
        "num_of_features_to_keep": 0
    },
    "Tree_based": {
        "is_selected": False,
        "num_of_features_to_keep": 0,
        "depth_of_trees": 0,
        "num_of_trees": 0
    },
    "Principal Component Analysis": {
        "is_selected": False,
        "num_of_features_to_keep": 0
    }
}

# Example of feature reduction applied
# Here, we assume you have a DataFrame `df` and the target column is 'target'
# data = df
target_column = 'target'  # Change this to your actual target column

# Apply feature reduction based on config
reduced_data = apply_feature_reduction(data, target_column, feature_reduction_config)

# The reduced data can now be used for further processing, training, or analysis


No Reduction selected. Keeping top 5 features.


In [4]:
# 2) Read the features (which are column names in the csv) and figure out what missing imputation
#    needs to be applied and apply that to the columns loaded in a dataframe

import pandas as pd

# Step 1: Load your data into a pandas dataframe
df = pd.read_csv(r"C:\Users\pavan Anbhule\Downloads\DA_Assessment\iris.csv")

# Step 2: Define the feature handling dictionary (corrected the syntax)
feature_handling = {
    "sepal_length": {
        "feature_name": "sepal_length",
        "is_selected": True,
        "feature_variable_type": "numerical",
        "feature_details": {
            "numerical_handling": "keep as regular numerical feature",
            "rescaling": "No rescaling",
            "make_derived_feats": False,
            "missing_values": "Impute",
            "impute_width": "Average of values",
            "impute_value": 0
        }
    }
}

# Step 3: Function to impute missing values based on the feature handling configuration
def impute_missing_values(df, feature_handling):
    for feature, details in feature_handling.items():
        if details["is_selected"]:  # Check if the feature is selected
            column_name = details["feature_name"]
            if "missing_values" in details["feature_details"]:
                missing_values_action = details["feature_details"]["missing_values"]
                if missing_values_action == "Impute":
                    impute_method = details["feature_details"]["impute_width"]
                    
                    if impute_method == "Average of values":  # Check if impute method is mean
                        # Calculate the mean of the column (ignoring NaN values)
                        mean_value = df[column_name].mean()
                        # Impute missing values with the mean value
                        df[column_name].fillna(mean_value, inplace=True)
                    # You can add more imputation methods if required (e.g., Median, Mode)
    return df

# Step 4: Apply the imputation function to your dataframe
df = impute_missing_values(df, feature_handling)

# Step 5: Verify that missing values have been imputed
print(df['sepal_length'].isnull().sum())  # This should print 0 if imputation was successful


0


In [7]:
# 4) Parse the Json and make the model objects (using sklean) that can handle what is required 
#   in the “prediction_type” specified in the JSON (See #1 where “prediction_type” is specified).
#   Keep in mind not to pick models that don’t apply for the prediction_type specified

import json
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Sample JSON configuration
config = {
    "prediction_type": "classification",
    "LogisticRegression": {
        "model_name": "LogisticRegression",
        "is_selected": True,
        "parallelism": 2,
        "min_iter": 30,
        "max_iter": 50,
        "min_regparam": 0.5,
        "max_regparam": 0.8,
        "min_elasticnet": 0.5,
        "max_elasticnet": 0.8
    },
    "RandomForestClassifier": {
        "model_name": "RandomForestClassifier",
        "is_selected": False,
        "num_estimators": 100,
        "max_depth": 10
    }
}

# Function to create the selected model(s)
def create_model(config, X_train, y_train):
    # Parse prediction type
    prediction_type = config["prediction_type"]
    models = []
    
    # If classification is required, select classification models
    if prediction_type == "classification":
        # Logistic Regression
        if config["LogisticRegression"]["is_selected"]:
            log_reg_params = {
                "max_iter": config["LogisticRegression"]["max_iter"],
                "solver": "saga",  # "saga" solver supports ElasticNet
                "penalty": "elasticnet",
                "l1_ratio": config["LogisticRegression"]["max_elasticnet"],  # Use max elasticnet ratio
                "C": 1 / config["LogisticRegression"]["min_regparam"]  # Regularization parameter
            }
            models.append(LogisticRegression(**log_reg_params))

        # Random Forest (only if selected)
        if config["RandomForestClassifier"]["is_selected"]:
            rf_params = {
                "n_estimators": config["RandomForestClassifier"]["num_estimators"],
                "max_depth": config["RandomForestClassifier"]["max_depth"]
            }
            models.append(RandomForestClassifier(**rf_params))

    return models

# Example usage with valid dataset (ensure y has at least two classes)
X = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]  # Features
y = [0, 1, 0, 1, 0]  # Labels (binary classification with two classes)

# Split data into training and testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Check if the dataset contains at least two classes
if len(set(y_train)) < 2:
    raise ValueError("The training data contains less than two classes. Logistic Regression requires at least two classes.")

# Create model(s) based on config
models = create_model(config, X_train, y_train)

# Example: Training the Logistic Regression model (if it is selected)
for model in models:
    model.fit(X_train, y_train)
    print(f"Model {model.__class__.__name__} trained.")


Model LogisticRegression trained.


In [8]:
# 5) Run the fit and predict on each model – keep in mind that you need to do hyper parameter tuning
#    i.e., use GridSearchCV

import json
import numpy as np
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Sample JSON Configuration
config = {
    "hyperparameters": {
        "search_method": "Grid Search",
        "Grid Search": {
            "is_selected": True,
            "shuffle_grid": True,
            "random_state": 0,
            "cross_validation_strategy": "Time-based K-fold(with overlap)",
            "Time-based K-fold(with overlap)": {
                "is_selected": True,
                "num_of_folds": 3,
                "split_ratio": 0.8,
                "stratified": False
            }
        }
    },
    "LogisticRegression": {
        "model_name": "LogisticRegression",
        "is_selected": True,
        "max_iter": [50, 100],
        "penalty": ["l2", "elasticnet"],
        "C": [0.1, 1.0, 10]
    },
    "RandomForestClassifier": {
        "model_name": "RandomForestClassifier",
        "is_selected": True,
        "n_estimators": [50, 100, 200],
        "max_depth": [5, 10, 20]
    }
}

# Sample Dataset
X = np.random.rand(100, 5)  # Random features
y = np.random.choice([0, 1], size=100)  # Binary target

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Function to perform Grid Search with hyperparameter tuning
def hyperparameter_tuning(config, X_train, y_train, X_test, y_test):
    models = []
    results = []

    # Time-based K-Fold Cross Validation
    cv_strategy = None
    if config["hyperparameters"]["Grid Search"]["Time-based K-fold(with overlap)"]["is_selected"]:
        num_of_folds = config["hyperparameters"]["Grid Search"]["Time-based K-fold(with overlap)"]["num_of_folds"]
        cv_strategy = TimeSeriesSplit(n_splits=num_of_folds)
    
    # Logistic Regression
    if config["LogisticRegression"]["is_selected"]:
        param_grid = {
            "max_iter": config["LogisticRegression"]["max_iter"],
            "penalty": config["LogisticRegression"]["penalty"],
            "C": config["LogisticRegression"]["C"],
            "solver": ["saga"]  # saga supports elasticnet
        }
        model = LogisticRegression()
        grid_search = GridSearchCV(model, param_grid, cv=cv_strategy, scoring="accuracy", verbose=1)
        grid_search.fit(X_train, y_train)
        models.append(grid_search.best_estimator_)
        results.append((grid_search.best_estimator_, grid_search.best_params_, grid_search.best_score_))

    # Random Forest
    if config["RandomForestClassifier"]["is_selected"]:
        param_grid = {
            "n_estimators": config["RandomForestClassifier"]["n_estimators"],
            "max_depth": config["RandomForestClassifier"]["max_depth"]
        }
        model = RandomForestClassifier()
        grid_search = GridSearchCV(model, param_grid, cv=cv_strategy, scoring="accuracy", verbose=1)
        grid_search.fit(X_train, y_train)
        models.append(grid_search.best_estimator_)
        results.append((grid_search.best_estimator_, grid_search.best_params_, grid_search.best_score_))

    # Evaluate each model on test data
    for model, best_params, best_score in results:
        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        print(f"Model: {model.__class__.__name__}")
        print(f"Best Params: {best_params}")
        print(f"Validation Score: {best_score:.4f}")
        print(f"Test Accuracy: {acc:.4f}")
        print("-" * 50)

# Run Hyperparameter Tuning
hyperparameter_tuning(config, X_train, y_train, X_test, y_test)


Fitting 3 folds for each of 12 candidates, totalling 36 fits
Fitting 3 folds for each of 9 candidates, totalling 27 fits


18 fits failed out of a total of 36.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
18 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\pavan Anbhule\anaconda3\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\pavan Anbhule\anaconda3\lib\site-packages\sklearn\linear_model\_logistic.py", line 1291, in fit
    fold_coefs_ = Parallel(n_jobs=self.n_jobs, verbose=self.verbose, prefer=prefer)(
  File "C:\Users\pavan Anbhule\anaconda3\lib\site-packages\sklearn\utils\parallel.py", line 63, in __call__
    return super().__call__(iterable_with_config)
  File "C:\Users\pavan Anbhule\anaconda3\lib\site-pa

Model: LogisticRegression
Best Params: {'C': 10, 'max_iter': 100, 'penalty': 'l2', 'solver': 'saga'}
Validation Score: 0.4500
Test Accuracy: 0.5000
--------------------------------------------------
Model: RandomForestClassifier
Best Params: {'max_depth': 5, 'n_estimators': 200}
Validation Score: 0.6000
Test Accuracy: 0.5500
--------------------------------------------------


In [9]:
# 6) Log to the console the standard model metrics that apply

from docx import Document
import json
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import (
    mean_absolute_error, mean_squared_error, r2_score,
    accuracy_score, precision_score, recall_score, f1_score
)

# Load and Parse JSON from Word Document
doc_path = r"C:\Users\pavan Anbhule\Downloads\DA_Assessment\algoparams_from_ui1.docx"  # Replace with the correct file path
doc = Document(doc_path)
json_text = "".join([p.text for p in doc.paragraphs])

try:
    data = json.loads(json_text)
    target = data.get("design_state_data", {}).get("target", {}).get("target", "Not found")
    regression_type = data.get("design_state_data", {}).get("target", {}).get("type", "Not found").lower()  # Lowercase for consistency
    print(f"Target Variable: {target}")
    print(f"Type of Model: {regression_type}")
except json.JSONDecodeError as e:
    print("The content in the Word file is not valid JSON.")
    print(f"Error Details: {e}")
    exit()

# Generate Sample Data
X = np.random.rand(100, 5)  # Random features
y = np.random.rand(100) if "regression" in regression_type else np.random.choice([0, 1], size=100)

# Split Data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and Evaluate Model
if "regression" in regression_type:
    # Linear Regression
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    
    # Compute Metrics
    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    
    # Log Metrics
    print("Model Metrics (Regression):")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")
    print(f"Mean Squared Error (MSE): {mse:.4f}")
    print(f"R² Score: {r2:.4f}")

elif "classification" in regression_type:
    # Logistic Regression
    model = LogisticRegression(max_iter=100)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    
    # Compute Metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    # Log Metrics
    print("Model Metrics (Classification):")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

else:
    print(f"Unknown Model Type: {regression_type}")


Target Variable: petal_width
Type of Model: regression
Model Metrics (Regression):
Mean Absolute Error (MAE): 0.2882
Mean Squared Error (MSE): 0.1065
R² Score: -0.0492


In [8]:
# 7) Please write generic code that can parse any JSON that follow this JSON format. So goal is you are using generic function in python.
#  It will be most efficient if you use sklean pipelines for each stage namely
#  a) feature handling part
#  b) feature reduction part and
#  c) model fit with grid search cv so that you can execute the pipeline object. 
#     For your testing try and change the fields in the JSON like say enable some algos setting ‘is_selected’ to true and now that algo 
#     should get executed when you run your script again. 


import json
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, mean_absolute_error, mean_squared_error, r2_score
from docx import Document


# Step 1: Load JSON from Word Document
def load_json_from_docx(doc_path):
    """Load JSON from a Word document."""
    doc = Document(doc_path)
    json_text = "".join([p.text for p in doc.paragraphs])
    return json.loads(json_text)


# Step 2: Validate JSON Configuration
def validate_json(config, task_type):
    """Validate JSON configuration to ensure at least one valid model is selected."""
    if task_type == "regression" and not config.get("LinearRegression", {}).get("is_selected", False):
        raise ValueError("No regression model selected in the JSON configuration.")
    if task_type == "classification" and not config.get("LogisticRegression", {}).get("is_selected", False):
        raise ValueError("No classification model selected in the JSON configuration.")


# Step 3: Create Pipeline
def create_pipeline(config, task_type):
    """Create a pipeline based on the JSON configuration."""
    steps = []

    # Feature Handling
    steps.append(('scaler', StandardScaler()))

    # Feature Reduction
    feature_reduction_config = config.get("feature_reduction", {})
    if feature_reduction_config.get("No Reduction", {}).get("is_selected", False):
        print("No feature reduction applied.")
    elif feature_reduction_config.get("Correlation with target", {}).get("is_selected", False):
        steps.append(('feature_selection', SelectKBest(score_func=f_classif,
                                                       k=feature_reduction_config.get("Correlation with target", {}).get("num_of_features_to_keep", 5))))
    elif feature_reduction_config.get("Principal Component Analysis", {}).get("is_selected", False):
        steps.append(('pca', PCA(n_components=feature_reduction_config.get("Principal Component Analysis", {}).get("num_of_features_to_keep", 2))))

    # Model Selection
    model = None
    grid_params = {}
    if config.get("LogisticRegression", {}).get("is_selected", False) and task_type == "classification":
        model = LogisticRegression()
        grid_params = {
            'model__C': np.linspace(
                config["LogisticRegression"]["min_regparam"],
                config["LogisticRegression"]["max_regparam"],
                5
            ),
            'model__max_iter': [config["LogisticRegression"]["max_iter"]]
        }
    elif config.get("LinearRegression", {}).get("is_selected", False) and task_type == "regression":
        model = LinearRegression()
        grid_params = {}  # No hyperparameter tuning needed for LinearRegression
    elif config.get("RandomForestClassifier", {}).get("is_selected", False) and task_type == "classification":
        model = RandomForestClassifier()
        grid_params = {
            'model__n_estimators': [50, 100, 150],
            'model__max_depth': [5, 10, 20]
        }

    if not model:
        raise ValueError("No valid model selected in the JSON configuration.")

    steps.append(('model', model))
    pipeline = Pipeline(steps)

    # Return pipeline and GridSearchCV
    return GridSearchCV(pipeline, param_grid=grid_params, cv=5, scoring='accuracy' if task_type == 'classification' else 'neg_mean_squared_error', verbose=1)


# Step 4: Main Function
def main():
    # Load JSON from Word document
    doc_path = r"C:\Users\pavan Anbhule\Downloads\DA_Assessment\algoparams_from_ui1.docx"  # Replace with your path
    config = load_json_from_docx(doc_path)

    # Extract Target and Task Type
    target = config.get("design_state_data", {}).get("target", {}).get("target", "Not found")
    task_type = config.get("design_state_data", {}).get("target", {}).get("type", "").lower()

    print(f"Target Variable: {target}")
    print(f"Task Type: {task_type}")

    # Validate JSON Configuration
    validate_json(config, task_type)

    # Generate Sample Data
    np.random.seed(42)
    X = np.random.rand(100, 5)  # Random features
    y = np.random.choice([0, 1], size=100) if "classification" in task_type else np.random.rand(100)

    # Split Data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Create and Train Pipeline
    grid_search = create_pipeline(config, task_type)
    grid_search.fit(X_train, y_train)

    # Evaluate the Best Model
    print("Best Parameters:", grid_search.best_params_)
    best_model = grid_search.best_estimator_
    y_pred = best_model.predict(X_test)

    if "classification" in task_type:
        print("Classification Report:\n", classification_report(y_test, y_pred))
    elif "regression" in task_type:
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        print("Regression Metrics:")
        print(f"MAE: {mae:.4f}, MSE: {mse:.4f}, R²: {r2:.4f}")
    else:
        print("Unknown task type. Please update JSON configuration.")


if __name__ == "__main__":
    main()


Target Variable: petal_width
Task Type: regression


ValueError: No regression model selected in the JSON configuration.