# Using BigQuery (BQ) connectors with Wallaroo pipelines

In [1]:
import json
import os
import datetime

import wallaroo
from wallaroo.object import EntityNotFoundError

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np

# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes

# utility functions for creating demo queries
from resources import util

# Show full cell contents (e.g. long SQL strings) instead of truncating them.
pd.options.display.max_colwidth = None

#
# convenience functions
#

def get_workspace(name, client=None):
    """Return the workspace called `name`, creating it if none exists.

    Args:
        name: workspace name to look up.
        client: Wallaroo client to use; defaults to the notebook-global
            client `wl` (looked up at call time).

    Returns:
        The first listed workspace whose ``name()`` equals `name`, or the
        workspace returned by ``client.create_workspace(name)``.
    """
    if client is None:
        client = wl
    for ws in client.list_workspaces():
        if ws.name() == name:
            # stop at the first match instead of scanning the rest
            return ws
    return client.create_workspace(name)

# Pipeline lookup within the client's current workspace.
def get_pipeline(pname, create_if_absent=False):
    """Return the pipeline named `pname` from the current workspace.

    If several pipelines share the name, the first one listed is returned.
    When none matches, a new pipeline is built if `create_if_absent` is true;
    otherwise KeyError is raised. Uses the notebook-global client `wl`.
    """
    candidates = wl.get_current_workspace().pipelines()
    matches = [candidate for candidate in candidates if candidate.name() == pname]
    if matches:
        return matches[0]
    if not create_if_absent:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return wl.build_pipeline(pname)


## Connect to Wallaroo and Set Workspace

In [2]:
# Login through the local Wallaroo instance, then make the demo workspace current.
wl = wallaroo.Client()

# Names of the Wallaroo objects this demo works with.
workspace_name = 'bikerental-nbz'
pipeline_name = 'bikeforecast-pipe'
model_name = 'bikeforecast-arima'

workspace = get_workspace(workspace_name)
_ = wl.set_current_workspace(workspace)

## Connect to BigQuery

In [3]:
# Look up the BigQuery connection registered in Wallaroo.
conn_name = 'bq-wl-dev'
connection = wl.get_connection(conn_name)

# Fetch the stored service-account details once and reuse them for both the
# credentials and the project id, so both come from the same lookup.
connection_details = connection.details()

# set the credentials
bigquery_credentials = service_account.Credentials.from_service_account_info(connection_details)

# start the client
bigqueryclient = bigquery.Client(
    credentials=bigquery_credentials,
    project=connection_details['project_id'],
)

In [4]:
# BigQuery locations: the dataset, its table of historical rentals, and the
# staging table that receives the forecasts.
dataset = 'bikerental_forecast_demo'
input_table, output_table = 'bikerentals', 'bikeforecasts'
in_tablename, out_tablename = (f'{dataset}.{name}' for name in (input_table, output_table))

print(f'input data source: {in_tablename}; output staging table: {out_tablename}')

input data source: bikerental_forecast_demo.bikerentals; output staging table: bikerental_forecast_demo.bikeforecasts


