This is prompt one where we decided too

In [None]:
# First-draft extraction prompt: asks the model for product metadata plus a
# basic image analysis, returned as one JSON object in the schema shown below.
# UPC is declared as STRING because the instructions ask for the code itself
# to be extracted, not a present/absent flag.
prompt = """
Analyze the given product image and extract relevant metadata attributes. Identify and categorize unique features, colors, and text present in the image.

Extract the following attributes (if applicable):
Product Identification: Extract product_id, ASIN, UPC (if visible).
Brand & Naming: Identify brand_name, product_name, model_number.
Categorization: Identify Main_Category, Subcategory_1, Subcategory_2, Subcategory_3.
Pricing Details: If visible, extract list_price and selling_price.
Technical Details: Extract dimensions, weight, product_specs, product_technical information.
Text Extraction: If the image contains any text, extract the words accurately.
Visual Features: Identify relevant object features such as materials, packaging details, logos, and labels.
Color Information: Identify dominant colors present in the image.

Return the output in JSON format.

{
"business": "product_data_analysis",
"category": "product_metadata",
"product_attributes": {
"product_id": "STRING",
"brand_name": "STRING",
"product_name": "STRING",
"ASIN": "STRING",
"UPC": "STRING",
"Main_Category": "STRING",
"Subcategory_1": "STRING",
"Subcategory_2": "STRING",
"Subcategory_3": "STRING",
"list_price": "FLOAT",
"selling_price": "FLOAT",
"quantity": "INT",
"model_number": "STRING",
"about_product": "STRING",
"product_specs": "STRING",
"product_technical": "STRING",
"weight": "STRING",
"dimensions": "STRING",
"url": "STRING"
},
"image_analysis": {
"detected_text": ["TEXT"],
"dominant_colors": ["COLOR1", "COLOR2"],
"object_features": ["FEATURE1", "FEATURE2"]
}
}

Do not include any extra details outside this format. Ensure extracted attributes are precise and relevant to the specific product image.
"""

In [None]:
import csv
import json
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from google.cloud import storage
from io import StringIO

# Configuration
# These module-level settings are read directly by the functions below.
project_id = "amazon-product-reviews-452322"  # project passed to vertexai.init
region = "us-central1"  # location passed to vertexai.init
bucket_name = "product_data_284725"  # GCS bucket holding both inputs and outputs
gcs_in_folder = "initial_loads/product-image-data/in"  # prefix listed for source images
gcs_out_folder = "initial_loads/product-image-data/out"  # prefix the CSVs are written under
final_csv_name = "product-image-data-all.csv"  # file name of the consolidated CSV
model_name = "gemini-1.5-flash-002"  # model id handed to GenerativeModel

# Enhanced prompt for image analysis with new requested attributes.
# Sent to the model together with each image; the keys under
# "product_attributes" and "image_analysis" in the schema below are the same
# keys process_product_images reads when it builds a CSV row.
prompt = """
Analyze the given product image and extract relevant metadata attributes. Identify and categorize unique features, colors, and text present in the image.

Extract the following attributes (if applicable):
- Brand & Naming: Extract brand_name, product_name, model_number.
- Categorization: Identify Main_Category, Subcategory_1, Subcategory_2, Subcategory_3.
- Pricing Details: If visible, extract list_price and selling_price.
- Technical Details: Extract product_specs, product_technical information.
- Text Extraction: If the image contains any text, extract the words accurately.
- Visual Features: Identify relevant object features such as materials, packaging details, logos, and labels.
- Color Information: Identify dominant colors present in the image.
- Logo Detection: Recognize brand logos if visible.
- Text Sentiment: Analyze text on packaging for positive or negative language.
- Object Detection: Identify key objects (e.g., accessories, packaging).
- Image Quality: Assess clarity, lighting conditions, and presence of watermarks.

Return the output in JSON format.

{
"business": "product_data_analysis",
"category": "product_metadata",
"product_attributes": {
    "brand_name": "STRING",
    "product_name": "STRING",
    "Main_Category": "STRING",
    "Subcategory_1": "STRING",
    "Subcategory_2": "STRING",
    "Subcategory_3": "STRING",
    "list_price": "FLOAT",
    "selling_price": "FLOAT",
    "model_number": "STRING",
    "about_product": "STRING",
    "product_specs": "STRING",
    "product_technical": "STRING",
    "url": "STRING"
},
"image_analysis": {
    "detected_text": ["TEXT"],
    "dominant_colors": ["COLOR1", "COLOR2"],
    "object_features": ["FEATURE1", "FEATURE2"],
    "logo_detection": ["LOGO1", "LOGO2"],
    "text_sentiment": "STRING",
    "object_detection": ["OBJECT1", "OBJECT2"],
    "image_quality": {
        "clarity": "STRING",
        "lighting": "STRING",
        "watermarks": "BOOL"
    }
}
}

Do not include any extra details outside this format. Ensure extracted attributes are precise and relevant to the specific product image.
"""

