In [None]:
# Import required packages
import time
import os
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
import sys
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PowerTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.model_selection import cross_val_score
import time
from sklearn.metrics import mean_absolute_error
from scipy.optimize import curve_fit
from sklearn.decomposition import PCA
import seaborn as sns


In [None]:
### The code in this cell uses the functions defined below to run cross-validation using each feature list
# Define the infile
infile = "/content/drive/MyDrive/Colab Notebooks/results"
# Define feature sets. ls1 will also be used for yeo-johnson green cumulative, and ls2 will also have PCA applied
feature_ls1 = ['green_cumulative']
feature_ls2 = ['green', 'red', 'blue', 'uvaI', 'uvaE', 'uvbE', 'uvbI', 'stirRate', 
               'pressureE', 'humidity', 'temperature', 'best_temperature', 'hydrazine_batches', 
               'luminance_percent', 'saturation', 'hue', 'red_cumulative', 'blue_cumulative', 
               'green_cumulative', 'pressureE_cumulative', 'best_temperature_cumulative', 
               'hue_cumulative', 'saturation_cumulative', 'luminance_percent_cumulative', 
               'humidity_cumulative', 'best_temperature_std_dev', 'best_temperature_std_dev_cumulative', 
               'std_dev_over_cum_temp', 'std_dev_cumulative_over_cumulative_temp']
feature_ls3 = ['uvaI', 'uvaE', 'uvbE', 'uvbI', 'stirRate', 
               'pressureE', 'humidity', 'temperature', 'best_temperature', 'hydrazine_batches', 
               'pressureE_cumulative', 'best_temperature_cumulative', 
               'humidity_cumulative', 'best_temperature_std_dev', 'best_temperature_std_dev_cumulative', 
               'std_dev_over_cum_temp', 'std_dev_cumulative_over_cumulative_temp']
# Define lists of feature sets and models. Then create dictionary which includes the names
feature_ls_ls = [feaure_ls1, feature_ls2, feature_ls3]
model_ls = [linear_regression, polynomial_regression, gradient_boosting_regressor, random_forest]
feature_ls_folder_name_dic = {''.join(feature_ls1): 'Green cumulative/', ''.join(feature_ls2): 'All features/', ''.join(feature_ls3): 'No colour features/'}
# Iterate through the lists + models to do cross-validation on all combinations of interest
for feature_ls in feature_ls_ls:

  for model in model_ls:
    # if polynomial only do green cumulative feature and don't scale
    if model == polynomial_regression:
      if len(feature_ls) > 1:
        continue
      cross_validation(model, feature_ls, feature_ls_folder_name_dic[''.join(feature_ls)], model.__name__, infile, scale=False)
      cross_validation(model, feature_ls, "Yeo-Johnson " + feature_ls_folder_name_dic[''.join(feature_ls)], model.__name__, infile, scale=False, yeo_johnson=True)
    # if not polynomial, 
    else:
      cross_validation(model, feature_ls, feature_ls_folder_name_dic[''.join(feature_ls)], model.__name__, infile)
      if feature_ls == feature_ls1:
        # if green cumulative repeat with YJ transform
        cross_validation(model, feature_ls, "Yeo-Johnson " + feature_ls_folder_name_dic[''.join(feature_ls)], model.__name__, infile, yeo_johnson=True)
      elif feature_ls == feature_ls2:
        # if all features repeat with pca
        cross_validation(model, feature_ls, "PCA/", model.__name__, infile pca=True)

In [None]:
def open_dataframe(in_file):
  # Load csv as a Pandas DataFrame from in_file location
  in_file = os.path.join(in_file)
  df = pd.read_csv(in_file, header=0, index_col='crocus_id', parse_dates=["datetime"])
  # The experiment Crocus-024 was removed due to inadequate data types
  if any(df.loc['Crocus-024']):
    df.drop('Crocus-024', inplace=True)
  return df

