- First draft of interfaces for models and train/test flow.
- Interfaces for Predictor and the Test flow definitely need some changes to make sure the "history" is correct
- The idea is to be able to swap out different Configs and Models for testing different scenarios

In [1]:
import numpy as np
from dataclasses import dataclass
import os
from abc import ABC, abstractmethod
from typing import Tuple, List, Generator

# @dataclass(frozen=True)
# class DataConfig
#     num_rx_antennas: int
#     num_tx_antennas: int
#     num_subcarriers: int

@dataclass(frozen=True)
class Config:
    num_rx_antennas: int
    num_tx_antennas: int
    num_subcarriers: int
    # Additional options/configurations...
    train_test_split: float
    pca_model_path: str
    predictor_path: str
    kmeans_path: str
    retrain_all: bool = True
    predictor_window_size = 5


# Not sure if we want to group stuff like this into sample
@dataclass(frozen=True)
class DataSample:
    index: int
    csi: np.ndarray
    loc: np.ndarray
    vel: np.ndarray
    

@dataclass(frozen=True)
class DataWindow:
    index: int
    sample: DataSample  # Data Sample at index
    previous: List[DataSample]  # Previous predictor_window_size elements
    

class Dataset:
    csi_samples: np.ndarray
    ue_locations: np.ndarray
    ue_velocity: np.ndarray
    
    def __init__(self, cfg: Config, csis: np.ndarray, ue_locs: np.ndarray, ue_vels: np.ndarray, window_indexes: List[int]):
        """
        :param csis:            Points to entire array of CSIs
        :param ue_locs:         Points to entire array of Locs
        :param ue_vels:         Points to entire array of Vels
        :param window_indexes:  List of indices of end of prediction Window. 
            ie for each index, the previous cfg.predictor_window_size values will be used to predict the CSI at that index.
            
        """
        self.cfg = cfg
        self.csi_samples = csis
        self.ue_locations = ue_locs
        self.ue_velocity = ue_vels
        self.window_indexes = window_indexes
        assert all(self.window_indexes >= cfg.predictor_window_size)
        
        
    def __getitem__(self, index) -> DataSample:
        return DataSample(index, self.csi_samples[index], self.ue_locations[index], self.ue_velocity[index])
        
    def __len__(self):
        return len(self.window_indexes)
    
    def __iter__(self):
        # Iterate over Samples
        for i in self.window_indexes:
            yield self[i]
    
    def get_window(self, index) -> DataWindow:
        return DataWindow(
            index,
            sample=self[index],
            previous=[
                self[idx] for idx in range(index - cfg.predictor_window_size, index)
            ]
        )
    
    def each_window(self) -> Generator[DataWindow]:
        # Iterate over each Window
        for i in self.window_indexes:
            yield self.get_window(i)

# We may want to move these class definitions into separate file

class Model(ABC):
    # Model interface for different predictors - basically anything that needs to be trained and then can predict stuff
    @abstractmethod
    def __init__(self):  # Instantiate Model hyperparameters
        pass
    @abstractmethod
    def fit(self, data: np.ndarray):
        pass
    @abstractmethod
    def process(self, data: np.ndarray) -> np.ndarray:
        pass
    @abstractmethod
    def load(self, path):
        pass
    @abstractmethod
    def save(self, path):
        pass


# TODO: Implement Models
class PCACompressor(Model):
    def __init__(self):
        pass

    def fit(self, data: np.ndarray):
        pass

    def process(self, csis: np.ndarray) -> np.ndarray:
        # return zdls
        pass
    
    def decode(self, zdl: np.ndarray) -> np.ndarray:
        # Go from zdl to csi space
        pass

    def load(self, path):
        pass

    def save(self, path):
        pass


class KMeansErrorCompressor(Model):
    def __init__(self):
        pass

    def fit(self, data: np.ndarray):
        pass

    def process(self, data: np.ndarray) -> np.ndarray:
        pass
    
    def decode(self, error: np.ndarray) -> np.ndarray:
        pass

    def load(self, path):
        pass

    def save(self, path):
        pass


class CSIPredictor(Model):
    def __init__(self):
        pass

    def fit(self, data: np.ndarray):
        pass

    def process(self, data: np.ndarray) -> np.ndarray:
        pass
    
    def load(self, path):
        pass

    def save(self, path):
        pass



IndentationError: expected an indented block (2387970700.py, line 66)

## Data Processing


In [3]:
## Configure
# Use this cfg variable whenever we need to access some constant
cfg = Config(
    num_rx_antennas=1,
    num_tx_antennas=64,
    num_subcarriers=80,
    train_test_split=0.8
)

def load_data(folder: str) -> Tuple[Dataset, Dataset]:
    ## TODO Load Data using cfg, train/test split
    pass

train_set, test_set = load_data("folder")

## TODO Add Noise

TypeError: __init__() missing 1 required positional argument: 'pca_model_path'