# Supported image extensions mapped to the MIME type declared to the model.
_IMAGE_MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
}


def _image_mime_type(blob_name):
    """Return the MIME type for a supported image name, or None for anything else."""
    lowered = blob_name.lower()
    for extension, mime_type in _IMAGE_MIME_TYPES.items():
        if lowered.endswith(extension):
            return mime_type
    return None


def _join_values(values):
    """Flatten a JSON list field into a pipe-separated string.

    Also accepts null (-> ""), a bare scalar (-> its string form) and lists
    with non-string items, none of which a plain ``"|".join`` handles.
    """
    if values is None:
        return ""
    if isinstance(values, (list, tuple)):
        return "|".join(str(value) for value in values if value is not None)
    return str(values)


def process_product_images():
    """Analyze every product image under ``gcs_in_folder`` and write CSVs to GCS.

    Each ``.jpg``/``.jpeg``/``.png`` blob is sent to the model together with
    the module-level ``prompt``. The JSON reply is flattened into one CSV row,
    uploaded as a per-image CSV under ``gcs_out_folder``, and collected for a
    consolidated CSV (with an extra ``image_path`` column) that is written by
    ``create_consolidated_csv`` once all images are done.

    A failure on a single image (model error, unparsable or unexpected JSON,
    upload error) is printed and that image is skipped, so the rest of the
    run continues. Nothing is returned.
    """
    # Initialize Vertex AI
    vertexai.init(project=project_id, location=region)
    model = GenerativeModel(model_name)

    # Initialize GCS client
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # List all files in the input folder
    blobs = list(bucket.list_blobs(prefix=gcs_in_folder))

    print(f"Found {len(blobs)} files in {gcs_in_folder}")

    # Debug: print the first few blob names to understand the structure
    for i, blob in enumerate(blobs[:5]):
        print(f"Debug - Blob {i}: {blob.name}")

    # Rows for the final consolidated CSV
    all_product_data = []

    # CSV columns; the order must match row_data below
    headers = [
        "product_id", "brand_name", "product_name", "Main_Category",
        "Subcategory_1", "Subcategory_2", "Subcategory_3",
        "list_price", "selling_price", "model_number",
        "about_product", "product_specs", "product_technical",
        "url", "detected_text", "dominant_colors", "object_features",
        "logo_detection", "text_sentiment", "object_detection",
        "image_quality_clarity", "image_quality_lighting", "image_quality_watermarks",
        "image_path"  # Added image path for reference
    ]

    for blob in blobs:
        # Skip directories and anything that is not a supported image
        mime_type = _image_mime_type(blob.name)
        if mime_type is None:
            continue

        print(f"Processing: {blob.name}")

        try:
            # The product id is the second dash-separated token of the file name.
            # NOTE(review): for a two-token name such as "img-123.jpg" the id
            # keeps the extension ("123.jpg") - confirm the naming convention.
            filename = blob.name.split("/")[-1]
            parts = filename.split("-")
            product_id = parts[1] if len(parts) > 1 else "unknown"

            # Declare the real image type instead of always claiming JPEG
            image_uri = f"gs://{bucket_name}/{blob.name}"
            image_part = Part.from_uri(image_uri, mime_type=mime_type)

            response = model.generate_content([image_part, prompt])

            # Strip the Markdown code fence the model may wrap around the JSON
            clean_json = response.text.replace("```json", "").replace("```", "").strip()

            try:
                result = json.loads(clean_json)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON response for {blob.name}: {e}")
                print(f"Raw response: {clean_json}")
                continue

            if not isinstance(result, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(result).__name__}"
                )

            # "or {}" also covers sections the model returned as null
            product_attrs = result.get("product_attributes") or {}
            image_analysis = result.get("image_analysis") or {}
            image_quality = image_analysis.get("image_quality") or {}

            row_data = [
                product_id,
                product_attrs.get("brand_name", ""),
                product_attrs.get("product_name", ""),
                product_attrs.get("Main_Category", ""),
                product_attrs.get("Subcategory_1", ""),
                product_attrs.get("Subcategory_2", ""),
                product_attrs.get("Subcategory_3", ""),
                product_attrs.get("list_price", ""),
                product_attrs.get("selling_price", ""),
                product_attrs.get("model_number", ""),
                product_attrs.get("about_product", ""),
                product_attrs.get("product_specs", ""),
                product_attrs.get("product_technical", ""),
                product_attrs.get("url", ""),
                _join_values(image_analysis.get("detected_text")),
                _join_values(image_analysis.get("dominant_colors")),
                _join_values(image_analysis.get("object_features")),
                _join_values(image_analysis.get("logo_detection")),
                image_analysis.get("text_sentiment", ""),
                _join_values(image_analysis.get("object_detection")),
                image_quality.get("clarity", ""),
                image_quality.get("lighting", ""),
                str(image_quality.get("watermarks", False)),
            ]

            # Add to the consolidated data with image path
            all_product_data.append(row_data + [blob.name])

            # Per-image CSV: same columns minus image_path
            output_content = StringIO()
            writer = csv.writer(output_content)
            writer.writerow(headers[:-1])
            writer.writerow(row_data)

            # Name the file after the product id, else after the image with its
            # extension swapped for .csv (works for upper-case extensions too)
            if product_id != "unknown":
                output_filename = f"{product_id}.csv"
            else:
                output_filename = filename.rsplit(".", 1)[0] + ".csv"
            output_path = f"{gcs_out_folder}/{output_filename}"

            # Save to GCS
            output_blob = bucket.blob(output_path)
            output_blob.upload_from_string(output_content.getvalue())

            print(f"Saved analysis for {blob.name} to {output_path}")

        except Exception as e:
            # Per-image boundary: report and move on to the next image
            print(f"Error processing {blob.name}: {e}")

    # Create the final consolidated CSV
    if all_product_data:
        create_consolidated_csv(bucket, headers, all_product_data)

