# Download_traffic

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import datetime
import gc
import glob
import os
import shutil
import sys

from expt_settings.configs import ExperimentConfig
import numpy as np
import pandas as pd
import pyunpack
import wget


In [1]:
# General functions for data downloading & aggregation.
def download_from_url(url, output_path):
  """Fetches the file at `url` and stores it at `output_path`.

  Args:
    url: Web address of the file to fetch.
    output_path: Local destination path handed to wget.
  """

  start_message = 'Pulling data from {} to {}'.format(url, output_path)
  print(start_message)
  wget.download(url, output_path)
  print('done')


def recreate_folder(path):
  """Deletes a folder (when present) and recreates it empty.

  Args:
    path: Folder to reset. Any existing contents are removed.
  """

  # rmtree raises when the folder is missing, so only clear what exists; this
  # lets the helper also serve as "make a fresh folder" on a first run.
  if os.path.isdir(path):
    shutil.rmtree(path)
  os.makedirs(path)


def unzip(zip_path, output_file, data_folder):
  """Extracts an archive and verifies the expected output appeared.

  Args:
    zip_path: Path of the archive to extract.
    output_file: Path that must exist once extraction has finished.
    data_folder: Folder the archive is extracted into.

  Raises:
    ValueError: If `output_file` does not exist after extraction.
  """

  print('Unzipping file: {}'.format(zip_path))
  archive = pyunpack.Archive(zip_path)
  archive.extractall(data_folder)

  # Extraction only counts as successful if the expected file is on disk.
  if os.path.exists(output_file):
    return

  raise ValueError(
      'Error in unzipping process! {} not found.'.format(output_file))


def download_and_unzip(url, zip_path, csv_path, data_folder):
  """Fetches a zipped csv file from the web and extracts it locally.

  Args:
    url: Web address of the archive.
    zip_path: Local path the archive is downloaded to.
    csv_path: Path at which the extracted csv file is expected.
    data_folder: Folder the archive is extracted into.
  """

  # Step 1: pull the archive down to zip_path.
  download_from_url(url, zip_path)

  # Step 2: extract it and check that csv_path now exists.
  unzip(zip_path=zip_path, output_file=csv_path, data_folder=data_folder)

  print('Done.')


In [4]:
"""Downloads traffic dataset from UCI repository."""

url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00204/PEMS-SF.zip'

# Everything (archive and extracted files) lives under data_folder.
data_folder = '0927_dataset_traffic'
csv_path = os.path.join(data_folder, 'PEMS_train')
zip_path = os.path.join(data_folder, 'PEMS-SF.zip')

# Nothing above creates data_folder, so make sure it exists before the
# archive is downloaded into it on a first run.
if not os.path.isdir(data_folder):
  os.makedirs(data_folder)

download_and_unzip(url, zip_path, csv_path, data_folder)


Pulling data from https://archive.ics.uci.edu/ml/machine-learning-databases/00204/PEMS-SF.zip to 0927_dataset_traffic\PEMS-SF.zip


In [None]:
print('Aggregating to hourly data')

def process_list(s, variable_type=int, delimiter=None):
    """Parses a line in the PEMS format to a list.

    Square brackets are stripped, the remainder is split on `delimiter`, and
    every piece is converted with `variable_type`.

    Args:
      s: Text to parse, e.g. '[1 2 3]'.
      variable_type: Callable applied to every piece (int, float, str, ...).
      delimiter: Separator passed to str.split; None splits on any run of
        whitespace and discards empty pieces.

    Returns:
      List of converted pieces.
    """
    # str.split(None) behaves exactly like str.split(), so a single code path
    # covers both the whitespace case and the explicit-delimiter case.
    stripped = s.replace('[', '').replace(']', '')
    return [variable_type(piece) for piece in stripped.split(delimiter)]

In [None]:
def read_single_list(filename):
    """Reads the first line of a PEMS-format file as a list.

    Args:
      filename: File name relative to the module-level `data_folder`.

    Returns:
      The first line parsed by `process_list` with its default settings.
    """
    full_path = os.path.join(data_folder, filename)
    with open(full_path, 'r') as dat:
        first_line = dat.readlines()[0]
    return process_list(first_line)

