In [1]:
from crontab import CronTab
import datetime as dt
import numpy as np
import os
import pandas as pd
from pathlib import Path
from sqlalchemy import select, text
from sqlalchemy.orm import sessionmaker
import sys
import tensorflow as tf
from time import strftime
import timeit

# Add path of subdirectory containing own modules
modules_path = os.path.join(os.getcwd(), 'data_collect_app')
if modules_path not in sys.path:
    sys.path.append(modules_path)

import finrail_db

# Load tensorboard
%load_ext tensorboard

# Set random seed for reproduceability
tf.keras.utils.set_random_seed(42)

2024-03-13 10:52:17.796063: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-03-13 10:52:18.228671: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-03-13 10:52:18.230579: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Define directory for tensorboard log files
def dir_logs(parent_dir='tf_log'):
    return Path(parent_dir) / strftime('%Y_%m_%d_%H_%M_%S')


In [3]:
def tweak_train(df_):
    '''Function takes DataFrame as returned from SQL-query and returns processed DataFrame
    Transformations:
        - DataType: update to all columns
        - Introducing columns "commuter" and "long_distance" by grouping by date and train category
          and then unstacking ones
        - pushing the date information from index to own column
        - Renaming and setting back nested column names
        
    '''
    return (df_
    .astype({
        'date': 'datetime64',
        'train_cat': 'category', # set as category because of low cardenality
        'total_length': np.float32 # float32 as used later in tensorflow
    })
    .groupby(['date', 'train_cat']) # grouping twice, so "train_cat" can be unstacked later
    .max().unstack()
    .reset_index() # to have dates in own column
    .set_axis(['date', 'commuter', 'long_distance'], axis=1) # set column names, flatten nested column index
)
# Creates tables in finrail db, returns database engine
engine = finrail_db.create_tables(db_str='mysql+mysqlconnector://root:admin123@localhost:5000/finrail')

# Open fire and read stored SQL query to variable
with open('sql_query.txt', 'r') as w:
    sql_query_str = w.read()
    
# Open SQL connection and send query. This query will:
# 1. Sum length of all wagon in a journey section
# 2. Choose maximum length of all wagons among journey sections for each train
# 3. Sum length of wagons for all trains per day, grouped by train category (Commuter, Long-distance)
with engine.connect() as connection:
    df = pd.read_sql_query(text(sql_query_str), connection)

# Apply tweak_train to output of SQL query to obtain desired time series
df = tweak_train(df)

In [157]:
class Custom_Metric(tf.keras.metrics.Metric):
    '''Metric calculating the root mean squared error (RMSE) for a sequence to sequence recurrent
    neuronal network (RNN) exclusively based on the last predicted vector of a sequence. 
    This is useful in situation, where a sequence to sequence RNN is trained, but for production 
    only the last predicted vector matters. This occurs for example in time series prediction.
    This metric allows to evaluate the model performance in time series prediction exclusivley on
    the parts of output that matters for production. Instead the loss of a sequence to sequence 
    model training takes all predicted vectors along a sequence into account.
    
    '''
    def __init__(self, time_series_index=None, **kwarg):
        '''Function hands over kwargs to parent class and initiates two weights, which will 
        hold the sum of squares and the total count of summed numbers.
        Parameters:
            time_series_index <int> If used with a model, that outputs more than one time series, 
            specify index of time series for which custom metric value should be calculated
        '''
        super().__init__(**kwarg) # pass kwargs to parent class
        self.time_series_index = time_series_index #index of time series if multivariate forecast
        self.sum_of_squares = self.add_weight('sum_of_squares', initializer='zeros')
        self.sample_count = self.add_weight('sample_count', initializer='zeros')
    
    def update_state(self, y_true, y_pred, sample_weight=None):
        ''' Function will add to sum_of_squares and sample_counts every batch.'''
        if self.time_series_index == None:
            # True, if class is in use for forecasting single time series
            # sum up how many data point in batch will be summed            
            self.sample_count.assign_add(tf.cast(tf.size(y_pred[:, -1, :]), tf.float32))
            # sum of squares of difference of y_true and y_pred on last sequence
            self.sum_of_squares.assign_add(tf.reduce_sum(
                tf.math.square(y_pred[:, -1, :] - y_true[:, -1, :]))
            )
        else:
            # If class is in use for multivariate forecasting, calculate for selected time series only
            # sum up how many data point in batch will be summed
            self.sample_count.assign_add(tf.cast(
                tf.size(y_pred[:, -1, :, self.time_series_index]), tf.float32)
            )
            # sum of squares of difference of y_true and y_pred on last sequence
            self.sum_of_squares.assign_add(tf.reduce_sum(tf.math.square(
                y_true[:, -1, :, self.time_series_index] - y_pred[:, -1, :, self.time_series_index]
            )))
    
    def result(self):
        '''Function will calculate the RMSE at the end of every epoch'''
        return tf.math.sqrt(self.sum_of_squares / self.sample_count)
                                    
    def reset_state(self):
        '''Function will reset all stateful variables to zero'''
        self.sample_count.assign(0)
        self.sum_of_squares.assign(0)
        
    def get_config(self):
        '''Function will overwrite get_config() of parent class to include "time_series_index"'''
        conf_dict = super().get_config()
        return {**conf_dict, 'time_series_index': self.time_series_index}
        
