In [1]:
pip install google-cloud

Collecting google-cloud
  Downloading google_cloud-0.34.0-py2.py3-none-any.whl.metadata (2.7 kB)
Downloading google_cloud-0.34.0-py2.py3-none-any.whl (1.8 kB)
Installing collected packages: google-cloud
Successfully installed google-cloud-0.34.0


In [2]:
pip install Faker

Collecting Faker
  Downloading Faker-30.8.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-30.8.0-py3-none-any.whl (1.8 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.8 MB[0m [31m7.6 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m27.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Faker
Successfully installed Faker-30.8.0


In [30]:
# Variables
project = "xxxxxxxx"
my_dataset = "py_dataset20241021"
my_table = "py_table20241021"
src_bucket = "bkt-dev-src20241021"
destination_bucket = "bkt-dev-dst20241021"

In [26]:
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.bigquery import LoadJobConfig
from faker import Faker
import pandas as pd
import csv
import datetime
import time

In [14]:
from google.colab import auth
auth.authenticate_user()

In [20]:
# Crear función para crear tabla en BigQuery
def create_bq():
    client = bigquery.Client(project=project)

    dataset_id = my_dataset
    table_id = my_table

    schema = [
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("email", "STRING"),
        bigquery.SchemaField("description","STRING"),
        bigquery.SchemaField("address","STRING"),
        bigquery.SchemaField("city","STRING"),
        bigquery.SchemaField("state","STRING"),
        bigquery.SchemaField("country","STRING"),
        bigquery.SchemaField("birthdate","DATE"),
        bigquery.SchemaField("password","STRING"),
        bigquery.SchemaField("last_login","TIMESTAMP")
    ]

    table_ref = client.dataset(my_dataset).table(my_table)
    table = bigquery.Table(table_ref, schema=schema)

    try:
        dataset = client.create_dataset(my_dataset)
        print(f"Created dataset {my_dataset}")
    except:
        print(f"Dataset {my_dataset} already exists")

    try:
        table = client.create_table(table)
        print(f"Created table {table_id}")
    except:
        print(f"Table {table_id} already exists")

In [6]:
# Crear función para generar datos en tabla origen BigQuery
def generate_data(num_rows):
    fake = Faker()
    now = datetime.datetime.now()
    filename = "data_" + now.strftime("%Y%m%d%H%M%S") + ".csv"


    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['id', 'name', 'email', 'description', 'address', 'city', 'state',
                      'country', 'birthdate', 'password', 'last_login']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for i in range(num_rows):
            writer.writerow(
                {
                    'id': fake.random_int(),
                    'name': fake.name(),
                    'email': fake.email(),
                    'description': fake.sentence(),
                    'address': fake.street_address(),
                    'city': fake.city(),
                    'state': fake.state(),
                    'country': fake.country(),
                    'birthdate': fake.date(),
                    'password': fake.password(),
                    'last_login': fake.date_time()
                }
            )
    print (f"Data generated with {num_rows} records")

    client = storage.Client()
    bucket_name = src_bucket
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(filename)
    blob.upload_from_filename(filename)
    print(f"File {filename} Uploaded to {bucket_name}")

    return filename

In [17]:
# Crear función para cargar datos de tabla origen BigQuery
def load_bq(filename):
    client = bigquery.Client(project=project)
    filename = filename
    table_ref = client.dataset(my_dataset).table(my_table)
    job_config = LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True

    uri = f'gs://{src_bucket}/{filename}'
    load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
    load_job.result()
    time.sleep(10)
    num_rows = load_job.output_rows
    print(f"{num_rows} rows loaded into {my_table}.")

In [18]:
# Crear función para extraer datos de tabla origen BigQuery hacia tabla destino
def extract_bq():
    client = bigquery.Client(project=project)
    dataset_ref = client.dataset(my_dataset)
    table_ref = dataset_ref.table(my_table)
    job_config = bigquery.ExtractJobConfig()
    job_config.compression = 'GZIP'
    job_config.field_delimiter = ','
    job_config.print_header = False

    now = datetime.datetime.now()
    filename = "bqextract_" + now.strftime("%Y%m%d%H%M%S") + ".csv"
    destination_uri = f'gs://{destination_bucket}/{filename}'

    extract_job = client.extract_table(table_ref, destination_uri, job_config=job_config)

    extract_job.result()

    print('Data extracted from table {} and loaded into GCS bucket {}.'.format(table_ref.path, destination_uri))

In [21]:
# Crear tabla origen en BigQuery
create_bq()

Created dataset py_dataset20241021
Created table py_table20241021


In [15]:
# Generar archivo con datos a cargar
filename = generate_data(500)

Data generated with 500 records
File data_20241021155117.csv Uploaded to bkt-dev-src20241021


In [22]:
# Cargar datos del archivo a tabla origen en BigQuery
load_bq(filename)

500 rows loaded into py_table20241021.


In [31]:
# Consultar datos de tabla origen en BigQuery
client = bigquery.Client(project=project)
query = f"SELECT * FROM `{project}.py_dataset20241021.py_table20241021` LIMIT 1000"
query_job = client.query(query)
df = query_job.to_dataframe()
df.head(50)

Unnamed: 0,id,name,email,description,address,city,state,country,birthdate,password,last_login
0,194,Brandon Vance Jr.,joysmith@example.com,Perform create record move personal home kid.,398 Hall Springs,South Brendaton,Iowa,Mongolia,2003-02-14,8#8g*3@rrN,2019-12-04 05:33:59.786843+00:00
1,6274,Barry Sanders,jonescraig@example.com,Sure sign foot television impact town rule.,74913 Kelly Branch,Hendersonberg,Iowa,Greece,1985-04-05,x9Clz49v(J,1985-02-02 07:28:47.943444+00:00
2,700,Seth Montoya,hallstacey@example.org,Matter find understand.,68553 Williams Bridge,Williamberg,Iowa,Seychelles,1991-01-25,(2Q2^W^xCD,2001-07-15 18:50:39.063167+00:00
3,8625,Katherine Hart,hilltyler@example.org,Career deal deal big start series nothing deal.,9139 Matthew Burg,Littlehaven,Iowa,Pakistan,2001-04-11,)FhL6EvDWz,2006-01-16 18:16:13.369839+00:00
4,7971,Peter Wright,brian94@example.org,I including truth.,70483 Nicholas Corners,Lake Daniel,Iowa,Netherlands Antilles,1996-09-22,6+&2EO&j8_,2001-01-07 15:07:48.122758+00:00
5,2768,Megan Solomon,baileymike@example.net,Dog international help let.,508 Brown Lakes,New Nathan,Iowa,Kiribati,1996-11-28,Tr$8KBr8T^,1998-10-28 21:18:04.315860+00:00
6,2371,Ronald Green,igreene@example.org,Media become have like.,2489 Amy Fork Apt. 559,Lake Shirleyland,Iowa,Philippines,2013-02-11,N)jI3Tf#U+,1992-01-23 05:31:04.166546+00:00
7,9567,Anne Martin,sdavenport@example.org,Mouth citizen community picture shake.,67449 Zhang Ridge Suite 412,Jeffreychester,Ohio,Korea,1979-08-03,7E6TAHIaD_,2012-08-05 01:59:46.753078+00:00
8,5392,Jeffery Bell,ronaldstewart@example.net,Anything before side then.,73190 Wolf Stream Apt. 090,North Michael,Ohio,Italy,1979-05-02,%&Gaf8MwJr,1985-05-16 00:00:05.099731+00:00
9,256,Justin Rivera,tpage@example.com,Company professional drop compare certainly to...,83039 Gabriela Meadows Apt. 030,Sylviaton,Ohio,Sweden,2013-08-27,@4UeRrBhrG,2018-08-29 04:21:34.926075+00:00


In [23]:
# Extraer datos de tabla origen y cargar en bucket destino
extract_bq()

Data extracted from table /projects/bigquerypython-439314/datasets/py_dataset20241021/tables/py_table20241021 and loaded into GCS bucket gs://bkt-dev-dst20241021/bqextract_20241021160354.csv.


In [29]:
# Mostrar archivo creado por la ETL en bucket destino
client = storage.Client()
blobs = client.list_blobs(destination_bucket)

for blob in blobs:
    print(blob.name)

bqextract_20241021160354.csv