def read_matrix(filename):
    """Reads a PEMS-format file into a nested list of floats.

    Every line of the file becomes one matrix: the line is split on ';' into
    rows, and each row is split on whitespace into float values.

    Args:
      filename: File name relative to the module-level `data_folder`.

    Returns:
      List with one entry per line, each a list of rows of floats.
    """
    full_path = os.path.join(data_folder, filename)
    with open(full_path, 'r') as dat:
        lines = dat.readlines()

    total_rows = len(lines)
    array_list = []
    for row_number, line in enumerate(lines, start=1):
        # Progress report every 50 lines.
        if row_number % 50 == 0:
            print('Completed {} of {} rows for {}'.format(row_number, total_rows,
                                                          filename))

        array = []
        for row_split in process_list(line, variable_type=str, delimiter=';'):
            array.append(
                process_list(row_split, variable_type=float, delimiter=None))
        array_list.append(array)

    return array_list

In [None]:
# Shift the stored permutation by one so that it indexes from 0.
shuffle_order = np.array(read_single_list('randperm')) - 1

# Labels and readings for each split, loaded pairwise (labels first).
train_dayofweek, train_tensor = (read_single_list('PEMS_trainlabels'),
                                 read_matrix('PEMS_train'))
test_dayofweek, test_tensor = (read_single_list('PEMS_testlabels'),
                               read_matrix('PEMS_test'))

In [None]:
# Invert the shuffle: for each shuffled position, look up where it came from.
print('Shuffling')
inverse_mapping = dict(
    (new_location, previous_location)
    for previous_location, new_location in enumerate(shuffle_order))
reverse_shuffle_order = np.array(
    [inverse_mapping[new_location]
     for new_location in range(len(shuffle_order))])

In [None]:
# Join train and test, then reorder both arrays with the inverted permutation.
print('Reodering')
day_of_week = np.array(train_dayofweek + test_dayofweek)[reverse_shuffle_order]
combined_tensor = np.array(train_tensor + test_tensor)[reverse_shuffle_order]

In [None]:
# Rebuild an hourly dataframe for every day, then stack the days together.
print('Parsing as dataframe')
labels = ['traj_{}'.format(station)
          for station in read_single_list('stations_list')]

