# DATA PIPELINE

<b>Data format</b><br>
1 Row = 1 Example = 1 Cycle<br>
Each cycle so far has:
 - Qdlin (1000,1)
 - Tdlin (1000,1)
 - Cdlin (1000,1) (WIP)
 - discharge_time (1,) (WIP)
 - IR (1,)
 - remaining_cycle_life (1,) <- target
 
For every cell we create one TFRecord file where each row represents data for one cycle. Before model training we read the data from all files, create one dataset and feed it to the model. <br>Below are two different ways of structuring this data while reading and writing. Depending on what input the final model needs, we can choose the appropriate dataset design.

In [None]:
import os
import numpy as np
import pickle
from pathlib import Path

import tensorflow as tf
from tensorflow.train import FloatList, Int64List, Feature, FeatureList, FeatureLists, SequenceExample, Features, Example
from tensorflow.feature_column import numeric_column, make_parse_example_spec
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Conv1D, Flatten, TimeDistributed, Activation

In [None]:
# load only batch1 for testing
path1 = Path("Data/batch1.pkl")
# NOTE: pickle.load can execute arbitrary code -- only load trusted, locally produced files.
# The with-block makes sure the file handle is closed after loading.
with open(path1, 'rb') as pkl_file:
    batch1 = pickle.load(pkl_file)

# remove batteries that do not reach 80% capacity
for bad_cell in ['b1c8', 'b1c10', 'b1c12', 'b1c13', 'b1c22']:
    del batch1[bad_cell]

# 1) With tf.train.Example

## Write

In [None]:
"""
see Hands-On Machine Learning pp.416

1. The get_cycle_features function fetches all features and targets from 
the batch1 file and convert to "Example" objects. Every Example contains 
data from one charging cycle.

2. Create a "Data/tfrecords" directory.

3. For each cell create a tfrecord file with the naming convention "b1c0.tfrecord".
The SerializeToString method creates binary data out of the Example objects that can
be read natively in TensorFlow.
"""

def get_cycle_example(cell, idx):
    cycle_example = Example(
        features=Features(
            feature={
                "IR": Feature(float_list=FloatList(value=[batch1[cell]["summary"]["IR"][idx]])),
                "Qdlin": Feature(float_list=FloatList(value=batch1[cell]["cycles"][str(idx)]["Qdlin"])),
                "Tdlin": Feature(float_list=FloatList(value=batch1[cell]["cycles"][str(idx)]["Tdlin"])),
                "Remaining_cycles": Feature(int64_list=Int64List(value=[int(batch1[cell]["cycle_life"]-idx)]))
            }
        )
    )
    return cycle_example

data_dir = "Data/tfrecords/"
if not os.path.exists(data_dir):
    os.mkdir(data_dir)
    
for cell in batch1:
    filename = os.path.join(data_dir + cell + ".tfrecord")
    with tf.io.TFRecordWriter(filename) as f:
        num_cycles = int(batch1[cell]["cycle_life"])-1
        for cycle in range(num_cycles):
            cycle_to_write = get_cycle_example(cell, cycle)
            f.write(cycle_to_write.SerializeToString())

## Read

In [None]:
# Feature columns describing one serialized cycle. make_parse_example_spec turns
# them into the {feature name: parsing config} mapping used by parse_features().
ir, qdlin, tdlin, remaining_cycles = (
    numeric_column("IR", shape=[]),
    numeric_column("Qdlin", shape=[1000, 1]),
    numeric_column("Tdlin", shape=[1000, 1]),
    numeric_column("Remaining_cycles", shape=[], dtype=tf.int64),
)

columns = [ir, qdlin, tdlin, remaining_cycles]
example_spec = make_parse_example_spec(columns)

In [None]:
"""
Reading the remaining code from bottom to top:

When writing to TFrecord we created one file for each cell. Now we merge the
data back into one dataset and prepare it to be fed directly into a model.

The interleave() method will create a dataset that pulls 4 file paths from the
filepath_dataset and for each one calls the function "read_tfrecords". It will then
cycle through these 4 datasets, reading one line at a time from each until all datasets
are out of items. Then it gets the next 4 file paths from the filepath_dataset and
interleaves them the same way, and so on until it runs out of file paths. 
Note: Even with parallel calls specified, data within batches is still sequential.

The read_tfrecords() function reads a file, skipping the first row which in our case
is 0/NaN most of the time. It then loops over each example/row in the dataset and
calls the parse_feature function. Then it batches the dataset, so it always feeds
multiple examples at the same time, and then shuffles the batches. It is important 
that we batch before shuffling, so the examples within the batches stay in order.

The parse_features function takes an example and converts it from binary/message format
into a more readable format. The make_parse_example_spec generates a feature mapping 
according to the columns we defined. To be able to feed the dataset directly into a
Tensorflow model later on, we need to split the data into examples and targets (i.e. X and y).
"""
# define variables
batch1_keys = ['b1c0', 'b1c1', 'b1c2', 'b1c3', 'b1c4', 'b1c5', 'b1c6', 'b1c7', 'b1c9', 'b1c11', 'b1c14', 'b1c15', 'b1c16', 'b1c17', 'b1c18', 'b1c19', 'b1c20', 'b1c21', 'b1c23', 'b1c24', 'b1c25', 'b1c26', 'b1c27', 'b1c28', 'b1c29', 'b1c30', 'b1c31', 'b1c32', 'b1c33', 'b1c34', 'b1c35', 'b1c36', 'b1c37', 'b1c38', 'b1c39', 'b1c40', 'b1c41', 'b1c42', 'b1c43', 'b1c44', 'b1c45']
window_size = 5
shift = 1
stride = 1

