In [1]:
#SETUP
from google.cloud import bigquery
from google.cloud.bigquery import Row
from google.cloud import storage
from datetime import time, datetime
import sys
import csv

In [2]:
#NOMBRES VARIABLES PARA TABLA TEMPORAL
param_str_afi = "4040"
anio_mes_dia_d = datetime.now().strftime("%Y-%m-%d")
anio_mes_dia_t = datetime.now().strftime("%Y_%m_%d")

In [3]:
#VARIABLES
project_id = "dfa-dna-ws0009-la-prd-d0bb"
dataset_id = "dfa_dna_ws0009_la_prd_sandbox"
dataset_landing_id = "dfa-dna-ws0009-la-prd-landing-zone"
location = "northamerica-northeast1"

#NOMBRES CSV DE SALIDA
input_filename = "IP_INPUT.csv"
tmp_filename = "IP_TMP.csv"
output_filename = f"IP_OUTPUT_{param_str_afi}_{anio_mes_dia_t}"

#NOMBRE DE CARPETAS
input_folder = f"income_predictor/in/"
tmp_folder = f"income_predictor/tmp/"
out_folder = f"income_predictor/out/"

#TABLES NAMES
table_name_query = "uy_efx_score_ip_monthly_hash_la_prd"
table_name_temp_sandbook_1 = f"tmp_input_cliente_ip_{param_str_afi}"
table_name_temp_sandbook_2 = f"tmp_input_cliente_ip_{param_str_afi}_{anio_mes_dia_t}"

In [4]:
#TRAER CSV IN
storage_client = storage.Client()
gcs_bucket = dataset_landing_id

#CREAR TABLA TEMP
schema = [
    bigquery.SchemaField("ID", "STRING", mode = "NULLABLE"),
    bigquery.SchemaField("DOCUMENTO", "STRING", mode = "NULLABLE")
    ]

#TABLA NAME
client = bigquery.Client()
tmp_table_name = project_id + "." + dataset_id + "." + table_name_temp_sandbook_1
table = bigquery.Table(tmp_table_name, schema=schema)
table = client.create_table(table)

#CARGAR CSV AL SCHEMA
job_config = bigquery.LoadJobConfig(schema = schema, skip_leading_rows = 1, source_format = bigquery.SourceFormat.CSV, field_delimiter=";")
uri = f"gs://{gcs_bucket}/{input_folder + input_filename}"
load_job = client.load_table_from_uri(uri, tmp_table_name, job_config = job_config)  
load_job.result()  

destination_table = client.get_table(tmp_table_name)  
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 599 rows.


In [6]:
#GET TABLE
client = bigquery.Client()
table_id_1 = project_id + "." + dataset_id + "." + table_name_temp_sandbook_1
table_1 = client.get_table(table_id_1)

client = bigquery.Client()
table_id_2 = project_id + "." + dataset_id + "." + table_name_query
table_2 = client.get_table(table_id_2)

sql = f"SELECT  T1.* \
            , T2.cat_ip \
FROM (SELECT documento, ID as id_input FROM {table_id_1}) T1 left join \
        (SELECT documento AS documento, cast(cat_ip as integer) as CAT_IP \
         FROM {table_id_2}) T2 \
ON T1.documento = T2.documento".format(param_str_afi)

#CREAR DATAFRAME
df = client.query(sql).to_dataframe()

#SETUP UPLOAD
client = storage.Client()
bucket = client.get_bucket(dataset_landing_id)

#UPLOAD CSV TO BUCKET TEMP
bucket.blob(input_folder + input_filename).upload_from_string(df.to_csv(index = False, sep = ','), 'text/csv')

##traer de un bucket
storage_client = storage.Client()
gcs_bucket = dataset_landing_id

bucket = storage_client.get_bucket(gcs_bucket)
blob = bucket.get_blob(input_folder + input_filename)
downloaded_blob = blob.download_as_string()

#transformar data
decoded_csv = downloaded_blob.decode('utf-8').splitlines()
reader = csv.reader(decoded_csv, delimiter = ";")
parsed_csv = list(reader)

for row in parsed_csv:
    if row[0] == "documento":
        print(f"Header: {','.join(row)}")
        row.append('param_str_afi,anio_mes_dia_d')
    else:
        row.append(param_str_afi)
        row.append(anio_mes_dia_d)
lista_records = []

for row in parsed_csv:
    lista_records.append(",".join(row))

csv_string = "\n".join(lista_records)