In [5]:
# FOR THE PURPOSES OF THE DEMO, recreate a blank staging table
def create_staging_table(bqclient, tablename):
    """Create an empty forecast staging table and return it.

    Args:
        bqclient: BigQuery client; its ``project`` is prefixed to ``tablename``
            to form the fully-qualified table id.
        tablename: ``'<dataset>.<table>'`` name of the table to create.

    Returns:
        The table object returned by ``bqclient.create_table``.

    ``create_table`` is called without ``exists_ok``, so creation fails if the
    table already exists; this notebook drops any previous copy first.
    """
    fulltablename = f'{bqclient.project}.{tablename}'
    # Same columns as the frame the forecast pipeline returns (dteday, site_id, forecast).
    schema = [
        bigquery.SchemaField("dteday", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("site_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("forecast", "INTEGER", mode="REQUIRED"),
    ]
    table = bigquery.Table(fulltablename, schema=schema)
    # Hand the created table back to the caller instead of discarding it.
    return bqclient.create_table(table)
    
# Reset the staging table for a clean demo run: remove any earlier copy
# (a missing table is not an error), then create a fresh, empty one.
bigqueryclient.delete_table(out_tablename, not_found_ok=True)
create_staging_table(bqclient=bigqueryclient, tablename=out_tablename)

## Deploy the Pipeline

In [6]:
# Look up the existing forecast pipeline in the current workspace (get_pipeline
# raises KeyError if it is missing, since create_if_absent is not set) and
# deploy it so the inference cells below can call it.
pipeline = get_pipeline(pipeline_name)
pipeline.deploy()

Waiting for deployment - this will take up to 45s ................................ ok


0,1
name,bikeforecast-pipe
created,2023-06-20 16:49:16.533439+00:00
last_updated,2023-06-20 16:53:38.733377+00:00
deployed,True
tags,
versions,"01e1f0f6-8828-42b2-8d5f-bd2ad7848da2, d8f9fcc1-3732-4d3d-8202-4521c76f5034, 33c4432f-9013-4535-a1b8-c4d8a08fa4e7"
steps,bikeforecast-arima


In [7]:
# Inspect the deployment: engine, load-balancer and model statuses.
pipeline.status()

{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.244.24.2',
   'name': 'engine-6949f8d-9d5wm',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'bikeforecast-pipe',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'bikeforecast-arima',
      'version': 'bc1d79c5-79b4-4864-b351-98828b865db5',
      'sha': '7ed0600d8b754ff9d901633cf78de7f825c9d2d0f79a2647d271d2b04f4befd8',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.24.3',
   'name': 'engine-lb-584f54c899-cpjpk',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': []}

### Run Inference
For this example, we will forecast bike rentals for the following seven days, based on the previous month's rentals (inclusive of "today").

In [8]:
# The "as of" date for the forecasts. It is pinned here so the demo is
# reproducible; a live job would derive it from datetime.datetime.now() or similar.
today = '2011-03-01'

print('Running analysis on ' + today)

Running analysis on 2011-03-01


In [9]:
# Get every rental site id. ORDER BY makes `sites[0]` (used in the single-site
# walkthrough below) the same on every run; SELECT DISTINCT alone does not
# guarantee any row order.
sites = bigqueryclient.query(
    f"select distinct site_id from {in_tablename} order by site_id"
).to_dataframe()['site_id'].to_numpy()
print(f'{len(sites)} rental sites')

10 rental sites


#### Forecast a single site, without writing results to the table, to illustrate the process.

In [10]:
# Build the SQL that pulls the historical window (up to and including `today`)
# for the first site.
query = util.mk_dt_range_query(
    site_id=sites[0],
    day_of_forecast=today,
    tablename=in_tablename,
)
print(query)


    select dteday, site_id, cnt, season, holiday, weekday, workingday
    from bikerental_forecast_demo.bikerentals where dteday > DATE_SUB(DATE '2011-03-01', INTERVAL 1 MONTH) AND dteday <= PARSE_DATE('%F', '2011-03-01')
    and site_id = 'site0001'
    order by dteday


In [11]:
# Build the SQL for the exogenous variables of the 7 days being forecast
# ("future data"). In practice this data would likely come from a separate
# table rather than bikerentals.
xquery = util.mk_exog_query(
    nforecast=7,
    site_id=sites[0],
    day_of_forecast=today,
    tablename=in_tablename,
)
print(xquery)


    select dteday, site_id, season, holiday, weekday, workingday
    from bikerental_forecast_demo.bikerentals where dteday > PARSE_DATE('%F', '2011-03-01') AND dteday <= DATE_ADD(DATE '2011-03-01', INTERVAL 7 day)
    and site_id = "site0001"
    order by dteday


The model takes two inputs:

- historical data: bike rental counts plus exogenous data about each day, such as holiday and weekday;
- exogenous information about the future days to be forecast.

We combine these into one table, where `cnt` is -1 for days in the future (marking that the count is unknown and must be predicted).

In [12]:
# Fetch the history and the future exogenous rows, then stack them into the
# single frame the model expects.
historical_data = bigqueryclient.query(query).to_dataframe()

# Future days have no observed count yet: mark them with the sentinel -1 so
# everything goes in one frame. NaN might be cleaner, but it is unconfirmed
# whether the platform accepts it.
exog = bigqueryclient.query(xquery).to_dataframe().assign(cnt=-1)

input_frame = pd.concat([historical_data, exog], ignore_index=True)
# Dates are sent to the pipeline as plain strings.
input_frame['dteday'] = input_frame['dteday'].astype(str)
input_frame

Unnamed: 0,dteday,site_id,cnt,season,holiday,weekday,workingday
0,2011-02-02,site0001,1240,1,0,3,1
1,2011-02-03,site0001,1551,1,0,4,1
2,2011-02-04,site0001,2324,1,0,5,1
3,2011-02-05,site0001,805,1,0,6,0
4,2011-02-06,site0001,1948,1,0,0,0
5,2011-02-07,site0001,1650,1,0,1,1
6,2011-02-08,site0001,913,1,0,2,1
7,2011-02-09,site0001,931,1,0,3,1
8,2011-02-10,site0001,1256,1,0,4,1
9,2011-02-11,site0001,1614,1,0,5,1


In [13]:
# Send the combined frame to the deployed pipeline and tabulate the forecast.
results = pipeline.infer(input_frame.to_dict())[0]
resultframe = pd.DataFrame(data=results)
resultframe

# Normally these results would be written straight back to the bikeforecasts
# staging table; that step is deferred to the per-site loop below, e.g.:
#
# output_table = bigqueryclient.get_table(out_tablename)
# bigqueryclient.insert_rows_from_dataframe(output_table, dataframe=resultframe)

Unnamed: 0,dteday,site_id,forecast
0,2011-03-02,site0001,2269
1,2011-03-03,site0001,1712
2,2011-03-04,site0001,1795
3,2011-03-05,site0001,1371
4,2011-03-06,site0001,1819
5,2011-03-07,site0001,2045
6,2011-03-08,site0001,1974


# Loop over all sites


In [14]:
# function to execute the task. Wrapping it this way will be useful when we create the orchestration

def do_forecast(bqclient, pipeline, in_table, out_table, forecast_day, site, nforecast=7):
    """Forecast rentals for one site and append the results to the staging table.

    Args:
        bqclient: BigQuery client used for both the reads and the insert.
        pipeline: deployed Wallaroo pipeline that produces the forecast.
        in_table: ``'<dataset>.<table>'`` holding historical rentals and the
            exogenous columns for the forecast days.
        out_table: ``'<dataset>.<table>'`` staging table the forecasts are
            appended to.
        forecast_day: ``'YYYY-MM-DD'`` date the forecast is made from.
        site: site_id to forecast.
        nforecast: number of future days of exogenous data to request
            (default 7, the horizon used throughout this notebook).

    Returns:
        The frame built from the pipeline's result, i.e. exactly the rows that
        were inserted.

    Raises:
        RuntimeError: if BigQuery reports insert errors for any row. The
            streaming insert returns errors instead of raising, so without this
            check failed rows would be dropped silently.
    """
    # get input data: the historical window plus exogenous rows for the days to forecast
    query = util.mk_dt_range_query(tablename=in_table, day_of_forecast=forecast_day, site_id=site)
    xquery = util.mk_exog_query(tablename=in_table, day_of_forecast=forecast_day, site_id=site,
                                nforecast=nforecast)

    historical_data = bqclient.query(query).to_dataframe()
    exog = bqclient.query(xquery).to_dataframe()
    # -1 marks "count unknown, to be predicted" for the future days
    exog['cnt'] = -1

    input_frame = pd.concat([historical_data, exog]).reset_index(drop=True)
    input_frame['dteday'] = input_frame['dteday'].astype(str)

    # infer
    results = pipeline.infer(input_frame.to_dict())[0]
    resultframe = pd.DataFrame(results)

    # write to staging table; insert_rows_from_dataframe returns, per insert
    # chunk, a list of per-row error mappings rather than raising
    output_table = bqclient.get_table(out_table)
    chunk_errors = bqclient.insert_rows_from_dataframe(output_table, dataframe=resultframe)
    row_errors = [err for chunk in chunk_errors for err in chunk]
    if row_errors:
        raise RuntimeError(
            f'failed to insert {len(row_errors)} forecast row(s) for site {site} '
            f'into {out_table}: {row_errors[:3]}'
        )
    return resultframe
    
    

In [15]:
# Run the forecast task once per rental site; each call appends that site's
# forecasts to the staging table.
for site in sites:
    print(f'forecasting {site}')
    do_forecast(
        bqclient=bigqueryclient,
        pipeline=pipeline,
        in_table=in_tablename,
        out_table=out_tablename,
        forecast_day=today,
        site=site,
    )

print('forecast complete, results written to bikeforecast table')

forecasting site0001
forecasting site0002
forecasting site0003
forecasting site0004
forecasting site0005
forecasting site0006
forecasting site0007
forecasting site0008
forecasting site0009
forecasting site0010
forecast complete, results written to bikeforecast table


# Check the table


In [16]:
# Read the staging table back, ordered by site and date, to confirm the writes.
query = f'SELECT * from {out_tablename} ORDER BY site_id, dteday'
print(query)

bigqueryclient.query(query).to_dataframe()


SELECT * from bikerental_forecast_demo.bikeforecasts ORDER BY site_id, dteday


Unnamed: 0,dteday,site_id,forecast
0,2011-03-02,site0001,2269
1,2011-03-03,site0001,1712
2,2011-03-04,site0001,1795
3,2011-03-05,site0001,1371
4,2011-03-06,site0001,1819
...,...,...,...
65,2011-03-04,site0010,1717
66,2011-03-05,site0010,1400
67,2011-03-06,site0010,1997
68,2011-03-07,site0010,2117


### Undeploy the Pipeline

Undeploy the pipeline and return the resources back to the Wallaroo instance.

In [17]:
# Undeploy the pipeline, returning its resources to the Wallaroo instance.
pipeline.undeploy()

Waiting for undeployment - this will take up to 45s ..................................... ok


0,1
name,bikeforecast-pipe
created,2023-06-20 16:49:16.533439+00:00
last_updated,2023-06-20 16:53:38.733377+00:00
deployed,False
tags,
versions,"01e1f0f6-8828-42b2-8d5f-bd2ad7848da2, d8f9fcc1-3732-4d3d-8202-4521c76f5034, 33c4432f-9013-4535-a1b8-c4d8a08fa4e7"
steps,bikeforecast-arima


In [18]:
# Close the BigQuery client now that all queries and inserts are finished.
bigqueryclient.close()