def create_consolidated_csv(bucket, headers, all_product_data):
    """Upload a single CSV made of the header row followed by every product row.

    Args:
        bucket: GCS bucket object the file is uploaded to.
        headers: Column names written as the first row.
        all_product_data: Rows to write, each ordered like ``headers``.
    """
    print(f"Creating consolidated CSV with {len(all_product_data)} product entries")

    # Build the whole file in memory: header first, then all rows in one call
    buffer = StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerows([headers, *all_product_data])

    # Destination is fixed by the module-level output folder and file name
    destination = f"{gcs_out_folder}/{final_csv_name}"
    bucket.blob(destination).upload_from_string(buffer.getvalue())

    print(f"Consolidated CSV saved to: {destination}")

def list_all_folders():
    """Print every folder-like prefix present in the bucket, sorted by name.

    Intended as a helper for working out the correct input/output paths.
    """
    client = storage.Client()
    target_bucket = client.bucket(bucket_name)

    # Fetch every object in the bucket up front
    bucket_objects = list(target_bucket.list_blobs())

    # Every object name contributes all of its parent prefixes,
    # e.g. "a/b/c.jpg" yields "a" and "a/b".
    folder_paths = set()
    for item in bucket_objects:
        segments = item.name.split('/')
        folder_paths.update(
            '/'.join(segments[:depth]) for depth in range(1, len(segments))
        )

    print("Available folders in the bucket:")
    for path in sorted(folder_paths):
        print(f"  - {path}")

