In [None]:
#Import Libraries
import pandas as pd
import os
from google.cloud import bigquery
from datetime import date
from google.api_core.exceptions import Conflict
from statsmodels.tools.eval_measures import rmse, mse

In [None]:
# Initiate the BigQuery client for the project used throughout the notebook
project_id = "bridge-data-analytics-app"  # update the project
client = bigquery.Client(project=project_id)
user = os.getenv("USER")

# Build the definition of the dataset that holds every table and model below
dataset_id = f"{client.project}.nakul"  # update the dataset
dataset = bigquery.Dataset(dataset_id)
dataset.location = "australia-southeast1"

# Ask the API to create the dataset, with an explicit 30 second timeout.
# google.api_core.exceptions.Conflict is raised when the dataset already exists
# within the project; in that case the conflict is reported and the locally
# built definition above stays bound to `dataset` for the following cells.
try:
    dataset = client.create_dataset(dataset, timeout=30)  # API request
except Conflict as err:
    print(f"Cannot create due to {err}")
else:
    print(f"Created dataset {client.project}.{dataset.dataset_id}")
    

In [None]:
# Read the source CSV from the forecasting-part-two folder under /Users/<user>
data = pd.read_csv(
    f"/Users/{user}/forecasting-part-two/victoria_covid_cases_source_updated.csv",
    parse_dates=True,
)

In [None]:
# Upload the raw frame to BigQuery, replacing the table when it already exists
data.to_gbq(
    destination_table=f"{dataset.dataset_id}.victoria_cases_by_postcode_lga",
    project_id=client.project,
    if_exists="replace",
)

In [None]:
# Aggregate the uploaded rows into one row per diagnosis date, inside BigQuery

date_column = "diagnosis_date"  # name of the date column (source and output)
label = "total"                 # name of the aggregated count column

table_id = f"{client.project}.{dataset.dataset_id}.vic_daily_cases"

job_config = bigquery.QueryJobConfig(destination=table_id)
job_config.write_disposition = "WRITE_TRUNCATE"  # overwrite the table

# The parsed date is aliased with the same name as the source column it is
# derived from. GROUP BY / ORDER BY therefore refer to the first select item by
# position, which removes any ambiguity about whether the raw column or the
# parsed alias is meant.
sql = f"""select 
          parse_date('%Y-%m-%d',{date_column}) as {date_column}, 
          count(Postcode) as {label} 
          from {dataset.dataset_id}.victoria_cases_by_postcode_lga
          group by 1
          order by 1
       """

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config)  # Make an API request.
query_job.result()  # Wait for the job to complete.

print("Query results loaded to the table {}".format(table_id))


In [None]:
# Train/test cut-off date, plus the names of the derived tables and the model
train_test_split = "2021-08-20"

model_name = f"{dataset.dataset_id}.vic_cases_bq_arima_model"
training_table = f"{client.project}.{dataset.dataset_id}.vic_daily_cases_training"
testing_table = f"{client.project}.{dataset.dataset_id}.vic_daily_cases_testing"

In [None]:
# Materialise the training and testing tables from the daily table.
# Rows dated on or before the split go to the training table; rows dated after
# it go to the testing table. Each destination is overwritten on every run.
for destination, comparison in ((training_table, "<="), (testing_table, ">")):
    job_config = bigquery.QueryJobConfig(destination=destination)
    job_config.write_disposition = "WRITE_TRUNCATE"  # overwrite the table

    sql = f"select * FROM {table_id} where {date_column} {comparison} '{train_test_split}'"

    # Run the query with the destination configured above, then block until
    # the job has completed.
    query_job = client.query(sql, job_config=job_config)
    query_job.result()

    print(f"Query results loaded to the table {destination}")

In [None]:
# Model options; each value is substituted into the CREATE MODEL statement
model_type = "ARIMA_PLUS"
data_frequency = "AUTO_FREQUENCY"

# Written as SQL boolean literals because they are inserted unquoted
auto_arima = "TRUE"
decompose_series = "TRUE"

In [None]:
# Create (or replace) the model and train it on the training table.
# The statement is standard SQL. The OPTIONS values come from the model
# parameter variables; the quoted ones are inserted as string literals, while
# auto_arima and decompose_series are inserted unquoted.
sql = f"""CREATE OR REPLACE MODEL {model_name}
OPTIONS
  (model_type = '{model_type}',
   time_series_timestamp_col = '{date_column}',
   time_series_data_col = '{label}',
   auto_arima = {auto_arima},
   data_frequency = '{data_frequency}',
   decompose_time_series = {decompose_series}
  ) AS
SELECT
  {date_column},
  {label}
FROM
  `{training_table}`
"""

# Submit the statement as a query job.
query_job = client.query(sql)
query_job.result()  # wait for the CREATE MODEL job to complete

In [None]:
# Pull the ML.ARIMA_EVALUATE output for the model into a DataFrame and show it
query = f"""SELECT
            *
            FROM
            ML.ARIMA_EVALUATE(MODEL {model_name})
        """
model_evaluate = pd.read_gbq(
    query,
    project_id=client.project,
    dialect="standard",
)
model_evaluate

In [None]:
# Pull the ML.ARIMA_COEFFICIENTS output for the model into a DataFrame and show it
query = f"""SELECT
            *
            FROM
            ML.ARIMA_COEFFICIENTS(MODEL {model_name})
        """
model_coefficients = pd.read_gbq(
    query,
    project_id=client.project,
    dialect="standard",
)
model_coefficients

In [None]:
# Forecast 5 days ahead with the model trained on the training table

forecast_days = 5

