In [1]:
import os
import pandas as pd

from google.cloud import storage
from google.cloud import bigquery
import apache_beam as beam

### Codigos para manipulação do Bucket

In [2]:
def create_bucket(bucket_name):
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    bucket.storage_class = "STANDARD"
    new_bucket = storage_client.create_bucket(bucket, location="us")

    print(
        "Created bucket {} in {} with storage class {}".format(
            new_bucket.name, new_bucket.location, new_bucket.storage_class
        )
    )
    return new_bucket

In [3]:
def delete_bucket(bucket_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    
    for i in storage_client.list_blobs(bucket_name): 
        i.delete()
    
    bucket.delete()

    print("Bucket {} deleted".format(bucket.name))

In [4]:
def upload_to_bucket(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

## Codigos de Manipulação BigQuery

In [5]:
def create_dataset_BQ(dataset_id):

    client = bigquery.Client()

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # TODO(developer): Specify the geographic location where the dataset should reside.
    dataset.location = "US"

    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

In [6]:
def delete_dataset_BD(dataset_id):
    client = bigquery.Client()
    
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.

    print("Deleted dataset '{}'.".format(dataset_id))

In [7]:
def delete_table(table_id):
    client = bigquery.Client()
    try:
        job = client.delete_table( table_id )
        job.result()
        print(f'{ table_id } was removed.')
    except:
        print(f'The { table_id } is not exist.')


## Funcões Apache BEAM

In [8]:
class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        yield (element, gcs.open(element).read())

In [9]:
def create_dataframe(readable_file):
    import io
    import numpy as np
    
    buffer = io.StringIO(readable_file[1].decode())
    df = pd.read_csv(filepath_or_buffer = buffer, header = 0)

    yield df.replace({np.nan:None})

In [10]:
def convert_boss_comp(df):       
    df.unique_feature = df.unique_feature.map(dict(Yes=1, No=0))
    df.groove = df.groove.map(dict(Yes=1, No=0))
    df.orientation = df.orientation.map(dict(Yes=1, No=0))
    
    #.to_dict('records') 
    #.values.tolist() 
    #.T.to_dict().values()
    #qs = df.replace({np.nan:None}).to_dict('records')
    #l= [v for k,v in qs.items()]

    temp = df.to_dict('records')
    yield temp[0]  #funcionou
    
    #print({'items': temp})
    #yield df.replace({np.nan:None}).T.to_dict().values()
    #yield {'items': temp}

In [11]:
def convert_price_quote(df):
    df.bracket_pricing = df.bracket_pricing.map(dict(Yes=1, No=0))
    
    temp = df.to_dict('records')
    
    yield temp[0]

In [12]:
def convert_bill_of_materials(df):
    df = df[['tube_assembly_id']]
    df.reset_index(inplace=True)
    
    temp = df.to_dict('records')
    
    yield temp[0]

In [13]:
def convert_bill_of_materials_to_product_structure(df):
    df8 = df[df.component_id_8.notnull()][['tube_assembly_id', 'component_id_8','quantity_8']].rename(columns={'component_id_8': 'component_id', 'quantity_8' : 'quantity'})
    df7 = df[df.component_id_7.notnull()][['tube_assembly_id', 'component_id_7','quantity_7']].rename(columns={'component_id_7': 'component_id', 'quantity_7' : 'quantity'})
    df6 = df[df.component_id_6.notnull()][['tube_assembly_id', 'component_id_6','quantity_6']].rename(columns={'component_id_6': 'component_id', 'quantity_6' : 'quantity'})
    df5 = df[df.component_id_5.notnull()][['tube_assembly_id', 'component_id_5','quantity_5']].rename(columns={'component_id_5': 'component_id', 'quantity_5' : 'quantity'})
    df4 = df[df.component_id_4.notnull()][['tube_assembly_id', 'component_id_4','quantity_4']].rename(columns={'component_id_4': 'component_id', 'quantity_4' : 'quantity'})
    df3 = df[df.component_id_3.notnull()][['tube_assembly_id', 'component_id_3','quantity_3']].rename(columns={'component_id_3': 'component_id', 'quantity_3' : 'quantity'})
    df2 = df[df.component_id_2.notnull()][['tube_assembly_id', 'component_id_2','quantity_2']].rename(columns={'component_id_2': 'component_id', 'quantity_2' : 'quantity'})
    df1 = df[df.component_id_1.notnull()][['tube_assembly_id', 'component_id_1','quantity_1']].rename(columns={'component_id_1': 'component_id', 'quantity_1' : 'quantity'})
    
    
    df = pd.concat([df1, df2, df3, df4, df5, df6, df7, df8], ignore_index=True)
  
    temp = df.to_dict('records')
    
    yield temp[0]

# Dados do projeto GCP

In [14]:
bucket_name = "fk_dotz_bucket"

PROJECT='teste-dotz-292803' 
dataset = 'dotz'
REGION='us-east1-b'

options = {
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
  }
    
RUNNER = 'DirectRunner' #DataflowRunner 
opts = beam.pipeline.PipelineOptions(flags = [], **options)


### Criação do Bucket e Upload de arquivos

In [15]:
create_bucket(bucket_name)
upload_to_bucket(bucket_name, f'{ os.getcwd() }/dataset/bill_of_materials.csv', 'dataset/bill_of_materials.csv')
upload_to_bucket(bucket_name, f'{ os.getcwd() }/dataset/comp_boss.csv', 'dataset/comp_boss.csv')
upload_to_bucket(bucket_name, f'{ os.getcwd() }/dataset/price_quote.csv', 'dataset/price_quote.csv')

Created bucket fk_dotz_bucket in US with storage class STANDARD
File E:\Projetos\Jobs\DataEngineer\Dotz/dataset/bill_of_materials.csv uploaded to dataset/bill_of_materials.csv.
File E:\Projetos\Jobs\DataEngineer\Dotz/dataset/comp_boss.csv uploaded to dataset/comp_boss.csv.
File E:\Projetos\Jobs\DataEngineer\Dotz/dataset/price_quote.csv uploaded to dataset/price_quote.csv.


### Criação de um DataSet no BigQuery 

In [16]:
create_dataset_BQ(f'{PROJECT}.{dataset}')

Created dataset teste-dotz-292803.dotz


## PIPELINE Apache Beam

In [17]:
def run_pipeline(origin, table, schema, _function):
    with beam.Pipeline(RUNNER, options = opts) as p:
        files = (p
                 | "Initialize" >> beam.Create([f"gs://{bucket_name}/dataset/{origin}"])
                 | "Read blobs" >> beam.ParDo(ReadGcsBlobs())
                 | "Convert to Pandas " >> beam.FlatMap(create_dataframe)
                 | "convert function " >> beam.FlatMap(_function)
                 | f'create table {table}' >> beam.io.gcp.bigquery.WriteToBigQuery(
                                    table=f'teste-dotz-292803:dotz.{ table }',
                                    schema=schema ,
                                    method="STREAMING_INSERTS",
                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                ) 
                )

In [18]:
run_pipeline('comp_boss.csv',
             'component',
             'component_id:STRING, component_type_id:STRING, type:STRING, connection_type_id:STRING, outside_shape:STRING, base_type:STRING, height_over_tube:FLOAT, bolt_pattern_long:FLOAT, bolt_pattern_wide:FLOAT,' + \
                'groove:INTEGER, base_diameter:FLOAT, shoulder_diameter:FLOAT, unique_feature:INTEGER, orientation:INTEGER, weight:FLOAT',
             convert_boss_comp)

  experiments = p.options.view_as(DebugOptions).experiments or []


In [21]:
run_pipeline('price_quote.csv',
             'quotation',
             'tube_assembly_id:STRING, supplier:STRING, quote_date:STRING, annual_usage:INTEGER, min_order_quantity:INTEGER, bracket_pricing:INTEGER, quantity:INTEGER, cost:FLOAT',
             convert_boss_comp)



In [22]:
run_pipeline('bill_of_materials.csv',
             'product_structure',
             'tube_assembly_id:STRING, component_id:STRING, quantity:INTEGER',
             convert_bill_of_materials_to_product_structure)



In [23]:
run_pipeline('bill_of_materials.csv',
             'tube_assembly',
             'idx:INTEGER, tube_assembly_id:STRING',
            convert_bill_of_materials)



### Remoção das tabelas e dataset BigQuery

In [24]:
delete_table(f'{ PROJECT }.{ dataset }.component')
delete_table(f'{ PROJECT }.{ dataset }.quotation')
delete_table(f'{ PROJECT }.{ dataset }.tube_assembly')
delete_table(f'{ PROJECT }.{ dataset }.product_structure')
delete_dataset_BD(dataset)

The teste-dotz-292803.dotz.component is not exist.
The teste-dotz-292803.dotz.quotation is not exist.
The teste-dotz-292803.dotz.tube_assembly is not exist.
The teste-dotz-292803.dotz.product_structure is not exist.
Deleted dataset 'dotz'.


### Remoção do Bucket

In [25]:
delete_bucket(bucket_name)

Bucket fk_dotz_bucket deleted
