In [None]:
import numpy as np
import threading as thr
import time
import random as rand
import pandas as pd

In [None]:
# Initial load of the raw CSV; the cleaning cell below re-reads the file itself.
df = pd.read_csv(filepath_or_buffer="london_weather.csv")

In [None]:
# Reload from disk so this cell does not depend on earlier cells and can be re-run.
df = pd.read_csv('london_weather.csv')

# 1) keep only complete rows, 2) drop the 'date' column when it is present
df_clean = df.dropna()
df_clean = df_clean.drop(columns=['date'], errors='ignore')

# cap the working frame at the first 1000 cleaned rows
df = df_clean.copy().head(1000)

In [None]:
class lstmModel:
  """Toy model replica used by the threaded data-parallel training demo.

  Despite the name this is not an LSTM: it is a linear model
  ``y_pred = X @ weights`` trained with the mean-squared-error gradient, plus
  a sleep that simulates expensive compute. ``predict`` is a placeholder that
  ignores the weights and returns random numbers.

  Args:
    alpha: learning rate used by ``updateWeights``.
    n_features: number of leading chunk columns used as inputs; the column
      right after them is the regression target. Defaults to 6.
    sim_delay: seconds ``train`` sleeps to simulate compute. Defaults to 5.
  """

  def __init__(self, alpha, n_features=6, sim_delay=5):
    self.n_features = n_features
    self.weights    = np.zeros(n_features)
    self.grad       = np.zeros_like(self.weights)
    self.alpha      = alpha
    self.sim_delay  = sim_delay
    self.loss       = None   # MSE from the most recent train() call

  def zero_grad(self):
    """Reset the gradient to zeros.

    Rebinds ``self.grad`` to a fresh array rather than filling in place, so an
    array assigned from outside (which may be shared between replicas) is
    never mutated.
    """
    self.grad = np.zeros_like(self.weights)

  def train(self, chunk):
    """Compute the MSE gradient of the linear model on one data chunk.

    Args:
      chunk: DataFrame whose first ``n_features`` columns are the inputs and
        whose next column is the target. Must be numeric and non-empty.

    Returns:
      The gradient vector, shape ``(n_features,)``; also stored in
      ``self.grad``. The loss is stored in ``self.loss``.

    Raises:
      ValueError: if ``chunk`` has no rows or too few columns.
    """
    n = self.n_features
    if chunk.shape[1] <= n:
      raise ValueError(
          f"chunk needs at least {n + 1} columns, got {chunk.shape[1]}")
    if chunk.shape[0] == 0:
      raise ValueError("cannot compute a gradient on an empty chunk")

    X = chunk.iloc[:, :n].to_numpy(dtype=float)    # (batch, n)
    y = chunk.iloc[:, n].to_numpy(dtype=float)     # (batch,)

    # forward -> MSE loss
    error = X.dot(self.weights) - y                 # (batch,)
    self.loss = float(np.mean(error ** 2))

    # backward -> d(MSE)/dw = (2/batch) * X^T . error
    # (assigned fresh, so no zero_grad() call is needed beforehand)
    self.grad = (2.0 / X.shape[0]) * X.T.dot(error)  # (n,)

    # simulate a bit of compute time
    time.sleep(self.sim_delay)

    return self.grad

  def predict(self, data):
    """Return one placeholder prediction per row of ``data``.

    NOTE(review): these are uniform random numbers in [0, 1) from the
    ``random`` module; the trained weights are not used.
    """
    return [rand.random() for _ in range(data.shape[0])]

  def updateWeights(self):
    """Take one gradient-descent step: ``weights -= alpha * grad``."""
    self.weights -= self.alpha * self.grad

In [None]:
# Hyper params
N = 3        # number of worker threads, data shards and model replicas
epochs = 5   # synchronous training rounds

# One slot per worker, filled in later by the controller / worker threads
grads = [None for _ in range(N)]
models = [None for _ in range(N)]

In [None]:
# Preview how rows are sharded across the N workers. Split row *positions* and
# slice with .iloc: np.array_split on the DataFrame itself goes through
# DataFrame.swapaxes, which pandas deprecated in 2.1. Shard sizes are unchanged.
chunks = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), N)]
chunks

In [None]:

# Each barrier has N + 1 parties: the N worker threads plus the controller thread.
compBarrier, updateBarrier = thr.Barrier(N + 1), thr.Barrier(N + 1)

def aggregate(grads):
  """Return the element-wise mean of a sequence of gradients.

  Works for numpy arrays or plain numbers; an empty sequence raises
  ZeroDivisionError.
  """
  n_grads = len(grads)
  total = sum(grads, start=0)
  return total / n_grads

# Serializes writes into the shared `grads` list from the worker threads.
mutex = thr.Lock()
def addGrad(i, grad):
  """Store worker ``i``'s gradient in slot ``i`` of ``grads`` while holding ``mutex``."""
  print("THREAD", i, "IS CURRENTLY WAITING FOR THE `grads` MUTEX TO BE ACQUIRED...")
  try:
    with mutex:
      grads[i] = grad
  finally:
    # runs after the lock is released, even if the assignment raised
    print("THREAD", i, "HAS RELEASED THE `grads` MUTEX...")