def timeseries_window(data, seq_length, shift=1, stride=1):
    '''Function takes dataset and returns dataset containing windows with data from input dataset.
    Parameters:
        data <tf.data.Dataset> input dataset
        seq_length <int> defines length of windows in output dataset
        shift <int> defines how many time steps of gap are between two consecutive windows
        stride <int> defines how many time steps are between two consecutive output data points
        
    Return:
        <tf.data.Dataset> Dataset containing windows of seq_length based on input dataset data
    '''
    data = data.window(size=seq_length, shift=shift, stride=stride, drop_remainder=True)
    data = data.flat_map(lambda x: x) # flatten nested Dataset structure returned by .window()
    return data.batch(seq_length) # batch of size seq_length will give one window in each batch

def timeseries_dataset_seq2seq(data, forecast_length=1, seq_length=7):
    '''Function takes Dataset and returns Dataset with windows suitable to train a 
    sequence to sequence RNN
    Parameters:
        data <tf.data.Dataset> input dataset
        forecast_length <int> number of time steps to be forecasted into the future
        seq_length <int> length of sequences fed to RNN (number of consecutive time steps 
        in one training instance)
    '''
    data = timeseries_window(data, forecast_length+1) # First dimension one time step longer than
                                                      # forecast_length, as targets are generated as well
    data = timeseries_window(data, seq_length) # Second dimension consists of windows of size sequence length
    # map to tuple (training instance, target)
    return data.map(lambda x: (x[:, 0], x[:, 1:]), num_parallel_calls=tf.data.AUTOTUNE)


def prepare_training_dataset(df_, column, row_split, forecast_length=14, seq_length=30, 
                             batch_size=32, seed=42, reshuffle_each_iteration=True):
    '''Function takes Dataframe and returns tf.data.Dataset with specs:
    Parameters:
        df_ <pd.Dataframe> Dataframe with time series data (np.float32) in columns
        column <string> name of column or list of column names in DataFrame to use
        row_split <tuple of two int> defines row index between data is extracted from df_
        forecast_length <int> number of time steps to be forecasted into the future
        seq_length <int> length of sequences fed to RNN (number of consecutive time steps 
        batch_size <int> batch_size of returned Dataset
        seed <int> random seed for shuffling data
        reshuffle_each_iteration <boolean> Defines wheater Dataset is ot be reshuffled after each
        training epoch
    Return:
        <tf.data.Dataset> ready to feed to .fit() of an sequence to sequence RNN
    '''
    data = tf.data.Dataset.from_tensor_slices(df[column][:1847].values / 1E5)
    data = timeseries_dataset_seq2seq(data, forecast_length, seq_length)
    data = data.cache() # cache, so that previous transformation are only performed ones
    data = data.shuffle(500, seed=seed, reshuffle_each_iteration=reshuffle_each_iteration)
    return data.batch(batch_size=batch_size).prefetch(tf.data.AUTOTUNE)

#training set until 2020 including
timeseries_train = prepare_training_dataset(df, ['commuter', 'long_distance'], (0, 1847), seq_length=56)

#validation set from 2021 to 2022 including
timeseries_val = prepare_training_dataset(df, ['commuter', 'long_distance'], (1847, 2577), 
                                        batch_size=500, reshuffle_each_iteration=False, seq_length=56)
#test set from 2023 to 2024-03-06
timeseries_test = prepare_training_dataset(df, ['commuter', 'long_distance'], (2577, 3008), 
                                        batch_size=500, reshuffle_each_iteration=False, seq_length=56)

In [158]:
#Input layer stack that defines input shape and will scale down inputs by a factor of 1E5
input_processing = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(None, 1)),
    tf.keras.layers.Rescaling(1E-5)
])

#Output layer that will scale up predictions by a factor of 1E5
output_processing = tf.keras.Sequential([
    tf.keras.layers.Rescaling(1E5)
])

