# A preprocessing tool for time series


This tool focuses on two tasks:

1. Mapping the columns of a Pandas dataframe to model-ready values by
  (1) one-hot encoding;
  (2) categorical indicators;
  (3) normalizing functions.

2. Windowing historic data automatically, so that RNN model can be fitted.



I am working on Deep Time (https://github.com/MRYingLEE/DeepTime-Deep-Learning-Framework-for-Time-Series-Forecasting). This tool is part of my research work.

Tensorflow 2.x is used.


![alt text](https://www.tensorflow.org/tutorials/structured_data/images/time_series.png)

# Import TensorFlow and other libraries

Maybe later sklearn Preprocessing function (https://scikit-learn.org/stable/modules/preprocessing.html#preprocessing) will be supported.

So far, only train_test_split of sklearn is used.

In [0]:
!pip install sklearn

In [0]:
import numpy as np
import pandas as pd

import tensorflow as tf

from tensorflow import feature_column
from tensorflow.feature_column import *
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split

from io import StringIO

# ipywidgets (https://github.com/jupyter-widgets/ipywidgets) makes the Jupyter Notebook interactive.
from ipywidgets import *

import sys
import os

# Useful helper functions for Map transformation

I created these helper functions to keep the generated code short and easy to read.

In [0]:
# A function to generate a one-hot column by the vocabulary list.
def categorical_strings(column,vocabulary_list):
  """Pair a string column name with a one-hot encoder built from a vocabulary.

  Returns the tuple ``(column, one_hot_column)``. The encoder maps each value to
  its index in ``vocabulary_list``; a single out-of-vocabulary bucket is
  reserved, so the one-hot depth is ``len(vocabulary_list) + 1``.
  """
  def one_hot_column(row):
    vocab_size=len(vocabulary_list)

    # Vocabulary entry -> position in the list (int64 ids).
    initializer = tf.lookup.KeyValueTensorInitializer(
      keys=vocabulary_list,
      values=range(vocab_size),
      key_dtype=tf.string,
      value_dtype=tf.int64,
      name=column,
    )
    # One extra bucket receives every value missing from the vocabulary.
    lookup = tf.lookup.StaticVocabularyTable(initializer, num_oov_buckets=1)

    ids = lookup.lookup(row)
    return tf.one_hot(ids,vocab_size+1)

  return column, one_hot_column

# # A function to generate an embedding column by the vocabulary list.
# def categorical_strings_embedding(column,vocabulary_list, embedding_dim=8):
#   sparse_column = feature_column.categorical_column_with_vocabulary_list(
#       column, vocabulary_list)
#   embedding_column = feature_column.embedding_column(sparse_column, dimension=embedding_dim)
#   return embedding_column

# # A function to generate a hashed column by the vocabulary list.
# def categorical_hash(column,vocabulary_list, bucket_size=1000):
#   hashed = feature_column.categorical_column_with_hash_bucket(
#       column, hash_bucket_size=bucket_size)
#   hashed=feature_column.indicator_column(hashed)
#   return hashed

# A function to generate a one-hot column by the vocabulary list for an integer column.
def categorical_identitys(column,vocabulary_list):
  """Pair an integer column name with a one-hot encoder built from a vocabulary.

  Returns the tuple ``(column, one_hot_column)``. The encoder maps each int64
  value to its index in ``vocabulary_list``; a single out-of-vocabulary bucket
  is reserved, so the one-hot depth is ``len(vocabulary_list) + 1``.
  """
  def one_hot_column(row):
    vocab_size=len(vocabulary_list)

    # Vocabulary entry -> position in the list (int64 ids).
    initializer = tf.lookup.KeyValueTensorInitializer(
      keys=vocabulary_list,
      values=range(vocab_size),
      key_dtype=tf.int64,
      value_dtype=tf.int64,
      name=column,
    )
    # One extra bucket receives every value missing from the vocabulary.
    lookup = tf.lookup.StaticVocabularyTable(initializer, num_oov_buckets=1)

    ids = lookup.lookup(row)
    return tf.one_hot(ids,vocab_size+1)

  return column, one_hot_column

# Class of an Estimator

I like how the estimator idea makes machine learning easier, but this class is NOT a child of the tf.estimator.Estimator class (https://www.tensorflow.org/guide/estimator).

The tf.estimator.Estimator class depends heavily on feature_column (https://www.tensorflow.org/api_docs/python/tf/feature_column). I like the idea of feature_column too, but feature_column doesn't work well with time series and doesn't support the functional API of Keras well. (If you find a way to solve my headache, please let me know.)


In [0]:
class TsEstimator:
  """Build per-column feature transforms and windowed tf.data datasets from DataFrames.

  This is not a subclass of ``tf.estimator.Estimator``. Construction inspects
  every column of the training frame and records, per column, a default feature
  expression, the selectable alternatives and (for non-categorical int64 and
  float64 columns) generated normalizer lambdas. ``get_feature_grid`` shows
  those choices in an ipywidgets grid, ``update_by_grid`` turns the current
  selection into ``input_features`` / ``label_features`` and ``df_to_dataset``
  windows a frame using them.
  """

  # The built-in feature column kinds of TensorFlow 2.
  # Not all of them are supported by this tool.
  feature_kinds={
      "bucketized_column(...)":"Represents discretized dense input bucketed by boundaries.",
      "categorical_column_with_hash_bucket(...)":"Represents sparse feature where ids are set by hashing.",
      "categorical_column_with_identity(...)":"A CategoricalColumn that returns identity values.",
      "categorical_column_with_vocabulary_file(...)":"A CategoricalColumn with a vocabulary file.",
      "categorical_column_with_vocabulary_list(...)":"A CategoricalColumn with in-memory vocabulary.",
      "crossed_column(...)":"Returns a column for performing crosses of categorical features.",
      "embedding_column(...)":"DenseColumn that converts from sparse, categorical input.",
      "indicator_column(...)":"Represents multi-hot representation of given categorical column.",
      "make_parse_example_spec(...)":"Creates parsing spec dictionary from input feature_columns.",
      "numeric_column(...)":"Represents real valued or numerical features.",
      "sequence_categorical_column_with_hash_bucket(...)":"A sequence of categorical terms where ids are set by hashing.",
      "sequence_categorical_column_with_identity(...)":"Returns a feature column that represents sequences of integers.",
      "sequence_categorical_column_with_vocabulary_file(...)":"A sequence of categorical terms where ids use a vocabulary file.",
      "sequence_categorical_column_with_vocabulary_list(...)":"A sequence of categorical terms where ids use an in-memory list.",
      "sequence_numeric_column(...)":"Returns a feature column that represents sequences of numeric data.",
      "shared_embeddings(...)":"List of dense columns that convert from sparse, categorical input.",
      "weighted_categorical_column(...)":"Applies weight values to a CategoricalColumn.",
      "?":"Unknown"
    }

  # The default feature kind assigned to each Pandas dtype name.
  dtype_default_feature={
      "object":"?",
      "int64":"numeric_column(...)",
      "float64":"numeric_column(...)",
      "bool":"numeric_column(...)",
      "datetime64":"?",
      "timedelta[ns]":"?",
      "category":"categorical_strings(...)"
    }

  # Which helper kinds (rows) are offered for which dtype (columns).
  dtype_features_cross = StringIO("""Kind,object,int64,float64,bool,datetime64,timedelta[ns],category,cat_int64,cat_string
    categorical_identitys,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,TRUE,FALSE
    categorical_strings,TRUE,FALSE,FALSE,FALSE,FALSE,FALSE,TRUE,FALSE,TRUE
        """)
  df_dtype_features_cross = pd.read_csv(dtype_features_cross, sep=",")

  def __init__(self, df_all, df_train, df_val=None, df_test=None, categories_limit=20):
    """Store the frames and analyse every column of the training frame.

    Args:
      df_all: the complete dataframe (required); used for vocabularies and
        the "by_all" normalizers.
      df_train: the training part; falls back to ``df_all`` when None.
      df_val: optional validation part (only stored).
      df_test: optional test part (only stored).
      categories_limit: an int64/object column with fewer unique values than
        this is treated as categorical.

    Raises:
      ValueError: if ``df_all`` is None.
    """
    if df_all is None:
      raise ValueError("df_all is required")

    self._df_all=df_all
    self._df_train=df_all if df_train is None else df_train
    self._df_val=df_val
    self._df_test=df_test
    self._df_df_test=df_test # Legacy (misspelled) attribute name, kept as an alias.

    self.input_features=[]
    self.label_features=[]

    self.columns_label=[]
    self.columns_input=[]

    self.global_normalizers={} # Filled by __df_desc; not read anywhere else so far.
    self.categorical_columns=[]
    self.categories_limit=categories_limit
    self.grid=None
    # Must run after the attributes above exist: __df_desc fills several of them.
    self.category_lists= self.__df_desc()
    self.code=""

  @classmethod
  def get_available_features(cls,col_dtype):
    """Return the set of helper kinds flagged TRUE for ``col_dtype``.

    A dtype name that has no column in the table yields an empty set instead
    of a KeyError. Kind names are stripped because the embedded CSV literal
    is indented.
    """
    table=cls.df_dtype_features_cross
    if col_dtype=="Kind" or col_dtype not in table.columns:
      return set()
    enabled=table[col_dtype].astype(bool)
    return set(table.loc[enabled,"Kind"].str.strip())

  # Two kinds of normalizer/denormalizer source strings are generated:
  #   min-max  : (value-min)/(max-min)
  #   mean-std : (value-mean)/std

  @staticmethod
  def min_max_normalizer(min_v,max_v, v_str="by_train",is_int64=False):
    """Return ``(normalizer, denormalizer)`` lambda source strings for min-max scaling.

    ``v_str`` is the lambda's parameter name. With ``is_int64`` the value is
    wrapped in ``tf.cast(..., tf.float32)`` inside the generated expression.
    """
    value=f"tf.cast({v_str},tf.float32)" if is_int64 else v_str
    lo,hi=str(min_v),str(max_v)

    normalizer=f"lambda {v_str}: ({value} -{lo})/({hi}-{lo})"
    denormalizer=f"lambda {v_str}: {value} *({hi}-{lo})+{lo}"
    return normalizer,denormalizer

  @staticmethod
  def std_normalizer(v_mean,v_std, v_str="by_train",is_int64=False):
    """Return ``(normalizer, denormalizer)`` lambda source strings for mean-std scaling.

    ``v_str`` is the lambda's parameter name. With ``is_int64`` the value is
    wrapped in ``tf.cast(..., tf.float32)`` inside the generated expression.
    """
    value=f"tf.cast({v_str},tf.float32)" if is_int64 else v_str
    mean,std=str(v_mean),str(v_std)

    normalizer=f"lambda {v_str}: ({value} -{mean})/{std}"
    denormalizer=f"lambda {v_str}: {value} *{std}+{mean}"
    return normalizer,denormalizer

  @staticmethod
  def create_local_normalizers(col_name,df_statistics, v_str="by_train",is_int64=False):
    """Build both normalizer kinds for one column.

    ``df_statistics`` is indexed by column name and provides the "min",
    "max", "mean" and "std" values (a transposed ``describe()`` in this class).
    Returns a dict mapping each normalizer string to its denormalizer string.
    """
    stats=df_statistics.loc[col_name]

    n1,d1=TsEstimator.min_max_normalizer(stats["min"],stats["max"],v_str,is_int64=is_int64)
    n2,d2=TsEstimator.std_normalizer(stats["mean"],stats["std"],v_str,is_int64=is_int64)

    return {n1:d1,n2:d2}

  @staticmethod
  def int_list_as_string(a):
    """Render an iterable of integers as list source code, e.g. ``[1,2,3]``."""
    return "["+",".join(str(i) for i in a)+"]"

  @staticmethod
  def string_list_as_string(s):
    """Render an iterable of strings as list source code, e.g. ``['a','b']``.

    ``repr`` is used so that values containing quotes or backslashes still
    produce valid source code.
    """
    return "["+",".join(repr(str(v)) for v in s)+"]"

  def __collect_normalizers(self,col_name,stats_train,stats_all,is_int64):
    """Create the by_train and by_all normalizers of a numeric column and register them globally."""
    by_train=self.create_local_normalizers(col_name,stats_train,v_str="by_train", is_int64=is_int64)
    by_all=self.create_local_normalizers(col_name,stats_all,v_str="by_all", is_int64=is_int64)
    self.global_normalizers.update(by_train)
    self.global_normalizers.update(by_all)

    # by_train entries come first, so the grid pre-selects a train-based normalizer.
    merged=dict(by_train)
    merged.update(by_all)
    return merged

  def __df_desc(self):
    """Describe every training column: default feature, alternatives and normalizers.

    The whole dataframe decides the vocabulary of categorical columns. For
    numeric columns, normalizers are generated from the statistics of both the
    training part ("by_train") and the whole dataframe ("by_all"), so the
    choice is left to the data scientist.

    Returns:
      dict mapping column name to
      ``{"default": str, "available": set of str, "normalizers": dict}``.
    """
    df_all=self._df_all
    df_train=self._df_train

    stats_train=df_train.describe().T # Statistics of the training part only.
    stats_all=df_all.describe().T # Statistics of the whole dataframe.

    category_lists={}

    for c in df_train.columns:
      dtype_name=df_train[c].dtype.name
      availables=set(self.get_available_features(dtype_name))
      numeric_feature="numeric_column('"+c+"')"
      local_normalizers={}

      if dtype_name in ("int64","object"):
        is_int64=(dtype_name=="int64")

        # All rows (not only the training part) decide the category list.
        values_unique=df_all[c].unique()
        if len(values_unique)<self.categories_limit: # Category
          if is_int64:
            feature=categorical_identitys.__name__+"('"+c+"',"+self.int_list_as_string(values_unique)+")"
          else:
            feature=categorical_strings.__name__+"('"+c+"',"+self.string_list_as_string(values_unique)+")"
          self.categorical_columns.append(c)
        elif is_int64:
          feature=numeric_feature
          local_normalizers=self.__collect_normalizers(c,stats_train,stats_all,is_int64=True)
        else:
          feature="embedding_column('"+c+"')"
      elif dtype_name=="float64":
        feature=numeric_feature
        local_normalizers=self.__collect_normalizers(c,stats_train,stats_all,is_int64=False)
      elif dtype_name=="bool":
        feature=numeric_feature
      elif dtype_name=="category":
        feature="categorical_column_with_vocabulary_list('"+c+"')"
        self.categorical_columns.append(c)
      else:
        # Any other dtype falls back to the per-dtype default ("?" when unknown).
        feature=self.dtype_default_feature.get(dtype_name,"?").replace("(...)","('"+c+"')")

      # The default must be one of the selectable options of the dropdown.
      availables.add(feature)

      availables={s.replace("(...)","('"+c+"')") for s in availables}
      category_lists[c]={"default":feature,"available":availables,"normalizers": local_normalizers}

    return category_lists

  def get_feature_grid(self,default_inputs=None, default_labels=None):
    """Return the interactive grid with one row of choices per training column.

    The grid is built once and cached; later calls return the same grid and
    ignore their arguments.

    Args:
      default_inputs: column names whose "Input?" box starts checked.
      default_labels: column names whose "Label?" box starts checked.
    """
    if self.grid is not None:
      return self.grid

    default_inputs=() if default_inputs is None else default_inputs
    default_labels=() if default_labels is None else default_labels

    df_train=self._df_train
    columns=list(df_train.columns)

    grid = GridspecLayout(len(columns)+1, 12)
    # Header at row 0.
    grid[0,0]= widgets.Label(value="Column")
    grid[0,1]= widgets.Label(value="dtype")
    grid[0,2]= widgets.Label(value="Input?")
    grid[0,3]= widgets.Label(value="Label?")
    grid[0,4:7]= widgets.Label(value="Feature Kind")
    grid[0,8:]= widgets.Label(value="Numeric Normalizer")

    for position,col_name in enumerate(columns):
      row=position+1
      feature_option=self.category_lists[col_name]
      normalizer_names=list(feature_option['normalizers'].keys())

      grid[row,0]= widgets.Label(value=col_name)
      # .iloc keeps the lookup positional whatever the column labels are.
      grid[row,1]= widgets.Label(value=df_train.dtypes.iloc[position].name)
      grid[row,2]=widgets.Checkbox(value=(col_name in default_inputs),description='',indent=False,layout=Layout(height='auto', width='auto'))
      grid[row,3]=widgets.Checkbox(value=(col_name in default_labels),indent=False,description='',layout=Layout(height='auto', width='auto'))

      grid[row,4:7]= widgets.Dropdown(
        options=sorted(feature_option['available']), # sorted: stable option order
        value=feature_option['default'],
        description="",
        layout=Layout(height='auto', width='auto')
        )

      # Only columns that have generated normalizers get the second dropdown.
      if normalizer_names:
        grid[row,8:]=widgets.Dropdown(
          options=normalizer_names,
          value=normalizer_names[0],
          layout=Layout(height='auto', width='auto'),
          description=""
          )

    self.grid=grid

    return grid

  def __generate_code(self):
    """Translate the current grid selection into ``append`` statements.

    Returns:
      ``(statements, self.columns_label)`` where ``statements`` is a list of
      source lines appending to ``input_features`` / ``label_features``.

    Raises:
      RuntimeError: if the grid has not been created yet.
    """
    grid=self.grid
    if grid is None:
      raise RuntimeError("Call get_feature_grid() before generating code")

    code_generator=[]
    # Start from scratch so that re-running does not duplicate column names.
    self.columns_input.clear()
    self.columns_label.clear()

    for i in range(1,grid.n_rows):
      col_name=grid[i,0].value
      feature=grid[i,4].value

      if feature.startswith("numeric_column("):
        # Numeric columns become (name, lambda) pairs; bool columns pass through unchanged.
        lambda_f="lambda x: x" if grid[i,1].value =="bool" else grid[i,8].value
        entry="('"+col_name+"',"+lambda_f+")"
      else:
        # Other kinds are helper calls that already evaluate to a (name, function) pair.
        entry=feature

      if grid[i,2].value:
        code_generator.append("input_features.append("+entry+")")
        self.columns_input.append(col_name)
      if grid[i,3].value:
        code_generator.append("label_features.append("+entry+")")
        self.columns_label.append(col_name)

    return code_generator, self.columns_label

  def __run_generated_code(self, code_generator):
    """Execute the generated statements to refill input_features / label_features."""
    code=';'.join(code_generator)

    self.input_features.clear()
    self.label_features.clear()
    try:
      # NOTE(review): exec runs source assembled from column names and widget
      # values; do not feed untrusted dataframes or grids through it.
      exec(code,None, {'input_features':self.input_features,'label_features':self.label_features})
    except Exception as err:
      print("Please check the generated code", type(err).__name__, err)
    else:
      print("The feature_columns have been generated!")

  def update_by_grid(self):
    """Regenerate the code from the grid and run it; the statements are kept in ``self.code``."""
    self.code,_ =self.__generate_code()
    self.__run_generated_code(self.code)

  @staticmethod
  def __as_1d(row, features):
    """Apply each (column, function) pair to ``row`` and flatten every result to a 1-d float64 tensor."""
    return [tf.reshape(tf.cast(fn(row[name]),tf.float64),[-1]) for (name, fn) in features]

  def input_label_1d(self):
    """Return a row transform that concatenates all input features followed by all label features."""
    def transform(row):
      pieces=self.__as_1d(row,self.input_features)+self.__as_1d(row,self.label_features)
      return tf.concat(pieces,0)
    return transform

  def label_1d(self):
    """Return a row transform that concatenates the label features only."""
    def transform(row):
      return tf.concat(self.__as_1d(row,self.label_features),0)
    return transform

  @staticmethod
  def split_input_label(forecast_size,v_labels, single_step=False):
    """Return a function splitting one window into ``(inputs, labels)``.

    The window is indexed as ``[time step, value]``. Inputs are all values of
    every step except the last ``forecast_size`` ones. Labels are the last
    ``v_labels`` values of either the final step only (``single_step``) or of
    all ``forecast_size`` forecast steps, flattened to 1-d.
    """
    def input_label(row):
      i_input=row[:-forecast_size,:]

      if single_step:
        i_label=row[-1,-v_labels:]
      else:
        i_label=tf.reshape(row[-forecast_size:,-v_labels:],[-1]) # To reshape to a 1-d tensor

      return i_input, i_label

    return input_label

  def df_to_dataset(self, dataframe,past_history=5, future_target=2, shuffle=False, batch_size=32, single_step=False):
    """Create a batched tf.data dataset of sliding windows from a Pandas dataframe.

    Args:
      dataframe: the rows to window, in their existing order.
      past_history: number of steps before the prediction time in each window.
      future_target: number of steps to forecast in each window.
      shuffle: shuffle the windows (fixed seed) before batching.
      batch_size: number of windows per batch.
      single_step: forwarded to ``split_input_label``; label only the last step.

    Returns:
      A dataset of ``(inputs, labels)`` batches.
    """
    ds = tf.data.Dataset.from_tensor_slices(dict(dataframe))

    ds_map=ds.map(self.input_label_1d())

    # Width of the label part, which input_label_1d puts at the end of every row.
    v_labels=ds.map(self.label_1d()).element_spec.shape[-1]

    # Feel free to play with shuffle buffer size
    shuffle_buffer_size = len(dataframe)

    # Total size of window is given by the number of steps to be considered
    # before prediction time + steps that we want to forecast
    total_size = past_history + future_target

    # Selecting windows
    data = ds_map.window(total_size, shift=1, drop_remainder=True)
    data = data.flat_map(lambda k: k.batch(total_size))

    # Shuffling data (seed=Answer to the Ultimate Question of Life, the Universe, and Everything)
    if shuffle:
      data = data.shuffle(shuffle_buffer_size, seed=42)

    # Extracting past features  + labels
    data = data.map(TsEstimator.split_input_label(future_target,v_labels,single_step=single_step))

    ds_4_train= data.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    return ds_4_train


## <font color=red> Your Dataframe here</font>
Typically, this is the <font color=red>ONLY</font> place for you to type.


In [0]:
# Path or URL of the CSV data file; leave empty to fall back to the demo data.
csvURL = ''

# Columns pre-selected as inputs / labels in the interactive grid.
default_inputs, default_labels = [], []

# Load the data only when a source was given; otherwise dataframe stays None.
dataframe = pd.read_csv(csvURL) if csvURL != '' else None
if dataframe is not None:
  dataframe.head()

## A demo dataframe if you don't create one

This tutorial uses a <a href="https://www.bgc-jena.mpg.de/wetter/" class="external">weather time series dataset</a> recorded by the <a href="https://www.bgc-jena.mpg.de" class="external">Max Planck Institute for Biogeochemistry</a>.

This dataset contains 14 different features such as air temperature, atmospheric pressure, and humidity. These were collected every 10 minutes, beginning in 2003. For efficiency, you will use only the data collected between 2009 and 2016. This section of the dataset was prepared by François Chollet for his book [Deep Learning with Python](https://www.manning.com/books/deep-learning-with-python).

In both of the following tutorials, the first 300,000 rows of the data will be the training dataset, and the remaining rows will be the validation dataset. This amounts to ~2100 days' worth of training data.

In [0]:
# No user dataframe was loaded: download the Jena climate demo data instead.
if dataframe is None:
  zip_path = tf.keras.utils.get_file(
      fname='jena_climate_2009_2016.csv.zip',
      origin='https://storage.googleapis.com/tensorflow/tf-keras-datasets/jena_climate_2009_2016.csv.zip',
      extract=True)
  # Dropping the ".zip" extension gives the path of the CSV read below.
  csv_path, _ = os.path.splitext(zip_path)
  dataframe = pd.read_csv(csv_path,index_col='Date Time')
  # Default input and label columns for the demo data.
  default_inputs, default_labels = ['p (mbar)', 'T (degC)', 'rho (g/m**3)'], ['T (degC)']

dataframe.head()

## Split data into Train and Test 

In [0]:
# Keep the rows in chronological order: df_to_dataset builds its windows from
# consecutive rows, so a shuffled split (the sklearn default) would mix
# unrelated time steps inside every window.
dataframe_train, dataframe_test = train_test_split(dataframe, test_size=0.2, shuffle=False)
dataframe_train, dataframe_val = train_test_split(dataframe_train, test_size=0.2, shuffle=False)
print(len(dataframe_train), 'train examples')
print(len(dataframe_val), 'validation examples')
print(len(dataframe_test), 'test examples')

## Create an estimator

In [0]:
# Analyse the columns once; the whole frame and its three parts are all handed over.
estimator = TsEstimator(
    df_all=dataframe,
    df_train=dataframe_train,
    df_val=dataframe_val,
    df_test=dataframe_test,
)

## Inspect the data by categorical columns

In [0]:
if len(estimator.categorical_columns)>0:
  # Use seaborn for pairplot
  !pip install -q seaborn
  # import matplotlib.pyplot as plt
  import seaborn as sns
  # plt.figure(figsize=(20,5))
  sns.pairplot(dataframe[estimator.categorical_columns], diag_kind="kde")

## To create an interactive grid

You may try the builder INTERACTIVELY.

In [0]:
# Build (or fetch the cached) selection grid and display it as the cell output.
grid = estimator.get_feature_grid(default_inputs=default_inputs, default_labels=default_labels)
grid

<font color=red>**RERUN** the following cells once you change the above settings.</font>

In [0]:
# Regenerate the feature transforms from the current grid selection.
estimator.update_by_grid()

# At least one input column and one label column must be ticked in the grid.
assert estimator.input_features
assert estimator.label_features

# Show the generated (column, transform) pairs.
(estimator.input_features, estimator.label_features)

In [0]:
# List every input column next to the transform applied to it.
for x, y in estimator.input_features:
  print(x, y)

In [0]:
# Display the statements that update_by_grid() generated from the grid selection.
estimator.code

## Create an input pipeline using tf.data

Next, I will wrap the dataframes with [tf.data](https://www.tensorflow.org/guide/datasets).

In [0]:
# Initial values for the settings widgets of the next cell.
past_history, future_target = 720, 72  # steps looked back / steps forecast per window
batch_size = 5  # A small batch size is used for demonstration purposes
single_step = False

In [0]:
# A 4x5 grid: one row per setting, caption in column 0, editor in the remaining columns.
grid = widgets.GridspecLayout(4, 5)

# Row 0: number of past periods in each window.
a_past_history = widgets.Label("Past History Periods:")
v_past_history = widgets.IntText(value=past_history)
box_past_history = widgets.HBox([a_past_history, v_past_history])
grid[0, 0] = a_past_history
grid[0, 1:] = v_past_history

# Row 1: number of future periods to forecast.
a_future_target = widgets.Label("Future_target Periods:")
v_future_target = widgets.IntText(value=future_target)
box_future_target = widgets.HBox([a_future_target, v_future_target])
grid[1, 0] = a_future_target
grid[1, 1:] = v_future_target

# Row 2: batch size.
a_batch_size = widgets.Label("Batch Size:")
v_batch_size = widgets.IntText(value=batch_size)
box_batch_size = widgets.HBox([a_batch_size, v_batch_size])
grid[2, 0] = a_batch_size
grid[2, 1:] = v_batch_size

# Row 3: single-step switch.
a_single_step = widgets.Label("Single Step?")
v_single_step = widgets.Checkbox(value=single_step)
grid[3, 0] = a_single_step
grid[3, 1:] = v_single_step

grid



In [0]:
# Copy the values currently shown in the widgets back into the plain variables.
past_history, future_target = v_past_history.value, v_future_target.value
batch_size, single_step = v_batch_size.value, v_single_step.value

In [0]:
# Window each split with the settings chosen above. The batch size comes from
# the widget value instead of a hard-coded 32, so the user's choice takes effect.
train_ds = estimator.df_to_dataset(dataframe_train,past_history=past_history, future_target=future_target,  shuffle=False, batch_size=batch_size)
val_ds = estimator.df_to_dataset(dataframe_val,past_history=past_history, future_target=future_target, shuffle=False, batch_size=batch_size)
test_ds = estimator.df_to_dataset(dataframe_test,past_history=past_history, future_target=future_target, shuffle=False, batch_size=batch_size)

In [0]:
# Fetch ONE batch and show the shapes of its inputs and labels; starting the
# pipeline once avoids computing the first batch twice.
_peek_inputs, _peek_labels = next(iter(train_ds))
_peek_inputs.shape, _peek_labels.shape

## Create, compile, and train the model

Slightly modified from Multi-Step model for a multivariate time series in https://www.tensorflow.org/tutorials/structured_data/time_series

Depending on your target, you may need to change the model.


In [0]:
# Read both shapes from the same batch, so the pipeline is started only once.
_first_inputs, _first_labels = next(iter(train_ds))
input_shape=_first_inputs.shape[-2:]  # the last two input dimensions, without the batch one
label_shape=_first_labels.shape[-1]   # number of label values per window
input_shape

In [0]:
# Number of training steps per epoch.
EVALUATION_INTERVAL = 200

# Two stacked LSTM layers followed by a dense layer with one output per label value.
multi_step_model = tf.keras.models.Sequential([
    tf.keras.layers.LSTM(32, return_sequences=True, input_shape=input_shape),
    tf.keras.layers.LSTM(16, activation='relu'),
    tf.keras.layers.Dense(label_shape),
])

multi_step_model.compile(
    loss='mae',
    optimizer=tf.keras.optimizers.RMSprop(clipvalue=1.0),
)

multi_step_history = multi_step_model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,
    steps_per_epoch=EVALUATION_INTERVAL,
    validation_steps=50,
)