def worker(threadNum, chunk):
  """Training loop for one data-parallel worker thread.

  On each of ``epochs`` rounds this thread does the following, using its
  replica ``models[threadNum]``:

  1. Computes a gradient on ``chunk``.
  2. Publishes the gradient via ``addGrad``.
  3. Meets the controller at ``compBarrier`` (all gradients posted).
  4. Meets the controller at ``updateBarrier`` (weights updated).

  If anything fails, both barriers are aborted so the controller and the other
  workers get ``BrokenBarrierError`` instead of blocking forever. The original
  exception is then re-raised.
  """
  try:
    LSTMi = models[threadNum]
    for epoch in range(epochs):
      print("================================")
      grad = LSTMi.train(chunk)
      addGrad(threadNum, grad)
      print("THREAD:", threadNum, ", EPOCH:", epoch, "GRAD:", grad)
      # Log *before* blocking so the message is true when it is printed.
      print("THREAD", threadNum, "is waiting for compBarrier. EPOCH:", epoch)
      compBarrier.wait()
      print("THREAD", threadNum, "is waiting for updateBarrier. EPOCH:", epoch)
      updateBarrier.wait()
      print("================================")
  except BaseException:
    compBarrier.abort()
    updateBarrier.abort()
    raise

def controller(alpha=0.01):
  """Run synchronous data-parallel training of N model replicas.

  Splits the global ``df`` into N contiguous row shards and starts one daemon
  ``worker`` thread per shard. Then, for each of ``epochs`` rounds, it:

  1. Waits on ``compBarrier`` until every worker has posted its gradient to
     ``grads``.
  2. Averages the gradients with ``aggregate``.
  3. Applies the average to every replica in ``models``.
  4. Releases the workers via ``updateBarrier``.

  If the controller fails or is interrupted while waiting, both barriers are
  aborted so no worker is left blocked forever.

  Args:
    alpha: learning rate for each new ``lstmModel`` replica (default 0.01).
  """
  # Split row positions and slice with .iloc; np.array_split on the DataFrame
  # itself goes through DataFrame.swapaxes (deprecated since pandas 2.1).
  chunks = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), N)]
  threads = []
  for i, chunk in enumerate(chunks):
    models[i] = lstmModel(alpha)
    t = thr.Thread(target=worker, args=(i, chunk), daemon=True)
    t.start()
    print("Started thread", i, " using chunk", i, ".")
    threads.append(t)

  try:
    for epoch in range(epochs):
      print("================================")
      compBarrier.wait()           # wait for all workers to finish computing
      newGrad = aggregate(grads)
      print("[main] NEW GRAD:", newGrad)
      for m in models:
        # Each replica gets its own copy so no model can mutate another's
        # gradient in place (e.g. via grad.fill).
        m.grad = np.copy(newGrad)
        m.updateWeights()
      updateBarrier.wait()         # let workers continue to next epoch
  except BaseException:
    compBarrier.abort()
    updateBarrier.abort()
    raise

  for t in threads:
    t.join()

controller()

In [None]:
### predictions
# N + 1 parties: the N prediction threads plus the calling (main) thread.
predBarrier = thr.Barrier(N + 1)
allPreds = [None for _ in range(N)]   # slot i receives model i's prediction list

# Serializes writes into the shared `allPreds` list from the prediction threads.
predMutex = thr.Lock()
def storePreds(i, preds):
  """Put model ``i``'s predictions into slot ``i`` of ``allPreds`` while holding ``predMutex``."""
  print("THREAD", i, "IS CURRENTLY WAITING FOR THE `allPreds` MUTEX TO BE ACQUIRED...")
  try:
    with predMutex:
      allPreds[i] = preds
  finally:
    # runs after the lock is released, even if the assignment raised
    print("THREAD", i, "HAS RELEASED THE `allPreds` MUTEX...")

def predWorker(num, data):
  """Prediction thread body: run model ``num`` on ``data`` and publish the result.

  Stores the predictions via ``storePreds``, then waits on ``predBarrier`` so
  the caller knows every model has reported. On any failure the barrier is
  aborted, so the waiting caller gets ``BrokenBarrierError`` instead of hanging,
  and the exception is re-raised.
  """
  try:
    model = models[num]
    preds = model.predict(data)
    print("THREAD", num, "has completed its predictions.")
    storePreds(num, preds)
    print("THREAD", num, "is waiting for predBarrier.")
    predBarrier.wait() # wait for all threads to finish predictions
  except BaseException:
    predBarrier.abort()
    raise
  

def predsCont(testSet):
  """Run every model replica on ``testSet`` in parallel and average the results.

  Starts one daemon ``predWorker`` thread per model; each thread sees the
  whole ``testSet``. It waits on ``predBarrier`` until every thread has stored
  its list in ``allPreds``. It then returns the element-wise mean across models
  ("average voting"), with one value per test row.
  """
  threads = []
  for i in range(N):
    t = thr.Thread(target=predWorker, args=(i, testSet,), daemon=True)
    t.start()
    print("Started thread", i, "predicting on the full test set.")
    threads.append(t)

  try:
    predBarrier.wait() # wait for all models to submit their predictions
  except BaseException:
    # Release any prediction thread still blocked on the barrier, then propagate.
    predBarrier.abort()
    raise

  # Average voting: zip(*allPreds) yields, for each test row, the tuple of
  # that row's predictions from every model.
  finalPreds = [sum(rowPreds) / len(rowPreds) for rowPreds in zip(*allPreds)]

  for t in threads:
    t.join()
  return finalPreds

# Draw 5 rows with a fixed seed so the demo is reproducible.
# NOTE(review): these rows come from the same `df` the controller trained on,
# so this is not a held-out evaluation.
test = df.sample(5, random_state=42)
test

In [None]:
# Ensemble-averaged predictions for the sampled rows (shown as the cell output).
predsCont(testSet=test)