In [4]:
import json
import os
import re

import pandas as pd
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
from google_auth_oauthlib import flow
from pandas_gbq import to_gbq

#### funciones:

- **credentials = service_account.Credentials.from_service_account_file(bq_credentials, scopes=scopes)**
- **client = bigquery.Client(credentials=credentials, project=credentials.project_id)**   --> para establecer la conexion
---
- **client.dataset(dataset_id)**  --> para referenciar el data set
- **dataset = client.create_dataset('us_states_dataset')** --> si no existe el dataset, lo podemos crear
- **client.table(table_id)**   --> para referenciar la tabla
- **table = client.create_table(table)** --> si no existe la tabla, la podemos crear
- o referenciarla de golpe:
- **table = client.dataset(dataset_id).table(table_id)** --> para referenciar el dataset y la tabla en conjunto
- en el caso de que si que exista:
- **table = client.get_table(table)** --> obtenemos la tabla de BigQuery
---
- **job_config = bigquery.job.QueryJobConfig(use_query_cache=False)**   --> para modificar configuraciones
- **job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")**  --> para modificar configuraciones
- **job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON** --> para seguir añadiendo modificaciones a la configuración
---
- **client.query(query, job_config=job_config)** --> para ejecutar una query
---
- **client.load_table_from_uri(gcs_uri, table, job_config=job_config)** --> para cargar una tabla a BQ desde un bucket de GCS
---
- CREACION DE TABLA
- - **table = client.dataset(dataset_id).table(table_id)** --> referenciamos la tabla que deberia existir en BigQuery
  - 0º COMPROBAMOS si existe con **client.get_table(table)** --> obtenemos la tabla de BigQuery. SI NO EXISTE, LA TENDRÍAMOS QUE CREAR:
  - 1º DEFINIMOS el schema --> **schema = [bigquery.SchemaField("cars", "STRING", mode="NULLABLE"), bigquery.SchemaField("mpg", "FLOAT", mode="REQUIRED"),  ...]**
  - 2º DEFINIMOS la tabla --> **table = bigquery.Table(table, schema=schema)**
  - 3º CREAMOS la tabla --> **table = client.create_table(table)**
  - también se podría saltar el paso 1 y 2 y configurar el schema en el jobconfig
  - - **job_config = bigquery.LoadJobConfig(schema=[bigquery.SchemaField('name', 'STRING'), bigquery.SchemaField('post_abbr','STRING')])**
    - **job = client.load_table_from_dataframe(DF, table, job_config=job_config)** --> el job de carga crea la tabla con ese schema si todavía no existe
---
- INSERCION DE VALORES (debajo de los ya existentes) (partimos de que ya existe)
- - 0º OBTENEMOS la tabla **table = client.get_table(table)**
  - 1º CARGAMOS DATOS (desde un DF en este caso) **job = client.load_table_from_dataframe(DF, table)**
--- 
- SOBREESCRIBIR VALORES
- - 0º OBTENEMOS la tabla **table = client.get_table(table)**
  - 1º CAMBIAMOS la configuración **job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")**
  - 2º CARGAMOS DATOS (desde un DF en este caso) **job = client.load_table_from_dataframe(DF, table, job_config=job_config)**

#### CREAR TABLA E INSERTAR DATOS

#### configuración

In [39]:
# Configuration variables
env = "DEV"

# Load the environment-specific JSON settings. The context manager guarantees
# the file handle is closed even if parsing fails.
with open('../config/general_config_' + env + '.json') as config_fh:
    config_file = json.load(config_fh)

# BigQuery settings, all read from the "bigquery" section of the config file
bq_config = config_file['bigquery']
scopes = bq_config['scopes']
bq_credentials = bq_config['bq_credentials']
project_id = bq_config['project_id']
dataset_id = bq_config['dataset_id']
table_id = bq_config['table_id']
csv_path = bq_config['csv_path']
insert_method = bq_config['insert_method']

