In [1]:
%load_ext google.datalab.kernel

import google.datalab.bigquery as bq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import datetime
import time
import math
from matplotlib import cm
from matplotlib import gridspec
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from sklearn import metrics
import tensorflow as tf
from tensorflow.python.data import Dataset

# Silence TensorFlow's INFO/WARNING logging; only errors and above are printed.
tf.logging.set_verbosity(tf.logging.ERROR)
# Keep DataFrame previews short and show floats with one decimal place.
pd.options.display.max_rows = 10
pd.options.display.float_format = '{:.1f}'.format

## Function to create TensorFlow feature columns

In [2]:
def construct_feature_columns(input_features):
  """Build one numeric TensorFlow feature column per input feature.

  Args:
    input_features: Iterable of numerical feature names. Iterating a pandas
      DataFrame yields its column names, so a DataFrame can be passed directly.
  Returns:
    A set of `tf.feature_column.numeric_column` objects, one per name.
  """
  columns = set()
  for feature_name in input_features:
    columns.add(tf.feature_column.numeric_column(feature_name))
  return columns

## Input function

In [3]:
def my_input_fn(features, targets, batch_size=1, shuffle=True, num_epochs=None):
    """Feed pandas data into a TensorFlow Estimator as (features, labels) batches.

    Args:
      features: pandas DataFrame of features.
      targets: pandas DataFrame (or Series) of targets. For prediction a
        zero-column DataFrame with one row per example can be passed.
      batch_size: Number of examples per batch.
      shuffle: True or False. Whether to shuffle the examples.
      num_epochs: Number of epochs for which data should be repeated.
        None = repeat indefinitely.
    Returns:
      Tuple of (features, labels) tensors for the next data batch.
    """
    # Convert pandas data into a dict of np arrays keyed by column name.
    features = {key: np.array(value) for key, value in dict(features).items()}

    # Construct a dataset. from_tensor_slices embeds the arrays in the graph,
    # which is subject to the 2GB protobuf limit.
    ds = Dataset.from_tensor_slices((features, targets))

    # Shuffle individual examples *before* batching. Shuffling after .batch()
    # would only permute whole batches, so examples that start in the same
    # batch would always stay together.
    if shuffle:
        ds = ds.shuffle(10000)

    # Configure batching/repeating (same batch/epoch semantics as before).
    ds = ds.batch(batch_size).repeat(num_epochs)

    # Return the next batch of data.
    features, labels = ds.make_one_shot_iterator().get_next()
    return features, labels

## Get latest saved trained model

In [4]:
def get_model(features, model_dir='./models/wine_pred'):
  """Build the DNNRegressor bound to the saved wine-quality model directory.

  The estimator restores weights from the latest checkpoint in `model_dir`
  when it predicts, so the architecture defined here (feature columns and
  hidden_units) must match the one the checkpoint was trained with.

  Args:
    features: DataFrame (or iterable of column names) whose columns are the
      numeric model inputs.
    model_dir: Directory holding the trained model's checkpoints. Defaults to
      the path previously hard-coded here.
  Returns:
    A tf.estimator.DNNRegressor using `model_dir`.
  """
  # The optimizer is only used for training, but the estimator requires one
  # to be constructed. These values presumably mirror the training run --
  # confirm before retraining from this notebook.
  my_optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.000001)
  my_optimizer = tf.contrib.estimator.clip_gradients_by_norm(my_optimizer, 5.0)
  dnn_regressor = tf.estimator.DNNRegressor(
      feature_columns=construct_feature_columns(features),
      hidden_units=[10, 10],
      optimizer=my_optimizer,
      model_dir=model_dir)

  return dnn_regressor

## Get predicted values

In [5]:
def get_predicted_values(features, dnn_regressor):
  """Run the regressor over `features` and collect one prediction per row.

  Args:
    features: DataFrame of model inputs, one row per wine.
    dnn_regressor: Estimator exposing `predict(input_fn=...)`.
  Returns:
    1-D numpy array holding the first value of each prediction, in row order.
  """
  def predict_input_fn():
    # Prediction needs no labels, but my_input_fn expects targets, so pass an
    # empty frame with one row per example. A single unshuffled pass keeps
    # the outputs aligned with the input rows.
    placeholder_targets = pd.DataFrame(index=range(len(features)))
    return my_input_fn(features,
                       targets=placeholder_targets,
                       num_epochs=1,
                       shuffle=False)

  prediction_dicts = dnn_regressor.predict(input_fn=predict_input_fn)
  return np.array([pred['predictions'][0] for pred in prediction_dicts])