def parse_features(example_proto):
    """Decode one serialized Example into a (feature dict, target) pair.

    The "Remaining_cycles" entry is removed from the parsed dict and returned
    separately, so the dataset yields (X, y) pairs.
    """
    parsed = tf.io.parse_single_example(example_proto, example_spec)
    target = parsed.pop("Remaining_cycles")
    return parsed, target

def flatten_windows(features, target):
    """Collapse one window of parsed cycles into a single (Qdlin window, target) element.

    ``features`` (dict of sub-datasets) and ``target`` (sub-dataset) come from
    Dataset.window(). Batching Qdlin by ``window_size`` stacks the window's cycles;
    skipping the first ``window_size - 1`` targets keeps the last cycle's target.
    """
    qdlin_window = features["Qdlin"].batch(window_size)
    last_target = target.skip(window_size - 1)
    return tf.data.Dataset.zip((qdlin_window, last_target))

def flatten_windows_all_features(features, target):
    """
    Variant of 'flatten_windows()' that keeps all features (IR, Qdlin, Tdlin)
    instead of only Qdlin, but the resulting nested input does not fit the
    model's input shape yet. Not used at the moment; once we know how to feed
    every feature to the model, it could replace 'flatten_windows()'.
    """
    windowed = tuple(features[key].batch(window_size) for key in ("IR", "Qdlin", "Tdlin"))
    last_target = target.skip(window_size - 1)
    return tf.data.Dataset.zip((tf.data.Dataset.zip(windowed), last_target))

def read_tfrecords(file):
    """Build the windowed, shuffled and batched dataset for one cell's TFRecord file."""
    records = tf.data.TFRecordDataset(file).skip(1)  # skip can be removed when we have clean data
    windows = records.map(parse_features).window(
        size=window_size, shift=shift, stride=stride, drop_remainder=True
    )
    examples = windows.flat_map(flatten_windows)
    # prefetch is only relevant for CPU to GPU pipelines, see Hands-On ML p.411
    return examples.shuffle(1000).batch(10).prefetch(1)

# define files to read from and store in a list_files object
filepaths = [os.path.join("Data/tfrecords/", cell + ".tfrecord") for cell in batch1_keys]
# list_files shuffles the file order by default
filepath_dataset = tf.data.Dataset.list_files(filepaths)

dataset = filepath_dataset.interleave(read_tfrecords, cycle_length=4, num_parallel_calls=4)

To feed it into a CNN with a TimeDistributed layer, we need this input:<br>
Examples: [ batch_size, window_size, steps, input_dim ]<br>
e.g.: [ 10, 5, 1000, 1 ]<br>
Labels: [ batch_size ] (one remaining-cycles value per window); they are also contained in the dataset.

In [None]:
# Inspect one batch; the input shape should be [batch_size, window_size, steps, input_dim]
for feature, target in dataset.take(1):
    print("Input shape:", list(feature.shape))
    print("Target:", target)

## Test with model

In [None]:
# Test dataset compatibility with a CNN + LSTM model layout
steps = 1000
input_dim = 1

model = Sequential([
    # CNN part: the same Conv1D/Flatten/Dense stack is applied to every cycle of the window
    TimeDistributed(Conv1D(filters=1, kernel_size=3, activation='relu'),
                    input_shape=(window_size, steps, input_dim)),
    TimeDistributed(Flatten()),
    TimeDistributed(Dense(1)),
    # LSTM part: runs over the sequence of per-cycle outputs
    LSTM(50, activation='relu'),
    Dense(1),
])
model.compile(optimizer='adam', loss="mean_squared_error")
model.fit(dataset, epochs=2)

# 2) With tf.train.SequenceExample (and FeatureLists)

## Write

In [None]:
"""
Same as in 1), but with an additional layer that sorts features into two categories 
 - "context": "IR", "Remaining_cycles", "Discharge_time" (WIP) 
 - "details": "Qdlin", "Tdlin"
This stores Qdlin and Tdlin in one matrix which we use access more easily as input,
but we lose their name handle. 
"""