# forecast_timestamp is reduced to a plain date and exposed under the name held
# in date_column (previously hard-coded), because later cells index
# forecast_data by date_column. All ML.FORECAST columns are kept alongside it.
query = f"""
SELECT parse_date('%Y-%m-%d', format_datetime('%Y-%m-%d', forecast_timestamp)) as {date_column}, *
FROM
(
SELECT
 *
FROM
 ML.FORECAST(MODEL {model_name},
             STRUCT({forecast_days} AS horizon, 0.9 AS confidence_level))
)
"""
forecast_data = pd.read_gbq(query=query, project_id=client.project, dialect='standard')
forecast_data

In [None]:
# Read the held-out test rows, ordered by date
query = f"""SELECT * FROM {testing_table} order by {date_column}"""
test_data = pd.read_gbq(
    query,
    project_id=client.project,
    dialect="standard",
)
test_data

In [None]:
# Score the forecast against the held-out test data.
#
# The two frames are matched on the date rather than by row position: the daily
# table only contains dates that have at least one counted case and the test
# set is not guaranteed to be as long as the forecast horizon, so a positional
# comparison could pair different days or fail on mismatched lengths.
def _series_by_date(frame, value_column):
    """Return ``frame[value_column]`` indexed by ``date_column``.

    Works whether ``date_column`` is still a regular column of ``frame`` or has
    already been moved into the index by another cell.
    """
    if date_column in frame.columns:
        frame = frame.set_index(date_column)
    return frame[value_column]


# Keep only the dates present in both the actuals and the forecast.
actual, predicted = _series_by_date(test_data, label).align(
    _series_by_date(forecast_data, "forecast_value"), join="inner"
)
if actual.empty:
    raise ValueError("No overlapping dates between the test data and the forecast")

RMSE = rmse(actual, predicted)
print(f"rmse is {RMSE}")

In [None]:
# Read the training rows, ordered by date
query = f"""SELECT * FROM {training_table} order by {date_column}"""
train_data = pd.read_gbq(
    query,
    project_id=client.project,
    dialect="standard",
)
train_data

In [None]:
# Index the three frames by date so they share an x-axis when plotted.
# The frames are modified in place because the following cells use these same
# names. The column check makes the cell safe to re-run: once the date has
# been moved into the index it is no longer a column, and an unguarded
# set_index would raise KeyError.
for frame in (train_data, test_data, forecast_data):
    if date_column in frame.columns:
        frame.set_index(date_column, inplace=True)

In [None]:
# Overlay the training data, the test data and the forecast on one chart
ax = train_data[label].plot(figsize=(14, 8), lw=4)
ax.plot(test_data[label], lw=4)
ax.plot(forecast_data["forecast_value"], lw=4)

# Fixed viewing window: 1 June 2021 to 25 August 2021, y from 0 to 100
ax.set_xlim(date(2021, 6, 1), date(2021, 8, 25))
ax.set_ylim(0, 100)
ax.legend(["TRAIN", "TEST", "FORECAST"]);

In [None]:
# Forecast into the future


# Create (or replace) the model under the same model name as before, this time
# reading from the full daily table (table_id) instead of the training table.
# The statement is standard SQL and uses the same OPTIONS values as the
# earlier CREATE MODEL cell.
sql = f"""CREATE OR REPLACE MODEL {model_name}
OPTIONS
  (model_type = '{model_type}',
   time_series_timestamp_col = '{date_column}',
   time_series_data_col = '{label}',
   auto_arima = {auto_arima},
   data_frequency = '{data_frequency}',
   decompose_time_series = {decompose_series}
  ) AS
SELECT
  {date_column},
  {label}
FROM
  `{table_id}`
"""

# Submit the statement as a query job.
query_job = client.query(sql)
query_job.result()  # wait for the CREATE MODEL job to complete

In [None]:
# Forecast 10 days ahead with the model re-created on the full daily table
forecast_days = 10

# forecast_timestamp is reduced to a plain date and exposed under the name held
# in date_column (previously hard-coded), because the next cell indexes
# forecast_data by date_column. All ML.FORECAST columns are kept alongside it.
query = f"""
SELECT parse_date('%Y-%m-%d', format_datetime('%Y-%m-%d', forecast_timestamp)) as {date_column}, *
FROM
(
SELECT
 *
FROM
 ML.FORECAST(MODEL {model_name},
             STRUCT({forecast_days} AS horizon, 0.9 AS confidence_level))
)
"""
forecast_data = pd.read_gbq(query=query, project_id=client.project, dialect='standard')
forecast_data

In [None]:
# Stack the training and test rows into one frame covering the whole history.
# pd.concat replaces DataFrame.append, which was deprecated in pandas 1.4 and
# removed in pandas 2.0; sort=True keeps the same column ordering behaviour.
full_data = pd.concat([train_data, test_data], sort=True)

# Index the new forecast by date. The column check keeps the cell re-runnable:
# after the first run the date is the index and no longer a column.
if date_column in forecast_data.columns:
    forecast_data.set_index(date_column, inplace=True)

In [None]:
# Display the combined training + test frame
full_data

In [None]:
# Display the forecast values on their own
forecast_data["forecast_value"]

In [None]:
# Plot the full history together with the forecast
ax = full_data[label].plot(figsize=(14, 8), lw=4)
ax.plot(forecast_data["forecast_value"], lw=4)

# Fixed viewing window: 1 June 2021 to 4 September 2021, y from 0 to 100
ax.set_xlim(date(2021, 6, 1), date(2021, 9, 4))
ax.set_ylim(0, 100)
ax.legend(["TRAIN", "FORECAST"]);