In [None]:
!pip freeze | grep apache-beam==2.10.0 || pip install apache-beam[gcp]

In [None]:
!pip freeze | grep tensorflow==1.12.0 || pip install tensorflow==1.12.0

In [None]:
import apache_beam as beam
import datetime
import os

In [None]:
# GCP project id used by the pipeline options further down in the notebook.
PROJECT = 'qwiklabs-gcp-3f19cbba7aa3ae63'

# Export the settings to the process environment so that shell cells
# (e.g. the `%%bash` gcloud cell) can read them.
os.environ.update({
    'PROJECT': PROJECT,
    'CLOUDSDK_PYTHON': 'python3',
})

In [None]:
%%bash
# Make the notebook's project the active gcloud project. $PROJECT is read
# from the environment, where the configuration cell stored it via
# os.environ['PROJECT'].
gcloud config set project $PROJECT

In [None]:
# The three dataset splits share one SELECT and differ only in which range of
# hash buckets they keep, so the SQL is written once and the per-split
# predicate is substituted in. The generated strings are identical to the
# previously hand-written queries.

# Deterministic bucket in [0, 1000) derived from the product id; the same
# product therefore always lands in the same split.
_HASH_BUCKET = 'MOD(ABS(FARM_FINGERPRINT(CAST(product_id AS STRING))), 1000)'

# Split boundaries over the 1000 buckets: [0, 700) train, [700, 900) eval,
# [900, 1000) test.
_SPLIT_FILTERS = {
    'train': '{bucket} < 700',
    'eval': '{bucket} >= 700 AND {bucket} < 900',
    'test': '{bucket} >= 900',
}

# Note: '\\n' below reaches BigQuery as the two characters \n, which the SQL
# string literal then interprets as a newline.
_QUERY_TEMPLATE = """
    SELECT DISTINCT
      bucket_name,
      CONCAT("gs://project-sample/dataset1/", CAST(product_id AS STRING), ".jpeg") as product_id,
      REPLACE(product_name, '\\n', ' ') AS product_name,
      REPLACE(description, '\\n', ' ') AS description
    FROM `qwiklabs-gcp-3f19cbba7aa3ae63.project.raw`
    WHERE
      bucket_name is not null
      and product_id is not null
      and product_name is not null
      and description is not null
      AND {split_filter}
    """

# Maps split name ('train' / 'eval' / 'test') to its BigQuery standard-SQL text.
query = {
    mode: _QUERY_TEMPLATE.format(split_filter=condition.format(bucket=_HASH_BUCKET))
    for mode, condition in _SPLIT_FILTERS.items()
}

In [None]:
import logging
from apache_beam.io.gcp.bigquery import BigQueryDisposition

class CleanRow(beam.DoFn):
  """Clean a row's text fields and drop rows whose image is missing.

  Each element is expected to be a dict with the keys 'description',
  'product_name' and 'product_id'. Line breaks in the two text fields are
  replaced with spaces, and the row is emitted only if the object named by
  'product_id' exists according to GcsIO (the queries in this notebook put a
  gs:// URL in that field).

  The input element is never modified; a cleaned copy is emitted instead.
  """

  # GcsIO client, created on first use and then reused for later elements
  # instead of being rebuilt for every row.
  _gcs = None

  def _image_exists(self, url):
    """Return whether GcsIO reports that the object at `url` exists."""
    if self._gcs is None:
      # Imported inside the method, as the original code did -- presumably so
      # the class serialises cleanly for remote workers; TODO confirm.
      from apache_beam.io.gcp.gcsio import GcsIO
      self._gcs = GcsIO()
    return self._gcs.exists(path=url)

  def process(self, element):
    """Yield a cleaned copy of `element`, or nothing if its image is missing.

    Args:
      element: dict holding at least 'description', 'product_name' and
        'product_id'.

    Yields:
      A new dict with the same keys and the text fields cleaned.
    """
    # Work on a shallow copy: Beam requires that a DoFn leaves its input
    # element unmodified.
    row = dict(element)

    # Replace carriage returns / line feeds in the free-text fields.
    for field in ('description', 'product_name'):
      row[field] = row[field].replace('\n', ' ').replace('\r', ' ')

    # Emit the row only when its image object exists.
    if self._image_exists(row['product_id']):
      yield row


def preprocess(runner):
  """Clean the train, eval and test splits and write them back to BigQuery.

  For every split, the matching SQL from the module-level `query` dict is
  read from BigQuery, passed through `CleanRow`, and written to the table
  `project.model_dataset1_<split>` in the `PROJECT` project using the
  WRITE_TRUNCATE disposition.

  Args:
    runner: Beam runner name forwarded to the pipeline options, e.g.
      'DirectRunner' or 'DataflowRunner'.
  """
  # Timestamp suffix keeps job names distinct between launches.
  job_name = 'test-dataset-cleaning' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  print('Launching Dataflow job {} ... hang on'.format(job_name))

  options = {
    'staging_location': 'gs://project-sample/out/tmp/staging',
    'temp_location': 'gs://project-sample/out/tmp',
    'job_name': job_name,
    'project': PROJECT,
    'runner': runner,
  }

  # Instantiate the PipelineOptions object from the options dictionary.
  opts = beam.pipeline.PipelineOptions(flags=[], **options)

  # One read -> clean -> write branch per split, all in a single pipeline.
  # Beam runs the pipeline when the `with` block exits.
  with beam.Pipeline(options=opts) as p:
    for mode in ['train', 'eval', 'test']:
      (
        p
        | 'read_{}'.format(mode) >> beam.io.Read(
            beam.io.BigQuerySource(query=query[mode], use_standard_sql=True))
        | 'clean_{}'.format(mode) >> beam.ParDo(CleanRow())
        | 'write_{}'.format(mode) >> beam.io.Write(beam.io.BigQuerySink(
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
            # Use the same project as the pipeline options rather than a
            # second hard-coded copy of the id, so the two cannot drift.
            project=PROJECT,
            dataset='project',
            table='model_dataset1_{}'.format(mode)))
      )
  print("Done")

In [None]:
# Run the cleaning pipeline with Beam's DirectRunner.
preprocess(runner='DirectRunner')

In [None]:
# Run the same pipeline with the DataflowRunner.
preprocess(runner='DataflowRunner')