In [None]:
from datetime import datetime, timezone

import pandas as pd
from google.api_core.exceptions import Conflict
from google.cloud import bigquery
from google.oauth2 import service_account

# --- Destination and source settings ---------------------------------------
# The fully qualified BigQuery target is PROJECT_ID.DATASET_ID.TABLE_ID.
PROJECT_ID = "trim-plexus-396409"
DATASET_ID = "BigQuery_ETL_Assignment"
TABLE_ID = "Customer_Orders"

# Local CSV file that feeds the pipeline.
CSV_PATH = r"E:\BigQueryAssignment\Customer_Orders.csv"

# Service-account key file used to authenticate against BigQuery.
key_path = r"E:\BigQueryAssignment\trim-plexus-396409-dfc55c39f51e.json"
credentials = service_account.Credentials.from_service_account_file(key_path)

# NOTE: the client's default project is read from the key file, not from
# PROJECT_ID above.
client = bigquery.Client(
    project=credentials.project_id,
    credentials=credentials,
)

# Extract: read the raw order export into a DataFrame.
print("Extracting data from CSV...")

# The file is decoded as ISO-8859-1 (Latin-1) rather than pandas' default UTF-8.
df = pd.read_csv(filepath_or_buffer=CSV_PATH, encoding="ISO-8859-1")

# Show the first rows so the extract can be checked by eye.
print("Data extracted. Sample:", df.head(), sep="\n")

# Transform: de-duplicate, impute missing prices, and stamp the load time.
print("Transforming data...")

# Drop rows that are exact duplicates across every column.
df = df.drop_duplicates()

# Replace missing prices with the mean of the prices that are present
# (Series.mean() skips NaN by default).
mean_order_price = df["Order_Price"].mean()
df["Order_Price"] = df["Order_Price"].fillna(mean_order_price)

# Stamp every row of this run with a single load time taken in UTC.
# A bare datetime.now() records the host's local wall-clock time, so the
# stored value would depend on the timezone of whichever machine ran the
# load. tzinfo is stripped after the UTC read so the column stays
# timezone-naive, exactly as before, and the type of the column that ends
# up in BigQuery is unchanged.
df["load_timestamp"] = datetime.now(timezone.utc).replace(tzinfo=None)

print("Transformation complete. Sample:")
print(df.head())

# Make sure the destination dataset exists before any table work.
dataset_path = f"{PROJECT_ID}.{DATASET_ID}"
dataset_ref = bigquery.Dataset(dataset_path)

try:
    client.create_dataset(dataset_ref)
except Conflict:
    # Conflict is treated as "dataset is already there"; carry on.
    print(f"Dataset '{DATASET_ID}' already exists.")
else:
    print(f"Dataset '{DATASET_ID}' created.")

# Column layout of the destination table, written as (name, BigQuery type)
# pairs. The load_timestamp column added by the transform step is not
# declared here.
schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("Customer_Id", "INTEGER"),
        ("Customer_Name", "STRING"),
        ("Customer_Location", "STRING"),
        ("Order_ID", "INTEGER"),
        ("Order_Quantity", "INTEGER"),
        ("Order_Price", "FLOAT"),
    )
]

# Fully qualified table id: project.dataset.table.
table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
table = bigquery.Table(table_ref, schema=schema)

try:
    client.create_table(table)
except Conflict:
    # Conflict is treated as "table is already there"; carry on.
    print(f"Table '{TABLE_ID}' already exists.")
else:
    print(f"Table '{TABLE_ID}' created.")

# Load: append the transformed rows to the destination table.
print("Loading transformed data into BigQuery...")

job_config = bigquery.LoadJobConfig(
    # Append on every run. NOTE: re-running the script appends the same
    # rows again; nothing here de-duplicates against rows already loaded.
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    # Let the load add columns the table does not have yet (the table schema
    # declared earlier does not list load_timestamp).
    schema_update_options=[
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
    ],
)

job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()  # Block until the load job finishes; raises if the job failed.

# Report the row count recorded by the load job itself; fall back to the
# DataFrame length only if the job did not report one.
loaded_rows = job.output_rows if job.output_rows is not None else len(df)
print(f"Successfully loaded {loaded_rows} rows into {table_ref}.")

📥 Extracting data from CSV...
✅ Data extracted. Sample:
   Customer_Id     Customer_Name           Customer_Location  Order_ID  \
0           10     Mary Vega DDS         China, Beijing Shi       2268   
1           20     Brandon Myers  193, Bannerghatta Main Rd       3082   
2           30    Margaret Wells            Behrenstraße 42       3160   
3           40  Michael Matthews            Behrenstraße 42       1272   
4           50   Connor Williams  Floreasca Park 43 Soseaua       9447   

   Order_Quantity  Order_Price  
0               5        16.52  
1               4        17.27  
2               1         3.37  
3               5         2.20  
4               1        12.23  
🔄 Transforming data...
✅ Transformation complete. Sample:
   Customer_Id     Customer_Name           Customer_Location  Order_ID  \
0           10     Mary Vega DDS         China, Beijing Shi       2268   
1           20     Brandon Myers  193, Bannerghatta Main Rd       3082   
2           30    Mar



✅ Successfully loaded 50 rows into trim-plexus-396409.BigQuery_ETL_Assignment.Customer_Orders.