hourly_list = []
for day, day_matrix in enumerate(combined_tensor):

    # Transposed so that every label becomes a column.
    hourly = pd.DataFrame(day_matrix.T, columns=labels)
    # Sampled at 10 min intervals, so six consecutive rows share an hour.
    hourly['hour_on_day'] = [sample // 6 for sample in hourly.index]

    earliest_hour = hourly['hour_on_day'].min()
    latest_hour = hourly['hour_on_day'].max()
    if latest_hour > 23 or earliest_hour < 0:
        raise ValueError('Invalid hour! {}-{}'.format(earliest_hour,
                                                      latest_hour))

    # Average the samples that fall within each hour.
    hourly = hourly.groupby('hour_on_day', as_index=True).mean()[labels]
    hourly['sensor_day'] = day
    hourly['time_on_day'] = hourly.index
    hourly['day_of_week'] = day_of_week[day]

    hourly_list.append(hourly)
hourly_frame = pd.concat(hourly_list, axis=0, ignore_index=True, sort=False)

In [None]:
# Flatten such that each entity uses one row in the dataframe.
# Columns holding a 'traj' series are the per-sensor values; the rest are
# shared descriptors copied onto every flattened row.
store_columns = [name for name in hourly_frame.columns if 'traj' in name]
other_columns = [name for name in hourly_frame.columns if 'traj' not in name]
flat_df = pd.DataFrame(
    columns=['values', 'prev_values', 'next_values'] + other_columns + ['id'])

In [None]:
def format_index_string(x):
    """Returns `x` as a string left-padded with zeros to three digits.

    The fixed width makes the keys built from it sort lexicographically in
    the same order as the numbers themselves.

    Args:
      x: Non-negative number below 1000.

    Returns:
      Zero-padded string, e.g. 7 -> '007'.

    Raises:
      ValueError: If `x` is negative or not below 1000.
    """
    # Negative values used to slip through as e.g. '00-1', which breaks the
    # sort order the key relies on; reject them like oversized values.
    if not 0 <= x < 1000:
        raise ValueError('Invalid value of x {}'.format(x))

    if x < 10:
        padding = '00'
    elif x < 100:
        padding = '0'
    else:
        padding = ''
    return padding + str(x)

# Column layout fixed by the empty frame created for the flattened data.
flat_column_order = list(flat_df.columns)

# Collect every sensor's rows and concatenate once at the end:
# DataFrame.append was removed in pandas 2.0 and copied the whole accumulated
# frame on every iteration.
flat_pieces = []
for store in store_columns:
    print('Processing {}'.format(store))

    sliced = hourly_frame[[store] + other_columns].copy()
    sliced.columns = ['values'] + other_columns
    sliced['id'] = int(store.replace('traj_', ''))

    # Sort by Sensor-date-time
    key = (sliced['id'].apply(str)
           + sliced['sensor_day'].apply(lambda x: '_' + format_index_string(x))
           + sliced['time_on_day'].apply(lambda x: '_' + format_index_string(x)))
    sliced = sliced.set_index(key).sort_index()

    # Forward-fill gaps, then attach the neighbouring values of each row.
    sliced['values'] = sliced['values'].ffill()
    sliced['prev_values'] = sliced['values'].shift(1)
    sliced['next_values'] = sliced['values'].shift(-1)

    # dropna removes rows left without a previous or next value.
    flat_pieces.append(sliced.dropna())

# pd.concat rejects an empty list, so keep the empty frame when there is
# nothing to add.
if flat_pieces:
    flat_df = pd.concat(flat_pieces, axis=0, ignore_index=True, sort=False)
    flat_df = flat_df[flat_column_order]

In [None]:
# Filter to match range used by other academic papers
index = flat_df['sensor_day']
flat_df = flat_df.loc[index < 173].copy()

# Creating columns for categorical inputs
flat_df['categorical_id'] = flat_df['id'].copy()
# Hours elapsed since the first day: 24 per day plus the hour within the day.
flat_df['hours_from_start'] = (flat_df['sensor_day'] * 24.
                               + flat_df['time_on_day'])
flat_df['categorical_day_of_week'] = flat_df['day_of_week'].copy()
flat_df['categorical_time_on_day'] = flat_df['time_on_day'].copy()

## Export 

In [None]:
# Write the flattened frame (index included) to disk.
data_csv_path = '0927_traffic_out.csv'
flat_df.to_csv(path_or_buf=data_csv_path)
print('Done.')

# Data transformation

In [None]:
# Reload the exported file; its first column holds the saved index.
data_csv_path = '0927_traffic_out.csv'
df = pd.read_csv(filepath_or_buffer=data_csv_path, index_col=0)

In [None]:
import data_formatters.base
import data_formatters.volatility
# The scaler cells below call sklearn.preprocessing.StandardScaler and
# LabelEncoder, but the module was never imported (NameError).
import sklearn.preprocessing

# NOTE(review): `utils` is also used by later cells without ever being
# imported. It looks like a project helper module (possibly `libs.utils`) --
# confirm its location and import it here.

# Short aliases for the formatter class and the column-definition enums.
VolatilityFormatter = data_formatters.volatility.VolatilityFormatter
DataTypes = data_formatters.base.DataTypes
InputTypes = data_formatters.base.InputTypes


In [None]:
# Each entry is (column name, data type, input type).
# 'hours_from_start' appears twice: once as the TIME column and once as a
# known input.
_column_definition = [
    ('id', DataTypes.REAL_VALUED, InputTypes.ID),
    ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.TIME),
    ('values', DataTypes.REAL_VALUED, InputTypes.TARGET),
    ('time_on_day', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT),
    ('day_of_week', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT),
    ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT),
    ('categorical_id', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT),
]


def get_column_definition():
    """Returns the column definitions used by the transformation cells.

    Later cells call this function, but it was not defined anywhere in the
    notebook. The list above is already ordered ID, TIME, real-valued, then
    categorical, so it is returned without reordering.

    Returns:
      A new list of (column name, data type, input type) tuples.
    """
    # Copy so callers cannot mutate the module-level definition by accident.
    return list(_column_definition)

