# Feature extraction
### _TODO_
This notebook codifies the capabilities discussed in this [blog post](https://8081-dot-3124631-dot-devshell.appspot.com/). In a nutshell, it uses the pre-trained Inception model as a starting point and then uses transfer learning to train it further on additional, customer-specific images. For illustration, simple flower images are used. Compared to training from scratch, time and cost are drastically reduced.

This notebook runs preprocessing, training and prediction through the Cloud ML APIs instead of inside the Datalab container. Local work is meant for initial prototyping and debugging on small-scale data, often a suitable sample (say 0.1-1%) of the full data. The same basic steps can then be repeated on much larger datasets in the cloud.

In [None]:
import IPython
import apache_beam as beam
import datetime
import logging
import os
import shutil
import tempfile
import warnings

from apache_beam.io import tfrecordio

# Suppress every Python warning so the notebook output stays uncluttered.
warnings.filterwarnings('ignore')

In [None]:
# datalab_project_id() is not defined in this notebook; presumably it is
# injected by the Datalab environment.
project_id = datalab_project_id()
bucket = 'gs://candies-{}-dantest'.format(project_id)
# All artifacts produced by this notebook live directly under the bucket root.
preprocess_dir = bucket + '/candies_preprocessed_cloud'
model_dir = bucket + '/candies_model_cloud'
staging_dir = bucket + '/staging'

In [None]:
!gsutil mb $bucket

## Pipeline transformations
Define the Apache Beam transformations used by the pipeline.

### Extract images and labels from CSV
Extracts (uri, label_id) tuples from the parsed CSV rows.

In [None]:
class ExtractLabelIdsDoFn(beam.DoFn):
  """Extracts (uri, label_id) tuples from parsed CSV rows.

  Each element is a dict with 'image_url' and 'label' keys (as produced by the
  'Create Dict from CSV' step). The label string is mapped to an integer id
  using the list of all labels, which is supplied as a side input.
  """

  def start_bundle(self, context=None):
    # Label -> id lookup, built lazily on the first element of the bundle.
    self.label_to_id_map = {}

  def process(self, element, all_labels):
    """Yields (image_url, label_id) for one parsed CSV row.

    Args:
      element: dict with 'image_url' and 'label' keys; empty rows are skipped.
      all_labels: iterable of every label string (side input).
    """
    if not element:
      return

    if not self.label_to_id_map:
      # Dataflow cannot guarantee the order of the labels when materializing
      # them. The labels consumed by training may not be in the same order as
      # the ones used in preprocessing, so both sides sort them to agree.
      # Sorting happens only while the map is still empty, not per element.
      # Ids are positions in the sorted list, so a blank label still consumes
      # an index even though it gets no entry.
      for i, label in enumerate(sorted(all_labels)):
        label = label.strip()
        if label:
          self.label_to_id_map[label] = i

    uri = element['image_url']
    label_id = self.label_to_id_map[element['label'].strip()]
    yield uri, label_id

### Convert images to JPEG
Read the files from GCS and convert the images to JPEG format. This is done even for images that are already JPEG, to remove variations such as a different number of channels.

In [None]:
class ReadImageAndConvertToJpegDoFn(beam.DoFn):
  """Reads an image from its URI and re-encodes it as an RGB JPEG.

  (uri, label_id) -> (uri, label_id, jpeg_bytes)

  Images that cannot be opened or decoded are logged and dropped.
  """

  def process(self, element):
    import io
    from PIL import Image
    from tensorflow.python.lib.io import file_io as tf_file_io

    uri, label_id = element
    try:
      # Binary mode: the file holds raw image bytes, not text.
      with tf_file_io.FileIO(uri, 'rb') as f:
        # convert() makes PIL decode the image while the file is still open.
        img = Image.open(f).convert('RGB')
    # A variety of different calling libraries throw different exceptions here.
    # They all correspond to an unreadable file so we treat them equivalently.
    except Exception as e:  # pylint: disable=broad-except
      logging.exception('Error processing image %s: %s', uri, str(e))
      return

    # Re-encode in memory; io.BytesIO works on both Python 2 and 3.
    with io.BytesIO() as output:
      img.save(output, 'jpeg')
      image_bytes = output.getvalue()
    yield uri, label_id, image_bytes

### Extract features from images

Computes an embedding for each image and stores it, together with the label and the image URI, in a `tensorflow.Example`.

In [None]:
class TFExampleFromImageDoFn(beam.DoFn):
  """Embeds image bytes and labels, stores them in tensorflow.Example.

  (uri, label_id, image_bytes) -> (tensorflow.Example).

  Output proto contains 'label', 'image_uri' and 'embedding'.
  The 'embedding' is calculated by feeding the image into the input layer of
  the image neural network and reading the output of its bottleneck layer.

  Args:
    checkpoint_path: URI of the model checkpoint handed to mltoolbox's
      EmbeddingsGraph when the embedding graph is built.
  """

  def __init__(self, checkpoint_path):
    import tensorflow as tf
    self._tf = tf
    self._tf_train = tf.train
    self.tf_session = None
    self.graph = None
    self.preprocess_graph = None
    self._checkpoint_path = checkpoint_path

  def start_bundle(self, context=None):
    # The TensorFlow graph/session pair is built lazily. finish_bundle closes
    # the session and clears this state, so an instance that is reused for a
    # later bundle rebuilds it here instead of running on a closed session.
    import mltoolbox.image.classification._preprocess as preprocess
    if self.graph is None:
      self.graph = self._tf.Graph()
      self.tf_session = self._tf.InteractiveSession(graph=self.graph)
      with self.graph.as_default():
        self.preprocess_graph = preprocess.EmbeddingsGraph(self.tf_session, self._checkpoint_path)

  def finish_bundle(self, context=None):
    if self.tf_session is not None:
      self.tf_session.close()
    # Drop references to the closed session and its graph; otherwise
    # start_bundle would see a non-None graph and skip rebuilding them.
    self.tf_session = None
    self.graph = None
    self.preprocess_graph = None

  def process(self, element):

    def _bytes_feature(value):
      return self._tf_train.Feature(bytes_list=self._tf_train.BytesList(value=value))

    def _float_feature(value):
      return self._tf_train.Feature(float_list=self._tf_train.FloatList(value=value))

    uri, label_id, image_bytes = element

    try:
      embedding = self.preprocess_graph.calculate_embedding(image_bytes)
    except self._tf.errors.InvalidArgumentError as e:
      # Undecodable images are logged and skipped rather than failing the job.
      logging.warning('Could not encode an image from %s: %s', uri, str(e))
      return

    # embedding is presumably a numpy array (ravel() flattens it) -- confirm
    # against EmbeddingsGraph.calculate_embedding.
    features = self._tf_train.Features(
      feature={
        'image_uri': _bytes_feature([str(uri)]),
        'embedding': _float_feature(embedding.ravel().tolist())
      })
    example = self._tf_train.Example(features=features)
    example.features.feature['label'].int64_list.value.append(label_id)

    yield example

### Split data into training and evaluation sets

In [None]:
class TrainEvalSplitPartitionFn(beam.PartitionFn):
  """Randomly assigns each element to train (partition 0) or eval (partition 1).

  Roughly 70% of elements go to train and 30% to eval. No seed is set here, so
  the split differs from run to run.
  """

  def partition_for(self, element, num_partitions):
    import random
    draw = random.random()
    if draw <= 0.7:
      return 0
    return 1

### Save features in TFRecordIO format

In [None]:
class ExampleProtoCoder(beam.coders.Coder):
  """Beam coder that converts tf.train.Example protos to and from bytes."""

  def __init__(self):
    import tensorflow as tf
    # Keep a handle on tf.train so decode() can create Example messages.
    self._tf_train = tf.train

  def encode(self, example_proto):
    """Serializes an Example proto to its wire-format string."""
    serialized = example_proto.SerializeToString()
    return serialized

  def decode(self, serialized_str):
    """Parses a wire-format string back into a new Example proto."""
    parsed = self._tf_train.Example()
    parsed.ParseFromString(serialized_str)
    return parsed


class SaveFeatures(beam.PTransform):
  """Writes a PCollection of tf.train.Example protos as TFRecord files.

  Files are written under `file_path_prefix` with the suffix '.tfrecord.gz',
  using ExampleProtoCoder to serialize each record.
  """

  def __init__(self, file_path_prefix):
    super(SaveFeatures, self).__init__('SaveFeatures')
    self._file_path_prefix = file_path_prefix

  def expand(self, features):
    writer = tfrecordio.WriteToTFRecord(
        file_path_prefix=self._file_path_prefix,
        file_name_suffix='.tfrecord.gz',
        coder=ExampleProtoCoder())
    return features | 'Write features' >> writer

## Create the pipeline

Boilerplate code to create the pipeline

In [None]:
_DEFAULT_CHECKPOINT_GSURL = 'gs://cloud-ml-data/img/flower_photos/inception_v3_2016_08_28.ckpt'

def create_pipeline(train_dataset, output_dir, project, pipeline_option):
  """Create the Dataflow preprocessing pipeline.

  The pipeline reads "image_url,label" CSV rows from train_dataset and computes
  an image embedding per row. It writes, under output_dir/<job_name>, train and
  eval TFRecords plus a label list. It also writes a 'latest' file in
  output_dir holding the job name.

  Args:
    train_dataset: path or pattern of the input CSV file(s).
    output_dir: directory (typically GCS) for staging, temp and output files.
    project: GCP project id the Dataflow job runs in.
    pipeline_option: optional dict of extra pipeline options; they override the
      defaults below. May be None.

  Returns:
    The constructed, not yet run, beam.Pipeline.
  """
  import csv

  job_name = ('preprocess-image-classification-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
  options = {
      'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
      'temp_location': os.path.join(output_dir, 'tmp'),
      'job_name': job_name,
      'project': project,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
  }
  # Caller-supplied options take precedence over the defaults above.
  if pipeline_option is not None:
    options.update(pipeline_option)

  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  p = beam.Pipeline('DataflowRunner', options=opts)

  labeled_images = (p |
                    'Read from CSV' >> beam.io.ReadFromText(train_dataset, strip_trailing_newlines=True) |
                    # DictReader raises StopIteration on a blank line, which
                    # would fail the step, so drop blank lines first.
                    'Skip blank lines' >> beam.Filter(lambda line: line.strip()) |
                    # Builtin next() works on Python 2 and 3, unlike .next().
                    'Create Dict from CSV' >> beam.Map(
                        lambda line: next(csv.DictReader([line], fieldnames=['image_url', 'label']))))
  # Distinct label strings; the per-label counts are discarded.
  labels = (labeled_images |
            'Parse input for labels' >> beam.Map(lambda x: str(x['label'])) |
            'Combine labels' >> beam.transforms.combiners.Count.PerElement() |
            'Get labels' >> beam.Map(lambda label_count: label_count[0]))

  preprocessed = (labeled_images |
                  'Extract label ids' >> beam.ParDo(ExtractLabelIdsDoFn(), beam.pvalue.AsIter(labels)) |
                  'Read and convert to JPEG' >> beam.ParDo(ReadImageAndConvertToJpegDoFn()) |
                  'Embed and make TFExample' >> beam.ParDo(TFExampleFromImageDoFn(_DEFAULT_CHECKPOINT_GSURL)))

  train_preprocessed, eval_preprocessed = (preprocessed |
                                           'Random partition' >> beam.Partition(TrainEvalSplitPartitionFn(), 2))

  output_train_path = os.path.join(output_dir, job_name, 'train')
  output_eval_path = os.path.join(output_dir, job_name, 'eval')
  labels_file = os.path.join(output_dir, job_name, 'labels')

  labels_save = (labels |
                 'Write labels' >> beam.io.textio.WriteToText(labels_file, shard_name_template=''))
  train_save = (train_preprocessed |
                'Save train to disk' >> SaveFeatures(output_train_path))
  eval_save = (eval_preprocessed |
               'Save eval to disk' >> SaveFeatures(output_eval_path))

  # The 'latest' file records this job's name. It is written only after a
  # global combine over the three write results, so it waits for those
  # writes to produce their results.
  output_latest_file = os.path.join(output_dir, 'latest')
  ([eval_save, train_save, labels_save] |
   'Wait for train eval saving' >> beam.Flatten() |
   'Fixed One' >> beam.transforms.combiners.Sample.FixedSizeGlobally(1) |
   beam.Map(lambda path: job_name) |
   'WriteLatest' >> beam.io.textio.WriteToText(output_latest_file, shard_name_template=''))

  return p

## Run the pipeline 
Boilerplate code to run the pipeline

In [None]:
_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-1.2.0-cp27-none-linux_x86_64.whl'
_PROTOBUF_GS_URL = 'gs://cloud-datalab/deploy/tf/protobuf-3.1.0-py2.py3-none-any.whl'


def run_pipeline(train_dataset, output_dir, project, pipeline_options=None):
  """Preprocess data in the cloud with Dataflow and wait for the job to finish.

  Displays a link to the Dataflow console as soon as the job is submitted, then
  blocks until it completes. Root logging is raised to ERROR for the duration
  and restored afterwards.

  Args:
    train_dataset: input CSV path passed on to create_pipeline.
    output_dir: directory for staging and preprocessed output.
    project: GCP project id the Dataflow job runs in.
    pipeline_options: optional dict of extra pipeline options; it is copied,
      not mutated.

  Returns:
    The PipelineResult returned by the pipeline's run(), after the job has
    finished.
  """
  import mltoolbox.image.classification._util as util
  from tensorflow.python.lib.io import file_io

  tmpdir = tempfile.mkdtemp()
  original_level = logging.getLogger().getEffectiveLevel()
  logging.getLogger().setLevel(logging.ERROR)
  try:
    # Workaround for DataFlow 2.0, which doesn't work well with extra packages in GCS.
    # Remove when the issue is fixed and new version of DataFlow is included in Datalab.
    staging_package_url = util.repackage_to_staging(output_dir)
    extra_packages = [staging_package_url, _TF_GS_URL, _PROTOBUF_GS_URL]
    local_packages = [os.path.join(tmpdir, os.path.basename(url)) for url in extra_packages]
    for source, dest in zip(extra_packages, local_packages):
      file_io.copy(source, dest, overwrite=True)
    additional_options = dict(pipeline_options or {})
    additional_options['extra_packages'] = local_packages

    p = create_pipeline(train_dataset, output_dir, project, additional_options)
    job_results = p.run()

    # Show the tracking link while the job is running, not after it has ended.
    dataflow_url = 'https://console.developers.google.com/dataflow?project=%s' % project
    html = 'Job "%s" submitted.' % p.options.get_all_options()['job_name']
    html += '<p>Click <a href="%s" target="_blank">here</a> to track preprocessing job. <br/>' % dataflow_url
    IPython.display.display_html(html, raw=True)

    job_results.wait_until_finish()
  finally:
    shutil.rmtree(tmpdir)
    logging.getLogger().setLevel(original_level)

  return job_results


# Input CSV: each row is parsed as "image_url,label" by create_pipeline.
train_set = 'gs://candies-ml/dataset/metadata/train_candies560.csv'
# run_pipeline waits for the Dataflow job to finish before returning, so no
# separate wait is needed here.
preprocess_job = run_pipeline(
    train_dataset=train_set,
    project=project_id,
    output_dir=preprocess_dir,
    pipeline_options=dict(num_workers=10),
)