![alt text](https://drive.google.com/uc?export=view&id=1hPOPZSkTxtaHBtSGdw0BgDXa-Prl7VRu)

In [6]:
# Install Apache Beam with its "gcp" and "interactive" extras into the notebook runtime.
!pip install apache-beam[gcp,interactive]



![alt text](https://drive.google.com/uc?export=view&id=1tc7bmOPk7svpPP0jeYgctyHfyQUriYux)

# Schemas e Headers

In [7]:
# Table schemas keyed by target table name. Each value is a JSON string holding
# a list of {"name", "type", "mode"} field definitions; get_schema() parses it
# with json.loads and derives a converter and a "required" flag for each field.
SCHEMAS = {
  'components': '''[ { "name": "component_id", "type": "STRING", "mode": "REQUIRED" }, { "name": "component_type_id", "type": "STRING", "mode": "REQUIRED" }, { "name": "type", "type": "STRING", "mode": "NULLABLE" }, { "name": "connection_type_id", "type": "STRING", "mode": "NULLABLE" }, { "name": "outside_shape", "type": "STRING", "mode": "NULLABLE" }, { "name": "base_type", "type": "STRING", "mode": "NULLABLE" }, { "name": "height_over_tube", "type": "FLOAT", "mode": "REQUIRED" }, { "name": "bolt_pattern_long", "type": "FLOAT", "mode": "NULLABLE" }, { "name": "bolt_pattern_wide", "type": "FLOAT", "mode": "NULLABLE" }, { "name": "groove", "type": "BOOLEAN", "mode": "REQUIRED" }, { "name": "base_diameter", "type": "FLOAT", "mode": "NULLABLE" }, { "name": "shoulder_diameter", "type": "FLOAT", "mode": "NULLABLE" }, { "name": "unique_feature", "type": "BOOLEAN", "mode": "REQUIRED" }, { "name": "orientation", "type": "BOOLEAN", "mode": "REQUIRED" }, { "name": "weight", "type": "FLOAT", "mode": "NULLABLE" } ]''',
  'materials': '''[ { "name": "tube_assembly_id", "type": "STRING", "mode": "REQUIRED" }, { "name": "component_id_1", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_1", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_2", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_2", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_3", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_3", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_4", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_4", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_5", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_5", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_6", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_6", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_7", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_7", "type": "STRING", "mode": "NULLABLE" }, { "name": "component_id_8", "type": "STRING", "mode": "NULLABLE" }, { "name": "quantity_8", "type": "STRING", "mode": "NULLABLE" } ]''',
  'pricing': '''[ { "name": "tube_assembly_id", "type": "STRING", "mode": "REQUIRED" }, { "name": "supplier", "type": "STRING", "mode": "REQUIRED" }, { "name": "quote_date", "type": "DATE", "mode": "REQUIRED" }, { "name": "annual_usage", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "min_order_quantity", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "bracket_pricing", "type": "BOOLEAN", "mode": "REQUIRED" }, { "name": "quantity", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "cost", "type": "FLOAT", "mode": "REQUIRED" } ]'''
}

# Expected CSV header line for each source file, keyed by the file's base name
# (run_pipeline() reads "<source>.csv"). The header is split on CSV_DELIMITER to
# obtain the column names and is also used to recognise and drop the header row.
HEADERS = {
  'comp_boss': 'component_id,component_type_id,type,connection_type_id,outside_shape,base_type,height_over_tube,bolt_pattern_long,bolt_pattern_wide,groove,base_diameter,shoulder_diameter,unique_feature,orientation,weight',
  'bill_of_materials': 'tube_assembly_id,component_id_1,quantity_1,component_id_2,quantity_2,component_id_3,quantity_3,component_id_4,quantity_4,component_id_5,quantity_5,component_id_6,quantity_6,component_id_7,quantity_7,component_id_8,quantity_8',
  'price_quote': 'tube_assembly_id,supplier,quote_date,annual_usage,min_order_quantity,bracket_pricing,quantity,cost'
}

# Confirmation message so the cell shows visible output when it finishes.
print('SCHEMAS and HEADERS defined')

SCHEMAS and HEADERS defined


![alt text](https://drive.google.com/uc?export=view&id=1De83HtzcxPuZfCdDwlgx-e8Fd_r19ZXv)

# Autenticação

In [8]:
# Run Colab's user authentication flow (google.colab.auth).
# NOTE(review): presumably required for the GCS / BigQuery access performed by
# the pipeline cells below - confirm.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


![alt text](https://drive.google.com/uc?export=view&id=1YgdPyZw3TmEwFvmtXStYLec-KWvnEtAW)

# Definição do pipeline

In [9]:
from datetime import datetime as dt
import json
import time

import apache_beam as beam
from apache_beam.io import BigQueryDisposition
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Pipeline arguments shared by every run: the runner, the GCP project and the
# GCS staging/temp locations. run_pipeline() appends these to its per-job
# argument list before building PipelineOptions.
BASE_PIPELINE_ARGS = [
    '--runner=DirectRunner',
    '--project=modular-aileron-191222',
    '--staging_location=gs://treinamento-load/staging',
    '--temp_location=gs://treinamento-load/temp'
]

# Field separator used both to split a header into column names and to split
# each data line into values.
CSV_DELIMITER = ','


def get_header(source):
    """Return the expected CSV header line for ``source``.

    ``source`` must be a key of the module-level ``HEADERS`` mapping; an
    unknown key raises ``KeyError``.
    """
    return HEADERS[source]


def get_schema(name):
    """Load the table schema for ``name`` and derive per-field converters.

    Args:
        name: Key of the module-level ``SCHEMAS`` mapping.

    Returns:
        A ``(bq_schema, schema)`` tuple. ``bq_schema`` is the parsed JSON list
        of field definitions. ``schema`` maps each field name to a dict with
        ``'func'`` (callable converting the raw string to the field's type)
        and ``'required'`` (``True`` when the field's mode is ``REQUIRED``).

    Raises:
        KeyError: If ``name`` is not present in ``SCHEMAS``.
        ValueError: If a field declares a type that has no converter. Failing
            here avoids handing out a ``None`` converter whose call error
            would only surface later, per value, at conversion time.
    """
    converters = {
        'STRING': lambda x: x,
        'INTEGER': int,
        'FLOAT': float,
        # Only the literal "yes" (in any letter case) maps to True.
        'BOOLEAN': lambda x: x.lower() == 'yes',
        # Round-trip through strptime so malformed dates raise instead of
        # being passed through unchanged.
        'DATE': lambda x: dt.strptime(x, '%Y-%m-%d').strftime('%Y-%m-%d'),
    }

    def type2func(_type):
        func = converters.get(_type)
        if func is None:
            raise ValueError('Unsupported field type: {!r}'.format(_type))
        return func

    bq_schema = json.loads(SCHEMAS[name])

    schema = {
        obj['name']: {
            'func': type2func(obj['type']),
            'required': obj['mode'] == 'REQUIRED',
        }
        for obj in bq_schema
    }

    return (bq_schema, schema)


def csv2json(fields):
    """Build a mapper that turns one CSV line into a ``{field: value}`` dict.

    Args:
        fields: Ordered column names; the i-th name is paired with the i-th
            value of the line.

    Returns:
        A function taking a single line (no trailing newline) and returning a
        dict of raw string values. The line is parsed with ``csv.reader``
        using ``CSV_DELIMITER``, so a quoted value that contains the delimiter
        stays a single column. Surplus names or surplus values are ignored
        (``zip`` stops at the shorter sequence). An empty line yields an
        empty dict.
    """
    import csv  # function-scope import keeps this block self-contained

    def _internal(line):
        # csv.reader yields one row per input line; default to an empty row
        # in case nothing is produced.
        row = next(csv.reader([line], delimiter=CSV_DELIMITER), [])
        return dict(zip(fields, row))

    return _internal


def process(schema):
    """Build a mapper that converts and validates one parsed CSV row.

    Args:
        schema: Mapping of field name to ``{'func': callable, 'required': bool}``
            (the second element returned by ``get_schema``).

    Returns:
        A function taking a ``{field: raw_value}`` dict. It returns a new dict
        with every value converted by the field's ``func``. The literal
        ``'NA'`` and any value whose conversion raises become ``None``. The
        function returns ``None`` instead of a dict when a required field is
        missing from the row, is ``'NA'``, or fails conversion.

    Raises:
        KeyError: From the returned function, if the row contains a key that
            is not in ``schema``.
    """
    required_keys = frozenset(
        key for (key, spec) in schema.items() if spec['required']
    )

    def _convert(func, value):
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate.
        try:
            return func(value)
        except Exception:
            return None

    def _internal(obj):
        # A short row can lack required columns altogether; the per-key loop
        # below would never see them, so check for their presence first.
        if not required_keys.issubset(obj):
            return None

        proc_obj = dict()

        for (key, value) in obj.items():
            spec = schema[key]

            if value is None or value == 'NA':
                proc_value = None
            else:
                proc_value = _convert(spec['func'], value)

            if spec['required'] and proc_value is None:
                return None

            proc_obj[key] = proc_value

        return proc_obj

    return _internal


def run_pipeline(source, target):
    """Load one CSV file from the GCS bucket into a BigQuery table.

    Reads ``gs://treinamento-load/raw/<source>.csv``, drops every line equal
    to the expected header, converts each remaining line into a typed dict
    using the schema for ``target``, discards falsy results (rows rejected by
    ``process`` and empty dicts), and writes the rest to the table
    ``modular-aileron-191222:tubulation.<target>``.

    Args:
        source: Key of ``HEADERS``; also the base name of the input CSV file.
        target: Key of ``SCHEMAS``; also the name of the destination table.
    """
    header = get_header(source)
    fields = header.split(CSV_DELIMITER)
    
    # NOTE(review): bq_schema is not used anywhere below; only the derived
    # per-field `schema` is.
    (bq_schema, schema) = get_schema(target)

    input_path = 'gs://treinamento-load/raw/{}.csv'.format(source)
    # NOTE(review): output_path is only passed as the --output option; no
    # transform in this function writes to it.
    output_path = 'gs://treinamento-load/processed/{}.json/part'.format(target)

    # Per-job arguments. The job name combines the target with the current
    # timestamp, with '.' replaced by '-'.
    pipeline_args = [
        '--job_name={}-{}'.format(target, str(time.time()).replace('.', '-')),
        '--input={}'.format(input_path),
        '--output={}'.format(output_path),
        '--service_account_email=341551830954-compute@developer.gserviceaccount.com'
    ]

    pipeline_args.extend(BASE_PIPELINE_ARGS)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read the lines of the file stored in the bucket.
        lines = pipeline | ReadFromText(input_path)

        # Drop the header row: any line that is exactly equal to the expected
        # header is filtered out.
        lines = lines | 'RemoveHeader' >> beam.Filter(lambda line: line != header)

        # Turn each CSV line into a {column: raw value} dict.
        objs = lines | 'CSV2JSON' >> beam.Map(csv2json(fields))

        # Convert and validate each dict against the schema of the target table.
        proc_objs = objs | 'ProcessJSONs' >> beam.Map(process(schema))

        # Keep only truthy results; this removes the None returned for
        # rejected rows as well as empty dicts.
        filtered_proc_objs = proc_objs | 'FilterEmpties' >> beam.Filter(lambda x: x)

        # Write to BigQuery: never create the table (CREATE_NEVER) and, if it
        # already holds data, truncate it before writing (WRITE_TRUNCATE).
        filtered_proc_objs | WriteToBigQuery(
            'modular-aileron-191222:tubulation.{}'.format(target),
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=BigQueryDisposition.CREATE_NEVER
        )

print('Pipeline defined')

Pipeline defined


![alt text](https://i.stack.imgur.com/yrC5W.png)

# Invocação do pipeline (execução de fato)

A célula acima apenas define o pipeline, como se fosse a criação de uma função com `def`; a célula abaixo é a execução de fato dessa "função".

In [10]:
import os

# Set GOOGLE_APPLICATION_CREDENTIALS to a service-account key file under /content.
# NOTE(review): the path is hard-coded and the file must already exist in the
# runtime - presumably uploaded manually before running this cell; confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/modular-aileron-191222-3a3c28d2079c.json'

# Load jobs to run. Each entry maps a label to the source CSV (a key of
# HEADERS) and the destination table (a key of SCHEMAS).
PROCESSES = {
    'components': {
        'source': 'comp_boss',
        'target': 'components'
    },
    'materials': {
        'source': 'bill_of_materials',
        'target': 'materials'
    },
    'pricing': {
        'source': 'price_quote',
        'target': 'pricing'
    }
}

# Run the jobs one after another; each call builds and executes its own pipeline.
for k, v in PROCESSES.items():
  print('Importing: {}'.format(k))
  run_pipeline(v['source'], v['target'])

Importing: components


  experiments = p.options.view_as(DebugOptions).experiments or []


Importing: materials
Importing: pricing


![alt text](https://engineering.fb.com/wp-content/uploads/2019/11/Data_PortabilityPrivacy_BANNER_003.gif)

![alt text](https://drive.google.com/uc?export=view&id=1TlSkiGeCHvCRvoDG6UgwZzszp9P9Ohjl)

# Insert no BQ