## Split data

In [None]:
print('Formatting train-valid-test splits.')
valid_boundary = 151
test_boundary = 166
index = df['sensor_day']
# The valid and test splits each start 7 days before their boundary, so they
# overlap the preceding split -- presumably to supply encoder history; confirm.
train = df.loc[index.lt(valid_boundary)]
valid = df.loc[index.ge(valid_boundary - 7) & index.lt(test_boundary)]
test = df.loc[index.ge(test_boundary - 7)]

## Transform inputs

#### Column name

In [None]:
print('Setting scalers with training data...')
# Column definitions consumed by the scaler-fitting cells that follow.
column_definitions = get_column_definition()

In [None]:
# Look up the ID and TARGET column names via the column definitions.
# NOTE(review): `utils` is not imported anywhere in the visible notebook --
# confirm where it is expected to come from.
id_column = utils.get_single_col_by_input_type(InputTypes.ID,
                                                   column_definitions)
target_column = utils.get_single_col_by_input_type(InputTypes.TARGET,
                                                       column_definitions)

In [None]:
# Extract identifiers in case required
# (the distinct values of the ID column, taken from the full dataframe).
identifiers = list(df[id_column].unique())

In [None]:
# (1) Format real scalers
# Real-valued columns, leaving out the ID and TIME columns.
real_inputs = utils.extract_cols_from_data_type(
    DataTypes.REAL_VALUED, column_definitions,
    {InputTypes.ID, InputTypes.TIME})

# Fit on the training split only, as announced when the scalers are set up:
# fitting on the full `df` leaks validation/test statistics into the
# normalisation.
data = train[real_inputs].values
_real_scalers = sklearn.preprocessing.StandardScaler().fit(data)
_target_scaler = sklearn.preprocessing.StandardScaler().fit(
    train[[target_column]].values)  # used for predictions

In [None]:
# (2) Format categorical scalers
# Categorical columns are looked up from the column definitions, exactly as
# transform_inputs does. The previous call passed lists of column names and
# referenced `categorical_inputs` before it existed.
categorical_inputs = utils.extract_cols_from_data_type(
    DataTypes.CATEGORICAL, column_definitions,
    {InputTypes.ID, InputTypes.TIME})
# Original name kept for any cell that still refers to it.
categorical_inputs_name = categorical_inputs

categorical_scalers = {}
num_classes = []

# Restrict fitting to rows of known entities. The exported frame has no
# 'traj_id' column, so the ID column from the definitions is used; isin
# avoids rebuilding a set for every row.
valid_idx = df[id_column].isin(identifiers)
for col in categorical_inputs:
    # Set all to str so that we don't have mixed integer/string columns
    srs = df[col].apply(str).loc[valid_idx]
    categorical_scalers[col] = sklearn.preprocessing.LabelEncoder().fit(
        srs.values)
    num_classes.append(srs.nunique())
num_classes_per_cat_input = num_classes
_cat_scalers = categorical_scalers

In [None]:
def transform_inputs(df):
    """Performs feature transformations.

    Real-valued inputs are passed through the fitted `_real_scalers` and
    categorical inputs through the per-column encoders in `_cat_scalers`.

    Args:
      df: Data frame to transform.

    Returns:
      Transformed copy of `df`; the input frame itself is not modified.

    Raises:
      ValueError: If either set of scalers has not been set.
    """
    # Both scaler sets are needed below, so a single missing one is already an
    # error (the previous `and` only caught the case where both were missing).
    if _real_scalers is None or _cat_scalers is None:
        raise ValueError('Scalers have not been set!')

    output = df.copy()
    column_definitions = get_column_definition()
    excluded_input_types = {InputTypes.ID, InputTypes.TIME}

    # (1) Format real inputs: standardization
    real_inputs = utils.extract_cols_from_data_type(
        DataTypes.REAL_VALUED, column_definitions, excluded_input_types)
    output[real_inputs] = _real_scalers.transform(df[real_inputs].values)

    # (2) Format categorical inputs: LabelEncoder()
    categorical_inputs = utils.extract_cols_from_data_type(
        DataTypes.CATEGORICAL, column_definitions, excluded_input_types)
    for col in categorical_inputs:
        # Encoders were fitted on string values, so convert before encoding.
        string_df = df[col].apply(str)
        output[col] = _cat_scalers[col].transform(string_df)

    return output

