In [3]:
import pandas as pd
import numpy as np
import threading
import time
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Tuple, Optional
from queue import Queue
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
# Suppress every Python warning for the rest of the session.
# NOTE(review): this is a blanket filter, so library deprecation/numerical warnings are hidden too.
warnings.filterwarnings('ignore')
# Use the PyTorch "mps" device when torch reports it as available; otherwise fall back to the CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
# NOTE(review): machine-specific absolute path to the London weather CSV; it will not resolve on
# another machine - consider a path relative to a configurable data directory.
File_Path = '/Users/shreyasravi/PycharmProjects/Embedded-Systems/London_Weather.csv'

A thread-safe class for preprocessing weather data, implementing feature selection, and preparing data chunks for LSTM training and inference processes. This class acts as a producer in a producer-consumer pattern, sending data to both training and inference threads.

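Perform Correlation-based Feature Selection (CFS): a feature is kept as relevant when the absolute value of its correlation with the target column meets a threshold; the remaining features are treated as less relevant and dropped.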

A semaphore is a counter used to control how many threads can access a shared resource at the same time. It is a signaling mechanism that works like a traffic light: it blocks threads while the resource is unavailable and lets them proceed once it becomes available.

In [5]:
class DataPreprocessing(threading.Thread):
    """Producer thread that prepares weather data and feeds two bounded buffers.

    The thread loads a CSV, keeps the features whose absolute correlation with
    the target meets a threshold, builds sliding windows of ``time_steps`` rows,
    and then pushes training chunks into ``trainer_buffer`` followed by test
    chunks into ``inference_buffer``.

    Each buffer is guarded by three semaphores: ``*_empty`` counts free slots,
    ``*_full`` counts filled slots and ``*_mutex`` serialises access to the
    queue. Consumers are not part of this class; presumably they acquire
    ``*_full``, take one item under ``*_mutex`` and then release ``*_empty`` -
    confirm against the consumer code.

    ``is_running`` is ``True`` until ``run`` finishes (normally or with an
    error) or ``stop`` is called.
    """

    # Seconds to wait for a free buffer slot before re-checking ``is_running``.
    _STOP_POLL_SECONDS = 0.1

    def __init__(self,
                 data_path: str,  # Path to the weather dataset CSV
                 time_steps: int = 20,  # Number of timesteps to look at while predicting
                 train_size: float = 0.8,  # Proportion of data to use for training - 80%
                 trainer_chunk_size: int = 10,  # Number of samples per chunk sent to the trainer
                 inference_chunk_size: int = 1,  # Number of samples per chunk sent to the inference
                 capacity: int = 5,  # Buffer capacity for each consumer
                 target_column: str = 'mean_temp',  # Column to predict (e.g. 'mean_temp' or 'precipitation')
                 trainer_delay: float = 0.5,  # Pause (seconds) after each training chunk
                 inference_delay: float = 0.2):  # Pause (seconds) after each inference chunk
        """Store the configuration and create the buffers and semaphores.

        Raises:
            ValueError: if ``time_steps``, a chunk size or ``capacity`` is
                smaller than 1, or if a delay is negative.
        """
        super().__init__()

        # A chunk size below 1 would make the feeding loops in run() spin
        # forever, and a capacity below 1 would block the first send.
        for name, value in (('time_steps', time_steps),
                            ('trainer_chunk_size', trainer_chunk_size),
                            ('inference_chunk_size', inference_chunk_size),
                            ('capacity', capacity)):
            if value < 1:
                raise ValueError(f"{name} must be at least 1, got {value}")
        if trainer_delay < 0 or inference_delay < 0:
            raise ValueError("trainer_delay and inference_delay must be non-negative")

        # Data parameters
        self.data_path = data_path
        self.time_steps = time_steps
        self.train_size = train_size
        self.trainer_chunk_size = trainer_chunk_size
        self.inference_chunk_size = inference_chunk_size
        self.target_column = target_column
        self.trainer_delay = trainer_delay
        self.inference_delay = inference_delay

        # Bounded buffers, one per consumer
        self.capacity = capacity
        self.trainer_buffer = Queue(maxsize=capacity)
        self.inference_buffer = Queue(maxsize=capacity)

        # Semaphores for the trainer buffer
        self.trainer_mutex = threading.Semaphore()  # Binary lock: one thread touches the buffer at a time
        self.trainer_empty = threading.Semaphore(capacity)  # Counts free slots; the producer waits on it
        self.trainer_full = threading.Semaphore(0)  # Counts filled slots; starts at 0 (nothing to consume)

        # Semaphores for the inference buffer
        self.inference_mutex = threading.Semaphore()  # Binary lock: one thread touches the buffer at a time
        self.inference_empty = threading.Semaphore(capacity)  # Counts free slots; the producer waits on it
        self.inference_full = threading.Semaphore(0)  # Counts filled slots; starts at 0 (nothing to consume)

        # Internal state
        self.data = None
        self.features = None
        self.targets = None
        self.scaler = None
        self.features_selected = False
        self.data_processed = False
        self.relevant_features = []
        self.irrelevant_features = []
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None

        # Tracking variables
        self.trainer_chunks_sent = 0
        self.inference_chunks_sent = 0
        self.is_running = True

    def load_data(self) -> None:
        """Read the CSV at ``data_path`` and split it into features and targets.

        The ``date`` column is parsed with format ``%Y%m%d``, missing
        ``snow_depth`` values become 0 and every row that still contains a NaN
        is dropped. ``features`` holds all columns except ``date`` and the
        target column; ``targets`` holds the target column as an array.
        """
        print(f"Loading data from {self.data_path}")
        frame = pd.read_csv(self.data_path)

        # Basic cleaning and preprocessing
        frame['date'] = pd.to_datetime(frame['date'], format='%Y%m%d')
        frame['snow_depth'] = frame['snow_depth'].fillna(0)
        frame = frame.dropna()

        # Publish the state only after cleaning succeeded, so a failed load
        # does not leave a half-processed frame behind.
        self.data = frame
        self.features = frame.drop(['date', self.target_column], axis=1)
        self.targets = frame[self.target_column].values

        print(f"Data loaded: {len(self.data)} rows with {len(self.features.columns)} features")

    def select_features(self, correlation_threshold: float = 0.4) -> None:
        """Keep the features whose absolute correlation with the target is high enough.

        Args:
            correlation_threshold: minimum absolute correlation for a feature
                to count as relevant.

        Raises:
            ValueError: if no data has been loaded yet.
        """
        if self.data is None:
            raise ValueError("Data must be loaded before feature selection")

        print("Performing Correlation-based Feature Selection (CFS)")

        target_values = self.data[self.target_column]
        correlations = {
            column: abs(self.data[column].corr(target_values))
            for column in self.features.columns
        }

        # A NaN correlation (e.g. a constant column) fails the comparison and
        # therefore lands in the irrelevant list.
        self.relevant_features = [
            name for name, value in correlations.items() if value >= correlation_threshold
        ]
        self.irrelevant_features = [
            name for name, value in correlations.items() if not value >= correlation_threshold
        ]

        print(f"Selected {len(self.relevant_features)} relevant features: {self.relevant_features}")
        print(f"Identified {len(self.irrelevant_features)} less relevant features: {self.irrelevant_features}")

        # Update features to only include relevant ones
        self.features = self.features[self.relevant_features]
        self.features_selected = True

    def _build_windows(self, features: np.ndarray, targets: np.ndarray,
                       start: int, count: int) -> Tuple[np.ndarray, np.ndarray]:
        """Build ``count`` sliding windows whose first rows are ``start, start + 1, ...``.

        Window ``i`` contains rows ``start + i`` to ``start + i + time_steps - 1``
        of ``features``; its label is ``targets[start + i + time_steps]``.

        Returns:
            ``(X, y)`` as float32 arrays of shape ``(count, time_steps, D)``
            and ``(count, 1)``.
        """
        first_rows = start + np.arange(count)
        # Row index of every element of every window: shape (count, time_steps).
        window_rows = first_rows[:, None] + np.arange(self.time_steps)[None, :]
        X = np.asarray(features)[window_rows].astype(np.float32)

        label_start = start + self.time_steps
        y = np.asarray(targets)[label_start:label_start + count]
        y = y.astype(np.float32).reshape(-1, 1)
        return X, y

    def prepare_data(self) -> None:
        """Normalise the features and build the train/test window tensors.

        The scaler is fitted on the first ``int(rows * train_size)`` rows only
        and then applied to all rows. The first ``int(rows * train_size)``
        windows become the training set; the remaining windows become the
        test set.

        Raises:
            ValueError: if data has not been loaded, no feature column is
                left, or the dataset is too short for ``time_steps`` and
                ``train_size``.
        """
        if self.features is None or self.targets is None:
            raise ValueError("Data must be loaded before it can be prepared")

        if not self.features_selected:
            print("Warning: Feature selection has not been performed")

        print("Preparing data for LSTM training and inference")

        n_rows, n_features = self.features.shape
        train_count = int(n_rows * self.train_size)
        test_count = (n_rows - self.time_steps) - train_count

        if n_features == 0:
            raise ValueError("No feature columns available; lower the correlation threshold")
        if train_count < 1 or test_count < 0:
            raise ValueError(
                f"Not enough rows ({n_rows}) for time_steps={self.time_steps} "
                f"and train_size={self.train_size}"
            )

        # Normalize features (statistics come from the training rows only)
        self.scaler = StandardScaler()
        self.scaler.fit(self.features[:train_count])
        features_normalized = self.scaler.transform(self.features)

        X_train, y_train = self._build_windows(features_normalized, self.targets, 0, train_count)
        X_test, y_test = self._build_windows(features_normalized, self.targets, train_count, test_count)

        # Convert to PyTorch tensors
        self.X_train = torch.from_numpy(X_train)
        self.y_train = torch.from_numpy(y_train)
        self.X_test = torch.from_numpy(X_test)
        self.y_test = torch.from_numpy(y_test)

        print(f"Data prepared: X_train: {self.X_train.shape}, y_train: {self.y_train.shape}")
        print(f"               X_test: {self.X_test.shape}, y_test: {self.y_test.shape}")

        self.data_processed = True

    def prepare_train_chunk(self, start_idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return up to ``trainer_chunk_size`` training samples starting at ``start_idx``."""
        end_idx = min(start_idx + self.trainer_chunk_size, len(self.X_train))
        return self.X_train[start_idx:end_idx], self.y_train[start_idx:end_idx]

    def prepare_inference_chunk(self, start_idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return up to ``inference_chunk_size`` test samples starting at ``start_idx``."""
        end_idx = min(start_idx + self.inference_chunk_size, len(self.X_test))
        return self.X_test[start_idx:end_idx], self.y_test[start_idx:end_idx]

    def _wait_for_slot(self, empty_slots: threading.Semaphore) -> bool:
        """Acquire one free slot, giving up once the producer has been stopped.

        A free slot is always taken, even after ``stop``; the method only
        returns ``False`` when the buffer stays full while ``is_running`` is
        ``False``, which is the situation that previously blocked forever.
        """
        while not empty_slots.acquire(timeout=self._STOP_POLL_SECONDS):
            if not self.is_running:
                return False
        return True

    def send_to_trainer(self, data_chunk: Tuple[torch.Tensor, torch.Tensor]) -> None:
        """Put one chunk into the trainer buffer, waiting for a free slot.

        The chunk is dropped (nothing is sent or counted) only if the buffer
        stays full after the producer has been stopped.
        """
        if not self._wait_for_slot(self.trainer_empty):
            return

        # ``with`` guarantees the mutex is released even if the body raises.
        with self.trainer_mutex:
            self.trainer_buffer.put(data_chunk)
            self.trainer_chunks_sent += 1
            print(f"Sent chunk {self.trainer_chunks_sent} to trainer")

        self.trainer_full.release()

    def send_to_inferer(self, data_chunk: Tuple[torch.Tensor, torch.Tensor]) -> None:
        """Put one chunk into the inference buffer, waiting for a free slot.

        The chunk is dropped (nothing is sent or counted) only if the buffer
        stays full after the producer has been stopped.
        """
        if not self._wait_for_slot(self.inference_empty):
            return

        # ``with`` guarantees the mutex is released even if the body raises.
        with self.inference_mutex:
            self.inference_buffer.put(data_chunk)
            self.inference_chunks_sent += 1
            print(f"Sent chunk {self.inference_chunks_sent} to inference")

        self.inference_full.release()

    def run(self) -> None:
        """Prepare the data if needed, then feed all training and test chunks.

        Loading, feature selection and preparation are skipped when they have
        already been done. Feeding stops early once ``stop`` has been called.
        ``is_running`` is always ``False`` when this method returns or raises.
        """
        print("Starting data preprocessing thread")

        try:
            # Load and prepare data
            if self.data is None:
                self.load_data()

            if not self.features_selected:
                self.select_features()

            if not self.data_processed:
                self.prepare_data()

            start_time = time.time()

            # Process all training data first
            train_idx = 0
            while self.is_running and train_idx < len(self.X_train):
                self.send_to_trainer(self.prepare_train_chunk(train_idx))
                train_idx += self.trainer_chunk_size
                time.sleep(self.trainer_delay)  # Simulate processing time

            # Then process test data for inference
            test_idx = 0
            while self.is_running and test_idx < len(self.X_test):
                self.send_to_inferer(self.prepare_inference_chunk(test_idx))
                test_idx += self.inference_chunk_size
                time.sleep(self.inference_delay)  # Simulate processing time

            end_time = time.time()
            print(f"Data preprocessing completed in {end_time - start_time:.2f} seconds")
            print(f"Sent {self.trainer_chunks_sent} chunks to trainer and {self.inference_chunks_sent} chunks to inferer")
        finally:
            # Signal that we're done feeding data, even when an error occurred.
            self.is_running = False

    def get_buffer_status(self) -> Dict[str, int]:
        """Return the current queue sizes and the number of chunks sent so far."""
        return {
            'trainer_buffer_size': self.trainer_buffer.qsize(),
            'inference_buffer_size': self.inference_buffer.qsize(),
            'trainer_chunks_sent': self.trainer_chunks_sent,
            'inference_chunks_sent': self.inference_chunks_sent
        }

    def stop(self) -> None:
        """Ask the thread to stop feeding; ``run`` checks the flag between chunks."""
        self.is_running = False