In [None]:
import os
from google.cloud import bigquery
import pandas as pd
from typing import List, Dict, Any

# --- Configuration ---
# NOTE: This script assumes you have authenticated with Google Cloud.
# If running locally, you can typically run 'gcloud auth application-default login'
# in your terminal first.

# Google Cloud project ID; first component of every fully qualified table path.
PROJECT_ID = "celtic-fact-367202"
# BigQuery dataset that holds both the source and the destination table.
DATASET_ID = "test"
# Table the fetch step reads from.
SOURCE_TABLE_ID = "pokemon"
# Table the push step appends to.
DESTINATION_TABLE_ID = "new_pokemon_data" # New table to demonstrate the push/upload function

def initialize_client() -> bigquery.Client | None:
    """
    Create a BigQuery client, reporting the outcome on stdout.

    The client is constructed with no explicit arguments, so it relies on
    whatever Google Cloud configuration the environment provides.

    Returns:
        The new client, or None if constructing it raised any exception
        (the error is printed rather than propagated, so callers must check
        the result before using it).
    """
    try:
        bq_client = bigquery.Client()
        print("BigQuery client initialized successfully.")
    except Exception as err:
        failure_report = (
            "Failed to initialize BigQuery client.",
            "Please ensure your Google Cloud environment is authenticated.",
            f"Error: {err}",
        )
        for message in failure_report:
            print(message)
        return None

    return bq_client


def fetch_data(
    client: bigquery.Client, table_id: str, limit: int | None = 100
) -> pd.DataFrame:
    """
    Fetch rows from a BigQuery table into a Pandas DataFrame.

    Args:
        client: BigQuery client used to run the query.
        table_id: Name of the table inside ``PROJECT_ID.DATASET_ID``.
        limit: Maximum number of rows to fetch. Defaults to 100 (a small
            sample for demonstration purposes); pass None to fetch every
            row of the table.

    Returns:
        A DataFrame with the fetched rows. If the query fails, the error is
        printed and an EMPTY DataFrame is returned, so an empty result can
        mean either "query failed" or "no rows matched".

    Raises:
        ValueError: If ``limit`` is not None and is not a non-negative int.
    """
    # Validate before interpolating into SQL: the value is embedded in the
    # query text, so only a plain non-negative integer is acceptable.
    # (bool is rejected explicitly because it is a subclass of int.)
    if limit is not None and (
        isinstance(limit, bool) or not isinstance(limit, int) or limit < 0
    ):
        raise ValueError(
            f"limit must be a non-negative integer or None, got {limit!r}"
        )

    full_table_path = f"{PROJECT_ID}.{DATASET_ID}.{table_id}"
    print(f"Fetching data from: {full_table_path}...")

    # Backticks quote the fully qualified table path.
    sql_query = f"SELECT *\nFROM `{full_table_path}`"
    if limit is not None:
        sql_query += f"\nLIMIT {limit}"

    try:
        # to_dataframe() waits for the query job and downloads the result.
        df = client.query(sql_query).to_dataframe()
    except Exception as e:
        print(f"An error occurred during data fetch: {e}")
        return pd.DataFrame()

    print(f"Successfully fetched {len(df)} rows.")
    return df


def generate_mock_data() -> pd.DataFrame:
    """
    Build a two-row DataFrame of made-up Pokemon used to demonstrate the
    'push' operation.

    Returns:
        A DataFrame whose columns appear in the fixed order listed in
        ``column_order`` below; ``Type_2`` is missing (None) in the first row.
    """
    print("Generating mock data for upload...")

    # Column order of the resulting frame; it must line up with the schema
    # of the BigQuery table the frame is uploaded to.
    # NOTE(review): intended to mirror the 'pokemon' table — confirm the
    # exact column names against the live table schema.
    column_order = [
        '#', 'Name', 'Type_1', 'Type_2', 'HP', 'Attack', 'Defense',
        'Sp_Atk', 'Sp_Def', 'Speed', 'Generation', 'Legendary',
    ]

    # One tuple per row, values positioned to match column_order.
    row_values = [
        (1001, 'Pikachu Prime', 'Electric', None, 150, 120, 90, 150, 90, 180, 10, True),
        (1002, 'Bulbasaur Beta', 'Grass', 'Poison', 100, 70, 70, 80, 100, 60, 10, False),
    ]

    records = [dict(zip(column_order, values)) for values in row_values]
    return pd.DataFrame(records, columns=column_order)