# Entry point: runs the folder listing followed by the full image analysis.
if __name__ == "__main__":
    # First list available folders to help identify the correct path
    # (list_all_folders enumerates every object in the bucket, not just the input folder)
    list_all_folders()

    # Then process the images
    process_product_images()

In [None]:
import csv
import json
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from google.cloud import storage
from io import StringIO
import datetime

# Configuration
# These module-level settings are read directly by the functions below.
project_id = "amazon-product-reviews-452322"  # project passed to vertexai.init
region = "us-central1"  # location passed to vertexai.init
bucket_name = "product_data_284725"  # GCS bucket holding both inputs and outputs
gcs_in_folder = "initial_loads/product-image-data/in"  # prefix listed for source images
gcs_out_folder = "initial_loads/product-image-data/out"  # prefix the outputs are written under
# Consolidated CSV name carries a YYYYMMDD stamp; it is computed once, when
# this assignment runs, not when the file is eventually written.
final_csv_name = f"product-image-data-all-{datetime.datetime.now().strftime('%Y%m%d')}.csv"
model_name = "gemini-1.5-flash-002"  # model id handed to GenerativeModel

# Enhanced prompt for image analysis with adjusted categories and removed fields.
# Sent to the model together with each image; the keys under
# "product_attributes" and "image_analysis" in the schema below are the same
# keys process_product_images reads when it builds a CSV row.
prompt = """
Analyze the given product image and extract relevant metadata attributes. Identify and categorize unique features, colors, and text present in the image.

Extract the following attributes (if applicable):
- Brand & Naming: Extract brand_name, product_name.
- Categorization: Identify Main_Category, Subcategory_1, Subcategory_2, Subcategory_3, Subcategory_4, Subcategory_5.
- Pricing Details: If visible, extract list_price and selling_price.
- Technical Details: Extract product_technical information.
- Text Extraction: If the image contains any text, extract the words accurately.
- Visual Features: Identify relevant object features such as materials, packaging details, logos, and labels.
- Color Information: Identify dominant colors present in the image.
- Logo Detection: Recognize brand logos if visible.
- Text Sentiment: Analyze text on packaging for positive or negative language.
- Object Detection: Identify key objects (e.g., accessories, packaging).
- Image Quality: Assess clarity, lighting conditions, and presence of watermarks.
- Product Condition: Assess whether the product appears new, used, or refurbished.
- Packaging Type: Identify retail box, bulk packaging, eco-friendly packaging, etc.
- Target Demographic: Identify likely target audience based on visual cues.
- Competitor Products: Identify if competitor products are visible in the image.

Return the output in JSON format.

{
"business": "product_data_analysis",
"category": "product_metadata",
"product_attributes": {
    "brand_name": "STRING",
    "product_name": "STRING",
    "Main_Category": "STRING",
    "Subcategory_1": "STRING",
    "Subcategory_2": "STRING",
    "Subcategory_3": "STRING",
    "Subcategory_4": "STRING",
    "Subcategory_5": "STRING",
    "list_price": "FLOAT",
    "selling_price": "FLOAT",
    "about_product": "STRING",
    "product_technical": "STRING",
    "url": "STRING"
},
"image_analysis": {
    "detected_text": ["TEXT"],
    "dominant_colors": ["COLOR1", "COLOR2"],
    "object_features": ["FEATURE1", "FEATURE2"],
    "logo_detection": ["LOGO1", "LOGO2"],
    "text_sentiment": "STRING",
    "object_detection": ["OBJECT1", "OBJECT2"],
    "image_quality": {
        "clarity": "STRING",
        "lighting": "STRING",
        "watermarks": "BOOL"
    },
    "product_condition": "STRING",
    "packaging_type": "STRING",
    "target_demographic": ["DEMOGRAPHIC1", "DEMOGRAPHIC2"],
    "competitor_products": ["COMPETITOR1", "COMPETITOR2"]
}
}

Do not include any extra details outside this format. Ensure extracted attributes are precise and relevant to the specific product image.
"""

