In [19]:
import psycopg2
import pandas as pd
from datetime import date, timedelta
import numpy as np
import gspread
import warnings
import csv
warnings.filterwarnings("ignore")
from gspread_dataframe import set_with_dataframe
from google.oauth2.service_account import Credentials
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
pd.set_option('display.max_columns', None)

import subprocess
import os
import gcloud

In [20]:
def check_gsutil():
    """Checks if gsutil is installed and in the PATH."""
    try:
        subprocess.run(["gsutil", "--version"], check=True, capture_output=True, text=True)
        return True
    except FileNotFoundError:
        return False
check_gsutil()

True

In [34]:
from google.cloud import bigquery
from google.oauth2 import service_account

# Setup your credentials and client
credentials = service_account.Credentials.from_service_account_file(
    "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"
)
client = bigquery.Client(credentials=credentials, project="ajar-kw")

def create_bigquery_tables(client):
    schema = [
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("reference_id", "STRING"),
        bigquery.SchemaField("internal_id", "STRING"),
        bigquery.SchemaField("broker_id", "INTEGER"),
        bigquery.SchemaField("contact_id", "INTEGER"),
        bigquery.SchemaField("user_id", "INTEGER"),
        bigquery.SchemaField("payment_method_id", "STRING"),
        bigquery.SchemaField("currency", "STRING"),
        bigquery.SchemaField("offline", "BOOLEAN"),
        bigquery.SchemaField("extra", "STRING"),
        bigquery.SchemaField("amount", "NUMERIC"),
        bigquery.SchemaField("refunded_amount", "NUMERIC"),
        bigquery.SchemaField("status", "STRING"),
        bigquery.SchemaField("card_bin_id", "INTEGER"),
        bigquery.SchemaField("ip_address", "STRING"),
        bigquery.SchemaField("by_payment_link", "BOOLEAN"),
        bigquery.SchemaField("captured_at", "TIMESTAMP"),
        bigquery.SchemaField("created_at", "TIMESTAMP"),
        bigquery.SchemaField("updated_at", "TIMESTAMP"),
        bigquery.SchemaField("archived_at", "TIMESTAMP"),
        bigquery.SchemaField("created_by", "INTEGER"),
        bigquery.SchemaField("updated_by", "INTEGER"),
    ]
    
    BIGQUERY_PROJECT_ID = "ajar-kw"
    BIGQUERY_DATASET_NAME = "payments_data"

    table_names = ["oltp_payments", "oltp_payments_temp"]

    for table_name in table_names:
        table_id = f"{BIGQUERY_PROJECT_ID}.{BIGQUERY_DATASET_NAME}.{table_name}"
        table = bigquery.Table(table_id)
        table.schema = schema
        table.time_partitioning = bigquery.TimePartitioning(field="updated_at")
        try:
            client.create_table(table)
            print(f"✅ Table '{table_name}' created successfully.")
        except Exception as e:
            if "Already Exists" in str(e):
                print(f"ℹ️ Table '{table_name}' already exists — skipping creation.")
            else:
                print(f"❌ Error creating table '{table_name}': {e}")

# Call it with the connected client
create_bigquery_tables(client)

✅ Table 'oltp_payments' created successfully.
✅ Table 'oltp_payments_temp' created successfully.


In [3]:
from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"
)

client = bigquery.Client(credentials=credentials, project="ajar-kw")

# Try listing datasets
datasets = list(client.list_datasets())
if datasets:
    print("✅ Connected successfully. Found datasets:")
    for ds in datasets:
        print(f" - {ds.dataset_id}")
else:
    print("✅ Connected successfully, but no datasets found.")


✅ Connected successfully. Found datasets:
 - dbt_aajmal
 - dev_dbt
 - looker_pdt_scratch
 - payments_data
 - raw_accounts
 - staging_dbt


In [6]:
# Configuration (Hardcoded with your values - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"
BIGQUERY_DATASET_NAME = "ajar-kw:payments_data"  # Include the project ID here.
BIGQUERY_TABLE_NAME = "yesterday_payments"  #  No date suffix here, we'll add in the code.
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/yesterday_payments.csv"  # Use /tmp for temporary files

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports yesterday's payments data from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False  # Indicate failure

        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '2 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:  # Handle empty result set
            print("No records found for yesterday.")
            return True  # No error, but no data

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True  # Indicate success

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

