In [None]:
import requests

service_url = "https://upload-service-89113266086.us-central1.run.app"  # <-- add /upload-url

resp = requests.post(
    service_url,
    json={
        "user_id": "testuser",
        "filename": "test.txt",
        "upload_type": "user_document"
    }
)
print("Status code:", resp.status_code)
print("Raw response:", resp.text)
if resp.status_code == 200:
    result = resp.json()
    upload_url = result['upload_url']
    print("Signed upload URL:", upload_url)

    # Step 2: Upload a file to the signed URL
    with open("C:\\Users\\anass\\OneDrive\\Desktop\\agentflow\\test\\test.txt", "rb") as f:
        put_resp = requests.put(upload_url, data=f, headers={'Content-Type': 'application/octet-stream'})
        print("Upload status code:", put_resp.status_code)
else:
    print("API error or not ready.")


Status code: 400
Raw response: <?xml version='1.0' encoding='UTF-8'?><Error><Code>InvalidArgument</Code><Message>Invalid argument.</Message><Details>POST object expects Content-Type multipart/form-data</Details></Error>
API error or not ready.


In [15]:
#!/usr/bin/env python3
"""
AgentFlow Upload Service Test Script
This script tests the upload service endpoints and performs a complete upload flow.
"""

import requests
import json
import tempfile
import os

# Configuration
CLOUD_RUN_URL = "https://upload-service-89113266086.us-central1.run.app"  # Replace with your actual Cloud Run URL
USER_ID = "test-user-123"
TEST_FILE_CONTENT = "This is a test document for AgentFlow upload service testing."

def test_upload_url_endpoint():
    """Test the /upload-url endpoint"""
    print("🔍 Testing /upload-url endpoint...")
    
    # Test user document upload
    payload = {
        "user_id": USER_ID,
        "file_name": "test.txt",
        "file_type": "user_document"
    }
    
    try:
        response = requests.post(
            f"{CLOUD_RUN_URL}/upload-url",
            json=payload,
            headers={"Content-Type": "application/json"}
        )
        
        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")
        
        if response.status_code == 200:
            data = response.json()
            return data.get("upload_url"), data.get("upload_id")
        else:
            print(f"❌ Upload URL request failed: {response.text}")
            return None, None
            
    except Exception as e:
        print(f"❌ Error calling upload-url endpoint: {e}")
        return None, None

def test_file_upload(upload_url, file_content):
    """Test uploading a file to the signed URL"""
    print("📤 Testing file upload to signed URL...")
    
    try:
        # Upload file to the signed URL
        response = requests.put(
            upload_url,
            data=file_content.encode('utf-8'),
            headers={"Content-Type": "text/plain"}
        )
        
        print(f"Upload Status Code: {response.status_code}")
        
        if response.status_code in [200, 204]:
            print("✅ File upload successful!")
            return True
        else:
            print(f"❌ File upload failed: {response.text}")
            return False
            
    except Exception as e:
        print(f"❌ Error uploading file: {e}")
        return False

def test_confirm_upload(upload_id):
    """Test the /confirm-upload endpoint"""
    print("✅ Testing /confirm-upload endpoint...")
    
    payload = {
        "upload_id": upload_id,
        "status": "completed"
    }
    
    try:
        response = requests.post(
            f"{CLOUD_RUN_URL}/confirm-upload",
            json=payload,
            headers={"Content-Type": "application/json"}
        )
        
        print(f"Confirm Status Code: {response.status_code}")
        print(f"Confirm Response: {response.text}")
        
        if response.status_code == 200:
            print("✅ Upload confirmation successful!")
            return True
        else:
            print(f"❌ Upload confirmation failed: {response.text}")
            return False
            
    except Exception as e:
        print(f"❌ Error confirming upload: {e}")
        return False

def test_health_check():
    """Test if the service is running"""
    print("🏥 Testing service health...")
    
    try:
        # Try a simple GET request to see if service is up
        response = requests.get(f"{CLOUD_RUN_URL}/", timeout=10)
        print(f"Health check status: {response.status_code}")
        
        if response.status_code == 404:
            print("✅ Service is running (404 is expected for root path)")
            return True
        elif response.status_code == 200:
            print("✅ Service is running")
            return True
        else:
            print(f"⚠️ Service responded with: {response.status_code}")
            return False
            
    except Exception as e:
        print(f"❌ Service health check failed: {e}")
        return False