In [45]:
#CONEXION
def bq_client(scopes, bq_credentials, project_id):
    """Create a BigQuery client, preferring a service-account key file.

    Args:
        scopes: OAuth scopes passed to the service-account credentials.
        bq_credentials: Path to the service-account JSON key file.
        project_id: Accepted for interface compatibility; the client project
            is taken from the credentials, so this value is not used.

    Returns:
        A ``bigquery.Client``. If building it from the key file fails for any
        reason, the error is printed and a default ``bigquery.Client()`` is
        returned instead.
    """
    try:
        sa_credentials = service_account.Credentials.from_service_account_file(
            bq_credentials, scopes=scopes
        )
        return bigquery.Client(
            credentials=sa_credentials, project=sa_credentials.project_id
        )
    except Exception as err:
        print(f"Big query local conection wrong: {err} try virtual conexion")
        # The VM must have the right permissions: grant them to the user, or
        # download the service-account JSON file onto the VM and point to it
        # (export GOOGLE_APPLICATION_CREDENTIALS="/ruta/a/tu/credencial.json").
        return bigquery.Client()

# Build the notebook-wide BigQuery client from the configuration values loaded above.
client = bq_client(scopes, bq_credentials, project_id)

In [7]:
#REFERENCIAR LA TABLA
def bq_table_ref(client, dataset_id, table_id):
    """Build a reference to ``dataset_id.table_id`` through the client.

    Args:
        client: Object exposing ``dataset(dataset_id)``, whose result exposes
            ``table(table_id)``.
        dataset_id: Dataset identifier.
        table_id: Table identifier.

    Returns:
        Whatever ``client.dataset(dataset_id).table(table_id)`` returns, or
        ``None`` after printing the error if that call chain raises.
    """
    try:
        dataset_ref = client.dataset(dataset_id)
        return dataset_ref.table(table_id)
    except Exception as err:
        print(f"Error in Bigquery table reference: {err}")

# Reference to the target table; bq_table_ref returns None if building it raised.
table = bq_table_ref(client,dataset_id,table_id)

In [43]:
#CSV a un DataFrame de Pandas
def bq_csv_to_pandas(csv_path):
    """Read a comma-separated file into a pandas DataFrame.

    Args:
        csv_path: Path or file-like object handed to ``pd.read_csv``.

    Returns:
        The parsed DataFrame (no column is used as the index), or ``None``
        after printing the error if reading fails.
    """
    try:
        return pd.read_csv(csv_path, sep=",", index_col=False)
    except Exception as err:
        print(f"Error importing the csv file{err}")

# Load the CSV into a DataFrame; bq_csv_to_pandas returns None if the file could not be read.
bq_csv_df = bq_csv_to_pandas(csv_path)

##### a) SQL --> client.query(query, job_config=job_config)

In [9]:
#correr después de crear la query:
def bq_query_execution(client, query, dataset_id, table_id, table):
    """Run a SQL statement and return a short status message.

    Args:
        client: Object exposing ``query(sql)``, whose result exposes ``result()``.
        query: SQL text to execute.
        dataset_id: Unused; kept for interface compatibility.
        table_id: Unused; kept for interface compatibility.
        table: Object with ``project``, ``dataset_id`` and ``table_id``
            attributes, used only to format the status message.

    Returns:
        A status string when the SQL text contains ``CREATE TABLE``,
        ``INSERT`` or ``TRUNCATE TABLE`` (checked in that order, case
        sensitive); ``None`` for any other statement, or after printing the
        error if anything raises.
    """
    status_by_keyword = (
        ("CREATE TABLE", "Created table {}.{}.{}"),
        ("INSERT", "Values inserted into {}.{}.{}"),
        ("TRUNCATE TABLE", "TRUNCATE {}.{}.{}"),
    )
    try:
        # Block until the job finishes so failures surface here.
        client.query(query).result()

        for keyword, template in status_by_keyword:
            if keyword in query:
                return template.format(table.project, table.dataset_id, table.table_id)
        return None
    except Exception as err:
        print(err)