#Guardar CSV enriquecido
blob = bucket.blob(tmp_folder + tmp_filename)
blob.upload_from_string(csv_string)

In [10]:
#TRAER CSV IN
storage_client = storage.Client()
gcs_bucket = dataset_landing_id

#CREAR TABLA TEMP
schema = [
    bigquery.SchemaField("documento", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("id_input", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("cat_ip", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("param_str_afi", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("anio_mes_dia", "DATE", mode="NULLABLE")
    ]

#TABLA NAME
client = bigquery.Client()
tmp_table_name = project_id + "." + dataset_id + "." + table_name_temp_sandbook_2
table = bigquery.Table(tmp_table_name, schema=schema)
table = client.create_table(table)

#CARGAR CSV AL SCHEMA
job_config = bigquery.LoadJobConfig(schema = schema, skip_leading_rows = 1, source_format = bigquery.SourceFormat.CSV)
uri = f"gs://{gcs_bucket}/{tmp_folder + tmp_filename}"
load_job = client.load_table_from_uri(uri, tmp_table_name, job_config=job_config)  
load_job.result()  

destination_table = client.get_table(tmp_table_name)  
print("Loaded {} rows.".format(destination_table.num_rows))

#GET TABLE
client = bigquery.Client()
table_id_1 = project_id + "." + dataset_id + "." + table_name_temp_sandbook_2
table_1 = client.get_table(table_id_1)

sql = f"SELECT * FROM {table_id_1} \
        ORDER BY id_input"

#CREAR DATAFRAME
df = client.query(sql).to_dataframe()

#SETUP UPLOAD
client = storage.Client()
bucket = client.get_bucket(dataset_landing_id)

#UPLOAD CSV TO BUCKET TEMP
bucket.blob(out_folder + output_filename).upload_from_string(df.to_csv(index = False, sep = ','), 'text/csv')

Loaded 599 rows.


In [11]:
#GET TABLE
client = bigquery.Client()
table_id_1 = project_id + "." + dataset_id + "." + table_name_temp_sandbook_2
table_1 = client.get_table(table_id_1)

sql = f"SELECT * FROM {table_id_1} \
        ORDER BY id_input"

#CREAR DATAFRAME
df = client.query(sql).to_dataframe()

#SETUP UPLOAD
client = storage.Client()
bucket = client.get_bucket(dataset_landing_id)

#UPLOAD CSV TO BUCKET TEMP
bucket.blob(out_folder + output_filename).upload_from_string(df.to_csv(index = False, sep = ','), 'text/csv')

In [12]:
#DELETE IN CSV
bucket1 = storage_client.get_bucket(gcs_bucket)
blob1 = bucket1.get_blob(input_folder + input_filename)
blob1.delete()

#DELETE TEMP CSV
bucket = storage_client.get_bucket(gcs_bucket)
blobt = bucket.get_blob(tmp_folder + tmp_filename)
blobt.delete()

print("Deleted csv '{}'.".format(blob1))
print("Deleted csv '{}'.".format(blobt))

Deleted csv '<Blob: dfa-dna-ws0009-la-prd-landing-zone, income_predictor/in/IP_INPUT.csv, 1658434821289656>'.
Deleted csv '<Blob: dfa-dna-ws0009-la-prd-landing-zone, income_predictor/tmp/IP_TMP.csv, 1658434821502655>'.


In [13]:
#BORRAR TABLA EN SANDBOOK
#APLICAR DESPUES DE OCUPAR LA TABLA
client = bigquery.Client()
table_id_1 = (project_id + "." + dataset_id + "." + table_name_temp_sandbook_1)

client = bigquery.Client()
table_id_1 = (project_id + "." + dataset_id + "." + table_name_temp_sandbook_2)

client.delete_table(table_id_1, not_found_ok = True)
client.delete_table(table_id_2, not_found_ok = True)
print("Deleted table '{}'.".format(table_id_1))
print("Deleted table '{}'.".format(table_id_2))

print("Proceso finalizado con exito, revisar csv carpeta income_predictor/out")

Deleted table 'dfa-dna-ws0009-la-prd-d0bb.dfa_dna_ws0009_la_prd_sandbox.tmp_input_cliente_ip_4040_2022_07_21'.
Deleted table 'dfa-dna-ws0009-la-prd-d0bb.dfa_dna_ws0009_la_prd_sandbox.uy_efx_score_ip_monthly_hash_la_prd'.
Proceso finalizado con exito, revisar csv carpeta income_predictor/out
