# TensorFlow Data Validation (TFDV): Scaling with Apache Beam

This notebook demonstrates how to use TensorFlow Data Validation (TFDV) with Apache Beam and Dataflow. It builds on the introduction to TFDV from the previous lab. In this notebook, you will:

- Extract data from BigQuery to GCS using Dataflow.
- Compute the summary statistics using TFDV and Dataflow.
- Visually explore the computed statistics to understand the data.
- Infer the schema.
- Save the updated schema to be used as a contract during inference.

### Install dependencies

In [1]:
!pip install tensorflow tensorflow_data_validation 'apache-beam[gcp]' google-cloud-bigquery



### Dataset

This notebook uses [Chicago crime data](https://data.cityofchicago.org/) published as a public dataset in BigQuery. This dataset reflects reported incidents of crime (with the exception of murders, where data exists for each victim) that occurred in the City of Chicago from 2001 to present, minus the most recent seven days. The extraction uses the following columns:

- **date**: Date when the incident occurred. This is sometimes a best estimate.
- **iucr**: The Illinois Uniform Crime Reporting code.
- **primary_type**: The primary description of the IUCR code.
- **location_description**: Description of the location where the incident occurred.
- **arrest**: Indicates whether an arrest was made.
- **domestic**: Indicates whether the incident was domestic-related as defined by the Illinois Domestic Violence Act.
- **district**: Indicates the police district where the incident occurred. 
- **ward**: The ward (City Council district) where the incident occurred.
- **fbi_code**: Indicates the crime classification.
- **year**: Year the incident occurred.
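The extraction query does not return `date` and `year` as-is: it derives four string features from `date` with BigQuery's `FORMAT_DATE` and uses `year` only to filter rows. As a local illustration (not part of the pipeline), Python's `strftime` accepts the same `%Y`, `%b`, `%d` and `%a` format elements, so the derived values for one timestamp can be previewed as below. The helper `date_features` is hypothetical, and the month and weekday abbreviations assume the default C/English locale.

```python
from datetime import datetime


def date_features(ts: datetime) -> dict:
    """Mirror the FORMAT_DATE calls of the extraction query (hypothetical helper)."""
    return {
        "crime_year": ts.strftime("%Y"),         # four-digit year
        "crime_month": ts.strftime("%b"),        # abbreviated month name (locale-dependent)
        "crime_day": ts.strftime("%d"),          # zero-padded day of the month
        "crime_day_of_week": ts.strftime("%a"),  # abbreviated weekday name (locale-dependent)
    }


print(date_features(datetime(2019, 3, 15, 22, 30)))
# → {'crime_year': '2019', 'crime_month': 'Mar', 'crime_day': '15', 'crime_day_of_week': 'Fri'}
```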



### Imports

In [90]:
import tensorflow as tf 
from google.cloud import bigquery
import tensorflow_data_validation as tfdv
import pandas as pd
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format
import apache_beam as beam 
from datetime import datetime

GCS_BUCKET = "freeldom-mlops-deployments" # Set your GCS bucket
PROJECT_ID = 'freeldom' # Set your GCP Project Id
REGION = 'asia-southeast1' # Set the region for Dataflow jobs
LOCAL = False

CHICAGO_CRIME_TABLE = 'bigquery-public-data.chicago_crime.crime'


## Extract data from BigQuery

Instead of extracting only the 2019 data (260,673 records) to generate the schema, let's extract the data from 2015 to 2019 (1,331,957 records). We use the same SQL query as before, but this time we run the extraction with Dataflow. We convert the records into TFRecord format, which TensorFlow reads efficiently, but you could also use CSV if you prefer.
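For reference, a TFRecord file is a sequence of binary records, each stored as a little-endian `uint64` length, a masked CRC-32C of that length, the payload bytes, and a masked CRC-32C of the payload, as described in TensorFlow's TFRecord documentation. The pure-Python sketch below writes and reads that framing to show what the pipeline produces on disk. In the notebook itself, Beam's `WriteToTFRecord` handles this; the helper names here (`crc32c`, `masked_crc`, `write_record`, `read_records`) are hypothetical.

```python
import io
import struct


def _make_crc32c_table():
    # Lookup table for the reflected CRC-32C (Castagnoli) polynomial.
    poly = 0x82F63B78
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ poly if crc & 1 else crc >> 1
        table.append(crc)
    return table


_CRC32C_TABLE = _make_crc32c_table()


def crc32c(data: bytes) -> int:
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    # TFRecord stores a rotated and offset CRC rather than the raw value.
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream, payload: bytes) -> None:
    length = struct.pack("<Q", len(payload))
    stream.write(length)
    stream.write(struct.pack("<I", masked_crc(length)))
    stream.write(payload)
    stream.write(struct.pack("<I", masked_crc(payload)))


def read_records(stream):
    while True:
        header = stream.read(8)
        if not header:
            return  # end of file
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError("corrupted record length")
        payload = stream.read(length)
        (payload_crc,) = struct.unpack("<I", stream.read(4))
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupted record payload")
        yield payload


buffer = io.BytesIO()
for payload in [b"first example", b"second example"]:
    write_record(buffer, payload)
buffer.seek(0)
print(list(read_records(buffer)))  # → [b'first example', b'second example']
```

In the real pipeline each payload is a serialized `tf.train.Example` rather than an arbitrary byte string.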

In [75]:
def generate_query(year_from=None, year_to=None, limit=None) -> str:
    """Build the BigQuery SQL used to extract the Chicago crime features."""
    query = f"""
        SELECT
            FORMAT_DATE('%Y', CAST(date AS DATE)) AS crime_year,
            FORMAT_DATE('%b', CAST(date AS DATE)) AS crime_month,
            FORMAT_DATE('%d', CAST(date AS DATE)) AS crime_day,
            FORMAT_DATE('%a', CAST(date AS DATE)) AS crime_day_of_week,
            iucr,
            primary_type,
            location_description,
            CAST(domestic AS INT64) AS domestic,
            district,
            ward,
            fbi_code,
            CAST(arrest AS INT64) AS arrest
        FROM
          `{CHICAGO_CRIME_TABLE}`
        """
    # Collect the optional year filters so that they combine correctly
    # whether one, both, or neither bound is given.
    conditions = []
    if year_from:
        conditions.append(f"year >= {year_from}")
    if year_to:
        conditions.append(f"year <= {year_to}")
    if conditions:
        query += "WHERE " + " AND ".join(conditions) + "\n"
    if limit:
        query += f"LIMIT {limit}\n"

    return query

### Dataflow Pipeline

To convert the data to TFRecord, we need a function that converts each row of the dataset into a `tf.train.Example` record.

In [95]:
DATA_TYPES = {
    'crime_year': 'STRING',
    'crime_month': 'STRING',
    'crime_day': 'STRING',
    'crime_day_of_week': 'STRING',
    'iucr': 'STRING',
    'primary_type': 'STRING',
    'location_description': 'STRING',
    'fbi_code': 'STRING',
    'domestic': 'INTEGER',
    'district': 'INTEGER',
    'ward': 'INTEGER',
    'arrest': 'INTEGER'
}

def to_example(row, type_mapping):
    """Convert one BigQuery row (a dict) into a tf.train.Example."""
    features = {}
    for name, value in row.items():
        datatype = type_mapping[name]
        if value is None:
            # NULL values become an empty feature with no value list.
            features[name] = tf.train.Feature()
        elif datatype == 'INTEGER':
            features[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
        else:
            # Every other column is stored as bytes.
            features[name] = tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(value)]))
    return tf.train.Example(features=tf.train.Features(feature=features))
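To make the type handling in `to_example` concrete without TensorFlow, the hypothetical helper below applies the same branches to a plain dictionary: `None` becomes an empty feature, `INTEGER` columns become an `int64_list`, and every other column becomes a `bytes_list`. It assumes the non-integer columns hold Python `str` values, which it encodes as UTF-8 as `tf.compat.as_bytes` does for strings. It only illustrates the branching and is not a replacement for `tf.train.Example`.

```python
def feature_kinds(row: dict, type_mapping: dict) -> dict:
    """Return (kind, values) pairs following the same branches as to_example."""
    kinds = {}
    for name, value in row.items():
        datatype = type_mapping[name]
        if value is None:
            kinds[name] = ("empty", [])  # like tf.train.Feature() with no list set
        elif datatype == "INTEGER":
            kinds[name] = ("int64_list", [value])
        else:
            kinds[name] = ("bytes_list", [value.encode("utf-8")])  # assumes str values
    return kinds


sample_row = {"crime_month": "Mar", "district": 11, "ward": None}
sample_types = {"crime_month": "STRING", "district": "INTEGER", "ward": "INTEGER"}
print(feature_kinds(sample_row, sample_types))
# → {'crime_month': ('bytes_list', [b'Mar']), 'district': ('int64_list', [11]), 'ward': ('empty', [])}
```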


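Now let's define the Dataflow pipeline. It has four steps: read the query results from BigQuery, convert each row into a `tf.train.Example`, serialize the examples, and write them as TFRecord files.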

In [96]:
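def execute_pipeline(query, output_location, runner, type_mapping, args):
    # PipelineOptions accepts any Beam option (project, region, setup_file, ...) as keyword arguments.
    options = beam.options.pipeline_options.PipelineOptions(**args)

    with beam.Pipeline(runner, options=options) as pipeline:
        (pipeline
             # 1. Run the SQL query against BigQuery; each row arrives as a dict.
             | "Extract dataset" >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
             # 2. Convert each row into a tf.train.Example.
             | "Convert" >> beam.Map(lambda instance: to_example(instance, type_mapping))
             # 3. Serialize the examples to bytes.
             | "Serialize" >> beam.Map(lambda example: example.SerializeToString(deterministic=True))
             # 4. Write the serialized examples as sharded TFRecord files.
             | "Save as TFRecords" >> beam.io.WriteToTFRecord(file_path_prefix=output_location, file_name_suffix=".tfrecords")
        )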

Once the pipeline is defined, we can trigger the job. Make sure the Dataflow API is enabled in your project.

We also write a `setup.py` file so that TFDV is installed on the Dataflow workers.

In [97]:
%%writefile setup.py

from setuptools import setup

setup(
    name='tfdv',
    install_requires=[
      'tensorflow_data_validation'
    ]
)

Overwriting setup.py


In [101]:
job_time = datetime.now().strftime('%Y%m%d-%H%M%S')
job_name = f"tfdv-chicago-crime-{job_time}"
root_folder = f"gs://{GCS_BUCKET}/tfdv/{job_name}" if not LOCAL else '.'

In [102]:
runner = "DataflowRunner" if not LOCAL else "DirectRunner"
year_from = 2015
year_to = 2019
data_folder = root_folder + "/data/train/"
# Only extract a small subset when running locally with the DirectRunner.
query = generate_query(year_from, year_to, limit=None if not LOCAL else 10000)
args = {
    'job_name': job_name,
    'runner': runner,
    'project': PROJECT_ID,
    'region': REGION,
    'save_main_session': True,
    'staging_location': root_folder + "/staging/",
    'temp_location': root_folder + "/temp/",
    'setup_file': './setup.py'
}
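Dataflow job names may only contain lowercase letters, digits and hyphens, must start with a letter, and must end with a letter or digit. The timestamp format used for `job_time` only produces digits and a hyphen, so the generated names satisfy this rule. A small local check, written as a hypothetical helper around a regular expression for that rule, can catch a bad name before a job is submitted:

```python
import re

# Assumed naming rule: [-a-z0-9], starting with a letter and ending with a letter or digit.
_JOB_NAME_RE = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def is_valid_job_name(name: str) -> bool:
    """Return True if `name` matches the Dataflow job-name pattern."""
    return _JOB_NAME_RE.fullmatch(name) is not None


print(is_valid_job_name("tfdv-chicago-crime-20210526-192954"))  # → True
print(is_valid_job_name("tfdv_chicago_crime"))                  # → False
```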

Let's trigger the job. It may take about 15 minutes with one worker.

In [103]:
execute_pipeline(query, data_folder, runner, DATA_TYPES, args)



In [105]:
data_folder

'gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/'

Confirm the data has been correctly extracted.

In [104]:
!gsutil ls {data_folder}

gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/-00000-of-00002.tfrecords
gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/-00000-of-00003.tfrecords
gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/-00001-of-00002.tfrecords
gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/-00001-of-00003.tfrecords
gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526192954/data/train/-00002-of-00003.tfrecords
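The file names start with `-` because `WriteToTFRecord` appends a shard name to `file_path_prefix`, and here the prefix ends with `/`. Beam's default shard name template is `-SSSSS-of-NNNNN`, where the digits are the zero-padded shard index and shard count. The listing above also appears to mix shards from two runs (two files `of-00002` and three files `of-00003`); if the same folder is reused, a wildcard pattern over it reads every set. The hypothetical helper below reproduces the naming rule, using a made-up bucket name:

```python
def shard_names(prefix: str, num_shards: int, suffix: str = "") -> list:
    """Expand Beam's default '-SSSSS-of-NNNNN' shard template (illustrative only)."""
    return [
        f"{prefix}-{index:05d}-of-{num_shards:05d}{suffix}"
        for index in range(num_shards)
    ]


for name in shard_names("gs://my-bucket/data/train/", 2, ".tfrecords"):
    print(name)
# gs://my-bucket/data/train/-00000-of-00002.tfrecords
# gs://my-bucket/data/train/-00001-of-00002.tfrecords
```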


## Generating statistics

The next step is to generate the statistics. This step also runs on Dataflow. We set a sample rate in `tfdv.StatsOptions` so that the statistics are computed over a sample of the examples (about 30% here) instead of the full dataset.
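Conceptually, a sample rate of 0.3 means that only about 30% of the examples contribute to the statistics. The sketch below illustrates the idea with independent (Bernoulli) sampling in plain Python. This is an assumption made for illustration, not TFDV's actual sampling code, which runs inside the Beam pipeline; `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(items, rate: float, seed: int = 0) -> list:
    """Keep each item independently with probability `rate`."""
    rng = random.Random(seed)  # seeded so the example is reproducible
    return [item for item in items if rng.random() < rate]


population = range(100_000)
sample = bernoulli_sample(population, rate=0.3)
print(len(sample) / len(population))  # close to 0.3
```

Sampling trades a little precision in the statistics for a large reduction in work, which matters when the pipeline processes millions of records.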

In [66]:
job_name = f"tfdv-chicago-crime-stats-{job_time}"
args['job_name'] = job_name
stats_location = root_folder + "/stats/stats.pb"
options = beam.options.pipeline_options.PipelineOptions(**args)

_ = tfdv.generate_statistics_from_tfrecord(
    data_location=data_folder + "*",  # glob pattern matching all TFRecord shards
    output_path=stats_location,
    stats_options=tfdv.StatsOptions(
        sample_rate=0.3  # compute the statistics over a ~30% sample
    ),
    pipeline_options=options
)




Confirm the statistics have been correctly computed.

In [81]:
!gsutil ls {stats_location}

gs://freeldom-mlops-deployments/tfdv/tfdv-chicago-crime-210526195416/stats/


## Load statistics

From here, the remaining steps are the same as in the previous notebook. We can read the statistics directly from GCS and visualize them.

In [82]:
stats = tfdv.load_statistics(stats_location)
tfdv.visualize_statistics(stats)
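For a string feature, the statistics include counts of missing and non-missing values, the number of unique values, and the most frequent values. A rough pure-Python equivalent for a single column is sketched below; it assumes `None` marks a missing value and uses `collections.Counter`, and `string_feature_stats` is a hypothetical helper rather than part of TFDV.

```python
from collections import Counter


def string_feature_stats(values, top_k: int = 3) -> dict:
    """Approximate a few of the string statistics shown by tfdv.visualize_statistics."""
    present = [v for v in values if v is not None]
    counts = Counter(present)
    return {
        "num_non_missing": len(present),
        "num_missing": len(values) - len(present),
        "unique": len(counts),
        "top_values": counts.most_common(top_k),
    }


months = ["Jan", "Jan", "Feb", None, "Mar", "Jan", "Feb"]
print(string_feature_stats(months, top_k=2))
# → {'num_non_missing': 6, 'num_missing': 1, 'unique': 3, 'top_values': [('Jan', 3), ('Feb', 2)]}
```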

## Infer schema

Let's infer the schema now. Remember that you should always review the schema before using it.

In [70]:
schema = tfdv.infer_schema(statistics=stats)
tfdv.display_schema(schema)

| Feature name | Type | Presence | Valency | Domain |
|---|---|---|---|---|
| 'crime_year' | BYTES | required | | - |
| 'crime_day_of_week' | STRING | required | | 'crime_day_of_week' |
| 'crime_month' | STRING | required | | 'crime_month' |
| 'arrest' | INT | required | | - |
| 'crime_day' | BYTES | required | | - |
| 'district' | INT | required | | - |
| 'domestic' | INT | required | | - |
| 'fbi_code' | STRING | required | | 'fbi_code' |
| 'iucr' | BYTES | required | | - |
| 'location_description' | BYTES | optional | single | - |
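In the rows shown, `location_description` is the only feature marked `optional`, which suggests that some rows have a NULL location and were written as empty features by `to_example`. A simplified sketch of the presence rule follows. It assumes a feature is `required` when every example has a value for it and `optional` otherwise; TFDV itself records presence as a minimum fraction and count in the schema. `infer_presence` is a hypothetical helper.

```python
def infer_presence(examples: list, feature: str) -> str:
    """Label a feature 'required' if every example has a value for it (simplified rule)."""
    has_value = [example.get(feature) is not None for example in examples]
    return "required" if all(has_value) else "optional"


examples = [
    {"crime_month": "Jan", "location_description": "STREET"},
    {"crime_month": "Feb", "location_description": None},
    {"crime_month": "Mar", "location_description": "RESIDENCE"},
]
print(infer_presence(examples, "crime_month"))           # → required
print(infer_presence(examples, "location_description"))  # → optional
```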




| Domain | Values |
|---|---|
| 'crime_day_of_week' | 'Fri', 'Mon', 'Sat', 'Sun', 'Thu', 'Tue', 'Wed' |
| 'crime_month' | 'Apr', 'Aug', 'Dec', 'Feb', 'Jan', 'Jul', 'Jun', 'Mar', 'May', 'Nov', 'Oct', 'Sep' |
| 'fbi_code' | '01A', '01B', '02', '03', '04A', '04B', '05', '06', '07', '08A', '08B', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '22', '24', '26' |
| 'primary_type' | 'ARSON', 'ASSAULT', 'BATTERY', 'BURGLARY', 'CONCEALED CARRY LICENSE VIOLATION', 'CRIM SEXUAL ASSAULT', 'CRIMINAL DAMAGE', 'CRIMINAL SEXUAL ASSAULT', 'CRIMINAL TRESPASS', 'DECEPTIVE PRACTICE', 'GAMBLING', 'HOMICIDE', 'HUMAN TRAFFICKING', 'INTERFERENCE WITH PUBLIC OFFICER', 'INTIMIDATION', 'KIDNAPPING', 'LIQUOR LAW VIOLATION', 'MOTOR VEHICLE THEFT', 'NARCOTICS', 'NON - CRIMINAL', 'NON-CRIMINAL', 'NON-CRIMINAL (SUBJECT SPECIFIED)', 'OBSCENITY', 'OFFENSE INVOLVING CHILDREN', 'OTHER NARCOTIC VIOLATION', 'OTHER OFFENSE', 'PROSTITUTION', 'PUBLIC INDECENCY', 'PUBLIC PEACE VIOLATION', 'ROBBERY', 'SEX OFFENSE', 'STALKING', 'THEFT', 'WEAPONS VIOLATION' |
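The inferred `primary_type` domain contains what look like spelling variants of the same category, for example `'NON - CRIMINAL'` and `'NON-CRIMINAL'`, or `'CRIM SEXUAL ASSAULT'` and `'CRIMINAL SEXUAL ASSAULT'`. One way to tighten this part of the schema is to normalize such values before computing statistics. The sketch below collapses spacing around hyphens and applies an alias table; both the alias table and `normalize_primary_type` are hypothetical examples, not an official mapping.

```python
import re

# Hypothetical alias table; extend it after reviewing the domain values.
ALIASES = {"CRIM SEXUAL ASSAULT": "CRIMINAL SEXUAL ASSAULT"}


def normalize_primary_type(value: str) -> str:
    """Collapse whitespace around hyphens, then map known aliases to one spelling."""
    cleaned = re.sub(r"\s*-\s*", "-", value.strip().upper())
    return ALIASES.get(cleaned, cleaned)


domain = ["NON - CRIMINAL", "NON-CRIMINAL", "CRIM SEXUAL ASSAULT", "CRIMINAL SEXUAL ASSAULT", "THEFT"]
print(sorted({normalize_primary_type(v) for v in domain}))
# → ['CRIMINAL SEXUAL ASSAULT', 'NON-CRIMINAL', 'THEFT']
```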


## End of lab

We end the lab here because the next steps are the same as in the previous lab. Try to refine the inferred schema, and try replicating these steps to extract the data from 2020. Remember that you can include TFDV as a step in your ML pipelines: at the beginning, to ensure the raw data has the expected format, and again after the transformations. You can also use the schema to validate or monitor inference data.
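TFDV validates new data by computing its statistics and comparing them against the schema with `tfdv.validate_statistics`. As a rough illustration of the kinds of checks involved, the hypothetical function below tests a single incoming record against a schema reduced to a plain dictionary of required features and string domains. That dictionary format is an assumption of this sketch, not TFDV's schema format.

```python
def check_record(record: dict, schema: dict) -> list:
    """Return human-readable anomalies for one record (simplified checks)."""
    anomalies = []
    for feature in schema["required"]:
        if record.get(feature) is None:
            anomalies.append(f"{feature}: missing required value")
    for feature, allowed in schema["domains"].items():
        value = record.get(feature)
        if value is not None and value not in allowed:
            anomalies.append(f"{feature}: unexpected value {value!r}")
    return anomalies


toy_schema = {
    "required": ["crime_month", "crime_day_of_week", "arrest"],
    "domains": {
        "crime_day_of_week": {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"},
    },
}
record = {"crime_month": "Jan", "crime_day_of_week": "Funday", "arrest": None}
print(check_record(record, toy_schema))
# → ['arrest: missing required value', "crime_day_of_week: unexpected value 'Funday'"]
```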