In [1]:
from dotenv import load_dotenv
from service.s3_storage_service import S3StorageService
from storage.s3_client import S3Client

# Load environment variables from a .env file (per the original note, this includes the bucket name).
load_dotenv()
# Build the S3 client and wrap it in the project's storage service. `s3_service` is
# read as a module-level global by the functions defined later in this notebook.
s3_client = S3Client().get_client()
s3_service = S3StorageService(s3_client)

## Step 1: Define functions for data retrieval and storage in a dataframe
#### First, retrieve the data from each CSV link stored in the station's folder.
#### If the last entry of a retrieved dataset is not dated today, stop the download process entirely.

In [5]:
from datetime import datetime
import pandas as pd
from dateutil.parser import parse

def try_parsing_date(text):
    """Parse a date string, trying the known explicit formats before a generic parser.

    The formats '%d/%m/%Y' and '%Y-%m-%d' are tried in that order with
    ``pd.to_datetime``; the first one that does not raise ``ValueError`` wins.
    If neither matches, the text is handed to dateutil's ``parse`` with
    ``dayfirst=True``.

    Args:
        text: The date text to parse.

    Returns:
        The parsed date: the ``pd.to_datetime`` result for a known format,
        otherwise whatever dateutil's ``parse`` returns.
    """
    known_formats = ['%d/%m/%Y', '%Y-%m-%d']  # Add or remove formats as needed
    position = 0
    while position < len(known_formats):
        try:
            return pd.to_datetime(text, format=known_formats[position])
        except ValueError:
            # This format did not match; move on to the next candidate.
            position += 1
    # No explicit format matched: fall back on dateutil's parser.
    return parse(text, dayfirst=True)

def preprocess_data(df, parameter_name, fill_method='mean', freq='D'):
    """Check that a series is current, fill in missing timestamps and fill the gaps.

    Args:
        df (pd.DataFrame): Frame indexed by timestamp with a column named
            ``parameter_name``.
        parameter_name (str): Name of the value column to fill.
        fill_method (str): Gap-filling strategy for non-rain parameters. Only
            'mean' is supported. Ignored for 'rain', whose gaps are filled with 0.
        freq (str): Frequency passed to ``pd.date_range`` when building the
            complete index. Defaults to 'D', which is what ``pd.date_range``
            uses when only ``start`` and ``end`` are given.

    Returns:
        pd.DataFrame | None: The reindexed, gap-filled frame, or ``None`` when
        the frame has no usable data, its latest entry is not dated today, or
        ``fill_method`` is not supported.
    """
    # An empty frame (or an index holding only NaT) has no "last date"; calling
    # .date() on NaT would raise, so report it as unusable instead.
    if df.empty or pd.isna(df.index.max()):
        print(f"No data available for {parameter_name}.")
        return None

    # First check if the last date is up to the current date
    last_date = df.index.max().date()
    if last_date != datetime.now().date():
        print(f"Data for {parameter_name} is not up to the current date: {last_date}")
        return None  # Stop processing this data

    # Validate the fill method before doing any reindexing work.
    is_rain = parameter_name == 'rain'
    if not is_rain and fill_method != 'mean':
        print(f"Unknown fill method for {parameter_name}.")
        return None

    # Ensure all timestamps in the range are present
    all_dates = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
    df = df.reindex(all_dates)

    # Rain gaps become 0; every other parameter is filled with the column mean.
    fill_value = 0 if is_rain else df[parameter_name].mean()
    df[parameter_name] = df[parameter_name].fillna(fill_value)

    return df

def verify_required_columns(df, interval, required_columns):
    """Return ``df`` when it has every expected column, otherwise ``None``.

    For the '15min' interval the caller-supplied list is replaced by
    ['finst', 'linst'] (rain is optional there). Any other interval is checked
    against ``required_columns`` as given.

    Args:
        df (pd.DataFrame): Frame whose columns are checked.
        interval (str): Measurement interval label, e.g. '1d' or '15min'.
        required_columns (list): Columns expected for non-'15min' intervals.

    Returns:
        pd.DataFrame | None: The same ``df`` object if nothing is missing,
        else ``None`` after printing the missing column names.
    """
    # Adjust required columns for 15min interval data
    expected = ['finst', 'linst'] if interval == '15min' else required_columns

    present = df.columns
    absent = []
    for name in expected:
        if name not in present:
            absent.append(name)

    if not absent:
        return df

    print(f"Missing required columns: {absent} for interval {interval}")
    return None


