# Instala os pré-requisitos

In [1]:
# Pin the Beam SDK (with GCP extras) to the version this notebook was run with.
%pip install apache-beam[gcp]==2.13.0



# <font color="red"> ATENÇÃO!</font>

Após instalar o Beam no Colab, pode ser necessário reiniciar o runtime.

In [2]:
#%pip install pydot==1.2.4



In [3]:
import apache_beam as beam
# Confirm which Beam version the kernel actually loaded (the install cell pins 2.13.0).
print(beam.__version__)

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


2.13.0


# Autenticação
Primeiro, precisamos autenticar nossa sessão do Colab no Google e definir o ID do projeto.

In [4]:
# Authenticate this Colab session with the user's Google account, so the
# notebook can access the GCP project configured below.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [0]:
# Enter your GCP project ID on the line below.
# The trailing `#@param` comments render these values as Colab form fields; keep them.

# GCP project used by the BigQuery client and by the Dataflow job options.
project_id = "workshop-5ia-tensorflow-253922" #@param {type:"string"}
# GCS bucket name; the Dataflow run writes its output under gs://<BUCKET>/preproc/.
BUCKET = "workshop-5ia-tensorflow" #@param {type:"string"}
# Region passed to the Dataflow job options.
REGION = "us-east1" #@param {type:"string"}

# Cliente BigQuery

Como vamos buscar dados no BigQuery, é necessário criar um cliente BQ.

In [0]:
# Create a BigQuery client bound to the configured project. A later cell uses
# it to pull a small sample of the query into a pandas DataFrame.
from google.cloud import bigquery

bgclient = bigquery.Client(project=project_id)

Demais imports:

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns, numpy as np
import sklearn
import datetime, os


# Obtenção dos dados

Vamos preparar a (mesma) query que fizemos na obtenção de dados com o Pandas, porém agora vamos processá-la com o Apache Beam, executado no Dataflow :-)

In [0]:
query = """
SELECT
  weight_pounds * 0.45359237 AS weight_kilos,
  is_male,
  mother_age,
  mother_married,
  plurality,
  gestation_weeks,
  ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND is_male IS NOT NULL
AND weight_pounds > 0
AND mother_age > 0
AND gestation_weeks > 0
AND plurality > 0
AND month > 0
"""

In [9]:
df = bgclient.query(query + " LIMIT 100").to_dataframe()
df.shape

(100, 7)

In [10]:
# Peek at the first rows of the 100-row sample.
df.head()

Unnamed: 0,weight_kilos,is_male,mother_age,mother_married,plurality,gestation_weeks,hashmonth
0,3.235,True,22,True,1,41,6392072535155213407
1,3.62,False,28,True,1,44,6691862025345277042
2,3.935,False,33,True,1,41,8904940584331855459
3,2.425,True,25,True,1,35,7170969733900686954
4,3.175,False,29,True,1,39,5896567601480310696


# Pré-Processamento

Cria a função que converte cada linha retornada pelo BigQuery em uma linha CSV (será usada no `beam.FlatMap`).

In [0]:
def to_csv(rowdict):
  """Convert one BigQuery row into a single CSV line (used with beam.FlatMap).

  The ``hashmonth`` column is not written; it is only used upstream for the
  train/eval split. A SHA-224 hex digest of the CSV fields is appended as a
  per-observation key, so identical rows produce identical keys.

  Args:
    rowdict: mapping of column name -> value for one row. A column missing
      from the mapping is written as the string 'None'.

  Yields:
    Exactly one string: '<weight_kilos>,...,<gestation_weeks>,<sha224 hex>'.
  """
  import hashlib
  # Output column order; hashmonth is intentionally omitted.
  CSV_COLUMNS = ['weight_kilos', 'is_male', 'mother_age', 'mother_married', 'plurality', 'gestation_weeks']

  data = ','.join(str(rowdict[k]) if k in rowdict else 'None' for k in CSV_COLUMNS)
  # Key derived from the row's contents (replaces the dropped hashmonth column).
  key = hashlib.sha224(data.encode('utf-8')).hexdigest()
  yield '{},{}'.format(data, key)

