In [1]:
import pandas as pd
from google.cloud import storage, bigquery
from google.oauth2 import service_account
from io import BytesIO
import psycopg2
import logging
import os
import pandas as pd
from datetime import datetime, timedelta
from google.cloud.bigquery.job import ExtractJobConfig
from sqlalchemy import create_engine, inspect, MetaData, text
from google.cloud.exceptions import GoogleCloudError
from google.api_core.exceptions import GoogleAPIError, NotFound, Forbidden

In [2]:
# --- Google Cloud configuration -------------------------------------------------
# Service-account key used for both BigQuery and Cloud Storage. The standard
# GOOGLE_APPLICATION_CREDENTIALS variable takes precedence; the local path is only
# a development fallback.
# TODO: drop the hard-coded fallback path before sharing this notebook.
credentialsPath = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "C:/Users/AU TRADERS/Downloads/noman-gcu-iot-jan24-0a4c8b2033d6.json",
)
# One key file -> read it once and hand the same credentials to both clients.
sa_credentials = service_account.Credentials.from_service_account_file(credentialsPath)
bq_credentials = sa_credentials
gcs_credentials = sa_credentials

project_id = 'noman-gcu-iot-jan24'
dataset_id = 'BikeStore'
# Derived so it cannot drift from project_id / dataset_id ('noman-gcu-iot-jan24.BikeStore').
full_dataset_id = f'{project_id}.{dataset_id}'

bucket_name = 'partitioned_table'
# Export prefix inside the bucket ('gs://partitioned_table').
destination_uri = f'gs://{bucket_name}'

In [3]:
from urllib.parse import quote

# --- PostgreSQL target -----------------------------------------------------------
# Environment variables take precedence; the literals are local development fallbacks.
# TODO: drop the hard-coded password fallback before sharing this notebook.
pwd = os.environ.get("PG_PASSWORD", "test123")
uid = os.environ.get("PG_USER", "python")
pg_database = os.environ.get("PG_DATABASE", 'Partitioned_Tables')
pg_host = os.environ.get("PG_HOST", "localhost")
pg_port = os.environ.get("PG_PORT", "5432")

# Percent-encode the password so characters such as '@', ':' or '/' cannot break
# the URL; SQLAlchemy decodes it again when parsing.
postgres_uri = (
    f"postgresql+psycopg2://{uid}:{quote(pwd, safe='')}"
    f"@{pg_host}:{pg_port}/{pg_database}"
)
postgres_engine = create_engine(postgres_uri)

In [4]:
def initialize_clients(bq_creds, gcs_creds, proj_id):
    """Create the Cloud Storage and BigQuery clients for a project.

    Args:
        bq_creds: credentials passed to ``bigquery.Client``.
        gcs_creds: credentials passed to ``storage.Client``.
        proj_id: Google Cloud project id used by both clients.

    Returns:
        ``(storage_client, bigquery_client)`` on success, or ``(None, None)`` if
        either client could not be created. Errors are logged, not raised.
    """
    try:
        storage_client = storage.Client(credentials=gcs_creds, project=proj_id)
        bigquery_client = bigquery.Client(credentials=bq_creds, project=proj_id)
    except GoogleAPIError as e:
        # Logged (not printed) for consistency with the other helpers in this notebook.
        logging.error(f"Failed to initialize clients due to a Google API error: {e}")
        return None, None
    except Exception as e:
        # logging.exception also records the traceback for unexpected failures.
        logging.exception(f"An unexpected error occurred while initializing clients: {e}")
        return None, None
    return storage_client, bigquery_client

In [5]:
b_client, bq_client = initialize_clients(bq_credentials, gcs_credentials, project_id)
# initialize_clients reports the failure and returns (None, None); stop here rather
# than failing later with an AttributeError on None.
if b_client is None or bq_client is None:
    raise RuntimeError(
        f"Could not create Google Cloud clients for project {project_id!r}; "
        "see the error reported above."
    )

In [6]:
def get_yesterdays_date(now=None):
    """Return the calendar day before ``now`` as a ``YYYY-MM-DD`` string.

    Args:
        now: reference ``datetime`` (or ``date``). Defaults to ``datetime.now()``,
            i.e. the kernel's local time. Pass an explicit value for reproducible
            runs or to backfill an earlier day.

    Returns:
        The previous day, e.g. ``"2024-04-28"`` for any time on 2024-04-29.
    """
    # NOTE(review): BigQuery date partitions are UTC-based; local time may select a
    # different day near midnight — confirm this is the intended clock.
    reference = datetime.now() if now is None else now
    return (reference - timedelta(days=1)).strftime("%Y-%m-%d")