def load_csv_data_to_dataframe(data):
    """Download every CSV in ``data``, preprocess each one and combine them by date.

    Args:
        data (iterable of dict): Items with a 'link' key (CSV location readable
            by ``pd.read_csv``) and a 'type' key (parameter name used as the
            column name). Each CSV must provide 'date' and 'value' columns.

    Returns:
        pd.DataFrame: One column per parameter, rows ordered by date, with the
        date index replaced by a fresh 0..n-1 index. An empty DataFrame is
        returned when ``data`` is empty or as soon as any dataset is rejected
        by ``preprocess_data``; remaining links are not downloaded in that case.
    """
    frames = []

    for item in data:
        url = item['link']
        parameter_name = item['type']

        raw = pd.read_csv(url, usecols=['date', 'value'])
        raw = raw.rename(columns={'value': parameter_name})
        raw['date'] = raw['date'].apply(try_parsing_date)
        raw = raw.set_index('date')

        # Preprocess each dataset; a rejected dataset aborts the whole load.
        prepared = preprocess_data(raw, parameter_name)
        if prepared is None:
            print("Process stopped due to outdated dataset.")
            return pd.DataFrame()  # Return empty DataFrame
        frames.append(prepared)

    if not frames:
        return pd.DataFrame()

    # Combine all preprocessed dataframes on their date index
    combined_df = frames[0]
    for frame in frames[1:]:
        combined_df = combined_df.merge(frame, left_index=True, right_index=True, how='outer')

    # Sort by the index itself: preprocess_data reindexes onto an unnamed
    # date_range, so there is no 'date' label for sort_values(by='date') to find.
    combined_df = combined_df.sort_index().reset_index(drop=True)

    return combined_df



def generate_csv_urls(station_guid, measurement_interval="1d"):
    """Load the JSON file of measurement links for a station from S3.

    Args:
        station_guid: Station identifier used in the S3 key.
        measurement_interval (str): Either '15min' or '1d'; selects which
            links file under ``flood_stations/<station_guid>/`` is read.

    Returns:
        Whatever ``s3_service.load_json_from_s3`` returns for that key
        (presumably the list of link/type entries; confirm against the service).

    Raises:
        ValueError: If ``measurement_interval`` is neither '15min' nor '1d'.
    """
    links_files = (
        ("15min", "15min_measurements_links.json"),
        ("1d", "1d_measurements_links.json"),
    )
    for interval, filename in links_files:
        if measurement_interval == interval:
            return s3_service.load_json_from_s3(f"flood_stations/{station_guid}/{filename}")

    raise ValueError("Invalid measurement interval. Choose either '15min' or '1d'.")


In [6]:
# Station GUID used for the example run in the next cell.
example_id = '00dfeaa4-34bd-4975-967a-b11e5d86520e'

In [7]:
# Fetch the links file for the example station (default '1d' interval), then
# download and combine the linked CSVs into one DataFrame.
urls = generate_csv_urls(example_id)
df = load_csv_data_to_dataframe(urls)
# Preview both ends of the combined frame. An empty frame is printed when the
# load was aborted because a dataset was not up to date.
print(df.head())
print(df.tail(100))

Data for fmean is not up to the current date: 2024-04-09
Process stopped due to outdated dataset.
Empty DataFrame
Columns: []
Index: []
Empty DataFrame
Columns: []
Index: []


#### Produce two dataframes: one with the rain column and one without. Rainfall has been measured for a considerably shorter period, so a dataset that includes it contains far fewer rows. The idea is to train two separate models on the two datasets and later combine them in an ensemble.

In [8]:

def split_dataframe_by_rain(df):
    """Produce the two training frames: one without the rain column and one with it.

    The "with rain" frame keeps every column of ``df``, not only 'rain', so
    that a model trained on it can still locate its target column. When ``df``
    has no 'rain' column, the "with rain" frame has zero rows, so a caller's
    ``.empty`` check skips it.

    Args:
        df (pd.DataFrame): Combined measurements frame.

    Returns:
        tuple: ``(df_without_rain, df_with_rain)``.
    """
    if 'rain' in df.columns:
        # All measurements except rainfall.
        df_without_rain = df.drop(columns=['rain'])
        # All measurements including rainfall.
        df_with_rain = df.copy()
    else:
        # No rainfall recorded: the original frame is the no-rain dataset and
        # the rain dataset is a zero-row placeholder (so `.empty` is True).
        df_without_rain = df.copy()
        df_with_rain = pd.DataFrame(columns=['rain'])

    return df_without_rain, df_with_rain

