## Libraries

In [1]:
import import_ipynb
import pandas as pd
import numpy as np
from sklearn import preprocessing
import torch
import Plots

importing Jupyter notebook from Plots.ipynb


## Read_Data Class

In [2]:
class Read_data:
  '''
  Load one numeric series from a CSV file and prepare it for a sliding-window model.

  The pipeline run by the constructor is: fill missing entries, rescale by a constant,
  hold out the last observations as ground truth, normalise the remainder to [-1, 1],
  and cut the normalised series into (history window, future value) training pairs.
  '''
  def __init__(self, loc: str, name: str, true_weeks=12, mul=1, time_interval=4, window=10, predict_length=4):
    '''
    Read and pre-process the data.
    The pre-processing covers missing values, rescaling, reservation of the true weeks,
    normalisation, and scanning into windows using the sliding window.

    loc: The position of the target file. The values are taken from the second column of the CSV.
    name: The name of the data to be addressed as in the following sets.
    true_weeks: The number of trailing weeks kept aside as truth for comparison (0 keeps nothing aside).
    mul: Multiplier applied to every value to rescale the data.
    time_interval: The number of weeks between the last known week and the first predicted week.
    window: The length of the sliding window.
    predict_length: The length of weeks to be predicted.

    Raises ValueError if the series has no usable value, if true_weeks is outside
    [0, number of observations], or if the training part is too short for a single window.
    '''
    doc = pd.read_csv(loc)
    self.name = name
    self.mul = mul
    self.window = window
    self.predict_length = predict_length
    self.scaler = preprocessing.MinMaxScaler(feature_range=(-1, 1)) # Normalisation to [-1, 1].

    self.data_all = doc.to_numpy()[:, 1] # All the original data before any pre-process.
    self.data_normalised = None

    # The interval between the end of the window and the predicted week.
    # As many as 'predict_length' weeks are reserved for the later predictions in the same month.
    self.window_interval = time_interval + predict_length - 1
    self.history_t = None
    self.future_t = None

    self._fill()
    self._formation()
    self._split(true_weeks)
    self._normalise()
    self._scan()

  def _fill(self):
    '''
    Replace the '.' placeholders in self.data_all and convert it to a float64 array.

    A placeholder between two values becomes the mean of its left and right neighbours.
    The left neighbour is the previous entry after filling, so a run of placeholders is
    filled progressively from left to right. A placeholder at the very start or end of
    the series has only one neighbour and takes that neighbour's value.

    Raises ValueError if the series consists of placeholders only.
    '''
    raw = self.data_all
    count = raw.shape[0]
    is_missing = [isinstance(value, str) and value == '.' for value in raw]
    if count and all(is_missing):
      raise ValueError("Cannot fill the data: every entry is the missing-value marker '.'.")

    # right_known[i] is the nearest original value at or after position i (None if there is none).
    right_known = [None] * count
    nearest = None
    for index in range(count - 1, -1, -1):
      if not is_missing[index]:
        nearest = float(raw[index])
      right_known[index] = nearest

    filled = np.empty(count, dtype='float64')
    left = None # Previous entry after filling; None while still inside a leading gap.
    for index in range(count):
      right = right_known[index]
      if not is_missing[index]:
        current = float(raw[index])
      elif left is None:
        current = right # Leading gap: only a right-hand neighbour exists.
      elif right is None:
        current = left # Trailing gap: only a left-hand neighbour exists.
      else:
        current = (left + right) / 2
      filled[index] = current
      left = current

    self.data_all = filled

  def _formation(self):
    '''
    Rescale the data by multiplying every value with self.mul.
    '''
    self.data_all = self.data_all * self.mul

  def _split(self, true_weeks):
    '''
    Split self.data_all into self.data (training part) and self.true_data (last true_weeks values).

    The cut index is computed explicitly because slicing with '-true_weeks' breaks for
    true_weeks == 0 ([:-0] is empty and [-0:] is the whole series).

    Raises ValueError if true_weeks is negative or larger than the series.
    '''
    total = self.data_all.shape[0]
    if not 0 <= true_weeks <= total:
      raise ValueError(f'true_weeks must be between 0 and {total}, got {true_weeks}.')
    cut = total - true_weeks
    self.data = self.data_all[:cut]
    self.true_data = self.data_all[cut:]

  def _normalise(self):
    '''
    Normalise the data.
    The scaler is re-fitted on self.data, and the result (one column) is stored in self.data_normalised.
    '''
    self.data_normalised = self.scaler.fit_transform(self.data.reshape(-1, 1))

  def _scan(self):
    '''
    Use a sliding window with stride 1 to cut the normalised data into pieces fed to the LSTM.
    Window i covers positions i .. i + window - 1, and its target is the value at
    position i + window + window_interval.
    tensor history = num of windows * window * 1, tensor future = num of windows * 1 (both float64).

    Raises ValueError if the series is shorter than window + window_interval.
    '''
    # Flatten the single-column array so that every element is a plain scalar.
    series = np.asarray(self.data_normalised, dtype='float64').reshape(-1)
    offset = self.window + self.window_interval
    n_windows = series.shape[0] - offset
    if n_windows < 0:
      raise ValueError(
        f'The series has {series.shape[0]} values but at least {offset} are needed '
        f'(window={self.window}, window_interval={self.window_interval}).')

    # Row i of 'positions' holds the indices i, i + 1, ..., i + window - 1.
    positions = np.arange(n_windows)[:, None] + np.arange(self.window)[None, :]
    history = series[positions]
    # Copy so that the tensor does not share memory with self.data_normalised.
    future = series[offset:].copy().reshape(-1, 1)

    self.history_t = torch.from_numpy(history).unsqueeze(2)
    self.future_t = torch.from_numpy(future)

  def update(self, new_value: list[float]):
    '''
    Append the new values to the data and remove the same amount of data from the beginning.
    The original self.data will be replaced, and then the data is normalised and scanned again.
    The new values are appended as given; self.mul is not applied to them here.

    new_value: A list of new values to add to the original data.
    '''
    extended = np.append(self.data, np.array(new_value))
    self.data = extended[len(new_value):]

    self._normalise()
    self._scan()

