In [1]:
# Import dependencies 
import tensorflow as tf
import pandas as pd
import numpy as np
import seaborn as sns
import apache_beam as beam

# Print library versions for easy debugging (function-call form works on Python 2 and 3)
print("Library versions \n Tensorflow version: {} \n DataFlow version: {}".format(tf.__version__, beam.__version__))

Library versions 
 Tensorflow version: 1.11.0 
 DataFlow version: 2.10.0


<h2>Setup environment variables</h2>

Store local paths and filenames in environment variables for easy use. 

In [2]:
import os

# Store the root directory of the project
CWD = os.getcwd()  # path to this notebook on the local filesystem
ROOT = os.path.dirname(CWD)  # one level up: the root directory of the project

# Paths to the raw data (plain Python constants, not OS environment variables)
DATA_DIR = os.path.join(ROOT, 'raw_data/')

DATA_PATH = os.path.join(DATA_DIR, 'true_car_listings.csv')
# Column header of the raw CSV; also written as the header of the split files
CSV_HEADERS = 'Price,Year,Mileage,City,State,Vin,Make,Model'

TRAIN_STATS_PATH = os.path.join(DATA_DIR, 'train_stats.tfrecord')  # where train statistics are stored
EVAL_STATS_PATH = os.path.join(DATA_DIR, 'eval_stats.tfrecord')  # where eval statistics are stored

TRAIN_DATA_PATH = os.path.join(DATA_DIR, 'train_data.csv')
EVAL_DATA_PATH = os.path.join(DATA_DIR, 'eval_data.csv')

# Cloud settings; left unset for a local run
PROJECTID = None
STAGING_BUCKET = None
REGION = 'europe-west1'

# Check the root folder
print("Root project folder is: {}".format(ROOT))

Root project folder is: /Users/evanderknaap/Documents/Projects/tfvalidate


## Split train & test data in an Apache Beam pipeline 
We use a random number generator to split the data into a train set and a test set. We don't care at this point if the files cannot be reused. 

In [214]:
import random
from apache_beam.options.pipeline_options import GoogleCloudOptions

class PipeOptions(GoogleCloudOptions):
  """Google Cloud pipeline options extended with a ``--split_prob`` flag.

  ``--split_prob`` is the probability that a row is sent to the train split
  (it is read by train_eval_fn) and defaults to 0.8.
  """

  @classmethod
  def _add_argparse_args(cls, parser):
    # type=float: without it a value given on the command line stays a str,
    # and Python 2 compares float <= str without raising (always True).
    parser.add_argument('--split_prob',
                        type=float,
                        help='probability',
                        default=0.8)

def train_eval_fn(data_row, num_partitions, split_prob=None):
    """Randomly assign a data row to the train (0) or eval (1) partition.

    Used as the partition function of ``beam.Partition(train_eval_fn, 2)``.

    Args:
        data_row: one line of input data; its content is not inspected.
        num_partitions: number of partitions requested by ``beam.Partition``.
            This function only ever returns 0 or 1, so it expects 2.
        split_prob: probability of a row landing in the train partition.
            When None, the module-level ``options.split_prob`` is used.

    Returns:
        0 for the train partition, 1 for the eval partition.
    """
    if split_prob is None:
        # Backward-compatible default: read the threshold from the pipeline options.
        split_prob = options.split_prob
    # Coerce to float in case the value arrived from the command line as a string;
    # in Python 2 a float <= str comparison silently gives a wrong answer.
    threshold = float(split_prob)

    # Draw a number in [0, 1] and compare it with the threshold
    sample = random.uniform(0, 1)
    return 0 if sample <= threshold else 1

# Build the split pipeline; Beam runs it when the `with` block exits
options = PipeOptions()

with beam.Pipeline(options=options) as p:
    raw_data = p | 'ReadCSV' >> beam.io.ReadFromText(DATA_PATH, skip_header_lines=1)
    partitioned_data = raw_data | 'Split in train and test' >> beam.Partition(train_eval_fn, 2)

    train_data = partitioned_data[0]
    eval_data = partitioned_data[1]

    # shard_name_template='' writes exactly one file at the given path, with no
    # "-00000-of-0000N" suffix. That way TRAIN_DATA_PATH / EVAL_DATA_PATH can be
    # passed unchanged to tfdv.generate_statistics_from_csv later on.
    _ = train_data | 'Write train data' >> beam.io.WriteToText(
        TRAIN_DATA_PATH, shard_name_template='', header=CSV_HEADERS)
    _ = eval_data | 'Write eval data' >> beam.io.WriteToText(
        EVAL_DATA_PATH, shard_name_template='', header=CSV_HEADERS)

