In [2]:
import pandas as pd
import requests
from google.cloud import bigquery, storage
from io import BytesIO

# --- Configuration ---
PROJECT_ID = "hallowed-winter-469505-j6"
DATASET_ID = "semantic_ads_targeting"
TABLE_ID = "target_ads"
BUCKET_NAME = "semantic-product-images"
PROGRESS_EVERY = 50  # print a progress line every N ads (successful or not)

# --- Initialize Clients ---
bq_client = bigquery.Client()
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

# 1. Fetch ad data from BigQuery
print("Fetching ads from BigQuery...")
query = f"SELECT ad_id, product_id, ad_image_url FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`"
df = bq_client.query(query).to_dataframe()

new_rows = []        # input rows plus 'gcs_image_uri' for every migrated image
failed_ad_ids = []   # ad_ids whose download or upload failed, kept for a retry pass
print(f"Found {len(df)} ads to process.")

# 2. Loop, Download, and Upload (sequentially, one ad at a time).
# A single Session reuses TCP/TLS connections across downloads instead of
# opening a fresh connection for every image.
with requests.Session() as session:
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        http_url = row['ad_image_url']
        # Object name is keyed on product_id.
        # NOTE(review): ads sharing a product_id overwrite the same object — confirm intended.
        file_name = f"{row['product_id']}.jpg"
        gcs_uri = f"gs://{BUCKET_NAME}/{file_name}"

        try:
            # The whole body is needed anyway, so no stream=True; the `with` block
            # releases the connection even when raise_for_status() raises.
            with session.get(http_url, timeout=10) as response:
                response.raise_for_status()
                image_bytes = response.content

            # Upload to GCS
            blob = bucket.blob(file_name)
            blob.upload_from_file(BytesIO(image_bytes), content_type='image/jpeg')

            # Store the new GCS URI alongside the original row
            new_row = row.to_dict()
            new_row['gcs_image_uri'] = gcs_uri
            new_rows.append(new_row)

        except requests.exceptions.RequestException as e:
            print(f"Failed to download {http_url}: {e}")
            failed_ad_ids.append(row['ad_id'])
        except Exception as e:
            print(f"An error occurred for ad_id {row['ad_id']}: {e}")
            failed_ad_ids.append(row['ad_id'])

        # Count by position (not index label, not successes only) so the
        # progress cadence is not skipped when a row fails.
        if position % PROGRESS_EVERY == 0:
            print(f"Processed {position}/{len(df)} images...")

print(f"Image migration complete. {len(new_rows)} succeeded, {len(failed_ad_ids)} failed.")

# 3. Create an updated CSV file
updated_df = pd.DataFrame(new_rows)
updated_df.to_csv("target_ads_with_gcs_uris.csv", index=False)
print("Saved updated ad data to 'target_ads_with_gcs_uris.csv'")
print("You should now upload this new CSV to a BigQuery table.")

Fetching ads from BigQuery...
Found 427786 ads to process.
Processed 50/427786 images...
Processed 100/427786 images...
Processed 150/427786 images...
Processed 200/427786 images...
Processed 250/427786 images...
Processed 300/427786 images...
Processed 350/427786 images...
Processed 400/427786 images...
Processed 450/427786 images...



KeyboardInterrupt



In [None]:
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from io import BytesIO

import pandas as pd
import requests
from google.cloud import bigquery, storage
from tqdm import tqdm # A library for progress bars, pip install tqdm

# --- Configuration ---
PROJECT_ID = "hallowed-winter-469505-j6"  # project holding the source BigQuery dataset
DATASET_ID = "semantic_ads_targeting"
TABLE_ID = "target_ads"                   # source table; the query reads ad_id, product_id, ad_image_url
BUCKET_NAME = "semantic-product-images"   # destination GCS bucket for the copied images

# Number of parallel workers. Each task is an HTTP download plus a GCS upload,
# so a good value likely depends more on network latency than on CPU core count.
MAX_WORKERS = 16

# --- Shared GCS handle, created once at module level ---
# process_image() reads the global `bucket`. It is never pickled: workers only
# see it if they share or inherit this module's globals (threads, or processes
# started with fork).
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