## Offline Training


In [5]:
## Train PCA
def train_or_load(model: Model, data, path):
    if not cfg.retrain_all and os.path.exists(path):
        model.load(path)
    else:
        model.fit(data)
        model.save(path)
    
pca = PCACompressor()
train_or_load(pca, train_set.csi_samples, cfg.pca_model_path)
zdl_train = pca.process(train_set.csi_samples)

## TODO Bit Allocation?

## Train Predictor
predictor = CSIPredictor()
train_or_load(predictor, zdl_train, cfg.predictor_path)  # TODO maybe want to also pass in location etc
predicted_zdl = predictor.process(zdl_train)

## TODO Compute Prediction Error
prediction_error = zdl_train - predicted_zdl

## Error Compression
error_compressor = KMeansErrorCompressor()
train_or_load(error_compressor, prediction_error, cfg.kmeans_path)

# DL and UL each get trained pca, predictor, and error_compressor

NameError: name 'cfg' is not defined

## Testing

In [None]:
## Downlink Side
zdl_test = pca.process(test_set.csi_samples)
# Bit allocation?
predicted_zdl_test = predictor.process(zdl_test)
prediction_error_test = zdl_test - predicted_zdl
compressed_error_test = error_compressor.process(predicted_zdl_test)

# CHANNEL: Send compressed_error to UL

## UL Side
ul_predicted_error = error_compressor.decode(compressed_error_test)
ul_predicted_zdl = predictor.process(zdl_test)  # TODO ? Shouldn't be test_set Might need to do this iteratively like loop over each sample and load the History 
ul_reconstructed_zdl = ul_predicted_error + ul_predicted_zdl
ul_predicted_csi = pca.decode(ul_reconstructed_zdl)

## TODO Compute Accuracy 
# test_set.csi_samples == ul_reconstructed_zdl

## Visualize?


## Simulation
- "End to End" Simulation
- Show simulation of entire new "test" path made up of portions of paths that were trained on

In [None]:
from copy import deepcopy

class SimpleSimulator(ABC):
    def __init__(self, cfg: Config, pca: PCACompressor, predictor: CSIPredictor, error_compressor: KMeansErrorCompressor):
        # Given pretrained components
        self.pca = pca
        self.predictor = predictor
        self.error_compressor = error_compressor
        self.csi_history = []
        self.cfg = cfg
        
    def add_history(self, zdl):
        self.csi_history.append(zdl)
        if self.csi_history > self.cfg.predictor_window_size:
            self.csi_history.pop(0)
    
    def set_history(self, csi_history: List[np.ndarray]):
        self.csi_history = deepcopy(csi_history)
    
    def reset(self):
        self.csi_history = []
        
    def get_current_csi_window(self):
        return self.csi_history[-cfg.predictor_window_size:]

class DLSimulator(SimpleSimulator):
    @abstractmethod
    def simulate(self, sample: DataSample):
        pass


class ULSimulator(SimpleSimulator):
    @abstractmethod
    def simulate(self, channel_input: np.ndarray) -> np.ndarray:
        # Returns a CSI Estimation for this timestamp
        pass


class DLSimple(DLSimulator):
    def simulate(self, sample: DataSample) -> np.ndarray:
        # Returns Quantized Error for this timestep
        zdl = self.pca.process(sample.csi)
        predicted = self.predictor.process(self.get_current_csi_window())
        error = zdl - predicted
        compressed_error = self.error_compressor.process(error)
        decompressed_error = self.error_compressor.decode(compressed_error)
        reconstructed_zdl = predicted + decompressed_error
        self.add_history(reconstructed_zdl)
        return compressed_error  # Send on channel
        

class ULSimple(ULSimulator):
    def simulate(self, compressed_error: np.ndarray)-> np.ndarray:
        decompressed_error = self.error_compressor.decode(compressed_error)
        predicted = self.predictor.process(self.get_current_csi_window())
        reconstructed_zdl = predicted + decompressed_error
        self.add_history(reconstructed_zdl)
        return self.pca.decode(reconstructed_zdl)  # Estimated CSI on UL Side
    
class Evaluator:
    def evaluate(self, sample: DataSample, predicted_csi: np.ndarray):
        # Evaluate the sample from one time step
        pass
    
    def report(self):
        # Finalize evaluation and provide report
        pass
    
    def visualize(self):
        pass
    
    def reset(self):
        pass

In [None]:
dataset = Dataset()

# Load in pretrained components
dl_sim: DLSimulator = DLSimple(cfg, pca, predictor, error_compressor)
ul_sim: ULSimulator = ULSimple(cfg, pca, predictor, error_compressor)

evaluator = Evaluator()

for sample in dataset:
    quantized_error = dl_sim.simulate(sample)
    predicted_csi = ul_sim.simulate(quantized_error)
    evaluator.evaluate(sample, predicted_csi)
    
evaluator.report()
evaluator.visualize()