###### 1.A crear la tabla si no existe

In [10]:
#CREAR ESQUEMA DESDE EL CSV 

def bq_schema_for_query(bq_csv_df):
    """Build the column-definition list for a ``CREATE TABLE`` statement.

    Each column of the DataFrame is mapped to a BigQuery standard-SQL type
    according to its pandas dtype:

    * string-like  -> ``STRING``
    * boolean      -> ``BOOL``
    * float        -> ``FLOAT64``
    * other numeric -> ``INT64``
    * anything else -> ``STRING``

    The definitions are joined directly instead of stripping punctuation from
    ``str(dict)``, so column names containing quotes, colons or brackets are
    emitted unchanged.

    Args:
        bq_csv_df: DataFrame whose columns describe the table.

    Returns:
        A string such as ``"cars STRING, mpg FLOAT64"`` (empty for a frame
        without columns), or ``None`` after printing the error if the frame
        cannot be inspected (for example when it is ``None``).
    """
    try:
        column_defs = []
        for column in bq_csv_df.columns:
            series = bq_csv_df[column]
            if pd.api.types.is_string_dtype(series):
                sql_type = 'STRING'
            elif pd.api.types.is_bool_dtype(series):
                # bool counts as numeric in pandas; without this branch it
                # would be declared INT64.
                sql_type = 'BOOL'
            elif pd.api.types.is_float_dtype(series):
                sql_type = 'FLOAT64'
            elif pd.api.types.is_numeric_dtype(series):
                sql_type = 'INT64'
            else:
                sql_type = 'STRING'
            column_defs.append(f"{column} {sql_type}")

        return ", ".join(column_defs)

    except Exception as e:
        print(e)

# NOTE: this rebinds the name `bq_schema_for_query` from the function to its result;
# after this line the function can no longer be called under this name.
bq_schema_for_query = bq_schema_for_query(bq_csv_df)

In [13]:
def bq_schema_for_query2(bq_csv_df, uppercase=False):
    """Build a ``CREATE TABLE`` column list straight from ``DataFrame.dtypes``.

    Object dtypes are renamed to ``string``; every other dtype keeps its
    pandas name (e.g. ``float64``, ``int64``). The mapping is rendered with
    ``str()`` and the dict/dtype punctuation is stripped with a regex, so a
    column name containing any of ``' { } [ ] : ( )`` or the text ``dtype``
    is altered by that stripping.

    Args:
        bq_csv_df: DataFrame whose dtypes describe the table.
        uppercase: When true, type names are upper-cased (``FLOAT64``);
            the default keeps them lower-case.

    Returns:
        A string such as ``"mpg float64, cyl int64"``, or ``None`` after
        printing the error if the frame cannot be inspected.
    """
    try:
        dtypes = bq_csv_df.dtypes.replace("O", "string")
        if uppercase:
            type_by_column = {name: str(dtype).upper() for name, dtype in dtypes.items()}
        else:
            type_by_column = dtypes.to_dict()

        # Raw string: sequences like "\{" in a normal literal are invalid
        # escapes and raise SyntaxWarning on Python 3.12+.
        return re.sub(r"'|\{|\}|\[|\]|\:|dtype|\(|\)", "", str(type_by_column))

    except Exception as e:
        print(e)

# NOTE: this rebinds the name `bq_schema_for_query2` from the function to its result;
# after this line the function can no longer be called under this name.
bq_schema_for_query2 = bq_schema_for_query2(bq_csv_df)