### Trim the dataframe so that it does not start with NaN values. This is needed because some parameters have been recorded for much longer than others.

In [9]:
def trim_dataframe_start(df):
    """Drop leading rows until every column has started recording.

    Args:
        df (pd.DataFrame): Frame whose columns may begin with NaN values.

    Returns:
        pd.DataFrame: ``df`` from the latest "first valid" label across all
        columns onwards. An empty frame is returned unchanged. If any column
        has no valid value at all, a zero-row frame with the same columns is
        returned, because no starting point satisfies every column.
    """
    if df.empty:
        return df

    # First non-NaN label of each column (None when a column is entirely NaN).
    first_valid_labels = [column.first_valid_index() for _, column in df.items()]

    # An all-NaN column would otherwise put None/NaN into max() below and
    # either raise or silently select the wrong starting row.
    if any(label is None for label in first_valid_labels):
        return df.iloc[0:0]

    # The latest of these labels is where all columns have data.
    latest_start_index = max(first_valid_labels)
    return df.loc[latest_start_index:]

### Normalise all values in the dataframe to prepare them for the training process. Keep the scaler for later use.

In [None]:
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator
from sklearn.preprocessing import MinMaxScaler


def prepare_data_for_training(df, n_input, n_output, batch_size, target_column, split_percent=0.95):
    """
    Prepares data for training by normalizing and creating TimeseriesGenerators.

    The scaler is fitted on the training rows only and then applied to the
    test rows, so test-set statistics do not leak into the normalization.

    Args:
        df (pd.DataFrame): The input DataFrame containing the time series data.
        n_input (int): Number of input time steps to use for each output.
        n_output (int): Number of output time steps. Kept for interface
            compatibility; it is not used here, and the generators are built
            with a single target value per sample.
        batch_size (int): Batch size for the generators.
        target_column (str): The name of the target column for prediction.
        split_percent (float): Fraction of rows used for training; must be
            strictly between 0 and 1. Defaults to 0.95.

    Returns:
        tuple: (train_generator, test_generator, scaler, n_features,
        split_index, target_index).

    Raises:
        KeyError: If ``target_column`` is not a column of ``df``.
        ValueError: If ``split_percent`` is out of range or the training part
            is not longer than ``n_input`` rows.
    """
    if not 0 < split_percent < 1:
        raise ValueError("split_percent must be strictly between 0 and 1.")

    # Determine the index of the target column (fails early on a bad name)
    target_index = df.columns.get_loc(target_column)

    # n_features is the number of columns in the dataframe
    n_features = df.shape[1]

    data = df.values
    split_index = int(len(data) * split_percent)

    # The test window starts n_input rows before the split so the first test
    # sample has a full input history. With too few rows that start would go
    # negative and silently slice from the end of the array instead.
    if split_index <= n_input:
        raise ValueError(
            f"Not enough data: {len(data)} rows give {split_index} training rows, "
            f"but more than n_input={n_input} are required."
        )

    # Normalize: fit on the training rows only, then transform the test rows.
    scaler = MinMaxScaler(feature_range=(0, 1))
    train_data = scaler.fit_transform(data[:split_index])
    test_data = scaler.transform(data[split_index - n_input:])

    # Create TimeseriesGenerators
    train_generator = TimeseriesGenerator(train_data, train_data[:, target_index],
                                          length=n_input, batch_size=batch_size)
    test_generator = TimeseriesGenerator(test_data, test_data[:, target_index],
                                         length=n_input, batch_size=batch_size)

    return train_generator, test_generator, scaler, n_features, split_index, target_index

In [11]:
from sklearn.metrics import mean_squared_error
from math import sqrt
import numpy as np


def _scaler_feature_count(scaler):
    """Return how many columns the scaler was fitted on (1 if it cannot be determined)."""
    n_features = getattr(scaler, 'n_features_in_', None)
    if n_features is None:
        n_features = int(np.size(getattr(scaler, 'scale_', 1.0)))
    return n_features