In [0]:
def preprocess(in_test_mode):
  """Launch the Beam pipeline that exports natality rows as train/eval CSV files.

  Rows are read from ``publicdata.samples.natality`` in BigQuery. They are
  split on ``hashmonth`` into train (``MOD(ABS(hashmonth),4) < 3``) and eval
  (``MOD(ABS(hashmonth),4) = 3``), converted with ``to_csv``, and written as
  sharded text files ``train.csv-*`` / ``eval.csv-*`` under the output dir.

  Args:
    in_test_mode: if True, run locally with the DirectRunner on a LIMIT 100
      sample, write to ./preproc (emptied first), and block until the job
      finishes. If False, submit the job to Dataflow, write to
      gs://<BUCKET>/preproc/ (previous contents removed best-effort), and
      return without waiting for the job.

  Relies on the notebook-level names ``beam``, ``datetime``, ``project_id``,
  ``BUCKET``, ``REGION`` and ``to_csv``.
  """
  import shutil, os, subprocess
  job_name = 'preprocess-curso-gcp-ml-tensorflow-beam' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    # Start from an empty directory so shards from a previous run don't linger.
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
  else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/preproc/'.format(BUCKET)
    # Best-effort cleanup of previous output. gsutil exits non-zero when nothing
    # matches (e.g. on the first run), and that must not abort the launch.
    # Only process-level failures are tolerated; a bare except would also hide
    # KeyboardInterrupt.
    try:
      subprocess.check_call(['gsutil', '-m', 'rm', '-r', OUTPUT_DIR])
    except (subprocess.CalledProcessError, OSError) as err:
      print('Could not remove {} (continuing): {}'.format(OUTPUT_DIR, err))

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': project_id,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  RUNNER = 'DirectRunner' if in_test_mode else 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
  query = """
SELECT
  weight_pounds * 0.45359237 AS weight_kilos,
  is_male,
  mother_age,
  mother_married,
  plurality,
  gestation_weeks,
  ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND is_male IS NOT NULL
AND weight_pounds > 0
AND mother_age > 0
AND gestation_weeks > 0
AND plurality > 0
AND month > 0
    """

  if in_test_mode:
    query = query + ' LIMIT 100'

  # (split name, predicate on the month-hash bucket): buckets 0-2 -> train, 3 -> eval.
  splits = [('train', '< 3'), ('eval', '= 3')]
  for step, predicate in splits:
    selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) {}'.format(query, predicate)

    (p
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    # Only the local run blocks; the Dataflow job keeps running asynchronously.
    job.wait_until_finish()
    print("Done!")

Roda o pré-processamento em modo de teste (localmente, com o DirectRunner e apenas 100 linhas)!

In [16]:
# Local dry run: DirectRunner on a LIMIT 100 sample, output written to ./preproc.
preprocess(in_test_mode = True)

Launching local job ... hang on




Done!


In [17]:
# List the train/eval shards written by the local run.
! ls ./preproc

eval.csv-00000-of-00001  train.csv-00000-of-00001


In [18]:
# Inspect the local train shard: one CSV line per row, with the SHA-224 key as the last field.
!cat ./preproc/train.csv-00000-of-00001

3.0329999974565545,False,35,False,1,35,89318d9180ff1a80b76211c563f023eee7c84449afb07de56c6370c3
3.203999997313156,False,28,True,1,35,39cf167acd89157314e17d61becef5e80f5055f8c2eb78884a7213a2
2.777999997670395,False,37,True,1,36,ef0dbfbc1e354e2f8f7b84a838c992324a06bc18443b411b8bc6d0a0
3.1229999973810814,False,34,True,1,37,adb7835910f0ebb029a6be1fb6afe2a454bfb92020563b55104b92cd
2.8349999976225955,False,30,True,1,37,7a761f822eadce4a197cadfe7563a5baffc985ada049b08b2a5791e8
3.401999997147115,False,17,True,1,37,f38d03655165b907b87cda22592a97942f1d37bff67448c5555d31e8
3.685999996908955,False,25,True,1,37,750958929ce53851c77033cf451a9c46910835fc193066971b446bf3
2.8349999976225955,False,37,True,1,38,0cda180ad7cd1133ba79a2b1b18f3165f7279045d2e3ae1c985937e3
3.174999997337475,False,34,True,1,38,65ffe521ac60885f96b2f3f260094ee798deeba8f66c3d97412b1b83
2.6369999977886365,False,33,True,1,38,ab2b8e637f1753ea2d9eec6d3c4df8f9990503ecc0f4fcae6bf29fa1
3.0219999974657794,False,19,False,1,38,0802302084d4514

Roda o pré-processamento para valer (no Dataflow)!
Obs.: anote o nome do job e acompanhe a execução no painel do Dataflow na GCP.

In [21]:
# Submit the full job to Dataflow; the call returns without waiting for the job to finish.
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-curso-gcp-ml-tensorflow-beam-191003-232544 ... hang on


# Copiar para outro Storage (Bucket)

Opcional: para quem quiser usar os dados já processados.

In [0]:
#!gsutil -m cp -r gs://curso-gcp-ml-tensorflow-beam/preproc gs://<teu bucket>/preproc