In [14]:
#crear la tabla si no existe --> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
def bq_query_create_table(dataset_id,table_id,bq_schema_for_query):
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement.

    Args:
        dataset_id: Dataset identifier placed before the dot.
        table_id: Table identifier placed after the dot.
        bq_schema_for_query: Column-definition text placed between the
            parentheses (e.g. ``"cars STRING, mpg FLOAT64"``).

    Returns:
        The SQL statement as a string.
    """
    # Clauses that could be appended to the statement for a partitioned table:
    #   PARTITION BY fecha
    #   OPTIONS(
    #       partition_expiration_days=100,
    #       description="a table partitioned by fecha");
    statement_template = """CREATE TABLE IF NOT EXISTS `{}.{}`({});"""
    return statement_template.format(dataset_id, table_id, bq_schema_for_query)

# NOTE: this rebinds the name `bq_query_create_table` from the function to the generated SQL
# string; after this line the function can no longer be called under this name.
bq_query_create_table = bq_query_create_table(dataset_id,table_id,bq_schema_for_query)

In [15]:
#ejecución de la query
# Run the CREATE TABLE statement; the returned status string is the cell output.
bq_query_execution(client, bq_query_create_table, dataset_id, table_id, table)

'Created table curso-bigquery-mide-403114.chicago_taxi_tips.cars_upload_test'

###### 2.A INSERTAR datos del csv en la tabla debajo de los ya existentes

In [16]:
def _bq_sql_literal(value):
    """Render one DataFrame cell as a SQL literal (missing values become NULL)."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return "NULL"
    # Unwrap numpy scalars so their repr is a plain number, not "np.float64(...)".
    if hasattr(value, "item") and not isinstance(value, (str, bytes)):
        value = value.item()
    return repr(value)


def bq_query_insert_values(bq_csv_df,dataset_id,table_id):
    """Build an ``INSERT ... VALUES`` statement containing every DataFrame row.

    Rows are rendered value by value rather than by stripping brackets from
    ``str(list_of_tuples)``, so ``[`` / ``]`` inside the data are preserved,
    missing values (``NaN``/``None``) become ``NULL``, and single-column
    frames do not produce a trailing comma inside the row tuple.

    NOTE(review): values are embedded in the SQL text. For data that is not
    fully trusted, prefer a load job (``client.load_table_from_dataframe``)
    over a string-built statement.

    Args:
        bq_csv_df: DataFrame providing column names and row values.
        dataset_id: Dataset identifier placed before the dot.
        table_id: Table identifier placed after the dot.

    Returns:
        The SQL statement as a string.

    Raises:
        ValueError: If the DataFrame is ``None`` or has no rows, since an
            ``INSERT`` without values is not a valid statement.
    """
    if bq_csv_df is None or bq_csv_df.empty:
        raise ValueError("bq_query_insert_values needs a DataFrame with at least one row")

    # Column list
    columns_for_query = ", ".join(str(column) for column in bq_csv_df.columns)

    # One parenthesised tuple per row
    values_for_query = ", ".join(
        "(" + ", ".join(_bq_sql_literal(value) for value in row) + ")"
        for row in bq_csv_df.values.tolist()
    )

    # Final statement
    query = """INSERT `{}.{}`({}) VALUES {}""".format(dataset_id,table_id,columns_for_query,values_for_query)
    return query

# NOTE: this rebinds the name `bq_query_insert_values` from the function to the generated SQL
# string; after this line the function can no longer be called under this name.
bq_query_insert_values = bq_query_insert_values(bq_csv_df,dataset_id,table_id)

In [17]:
# Run the INSERT statement; on failure bq_query_execution prints the error (see the output below).
bq_query_execution(client, bq_query_insert_values, dataset_id, table_id, table)

403 Billing has not been enabled for this project. Enable billing at https://console.cloud.google.com/billing. DML queries are not allowed in the free tier. Set up a billing account to remove this restriction.

Location: europe-southwest1
Job ID: 77576816-80c9-4bac-b40e-70ac0680387a



###### * Si queremos eliminar los datos antes de insertarlos tendríamos que hacer un TRUNCATE

In [18]:
def bq_query_truncate_table(dataset_id,table_id):
    """Build a ``TRUNCATE TABLE`` statement for ``dataset_id.table_id``.

    Args:
        dataset_id: Dataset identifier placed before the dot.
        table_id: Table identifier placed after the dot.

    Returns:
        The SQL statement as a string.
    """
    target = (dataset_id, table_id)
    return """TRUNCATE TABLE `{}.{}`""".format(*target)

