# Purpose

Demonstrate how to make SQL-based features testable and verifiable within a Python client, and show how the same client can run in pre-production and production environments.

# Setup

To reproduce this, you'll need to:

1. Install dependencies: `pip install google-cloud-bigquery pyyaml jinja2`
1. Set up [GCP authentication](https://cloud.google.com/docs/authentication/getting-started)
1. Create a BigQuery table and populate it using the national summary data from the [Atlantic COVID Tracking Project data](https://covidtracking.com/data/download).

# Sample data

Here's a sample query using the BigQuery client.

In [1]:
from google.cloud import bigquery

client = bigquery.Client()

QUERY = (
    """SELECT date, state, death, hospitalized 
         FROM `testable-features-poc.covid.us-states` 
        ORDER BY date DESC, state ASC
        LIMIT 5""")
query_job = client.query(QUERY)
rows = query_job.result()

for row in rows:
    print(row)

Row((datetime.date(2021, 3, 7), 'AK', 305, 1293), {'date': 0, 'state': 1, 'death': 2, 'hospitalized': 3})
Row((datetime.date(2021, 3, 7), 'AL', 10148, 45976), {'date': 0, 'state': 1, 'death': 2, 'hospitalized': 3})
Row((datetime.date(2021, 3, 7), 'AR', 5319, 14926), {'date': 0, 'state': 1, 'death': 2, 'hospitalized': 3})
Row((datetime.date(2021, 3, 7), 'AS', 0, None), {'date': 0, 'state': 1, 'death': 2, 'hospitalized': 3})
Row((datetime.date(2021, 3, 7), 'AZ', 16328, 57907), {'date': 0, 'state': 1, 'death': 2, 'hospitalized': 3})


# Define & validate feature
We'll define the feature in our notebook, and without publishing it, validate it against production data.

I doubt developers will get direct access to production data from their notebooks, so in practice we'll probably need a service that proxies these requests between local environments and the various prod and pre-prod environments.

In [53]:
import yaml

feature_def = yaml.safe_load("""
  parameters:
    - foo: bar
  sources:
    - source1: 
        prod: testable-features-poc.covid.us-states
  query: |
    SELECT death
      FROM `{{ source1 }}`
     WHERE state = '{{ state }}'
     ORDER BY date DESC
     LIMIT 1
""")

In [98]:
from jinja2 import Template
import csv
import os

class Connection:
    
    def __init__(self, feature):
        self.feature_def = feature
        self.gbq = bigquery.Client()
    
    def _build_sources(self, env):
        srcs = self.feature_def['sources']
        src_map = {}
        
        # Convert [ { 'source1': { 'env1': 'src1', 'env2': 'src2' } }, ... ]
        #   to { 'source1': 'src1', ... } using the specified environment
        for src in srcs:
            for src_key in src:
                src_map[src_key] = src[src_key][env]
        return src_map
    
    def _build_query(self, **kwargs):
        t = Template(self.feature_def['query'])
        srcs = self._build_sources(os.getenv('ENV'))
        return t.render({**kwargs, **srcs})
    
    def condition_env(self, data, env):
        # Work in progress: parses the CSV test data and prints it;
        # nothing is loaded into `env` yet.
        lines = data.strip().splitlines()
        reader = csv.reader(lines)
        parsed = list(reader)
        headers = parsed[0]
        data = parsed[1:]
        print(headers)
        print(data)
    
    def inference(self, **kwargs):
        q = self._build_query(**kwargs)
        job = self.gbq.query(q)
        # result() blocks until the job finishes; take the first row, if any
        val = next(iter(job.result()), None)
        return val[0] if val is not None else None
    
    def close(self):
        print("closed")

class FeaturesClient:
        
    @classmethod
    def load_feature(cls, feature):
        return Connection(feature)
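
As written, `_build_sources` raises a bare `KeyError` when `ENV` is unset or when the feature definition has no entry for the requested environment (the definition above only lists `prod`). A minimal sketch of a stricter lookup, assuming a standalone helper named `resolve_sources` (hypothetical, not part of the `Connection` class):

```python
def resolve_sources(sources, env):
    """Map each source key to its table name for the given environment.

    `sources` has the shape used in the feature definition:
    [{'source1': {'prod': 'project.dataset.table', ...}}, ...]
    """
    if not env:
        raise ValueError("no environment given; set ENV (e.g. 'prod') first")

    resolved = {}
    for entry in sources:
        for key, tables_by_env in entry.items():
            if env not in tables_by_env:
                known = ", ".join(sorted(tables_by_env)) or "none"
                raise KeyError(
                    f"source '{key}' has no table for env '{env}' (defined: {known})"
                )
            resolved[key] = tables_by_env[env]
    return resolved


sources = [{"source1": {"prod": "testable-features-poc.covid.us-states"}}]

print(resolve_sources(sources, "prod"))
# {'source1': 'testable-features-poc.covid.us-states'}

try:
    resolve_sources(sources, "dev")
except KeyError as err:
    print("lookup failed:", err)
```

Because the helper needs no BigQuery client, it can be exercised in a plain unit test before any query is sent.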

In [95]:
import os

# connect to the production environment to validate the result
os.environ["ENV"] = "prod" 

c = FeaturesClient.load_feature(feature_def)

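# gbq.query() returns a job without waiting for it to finish (like a future/promise);
# inference() then blocks on job.result() before returning the value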
c.inference(state="DC")    # expect 1030

1030

# Unit test

Now we'll write unit tests, enabling us to repeatedly validate our feature definition as well as test out edge cases.

In [133]:
import uuid

data = """
date, state, death
"2021-11-27","DC",123
"2021-11-27","VA",456
"""

#c.condition_env(data, "dev")


client = bigquery.Client()

# TODO: use the feature definition sources instead of hard coding
tmp_table_name = "testable-features-poc.covid.us-states-{}".format(str(uuid.uuid4()))
query = """
CREATE TABLE `{}`
    OPTIONS (
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
    )
    AS SELECT * FROM `testable-features-poc.covid.us-states` LIMIT 0
""".format(tmp_table_name)

print(query)

query_job = client.query(query)
query_job.result()

#
# TODO: put CSV in Cloud Storage and import. this is quick hack.
#
lines = data.strip().splitlines()
print(lines)
reader = csv.reader(lines)
parsed = list(reader)
headers = parsed[0]
data = parsed[1::]
print(headers)
print(data)


cols_str = ",".join(headers)
vals_str_tmp = ["({})".format(",".join(["\"{}\"".format(d) for d in row])) for row in data]
print(vals_str_tmp)
vals_str = ",".join(vals_str_tmp)

query = """
INSERT INTO `{}`
  ({})
VALUES {}
""".format(tmp_table_name, cols_str, vals_str)

print(query)
query_job = client.query(query)
query_job.result()


CREATE TABLE `testable-features-poc.covid.us-states-9dc9c4be-6dd6-4c87-a1d2-14b814fd047c`
    OPTIONS (
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
    )
    AS SELECT * FROM `testable-features-poc.covid.us-states` LIMIT 0

['date, state, death', '"2021-11-27","DC",123', '"2021-11-27","VA",456']
['date', ' state', ' death']
[['2021-11-27', 'DC', '123'], ['2021-11-27', 'VA', '456']]
['("2021-11-27","DC","123")', '("2021-11-27","VA","456")']

INSERT INTO `testable-features-poc.covid.us-states-9dc9c4be-6dd6-4c87-a1d2-14b814fd047c`
  (date, state, death)
VALUES ("2021-11-27","DC","123"),("2021-11-27","VA","456")



BadRequest: 400 Value has type STRING which cannot be inserted into column death, which has type INT64 at [4:27]

(job ID: 88a49f11-bc0e-4aa8-bb21-362f965fcd8b)

                               -----Query Job SQL Follows-----                               

    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |
   1:
   2:INSERT INTO `testable-features-poc.covid.us-states-9dc9c4be-6dd6-4c87-a1d2-14b814fd047c`
   3:  (date, state, death)
   4:VALUES ("2021-11-27","DC","123"),("2021-11-27","VA","456")
    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |
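
The insert fails because every CSV field is wrapped in double quotes, so `123` reaches BigQuery as the string `"123"`. One possible fix, sketched under the assumption that integer-looking fields should become `INT64` literals: convert fields while parsing and quote only the strings. The helpers `parse_csv_rows`, `sql_literal`, and `build_values_clause` are hypothetical names, and guessing types from field contents is a heuristic; reading the table schema would be more robust.

```python
import csv


def parse_csv_rows(text):
    """Parse CSV text into (headers, rows), converting integer-looking fields to int."""
    # skipinitialspace drops the blank after each comma in "date, state, death"
    reader = csv.reader(text.strip().splitlines(), skipinitialspace=True)
    headers, *raw_rows = list(reader)

    def convert(field):
        try:
            return int(field)
        except ValueError:
            return field

    return headers, [[convert(f) for f in row] for row in raw_rows]


def sql_literal(value):
    """Render a Python value as a SQL literal: numbers bare, strings quoted."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_values_clause(rows):
    """Join rows into the text that follows VALUES in an INSERT statement."""
    return ",".join("({})".format(",".join(sql_literal(v) for v in row)) for row in rows)


data = """
date, state, death
"2021-11-27","DC",123
"2021-11-27","VA",456
"""

headers, rows = parse_csv_rows(data)
print(",".join(headers))          # date,state,death
print(build_values_clause(rows))  # ("2021-11-27","DC",123),("2021-11-27","VA",456)
```

The date column can stay quoted: the error above only complained about `death`. For anything larger than a test fixture, a load job or query parameters would avoid building literals by hand.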

In [103]:
os.environ["ENV"] = "dev"
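
The notebook stops after switching `ENV` to `dev`. One way the remaining step might look, assuming the temporary table should become the `dev` entry for `source1` and that its name is derived from the `prod` source rather than hard-coded (the TODO in the cell above). `with_env_source` and `temp_table_name` are hypothetical helpers, not part of the client:

```python
import copy
import uuid


def with_env_source(feature_def, source_key, env, table):
    """Return a copy of feature_def in which source_key maps env to table."""
    updated = copy.deepcopy(feature_def)
    for entry in updated["sources"]:
        if source_key in entry:
            entry[source_key][env] = table
            return updated
    raise KeyError(f"feature definition has no source named '{source_key}'")


def temp_table_name(base_table):
    """Derive a unique temporary table name from an existing table name."""
    return f"{base_table}-{uuid.uuid4()}"


feature_def = {
    "sources": [{"source1": {"prod": "testable-features-poc.covid.us-states"}}],
    "query": "SELECT death FROM `{{ source1 }}` WHERE state = '{{ state }}' "
             "ORDER BY date DESC LIMIT 1",
}

prod_table = feature_def["sources"][0]["source1"]["prod"]
tmp_table = temp_table_name(prod_table)
dev_def = with_env_source(feature_def, "source1", "dev", tmp_table)

print(dev_def["sources"][0]["source1"]["dev"] == tmp_table)  # True
print("dev" in feature_def["sources"][0]["source1"])         # False (original untouched)
```

With `ENV` set to `dev`, `FeaturesClient.load_feature(dev_def)` would then be expected to query the temporary table instead of production, once the test rows have been inserted successfully.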