def get_sequence_example(cell, idx):
    ir = Feature(float_list=FloatList(value=[batch1[cell]["summary"]["IR"][idx]]))
    qdlin = Feature(float_list=FloatList(value=batch1[cell]["cycles"][str(idx)]["Qdlin"]))
    tdlin = Feature(float_list=FloatList(value=batch1[cell]["cycles"][str(idx)]["Tdlin"]))
    remaining_cycles = Feature(float_list=FloatList(value=[int(batch1[cell]["cycle_life"]-idx)]))

    detail_features = FeatureList(feature=[qdlin, tdlin])

    cycle_example = SequenceExample(
        context = Features(
            feature={"IR":ir,
                     "Remaining_cycles": remaining_cycles
                    }
        ),
        feature_lists = FeatureLists(feature_list={"Details":detail_features})
    )
    return cycle_example
    
data_dir = "Data/tfrecords_featurelists/"
if not os.path.exists(data_dir):
    os.mkdir(data_dir)
    
for cell in batch1:
    filename = os.path.join(data_dir + cell + ".tfrecord")
    with tf.io.TFRecordWriter(filename) as f:
            num_cycles = int(batch1["b1c0"]["cycle_life"])-1
            for cycle in range(num_cycles):
                cycle_to_write = get_sequence_example(cell, cycle)
                f.write(cycle_to_write.SerializeToString())
    break # write only one cell for testing. Remove this to write all cells from batch1

## Read

In [None]:
# define variables
# batch1 cells that remain after removing b1c8, b1c10, b1c12, b1c13 and b1c22
batch1_keys = ["b1c%d" % i for i in range(46) if i not in (8, 10, 12, 13, 22)]
# window length, offset between window starts, and step inside a window (in cycles)
window_size, shift, stride = 5, 1, 1

# Context features: one scalar float per record.
context_feature_description = {
    name: tf.io.FixedLenFeature([], tf.float32)
    for name in ("IR", "Remaining_cycles")
}

# Sequence features: every step of "Details" is a vector of 1000 floats
# (Qdlin, then Tdlin, as written by get_sequence_example).
sequence_feature_description = dict(
    Details=tf.io.FixedLenSequenceFeature([1000], tf.float32),
)
    
def parse_features(example_proto):
    """Decode one serialized SequenceExample into ((context, feature_lists), target).

    "Remaining_cycles" is popped out of the context dict and returned as the target.
    """
    context, feature_lists = tf.io.parse_single_sequence_example(
        example_proto,
        context_features=context_feature_description,
        sequence_features=sequence_feature_description,
    )
    target = context.pop("Remaining_cycles")
    return (context, feature_lists), target

def flatten_windows(features, target):
    """Collapse one window into a (Details window, IR window, target) element.

    ``features`` is the windowed (context, feature_lists) pair produced from
    parse_features(); ``target`` is the windowed target sub-dataset, of which
    only the last cycle's value is kept.
    """
    context, feature_lists = features
    details_window = feature_lists["Details"].batch(window_size)
    ir_window = context["IR"].batch(window_size)
    last_target = target.skip(window_size - 1)
    return tf.data.Dataset.zip((details_window, ir_window, last_target))

def read_tfrecords(file):
    """Build the windowed dataset of (Details, IR, target) batches for one cell's file."""
    records = tf.data.TFRecordDataset(file).skip(1)  # skip can be removed when we have clean data
    windows = records.map(parse_features).window(
        size=window_size, shift=shift, stride=stride, drop_remainder=True
    )
    # Batches first, then shuffles whole batches: each batch holds 8 consecutive windows.
    batches = windows.flat_map(flatten_windows).batch(8)
    return batches.shuffle(1000)

filepaths = [os.path.join("Data/tfrecords_featurelists/", cell + ".tfrecord") for cell in batch1_keys]
# list_files shuffles the file order by default
filepath_dataset = tf.data.Dataset.list_files(filepaths)

dataset = filepath_dataset.interleave(read_tfrecords, cycle_length=4, num_parallel_calls=4)

In [None]:
"""
Instead of the standard way to return data - (Features, Target) - 
this dataset returns (Details, Context, Target). This can be changed
in flatten_windows().
One way to retain the (Features, Target) structure would be to zip
feat1 and feat2 before returning everything. But this would nest 
the data so it can't be accessed as easily.
"""

# The Qdlin/Tdlin matrix needs to be (1000,2) instead of (2,1000)
# Each dataset element is a (Details, IR, Target) triple; print one batch of it.
for details_batch, ir_batch, target_batch in dataset.take(1):
    print("Qdlin and Tdlin:\n", details_batch.shape)
    print("IR:\n", ir_batch.shape)
    print("Target: \n" + str(target_batch))

## Test with model

In [None]:
# Shapes don't match yet