def _infer_target_index(test_generator, n_features):
    """Find which input column of the generator is its target series."""
    if n_features == 1:
        return 0
    data = getattr(test_generator, 'data', None)
    targets = getattr(test_generator, 'targets', None)
    if data is not None and targets is not None:
        data = np.asarray(data)
        targets = np.asarray(targets)
        if data.ndim == 2 and targets.ndim == 1 and len(data) == len(targets):
            matches = np.flatnonzero((data == targets[:, None]).all(axis=0))
            if len(matches) == 1:
                return int(matches[0])
    raise ValueError(
        "Cannot determine the target column for inverse scaling; "
        "pass target_index explicitly."
    )


def _inverse_scale_target(scaler, values, target_index, n_features):
    """Map scaled target values back to original units via the scaler's target column."""
    # The scaler expects n_features columns, so place the values in the target
    # column of a zero-padded array and read that column back afterwards.
    padded = np.zeros((len(values), n_features))
    padded[:, target_index] = values
    return np.asarray(scaler.inverse_transform(padded))[:, target_index]


def evaluate_model_performance(model, test_generator, scaler, n_output, target_index=None):
    """Compute the average windowed RMSE of the model on the test generator, in original units.

    Args:
        model: Object with a ``predict(test_generator)`` method returning one
            row of predictions per test sample.
        test_generator: Iterable of ``(x, y)`` batches; ``y`` holds the scaled
            target values.
        scaler: Fitted scaler with ``inverse_transform``; it may have been
            fitted on several feature columns.
        n_output (int): Length of each evaluation window.
        target_index (int | None): Column of the scaler that corresponds to
            the target. When omitted it is 0 for a single-feature scaler,
            otherwise it is inferred by matching ``test_generator.targets``
            against the columns of ``test_generator.data`` if those attributes
            exist.

    Returns:
        The mean of the per-window RMSE values, or NaN when there are fewer
        than ``n_output`` actual values or no predictions.

    Raises:
        ValueError: If the target column cannot be determined, or the
            predictions do not line up with the actual values.
    """
    # Generate predictions for the test data
    predictions = np.asarray(model.predict(test_generator), dtype=float)

    # Collect the actual y values from the test generator
    collected = []
    for _, y in test_generator:
        collected.extend(np.asarray(y, dtype=float).reshape(-1))
    actuals = np.asarray(collected, dtype=float)

    n_windows = len(actuals) - n_output + 1
    if predictions.size == 0 or n_windows <= 0:
        return float('nan')

    predictions = predictions.reshape(len(predictions), -1)
    steps = predictions.shape[1]
    if len(predictions) != len(actuals):
        raise ValueError(
            f"Got {len(predictions)} prediction rows for {len(actuals)} actual values."
        )
    if steps != 1 and steps < n_output:
        raise ValueError(
            f"Model produces {steps} output steps, fewer than n_output={n_output}."
        )

    # Inverse transform predictions and actuals to original scale using only
    # the target column of the (possibly multi-feature) scaler.
    n_features = _scaler_feature_count(scaler)
    if target_index is None:
        target_index = _infer_target_index(test_generator, n_features)
    actuals_inverse = _inverse_scale_target(scaler, actuals, target_index, n_features)
    predictions_inverse = _inverse_scale_target(
        scaler, predictions.reshape(-1), target_index, n_features
    ).reshape(predictions.shape)

    rmse_values = []
    for i in range(n_windows):
        actuals_for_window = actuals_inverse[i:i + n_output]
        if steps == 1:
            # One-step model: consecutive predictions cover the window.
            predictions_for_window = predictions_inverse[i:i + n_output, 0]
        else:
            # Multi-step model: row i is treated as the forecast for the window
            # that starts at actual value i.
            # NOTE(review): assumes output j of a row forecasts j steps after
            # that row's target; confirm against how training targets are built.
            predictions_for_window = predictions_inverse[i, :n_output]
        rmse_values.append(sqrt(np.mean((actuals_for_window - predictions_for_window) ** 2)))

    # Average RMSE across all windows as the overall performance metric
    return np.mean(rmse_values)


In [13]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, GRU
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
import os
import json

