In [17]:
from dotenv import load_dotenv
import os
import requests

# Load the environment variables from the .env file
load_dotenv()

connection_string = os.getenv('SQL_CONNECTION')

## Test database exists

In [29]:
import sqlitecloud

# Open the connection to SQLite Cloud
conn = sqlitecloud.connect(connection_string)

# You can autoselect the database during the connect call
# by adding the database name as path of the SQLite Cloud
# connection string, eg:
# conn = sqlitecloud.connect("sqlitecloud://myhost.sqlite.cloud:8860/mydatabase?apikey=myapikey")
db_name = "chinook.sqlite"
conn.execute(f"USE DATABASE {db_name}")

cursor = conn.execute("SELECT * FROM waves")
result = cursor.fetchall()

print(result)

conn.close()

[('2024-08-19 20:30:00', 131.877779157453, 1.76619076445175, 4.41100902209299), ('2024-08-19 21:00:00', 133.015772679781, 1.7756477274836, 4.42997977809064), ('2024-08-19 21:30:00', 132.598119684354, 1.76639528867632, 4.44222507154165), ('2024-08-19 22:00:00', 132.14053581763, 1.76229408697882, 4.46189379076297), ('2024-08-19 22:30:00', 131.766407985454, 1.76288172959074, 4.48705171555564), ('2024-08-19 23:00:00', 132.004712058903, 1.7597375472672, 4.51182314221698), ('2024-08-19 23:30:00', 132.156773830951, 1.75111003552085, 4.54869858994786), ('2024-08-20 00:00:00', 132.120199306469, 1.75539574892978, 4.57098018668599), ('2024-08-20 00:30:00', 132.534890331414, 1.7517244368043, 4.6067567166195), ('2024-08-20 01:00:00', 131.497043604576, 1.75634151975175, 4.62788310522349), ('2024-08-20 01:30:00', 131.279308446694, 1.7480018003994, 4.66251521693702), ('2024-08-20 02:00:00', 131.264601766133, 1.75171153644945, 4.70685376829908), ('2024-08-20 02:30:00', 131.744350149621, 1.7426465086455

## Test push/overwrite data

In [18]:
import pandas as pd

# Example: Assume `predictions_df` is the DataFrame output from your model
predictions_df = pd.DataFrame({
    'wave_direction': [45.0, 50.0, 30.0],
    'wave_height': [1.5, 2.0, 1.2],
    'wave_period': [5.5, 6.0, 5.2]
}, index=pd.to_datetime(['2024-08-15 00:00:00', '2024-08-15 00:30:00', '2024-08-15 01:00:00']))
predictions_df.index.name = 'datetime'
predictions_df

Unnamed: 0_level_0,wave_direction,wave_height,wave_period
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2024-08-15 00:00:00,45.0,1.5,5.5
2024-08-15 00:30:00,50.0,2.0,6.0
2024-08-15 01:00:00,30.0,1.2,5.2


In [30]:
import sqlitecloud
import pandas as pd

# Example DataFrame output from your model
predictions_df = pd.DataFrame({
    'wave_direction': [45.0, 50.0, 30.0],
    'wave_height': [1.5, 2.0, 1.2],
    'wave_period': [5.5, 6.0, 5.2]
}, index=pd.to_datetime(['2024-08-15 00:00:00', '2024-08-15 00:30:00', '2024-08-15 01:00:00']))
predictions_df.index.name = 'datetime'
# Convert datetime index to ISO 8601 string format
predictions_df.index = predictions_df.index.strftime('%Y-%m-%d %H:%M:%S')

# Connect to the SQLiteCloud database
conn = sqlitecloud.connect(connection_string)
db_name = "chinook.sqlite"
conn.execute(f"USE DATABASE {db_name}")

def upsert_row(conn, datetime, wave_direction, wave_height, wave_period):
    upsert_sql = f'''
    INSERT INTO waves (datetime, wave_direction, wave_height, wave_period)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(datetime) DO UPDATE SET
        wave_direction=excluded.wave_direction,
        wave_height=excluded.wave_height,
        wave_period=excluded.wave_period;
    '''
    conn.execute(upsert_sql, (datetime, wave_direction, wave_height, wave_period))

try:
    for index, row in predictions_df.iterrows():
        upsert_row(conn, index, row['wave_direction'], row['wave_height'], row['wave_period'])
    
    print("Data appended and updated successfully.")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    conn.close()


Data appended and updated successfully.


In [None]:
# Connect to the SQLiteCloud database
conn = sqlitecloud.connect(connection_string)
db_name = "chinook.sqlite"
conn.execute(f"USE DATABASE {db_name}")

def upsert_row(conn, datetime, wave_direction, wave_height, wave_period):
    upsert_sql = f'''
    INSERT INTO waves (datetime, wave_direction, wave_height, wave_period)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(datetime) DO UPDATE SET
        wave_direction=excluded.wave_direction,
        wave_height=excluded.wave_height,
        wave_period=excluded.wave_period;
    '''
    conn.execute(upsert_sql, (datetime, wave_direction, wave_height, wave_period))

try:
    for index, row in predictions_df.iterrows():
        upsert_row(conn, index, row['wave_direction'], row['wave_height'], row['wave_period'])
    
    print("Data appended and updated successfully.")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    conn.close()

## Test pipeline

In [24]:
import pickle 
import urllib.request
import urllib.parse
import json
import pandas as pd
import numpy as np

def fetch_data():
    # Base URL and resource ID
    base_url = 'https://www.data.qld.gov.au/api/3/action/datastore_search'
    resource_id = '2bbef99e-9974-49b9-a316-57402b00609c'

    # Define the filter query for the site "Mooloolaba"
    filters = {
        "Site": "Mooloolaba"
    }

    # Define the parameters, including the resource ID and the filters
    params = {
        'resource_id': resource_id,
        'limit': 48,  # 48 records for the last 24 hours with 30 min intervals
        'q': json.dumps(filters)  # Convert the filters dictionary to a JSON string
    }

    # Encode the parameters and create the full URL
    url = f"{base_url}?{urllib.parse.urlencode(params)}"

    # Initialize the dictionary to store results
    result_dict = {}

    # Make the request
    try:
        fileobj = urllib.request.urlopen(url)
        response = fileobj.read()
        data = json.loads(response)
        
        # Store the records in the dictionary
        for record in data.get('result', {}).get('records', []):
            record_id = record.get('_id')
            result_dict[record_id] = record
                
    except urllib.error.HTTPError as e:
        print(f"HTTPError: {e.code} - {e.reason}")
    except Exception as e:
        print(f"Error: {e}")

    return result_dict


def preprocess_data(result_dict):
    
    # Conversion to dataframe
    df = pd.DataFrame.from_dict(result_dict, orient='index')

    # Renaming
    df.rename(columns = {
        'DateTime':'datetime',
        'Hmax':'wave_height',
        'Tz':'wave_period',
        'Direction': 'wave_direction'
    }, inplace = True)

    # Timestamp format
    df['datetime'] = pd.to_datetime(df['datetime'])
    # df['DateTime'] = df['DateTime'].dt.strftime('%Y-%m-%d %H:%M:%S')    
    df.set_index(keys = 'datetime', inplace=True)
    df = df.asfreq('30T')

    # Keep only desired variables
    target_vars = ['wave_height', 'wave_period', 'wave_direction']
    df = df[target_vars]

    # Null values
    df = df.replace(-99.9, np.nan)

    return df


# Step 1: Load the trained model from the pickle file
def load_model(model_path):
    with open(model_path, 'rb') as f:
        model = pickle.load(f)
    return model

# Step 4: Make predictions using the loaded model
def make_predictions(model, processed_data):
    predictions = model.predict(steps = 24, last_window=processed_data)
    predictions.index.name = 'datetime'
    predictions.index = predictions.index.strftime('%Y-%m-%d %H:%M:%S')
    return predictions


In [25]:
import sqlitecloud

def upsert_dataframe(connection_string, dataframe):
    # Connect to the SQLiteCloud database
    conn = sqlitecloud.connect(connection_string)
    db_name = "chinook.sqlite"
    conn.execute(f"USE DATABASE {db_name}")
    
    # Define the upsert function inside the main function
    def upsert_row(conn, datetime, wave_direction, wave_height, wave_period):
        upsert_sql = f'''
        INSERT INTO waves (datetime, wave_direction, wave_height, wave_period)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(datetime) DO UPDATE SET
            wave_direction=excluded.wave_direction,
            wave_height=excluded.wave_height,
            wave_period=excluded.wave_period;
        '''
        conn.execute(upsert_sql, (datetime, wave_direction, wave_height, wave_period))
    
    try:
        # Iterate over each row in the dataframe and perform the upsert
        for index, row in dataframe.iterrows():
            upsert_row(conn, index, row['wave_direction'], row['wave_height'], row['wave_period'])
        
        print("Data appended and updated successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        conn.close()

In [28]:
result_dict = fetch_data()
data = preprocess_data(result_dict)
model = load_model('../../Models/mooloolaba/forecaster_mool.pkl')
preds = make_predictions(model, data)
upsert_dataframe(connection_string, preds)

Data appended and updated successfully.


In [27]:
preds

Unnamed: 0_level_0,wave_direction,wave_height,wave_period
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2024-08-13 00:00:00,90.19446,4.575471,6.047095
2024-08-13 00:30:00,90.487054,4.528628,6.051239
2024-08-13 01:00:00,90.860984,4.480879,6.050688
2024-08-13 01:30:00,90.498948,4.438721,6.029083
2024-08-13 02:00:00,90.567556,4.475573,6.012432
2024-08-13 02:30:00,90.658839,4.415594,6.01272
2024-08-13 03:00:00,90.485481,4.391019,6.007793
2024-08-13 03:30:00,90.54661,4.269774,6.006239
2024-08-13 04:00:00,90.693059,4.346009,6.006679
2024-08-13 04:30:00,90.517163,4.351915,6.004305
