In [8]:
import pandas as pd
import os
import data as data
from enum import Enum
from time import time
import warnings
import numpy as np
import string
import datetime
import math
from sklearn.preprocessing import StandardScaler, MinMaxScaler
warnings.filterwarnings("ignore")
import keras

In [11]:
import tensorflow as tf

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, RNN
from tensorflow.keras.optimizers import Adam

Defaulting to user installation because normal site-packages is not writeable


The system cannot find the path specified.


In [4]:
class Traffic_Flow_Models(Enum):
    """Identifiers of the supported traffic-flow model types.

    Each member's value is the lowercase string name of the model.
    """

    # Recurrent models.
    LSTM = "lstm"
    GRU = "gru"
    # Stacked auto-encoders.
    SAES = "saes"
    RNN = "rnn"

In [5]:
class TrafficFlowPredictor():
    """Predict traffic flow for a location using cached, pre-trained models.

    The scalers and the lagged-series lookup table are loaded once at
    construction time through ``data_datetime`` and ``data_series``. Those
    helpers, like ``load_model``, are not defined in this class and must be
    in scope when the corresponding methods run.
    """

    # The series lookup is indexed as 96 rows (15-minute intervals) per day.
    INTERVALS_PER_DAY = 96

    def __init__(self):
        # Cache of loaded models keyed by model name.
        self.models = {}
        # Cache of series rows keyed by (unscaled) location id. Created
        # before any data is loaded so lookups are always safe.
        self.location_series_data = {}

        self.flow_scaler: MinMaxScaler = None
        self.days_scaler: MinMaxScaler = None
        self.scats_scaler: MinMaxScaler = None
        self.times_scaler: MinMaxScaler = None

        self.lags = 12  # must match whatever the models were trained on

        self.file1 = os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'data', 'train.csv')
        self.file2 = os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'data', 'test.csv')

        self.get_scalars()
        self.get_lookup_data()

    def get_model(self, model_name: str):
        """Return the model called ``model_name``, loading it on first use.

        The model is read from ``COS30018-A2/model/<model_name>.h5`` next to
        this file and cached in ``self.models``.
        """
        if self.models.get(model_name) is None:
            self.models[model_name] = load_model(os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'model', f'{model_name}.h5'))
        return self.models.get(model_name)

    def get_scalars(self):
        """Populate the four scalers from the last items returned by ``data_datetime``."""
        _, _, _, _, _, _, _, _, self.flow_scaler, self.scats_scaler, self.days_scaler, self.times_scaler = data_datetime(self.file1, self.file2)

    def get_lookup_data(self):
        """Populate ``self.series_data`` from the fifth item returned by ``data_series``."""
        _, _, _, _, self.series_data, _, _, _, _, _ = data_series(self.file1, self.file2, self.lags)

    def predict_traffic_flow(self, location: int, date: datetime.datetime, steps: int, model_name: str):
        """Return the summed predicted flow over ``steps`` intervals.

        The ``"lstm"`` model is fed (day, time, location) features; every
        other model is fed lagged series rows looked up for the location.

        Returns:
            The sum of the un-scaled predictions, or ``0`` when no input
            data could be assembled for the request.

        Raises:
            ValueError: if the series lookup holds no rows for ``location``
                (non-"lstm" models only).
        """
        model = self.get_model(model_name)

        if model_name == "lstm":
            X = self.get_datetime_inputs(location, date, steps)
            predict = self.predict_datetime
        else:
            X = self.get_timeseries_inputs(location, date, steps)
            predict = self.predict_series

        if X is None:
            return 0
        return predict(model, X).sum()

    def get_datetime_inputs(self, location: int, date: datetime.datetime, steps: int):
        """Build scaled (day, time, location) features for ``steps`` intervals.

        Returns:
            An array of shape ``(steps, 3, 1)``.
        """
        dayindex = date.weekday()  # determine weekday
        # NOTE(review): the time is not snapped to its 15-minute interval
        # here - confirm this matches the features used for training.
        actual_time = date.hour * 60 + date.minute  # time of day in minutes

        days = self.days_scaler.transform(np.array([dayindex for _ in range(steps)]).reshape(-1, 1)).reshape(1, -1)[0]
        times = self.times_scaler.transform(np.array([actual_time + t * 15 for t in range(steps)]).reshape(-1, 1)).reshape(1, -1)[0]
        scats = self.scats_scaler.transform(np.array([location for _ in range(steps)]).reshape(-1, 1)).reshape(1, -1)[0]

        X = np.array([np.array([days[i], times[i], scats[i]]) for i in range(steps)])
        X = np.reshape(X, (X.shape[0], X.shape[1], 1))
        return X

    def _inverse_scaled_prediction(self, model, X):
        """Run ``model.predict`` on ``X`` and undo the flow scaling (1-D result)."""
        y_pred = model.predict(X)
        return self.flow_scaler.inverse_transform(y_pred.reshape(-1, 1)).reshape(1, -1)[0]

    def predict_datetime(self, model, X):
        """Predict from date/time features and return un-scaled flow values."""
        return self._inverse_scaled_prediction(model, X)

    def lookup_location_data(self, location: int):
        """Return the series rows whose location column matches ``location``.

        The location column is the one at index ``self.lags``; it is compared
        against the scaled location id. Results are cached per location.
        """
        if self.location_series_data.get(location) is None:
            # Only scale the id on a cache miss; a hit does not need it.
            scaled_location = self.scats_scaler.transform(np.array([[location]]))[0][0]
            location_indices = [
                i for i, row in enumerate(self.series_data)
                if row[self.lags] == scaled_location
            ]
            self.location_series_data[location] = self.series_data[location_indices]

        return self.location_series_data[location]

    def get_timeseries_inputs(self, location: int, date: datetime.datetime, steps: int):
        """Build lagged-series inputs for ``steps`` intervals starting at ``date``.

        The rows for the day-of-month of ``date`` are used. When the data
        does not reach that day, the same weekday one or more weeks earlier
        is used instead. A window that runs past the end of the day continues
        into the following rows, and is truncated at the end of the data.

        Returns:
            An array of shape ``(n, row_width, 1)`` with ``n <= steps``, or
            ``None`` when no rows are available for the request.

        Raises:
            ValueError: if no rows exist for ``location``.
        """
        per_day = self.INTERVALS_PER_DAY
        day = date.day
        time_index = (date.hour * 60 + date.minute) // 15  # current 15 minute interval

        location_X = self.lookup_location_data(location)
        if len(location_X) == 0:
            raise ValueError(f"No Data exists for location {location}")

        # Fix for data with incomplete months: step back a week at a time,
        # but never below day 1. A non-positive day would turn into negative
        # slice indices and silently pick rows from the wrong end of the data.
        while (day - 1) * per_day >= len(location_X) and day > 7:
            day -= 7

        start = (day - 1) * per_day + time_index
        window = location_X[start:start + steps]
        if len(window) == 0:
            return None

        X = np.array(window)
        return np.reshape(X, (X.shape[0], X.shape[1], 1))

    def predict_series(self, model, X):
        """Predict from lagged-series inputs and return un-scaled flow values."""
        return self._inverse_scaled_prediction(model, X)