def train_model(train_generator, test_generator, n_input, n_features, n_output, scaler, model_type='LSTM', learning_rate=0.001, epochs=10, patience=10, dropout_rate=0.2):
    """
    Builds, trains and evaluates a single-layer LSTM or GRU forecaster.

    The network is: recurrent layer (128 units) -> Dropout -> Dense(n_output),
    compiled with Adam and a Huber loss (delta=0.005). Training uses early
    stopping on 'val_loss' with the best weights restored.

    Parameters:
    - train_generator: The generator providing training data.
    - test_generator: The generator used both as validation data during
      fitting and for the final RMSE evaluation.
    - n_input: Number of input time steps.
    - n_features: Number of features.
    - n_output: Number of output time steps (units of the final Dense layer).
    - scaler: Fitted scaler, forwarded to evaluate_model_performance.
    - model_type: Type of model to train ('LSTM' or 'GRU').
    - learning_rate: Learning rate for the optimizer.
    - epochs: Number of epochs to train for.
    - patience: Number of epochs with no improvement after which training will be stopped.
    - dropout_rate: Dropout rate for regularization.

    Returns:
    A tuple (performance_stats, model, model_type), where performance_stats is
    a dictionary with the training history, the last recorded validation loss,
    the last validation accuracy (None if not recorded) and the average RMSE.

    Raises:
    ValueError if model_type is neither 'LSTM' nor 'GRU'.
    """
    model = Sequential()

    # Pick the recurrent layer class that matches the requested model type.
    recurrent_choices = (('LSTM', LSTM), ('GRU', GRU))
    recurrent_cls = next((cls for name, cls in recurrent_choices if model_type == name), None)
    if recurrent_cls is None:
        raise ValueError("Unsupported model type. Choose 'LSTM' or 'GRU'.")

    model.add(recurrent_cls(128, return_sequences=False, input_shape=(n_input, n_features)))
    model.add(Dropout(dropout_rate))
    model.add(Dense(n_output))

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss=Huber(delta=0.005))

    # Stop once validation loss stops improving and roll back to the best epoch.
    early_stopping = EarlyStopping(monitor='val_loss', patience=patience, mode='min', restore_best_weights=True)

    history = model.fit(
        train_generator,
        validation_data=test_generator,
        epochs=epochs,
        callbacks=[early_stopping],
    )

    # Collect the statistics in the same order the keys are reported.
    recorded = history.history
    performance_stats = {}
    performance_stats['history'] = recorded
    performance_stats['final_val_loss'] = recorded['val_loss'][-1]
    if 'val_accuracy' in recorded:
        performance_stats['final_val_accuracy'] = recorded['val_accuracy'][-1]
    else:
        performance_stats['final_val_accuracy'] = None
    performance_stats['average_rmse'] = evaluate_model_performance(model, test_generator, scaler, n_output)

    return performance_stats, model, model_type

In [14]:
def generate_metadata(performance_stats, n_input, n_output, contains_rainfall, model_type, measurement_interval, station_guid, target_column ):
    """Assemble the metadata dictionary that describes one trained model.

    Args:
        performance_stats (dict): Must contain the key 'average_rmse'.
        n_input: Number of input time steps the model was trained with.
        n_output: Number of output time steps.
        contains_rainfall: Stored under the 'rainfall' key.
        model_type: Model type label.
        measurement_interval: Measurement interval label.
        station_guid: Station identifier.
        target_column: Name of the predicted column.

    Returns:
        dict: The metadata, with keys in the order station_guid, n_input,
        n_output, measurement_interval, rainfall, model_type, average_rmse,
        target_column.
    """
    average_rmse = performance_stats['average_rmse']
    return dict(
        station_guid=station_guid,
        n_input=n_input,
        n_output=n_output,
        measurement_interval=measurement_interval,
        rainfall=contains_rainfall,
        model_type=model_type,
        average_rmse=average_rmse,
        target_column=target_column,
    )

In [ ]:
import joblib
from tensorflow.keras import backend as K
import shutil

def generate_model_path(metadata):
    """Build the S3 key prefix (ending in '/') under which a model's artefacts are stored.

    Args:
        metadata (dict): Must contain 'rainfall', 'station_guid',
            'measurement_interval' and 'model_type'.

    Returns:
        str: ``flood_stations/<guid>/models/<interval>_<model_type>_<rainfall|no_rainfall>/``.
    """
    # NOTE(review): the prefix does not include metadata['target_column'], so
    # models trained for different targets of the same station, interval and
    # rainfall variant share one prefix. Confirm this is intended.
    if metadata['rainfall']:
        rainfall = 'rainfall'
    else:
        rainfall = 'no_rainfall'
    station = metadata['station_guid']
    folder = f"{metadata['measurement_interval']}_{metadata['model_type']}_{rainfall}"
    return f"flood_stations/{station}/models/{folder}/"
        

