# Подготовка

In [None]:
import dask.dataframe as dd
import pandas as pd
import tensorflow as tf
import seaborn as sns
import numpy as np
layers = tf.keras.layers
import matplotlib.pyplot as plt
from pandas import Timestamp

In [None]:
# Load the training data: read the parquet file with dask, then materialise it
# as an in-memory pandas DataFrame. The dask frame `x` is kept because its
# column list is used again when the data is split.
x = dd.read_parquet("Датасеты/X_train.parquet")
df = x.compute()

In [None]:
# Keep only the last 30*24*360 rows, which the original note describes as the
# last month (implying 360 rows per hour). This keeps training time reasonable;
# per that note the data is repetitive, so older rows add little.
df = df[-30*24*360:]

# Чистка данных

In [None]:
# Per-column summary statistics (one row per column after the transpose),
# displayed to look for outliers.
df.describe().transpose()

In [None]:
# Fill NaNs with the nearest known value: forward-fill first, then back-fill
# whatever is still missing at the start of the frame.
# `fillna(method=...)` is deprecated in pandas 2.x; `ffill()` / `bfill()` are
# the dedicated equivalents and give the same result.
df = df.ffill().bfill()

In [None]:
# Outlier clean-up: every negative value becomes 0 and every value equal to
# 6000 becomes 300.
# NOTE(review): 6000 looks like a sentinel/saturation reading - confirm why it
# is mapped to 300.
df = df.mask(df < 0, 0)
df = df.mask(df == 6000, 300)

In [None]:
# Display the per-column summary again to check the effect of the clean-up.
df.describe().transpose()

# Разделение данных

In [None]:
# Column name -> position lookup, taken from the dask frame's column list.
column_indices = dict(zip(x.columns, range(len(x.columns))))

# Split the tail of the data into three consecutive parts, counted from the end
# in blocks of 6*360 rows: train = 3 blocks, validation = 2 blocks, test = the
# final 2 blocks (7 blocks in total).
# NOTE(review): at 360 rows per hour this is 7 * 6 h = 42 h, not the 84 h the
# original comment mentioned - confirm the sampling rate.
n = len(df)
train_df = df.iloc[-6 * 360 * 7:-6 * 360 * 4]
val_df = df.iloc[-6 * 360 * 4:-6 * 360 * 2]
test_df = df.iloc[-6 * 360 * 2:]

num_features = len(df.columns)
train_df.shape, val_df.shape, test_df.shape

# Нормализация данных

In [None]:
# Standardise every split with the mean and standard deviation of the training
# split only, so validation/test statistics never enter the scaling.
train_mean, train_std = train_df.mean(), train_df.std()

train_df, val_df, test_df = (
    (split - train_mean) / train_std
    for split in (train_df, val_df, test_df)
)

# Классы и функции

In [None]:
class WindowGenerator():
  """Describes how a time series is cut into input/label windows.

  A window spans `input_width + shift` consecutive rows. The first
  `input_width` rows are the inputs and the last `label_width` rows are the
  labels.

  Args:
    input_width: number of rows at the start of the window used as inputs.
    label_width: number of rows at the end of the window used as labels.
    shift: number of rows added after the inputs to get the window size.
    train_df, val_df, test_df: data splits. The defaults are the module-level
      frames as they exist when this class statement runs.
    label_columns: optional iterable of label column names.
  """

  def __init__(self, input_width, label_width, shift,
               train_df=train_df, val_df=val_df, test_df=test_df,
               label_columns=None):
    # Keep references to the three data splits.
    self.train_df, self.val_df, self.test_df = train_df, val_df, test_df

    # Name -> position lookups. `label_columns_indices` is only created when
    # label columns were given.
    self.label_columns = label_columns
    if label_columns is not None:
      self.label_columns_indices = {
          column: position for position, column in enumerate(label_columns)}
    columns = train_df.columns
    self.column_indices = dict(zip(columns, range(len(columns))))

    # Window geometry.
    self.input_width = input_width
    self.label_width = label_width
    self.shift = shift
    self.total_window_size = input_width + shift

    window_positions = np.arange(self.total_window_size)

    # Inputs occupy the first `input_width` positions of the window.
    self.input_slice = slice(0, input_width)
    self.input_indices = window_positions[self.input_slice]

    # Labels occupy the last `label_width` positions of the window.
    self.label_start = self.total_window_size - label_width
    self.labels_slice = slice(self.label_start, None)
    self.label_indices = window_positions[self.labels_slice]

  def __repr__(self):
    """Return a multi-line summary of the window layout."""
    summary = [
        f'Total window size: {self.total_window_size}',
        f'Input indices: {self.input_indices}',
        f'Label indices: {self.label_indices}',
        f'Label column name(s): {self.label_columns}',
    ]
    return '\n'.join(summary)