In [6]:
import os
import numpy as np
import datetime
import math
from enum import Enum
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import load_model

# Enum for Model Types
class TrafficFlowModels(Enum):
    """Supported model types; each value is the model's lowercase name."""

    LSTM = "lstm"
    GRU = "gru"
    SAES = "saes"
    RNN = "rnn"

class TrafficFlowPredictor:
    """Predict traffic flow for a location using cached, pre-trained models.

    Scalers and the lagged-series lookup table are loaded once at
    construction time through ``data_datetime`` and ``data_series``, which
    are not defined in this class and must be in scope when it is
    instantiated.
    """

    # The series lookup is indexed as 96 rows (15-minute intervals) per day.
    _INTERVALS_PER_DAY = 96

    def __init__(self, lags=12):
        """Load scalers and lookup data.

        Args:
            lags: index of the location column in each series row; passed
                through to ``data_series``.
        """
        self.models = {}
        self.lags = lags
        self.location_series_data = {}
        self.train_file = os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'data', 'train.csv')
        self.test_file = os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'data', 'test.csv')

        # Initialize scalers and lookup data
        self.flow_scaler, self.scats_scaler, self.days_scaler, self.times_scaler = self._init_scalers()
        self.series_data = self._init_lookup_data()

    def _init_scalers(self):
        """Return the (flow, scats, days, times) scalers produced by ``data_datetime``."""
        _, _, _, _, _, _, _, _, flow, scats, days, times = data_datetime(self.train_file, self.test_file)
        return flow, scats, days, times

    def _init_lookup_data(self):
        """Return the series lookup table (fifth item produced by ``data_series``)."""
        _, _, _, _, series_data, _, _, _, _, _ = data_series(self.train_file, self.test_file, self.lags)
        return series_data

    def load_model(self, model_name):
        """Return the model called ``model_name``, loading and caching it on first use.

        The call below resolves to the module-level ``load_model`` function,
        not to this method.
        """
        if model_name not in self.models:
            self.models[model_name] = load_model(os.path.join(os.path.dirname(__file__), 'COS30018-A2', 'model', f'{model_name}.h5'))
        return self.models[model_name]

    def predict_traffic_flow(self, location, date, steps, model_name):
        """Return the summed predicted flow over ``steps`` intervals.

        Returns ``0`` when no input data could be assembled for the request.
        """
        model = self.load_model(model_name)
        X = self._prepare_input(location, date, steps, model_name == TrafficFlowModels.LSTM.value)

        return self._predict(model, X).sum() if X is not None else 0

    def _prepare_input(self, location, date, steps, is_datetime):
        """Build the model input for ``steps`` intervals starting at ``date``.

        Args:
            location: unscaled location id.
            date: start of the prediction window.
            steps: number of 15-minute intervals to cover.
            is_datetime: when true, build scaled (day, time, location)
                features; otherwise look up lagged series rows.

        Returns:
            An array of shape ``(n, width, 1)``, or ``None`` when the series
            lookup has no rows for the request. For series inputs ``n`` may
            be smaller than ``steps`` if the data ends inside the window.
        """
        minutes = date.hour * 60 + date.minute
        if is_datetime:
            times = self.times_scaler.transform([[minutes + t * 15] for t in range(steps)]).flatten()
            days = self.days_scaler.transform([[date.weekday()]] * steps).flatten()
            scats = self.scats_scaler.transform([[location]] * steps).flatten()
            return np.array([[days[i], times[i], scats[i]] for i in range(steps)]).reshape(steps, 3, 1)

        location_X = self._lookup_location_data(location)
        if len(location_X) == 0:
            return None

        # Select the block of rows for the requested day-of-month. When the
        # data does not reach that day, fall back a week at a time, but never
        # below day 1 so the slice indices stay non-negative.
        per_day = self._INTERVALS_PER_DAY
        day = date.day
        while (day - 1) * per_day >= len(location_X) and day > 7:
            day -= 7

        start = (day - 1) * per_day + minutes // 15
        window = location_X[start:start + steps]
        if len(window) == 0:
            return None
        return np.array(window).reshape(len(window), -1, 1)

    def _lookup_location_data(self, location):
        """Return (and cache) the series rows whose location column matches ``location``.

        The location column is the one at index ``self.lags``; it is compared
        against the scaled location id.
        """
        if location not in self.location_series_data:
            scaled_loc = self.scats_scaler.transform([[location]]).flatten()[0]
            self.location_series_data[location] = [s for s in self.series_data if s[self.lags] == scaled_loc]
        return self.location_series_data[location]

    def _predict(self, model, X):
        """Run ``model.predict`` on ``X`` and return the un-scaled values as a 1-D array."""
        y_pred = model.predict(X)
        return self.flow_scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