# Supported image extensions mapped to the MIME type declared to the model.
_IMAGE_MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
}


def _image_mime_type(blob_name):
    """Return the MIME type for a supported image name, or None for anything else."""
    lowered = blob_name.lower()
    for extension, mime_type in _IMAGE_MIME_TYPES.items():
        if lowered.endswith(extension):
            return mime_type
    return None


def _join_values(values):
    """Flatten a JSON list field into a pipe-separated string.

    Also accepts null (-> ""), a bare scalar (-> its string form) and lists
    with non-string items, none of which a plain ``"|".join`` handles.
    """
    if values is None:
        return ""
    if isinstance(values, (list, tuple)):
        return "|".join(str(value) for value in values if value is not None)
    return str(values)


def process_product_images():
    """Analyze every product image under ``gcs_in_folder`` and write results to GCS.

    Each ``.jpg``/``.jpeg``/``.png`` blob is sent to the model together with
    the module-level ``prompt``. The JSON reply is flattened into one CSV row,
    uploaded as a per-image CSV under ``gcs_out_folder``, and collected for a
    consolidated CSV (with extra ``image_path`` and ``analysis_timestamp``
    columns) written by ``create_consolidated_csv`` after the loop.

    A failure on a single image is printed, counted once in the run
    statistics, and that image is skipped. Replies that are not valid JSON
    are additionally saved under ``<gcs_out_folder>/errors/``. At the end the
    statistics are printed and uploaded as a JSON file. Nothing is returned.
    """
    # Initialize Vertex AI
    vertexai.init(project=project_id, location=region)
    model = GenerativeModel(model_name)

    # Initialize GCS client
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # List all files in the input folder
    blobs = list(bucket.list_blobs(prefix=gcs_in_folder))

    print(f"Found {len(blobs)} files in {gcs_in_folder}")

    # Keep only supported images, paired with their MIME type, so the progress
    # counter is measured against images rather than every listed blob.
    image_blobs = []
    for blob in blobs:
        mime_type = _image_mime_type(blob.name)
        if mime_type is not None:
            image_blobs.append((blob, mime_type))

    # Rows for the final consolidated CSV
    all_product_data = []

    # CSV columns; the order must match row_data below
    headers = [
        "product_id", "brand_name", "product_name", "Main_Category",
        "Subcategory_1", "Subcategory_2", "Subcategory_3", "Subcategory_4", "Subcategory_5",
        "list_price", "selling_price",
        "about_product", "product_technical",
        "url", "detected_text", "dominant_colors", "object_features",
        "logo_detection", "text_sentiment", "object_detection",
        "image_quality_clarity", "image_quality_lighting", "image_quality_watermarks",
        "product_condition", "packaging_type", "target_demographic", "competitor_products",
        "image_path", "analysis_timestamp"  # Added timestamps for tracking
    ]

    # Track processing statistics
    stats = {
        "total_images": 0,
        "successful_analyses": 0,
        "failed_analyses": 0,
        "start_time": datetime.datetime.now()
    }

    for blob, mime_type in image_blobs:
        stats["total_images"] += 1
        print(f"Processing {stats['total_images']}/{len(image_blobs)}: {blob.name}")

        try:
            # The product id is the second dash-separated token of the file name.
            # NOTE(review): for a two-token name such as "img-123.jpg" the id
            # keeps the extension ("123.jpg") - confirm the naming convention.
            filename = blob.name.split("/")[-1]
            parts = filename.split("-")
            product_id = parts[1] if len(parts) > 1 else "unknown"

            # Declare the real image type instead of always claiming JPEG
            image_uri = f"gs://{bucket_name}/{blob.name}"
            image_part = Part.from_uri(image_uri, mime_type=mime_type)

            response = model.generate_content([image_part, prompt])

            # Strip the Markdown code fence the model may wrap around the JSON
            clean_json = response.text.replace("```json", "").replace("```", "").strip()

            try:
                result = json.loads(clean_json)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON response for {blob.name}: {e}")
                print(f"Raw response: {clean_json}")
                stats["failed_analyses"] += 1

                # Save the raw response for debugging. A failed upload is only
                # reported here so it cannot reach the outer handler and count
                # the same image as failed a second time.
                error_path = f"{gcs_out_folder}/errors/{filename}.error.txt"
                try:
                    error_blob = bucket.blob(error_path)
                    error_blob.upload_from_string(f"Error: {str(e)}\n\nRaw response:\n{clean_json}")
                except Exception as upload_error:
                    print(f"Could not save error report for {blob.name}: {upload_error}")
                continue

            if not isinstance(result, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(result).__name__}"
                )

            # "or {}" also covers sections the model returned as null
            product_attrs = result.get("product_attributes") or {}
            image_analysis = result.get("image_analysis") or {}
            image_quality = image_analysis.get("image_quality") or {}

            # Current timestamp
            timestamp = datetime.datetime.now().isoformat()

            row_data = [
                product_id,
                product_attrs.get("brand_name", ""),
                product_attrs.get("product_name", ""),
                product_attrs.get("Main_Category", ""),
                product_attrs.get("Subcategory_1", ""),
                product_attrs.get("Subcategory_2", ""),
                product_attrs.get("Subcategory_3", ""),
                product_attrs.get("Subcategory_4", ""),
                product_attrs.get("Subcategory_5", ""),
                product_attrs.get("list_price", ""),
                product_attrs.get("selling_price", ""),
                product_attrs.get("about_product", ""),
                product_attrs.get("product_technical", ""),
                product_attrs.get("url", ""),
                _join_values(image_analysis.get("detected_text")),
                _join_values(image_analysis.get("dominant_colors")),
                _join_values(image_analysis.get("object_features")),
                _join_values(image_analysis.get("logo_detection")),
                image_analysis.get("text_sentiment", ""),
                _join_values(image_analysis.get("object_detection")),
                image_quality.get("clarity", ""),
                image_quality.get("lighting", ""),
                str(image_quality.get("watermarks", False)),
                image_analysis.get("product_condition", ""),
                image_analysis.get("packaging_type", ""),
                _join_values(image_analysis.get("target_demographic")),
                _join_values(image_analysis.get("competitor_products")),
            ]

            # Add to the consolidated data with image path and timestamp
            all_product_data.append(row_data + [blob.name, timestamp])

            # Per-image CSV: same columns minus image_path and timestamp
            output_content = StringIO()
            writer = csv.writer(output_content)
            writer.writerow(headers[:-2])
            writer.writerow(row_data)

            # Name the file after the product id, else after the image with its
            # extension swapped for .csv (works for upper-case extensions too)
            if product_id != "unknown":
                output_filename = f"{product_id}.csv"
            else:
                output_filename = filename.rsplit(".", 1)[0] + ".csv"
            output_path = f"{gcs_out_folder}/{output_filename}"

            # Save to GCS
            output_blob = bucket.blob(output_path)
            output_blob.upload_from_string(output_content.getvalue())

            print(f"Saved analysis for {blob.name} to {output_path}")
            stats["successful_analyses"] += 1

        except Exception as e:
            # Per-image boundary: report, count, and move on to the next image
            print(f"Error processing {blob.name}: {e}")
            stats["failed_analyses"] += 1

    # Create the final consolidated CSV
    if all_product_data:
        create_consolidated_csv(bucket, headers, all_product_data)

    # Calculate and print processing statistics
    stats["end_time"] = datetime.datetime.now()
    stats["duration"] = stats["end_time"] - stats["start_time"]
    print("\nProcessing Statistics:")
    print(f"Total images processed: {stats['total_images']}")
    print(f"Successful analyses: {stats['successful_analyses']}")
    print(f"Failed analyses: {stats['failed_analyses']}")
    print(f"Success rate: {(stats['successful_analyses']/stats['total_images'])*100 if stats['total_images'] > 0 else 0:.2f}%")
    print(f"Processing time: {stats['duration']}")

    # Save statistics to GCS; default=str serializes the datetime/timedelta values
    stats_blob = bucket.blob(f"{gcs_out_folder}/processing_stats_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
    stats_blob.upload_from_string(json.dumps(stats, default=str))

def create_consolidated_csv(bucket, headers, all_product_data):
    """Upload a single CSV made of the header row followed by every product row.

    Args:
        bucket: GCS bucket object the file is uploaded to.
        headers: Column names written as the first row.
        all_product_data: Rows to write, each ordered like ``headers``.
    """
    print(f"Creating consolidated CSV with {len(all_product_data)} product entries")

    # Build the whole file in memory: header first, then all rows in one call
    buffer = StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerows([headers, *all_product_data])

    # Destination is fixed by the module-level output folder and file name
    destination = f"{gcs_out_folder}/{final_csv_name}"
    bucket.blob(destination).upload_from_string(buffer.getvalue())

    print(f"Consolidated CSV saved to: {destination}")

def list_all_folders():
    """Print every folder-like prefix present in the bucket, sorted by name.

    Intended as a helper for working out the correct input/output paths.
    """
    client = storage.Client()
    target_bucket = client.bucket(bucket_name)

    # Fetch every object in the bucket up front
    bucket_objects = list(target_bucket.list_blobs())

    # Every object name contributes all of its parent prefixes,
    # e.g. "a/b/c.jpg" yields "a" and "a/b".
    folder_paths = set()
    for item in bucket_objects:
        segments = item.name.split('/')
        folder_paths.update(
            '/'.join(segments[:depth]) for depth in range(1, len(segments))
        )

    print("Available folders in the bucket:")
    for path in sorted(folder_paths):
        print(f"  - {path}")

def batch_processing(batch_size=10):
    """Walk the input images in fixed-size batches, printing one progress line per batch.

    Only the batching scaffold exists so far: each batch is announced, but no
    per-image work is performed on it yet.

    Args:
        batch_size: Maximum number of images per batch.
    """
    client = storage.Client()
    source_bucket = client.bucket(bucket_name)

    # Keep only the blobs whose names end in a supported image extension
    image_extensions = ('.jpg', '.jpeg', '.png')
    listed = list(source_bucket.list_blobs(prefix=gcs_in_folder))
    image_blobs = [
        candidate for candidate in listed
        if candidate.name.lower().endswith(image_extensions)
    ]

    print(f"Found {len(image_blobs)} image files to process")

    # Step through the list one batch-sized slice at a time
    for batch_number, start in enumerate(range(0, len(image_blobs), batch_size), start=1):
        batch = image_blobs[start:start + batch_size]
        total_batches = (len(image_blobs) - 1) // batch_size + 1
        print(f"Processing batch {batch_number}/{total_batches} ({len(batch)} images)")
        # Process the batch
        # Implementation of batch processing would go here
        
# Entry point: runs the folder listing followed by the full image analysis.
if __name__ == "__main__":
    # First list available folders to help identify the correct path
    # (list_all_folders enumerates every object in the bucket, not just the input folder)
    list_all_folders()

    # Then process the images
    process_product_images()
    
    # Alternative: Use batch processing for large datasets
    # (batch_processing currently only prints the batch boundaries; it does not analyze images)
    # batch_processing(batch_size=10)