def save_to_s3_and_cleanup(model, scaler, performance_stats, metadata, model_id):
    """
    Saves the model, scaler, performance stats, and metadata to S3, then cleans up local files.

    Uploads go through the module-level ``s3_service``. Local temporary files
    are removed and the Keras session is cleared even when a save or upload
    step fails; the original exception is re-raised to the caller.

    Parameters:
    - model (tf.keras.Model): Trained model to save.
    - scaler (sklearn.preprocessing): Scaler object used for the model.
    - performance_stats (dict): Performance statistics of the model.
    - metadata (dict): Metadata about the model training.
    - model_id (str): Unique identifier used to name the local temporary files.
    """
    model_storage_key = generate_model_path(metadata)
    model_dir = f"tmp/{model_id}"
    scaler_path = f"tmp/{model_id}_scaler.save"

    # joblib.dump does not create parent directories, so make sure tmp/ exists.
    os.makedirs("tmp", exist_ok=True)

    try:
        # Save Model
        model.save(model_dir)
        s3_service.save_file_to_s3(model_storage_key, model_dir)

        # Save Scaler
        joblib.dump(scaler, scaler_path)
        s3_service.save_file_to_s3(model_storage_key, scaler_path)

        # Save Performance Metrics and Metadata.
        # NOTE(review): model_storage_key already ends with '/', so these keys
        # contain '//'. Left unchanged because readers of these keys are not
        # visible here; confirm before normalizing.
        s3_service.save_json_to_s3(f"{model_storage_key}/performance_metrics.json", performance_stats)
        s3_service.save_json_to_s3(f"{model_storage_key}/metadata.json", metadata)
    finally:
        # Clean up local artefacts regardless of success.
        if os.path.isdir(model_dir):
            shutil.rmtree(model_dir, ignore_errors=True)
        elif os.path.exists(model_dir):
            os.remove(model_dir)
        if os.path.exists(scaler_path):
            os.remove(scaler_path)
        # Release the Keras/TensorFlow state held for this model.
        K.clear_session()

In [15]:
def process_station(guid):
    """Load, validate and train all models for one station.

    For each measurement interval ('1d' and '15min') the station's data is
    loaded and checked for required columns. A model is then trained for every
    target column, once on the data without rain and once on the data with
    rain, whenever that variant is non-empty and contains the target column.

    Args:
        guid: Station identifier.

    Returns:
        list of tuple: One entry per trained model (as returned by
        ``process_data_model``), or ``(guid, interval, reason)`` when an
        interval was skipped because of missing data or missing columns.
    """
    required_columns = ['lmax', 'lmin', 'fmax', 'fmin', 'fmean']

    results = []
    intervals = {
        '1d': ['lmax', 'lmin'],
        '15min': ['finst', 'linst']  # Assuming these are the correct identifiers
    }

    for interval, targets in intervals.items():
        urls = generate_csv_urls(guid, interval)
        df = load_csv_data_to_dataframe(urls)

        # The loader signals "nothing usable" with an empty DataFrame, not None.
        if df is None or df.empty:
            results.append((guid, interval, "No data available"))
            continue

        df = verify_required_columns(df, interval, required_columns)
        if df is None:
            results.append((guid, interval, "Missing required columns"))
            continue

        df_without_rain, df_with_rain = split_dataframe_by_rain(df)
        variants = ((df_without_rain, False), (df_with_rain, True))

        for target_column in targets:
            # Process both with and without rain
            for frame, contains_rainfall in variants:
                # Skip a variant that has no rows or cannot provide the target
                # column; training on it would fail when locating the target.
                if frame.empty or target_column not in frame.columns:
                    continue
                results.append(process_data_model(frame, interval, target_column, guid, contains_rainfall))

    return results