#RNN laer stack for a sequence to sequence model for univariate time series
rnn_seq2seq = tf.keras.Sequential([
    tf.keras.layers.LSTM(32, return_sequences=True),
    tf.keras.layers.Dense(28, activation='linear'),
    tf.keras.layers.Reshape(target_shape=(-1, 14, 2))
])

#Complete model including Input, Output and RNN layer stacks
rnn_seq2seq_complete = tf.keras.Sequential([
    input_processing,
    rnn_seq2seq,
    output_processing
])

#Model used during training, to avoid calculating scaling on every iteration
rnn_seq2seq_training = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(None, 2)),
    rnn_seq2seq
])


In [155]:
%tensorboard --logdir tf_log

In [160]:
# Define callback for Tensorboard update
current_dir = dir_logs()
callback_tensorboard = tf.keras.callbacks.TensorBoard(current_dir, histogram_freq=50)

# Define a learning rate schedule
# Get number of gradient descent steps in one epoch
steps_in_epoch = 0
for i in timeseries_train:
    steps_in_epoch += 1
# select exponential sheduling scaling learning rate down by a factor of 0.9 every 10 epochs
schedule_exp = tf.keras.optimizers.schedules.ExponentialDecay(8E-3, 12*steps_in_epoch, 0.9)


optimizer = tf.keras.optimizers.Nadam(learning_rate=schedule_exp)
rnn_seq2seq_training.compile(optimizer=optimizer, loss='mse', metrics=[Custom_Metric(0), Custom_Metric(1)])
rnn_seq2seq_training.fit(timeseries_train, validation_data=timeseries_val, 
                         epochs = 500, callbacks=[callback_tensorboard], verbose=0)

<keras.src.callbacks.History at 0x7f04172d16c0>

In [167]:
# Evaluating model on test dataset
loss_test, cm_commuter_test, cm_long_distance_test = rnn_seq2seq_training.evaluate(timeseries_test)
cm_long_distance_test
#print(f'\nOn the test data, the model achieves as custom metric (RMSE on 14 day prediction): {cm_cummuter_test:.5f}')



0.019101861864328384

In [108]:
# Predicting with single sequence
rnn_seq2seq_complete.predict(df['commuter'][-70:-14].values.reshape(1, 56, 1))[0, -1, :]
rnn_seq2seq_training.predict(commuter_val)[:, -1, 0].shape
sum_of_squares = []
# TO DO create empty tensors and calculate RMSE for every predicted day sperately
for i, (seq, target) in enumerate(commuter_val):
    print(tf.reduce_sum(tf.square(target[:, -1, :] - rnn_seq2seq_training.predict(seq)[:, -1, :]), axis=0))

tf.Tensor(
[0.63821864 0.60805494 0.5865427  0.703927   0.7427517  0.75127006
 0.7837306  0.76499516 0.7869938  0.78536195 0.79228884 0.7827962
 0.7899372  0.80240345], shape=(14,), dtype=float32)
tf.Tensor(
[0.74441934 0.8358058  0.80125093 0.75349295 0.7329046  0.73170555
 0.7214597  0.7580035  0.7566102  0.6741412  0.6847973  0.7380386
 0.77389437 0.7614313 ], shape=(14,), dtype=float32)
tf.Tensor(
[1.5469093 1.6020226 1.5785794 1.6956469 1.7125258 1.7338915 1.754523
 1.740252  1.7029456 1.6800865 1.6887573 1.6908216 1.6769826 1.7032954], shape=(14,), dtype=float32)
tf.Tensor(
[1.1521171 1.2205957 1.1580614 1.2765483 1.253045  1.2647339 1.2954752
 1.2995462 1.3228484 1.3064107 1.3099891 1.3146882 1.3097942 1.3084298], shape=(14,), dtype=float32)
tf.Tensor(
[0.95098263 1.0702331  1.0580924  1.0400943  1.0302141  0.99703133
 0.9942888  1.0394493  1.0087981  1.0771745  1.0501808  1.0629119
 1.0392264  1.0647553 ], shape=(14,), dtype=float32)