In [None]:
def split_window(self, features):
  """Split a batch of full windows into `(inputs, labels)`.

  Args:
    features: batch of windows indexed as (batch, position in window, column).

  Returns:
    A tuple `(inputs, labels)`. `labels` is restricted to `self.label_columns`
    when label columns were configured.
  """
  inputs = features[:, self.input_slice, :]
  labels = features[:, self.labels_slice, :]

  if self.label_columns is not None:
    # Resolve the requested names to positions, then pick those columns and
    # stack them back along the last axis.
    wanted = [self.column_indices[name] for name in self.label_columns]
    labels = tf.stack([labels[:, :, position] for position in wanted],
                      axis=-1)

  # Slicing drops the static shape information; restore the known window
  # lengths so the resulting `tf.data.Dataset` is easier to inspect.
  inputs.set_shape([None, self.input_width, None])
  labels.set_shape([None, self.label_width, None])

  return inputs, labels

WindowGenerator.split_window = split_window

In [None]:
# Default column for plotting: the column at position 4 of `df`.
my_column = df.columns[4]

In [None]:
def plot(self, model=None, plot_col=my_column, max_subplots=3):
  """Plot inputs, labels and optional predictions for example windows.

  Args:
    model: optional callable applied to the example inputs; its output is
      drawn as predictions.
    plot_col: name of the column to draw.
    max_subplots: maximum number of windows of the example batch to draw.
  """
  inputs, labels = self.example
  plt.figure(figsize=(12, 8))
  plot_col_index = self.column_indices[plot_col]
  max_n = min(max_subplots, len(inputs))

  # The label-column lookup does not depend on the subplot, so resolve it once.
  if self.label_columns:
    label_col_index = self.label_columns_indices.get(plot_col, None)
  else:
    label_col_index = plot_col_index

  # Run the model once for the whole batch instead of once per subplot, and
  # only when its output will actually be drawn.
  predictions = None
  if model is not None and label_col_index is not None and max_n > 0:
    predictions = model(inputs)

  for n in range(max_n):
    plt.subplot(max_n, 1, n+1)
    plt.ylabel(f'{plot_col} [normed]')
    plt.plot(self.input_indices, inputs[n, :, plot_col_index],
             label='Inputs', marker='.', zorder=-10)

    # The plotted column is not among the labels: draw the inputs only.
    if label_col_index is None:
      continue

    plt.scatter(self.label_indices, labels[n, :, label_col_index],
                edgecolors='k', label='Labels', c='#2ca02c', s=64)
    if predictions is not None:
      plt.scatter(self.label_indices, predictions[n, :, label_col_index],
                  marker='X', edgecolors='k', label='Predictions',
                  c='#ff7f0e', s=64)

    if n == 0:
      plt.legend()

  # NOTE(review): the x values are positions inside the window (rows), so the
  # "[h]" unit is only right for hourly data - confirm before relying on it.
  plt.xlabel('Time [h]')

WindowGenerator.plot = plot

In [None]:
def make_dataset(self, data):
  """Build a shuffled, batched dataset of `(inputs, labels)` windows.

  Args:
    data: array-like of rows; it is converted to a float32 NumPy array.

  Returns:
    A dataset of sliding windows of `self.total_window_size` rows (stride 1,
    batches of 32), each batch mapped through `self.split_window`.
  """
  values = np.array(data, dtype=np.float32)
  windows = tf.keras.utils.timeseries_dataset_from_array(
      data=values,
      targets=None,
      sequence_length=self.total_window_size,
      sequence_stride=1,
      shuffle=True,
      batch_size=32)
  return windows.map(self.split_window)