# NOTE: this rebinds the name `bq_query_truncate_table` from the function to the generated SQL
# string; after this line the function can no longer be called under this name.
bq_query_truncate_table = bq_query_truncate_table(dataset_id,table_id)

In [19]:
# Run the TRUNCATE statement; on failure bq_query_execution prints the error (see the output below).
bq_query_execution(client, bq_query_truncate_table, dataset_id, table_id, table)

403 Billing has not been enabled for this project. Enable billing at https://console.cloud.google.com/billing. DML queries are not allowed in the free tier. Set up a billing account to remove this restriction.

Location: europe-southwest1
Job ID: 2a3626b1-5e82-421e-acc1-1dcfb7414ddd



##### b) python
https://cloud.google.com/bigquery/docs/tables?hl=es-419
https://github.com/googleapis/python-bigquery/blob/35627d145a41d57768f19d4392ef235928e00f72/samples/create_table_range_partitioned.py

###### 1.A crear la tabla a mano si no existe

In [20]:
def bq_python_create_table_static(client, table, dataset_id, table_id):
    """Create the table with a fixed, hand-written schema if it does not exist.

    Existence is checked with ``client.get_table``; a missing table is
    recognised by the ``NotFound`` exception type rather than by searching
    the error text for "Not found".

    Args:
        client: BigQuery client.
        table: Table reference to look up and, if missing, to create.
        dataset_id: Dataset identifier, used only in the printed messages.
        table_id: Table identifier, used only in the printed messages.

    Returns:
        A ``"Created table ..."`` string when the table was created;
        ``None`` when it already exists or when the lookup failed for another
        reason (the error is printed). Errors raised by ``create_table``
        itself propagate to the caller.
    """
    try:
        client.get_table(table)
    except NotFound:
        print(f"La tabla {table_id} no existe en el conjunto de datos {dataset_id}.")
    except Exception as e:
        print(f"Error: {e}")
        return None
    else:
        print(f"La tabla {table_id} ya existe en el conjunto de datos {dataset_id}.")
        return None

    # The table is missing: create it with the fixed schema.
    schema = [
        bigquery.SchemaField("cars", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("mpg", "FLOAT", mode="REQUIRED"),
        bigquery.SchemaField("cyl", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("disp", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("hp", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("drat", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("wt", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("qsec", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("vs", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("am", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("gear", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("carb", "INTEGER", mode="NULLABLE")
    ]
    table = bigquery.Table(table, schema=schema)

    # To create a time-partitioned table instead:
    # table.time_partitioning = bigquery.TimePartitioning(
    #     type_=bigquery.TimePartitioningType.DAY,
    #     field="date",  # name of column to use for partitioning
    #     expiration_ms=1000 * 60 * 60 * 24 * 90,
    # )  # 90 days

    table = client.create_table(table)
    return("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
            
# Create the table with the fixed schema if it is missing; the function prints what it found.
bq_python_create_table_static(client, table, dataset_id, table_id)

La tabla cars_upload_test ya existe en el conjunto de datos chicago_taxi_tips.


###### 1.B crear la tabla leyendo el schema del csv

In [22]:
def bq_python_create_table_dynamic(client,bq_csv_df, table, dataset_id, table_id):
    """Create the table with a schema inferred from the DataFrame, if missing.

    Every column becomes a ``NULLABLE`` field whose type follows its pandas
    dtype: string-like -> ``STRING``, boolean -> ``BOOLEAN``, float ->
    ``FLOAT``, other numeric -> ``INTEGER``, anything else -> ``STRING``.

    If the schema cannot be inferred, the error is printed and nothing is
    created, so a failed inference never produces an empty-schema table.

    Args:
        client: BigQuery client.
        bq_csv_df: DataFrame whose columns define the schema.
        table: Table reference to look up and, if missing, to create.
        dataset_id: Dataset identifier, used only in the printed messages.
        table_id: Table identifier, used only in the printed messages.

    Returns:
        A ``"Created table ..."`` string when the table was created;
        ``None`` when it already exists, when schema inference failed, or
        when the lookup failed for a reason other than "not found" (the
        error is printed). Errors raised by ``create_table`` propagate.
    """
    # Infer one SchemaField per column from the pandas dtypes.
    try:
        schema = []
        for column in bq_csv_df.columns:
            series = bq_csv_df[column]
            if pd.api.types.is_string_dtype(series):
                field_type = 'STRING'
            elif pd.api.types.is_bool_dtype(series):
                # bool counts as numeric in pandas; without this branch it
                # would be declared INTEGER.
                field_type = 'BOOLEAN'
            elif pd.api.types.is_float_dtype(series):
                field_type = 'FLOAT'
            elif pd.api.types.is_numeric_dtype(series):
                field_type = 'INTEGER'
            else:
                field_type = 'STRING'  # adjust as needed for other dtypes
            schema.append(bigquery.SchemaField(column, field_type, mode="NULLABLE"))
    except Exception as e:
        print(e)
        return None

    try:
        client.get_table(table)
    except NotFound:
        print(f"La tabla {table_id} no existe en el conjunto de datos {dataset_id}.")
    except Exception as e:
        print(f"Error: {e}")
        return None
    else:
        print(f"La tabla {table_id} ya existe en el conjunto de datos {dataset_id}.")
        return None

    # The table is missing: create it with the inferred schema.
    table = bigquery.Table(table, schema=schema)

    # To create a time-partitioned table instead:
    # table.time_partitioning = bigquery.TimePartitioning(
    #     type_=bigquery.TimePartitioningType.DAY,
    #     field="date",  # name of column to use for partitioning
    #     expiration_ms=1000 * 60 * 60 * 24 * 90,
    # )  # 90 days

    table = client.create_table(table)
    return("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

# Create the table from the CSV-derived schema if it is missing; the status string is the cell output.
bq_python_create_table_dynamic(client,bq_csv_df, table, dataset_id, table_id)

La tabla cars_upload_test no existe en el conjunto de datos chicago_taxi_tips.


'Created table curso-bigquery-mide-403114.chicago_taxi_tips.cars_upload_test'

###### 2.A INSERTAR datos del csv en la tabla debajo de los ya existentes

In [27]:
def bq_python_insert_values(client, bq_csv_df, table, dataset_id, table_id):
    """Load the DataFrame into an existing table through a load job.

    No job configuration is passed to the load job. Only a ``NotFound``
    error from the lookup is reported as "table does not exist"; every other
    failure, including load errors, is printed as it is instead of being
    mislabelled.

    Args:
        client: BigQuery client.
        bq_csv_df: DataFrame to load.
        table: Table reference to look up and load into.
        dataset_id: Dataset identifier, used only in the printed messages.
        table_id: Table identifier, used only in the messages.

    Returns:
        A confirmation string when the load job completes; ``None`` when the
        table is missing or any step fails (the reason is printed).
    """
    try:
        table = client.get_table(table)
    except NotFound:
        print(f"La tabla {table_id} NO existe en el conjunto de datos {dataset_id}, hay que crearla")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None

    print(f"La tabla {table_id} existe en el conjunto de datos {dataset_id}.")

    try:
        job = client.load_table_from_dataframe(bq_csv_df, table)
        job.result()  # wait for the load job to finish
    except Exception as e:
        print(f"Error: {e}")
        return None

    return(f"Datos del CSV cargados en la tabla {table_id} de BigQuery.")

# Load the CSV rows into the existing table; the confirmation string is the cell output.
bq_python_insert_values(client, bq_csv_df, table, dataset_id, table_id)

La tabla cars_upload_test existe en el conjunto de datos chicago_taxi_tips.


'Datos del CSV cargados en la tabla cars_upload_test de BigQuery.'

###### 2.B REEMPLAZAR datos ya existentes en la tabla por los del csv

In [25]:
def bq_python_truncate_table(client, bq_csv_df, table, dataset_id, table_id):
    """Load the DataFrame into an existing table with ``WRITE_TRUNCATE``.

    The load job is configured with ``write_disposition="WRITE_TRUNCATE"``.
    Only a ``NotFound`` error from the lookup is reported as "table does not
    exist"; every other failure, including load errors, is printed as it is
    instead of being mislabelled.

    Args:
        client: BigQuery client.
        bq_csv_df: DataFrame to load.
        table: Table reference to look up and load into.
        dataset_id: Dataset identifier, used only in the printed messages.
        table_id: Table identifier, used only in the messages.

    Returns:
        A confirmation string when the load job completes; ``None`` when the
        table is missing or any step fails (the reason is printed).
    """
    try:
        table = client.get_table(table)
    except NotFound:
        print(f"La tabla {table_id} NO existe en el conjunto de datos {dataset_id}, hay que crearla")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None

    print(f"La tabla {table_id} existe en el conjunto de datos {dataset_id}.")

    try:
        # Load-job configuration: WRITE_TRUNCATE write disposition
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

        # Load the data
        job = client.load_table_from_dataframe(bq_csv_df, table, job_config=job_config)
        job.result()  # wait for the load job to finish
    except Exception as e:
        print(f"Error: {e}")
        return None

    return(f"Datos del CSV reemplazaron los existentes en la tabla {table_id} de BigQuery.")

# Load the CSV rows with WRITE_TRUNCATE; the confirmation string is the cell output.
bq_python_truncate_table(client, bq_csv_df, table, dataset_id, table_id)

La tabla cars_upload_test existe en el conjunto de datos chicago_taxi_tips.


'Datos del CSV reemplazaron los existentes en la tabla cars_upload_test de BigQuery.'

##### c) pandas_gbq

**DataFrame.to_gbq(*destination_table, project_id=None, chunksize=None, reauth=False, if_exists='fail', auth_local_webserver=True, table_schema=None, location=None, progress_bar=True, credentials=None*)**

[DOCUMENTACIÓN OFICIAL](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_gbq.html)

**if_exists**: str, default ‘fail’

--> Behavior when the destination table exists. Value can be one of:

- **'fail'**
If table exists raise pandas_gbq.gbq.TableCreationError.

- **'replace'**
If table exists, drop it, recreate it, and insert data.

- **'append'**
If table exists, insert data. Create if does not exist.

In [40]:
def bq_python_to_gbp_create_and_replace_values(client, bq_csv_df, project_id, dataset_id, table_id, insert_method):
    """Upload the DataFrame with ``pandas_gbq.to_gbq``.

    Args:
        client: Unused; kept for interface compatibility.
        bq_csv_df: DataFrame to upload.
        project_id: Project used both in the destination name and as the
            ``project_id`` argument of ``to_gbq``.
        dataset_id: Dataset part of the destination name.
        table_id: Table part of the destination name.
        insert_method: Value forwarded as ``if_exists``.

    Returns:
        A confirmation string on success, or ``None`` after printing the
        error if the upload raises.
    """
    try:
        destination = f'{project_id}.{dataset_id}.{table_id}'
        to_gbq(bq_csv_df, destination, project_id=project_id, if_exists=insert_method)
    except Exception as err:
        print(f"Error: {err}")
    else:
        return f"La tabla {table_id} ha sido creada y los datos del CSV han sido cargados en BigQuery."

In [42]:
# Upload the CSV DataFrame via pandas_gbq, using the `insert_method` value from the config as `if_exists`.
bq_python_to_gbp_create_and_replace_values(client, bq_csv_df, project_id, dataset_id, table_id, insert_method)

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]


'La tabla cars_upload_test ha sido creada y los datos del CSV han sido cargados en BigQuery.'