## Validation Function

In [3]:
# def check_data(dataclass: Read_data):
#     '''
#     This function is used to verify if the Read_data is implemented correctly.
#     '''
#     # Inspect the shape of data and compare the data_all, train_data and true_data.
#     data_padded = np.pad(dataclass.data, (0, len(dataclass.data_all)-len(dataclass.data)), constant_values=np.nan)
#     true_data_padded = np.pad(dataclass.true_data, (len(dataclass.data_all)-len(dataclass.true_data), 0), constant_values=np.nan)
#     Plots.plot_lines(np.array([dataclass.data_all, data_padded, true_data_padded]).reshape(3, -1), size=(15, 6), 
#                      title='All Data', xlabel='Months', ylabel='Values', label=['data_all', 'train_data', 'true_data'], show_label=1, titlesize=18)

#     # Check the values of the normalised data.
#     Plots.plot_lines(np.array(dataclass.data_normalised).reshape(1, -1), size=(15, 6),
#                      title='Normalised Data', xlabel='Months', ylabel='Values', label=['data_normalised'], show_label=1, titlesize=18)
    
#     # Check whether history windows are generated correctly.
#     history = []
#     for i in range(dataclass.history_t.shape[0]):
#         history.append(dataclass.history_t[i, 0, 0].item())
#     for i in range(1, dataclass.history_t.shape[1]):
#         history.append(dataclass.history_t[-1, i, 0].item())
#     print('History Tensors Check ' + str(np.array_equal(history, dataclass.data_normalised[ : -dataclass.window_interval-1, 0])))

#     # Check whether the future targets are generated correctly.
#     future = []
#     for i in range(dataclass.future_t.shape[0]):
#         future.append(dataclass.future_t[i, 0].item())
#     print('Future Tensors Check ' + str(np.array_equal(future, dataclass.data_normalised[dataclass.window+dataclass.window_interval : , 0])))

#     # Validate the update function.
#     if dataclass.name == 'Demand':
#         previous = dataclass.data_normalised
#         dataclass.update([3000, 2900, 2800, 3100])
#         Plots.plot_lines(np.array([previous, dataclass.data_normalised]).reshape(2, -1), size=(15, 6), 
#                         title='Update Comparison', xlabel='Months', ylabel='Values', label=['previous data', 'updated data'], show_label=1, titlesize=18)

## Validating

In [4]:
# MUL_DEMAND = 1500
# MUL_PRICE = 1000/340.2
# MUL_YIELD = 2000

# TRUE_WEEKS = 12

# demand = Read_data('../Strawberry Demand.csv', 'Demand', true_weeks=TRUE_WEEKS, mul=MUL_DEMAND)
# price = Read_data('../Strawberry Price.csv', 'Price', true_weeks=TRUE_WEEKS, mul=MUL_PRICE)
# syield = Read_data('../Yield.csv', 'Yield', true_weeks=TRUE_WEEKS, mul=MUL_YIELD)
# check_data(demand)
# check_data(price)
# check_data(syield)