In [None]:
# Apply the same transformation to each split, in train/valid/test order.
train_tf, valid_tf, test_tf = (
    transform_inputs(split) for split in (train, valid, test))

## Export dataset

In [None]:
# Write each transformed split to its own csv, without the index column.
train_tf.to_csv(path_or_buf='train_tf.csv', index=False)
valid_tf.to_csv(path_or_buf='valid_tf.csv', index=False)
test_tf.to_csv(path_or_buf='test_tf.csv', index=False)

# Hyperparam Optimization

## Default params

In [None]:
# Fixed experiment settings; the step counts are written as multiples of 24.
fixed_params = dict(
    total_time_steps=8 * 24,
    num_encoder_steps=7 * 24,
    num_epochs=100,
    early_stopping_patience=5,
    multiprocessing_workers=5,
)

# default optimised model parameters.
model_params = dict(
    dropout_rate=0.3,
    hidden_layer_size=320,
    learning_rate=0.001,
    minibatch_size=128,
    max_gradient_norm=100.,
    num_heads=4,
    stack_size=1,
)

In [None]:
def get_num_samples_for_calibration(self):
    """Returns the default training and validation sample counts.

    The counts are meant for sub-sampling the data during network
    calibration; a value of -1 would mean "use all available samples".

    Args:
      self: Unused; the function does not read any state from it.

    Returns:
      Tuple of (training samples, validation samples)
    """
    training_samples = 450000
    validation_samples = 50000
    return training_samples, validation_samples

In [None]:
def _get_tft_input_indices():
    """Returns the relevant indexes and input sizes required by TFT.

    Locations are computed on the (name, data type, input type) tuples from
    the module-level `column_definitions`, with ID and TIME columns removed.
    The previous version searched the lists of column *names*, where `tup[2]`
    is just the third character of a name, so every location list came back
    empty and 'output_size' was 0.

    Returns:
      Dict with the keys 'input_size', 'output_size', 'category_counts',
      'input_obs_loc', 'static_input_loc', 'known_regular_inputs' and
      'known_categorical_inputs'.
    """
    excluded_input_types = {InputTypes.ID, InputTypes.TIME}
    known_input_types = {InputTypes.STATIC_INPUT, InputTypes.KNOWN_INPUT}

    def _get_locations(input_types, defn):
        # Positions within `defn` whose input type is one of `input_types`.
        return [i for i, tup in enumerate(defn) if tup[2] in input_types]

    def _definitions_of_type(data_type):
        # Model-input definitions (no ID/TIME) of the given data type.
        return [
            tup for tup in column_definitions
            if tup[1] == data_type and tup[2] not in excluded_input_types
        ]

    real_definitions = _definitions_of_type(DataTypes.REAL_VALUED)
    categorical_definitions = _definitions_of_type(DataTypes.CATEGORICAL)
    # Total inputs keep the "real first, categorical second" order that the
    # original name-based lists were concatenated in.
    input_definitions = real_definitions + categorical_definitions

    locations = {
        # ID and TIME are already removed.
        'input_size':
            len(input_definitions),

        'output_size':
            len(_get_locations({InputTypes.TARGET}, input_definitions)),

        'category_counts':
            num_classes_per_cat_input,

        # loc within total inputs
        'input_obs_loc':
            _get_locations({InputTypes.TARGET}, input_definitions),

        # loc within total inputs
        'static_input_loc':
            _get_locations({InputTypes.STATIC_INPUT}, input_definitions),

        # loc within the real-valued inputs
        'known_regular_inputs':
            _get_locations(known_input_types, real_definitions),

        # loc within the categorical inputs
        'known_categorical_inputs':
            _get_locations(known_input_types, categorical_definitions),
    }
    return locations
fixed_params.update(_get_tft_input_indices())
fixed_params