WindowGenerator.make_dataset = make_dataset

In [None]:
@property
def train(self):
  """Windowed dataset built from `self.train_df`."""
  return self.make_dataset(self.train_df)

@property
def val(self):
  """Windowed dataset built from `self.val_df`."""
  return self.make_dataset(self.val_df)

@property
def test(self):
  """Windowed dataset built from `self.test_df`."""
  return self.make_dataset(self.test_df)

@property
def example(self):
  """Return a cached batch of `inputs, labels` used for plotting."""
  cached = getattr(self, '_example', None)
  if cached is not None:
    return cached
  # First access: take one batch from the `.train` dataset and remember it.
  self._example = next(iter(self.train))
  return self._example

WindowGenerator.train = train
WindowGenerator.val = val
WindowGenerator.test = test
WindowGenerator.example = example

In [None]:
MAX_EPOCHS = 20

def compile_and_fit(model, window, patience=2):
  """Compile `model` and fit it on the window's train/validation datasets.

  Args:
    model: Keras model to train.
    window: object exposing `train` and `val` datasets.
    patience: early-stopping patience passed to the `val_loss` monitor.

  Returns:
    The history object returned by `model.fit`.
  """
  model.compile(
      loss=tf.losses.MeanSquaredError(),
      optimizer=tf.optimizers.Adam(),
      metrics=[tf.metrics.MeanAbsoluteError()])

  # Stop once the validation loss has not improved for `patience` epochs.
  stop_on_plateau = tf.keras.callbacks.EarlyStopping(
      monitor='val_loss', patience=patience, mode='min')

  return model.fit(
      window.train,
      epochs=MAX_EPOCHS,
      validation_data=window.val,
      callbacks=[stop_on_plateau])

In [None]:
def make_end_predict(data, window, model, test_interval):
    """Predict the rows of `test_interval` from the context rows in `data`.

    Args:
        data: DataFrame of context rows fed to the model.
        window: window generator; the length of its `label_indices` is used
            as the model input length.
        model: model exposing `predict`.
        test_interval: DataFrame whose index and row count define the output.

    Returns:
        DataFrame with `test_interval`'s index and `data`'s columns, holding
        the first `len(test_interval)` predicted rows of the last window in
        the first batch.

    Raises:
        ValueError: if `data` is too short to form a single window.
    """
    # Use the window that was passed in rather than a module-level global.
    # NOTE(review): this takes the label width as the input length; it equals
    # the input width for the window used in this notebook - confirm for others.
    sequence_length = len(window.label_indices)
    windows = tf.keras.utils.timeseries_dataset_from_array(
        data,
        targets=None,
        sequence_length=sequence_length,
        batch_size=data.shape[0])

    # Only the first batch is needed, so do not materialise the whole dataset.
    first_batch = next(iter(windows), None)
    if first_batch is None:
        raise ValueError(
            f"need at least {sequence_length} rows of context, "
            f"got {data.shape[0]}")

    pred = model.predict(first_batch)[-1][:test_interval.shape[0]]
    return pd.DataFrame(pred, index=test_interval.index, columns=data.columns)

# Модель

In [None]:
# Window with 6*360 input rows and 6*360 label rows, shifted 6*360 rows ahead
# (the original note describes this as 6 hours used to predict the next 6 hours).
wide_window = WindowGenerator(
    input_width=6*360, label_width=6*360, shift=6*360)

In [None]:
# Display the window layout (rendered through WindowGenerator.__repr__).
wide_window

ResNets

In [None]:
class ResidualWrapper(tf.keras.Model):
  """Wraps a model so that its output is added to its input.

  The wrapped model therefore only has to produce the change (delta) relative
  to the inputs rather than the full prediction.
  """

  def __init__(self, model):
    super().__init__()
    self.model = model

  def call(self, inputs, *args, **kwargs):
    """Return `inputs` plus the wrapped model's output for `inputs`."""
    # Prediction for each time step = the input at that step + the delta
    # computed by the wrapped model.
    return inputs + self.model(inputs, *args, **kwargs)