def push_data(client: bigquery.Client, df: pd.DataFrame, table_id: str) -> None:
    """
    Append the rows of a Pandas DataFrame to a BigQuery table.

    The load job uses the WRITE_APPEND disposition, so existing rows in the
    destination table are kept and the new rows are added after them. No
    create disposition is set, so table creation follows the load job's
    default behavior.

    Args:
        client: BigQuery client used to run the load job.
        df: Rows to upload; its columns must be compatible with the
            destination table's schema.
        table_id: Name of the destination table inside
            ``PROJECT_ID.DATASET_ID``.

    Returns:
        None. Success or failure is reported on stdout; exceptions raised by
        the load job are caught and printed, not propagated.
    """
    full_table_path = f"{PROJECT_ID}.{DATASET_ID}.{table_id}"
    print(f"\nAttempting to push {len(df)} rows to: {full_table_path}...")

    # WRITE_APPEND tells BigQuery to add the new rows to the existing table
    # content instead of replacing it.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    try:
        job = client.load_table_from_dataframe(
            df, full_table_path, job_config=job_config
        )
        job.result()  # Block until the load job completes; raises on failure.
    except Exception as e:
        print(f"An error occurred during data push: {e}")
    else:
        # Reported only once the load job itself has finished, so a
        # successful upload is never mislabelled as a failure.
        print(f"Successfully appended {len(df)} rows into {full_table_path}.")


def run_bigquery_pipeline(client: bigquery.Client):
    """
    Run the demo pipeline end to end: fetch the source table, show a sample,
    then upload mock rows to the destination table.

    If the fetch yields an empty DataFrame, a message is printed and the
    push step is skipped.
    """
    # Step 1 - GET: read existing rows from the source table.
    source_rows = fetch_data(client, SOURCE_TABLE_ID)

    if not source_rows.empty:
        print("\n--- Sample of Fetched Data ---")
        print(source_rows.head())

        # Step 2 - PUSH: build the mock rows and append them to the
        # destination table.
        push_data(client, generate_mock_data(), DESTINATION_TABLE_ID)
    else:
        print("Could not proceed without fetching data.")


# def main():
#     """Main function to start the pipeline."""
#     print("Starting BigQuery Data Pipeline...")
#     client = initialize_client()
#     if client:
#         run_bigquery_pipeline(client)


# if __name__ == "__main__":
#     main()




In [2]:
# 1. Initialize the client (initialize_client returns None on failure)
client = initialize_client()

# 2. Fetch the source table. `df` is always bound, even when the client could
#    not be created, so the final expression below never raises NameError or
#    shows a stale value left over from an earlier run of the notebook.
if client is not None:
    # Use the SOURCE_TABLE_ID defined in the script
    df = fetch_data(client, SOURCE_TABLE_ID)
else:
    df = pd.DataFrame()

df

BigQuery client initialized successfully.
Fetching data from: celtic-fact-367202.test.pokemon...




Successfully fetched 100 rows.


Unnamed: 0,#,Name,Type 1,Type 2,HP,Attack,Defense,Sp_Atk,Sp_Def,Speed,Generation,Legendary
0,1,Bulbasaur,Grass,Poison,45,49,49,65,65,45,1,False
1,2,Ivysaur,Grass,Poison,60,62,63,80,80,60,1,False
2,3,Venusaur,Grass,Poison,80,82,83,100,100,80,1,False
3,3,VenusaurMega Venusaur,Grass,Poison,80,100,123,122,120,80,1,False
4,4,Charmander,Fire,,39,52,43,60,50,65,1,False
...,...,...,...,...,...,...,...,...,...,...,...,...
95,88,Grimer,Poison,,80,80,50,40,50,25,1,False
96,89,Muk,Poison,,105,105,75,65,100,50,1,False
97,90,Shellder,Water,,30,65,100,45,25,40,1,False
98,91,Cloyster,Water,Ice,50,95,180,85,45,70,1,False