## Class that prepares data and defines features
Data from the Looker webhook should not contain `quality`, but it should contain `wine_id`.

In [6]:
class NewData():
  """Prepare raw wine rows (e.g. from Looker) for the prediction model.

  Steps:
    - split off `wine_id`;
    - drop the non-feature columns `quality` and `created_at`;
    - put the expected feature fields in a fixed order;
    - one-hot encode `wine_type` into a single `wine_type_red` flag;
    - derive `sugar_to_acidity_ratio`.

  Args:
    df: pandas DataFrame of raw wine rows. The caller's frame is not modified.

  Attributes:
    wine_id: One-column DataFrame of wine ids (to be merged with
      predictions), or None when the input has no `wine_id` column.
    features: DataFrame of feature values for creating predictions.
    wine_id_mess, red_wine_mess, white_wine_mess, target_mess, order_mess,
    date_mess: Human-readable status messages about each preparation step.
  """

  # Expected input fields, in the order they are laid out in `features`.
  FEATURE_FIELDS = ["wine_type", "fixed_acidity", "volatile_acidity",
                    "citric_acid", "residual_sugar", "chlorides",
                    "free_sulfur_dioxide", "total_sulfur_dioxide", "density",
                    "pH", "sulphates", "alcohol"]

  # Fallback for non-positive fixed_acidity values, which would otherwise make
  # sugar_to_acidity_ratio divide by zero or go negative.
  # NOTE(review): 7 is presumably a typical fixed_acidity value -- confirm.
  DEFAULT_FIXED_ACIDITY = 7

  def __init__(self, df):
    # Separate the wine ids. Always set the attribute so callers can test it.
    if "wine_id" in df.columns:
      self.wine_id = df[["wine_id"]].copy()
      df = df.drop("wine_id", axis=1)
      self.wine_id_mess = "Wine_id found"
    else:
      self.wine_id = None
      self.wine_id_mess = "Warning, no wine_id"

    # Drop target if it exists.
    if "quality" in df.columns:
      df = df.drop("quality", axis=1)
      self.target_mess = "Data contained target"
    else:
      self.target_mess = "Data did NOT contain target"

    # Drop date if it exists.
    if "created_at" in df.columns:
      df = df.drop("created_at", axis=1)
      self.date_mess = "Data contained created_at"
    else:
      self.date_mess = "Data did NOT contain created_at"

    # Order the fields. Selecting them also discards any extra columns.
    missing = [name for name in self.FEATURE_FIELDS if name not in df.columns]
    if missing:
      # Columns are left unordered; later steps may still fail.
      self.order_mess = "Error: missing fields"
    else:
      df = df[self.FEATURE_FIELDS]
      self.order_mess = "Fields Complete"

    # One-hot encode wine_type, keeping a single flag: 1 for red, 0 otherwise.
    df = pd.get_dummies(df, columns=["wine_type"])

    if "wine_type_red" in df:
      self.red_wine_mess = "Contains red wine"
    else:
      # No red rows in this batch: add the column so the model still sees it.
      self.red_wine_mess = "No red wine"
      df["wine_type_red"] = 0

    if "wine_type_white" in df:
      self.white_wine_mess = "Contains white wine"
      df = df.drop("wine_type_white", axis=1)
    else:
      self.white_wine_mess = "No white wine"

    # Replace non-positive fixed_acidity with a boolean mask. This works for
    # any index (a positional loop with df.at assumed 0..n-1). NaN compares
    # False and is left unchanged.
    non_positive = df["fixed_acidity"] <= 0
    df.loc[non_positive, "fixed_acidity"] = self.DEFAULT_FIXED_ACIDITY

    # Calculate sugar_to_acidity_ratio.
    df["sugar_to_acidity_ratio"] = df["residual_sugar"] / df["fixed_acidity"]

    # Final features variable.
    self.features = df.copy()

      

## Function to write predictions back to BigQuery