def process_image(ad_row, timeout=20):
    """
    Copy a single ad image from its source URL into the GCS bucket.

    Runs inside each executor worker: downloads ``ad_row['ad_image_url']`` and
    uploads the bytes to the object ``<product_id>.jpg`` in the module-level
    ``bucket``.

    Args:
        ad_row: One record from ``df.to_dict('records')``; must contain
            'ad_id', 'product_id' and 'ad_image_url'.
        timeout: Seconds passed to ``requests.get`` for the download
            (default 20, unchanged from before).

    Returns:
        A dict with 'ad_id', 'product_id', 'ad_image_url' and 'gcs_image_uri'
        on success, or None if the download or upload failed. Failures are
        logged as warnings (ad_id, URL, error) instead of being dropped silently.

    Raises:
        KeyError: if ``ad_row`` lacks 'ad_image_url' or 'product_id'.
    """
    http_url = ad_row['ad_image_url']
    file_name = f"{ad_row['product_id']}.jpg"
    gcs_uri = f"gs://{BUCKET_NAME}/{file_name}"

    try:
        # No stream=True: the whole body is read anyway. The `with` block closes
        # the response even when raise_for_status() raises.
        with requests.get(http_url, timeout=timeout) as response:
            response.raise_for_status()
            image_bytes = response.content

        # NOTE(review): content type is always image/jpeg (matching the .jpg name),
        # even if the source serves another format — confirm downstream expects this.
        blob = bucket.blob(file_name)
        blob.upload_from_string(image_bytes, content_type='image/jpeg')

        # Return the successful mapping
        return {
            'ad_id': ad_row['ad_id'],
            'product_id': ad_row['product_id'],
            'ad_image_url': http_url,
            'gcs_image_uri': gcs_uri,
        }
    except Exception as exc:
        # Best effort: one bad image must not stop the batch, but leave a trace
        # so failed ads can be identified and retried.
        logging.getLogger(__name__).warning(
            "Failed to migrate ad_id=%s from %s: %s", ad_row.get('ad_id'), http_url, exc
        )
        return None

# --- Main Execution ---
if __name__ == "__main__":
    # 1. Fetch ad data from BigQuery
    print("Fetching ads from BigQuery...")
    bq_client = bigquery.Client()
    query = f"SELECT ad_id, product_id, ad_image_url FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`"
    df = bq_client.query(query).to_dataframe()
    print(f"Found {len(df)} ads to process.")

    # Convert dataframe rows to a list of dictionaries to pass to workers
    tasks = df.to_dict('records')
    results = []
    failed_tasks = []  # input rows whose image could not be migrated, kept for a retry pass

    # 2. Use ProcessPoolExecutor to run tasks in parallel.
    # NOTE(review): process_image is defined in this notebook, so workers can only
    # resolve it when they are forked from the kernel (the Linux default before
    # Python 3.14). Under spawn/forkserver the calls fail; a ThreadPoolExecutor
    # would avoid this for network-bound work.
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Map each future back to its input row so failures can be reported.
        future_to_task = {executor.submit(process_image, task): task for task in tasks}

        # Use tqdm for a live progress bar
        for future in tqdm(as_completed(future_to_task), total=len(tasks), desc="Migrating Images"):
            try:
                result = future.result()
            except Exception:
                # A crashed worker (e.g. BrokenProcessPool) must not abort the loop
                # and discard every result collected so far; count the row as failed.
                result = None

            if result:  # Only append successful results
                results.append(result)
            else:
                failed_tasks.append(future_to_task[future])

    print(f"\nImage migration complete. Successfully processed {len(results)}/{len(tasks)} images.")

    # 3. Create a final DataFrame and save to CSV
    if results:
        updated_df = pd.DataFrame(results)
        updated_df.to_csv("target_ads_with_gcs_uris.csv", index=False)
        print("Saved updated ad data to 'target_ads_with_gcs_uris.csv'")
        print("You should now upload this new CSV to a BigQuery table (e.g., 'target_ads_gcs').")
    else:
        print("No images were successfully processed.")

    # Persist the failures too, so they can be inspected and retried instead of
    # being known only as a count.
    if failed_tasks:
        pd.DataFrame(failed_tasks).to_csv("target_ads_failed_images.csv", index=False)
        print(f"Saved {len(failed_tasks)} failed ads to 'target_ads_failed_images.csv' for retry.")

Fetching ads from BigQuery...
Found 427786 ads to process.


Migrating Images: 100%|██████████| 427786/427786 [6:23:40<00:00, 18.58it/s]   



Image migration complete. Successfully processed 427706/427786 images.
Saved updated ad data to 'target_ads_with_gcs_uris.csv'
You should now upload this new CSV to a BigQuery table (e.g., 'target_ads_gcs').