In [None]:
%%time
# Residual LSTM: the LSTM + dense stack produces a per-step delta that the
# wrapper adds to the inputs.
residual_lstm = ResidualWrapper(tf.keras.Sequential([
    layers.LSTM(32, return_sequences=True),
    # Zero-initialised kernel so the predicted deltas start small.
    layers.Dense(num_features, kernel_initializer=tf.initializers.zeros()),
]))

history = compile_and_fit(residual_lstm, wide_window)

# Предсказание для X_test

In [None]:
# Load the test intervals, then drop the first column and the last two
# columns of the sheet.
intervals = pd.read_excel("Датасеты/test_intervals.xlsx")

intervals = intervals.drop(intervals.columns[0], axis=1)
intervals = intervals.drop(intervals.columns[-2:], axis=1)
# Snap the first two columns to a 10-second grid. `Series.round` is the numeric
# (decimals) API; datetime columns are rounded with the `.dt.round(freq)` accessor.
intervals[intervals.columns[0]] = intervals[intervals.columns[0]].dt.round("10s")
intervals[intervals.columns[1]] = intervals[intervals.columns[1]].dt.round("10s")

In [None]:
# Six hours expressed in nanoseconds (21600 s * 10**9), minus one nanosecond.
# It is compared with and added to `Timestamp.value`, which is in nanoseconds.
six = 21600*10**9-1

In [None]:
# Cut every interval into consecutive chunks no longer than `six` nanoseconds.
# `j` collects the resulting (start, finish) pairs in order.
j = []
for start, finish in zip(intervals["start"], intervals["finish"]):
    time_delta = (finish - start).value
    if time_delta <= six:
        # Short enough already: keep the interval as it is.
        j.append((start, finish))
        continue

    # Emit full-length chunks while more than `six` ns remain ...
    t = start.value
    while time_delta > six:
        j.append((Timestamp(t), Timestamp(t + six)))
        t += six
        time_delta -= six
    # ... then the remainder up to the original finish.
    j.append((Timestamp(t), Timestamp(finish)))

In [None]:
# Load the test data with dask and materialise it as a pandas DataFrame.
ti = dd.read_parquet("Датасеты/X_test.parquet").compute()

In [None]:
# Training statistics aligned to the test frame's columns, so that scaling
# never reorders the columns fed to the model.
# NOTE(review): assumes X_test has the same column names as X_train - confirm.
test_mean = train_mean.reindex(ti.columns)
test_std = train_std.reindex(ti.columns)

for start, finish in j:
    # Context: the rows from `six` nanoseconds before `start` up to `start`.
    period = Timestamp(start.value - six)
    to_pred = ti.loc[period:start]
    test_interval = ti.loc[start:finish]

    # Nothing to do when the interval has no missing values.
    if not test_interval.isna().any().any():
        continue

    # Fill gaps in the context with the nearest known values.
    to_pred = to_pred.ffill().bfill()
    # The model was fitted on standardised data, so the context is scaled with
    # the training statistics and the prediction is mapped back to the
    # original units before it is written into the test frame.
    to_pred = (to_pred - test_mean) / test_std
    pred = make_end_predict(to_pred, wide_window, residual_lstm, test_interval)
    pred = pred * test_std + test_mean

    # Keep observed values; use predictions only where values are missing.
    pred_interval = test_interval.combine_first(pred)
    ti.loc[start:finish] = pred_interval

In [None]:
# Write the test frame, with missing values replaced by predictions, to parquet.
ti.to_parquet("Предсказания/x_test_pred.parquet")

In [None]:
# Rebuild the interval table from the (start, finish) chunks in `j`, reusing
# the original column names.
new_intervals = pd.DataFrame(j, columns=intervals.columns)

# Snap both columns to the 10-second grid; chunk boundaries carry nanosecond
# offsets from the way the chunks were cut. `.dt.round(freq)` is the datetime
# rounding accessor, whereas `Series.round` is the numeric (decimals) API.
new_intervals[new_intervals.columns[0]] = new_intervals[new_intervals.columns[0]].dt.round("10s")
new_intervals[new_intervals.columns[1]] = new_intervals[new_intervals.columns[1]].dt.round("10s")

In [None]:
# Write the chunked interval table to parquet.
new_intervals.to_parquet("Предсказания/new_intervals.parquet")