def load_to_bigquery(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery."""
    try:
        conn = connectionNew()
        if conn is None:
            return False

        query = f"""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = 'payments';  --  Hardcoded table name.  Good practice to make this a variable.
        """
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list = cursor.fetchall()
        if not schema_list:
            print("Could not retrieve schema from database")
            return False

        schema_fields = ",".join([f"{row[0]}:{map_postgres_type_to_bq(row[1])}" for row in schema_list])

        command = [
            "bq", "load",
            "--source_format=CSV",
            "--skip_leading_rows=1",
            "--autodetect", #  Added autodetect
            f"{bigquery_dataset_name}.{bigquery_table_name}",
            gcs_file_path,
        ]
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully loaded data into BigQuery: {result.stdout}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during BigQuery load: {e.stderr}")
        return False
    finally:
        if conn:
            conn.close()

def map_postgres_type_to_bq(postgres_type):
    """Maps PostgreSQL data types to BigQuery data types."""
    type_mapping = {
        "integer": "INTEGER",
        "bigint": "INTEGER",  # Or possibly "INT64"
        "smallint": "INTEGER",
        "numeric": "NUMERIC",
        "decimal": "NUMERIC",
        "real": "FLOAT",
        "double precision": "FLOAT",
        "text": "STRING",
        "character varying": "STRING",
        "varchar": "STRING",
        "char": "STRING",
        "character": "STRING",
        "timestamp with time zone": "TIMESTAMP",
        "timestamp without time zone": "TIMESTAMP",
        "date": "DATE",
        "time with time zone": "TIME",
        "time without time zone": "TIME",
        "boolean": "BOOLEAN",
        "bytea": "BYTES",
        "json": "STRING",  # Or consider using JSON type in BigQuery, requires different load.
        "jsonb": "STRING"
    }
    return type_mapping.get(postgres_type, "STRING")  # Default to STRING if not found

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export fromPostgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    yesterday = today - timedelta(days=1)

    # Create a table name with the date.
    bigquery_table_name = "yesterday_payments"

    print(f"Running data pipeline for {yesterday}...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return  # Stop if export fails

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{yesterday.strftime('%Y%m%d')}_yesterday_payments.csv"  # Include date in GCS path
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return  # Stop if GCS upload fails

    if not load_to_bigquery(gcs_file_path, BIGQUERY_DATASET_NAME, bigquery_table_name):
        print("Pipeline failed: Error loading to BigQuery.")
        return  # Stop if BigQuery load fails

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS.  This should be investigated.")
        #  Don't stop the pipeline, but log the error

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()


Running data pipeline for 2025-06-26...
Successfully exported 3051 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/yesterday_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/yesterday_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250626_yesterday_payments.csv
Successfully loaded data into BigQuery: 
Successfully deleted gs://ajar-bigquery-staging-bucket/raw/20250626_yesterday_payments.csv from GCS
Successfully deleted file from GCS.
Pipeline completed successfully.


In [5]:
# Configuration (Hardcoded with your values - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"
BIGQUERY_DATASET_NAME = "ajar-kw:payments_data"  # Include the project ID here.
BIGQUERY_TABLE_NAME = "oltp_payments"  #  No date suffix here, we'll add in the code.
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv"  # Use /tmp for temporary files

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports payments data updated between last two days from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False  # Indicate failure

        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '2 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:  # Handle empty result set
            print("No records found for yesterday.")
            return True  # No error, but no data

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True  # Indicate success

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

# def load_to_bigquery(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
#     """Loads data from GCS to BigQuery."""
#     try:
#         conn = connectionNew()
#         if conn is None:
#             return False

#         query = f"""
#             SELECT column_name, data_type
#             FROM information_schema.columns
#             WHERE table_name = 'payments';  --  Hardcoded table name.  Good practice to make this a variable.
#         """
#         cursor = conn.cursor()
#         cursor.execute(query)
#         schema_list = cursor.fetchall()
#         if not schema_list:
#             print("Could not retrieve schema from database")
#             return False

#         # Construct the schema string
#         schema_fields = ",".join([
#             f"{row[0]}:{map_postgres_type_to_bq(row[1])}" for row in schema_list
#         ])

#         command = [
#             "bq", "load",
#             "--source_format=CSV",
#             "--skip_leading_rows=1",
#             "--replace=false",
#             f"--schema={schema_fields}",
#             f"{bigquery_dataset_name}.{bigquery_table_name}",
#             gcs_file_path,
#         ]

#     result = subprocess.run(command, check=True, capture_output=True, text=True)
#     print(f"Successfully loaded data into BigQuery: {result.stdout}")
#     return True
# except subprocess.CalledProcessErcondror as e:
#     print(f"Error during BigQuery load: {e.stderr}")
#     return False
# finally:
#     if conn:
#         conn.close()

def load_to_bigquery(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery with explicit schema and appends to existing table."""
    try:
        conn = connectionNew()
        if conn is None:
            return False

        # Fetch schema from Postgres
        query = f"""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = 'payments';
        """
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list = cursor.fetchall()
        if not schema_list:
            print("Could not retrieve schema from database")
            return False

        # Map schema to BigQuery-compatible types
        schema_fields = ",".join([
            f"{row[0]}:{map_postgres_type_to_bq(row[1])}" for row in schema_list
        ])

        command = [
            "bq", "load",
            "--source_format=CSV",
            "--skip_leading_rows=1",
            "--schema_update_option=ALLOW_FIELD_ADDITION", # Optional: Allows adding new fields if your source schema evolves
            # "--write_disposition=WRITE_APPEND",  # <--- THIS IS THE KEY CHANGE
            f"--schema={schema_fields}",  # ✅ use schema instead of autodetect
            f"{bigquery_dataset_name}.{bigquery_table_name}",
            gcs_file_path,
        ]
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"✅ Successfully loaded data into BigQuery:\n{result.stdout}")
        return True

    except subprocess.CalledProcessError as e:
        print(f"❌ Error during BigQuery load: {e.stderr}")
        return False

    finally:
        if conn:
            conn.close()

def map_postgres_type_to_bq(postgres_type):
    """Maps PostgreSQL data types to BigQuery data types."""
    type_mapping = {
        "integer": "INTEGER",
        "bigint": "INTEGER",  # Or possibly "INT64"
        "smallint": "INTEGER",
        "numeric": "NUMERIC",
        "decimal": "NUMERIC",
        "real": "FLOAT",
        "double precision": "FLOAT",
        "text": "STRING",
        "character varying": "STRING",
        "varchar": "STRING",
        "char": "STRING",
        "character": "STRING",
        "timestamp with time zone": "TIMESTAMP",
        "timestamp without time zone": "TIMESTAMP",
        "date": "DATE",
        "time with time zone": "TIME",
        "time without time zone": "TIME",
        "boolean": "BOOLEAN",
        "bytea": "BYTES",
        "json": "STRING",  # Or consider using JSON type in BigQuery, requires different load.
        "jsonb": "STRING"
    }
    return type_mapping.get(postgres_type, "STRING")  # Default to STRING if not found

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export fromPostgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    start = today - timedelta(days=2)
    end = today - timedelta(days=1)
    # Create a table name with the date.
    bigquery_table_name = "oltp_payments"

    print(f"Running data pipeline for all payments updated between {start} - {end} ...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return  # Stop if export fails

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start.strftime('%Y%m%d')}_oltp_payments.csv"  # Include date in GCS path
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return  # Stop if GCS upload fails

    if not load_to_bigquery(gcs_file_path, BIGQUERY_DATASET_NAME, bigquery_table_name):
        print("Pipeline failed: Error loading to BigQuery.")
        return  # Stop if BigQuery load fails

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS.  This should be investigated.")
        #  Don't stop the pipeline, but log the error

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for all payments updated between 2025-06-25 - 2025-06-26 ...
Successfully exported 3051 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250625_oltp_payments.csv
❌ Error during BigQuery load: 
Pipeline failed: Error loading to BigQuery.


In [8]:
# Configuration (Hardcoded - BEST PRACTICE: use environment variables)
# For local testing, keep these. For deployment, switch to env vars or Secret Manager.
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"
BIGQUERY_DATASET_NAME = "ajar-kw:payments_data"  # Include the project ID here.
BIGQUERY_TABLE_NAME = "oltp_payments"  # No date suffix here, for appending.
# For local execution:
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv"
# For cloud environments (e.g., Cloud Functions, Cloud Run, Compute Engine), use /tmp:
# OUTPUT_CSV_PATH = "/tmp/oltp_payments.csv"

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports payments data updated between last two days from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False  # Indicate failure

        # Note: (updated_at + interval '3 hours') accounts for a specific timezone offset.
        # Ensure this correctly aligns with your data's timezone and your definition of "yesterday".
        # Current date is June 27, 2025. This query will get data from June 25, 2025 00:00:00 to June 26, 2025 00:00:00 (in the adjusted timezone).
        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '2 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:  # Handle empty result set
            print("No records found for yesterday.")
            return True  # No error, but no data

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True  # Indicate success

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        # capture_output=True, text=True are good for debugging if needed, but not strictly required for cp
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

def map_postgres_type_to_bq(postgres_type):
    """Maps PostgreSQL data types to BigQuery data types."""
    type_mapping = {
        "integer": "INTEGER",
        "bigint": "INTEGER",  # Or "INT64"
        "smallint": "INTEGER",
        "numeric": "NUMERIC",
        "decimal": "NUMERIC",
        "real": "FLOAT",
        "double precision": "FLOAT",
        "text": "STRING",
        "character varying": "STRING",
        "varchar": "STRING",
        "char": "STRING",
        "character": "STRING",
        "timestamp with time zone": "TIMESTAMP",
        "timestamp without time zone": "TIMESTAMP",
        "date": "DATE",
        "time with time zone": "TIME",
        "time without time zone": "TIME",
        "boolean": "BOOLEAN",
        "bytea": "BYTES",
        "json": "STRING",
        "jsonb": "STRING"
    }
    return type_mapping.get(postgres_type, "STRING")  # Default to STRING if not found

def load_to_bigquery_with_client_library(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery using the Python client library."""
    client = bigquery.Client()
    table_id = f"{bigquery_dataset_name}.{bigquery_table_name}"

    # 1. Fetch schema from Postgres (still necessary to define BQ schema for the client library)
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            print("Failed to connect to PostgreSQL for schema retrieval.")
            return False

        query = "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'payments';"
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list_pg = cursor.fetchall()
        if not schema_list_pg:
            print("Could not retrieve schema from database. BigQuery load will fail.")
            return False

        # Convert PostgreSQL schema to BigQuery schema fields
        schema_bq_fields = []
        for col_name, pg_type in schema_list_pg:
            bq_type = map_postgres_type_to_bq(pg_type)
            schema_bq_fields.append(bigquery.SchemaField(col_name, bq_type))

    except Exception as e:
        print(f"Error retrieving schema from PostgreSQL: {e}")
        return False
    finally:
        if conn:
            conn.close()

    # 2. Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND, # This is the key for appending
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        schema=schema_bq_fields, # Provide the explicitly derived schema
    )

    # 3. Start the load job
    print(f"Starting BigQuery load job from {gcs_file_path} to {table_id} using client library...")
    try:
        load_job = client.load_table_from_uri(
            gcs_file_path, table_id, job_config=job_config
        )
        load_job.result()  # Waits for the job to complete

        print(f"✅ Successfully loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
        return True
    except Exception as e:
        print(f"❌ Error during BigQuery load job (client library): {e}")
        # You can inspect load_job.error_result and load_job.errors for more details
        return False

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export from Postgres, upload to GCS, load to BigQuery, and delete from GCS."""
    # Current date in Kuwait is Friday, June 27, 2025.
    # date_trunc('day', current_date - interval '2 days') will be start of 2025-06-25 (Wednesday)
    # date_trunc('day', current_date) will be start of 2025-06-27 (Friday)
    # So the query targets data updated from 2025-06-25 00:00:00 up to (but not including) 2025-06-27 00:00:00.
    # This captures data for both June 25th and June 26th.
    # If you only want June 26th, the query needs adjustment:
    # WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '1 day')
    # AND (updated_at + interval '3 hours') < date_trunc('day', current_date);

    today = date.today() # 2025-06-27
    start_date_for_file_name = today - timedelta(days=2) # This will be 2025-06-25, aligning with the query's start bound.

    print(f"Running data pipeline for payments updated since {start_date_for_file_name} (approximately yesterday and day before)...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return  # Stop if export fails

    # Construct the GCS file path using the start_date_for_file_name
    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start_date_for_file_name.strftime('%Y%m%d')}_oltp_payments.csv"
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return  # Stop if GCS upload fails

    # Call the new BigQuery load function using the client library
    if not load_to_bigquery_with_client_library(gcs_file_path, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME):
        print("Pipeline failed: Error loading to BigQuery.")
        return  # Stop if BigQuery load fails

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS. This should be investigated.")
        # Don't stop the pipeline, but log the error

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for payments updated since 2025-06-25 (approximately yesterday and day before)...
Successfully exported 3051 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250625_oltp_payments.csv


DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

In [11]:
import psycopg2
import csv
import subprocess
from datetime import date, timedelta
from google.cloud import bigquery
# from google.cloud import storage
from google.oauth2 import service_account # <--- ADD THIS IMPORT

# Configuration (Hardcoded - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"
BIGQUERY_DATASET_NAME = "payments_data"  # Include the project ID here.
BIGQUERY_TABLE_NAME = "oltp_payments"  # No date suffix here, for appending.
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv"

# --- ADD THIS CONFIGURATION FOR BIGQUERY SERVICE ACCOUNT ---
BIGQUERY_SERVICE_ACCOUNT_KEY_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"
# Ensure the project ID matches what's in your key file, or directly set it below.
# If BIGQUERY_DATASET_NAME already has the project ID (e.g., "ajar-kw:payments_data"),
# you might not strictly need to pass 'project' to Client(), but it's safer to be explicit.
BIGQUERY_PROJECT_ID = "ajar-kw" # Derived from BIGQUERY_DATASET_NAME, ensure it's accurate
# -----------------------------------------------------------

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports payments data updated between last two days from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False  # Indicate failure

        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '2 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:
            print("No records found for yesterday.")
            return True

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

def map_postgres_type_to_bq(postgres_type):
    """Maps PostgreSQL data types to BigQuery data types."""
    type_mapping = {
        "integer": "INTEGER", "bigint": "INTEGER", "smallint": "INTEGER",
        "numeric": "NUMERIC", "decimal": "NUMERIC", "real": "FLOAT", "double precision": "FLOAT",
        "text": "STRING", "character varying": "STRING", "varchar": "STRING",
        "char": "STRING", "character": "STRING",
        "timestamp with time zone": "TIMESTAMP", "timestamp without time zone": "TIMESTAMP",
        "date": "DATE", "time with time zone": "TIME", "time without time zone": "TIME",
        "boolean": "BOOLEAN", "bytea": "BYTES", "json": "STRING", "jsonb": "STRING"
    }
    return type_mapping.get(postgres_type, "STRING")

def load_to_bigquery_with_client_library(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery using the Python client library."""
    # --- AUTHENTICATION PART ---
    try:
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_SERVICE_ACCOUNT_KEY_PATH
        )
        client = bigquery.Client(credentials=credentials, project=BIGQUERY_PROJECT_ID)
    except Exception as e:
        print(f"❌ Error initializing BigQuery client with credentials: {e}")
        return False
    # --- END AUTHENTICATION PART ---

    table_id = f"{bigquery_dataset_name}.{bigquery_table_name}"

    # 1. Fetch schema from Postgres
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            print("Failed to connect to PostgreSQL for schema retrieval.")
            return False

        query = "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'payments';"
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list_pg = cursor.fetchall()
        if not schema_list_pg:
            print("Could not retrieve schema from database. BigQuery load will fail.")
            return False

        schema_bq_fields = []
        for col_name, pg_type in schema_list_pg:
            bq_type = map_postgres_type_to_bq(pg_type)
            schema_bq_fields.append(bigquery.SchemaField(col_name, bq_type))

    except Exception as e:
        print(f"Error retrieving schema from PostgreSQL: {e}")
        return False
    finally:
        if conn:
            conn.close()

    # 2. Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        schema=schema_bq_fields,
    )

    # 3. Start the load job
    print(f"Starting BigQuery load job from {gcs_file_path} to {table_id} using client library...")
    try:
        load_job = client.load_table_from_uri(
            gcs_file_path, table_id, job_config=job_config
        )
        load_job.result()

        print(f"✅ Successfully loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
        return True
    except Exception as e:
        print(f"❌ Error during BigQuery load job (client library): {e}")
        return False

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export from Postgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    start_date_for_file_name = today - timedelta(days=2)

    print(f"Running data pipeline for payments updated since {start_date_for_file_name} (approximately yesterday and day before)...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start_date_for_file_name.strftime('%Y%m%d')}_oltp_payments.csv"
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return

    # Call the new BigQuery load function using the client library
    if not load_to_bigquery_with_client_library(gcs_file_path, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME):
        print("Pipeline failed: Error loading to BigQuery.")
        return

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS. This should be investigated.")

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for payments updated since 2025-06-25 (approximately yesterday and day before)...
Successfully exported 3051 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250625_oltp_payments.csv
Starting BigQuery load job from gs://ajar-bigquery-staging-bucket/raw/20250625_oltp_payments.csv to payments_data.oltp_payments using client library...
❌ Error during BigQuery load job (client library): 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/ajar-kw/jobs?prettyPrint=false: Field id already exists in schema
Pipeline failed: Error loading to BigQuery.


In [12]:
import psycopg2
import csv
import subprocess
from datetime import date, timedelta
from google.cloud import bigquery
from google.oauth2 import service_account

# Configuration (Hardcoded - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"

BIGQUERY_PROJECT_ID = "ajar-kw" # Explicitly define your project ID
BIGQUERY_DATASET_NAME = "payments_data" # This should ONLY be the dataset ID

BIGQUERY_TABLE_NAME = "oltp_payments"
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv"

BIGQUERY_SERVICE_ACCOUNT_KEY_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports payments data updated between last two days from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False

        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '2 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:
            print("No records found for yesterday.")
            return True

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

def map_postgres_type_to_bq(postgres_type):
    """Maps PostgreSQL data types to BigQuery data types."""
    type_mapping = {
        "integer": "INTEGER", "bigint": "INTEGER", "smallint": "INTEGER",
        "numeric": "NUMERIC", "decimal": "NUMERIC", "real": "FLOAT", "double precision": "FLOAT",
        "text": "STRING", "character varying": "STRING", "varchar": "STRING",
        "char": "STRING", "character": "STRING",
        "timestamp with time zone": "TIMESTAMP", "timestamp without time zone": "TIMESTAMP",
        "date": "DATE", "time with time zone": "TIME", "time without time zone": "TIME",
        "boolean": "BOOLEAN", "bytea": "BYTES", "json": "STRING", "jsonb": "STRING"
    }
    return type_mapping.get(postgres_type, "STRING")

def load_to_bigquery_with_client_library(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery using the Python client library."""
    try:
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_SERVICE_ACCOUNT_KEY_PATH
        )
        client = bigquery.Client(credentials=credentials, project=BIGQUERY_PROJECT_ID)
    except Exception as e:
        print(f"❌ Error initializing BigQuery client with credentials: {e}")
        return False

    table_id = f"{bigquery_dataset_name}.{bigquery_table_name}"

    # 1. Fetch schema from Postgres, including nullability
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            print("Failed to connect to PostgreSQL for schema retrieval.")
            return False

        # --- MODIFIED QUERY TO GET is_nullable ---
        query = f"""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_name = 'payments'
            AND table_schema = 'public'; -- Assuming your table is in the 'public' schema
        """
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list_pg = cursor.fetchall() # Each row is now (col_name, data_type, is_nullable_string)
        if not schema_list_pg:
            print("Could not retrieve schema from database. BigQuery load will fail.")
            return False

        schema_bq_fields = []
        for col_name, pg_type, is_nullable_str in schema_list_pg:
            bq_type = map_postgres_type_to_bq(pg_type)
            # --- Dynamically set mode based on PG's is_nullable ---
            bq_mode = 'NULLABLE' if is_nullable_str == 'YES' else 'REQUIRED'
            schema_bq_fields.append(bigquery.SchemaField(col_name, bq_type, mode=bq_mode))

        # --- OPTIONAL: Print schema being sent to BigQuery for debugging ---
        print("Schema being sent to BigQuery:")
        for field in schema_bq_fields:
            print(f"  - {field.name}: {field.field_type} (mode={field.mode})")
        # ------------------------------------------------------------------

    except Exception as e:
        print(f"Error retrieving schema from PostgreSQL: {e}")
        return False
    finally:
        if conn:
            conn.close()

    # 2. Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        # Keep ALLOW_FIELD_ADDITION for new columns.
        # ALLOW_FIELD_RELAXATION would be needed if you ever change a REQUIRED to NULLABLE in BQ,
        # but that's not typically the conflict causing "id already exists".
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        schema=schema_bq_fields,
    )

    # 3. Start the load job
    print(f"Starting BigQuery load job from {gcs_file_path} to {table_id} using client library...")
    try:
        load_job = client.load_table_from_uri(
            gcs_file_path, table_id, job_config=job_config
        )
        load_job.result()

        print(f"✅ Successfully loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
        return True
    except Exception as e:
        print(f"❌ Error during BigQuery load job (client library): {e}")
        return False

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export from Postgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    start_date_for_file_name = today - timedelta(days=2)

    print(f"Running data pipeline for payments updated since {start_date_for_file_name} (approximately yesterday and day before)...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start_date_for_file_name.strftime('%Y%m%d')}_oltp_payments.csv"
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return

    if not load_to_bigquery_with_client_library(gcs_file_path, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME):
        print("Pipeline failed: Error loading to BigQuery.")
        return

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS. This should be investigated.")

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for payments updated since 2025-06-25 (approximately yesterday and day before)...
Successfully exported 3051 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250625_oltp_payments.csv
Schema being sent to BigQuery:
  - id: INTEGER (mode=REQUIRED)
  - reference_id: STRING (mode=NULLABLE)
  - internal_id: STRING (mode=NULLABLE)
  - broker_id: INTEGER (mode=NULLABLE)
  - contact_id: INTEGER (mode=NULLABLE)
  - user_id: INTEGER (mode=NULLABLE)
  - payment_method_id: INTEGER (mode=NULLABLE)
  - currency: STRING (mode=NULLABLE)
  - offline: BOOLEAN (mode=NULLABLE)
  - extra: STRING (mode=NULLABLE)
  - amount: FLOAT (mode=NULLABLE)
  - refunded_amount: FLOAT (mode=NULLABLE)
  - status: STRING (mode=NULLABLE)
  - card_bin_id: INTEGER (mode=NULLABLE)
  - ip_address: STRING (mode=NULLABLE)
  - by_payment_link: BOOLEAN

In [29]:
import psycopg2
import csv
import subprocess
from datetime import date, timedelta
from google.cloud import bigquery
from google.oauth2 import service_account

# Configuration (Hardcoded - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"

BIGQUERY_PROJECT_ID = "ajar-kw"
BIGQUERY_DATASET_NAME = "payments_data"
BIGQUERY_TABLE_NAME = "oltp_payments"
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv"

BIGQUERY_SERVICE_ACCOUNT_KEY_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports payments data updated between last two days from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False

        query = f"""
            SELECT *
            FROM payments
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '7 days')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:
            print("No records found for yesterday.")
            return True

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

# --- MODIFIED: Added column_name parameter for specific mapping overrides ---
def map_postgres_type_to_bq(postgres_type, column_name=None):
    """Maps PostgreSQL data types to BigQuery data types, with specific overrides."""
    type_mapping = {
        "integer": "INTEGER", "bigint": "INTEGER", "smallint": "INTEGER",
        "numeric": "NUMERIC", "decimal": "NUMERIC",
        "real": "FLOAT", "double precision": "FLOAT", # Default float mapping for these PG types

        "text": "STRING", "character varying": "STRING", "varchar": "STRING",
        "char": "STRING", "character": "STRING",
        "timestamp with time zone": "TIMESTAMP", "timestamp without time zone": "TIMESTAMP",
        "date": "DATE", "time with time zone": "TIME", "time without time zone": "TIME",
        "boolean": "BOOLEAN", "bytea": "BYTES", "json": "STRING", "jsonb": "STRING"
    }

    # --- Specific override for 'amount' column to ensure it's NUMERIC in BQ ---
    if column_name == 'amount' or column_name == 'refunded_amount':
        # If your BigQuery 'amount' column is already NUMERIC,
        # we must force the incoming schema to also declare it as NUMERIC.
        return "NUMERIC"
    # ----------------------------------------------------------------------

    return type_mapping.get(postgres_type, "STRING") # Default to STRING if not found

def load_to_bigquery_with_client_library(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery using the Python client library."""
    try:
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_SERVICE_ACCOUNT_KEY_PATH
        )
        client = bigquery.Client(credentials=credentials, project=BIGQUERY_PROJECT_ID)
    except Exception as e:
        print(f"❌ Error initializing BigQuery client with credentials: {e}")
        return False

    table_id = f"{bigquery_dataset_name}.{bigquery_table_name}"

    # 1. Fetch schema from Postgres, including nullability
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            print("Failed to connect to PostgreSQL for schema retrieval.")
            return False

        query = f"""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_name = 'payments'
            AND table_schema = 'public';
        """
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list_pg = cursor.fetchall()
        if not schema_list_pg:
            print("Could not retrieve schema from database. BigQuery load will fail.")
            return False

        schema_bq_fields = []
        for col_name, pg_type, is_nullable_str in schema_list_pg:
            # --- MODIFIED: Pass column_name to map_postgres_type_to_bq ---
            bq_type = map_postgres_type_to_bq(pg_type, column_name=col_name)
            # -------------------------------------------------------------
            bq_mode = 'NULLABLE' if is_nullable_str == 'YES' else 'REQUIRED'
            schema_bq_fields.append(bigquery.SchemaField(col_name, bq_type, mode=bq_mode))

        print("Schema being sent to BigQuery:")
        for field in schema_bq_fields:
            print(f"  - {field.name}: {field.field_type} (mode={field.mode})")

    except Exception as e:
        print(f"Error retrieving schema from PostgreSQL: {e}")
        return False
    finally:
        if conn:
            conn.close()

    # 2. Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        schema=schema_bq_fields,
    )

    # 3. Start the load job
    print(f"Starting BigQuery load job from {gcs_file_path} to {table_id} using client library...")
    try:
        load_job = client.load_table_from_uri(
            gcs_file_path, table_id, job_config=job_config
        )
        load_job.result()

        print(f"✅ Successfully loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
        return True
    except Exception as e:
        print(f"❌ Error during BigQuery load job (client library): {e}")
        return False

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export from Postgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    start_date_for_file_name = today - timedelta(days=2)

    print(f"Running data pipeline for payments updated since {start_date_for_file_name} (approximately yesterday and day before)...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start_date_for_file_name.strftime('%Y%m%d')}_oltp_payments.csv"
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return

    if not load_to_bigquery_with_client_library(gcs_file_path, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME):
        print("Pipeline failed: Error loading to BigQuery.")
        return

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS. This should be investigated.")

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for payments updated since 2025-07-01 (approximately yesterday and day before)...
Successfully exported 9741 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/oltp_payments.csv to gs://ajar-bigquery-staging-bucket/raw/20250701_oltp_payments.csv
Schema being sent to BigQuery:
  - id: INTEGER (mode=REQUIRED)
  - reference_id: STRING (mode=NULLABLE)
  - internal_id: STRING (mode=NULLABLE)
  - broker_id: INTEGER (mode=NULLABLE)
  - contact_id: INTEGER (mode=NULLABLE)
  - user_id: INTEGER (mode=NULLABLE)
  - payment_method_id: INTEGER (mode=NULLABLE)
  - currency: STRING (mode=NULLABLE)
  - offline: BOOLEAN (mode=NULLABLE)
  - extra: STRING (mode=NULLABLE)
  - amount: NUMERIC (mode=NULLABLE)
  - refunded_amount: NUMERIC (mode=NULLABLE)
  - status: STRING (mode=NULLABLE)
  - card_bin_id: INTEGER (mode=NULLABLE)
  - ip_address: STRING (mode=NULLABLE)
  - by_payment_link: BOO

In [28]:
import psycopg2
import csv
import subprocess
from datetime import date, timedelta
from google.cloud import bigquery
from google.oauth2 import service_account

# Configuration (Hardcoded - BEST PRACTICE: use environment variables)
DB_HOST = "34.93.7.102"
DB_NAME = "ajar"
DB_USER = "tech"
DB_PASSWORD = ">aRSIeB(C,gHuo1|"
GCS_BUCKET_NAME = "ajar-bigquery-staging-bucket"

BIGQUERY_PROJECT_ID = "ajar-kw"
BIGQUERY_DATASET_NAME = "raw_accounts"
BIGQUERY_TABLE_NAME = "customer_accounts"
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/customer_accounts.csv"

BIGQUERY_SERVICE_ACCOUNT_KEY_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/pg-bigquery-pipeline.json"

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports Accounts data updated as specified from PostgreSQL to a CSV file."""
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            return False

        query = f"""
            SELECT *
            FROM accounts
            WHERE (updated_at + interval '3 hours') >= date_trunc('day', current_date - interval '3 months')
            AND (updated_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)
        if not records:
            print("No records found for this range.")
            return True

        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            if description:
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            csv_writer.writerows(records)
        print(f"Successfully exported {len(records)} rows to {output_csv_path}")
        return True

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")
        return False
    finally:
        if conn:
            conn.close()

def upload_to_gcs(local_file_path, gcs_bucket_name, gcs_file_path):
    """Uploads a file to Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "cp", local_file_path, gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully uploaded {local_file_path} to {gcs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error during GCS upload: {e.stderr}")
        return False

# --- MODIFIED: Added column_name parameter for specific mapping overrides ---
def map_postgres_type_to_bq(postgres_type, column_name=None):
    """Maps PostgreSQL data types to BigQuery data types, with specific overrides."""
    type_mapping = {
        "integer": "INTEGER", "bigint": "INTEGER", "smallint": "INTEGER",
        "numeric": "NUMERIC", "decimal": "NUMERIC",
        "real": "FLOAT", "double precision": "FLOAT", # Default float mapping for these PG types

        "text": "STRING", "character varying": "STRING", "varchar": "STRING",
        "char": "STRING", "character": "STRING",
        "timestamp with time zone": "TIMESTAMP", "timestamp without time zone": "TIMESTAMP",
        "date": "DATE", "time with time zone": "TIME", "time without time zone": "TIME",
        "boolean": "BOOLEAN", "bytea": "BYTES", "json": "STRING", "jsonb": "STRING"
    }

    # --- Specific override for 'company_name' column to ensure it's NUMERIC in BQ ---
    if column_name == 'company_name':
    #  or column_name == 'company_name':

    #     # If your BigQuery 'amount' column is already NUMERIC,
    #     # we must force the incoming schema to also declare it as NUMERIC.
        return "STRING"
    # ----------------------------------------------------------------------

    return type_mapping.get(postgres_type, "STRING") # Default to STRING if not found

def load_to_bigquery_with_client_library(gcs_file_path, bigquery_dataset_name, bigquery_table_name):
    """Loads data from GCS to BigQuery using the Python client library."""
    try:
        credentials = service_account.Credentials.from_service_account_file(
            BIGQUERY_SERVICE_ACCOUNT_KEY_PATH
        )
        client = bigquery.Client(credentials=credentials, project=BIGQUERY_PROJECT_ID)
    except Exception as e:
        print(f"❌ Error initializing BigQuery client with credentials: {e}")
        return False

    table_id = f"{bigquery_dataset_name}.{bigquery_table_name}"

    # 1. Fetch schema from Postgres, including nullability
    conn = None
    try:
        conn = connectionNew()
        if conn is None:
            print("Failed to connect to PostgreSQL for schema retrieval.")
            return False

        query = f"""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_name = 'accounts'
            AND table_schema = 'public';
        """
        cursor = conn.cursor()
        cursor.execute(query)
        schema_list_pg = cursor.fetchall()
        if not schema_list_pg:
            print("Could not retrieve schema from database. BigQuery load will fail.")
            return False

        schema_bq_fields = []
        for col_name, pg_type, is_nullable_str in schema_list_pg:
            # --- MODIFIED: Pass column_name to map_postgres_type_to_bq ---
            bq_type = map_postgres_type_to_bq(pg_type, column_name=col_name)
            # -------------------------------------------------------------
            bq_mode = 'NULLABLE' if is_nullable_str == 'YES' else 'REQUIRED'
            schema_bq_fields.append(bigquery.SchemaField(col_name, bq_type, mode=bq_mode))

        print("Schema being sent to BigQuery:")
        for field in schema_bq_fields:
            print(f"  - {field.name}: {field.field_type} (mode={field.mode})")

    except Exception as e:
        print(f"Error retrieving schema from PostgreSQL: {e}")
        return False
    finally:
        if conn:
            conn.close()

    # 2. Configure the BigQuery load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        schema=schema_bq_fields,
    )

    # 3. Start the load job
    print(f"Starting BigQuery load job from {gcs_file_path} to {table_id} using client library...")
    try:
        load_job = client.load_table_from_uri(
            gcs_file_path, table_id, job_config=job_config
        )
        load_job.result()

        print(f"✅ Successfully loaded {load_job.output_rows} rows into BigQuery table: {table_id}")
        return True
    except Exception as e:
        print(f"❌ Error during BigQuery load job (client library): {e}")
        return False

def delete_gcs_file(gcs_file_path):
    """Deletes a file from Google Cloud Storage using gsutil."""
    try:
        command = ["gsutil", "rm", gcs_file_path]
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"Successfully deleted {gcs_file_path} from GCS")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error deleting GCS file: {e.stderr}")
        return False

def run_data_pipeline():
    """Runs the entire data pipeline: export from Postgres, upload to GCS, load to BigQuery, and delete from GCS."""
    today = date.today()
    start_date_for_file_name = today - timedelta(days=60)

    print(f"Running data pipeline for accounts updated since {start_date_for_file_name} (teh time specified)...")

    if not export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH):
        print("Pipeline failed: Error exporting data from PostgreSQL.")
        return

    gcs_file_path = f"gs://{GCS_BUCKET_NAME}/raw/{start_date_for_file_name.strftime('%Y%m%d')}_customer_accounts.csv"
    if not upload_to_gcs(OUTPUT_CSV_PATH, GCS_BUCKET_NAME, gcs_file_path):
        print("Pipeline failed: Error uploading to GCS.")
        return

    if not load_to_bigquery_with_client_library(gcs_file_path, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME):
        print("Pipeline failed: Error loading to BigQuery.")
        return

    if delete_gcs_file(gcs_file_path):
        print("Successfully deleted file from GCS.")
    else:
        print("Warning: Failed to delete file from GCS. This should be investigated.")

    print("Pipeline completed successfully.")

if __name__ == "__main__":
    run_data_pipeline()

Running data pipeline for accounts updated since 2025-05-04 (teh time specified)...
Successfully exported 124 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/customer_accounts.csv
Successfully uploaded /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/customer_accounts.csv to gs://ajar-bigquery-staging-bucket/raw/20250504_customer_accounts.csv
Schema being sent to BigQuery:
  - id: INTEGER (mode=REQUIRED)
  - name: STRING (mode=NULLABLE)
  - logo: STRING (mode=NULLABLE)
  - kyc_status: STRING (mode=NULLABLE)
  - is_registered_company: BOOLEAN (mode=NULLABLE)
  - is_on_behalf: BOOLEAN (mode=NULLABLE)
  - company_name: STRING (mode=NULLABLE)
  - created_by: INTEGER (mode=NULLABLE)
  - updated_by: INTEGER (mode=NULLABLE)
  - archived_at: TIMESTAMP (mode=NULLABLE)
  - created_at: TIMESTAMP (mode=NULLABLE)
  - updated_at: TIMESTAMP (mode=NULLABLE)
  - contact_email: STRING (mode=NULLABLE)
  - contact_phone: STRING (mode=NULLABLE)
  - owner_user_id: INTEGER (mode=NULLABLE)
Starting

In [7]:
# new platform production URL
def connectionNew():
    host_name="34.93.7.102"
    db_user="tech"
    db_password=">aRSIeB(C,gHuo1|"
    db_name="ajar"
    connection = db_connect.connect(host=host_name,user=db_user,password=db_password,database=db_name)
    return connection

def query_executeNew(query):
    
    cursor = connectionNew()
    cursor.execute(query)
    return cursor.fetchall()

In [17]:
# Output CSV file path
OUTPUT_CSV_PATH = "/Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/yesterday_payments.csv"

def connectionNew():
    """Establishes a connection to the PostgreSQL database."""
    host_name = "34.93.7.102"
    db_user = "tech"
    db_password = ">aRSIeB(C,gHuo1|"
    db_name = "ajar"
    try:
        connection = db_connect.connect(
            host=host_name, user=db_user, password=db_password, database=db_name
        )
        return connection
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None  # Important: Return None on failure

def query_executeNew(connection, query):
    """Executes a query on the given connection and returns the results."""
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall(), cursor.description  # Return both data and description
    except psycopg2.Error as e:
        print(f"Error executing query: {e}")
        return [], None  # Important: Return empty list and None on failure
    

def export_yesterdays_payments_to_csv(output_csv_path):
    """Exports yesterday's payments data from PostgreSQL to a CSV file.

    Args:
        output_csv_path (str): The path to the output CSV file.
    """
    conn = None  # Initialize conn outside the try block
    try:
        # Connect to PostgreSQL
        conn = connectionNew()
        if conn is None:
            return  # Exit if connection failed

        # Query for yesterday's records with 3-hour offset
        query = f"""
            SELECT
                *
            FROM payments
            WHERE (created_at + interval '3 hours') >= date_trunc('day', current_date - interval '1 day')
            AND (created_at + interval '3 hours') < date_trunc('day', current_date);
        """
        records, description = query_executeNew(conn, query)  # Get data and description
        if records is None:
            return # Exit if the query failed

        # Write to CSV
        with open(output_csv_path, 'w', newline='', encoding='utf-8') as csvfile: # Added encoding
            csv_writer = csv.writer(csvfile)
            if description: #check if description exists
                # Write header 
                column_names = [desc[0] for desc in description]
                csv_writer.writerow(column_names)
            # Write data rows
            csv_writer.writerows(records)

        print(f"Successfully exported {len(records)} rows to {output_csv_path}")

    except Exception as e:
        print(f"Error during PostgreSQL export: {e}")

    finally:
        if conn:
            conn.close()  # Use conn.close()

if __name__ == "__main__":
    export_yesterdays_payments_to_csv(OUTPUT_CSV_PATH)

Successfully exported 582 rows to /Users/abdullahajmal/Abdullah@Ajar/BigQueryUpload/yesterday_payments.csv
