# Products Import Job (SDK-Based)

This notebook runs a products import job using the SaaStify Edge SDK.

## Parameters (Papermill)

This notebook accepts the following parameters (matching productsImport.ipynb pattern):
- `job_id`: Request/job identifier (request_id in saas_edge_jobs table)
- `saas_edge_id`: Tenant identifier

The notebook will fetch job details from `saas_edge_jobs` table and extract:
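- `file_url`, `file_name` from `request_args.file_url` (an object with `url` and `file_name`; when it is a plain URL string, `file_name` is read from `request_args.file_name`)
- `template_id` from `request_args.template_id` (or, when absent, looked up in `saas_channel_templates` by `import_type`)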
- `feed_settings` from `request_args.feed_settings`
- `job_path` from `request_args.job_path`
- `import_type` from `request_args.import_type`

## Environment Variables Required

Set these before running:
- `DB_MODE`: Database connection mode (direct/proxy/local)
- `DB_INSTANCE`: Cloud SQL instance (for direct mode)
- `DB_NAME`: Database name
- `DB_USER`: Database user
- `DB_PASSWORD`: Database password
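- `GOOGLE_APPLICATION_CREDENTIALS`: Path to GCS service account JSON (optional)
- `GCP_BUCKET_NAME`: GCS bucket that receives the failed-items report (optional, defaults to `edge-assets`)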


In [None]:
# Papermill parameters - these will be injected when running with papermill
# Parameters match productsImport.ipynb pattern
# The values below are placeholders only. The job-details cell passes both of
# them to get_job_by_request_id, so both must identify a real row.
job_id = "your-job-id-here"  # request_id in saas_edge_jobs table
saas_edge_id = "your-saas-edge-id-here"


## 1. Install Dependencies and Import SDK


In [None]:
# Install dependencies (matching productsImport.ipynb)
# NOTE: %pip is an IPython magic, so this cell is not valid in a plain Python script.
%pip install saastify-edge-sdk pg8000 psycopg2-binary pandas requests openpyxl google-cloud-storage --quiet

# Import required modules
import asyncio
import json
import os
import traceback
from datetime import datetime
from typing import Dict, Any, Optional
from uuid import UUID

# Import SDK components
# These names are used by the later cells: database access, job lookup,
# logging setup, template schema handling and the import pipeline itself.
from saastify_edge import ImportPipelineConfig, run_product_import
from saastify_edge.db import PostgreSQLClient, get_db_config, JobStatusUpdater
from saastify_edge.utils import setup_logging, set_job_context, get_or_generate_template_schema

print("‚úÖ Dependencies installed and SDK imported successfully")


## 2. Fetch Job Details from Database


In [None]:
# Initialize database client to fetch job details
db_config = get_db_config()
db_client = PostgreSQLClient(db_config)
await db_client.connect()

# Fetch job details from saas_edge_jobs table (matching productsImport.ipynb pattern)
job_manager = JobStatusUpdater(db_client)

try:
    # Get job by request_id (matching productsImport.ipynb)
    job_record = await job_manager.get_job_by_request_id(job_id, saas_edge_id)
    
    if not job_record:
        raise ValueError(f"No job found with request_id={job_id} and saas_edge_id={saas_edge_id}")
    
    # Extract request_args (matching productsImport.ipynb)
    request_args = job_record.get("request_args", {})
    
    print("‚úÖ Job details fetched successfully")
    print(f"Job Name: {job_record.get('job_name', 'N/A')}")
    print(f"Job Status: {job_record.get('job_status', 'N/A')}")
    print(f"\nRequest Args:")
    for key, value in request_args.items():
        print(f"  {key}: {value}")
    
    # Extract parameters from request_args (matching productsImport.ipynb structure)
    file_url_obj = request_args.get("file_url", {})
    file_url = file_url_obj.get("url") if isinstance(file_url_obj, dict) else file_url_obj
    file_name = file_url_obj.get("file_name", "") if isinstance(file_url_obj, dict) else request_args.get("file_name", "")
    job_name = request_args.get("job_name", job_record.get("job_name", "Products Import"))
    job_path = request_args.get("job_path", "")
    template_id = request_args.get("template_id")
    import_type = request_args.get("import_type", "products")
    feed_settings = request_args.get("feed_settings", {})
    
    print(f"\n‚úÖ Extracted parameters:")
    print(f"  File URL: {file_url}")
    print(f"  File Name: {file_name}")
    print(f"  Template ID: {template_id}")
    print(f"  Import Type: {import_type}")
    
except Exception as e:
    print(f"‚ùå Failed to fetch job details: {e}")
    traceback.print_exc()
    raise


## 3. Generate Template Schema (if needed)


In [None]:
# Generate or fetch template schema (matching productsImport.ipynb pattern)
if not template_id:
    # Look up template_id from import_type (matching productsImport.ipynb)
    template_query = """
        SELECT template_id 
        FROM saas_channel_templates 
        WHERE saas_edge_id = $1 
          AND import_type = $2 
          AND is_active = true
        ORDER BY created_at DESC
        LIMIT 1
    """
    
    try:
        template_result = await db_client.fetch_one(template_query, saas_edge_id, import_type)
        if template_result:
            template_id = template_result.get("template_id")
            print(f"‚úÖ Found template: {template_id}")
        else:
            raise ValueError(f"No active template found for import_type: {import_type}")
    except Exception as e:
        print(f"‚ö†Ô∏è  Template lookup failed: {e}")
        raise

# Generate or fetch template schema
try:
    template_schema = await get_or_generate_template_schema(db_client, saas_edge_id, template_id)
    print(f"‚úÖ Template schema ready")
except Exception as e:
    print(f"‚ö†Ô∏è  Schema generation failed: {e}")
    traceback.print_exc()


## 4. Setup Logging


In [None]:
# Setup structured logging (matching productsImport.ipynb)
setup_logging(level="INFO", structured=True)

# Set job context for logging
# All four values are produced by the job-details and template cells above,
# so those cells must have run before this one.
set_job_context(
    job_name=job_name,
    saas_edge_id=saas_edge_id,
    import_type=import_type,
    template_id=template_id
)

print("‚úÖ Logging configured")


## 5. Configure Import Pipeline


In [None]:
# Loader options for GCS/HTTP sources (matching productsImport.ipynb)
file_loader_config = {}

# Explicit credentials are only attached for GCS sources, and only when the
# environment provides a credentials path
if file_url and (file_url.startswith("gs://") or "storage.googleapis.com" in file_url):
    gcs_credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
    if gcs_credentials_path:
        file_loader_config["gcs_credentials_path"] = gcs_credentials_path
        print(f"‚úÖ GCS credentials configured")

# Build the pipeline configuration consumed by run_product_import
import_config = ImportPipelineConfig(
    file_source=file_url,                   # HTTP URL or gs:// URL
    template_id=template_id,
    saas_edge_id=saas_edge_id,
    job_name=job_name,
    batch_size=500,                         # rows per batch
    max_workers=4,                          # concurrent workers
    file_loader_config=file_loader_config,
    db_client=db_client,                    # real database client (not mock)
)

print("‚úÖ Import pipeline configured")
print(f"  File source: {file_url}")
print(f"  Template ID: {template_id}")
print(f"  Batch size: {import_config.batch_size}")
print(f"  Max workers: {import_config.max_workers}")


## 6. Add Utility Functions (Matching Products Notebooks)


In [None]:
# Utility functions matching productsImport.ipynb pattern
from google.cloud import storage
from google.oauth2 import service_account
from io import BytesIO
import pandas as pd

# Bucket passed to upload_to_gcp_bucket for the failed-items report;
# overridable through the GCP_BUCKET_NAME environment variable.
GCP_BUCKET_NAME = os.getenv("GCP_BUCKET_NAME", "edge-assets")

def sanitize_path(path):
    """Flatten a job path into a single path segment.

    Forward slashes become underscores, every ".ipynb" occurrence is removed,
    and leading/trailing whitespace is trimmed from the result.
    """
    flattened = path.replace('/', '_')
    without_extension = flattened.replace(".ipynb", "")
    return without_extension.strip()

def get_output_filename(saas_edge_id, job_path, file_type):
    """Build the bucket object name for a failed-items report.

    The name has the form
    ``<saas_edge_id>/catalog-edge/job-reports/<sanitized job_path>/<YYYY-MM-DD>/import-failed-list-<HHMMSS>.<file_type>``
    using the current local time. If anything goes wrong while building it,
    the error is printed and a fixed ``error_report.json`` name under the same
    tenant prefix is returned instead.
    """
    try:
        now = datetime.now()
        report_dir = (
            f"{saas_edge_id}/catalog-edge/job-reports/"
            f"{sanitize_path(job_path)}/{now.strftime('%Y-%m-%d')}"
        )
        return f"{report_dir}/import-failed-list-{now.strftime('%H%M%S')}.{file_type}"
    except Exception as exc:
        print(f"Error generating output filename: {str(exc)}")
        return f"{saas_edge_id}/catalog-edge/job-reports/error_report.json"

async def upload_to_gcp_bucket(file_data, file_name, bucket_name, base_path=""):
    """Upload a payload to a GCS bucket and report the outcome.

    ``storage.Client()`` is created without explicit credentials. The object
    name is ``file_name``, prefixed with ``base_path`` (trailing slashes
    removed) when one is given.

    How ``file_data`` is sent depends on its type:
    - list or dict: serialized to indented JSON, content type application/json
    - str: uploaded as-is with the client's default content type
    - bytes: uploaded with content type application/octet-stream
    - anything else: passed to ``upload_from_file`` (i.e. treated as a file object)

    Returns ``(True, url)`` on success and ``(False, "")`` after printing the
    error and traceback on any failure. The body contains no ``await``, so the
    upload calls run synchronously inside the coroutine.
    """
    try:
        client = storage.Client()
        target_bucket = client.bucket(bucket_name)

        if base_path:
            full_path = f"{base_path.rstrip('/')}/{file_name}"
        else:
            full_path = file_name
        blob = target_bucket.blob(full_path)

        if isinstance(file_data, (list, dict)):
            blob.upload_from_string(
                json.dumps(file_data, indent=2),
                content_type='application/json',
            )
        elif isinstance(file_data, str):
            blob.upload_from_string(file_data)
        elif isinstance(file_data, bytes):
            blob.upload_from_string(file_data, content_type='application/octet-stream')
        else:
            blob.upload_from_file(file_data)

        url = f"https://storage.googleapis.com/{bucket_name}/{full_path}"
        print(f"Successfully uploaded {file_name} to {url}")
        return True, url
    except Exception as exc:
        print(f"Error uploading to GCP bucket: {str(exc)}")
        traceback.print_exc()
        return False, ""

async def update_job_details(job_id, total_count, success_count, failure_count, failed_url=None):
    """Write the import counts back to the job row (matching productsImport.ipynb pattern).

    Updates the ``saas_edge_jobs`` row whose ``request_id`` equals ``job_id``
    through the notebook-level ``db_client``. ``job_response`` receives the
    total/success/failed counts and ``updated_at`` the current UTC time;
    ``failed_job_summary_link`` is set only when ``failed_url`` is truthy.

    Returns True when the update call succeeds. Any exception is printed with
    its traceback and turned into a False return rather than being raised.
    """
    try:
        update_data = {
            "job_response": {
                "total": total_count,
                "success": success_count,
                "failed": failure_count,
            },
            "updated_at": datetime.utcnow(),
        }
        if failed_url:
            update_data["failed_job_summary_link"] = failed_url

        await db_client.update("saas_edge_jobs", {"request_id": job_id}, update_data)

        print(f"‚úÖ Job details updated: {success_count} success, {failure_count} failed")
        return True
    except Exception as exc:
        print(f"‚ùå Failed to update job details: {exc}")
        traceback.print_exc()
        return False

print("‚úÖ Utility functions defined")


## 7. Configure Import Pipeline (Repeat of Section 5)


In [None]:
# Configure file loader for GCS/HTTP (matching productsImport.ipynb)
# NOTE(review): this cell repeats the "5. Configure Import Pipeline" cell
# verbatim; running it rebuilds file_loader_config and import_config from the
# same inputs. Confirm whether both cells are needed.
file_loader_config = {}

# If using GCS and need explicit credentials
if file_url and (file_url.startswith("gs://") or "storage.googleapis.com" in file_url):
    if os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
        file_loader_config["gcs_credentials_path"] = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
        print(f"‚úÖ GCS credentials configured")

# Create import pipeline configuration
import_config = ImportPipelineConfig(
    file_source=file_url,  # HTTP URL or gs:// URL
    template_id=template_id,
    saas_edge_id=saas_edge_id,
    job_name=job_name,
    batch_size=500,  # Process 500 rows per batch
    max_workers=4,   # Use 4 concurrent workers
    file_loader_config=file_loader_config,
    db_client=db_client  # Real database client (not mock)
)

print("‚úÖ Import pipeline configured")
print(f"  File source: {file_url}")
print(f"  Template ID: {template_id}")
print(f"  Batch size: {import_config.batch_size}")
print(f"  Max workers: {import_config.max_workers}")


## 8. Run Import Pipeline


In [None]:
# Run the import pipeline (matching productsImport.ipynb pattern)
print(f"üöÄ Starting import job: {job_name}")
print(f"   File: {file_name}")
print(f"   Template: {template_id}")
print(f"   Tenant: {saas_edge_id}")
print()

start_time = datetime.now()

try:
    # Execute import pipeline using SDK
    results = await run_product_import(import_config)
    
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    
    # Extract results
    pipeline_job_id = results.get("job_id")
    total_processed = results.get("total_processed", 0)
    total_errors = results.get("total_errors", 0)
    valid_rows = total_processed - total_errors
    
    print("‚úÖ Import pipeline completed successfully!")
    print(f"\nPipeline Results:")
    print(f"  Job ID: {pipeline_job_id}")
    print(f"  Total rows processed: {total_processed}")
    print(f"  Valid rows: {valid_rows}")
    print(f"  Error rows: {total_errors}")
    print(f"  Duration: {duration:.2f} seconds")
    
    if total_processed > 0:
        throughput = total_processed / duration if duration > 0 else 0
        print(f"  Throughput: {throughput:.0f} rows/second")
    
    # Store results for next cell
    import_results = results
    
except Exception as e:
    print(f"‚ùå Import pipeline failed: {e}")
    traceback.print_exc()
    raise


## 9. Write Products to Database and Handle Failed Items


In [None]:
# Fetch valid records from completeness cache (matching productsImport.ipynb pattern)
from saastify_edge.db import CompletenessReader
import uuid

completeness_reader = CompletenessReader(db_client=db_client)

# Get valid records for this job
pipeline_job_id = import_results.get("job_id")
valid_records = await db_client.query(
    "product_template_completeness",
    filters={
        "job_id": pipeline_job_id,
        "is_valid": True,
        "saas_edge_id": saas_edge_id
    }
)

# Get failed records
failed_records = await db_client.query(
    "product_template_completeness",
    filters={
        "job_id": pipeline_job_id,
        "is_valid": False,
        "saas_edge_id": saas_edge_id
    }
)

print(f"Found {len(valid_records)} valid records and {len(failed_records)} failed records")

# Upsert products to database
upserted_count = 0
created_count = 0
updated_count = 0
failed_items = []

for record in valid_records:
    try:
        transformed_data = record.get("transformed_response", {})
        product_sku = transformed_data.get("sku") or transformed_data.get("product_sku")
        
        if not product_sku:
            continue
        
        # Check if product exists
        existing = await db_client.fetch_one(
            "SELECT product_id FROM products WHERE saas_edge_id = $1 AND sku = $2",
            saas_edge_id,
            product_sku
        )
        
        product_data = {
            "name": transformed_data.get("name") or transformed_data.get("product_title") or transformed_data.get("title"),
            "updated_at": datetime.utcnow()
        }
        
        # Add other transformed fields to product_data
        for key, value in transformed_data.items():
            if key not in ["sku", "product_sku", "name", "product_title", "title"]:
                product_data[key] = value
        
        if existing:
            # Update existing product
            if feed_settings.get("update_products", True):
                await db_client.update(
                    "products",
                    {"saas_edge_id": saas_edge_id, "sku": product_sku},
                    product_data
                )
                updated_count += 1
        else:
            # Create new product
            if feed_settings.get("create_new_products", True):
                product_data["product_id"] = str(uuid.uuid4())
                product_data["saas_edge_id"] = saas_edge_id
                product_data["sku"] = product_sku
                product_data["created_at"] = datetime.utcnow()
                
                await db_client.insert("products", product_data)
                created_count += 1
        
        upserted_count += 1
        
    except Exception as e:
        if not feed_settings.get("skip_errors", False):
            raise
        failed_items.append({"record": record.get("file_row_number"), "error": str(e)})

# Prepare failed items for upload
failed_url = None
if failed_records or failed_items:
    try:
        failed_data = []
        for record in failed_records:
            failed_data.append({
                "row_number": record.get("file_row_number"),
                "validation_errors": record.get("validation_errors"),
                "transformed_response": record.get("transformed_response")
            })
        failed_data.extend(failed_items)
        
        # Upload failed items to GCS
        output_filename = get_output_filename(saas_edge_id, job_path, 'json')
        success, failed_url = await upload_to_gcp_bucket(failed_data, output_filename, GCP_BUCKET_NAME)
        
        if success:
            print(f"‚úÖ Failed items uploaded to: {failed_url}")
    except Exception as e:
        print(f"‚ö†Ô∏è  Failed to upload failed items: {e}")
        traceback.print_exc()

print(f"\n‚úÖ Database write completed:")
print(f"  Upserted: {upserted_count}")
print(f"  Created: {created_count}")
print(f"  Updated: {updated_count}")
print(f"  Failed: {len(failed_records) + len(failed_items)}")

# Update job details in database
await update_job_details(
    job_id=job_id,
    total_count=total_processed,
    success_count=upserted_count,
    failure_count=len(failed_records) + len(failed_items),
    failed_url=failed_url
)


## 10. Final Summary


In [None]:
# Generate final summary (matching productsImport.ipynb pattern)
# Every value here was produced by an earlier cell, so this cell only works
# after the pipeline and database-write cells have run.
summary = {
    "job_id": job_id,
    "pipeline_job_id": pipeline_job_id,
    "job_name": job_name,
    "status": "completed",
    "file_name": file_name,
    "file_url": file_url,
    "template_id": template_id,
    "import_type": import_type,
    "total_rows": total_processed,
    "valid_rows": valid_rows,
    "error_rows": total_errors,
    "upserted_count": upserted_count,
    "created_count": created_count,
    "updated_count": updated_count,
    "failed_url": failed_url,
    "duration_seconds": duration,
    "completed_at": datetime.utcnow().isoformat()
}

print("\n" + "="*60)
print("PRODUCTS IMPORT JOB SUMMARY")
print("="*60)
for key, value in summary.items():
    print(f"{key}: {value}")

# Store summary as JSON for papermill output.
# default=str keeps the dump from raising TypeError when an id (for example a
# template_id read from the database or the pipeline's job_id) is a UUID-like
# object rather than a plain string; such values are rendered with str().
summary_json = json.dumps(summary, indent=2, default=str)
print("\nSummary JSON:")
print(summary_json)


In [None]:
# Disconnect from database
# This closes the db_client connection opened in the job-details cell. It only
# runs if every earlier cell succeeded, since those cells re-raise on failure.
await db_client.disconnect()
print("‚úÖ Database connection closed")