In [None]:
def cross_validation(model_func, features, feature_folder, model_folder, infile, yeo_johnson=False, pca=False, scale=True):
  """
  model_func: function object, which ML method to use. E.g., Linear regression, or random forest, etc.
  features: list, which set of features to use
  feature_folder: str, the name used for this set of features to save metadata
  model_folder: str, the name used for this model to save metadata
  infile: str, save location for file
  yeo_johnson: bool, whether to include the Yeo-Johnson power transform
  pca: bool, whether to apply pca to the feature list to reduce dimensionality
  scale: bool, whether to apply scalar to the feature data. 
  
  This function takes a model and a list of features and will perform cross validation for that model and feature set"""

  file_check = infile + feature_folder + model_folder +".csv"
  # if the file exists then skip
  if os.path.exists(file_check):
    return 
  # The dataframe is opened
  df = open_dataframe()
  # a list of reaction runs is made and sorted.
  run_list = sorted(list(set(df.index)))
  run_list_iter = run_list
  results_ls = []
  # iterate through runs
  for run in run_list_iter:
    # split into test and training runs
    test_run = run
    training_runs = [x for x in run_list if x != run]
    # split data into test and train data
    train_df = df.loc[training_runs]
    test_df = df.loc[run]
    # split data into x and y for test and training
    x_train = train_df[features]
    y_train = train_df['Product']
    x_test = test_df[features]
    y_test = test_df['Product']
    # # Scale and then power transform data
    if scale is True:
      ss = StandardScaler()
      x_train = ss.fit_transform(x_train)
      x_test = ss.transform(x_test)
    # apply pca if true until 95% explained variance is reached
    if pca is True:
        for i in range(1, len(features)):
          print(i)
          pca = PCA(n_components=i)
          principalComponents = pca.fit_transform(x_train)
          print(f'with {i} principal components explained variance is:', pca.explained_variance_ratio_,
                "and the total is:", sum(pca.explained_variance_ratio_))
          if sum(pca.explained_variance_ratio_) > 0.95:
              break
        x_train = pca.fit_transform(x_train)
        x_test = pca.transform(x_test)
    # if yeo-johnson is true then power transform the data
    if yeo_johnson is True:
      pt = PowerTransformer(method='yeo-johnson')
      x_train = pt.fit_transform(x_train)
      x_test = pt.transform(x_test)
    # train model
    trained_model = model_func(x_train, y_train)
    # test model
    y_pred = trained_model.predict(x_test)
    # evaluate predictions
    mae = mean_absolute_error(y_test, y_pred)
    # plots predictions vs actual product to provide visual assessment of accuracy
    
      # plt.text(1, 25, mae)
      # plt.xlim(0, 40)
      # plt.ylim(0, 40)
      # plt.plot(y_pred, y_test, linewidth=2, label=test_run, c='black')
      # plt.ylabel('Actual')
      # plt.xlabel('Predictions')
      # plt.show()

    # per file, only 1 model, 1 feature set but each cross validation (10 runs)
    results_df = pd.DataFrame({'test_run': [test_run], 'training_runs': [training_runs], 'features': [features], 'model': [model_folder], 'mae': [mae]})
    results_ls.append(results_df)
  cv_results_df = pd.concat(results_ls)
  average_mae = np.mean(cv_results_df['mae'])
  average_row = pd.DataFrame({'test_run': 'average', 'training_runs': 'average', 'features': [features], 'model': [model_folder], 'mae': average_mae})
  cv_results_df_save = pd.concat([cv_results_df, average_row])
  # if no folder, make a new one
  directory_check = infile + feature_folder
  if not os.path.exists(directory_check):
    os.makedirs(directory_check)
  cv_results_df_save.to_csv(infile + feature_folder + model_folder +".csv")


In [None]:
"""Models"""
def linear_regression(x_train, y_train):
    """Runs a linear regressor and sends the object back for evaluation"""
    regressor = LinearRegression(fit_intercept=True)
    regressor.fit(x_train, y_train)
    return regressor

def gradient_boosting_regressor(x_train, y_train):
    """Runs a gradient boosting regressor and sends the object back for evaluation"""
    regressor = GradientBoostingRegressor(n_estimators=1000, random_state=0)
    regressor.fit(x_train, y_train)
    return regressor
  
def random_forest(x_train, y_train):
    """Runs a random forest regressor and sends the object back for evaluation"""
    regressor = RandomForestRegressor(n_estimators=1000, random_state=0)
    regressor.fit(x_train, y_train)
    return regressor

def polynomial_regression(x_train, y_train):
  """Runs a polynomial regressor and sends the object back for evaluation"""
  regressor = PolynomialRegressor()
  regressor.fit(x_train, y_train)
  return regressor

In [None]:
class PolynomialRegressor:
  """For polynomial regressor model"""
  def __init__(self):
    pass

  def cubic_polynomial(self, x, a, b, c, d=0):
    z = a * x + b * x ** 2 + c * x ** 3 + d
    return z

  def fit_transform(self, x, y):
    # need to reshape arrrays
    n = len(x)
    assert n == len(y), "x and y must have same length"
    x, y = x.values.reshape(n,), y.values.reshape(n, )
    self.coeffs, cov = curve_fit(f=self.cubic_polynomial, xdata=x, ydata=y, p0=[0, 0, 0], bounds=(-np.inf, np.inf))
    y_train_pred = self.cubic_polynomial(x_train, *self.coeffs)
    return y_train_pred

  def fit(self, x, y):
    n = len(x)
    assert n == len(y), "x and y must have same length"
    if isinstance(x, np.ndarray):
      x, y = x.reshape(n,), y.values.reshape(n, )
    elif isinstance(x, pd.Series):
      x, y = x.values.reshape(n,), y.values.reshape(n, )
    self.coeffs, cov = curve_fit(f=self.cubic_polynomial, xdata=x, ydata=y, p0=[0, 0, 0], bounds=(-np.inf, np.inf))

  def predict(self, x):
    n = len(x)
    if isinstance(x, np.ndarray):
      x = x.reshape(n, )
    elif isinstance(x, pd.Series):
      x = x.values.reshape(n,)
    y_pred = self.cubic_polynomial(x, *self.coeffs)
    return y_pred
