## Task 5.2.1 - Brute-Force Hyperparameter Search

* Implement a brute-force search function that finds the best parameter combination for a given model. **(RESULT)**
* Test your implementation on the following problems:
    - 1) An `SVM` model on the two moons problem. **(RESULT)**
    - 2) A `LinearRegression` model with Ridge regularization on the `California Housing Dataset`. **(RESULT)**

Feel free to use `sklearn`'s model implementations.

In [2]:
# Useful imports
import numpy as np
from sklearn.datasets import make_moons, fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.linear_model import Ridge
from sklearn.metrics import accuracy_score, mean_squared_error
from sklearn.preprocessing import StandardScaler



In [3]:
# You might check for the following hyperparameter ranges:

svm_params = {
    'C': [0.1, 1.0, 10.0, 100.0],
    'kernel': ['linear', 'rbf', 'poly'],
    'gamma': ['scale', 'auto']
}


ridge_params = {
    'alpha': [0.01, 0.1, 1.0, 10.0, 100.0],
    'solver': ['auto', 'svd', 'cholesky', 'lsqr']
}





# Building Function

In [4]:
import itertools

def brute_force_search(model, params, task, X_train, y_train, X_test, y_test):
    """Fit `model` for every combination in `params` and return the best-scoring combination."""
    classification = task == "Classification"
    scores = {}
    for combination in itertools.product(*params.values()):
        # Build a fresh keyword dict for this combination
        param_dict = dict(zip(params.keys(), combination))
        fitted = model(**param_dict).fit(X_train, y_train)
        predictions = fitted.predict(X_test)
        if classification:
            score = accuracy_score(y_test, predictions)
        else:
            score = mean_squared_error(y_test, predictions)
        scores[combination] = score
    # Accuracy is maximized, mean squared error is minimized
    select = max if classification else min
    return select(scores, key=scores.get)


The task type is passed to the function so that the correct metric is computed: accuracy (maximized) for classification and mean squared error (minimized) for regression.
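One way to make the link between task and metric explicit is a small lookup table that stores, for each task label, the scoring function and whether a larger score is better. The sketch below is only an illustration: the names `METRICS`, `accuracy`, `mse` and `pick_best` are hypothetical and not part of the notebook, and the two metrics are reimplemented in plain Python so the snippet runs on its own.

```python
# Hypothetical helpers for illustration; the notebook itself uses sklearn's metrics.
def accuracy(y_true, y_pred):
    """Fraction of matching labels (higher is better)."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def mse(y_true, y_pred):
    """Mean squared error (lower is better)."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

# task label -> (metric function, True if a larger score is better)
METRICS = {
    "Classification": (accuracy, True),
    "Regression": (mse, False),
}

def pick_best(scores, task):
    """Return the key of the best entry in `scores` for the given task."""
    _, higher_is_better = METRICS[task]
    chooser = max if higher_is_better else min
    return chooser(scores, key=scores.get)

# Made-up scores, keyed by parameter combination
clf_scores = {("rbf", 1.0): 0.95, ("linear", 1.0): 0.88}
reg_scores = {(0.1,): 0.56, (100.0,): 0.55}
print(pick_best(clf_scores, "Classification"))
print(pick_best(reg_scores, "Regression"))
```

With a table like this, supporting another task would mean adding one entry rather than another `if` branch.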

# Exploring different ways

In [5]:
# combination = []
# for i in range(len(svm_params['C'])):
#     for j in range(len(svm_params['kernel'])):
#         for k in range(len(svm_params['gamma'])):
#             combination.append((i,j,k))
# print(combination)

We considered generating the combinations with explicit nested loops, but that approach is not generic: it hard-codes three loops for three parameters, whereas another model may have a different number of hyperparameters.
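To illustrate the point, the following sketch runs the same `itertools.product` call on grids with three and with two parameters. The helper name `all_combinations` is hypothetical; the two grids are the ranges suggested earlier in the notebook.

```python
import itertools

def all_combinations(params):
    """Yield one {name: value} dict per grid point, for any number of parameters."""
    names = list(params)
    for values in itertools.product(*params.values()):
        yield dict(zip(names, values))

svm_params = {
    "C": [0.1, 1.0, 10.0, 100.0],
    "kernel": ["linear", "rbf", "poly"],
    "gamma": ["scale", "auto"],
}
ridge_params = {
    "alpha": [0.01, 0.1, 1.0, 10.0, 100.0],
    "solver": ["auto", "svd", "cholesky", "lsqr"],
}

svm_grid = list(all_combinations(svm_params))      # 4 * 3 * 2 grid points
ridge_grid = list(all_combinations(ridge_params))  # 5 * 4 grid points
print(len(svm_grid), svm_grid[0])
print(len(ridge_grid), ridge_grid[0])
```

The number of grid points is the product of the list lengths, so the cost of a brute-force search grows quickly with every added parameter.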

In [6]:
# def brute_force_search(model, params, task, X_train, y_train, X_test, y_test):
#   mydict ={}
#   paramdict={}
#   sizes = [len(v) for v in params.values()]
#   grids = np.meshgrid(*list(params.values()), indexing='ij')
#   combinations = zip(*(g.flatten() for g in grids))

#   for combination in combinations:

#     for key, value in zip(params.keys(), combination):
#       paramdict[key]=value
#     mymodel= model(**paramdict)
#     mymodel.fit(X_train, y_train)
#     if task =="Classification":
#       score = accuracy_score(y_test, mymodel.predict(X_test))
#     else:
#       score = mean_squared_error(y_test, mymodel.predict(X_test))
#     mydict[tuple(combination)] = score
#   if task =="Classification":
#       best_combination = max(mydict, key=lambda k: mydict[k])
#   else:
#     best_combination = min(mydict, key=lambda k: mydict[k])
#   return best_combination




Another way to generate the parameter combinations is the `np.meshgrid` function (uncomment the cell above to test it). We explored it in case `itertools` is not allowed, although the task does not state a NumPy-only restriction for this part.
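As a quick sanity check, the sketch below compares the two approaches on a small made-up grid. It assumes `indexing='ij'`, as in the commented-out cell, and converts the NumPy scalars back to plain Python values with `.item()` before comparing; the variable names are illustrative.

```python
import itertools
import numpy as np

params = {"C": [0.1, 1.0], "kernel": ["linear", "rbf", "poly"]}

# One grid per parameter; indexing='ij' makes the first parameter the slowest-varying axis.
grids = np.meshgrid(*params.values(), indexing="ij")
mesh_combos = [
    tuple(v.item() for v in combo)  # NumPy scalar -> plain Python float / str
    for combo in zip(*(g.ravel() for g in grids))
]

product_combos = list(itertools.product(*params.values()))
print(mesh_combos == product_combos)
```

Each parameter gets its own array, so numeric and string parameters keep their own dtypes. Without the `.item()` conversion, however, the values reach the model as NumPy scalar types rather than built-in `float` and `str`.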

# Testing

In [7]:
X_moons,y_moons = make_moons(n_samples=1000, noise=0.2, random_state=42)
X_train_moons, X_test_moons, y_train_moons, y_test_moons = train_test_split(X_moons, y_moons, test_size=0.2, random_state=42)

In [8]:

best_combinations = brute_force_search(SVC, svm_params, "Classification", X_train_moons, y_train_moons, X_test_moons, y_test_moons)
C, Kernel, gamma = best_combinations
print("C:", C)
print("Kernel:", Kernel)
print("gamma:", gamma)

C: 10.0
Kernel: rbf
gamma: scale


In [9]:

house_data = fetch_california_housing(as_frame=True)
X = house_data.data
y = house_data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_scaler = StandardScaler()
X_train = X_scaler.fit_transform(X_train)
X_test = X_scaler.transform(X_test)

In [10]:
best_combinations = brute_force_search(Ridge, ridge_params, "Regression", X_train, y_train, X_test, y_test)
alpha, solver = best_combinations
print("alpha:", alpha)
print("solver:", solver)


alpha: 100.0
solver: svd


## Task 5.2.2 - Simple TPE (BONUS)

* Implement the Tree-Structured Parzen Estimator using `numpy` only. **(RESULT)**
* Find decent hyperparameters for
    - 1) An `SVM` model on the `two moons` problem. **(RESULT)**
    - 2) A `LinearRegression` model with Ridge regularization on the `California Housing Dataset`. **(RESULT)**
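The core of TPE for a single numeric hyperparameter can be sketched in a few lines of `numpy`: estimate one density from the "good" trials and one from the "bad" trials, then prefer the candidate with the largest ratio of the two. The snippet below is a minimal sketch under stated assumptions, not the class implemented in this notebook. The function name `gaussian_kde`, the fallback bandwidth argument and the sample values are made up for illustration. The bandwidth rule `1.06 * sigma * n ** -0.2` is the same rule of thumb used in `kde_single_point` further down.

```python
import numpy as np

def gaussian_kde(history, query, fallback_h=0.1):
    """Gaussian-kernel (Parzen) density estimate of `history` at each `query` point."""
    history = np.asarray(history, dtype=float)
    query = np.atleast_1d(np.asarray(query, dtype=float))
    n = history.size
    sigma = history.std()
    # Rule-of-thumb bandwidth; use a fixed width when all points coincide.
    h = 1.06 * sigma * n ** -0.2 if sigma > 0 else fallback_h
    u = (query[:, None] - history[None, :]) / h          # shape (n_query, n_history)
    kernel = np.exp(-0.5 * u ** 2) / np.sqrt(2 * np.pi)
    return kernel.sum(axis=1) / (n * h)

# Made-up observations of one hyperparameter, split into good and bad trials.
good = [9.0, 10.0, 11.0, 12.0]
bad = [0.5, 1.0, 50.0, 80.0, 95.0]
candidates = np.array([1.0, 10.0, 60.0])

# Acquisition: density under the good trials divided by density under the bad trials.
ratio = gaussian_kde(good, candidates) / (gaussian_kde(bad, candidates) + 1e-9)
best = candidates[np.argmax(ratio)]
print(best)
```

The small constant in the denominator only guards against division by zero; it is not part of the method itself.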

# Building TPE

In [11]:
import numbers
class TPE:
  def __init__(self, model, param_ranges, n_iterations, task, n_init_samples=5):
    self.model = model
    self.param_ranges = param_ranges
    self.n_iterations = n_iterations
    self.initsample={}
    self.task=task
    for param, values in param_ranges.items():

        # A 2-tuple is a continuous (min, max) range; anything else is a list of categorical options.
        if isinstance(values, tuple) and len(values) == 2:
            self.initsample[param] = np.random.uniform(values[0], values[1], n_init_samples)
        else:
            self.initsample[param] = values

    self.trials=[]

  def fit(self, X_train, y_train, X_test, y_test):

    self.X_train = X_train
    self.y_train = y_train
    self.X_test = X_test
    self.y_test = y_test

  def find_initialGoodBad(self):
    self.param_names = list(self.initsample.keys())
    param_values = list(self.initsample.values())

    all_combinations = list(itertools.product(*param_values))

    # all_combinations_lists = [list(t) for t in all_combinations]
    l_x=[]
    g_x=[]
    paramdict={}
    for combination in all_combinations:

        for key, value in zip(self.param_names, combination):
          paramdict[key]=value
        mymodel= self.model(**paramdict)
        mymodel.fit(self.X_train, self.y_train)
        if self.task =="Classification":
          score = accuracy_score(self.y_test, mymodel.predict(self.X_test))
        else:
          score = mean_squared_error(self.y_test, mymodel.predict(self.X_test))
        self.trials.append({"params": combination, "score": score})
    scores = [trial["score"] for trial in self.trials]

    # The best 20% of the trials form the "good" group l(x); the rest form g(x).
    if self.task == "Classification":
      # Higher accuracy is better, so the cut-off is the 80th percentile.
      self.threshold = np.quantile(scores, 0.8)
      for trial in self.trials:
          if trial["score"] >= self.threshold:
                l_x.append(trial["params"])
          else:
                g_x.append(trial["params"])
    else:
      # Lower error is better, so the cut-off is the 20th percentile.
      self.threshold = np.quantile(scores, 0.2)
      for trial in self.trials:
          if trial["score"] <= self.threshold:
                l_x.append(trial["params"])
          else:
                g_x.append(trial["params"])

    return l_x, g_x

  def find_modeldistribution(self, l_x, g_x):

       self.result = [[d[param] for d in l_x] for param in range(len(self.param_names))]
       self.result2 = [[d[param] for d in g_x] for param in range(len(self.param_names))]
       goodmean=[]
       goodstd=[]
       distcatperparametergood=[]
       for param in self.result:
        if isinstance(param[0], numbers.Number):
          goodmean.append(np.mean(param))
          goodstd.append(np.std(param))
        else:
          distpercat=[]
          par=list(set(param))
          for value in par:
           count= param.count(value)
           distpercat.append(count/len(param))
          distcatperparametergood.append(distpercat)



       return self.result, goodmean, goodstd, distcatperparametergood

  def drawnewsample(self, distcatperparametergood, n_samples, goodmean, goodstd):
    new_candidates = []
    i=0
    j=0
    k=0
    for param in self.result:
      if isinstance(param[0], numbers.Number):
        samples = np.random.normal(loc=goodmean[i], scale=goodstd[i], size=n_samples)
        par=self.param_names[k]
        lower_bound = self.param_ranges[par][0]
        upper_bound = self.param_ranges[par][1]
        samples = np.clip(samples, lower_bound, upper_bound)
        new_candidates.append(samples)
        i+=1
      else:
        par=list(set(param))
        samples = np.random.choice(par, size=n_samples, p=distcatperparametergood[j])
        j+=1
        new_candidates.append(samples)
      k+=1
    return new_candidates

  def kernel(self, u):
    return 1 / np.sqrt(2 * np.pi) * np.exp(-1 / 2 * u * u)

  def kde_single_point(self, history_data, query_value):
    n = len(history_data)
    if n == 0:
        return 0.0

    sigma = np.std(history_data)

    if sigma == 0:
        h = 0.1
    else:
        h = 1.06 * sigma * (n ** -0.2)

    kernel_sum = 0
    for i in range(n):
        # u = (x - xi) / h
        u = (query_value - history_data[i]) / h
        kernel_sum += self.kernel(u)

    density = kernel_sum / (n * h)

    return density

  def evaluate_new_candidates(self, new_candidates):

    finallx=[]
    finalgx=[]
    for group in zip(*new_candidates):
      elements = list(group)
      lx=[]
      gx=[]

      for i, val in enumerate(elements):
        if (isinstance(val, numbers.Number)):
          kdegood=self.kde_single_point(self.result[i], val)
          kdebad=self.kde_single_point(self.result2[i], val)
          lx.append(kdegood)
          gx.append(kdebad)

        else:
           count= self.result[i].count(val)
           kdegood=count/len(self.result[i])
           count= self.result2[i].count(val)
           kdebad=count/len(self.result2[i])
           lx.append(kdegood)
           gx.append(kdebad)

      finallx.append(np.prod(np.array(lx)))
      finalgx.append(np.prod((np.array(gx))))

    idx= np.argmax(np.array(finallx)/(np.array(finalgx) +1e-9))
    paramdict={}
    best_candidate = [param[idx] for param in new_candidates]
    for key, value in zip(self.param_names, best_candidate):
      paramdict[key]=value
    mymodel= self.model(**paramdict)
    mymodel.fit(self.X_train, self.y_train)
    if self.task =="Classification":
        score = accuracy_score(self.y_test, mymodel.predict(self.X_test))
    else:
        score = mean_squared_error(self.y_test, mymodel.predict(self.X_test))
    # current_values = [param[idx] for param in new_candidates]
    self.trials.append({"params":best_candidate, "score": score})





  def optimise(self):
    l_x, g_x = self.find_initialGoodBad()

    for i in range(self.n_iterations):
      result, goodmean, goodstd, distcatperparametergood = self.find_modeldistribution(l_x, g_x)
      new_candidates = self.drawnewsample(distcatperparametergood, 10, goodmean, goodstd)
      self.evaluate_new_candidates(new_candidates)
      # Sort the newest trial into the good or bad group; a score that ties the threshold counts as good.
      last = self.trials[-1]
      if self.task == "Classification":
          is_good = last["score"] >= self.threshold
      else:
          is_good = last["score"] <= self.threshold
      (l_x if is_good else g_x).append(last["params"])
    if self.task=="Classification":
      best_trial = max(self.trials, key=lambda t: t["score"])
    else:
      best_trial = min(self.trials, key=lambda t: t["score"])
    best_params = best_trial["params"]
    return best_params







This implementation assumes that a continuous parameter is passed as a tuple of two values, the minimum and the maximum; any other value (for example a list) is treated as a set of categorical options.
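The convention can be sketched on its own as follows. This is an illustration only: `sample_initial` is a hypothetical name, and it uses the standard-library `random` module instead of `np.random.uniform` so that the snippet has no dependencies.

```python
import random

def sample_initial(param_ranges, n_init_samples=5, seed=0):
    """Turn a search-space spec into initial candidate values per parameter."""
    rng = random.Random(seed)
    initial = {}
    for name, spec in param_ranges.items():
        if isinstance(spec, tuple) and len(spec) == 2:
            low, high = spec  # continuous: draw uniformly between the bounds
            initial[name] = [rng.uniform(low, high) for _ in range(n_init_samples)]
        else:
            initial[name] = list(spec)  # categorical: keep every listed option
    return initial

space = {"C": (0.1, 100.0), "kernel": ["linear", "rbf", "poly"]}
init = sample_initial(space)
print(init["kernel"], len(init["C"]))
```

Under this convention the container type carries the meaning: `(0.1, 100.0)` is a range, while `[0.1, 100.0]` would be read as two discrete options.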

# Testing

In [12]:
tpe=TPE(SVC, {
    'C': (0.1, 100.0),
    'kernel': ['linear', 'rbf', 'poly'],
    'gamma': ['scale', 'auto']
}, 100, "Classification")
tpe.fit(X_train_moons, y_train_moons, X_test_moons, y_test_moons)
best_params = tpe.optimise()
C, Kernel, gamma=best_params
print("C:", C)
print("Kernel:", Kernel)
print("gamma:", gamma)

C: 21.888615737517284
Kernel: rbf
gamma: scale


In [13]:
tpe=TPE(Ridge, {
    'alpha': (0.01, 100.0),
    'solver': ['auto', 'svd', 'cholesky', 'lsqr']
}, 100, "Regression")
tpe.fit(X_train, y_train, X_test, y_test)
best_params = tpe.optimise()
alpha, solver=best_params
print("alpha:", alpha)
print("solver:", solver)

alpha: 62.23286539411037
solver: svd


## Congratz, you made it! :)