def process_data_model(df, interval, target_column, guid, contains_rainfall):
    """Train one model on ``df`` for ``target_column`` and store it in S3.

    Steps: trim leading NaN rows, build the data generators, train the model,
    generate its metadata and upload everything via ``save_to_s3_and_cleanup``.

    Args:
        df (pd.DataFrame): Training data for this variant.
        interval (str): '1d' selects 64 input / 7 output steps; any other
            value selects 96 input / 24 output steps. Both use an LSTM.
        target_column (str): Column to predict.
        guid: Station identifier; also used as the local model id when saving.
        contains_rainfall (bool): Whether this variant includes rainfall.

    Returns:
        tuple: ``(guid, interval, target_column, 'Model trained and saved', contains_rainfall)``.
    """
    # Trim the data
    trimmed_df = trim_dataframe_start(df)

    # Window sizes and model type depend on the interval type
    if interval == '1d':
        n_input, n_output, model_type = 64, 7, 'LSTM'
    else:
        n_input, n_output, model_type = 96, 24, 'LSTM'

    # Build the generators and the scaler
    prepared = prepare_data_for_training(
        trimmed_df, n_input, n_output, batch_size=32, target_column=target_column
    )
    train_gen, test_gen, scaler, n_features, _split_idx, _target_idx = prepared

    # Train and evaluate
    trained = train_model(
        train_gen, test_gen, n_input, n_features, n_output, scaler,
        model_type=model_type, learning_rate=0.001, epochs=10, patience=10, dropout_rate=0.2
    )
    performance_stats, model, model_type = trained

    # Describe and persist the result
    metadata = generate_metadata(
        performance_stats, n_input, n_output, contains_rainfall,
        model_type, interval, guid, target_column
    )
    save_to_s3_and_cleanup(model, scaler, performance_stats, metadata, guid)

    outcome = (guid, interval, target_column, 'Model trained and saved', contains_rainfall)
    return outcome

    

In [ ]:
import json

def get_last_processed_guid():
    """Read the checkpoint GUID from 'last_processed_guid.json' via ``s3_service``.

    Returns:
        The value stored under 'last_guid', or ``None`` when the key is absent
        or when loading fails for any reason (the error is printed, not raised).
    """
    try:
        checkpoint = s3_service.load_json_from_s3("last_processed_guid.json")
        last_guid = checkpoint.get('last_guid', None)
    except Exception as e:
        # Best effort: a missing or unreadable checkpoint means "no checkpoint".
        print(f"Failed to load last processed GUID: {e}")
        last_guid = None
    return last_guid

def save_last_processed_guid(guid):
    """Store ``guid`` as the checkpoint in 'last_processed_guid.json' via ``s3_service``.

    Failures are printed and swallowed, so a checkpoint problem never
    interrupts processing.

    Args:
        guid: Identifier of the station that was just processed.
    """
    payload = {'last_guid': guid}
    try:
        # Save to S3
        s3_service.save_json_to_s3("last_processed_guid.json", payload)
    except Exception as e:
        print(f"Failed to save last processed GUID: {e}")

def process_stations(stations):
    """Process every station, resuming after the last checkpointed GUID.

    Stations up to and including the checkpointed GUID are skipped. If the
    checkpoint does not match any station in ``stations``, it is ignored and
    all stations are processed, rather than silently skipping the whole list.
    An error in one station is printed and does not stop the others.

    Args:
        stations (list of dict): Each item must have a 'guid' key.
    """
    last_processed_guid = get_last_processed_guid()

    # A checkpoint that is not in this list would never be reached, which
    # would skip every station; fall back to a full run instead.
    if last_processed_guid and all(station['guid'] != last_processed_guid for station in stations):
        print(f"Last processed GUID {last_processed_guid} not found in station list; processing all stations.")
        last_processed_guid = None

    start_processing = not last_processed_guid

    for station in stations:
        guid = station['guid']
        if not start_processing:
            if guid == last_processed_guid:
                start_processing = True
            continue  # Skip until we are past the last processed guid

        try:
            print(f"Processing station with GUID: {guid}")
            result = process_station(guid)
            save_last_processed_guid(guid)  # Update the last successfully processed guid

            # process_station returns a list, so test for "nothing produced".
            if not result:
                print(f"No data available for station {guid}, skipping.")
        except Exception as e:
            print(f"Error processing station {guid}: {e}")
            continue  # Continue with next station even in case of error

# Entry point: run the full pipeline over the configured stations.
# Each station entry is expected to be a dict with a 'guid' key, as read by process_stations.
if __name__ == "__main__":
    stations = [
        # Your list of stations goes here
    ]
    process_stations(stations)