<h2>Compute statistics on data on local machine</h2>

Load the TFDV dependencies; this may take some time.

In [3]:
# Import tensorflow
import tensorflow_data_validation as tfdv

# Function-call form of print works on both Python 2 and 3
print("TFDV version: {}".format(tfdv.version.__version__))

TFDV version: 0.11.0


Next, we point TFDV to the location of our data and compute the statistics on our local machine using an Apache Beam pipeline. They will be stored as a protocol buffer in a .tfrecord file at the statistics path. We'll get a warning if there is an existing .tfrecord file, which you can choose to overwrite.

In [90]:
# Generate statistics over the training CSV. The returned statistics proto is
# kept in DataFeatureList, and TFDV also writes it as a TFRecord file at
# TRAIN_STATS_PATH so it can be reloaded later without re-running this step.
DataFeatureList = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA_PATH,
    output_path=TRAIN_STATS_PATH,
)



We notice a few interesting things already:
- There is a huge spread in **mileage**. Average mileage is $52.5K$ while the std dev is $42.0K$.
- Outliers in **mileage** of $2.86M$ skew the picture. We might need to exclude those to get a better feel for the distribution.
- The spread in **price** is quite large. The std dev is half of the mean price of $21.5K$. The most expensive car is $500K$, making it hard to judge the distribution.
- Apparently most second-hand cars are sold when they are just 1 **year** old: $2017$.
- **Vin** numbers show a roughly linear distribution, indicating they are unique to each car. The slight increase in slope indicates there are some duplicates.
- There are $58$ different **makes**, of which Ford is the most popular. About $85\%$ of listings belong to about $25$ makes, so they are quite concentrated. The tail is made up of more exotic cars like Porsches.
- **Models** are also quite concentrated: $80\%$ of the $1000$ listings are concentrated in the first 400 models.

In [131]:
# Reload the training statistics from disk, so the statistics pipeline
# does not have to be re-run every time
train_stats_proto = tfdv.load_statistics(
    TRAIN_STATS_PATH)

In [155]:
# Loop through the features and print the name and data type of the selected one.
# feature.type is the numeric type enum (0 is INT, 2 is STRING).
# A feature has 1 of 4 possible statistics set: numerical, string, bytes or struct.
def printSomeProtoStuff(train_buf, feature_name='City', stat_name='MyStat',
                        initial_value=11, updated_value=12):
    """Print and adjust one custom statistic on one feature of a statistics proto.

    For every feature named ``feature_name`` in every dataset of ``train_buf``,
    this function does the following:
      - prints the feature's name and type;
      - prints any existing custom stats;
      - sets the custom stat named ``stat_name`` to ``updated_value``, or adds
        it with ``initial_value`` if it does not exist yet.

    ``train_buf`` is modified in place.

    Args:
        train_buf: statistics list message exposing
            ``datasets[*].features[*].custom_stats``.
        feature_name: name of the feature to inspect.
        stat_name: name of the custom statistic to add or update.
        initial_value: value used when the custom statistic is first added.
        updated_value: value written when the custom statistic already exists.
    """
    for dataset in train_buf.datasets:
        for feature in dataset.features:
            if feature.name != feature_name:
                continue
            print("Name {} and type {}".format(feature.name, feature.type))

            # Print the custom stats we already have
            if len(feature.custom_stats) > 0:
                print("Custom stats are found")
                for stat in feature.custom_stats:
                    print(stat)

            # Look our stat up by name, so other custom stats on this feature
            # are left untouched.
            matching = [stat for stat in feature.custom_stats if stat.name == stat_name]
            if matching:
                for stat in matching:
                    stat.num = updated_value
            else:
                new_stat = feature.custom_stats.add()
                new_stat.name = stat_name
                new_stat.num = initial_value

printSomeProtoStuff(train_buf=train_stats_proto)

Name City and type 2
Multiple features are found
name: "MyStat"
num: 12.0



In [149]:
from google.protobuf import text_format
from tensorflow.python.lib.io import file_io
from tensorflow_metadata.proto.v0 import statistics_pb2 # manipulate the statistics protobuff

# Write a proto to disk in human-readable text format
def saveToTextfile(stats, output_path):
    """Serialize ``stats`` with protobuf text_format and write it to ``output_path``.

    Args:
        stats: protobuf message to serialize, e.g. the statistics list loaded above.
        output_path: destination file path.
    """
    file_io.write_string_to_file(
        filename=output_path,
        file_content=text_format.MessageToString(stats),
    )