tf.Tensor(
[1.3884495 1.4804518 1.5782862 1.

In [89]:
data = tf.data.Dataset.from_tensor_slices(df[['commuter', 'long_distance']].values)
data = data.window(15, shift=1, drop_remainder=True)
data = data.flat_map(lambda x: x)
data = data.batch(15)
data = data.window(30, shift=1, drop_remainder=True)
data = data.flat_map(lambda x: x)
data = data.batch(30)
data = data.map(lambda x: (x[:, 0], x[:, 1:]))
for i, item in enumerate(data):
    if i < 1:
        print(item[0].shape)
        print(item[1].shape)

(30, 2)
(30, 14, 2)


In [85]:
# Saving model

# Make sure, that every model is compiled for saving:
rnn_seq2seq.compile(optimizer=optimizer, loss='mse', metrics=[Custom_Metric()])
rnn_seq2seq_complete.compile(optimizer=optimizer, loss='mse', metrics=[Custom_Metric()])
rnn_seq2seq_training.compile(optimizer=optimizer, loss='mse', metrics=[Custom_Metric()])

rnn_seq2seq.save('rnn_seq2seq', save_format='tf')
rnn_seq2seq_complete.save('rnn_seq2seq_complete', save_format='tf')
rnn_seq2seq_training.save('rnn_seq2seq_training', save_format='tf')

INFO:tensorflow:Assets written to: rnn_seq2seq/assets


INFO:tensorflow:Assets written to: rnn_seq2seq/assets


INFO:tensorflow:Assets written to: rnn_seq2seq_complete/assets


INFO:tensorflow:Assets written to: rnn_seq2seq_complete/assets


INFO:tensorflow:Assets written to: rnn_seq2seq_training/assets


INFO:tensorflow:Assets written to: rnn_seq2seq_training/assets


In [90]:
# Load model
model2 = tf.keras.models.load_model('rnn_seq2seq_training', custom_objects={'Custom_Metric': Custom_Metric})

In [91]:
model2.evaluate(commuter_test)



[0.00707469554618001, 0.08247807621955872]

In [126]:
for item in timeseries_train:
    print(item[0].shape)

(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(32, 56, 2)
(18, 56, 2)


In [5]:
test_rnn = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(None, 1)),
    tf.keras.layers.LSTM(3, return_sequences=True)
])
test_rnn.compile(loss='mse', optimizer='adam')
x_training_data = np.random.rand(500, 1)
x_train = tf.data.Dataset.from_tensor_slices(x_training_data)
x_train = timeseries_dataset_seq2seq(x_train)
y_training_data = np.random.rand(500, 3)
y_train = tf.data.Dataset.from_tensor_slices(y_training_data)


test_rnn.fit(x=x_train, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7f83c8841f60>

In [47]:
test_rnn.predict(np.random.rand(1, 10, 1))
np.random.rand(1, 60, 1).shape



(1, 60, 1)

In [182]:
a = np.zeros(14, dtype=np.float32)

for i in range(22):
    b = np.random.rand(14)
    a += b
    print(b)
a

[0.67388077 0.18984745 0.61122116 0.31137044 0.50503867 0.18929037
 0.61636287 0.97836272 0.70692446 0.79188471 0.11336993 0.01475838
 0.83786145 0.50861334]
[0.81867838 0.79164872 0.11088158 0.29023907 0.57815831 0.0342671
 0.00434693 0.2741532  0.15625099 0.78368318 0.18649465 0.0013391
 0.85996041 0.29445972]
[0.01330934 0.24549398 0.08873156 0.71743076 0.21148537 0.72991601
 0.66889605 0.83391747 0.81458398 0.35312731 0.35237431 0.73738273
 0.96517253 0.52211691]
[0.08045153 0.92695746 0.30384688 0.22169256 0.85231981 0.36898274
 0.69632564 0.17785737 0.99774497 0.93419654 0.98740287 0.81108123
 0.90829649 0.51305405]
[0.23834562 0.90531048 0.00863958 0.08093868 0.22606262 0.85072973
 0.22908413 0.73069026 0.762691   0.32499139 0.65763599 0.64558172
 0.69650676 0.10202842]
[0.60028532 0.07613098 0.38184849 0.11085824 0.82751313 0.58460833
 0.13727838 0.87527711 0.51458954 0.46814201 0.28440379 0.12228945
 0.7698344  0.23207313]
[0.32205817 0.39806014 0.68102314 0.28397828 0.0140300

array([11.24642  , 13.268541 , 11.237595 ,  7.1986403,  8.0593405,
       10.96287  ,  9.327009 , 12.3163805, 11.230288 , 10.433632 ,
        8.914517 , 12.7504015, 11.564948 ,  8.877827 ], dtype=float32)

In [None]:
# This blocks evaluates all possible keys in the nested dictionary "wagon" in compositions of one day

properties_dict = dict()
for train in k.json():
    for journey in (train['journeySections']):
        for wagon in journey['wagons']:
            for i, prop in enumerate(wagon.keys()):
                try:
                    properties_dict[prop]
                except:
                    properties_dict[prop] = prop
print(properties_dict.keys())

In [None]:
r.json()

In [26]:
Session = sessionmaker(bind=engine)
session = Session()
session.add(bsp)
session.commit()

In [5]:
with open('test.txt', 'w') as w:
    w.write('haha')