In [7]:
def list_partitioned_tables(bq_client, dataset_id):
    """Return the ids of all time- or range-partitioned tables in a dataset.

    Args:
        bq_client: ``google.cloud.bigquery.Client``.
        dataset_id: dataset id; a bare id is resolved against the client's project.

    Returns:
        Table ids (strings) in listing order. A table whose metadata cannot be
        read is logged and skipped; dataset-level failures are logged and yield
        ``[]``.
    """
    partitioned_tables = []
    try:
        # list_tables accepts the dataset id string directly (Client.dataset() is
        # deprecated). The listing is fetched lazily, so dataset-level errors are
        # raised from this loop and handled by the outer except clauses.
        for table in bq_client.list_tables(dataset_id):
            try:
                # Fetch the full table resource to inspect its partitioning spec.
                table_meta = bq_client.get_table(table.reference)
            except NotFound:
                logging.error(f"Table {table.table_id} not found.")
                continue
            except Forbidden:
                logging.error(f"Access to table {table.table_id} is forbidden.")
                continue
            except GoogleAPIError as e:
                # Skip just this table instead of discarding the tables found so far.
                logging.error(f"Failed to read metadata for table {table.table_id}: {e}")
                continue

            if table_meta.time_partitioning or table_meta.range_partitioning:
                partitioned_tables.append(table.table_id)
    except NotFound:
        logging.error(f"Dataset {dataset_id} not found.")
        return []
    except Forbidden:
        logging.error(f"Access to dataset {dataset_id} is forbidden.")
        return []
    except GoogleAPIError as e:
        logging.error(f"An error occurred: {e}")
        return []

    return partitioned_tables

In [8]:
def check_for_yesterdays_partition(bq_client, table, full_dataset_id, yesterday):
    """Return yesterday's partition id for ``table`` if that partition exists.

    Args:
        bq_client: ``google.cloud.bigquery.Client`` used to run the lookup.
        table: table name inside ``full_dataset_id``.
        full_dataset_id: ``project.dataset`` whose INFORMATION_SCHEMA is queried.
        yesterday: the day to look for, formatted ``YYYY-MM-DD``.

    Returns:
        The partition id (``YYYYMMDD``) when a partition with exactly that id
        exists, otherwise ``None``. Query failures are logged and yield ``None``.
    """
    new_partition = yesterday.replace('-', '')

    # Match the partition id exactly rather than comparing it with MAX(partition_id):
    # the special ids "__NULL__" and "__UNPARTITIONED__" sort after every YYYYMMDD id,
    # and a newer partition (e.g. today's) would also mask yesterday's. Only the
    # dataset path is interpolated; the values are bound as query parameters.
    query = f"""
    SELECT partition_id
    FROM `{full_dataset_id}.INFORMATION_SCHEMA.PARTITIONS`
    WHERE table_name = @table_name
      AND partition_id = @partition_id
    """
    try:
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("table_name", "STRING", table),
                bigquery.ScalarQueryParameter("partition_id", "STRING", new_partition),
            ]
        )
        rows = list(bq_client.query(query, job_config=job_config).result())
        return new_partition if rows else None
    except GoogleAPIError as e:
        # Failures from query()/result() are google.api_core exceptions
        # (GoogleAPIError subclasses).
        logging.error(f"BigQuery error processing table {table}: {e}")
    except Exception as e:
        logging.error(f"Unexpected error processing table {table}: {e}")

    return None

In [9]:
def get_partitioning_field(client, table, dataset_id):
    """Return the column a BigQuery table is time-partitioned on.

    Args:
        client: ``google.cloud.bigquery.Client``.
        table: table id within ``dataset_id``.
        dataset_id: dataset id (resolved against the client's default project).

    Returns:
        The partitioning column name, or ``None`` in three cases: the table is not
        time-partitioned; ``time_partitioning.field`` is unset (ingestion-time
        partitioning); or the metadata could not be read, in which case the error
        is logged.
    """
    table_id = f'{dataset_id}.{table}'
    try:
        bq_table = client.get_table(table_id)
    # These exception classes come from google.api_core / google.cloud.exceptions.
    # google.cloud.bigquery does not export them, so `bigquery.NotFound` would
    # raise AttributeError while the real error is being handled.
    except NotFound as e:
        logging.error(f"Table {table_id} not found: {e}")
        return None
    except Forbidden as e:
        logging.error(f"Access to table {table_id} is forbidden: {e}")
        return None
    except GoogleCloudError as e:
        logging.error(f"Google Cloud error accessing table {table_id}: {e}")
        return None
    except Exception as e:
        logging.error(f"Unexpected error getting partitioning field for table {table_id}: {e}")
        return None

    partitioning = bq_table.time_partitioning
    return partitioning.field if partitioning else None