In [7]:
def write_preds_to_bq(preds):
  """Append predictions to the BigQuery table wine_data.predicted_quality.

  Each row gets a load timestamp so repeated predictions can be de-duplicated
  in the database.

  Args:
    preds: DataFrame with columns wine_id and predicted_quality, in that
      order. Rows are sent positionally, so the column order must match the
      table schema below. The caller's frame is not modified.
  Returns:
    A status message string saying whether the insert succeeded. On failure
    the message includes the exception or per-row errors.
  """
  # Specify GCP project and BQ dataset
  client = bigquery.Client(project='looker-action-hub')
  dataset_ref = client.dataset('wine_data')

  # Define table (column order here is the order rows are sent in).
  schema = [
      bigquery.SchemaField('wine_id', 'INTEGER', mode='REQUIRED'),
      bigquery.SchemaField('predicted_quality', 'FLOAT', mode='REQUIRED'),
      bigquery.SchemaField('time', 'TIMESTAMP'),
  ]

  table_ref = dataset_ref.table('predicted_quality')
  table = bigquery.Table(table_ref, schema=schema)
  # If you ever need to recreate the table:
  # client.create_table(table)  # API request

  # BigQuery treats a TIMESTAMP without a zone as UTC, so record UTC time.
  # Server-local time would otherwise be silently mislabelled as UTC.
  rows = preds.copy()  # don't add a column to the caller's frame
  rows["time"] = datetime.datetime.utcnow().isoformat()

  # Newer client versions renamed create_rows to insert_rows.
  insert = getattr(client, 'insert_rows', None) or client.create_rows

  # Writes new rows to table. Per-row failures are reported in the return
  # value rather than raised, so check both.
  try:
    errors = insert(table, rows.values.tolist())  # API request
  except Exception as exc:
    return "Error: problem inserting rows into predicted_quality: %s" % exc
  if errors:
    return "Error: problem inserting rows into predicted_quality: %s" % errors
  return "Rows inserted in predicted_quality"
  

In [8]:
def final_predictions(data):
  """Predict wine quality for new rows and write the results to BigQuery.

  Args:
    data: pandas DataFrame of new wines. (Currently a DataFrame; needs to be
      modified to take JSON from Looker once that format is known.)

  Returns:
    None. Prints a status message for each step. Writes wine_id,
    predicted_quality and a load timestamp (for de-duplication) to BigQuery.
    If a step fails, prints an error with its cause and stops early.
  """
  # Creates features to be put into the model, and the wine_ids to be
  # merged with the predicted values.
  new_data = NewData(data)
  print(new_data.wine_id_mess)
  print(new_data.red_wine_mess + "; " + new_data.white_wine_mess)
  print(new_data.target_mess)
  print(new_data.order_mess)
  print(new_data.date_mess)

  # wine_id is a REQUIRED column of the output table, so stop early without it.
  wine_ids = getattr(new_data, "wine_id", None)
  if wine_ids is None:
    print("Error: no wine_id in data; predictions not written")
    return

  # Loads saved model from file
  try:
    dnn_regressor = get_model(new_data.features)
  except Exception as exc:
    print("Error loading model: %s" % exc)
    return
  print("Model loaded")

  # Gets predicted values from new features and model
  try:
    predicted_values = pd.DataFrame(
        get_predicted_values(new_data.features, dnn_regressor),
        columns=["predicted_quality"])
  except Exception as exc:
    print("Error getting predicted values: %s" % exc)
    return
  print("Predicted values produced")

  # Attach wine_ids by position. Predictions come back in row order with a
  # fresh 0..n-1 index, so index alignment could mismatch rows or yield NaN.
  predicted_values["wine_id"] = wine_ids["wine_id"].values
  predicted_values = predicted_values[["wine_id", "predicted_quality"]]

  # Writes predictions to BigQuery
  print(write_preds_to_bq(predicted_values))

## Import data [insert code that gets new data from the Looker webhook here]
This will be replaced by code that accepts new data from the Looker webhook.
For now, we just pull it straight out of BigQuery.

In [9]:
%%bq query -n wine_query
-- New wines (no quality label) to score. The created_at cutoff is hard-coded;
-- NOTE(review): presumably it selects rows added after training -- confirm.
select *
from `looker-action-hub.wine_data.new_wines_no_quality`
where created_at > '2018-05-01'

In [10]:
# Run the saved query and load the result into a pandas DataFrame.
# Despite the name, this holds raw rows (including wine_id), not model features.
# Requires Google Cloud application-default credentials; the saved run failed
# here with DefaultCredentialsError.
features = wine_query.execute(output_options=bq.QueryOutput.dataframe()).result()

DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://developers.google.com/accounts/docs/application-default-credentials.

In [None]:
# Score the new wines and write the predictions back to BigQuery.
final_predictions(features)