def loadStatsFromText(input_path):
    """Read a text-format DatasetFeatureStatisticsList from ``input_path``.

    Args:
        input_path: path of a file written in protobuf text format
            (e.g. by saveToTextfile).

    Returns:
        A statistics_pb2.DatasetFeatureStatisticsList parsed from the file.
    """
    serialized_text = file_io.read_file_to_string(input_path)
    stats_list = statistics_pb2.DatasetFeatureStatisticsList()
    text_format.Parse(serialized_text, stats_list)
    return stats_list

In [143]:
# Round-trip check: write the statistics as text to a file named 'test'
saveToTextfile(stats=train_stats_proto, output_path='test')

In [151]:
# Read the text-format file back and run the same inspection on it
train_stats_proto_2 = loadStatsFromText(input_path='test')
printSomeProtoStuff(train_buf=train_stats_proto_2)

Name City and type 2
Multiple featuers are found
name: "MyStat"
num: 10.0



In [115]:
# Render the training statistics with Facets
tfdv.visualize_statistics(lhs_statistics=train_stats_proto)

In [116]:
# Now we want to see more detail on a subfield. For this we need to import statistics_pb2

##  Infer a schema

When loading the data, we need to define a schema to convert the data into Tensors. We can use TFDV to infer a first schema automatically. When new data arrives, this schema is used to check whether the new data fits it.

In [221]:
# Infer a first schema from the training statistics and show it as a table
train_schema = tfdv.infer_schema(train_stats_proto)
tfdv.display_schema(train_schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'City',BYTES,required,,-
'Mileage',INT,required,,-
'Price',INT,required,,-
'Vin',BYTES,required,,-
'State',STRING,required,,'State'
'Year',INT,required,,-
'Model',BYTES,required,,-
'Make',STRING,required,,'Make'


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'State',"' AK', ' AL', ' AR', ' AZ', ' Az', ' CA', ' CO', ' CT', ' Ca', ' DC', ' DE', ' FL', ' Fl', ' GA', ' Ga', ' HI', ' IA', ' ID', ' IL', ' IN', ' KS', ' KY', ' LA', ' MA', ' MD', ' ME', ' MI', ' MN', ' MO', ' MS', ' MT', ' Md', ' NC', ' ND', ' NE', ' NH', ' NJ', ' NM', ' NV', ' NY', ' OH', ' OK', ' OR', ' Oh', ' PA', ' RI', ' SC', ' SD', ' TN', ' TX', ' UT', ' VA', ' VT', ' Va', ' WA', ' WI', ' WV', ' WY', ' ga'"
'Make',"'AM', 'Acura', 'Alfa', 'Aston', 'Audi', 'BMW', 'Bentley', 'Buick', 'Cadillac', 'Chevrolet', 'Chrysler', 'Dodge', 'FIAT', 'Ferrari', 'Fisker', 'Ford', 'Freightliner', 'GMC', 'Genesis', 'HUMMER', 'Honda', 'Hyundai', 'INFINITI', 'Isuzu', 'Jaguar', 'Jeep', 'Kia', 'Lamborghini', 'Land', 'Lexus', 'Lincoln', 'Lotus', 'MINI', 'Maserati', 'Maybach', 'Mazda', 'McLaren', 'Mercedes-Benz', 'Mercury', 'Mitsubishi', 'Nissan', 'Oldsmobile', 'Plymouth', 'Pontiac', 'Porsche', 'Ram', 'Rolls-Royce', 'Saab', 'Saturn', 'Scion', 'Subaru', 'Suzuki', 'Tesla', 'Toyota', 'Volkswagen', 'Volvo', 'smart'"


## Compare statistics

In [218]:
# Generate statistics over the eval CSV and write them as a TFRecord file at
# EVAL_STATS_PATH; the returned proto is discarded because it is reloaded below.
_ = tfdv.generate_statistics_from_csv(
    data_location=EVAL_DATA_PATH,
    output_path=EVAL_STATS_PATH,
)

In [219]:
# Reload the eval statistics from disk, so the statistics pipeline
# does not have to be re-run every time
eval_stats_proto = tfdv.load_statistics(
    EVAL_STATS_PATH)

In [222]:
# Show train (left) and eval (right) statistics side by side
tfdv.visualize_statistics(
    lhs_statistics=train_stats_proto,
    lhs_name='train',
    rhs_statistics=eval_stats_proto,
    rhs_name='eval',
)

In [223]:
# Check the eval statistics against the schema inferred from the training data
anomalies = tfdv.validate_statistics(
    statistics=eval_stats_proto,
    schema=train_schema,
)
tfdv.display_anomalies(anomalies)

Unnamed: 0_level_0,Anomaly short description,Anomaly long description
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1
'Make',Unexpected string values,Examples contain values missing from the schema: Geo (<1%).