def run_complete_test():
    """Run the complete upload flow test"""
    print("🚀 Starting AgentFlow Upload Service Test")
    print("=" * 50)
    
    # Step 1: Health check
    if not test_health_check():
        print("❌ Service is not accessible. Check your Cloud Run URL.")
        return
    
    # Step 2: Get upload URL
    upload_url, upload_id = test_upload_url_endpoint()
    if not upload_url or not upload_id:
        print("❌ Failed to get upload URL. Check service logs.")
        return
    
    print(f"📋 Upload ID: {upload_id}")
    print(f"🔗 Upload URL: {upload_url[:50]}...")
    
    # Step 3: Upload file
    if not test_file_upload(upload_url, TEST_FILE_CONTENT):
        print("❌ File upload failed.")
        return
    
    # Step 4: Confirm upload
    if not test_confirm_upload(upload_id):
        print("❌ Upload confirmation failed.")
        return
    
    print("\n🎉 All tests passed! Upload service is working correctly.")
    print("\n📊 Test Summary:")
    print("✅ Service health check")
    print("✅ Upload URL generation")
    print("✅ File upload to GCS")
    print("✅ Upload confirmation")

def test_different_file_types():
    """Test different file types"""
    print("\n🔄 Testing different file types...")
    
    file_types = [
        {"type": "user_doc", "filename": "document.txt"},
        {"type": "user_prompt", "filename": "prompt.txt"},
        {"type": "shared_kb", "filename": "knowledge.md"},
        {"type": "shared_prompt", "filename": "system-prompt.txt"}
    ]
    
    for file_type in file_types:
        print(f"\nTesting {file_type['type']}...")
        payload = {
            "user_id": USER_ID,
            "file_name": file_type["filename"],
            "file_type": file_type["type"]
        }
        
        try:
            response = requests.post(
                f"{CLOUD_RUN_URL}/upload-url",
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            
            if response.status_code == 200:
                print(f"✅ {file_type['type']} upload URL generated successfully")
            else:
                print(f"❌ {file_type['type']} failed: {response.text}")
                
        except Exception as e:
            print(f"❌ Error testing {file_type['type']}: {e}")

if __name__ == "__main__":
    # Update this with your actual Cloud Run URL
    if CLOUD_RUN_URL == "https://your-service-url.a.run.app":
        print("❌ Please update CLOUD_RUN_URL with your actual Cloud Run service URL")
        print("You can find it in the GCP Console > Cloud Run > your-service")
        exit(1)
    
    # Run main test
    run_complete_test()
    
    # Test different file types
    test_different_file_types()
    
    print("\n🔧 Next Steps:")
    print("1. Check Cloud Run logs if any tests failed")
    print("2. Verify files appeared in your GCS buckets")
    print("3. Check Firestore for upload metadata")
    print("4. Test with different file sizes and types")

🚀 Starting AgentFlow Upload Service Test
🏥 Testing service health...
Health check status: 404
✅ Service is running (404 is expected for root path)
🔍 Testing /upload-url endpoint...
Status Code: 422
Response: {"detail":[{"type":"missing","loc":["body","filename"],"msg":"Field required","input":{"user_id":"test-user-123","file_name":"test.txt","file_type":"user_document"}},{"type":"missing","loc":["body","upload_type"],"msg":"Field required","input":{"user_id":"test-user-123","file_name":"test.txt","file_type":"user_document"}}]}
❌ Upload URL request failed: {"detail":[{"type":"missing","loc":["body","filename"],"msg":"Field required","input":{"user_id":"test-user-123","file_name":"test.txt","file_type":"user_document"}},{"type":"missing","loc":["body","upload_type"],"msg":"Field required","input":{"user_id":"test-user-123","file_name":"test.txt","file_type":"user_document"}}]}
❌ Failed to get upload URL. Check service logs.

🔄 Testing different file types...

Testing user_doc...
❌ user_