In [10]:
def export_partition_to_csv(bucket_name, s_client, client, dataset_id, table, partition_field, partition_id, yesterday, destination_uri, location="us-west1"):
    """Export one day's rows of a time-partitioned BigQuery table to a CSV in GCS.

    The rows are fetched with a query into a pandas DataFrame and written to the
    object ``<table>_<partition_id>.csv`` in ``bucket_name``.

    Args:
        bucket_name: GCS bucket that receives the CSV.
        s_client: ``google.cloud.storage.Client`` used for the upload.
        client: ``google.cloud.bigquery.Client`` used to run the query.
        dataset_id: dataset containing ``table``.
        table: table id within ``dataset_id``.
        partition_field: column the table is time-partitioned on.
        partition_id: partition id (e.g. ``20240428``); used only in the object name.
        yesterday: day to export, formatted ``YYYY-MM-DD``.
        destination_uri: ``gs://`` prefix; used only in the log message.
        location: currently unused; kept for backward compatibility.

    Returns:
        ``True`` if the CSV was written, ``False`` otherwise. Errors are logged,
        not raised.
    """
    blob_name = f'{table}_{partition_id}.csv'
    destination_uri = f'{destination_uri}/{blob_name}'

    if not partition_field:
        # Without a partition column the WHERE clause below would filter on `None`.
        logging.error(f"Table {dataset_id}.{table} has no partitioning column; skipping export.")
        return False

    try:
        # Half-open day range instead of equality: for a TIMESTAMP/DATETIME column,
        # `col = 'YYYY-MM-DD'` matches only rows at exactly midnight. BigQuery
        # coerces the string literals to the column's type (DATE behaves as before).
        next_day = (datetime.strptime(yesterday, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d")
        query = f"""
    SELECT *
    FROM `{dataset_id}.{table}`
    WHERE `{partition_field}` >= '{yesterday}'
      AND `{partition_field}` < '{next_day}'
    """
        rows = client.query(query).result().to_dataframe()

        destination_blob = s_client.bucket(bucket_name).blob(blob_name)
        destination_blob.content_type = 'text/csv'
        # Closing the writer is what finalizes the upload; do not leave it to GC.
        with destination_blob.open('w') as csv_file:
            rows.to_csv(csv_file, index=False)

        logging.info(f"Export successful to {destination_uri}")
        return True
    except GoogleCloudError as e:
        logging.error(f"Error exporting table due to a Google Cloud error: {e}")
    except Exception as e:
        logging.error(f"General error exporting table: {e}")
    return False

In [14]:
# Export yesterday's partition of every partitioned table to GCS, collecting the
# object names that the PostgreSQL load cell will pick up.
partitioned_tables = list_partitioned_tables(bq_client, dataset_id)
yesterday = get_yesterdays_date()
gcs_files = []

for table in partitioned_tables:
    y_partitioned_id = check_for_yesterdays_partition(bq_client, table, full_dataset_id, yesterday)
    if y_partitioned_id is None:
        continue

    file_name = f"{table}_{y_partitioned_id}.csv"
    partitioning_field = get_partitioning_field(bq_client, table, dataset_id)
    export_partition_to_csv(bucket_name, b_client, bq_client, dataset_id, table,
                            partitioning_field, y_partitioned_id, yesterday, destination_uri)

    # export_partition_to_csv logs instead of raising, so confirm the object is
    # actually in the bucket before queuing it for the load step.
    if b_client.bucket(bucket_name).blob(file_name).exists():
        gcs_files.append(file_name)
    else:
        logging.warning(f"gs://{bucket_name}/{file_name} was not created; skipping {table}.")

gcs_files

['partitioned_table_20240428.csv',
 'partitioned_table1_20240428.csv',
 'partitioned_table2_20240428.csv',
 'partitioned_table3_20240428.csv']

In [15]:
# Load each exported CSV from GCS into PostgreSQL (schema "public").
# Tables listed here are fully reloaded on every run; all others are appended to.
REPLACE_TABLES = {'partitioned_table3'}

try:
    # Loop-invariant: look the bucket up once instead of once per file.
    bucket = b_client.get_bucket(bucket_name)

    for table_no, file_name in enumerate(gcs_files, start=1):
        # Object names are "<table>_<partition_id>.csv": drop the extension, then only
        # the trailing partition id, so underscores inside the table name survive.
        table_name = os.path.splitext(file_name)[0].rsplit('_', 1)[0]

        ################################################################
        print()
        print(f"##### Dumping table No. {table_no} {table_name}...")
        ################################################################

        try:
            blob = bucket.get_blob(file_name)
            if blob is None:
                raise FileNotFoundError(f"gs://{bucket_name}/{file_name} does not exist")

            buffer = BytesIO()
            blob.download_to_file(buffer)
            buffer.seek(0)
            df = pd.read_csv(buffer)
            df.columns = [c.lower() for c in df.columns]

            if_exists = 'replace' if table_name in REPLACE_TABLES else 'append'
            # engine.begin() commits on success, rolls back on error and always
            # releases the connection, so a failed load never leaves a half-written table.
            with postgres_engine.begin() as postgres_connection:
                df.to_sql(name=table_name, con=postgres_connection, schema='public',
                          chunksize=5000, index=False, if_exists=if_exists)

            ################################################################
            print(f"   .. Wrote {table_name} to PostgreSQL database")
            ################################################################
        except Exception as e:
            print(f"Error processing {file_name}: {e}")
finally:
    # Release pooled connections even if the bucket lookup fails.
    postgres_engine.dispose()


##### Dumping table No. 1 partitioned_table...
   .. Wrote partitioned_table to PostgreSQL database

##### Dumping table No. 2 partitioned_table1...
   .. Wrote partitioned_table1 to PostgreSQL database

##### Dumping table No. 3 partitioned_table2...
   .. Wrote partitioned_table2 to PostgreSQL database

##### Dumping table No. 4 partitioned_table3...
   .. Wrote partitioned_table3 to PostgreSQL database
