In [1]:
# Import Library
from openai import OpenAI
import os
import requests
from dotenv import load_dotenv

load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

In [3]:
# Step 1: Check existing containers and find active ones
print("Checking existing containers...")
containers = client.containers.list()
print(containers)


Checking existing containers...
SyncCursorPage[ContainerListResponse](data=[ContainerListResponse(id='cntr_68314a508e8081919e5d1051f3fcbe27', created_at=1748060752, name='data-analysis-container', object='container', status='running', expires_after=ExpiresAfter(anchor='last_active_at', minutes=20), last_active_at=1748060947), ContainerListResponse(id='cntr_6831312c4e608191b358bef087b081be', created_at=1748054316, name='test-container', object='container', status='expired', expires_after=ExpiresAfter(anchor='last_active_at', minutes=20), last_active_at=1748060947), ContainerListResponse(id='cntr_68312ae94c608191a7ae842feb3da6cd', created_at=1748052713, name='test-container', object='container', status='expired', expires_after=ExpiresAfter(anchor='last_active_at', minutes=20), last_active_at=1748060947), ContainerListResponse(id='cntr_683129f7152881918ba879d28b5cfca7', created_at=1748052471, name='auto', object='container', status='expired', expires_after=ExpiresAfter(anchor='last_active

In [4]:
# Find first active (running) container
active_container = None
for container in containers.data:
    print(f"Container {container.id}: {container.status}")
    if container.status == "running":
        active_container = container
        break

if active_container:
    container_id = active_container.id
    print(f"Using active container: {container_id}")
else:
    print("No active containers found. Creating new one...")
    container = client.containers.create(name="data-analysis-container")
    container_id = container.id
    print(f"Created new container: {container_id}")

Container cntr_68314a508e8081919e5d1051f3fcbe27: running
Using active container: cntr_68314a508e8081919e5d1051f3fcbe27


In [5]:
# Step 2: Upload file
url = f"https://api.openai.com/v1/containers/{container_id}/files"
headers = {"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
files = {'file': ('data.csv', open('data.csv', 'rb'))}

response = requests.post(url, headers=headers, files=files)
file_path = response.json()['path']
print(f"File uploaded: {file_path}")

# Step 3: Analyze with code interpreter
response = client.responses.create(
    model="gpt-4.1-mini",
    tools=[{"type": "code_interpreter", "container": container_id}],
    tool_choice="required",
    input=f"Analyze CSV at '{file_path}'. How many rows?"
)


File uploaded: /mnt/data/56942508662ee703512c82c531286d53-data.csv


In [6]:
# Results
print(f"Answer: {response.output_text}")
print(f"Code executed:\n{response.output[0].code}")
print(f"Tokens used: {response.usage.total_tokens}")

Answer: The CSV file contains 1000 rows. Would you like me to perform any other analysis on this data?
Code executed:
import pandas as pd

# Load the CSV file
file_path = '/mnt/data/56942508662ee703512c82c531286d53-data.csv'
data = pd.read_csv(file_path)

# Get number of rows
num_rows = data.shape[0]
num_rows
Tokens used: 510


# Handling charts in openai container

In [11]:
# Step 3: Analyze with code interpreter
response = client.responses.create(
    model="gpt-4.1-mini",
    tools=[{"type": "code_interpreter", "container": container_id}],
    tool_choice="required",
    input=f"""Analyze CSV at '{file_path}'. Plot a bar chart showing user breakdown by gender.
Rotate x-axis labels by 45 degrees for readability. Increase figure width if needed.
Add count labels on top of each bar."""

)


In [12]:
# Extract file information from the response
for output in response.output:
    if hasattr(output, 'content'):
        for content in output.content:
            if hasattr(content, 'annotations'):
                for annotation in content.annotations:
                    if annotation.type == 'container_file_citation':
                        file_id = annotation.file_id
                        filename = annotation.filename
                        print(f"Generated file: {filename} (ID: {file_id})")
                        
                        # Download the image file
                        download_url = f"https://api.openai.com/v1/containers/{container_id}/files/{file_id}/content"
                        headers = {"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
                        
                        file_response = requests.get(download_url, headers=headers)
                        if file_response.status_code == 200:
                            # Save the image locally
                            local_filename = f"gender_breakdown_chart.png"
                            with open(local_filename, 'wb') as f:
                                f.write(file_response.content)
                            print(f"Image saved as: {local_filename}")
                        else:
                            print(f"Failed to download file: {file_response.status_code}")

# Also display the response for reference
print(f"\nAnswer: {response.output_text}")
print(f"Tokens used: {response.usage.total_tokens}")

Generated file: cfile_68314d0f620481919d557c0e09a639ef.png (ID: cfile_68314d0f620481919d557c0e09a639ef)
Image saved as: gender_breakdown_chart.png

Answer: The bar chart shows the user breakdown by gender from the provided CSV file. The x-axis labels have been rotated by 45 degrees for better readability, and the figure width was increased to accommodate the labels. Count labels are displayed on top of each bar for clarity.

If you need any further analysis or adjustments, please let me know!
Tokens used: 3952


In [10]:
response

Response(id='resp_68314b9a4bc08191990e7d962070b0e20cdb1ad7bd57b962', created_at=1748061082.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4.1-mini-2025-04-14', object='response', output=[ResponseCodeInterpreterToolCall(id='ci_68314b9afc6c81919920e1e5ba0a1e300cdb1ad7bd57b962', code="import pandas as pd\n\n# Load the CSV file to examine its content\nfile_path = '/mnt/data/56942508662ee703512c82c531286d53-data.csv'\ndata = pd.read_csv(file_path)\n\n# Display the first few rows and columns to understand its structure\ndata.head(), data.columns", results=None, status='completed', type='code_interpreter_call', container_id='cntr_68314a508e8081919e5d1051f3fcbe27', outputs=None), ResponseOutputMessage(id='msg_68314b9d19448191bbae006e2a38f10f0cdb1ad7bd57b962', content=[ResponseOutputText(annotations=[], text='The dataset contains the following columns: id, first_name, last_name, email, gender, and ip_address. I will now plot a chart to show the user breakdown

In [None]:
# Function to stream data rather than load all in memory
@router.post("/upload_csv", response_model=UploadCSVResponse)
async def upload_csv_true_streaming(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user),
    db: Database = Depends(get_db)
):
    """
    TRUE STREAMING: Never store the full file in memory!
    Stream directly to Azure while getting CSV preview from first chunk.
    """
    
    # 1. Validate file type
    if not file.filename.endswith('.csv'):
        raise HTTPException(
            status_code=400,
            detail="Only CSV files are allowed"
        )
    
    # 2. TRUE STREAMING SETUP
    max_size = 30 * 1024 * 1024  # 30MB in bytes
    chunk_size = 64 * 1024  # 64KB chunks
    azure_block_size = 4 * 1024 * 1024  # 4MB Azure blocks
    
    total_size = 0
    csv_preview_data = None
    column_names = None
    total_columns = 0
    
    # Azure setup
    session_id = str(uuid.uuid4())
    blob_service_client = BlobServiceClient.from_connection_string(settings.BLOB_STORAGE_ACCOUNT_KEY)
    container_name = "images-analysis"
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    blob_name = f"{session_id}_{timestamp}_{file.filename}"
    blob_client = blob_service_client.get_container_client(container_name).get_blob_client(blob_name)
    
    # Azure block upload setup
    
    block_list = []
    current_block_data = bytearray()
    block_counter = 0
    
    print("🚀 TRUE STREAMING: Processing file without storing full content...")
    
    try:
        # Create container if needed
        try:
            blob_service_client.get_container_client(container_name).create_container()
        except:
            pass
        
        # TRUE STREAMING LOOP
        while True:
            # Read one chunk at a time
            chunk = await file.read(chunk_size)
            if not chunk:
                break  # End of file
            
            total_size += len(chunk)
            print(f"📥 Processing chunk: {len(chunk)} bytes (Total processed: {total_size} bytes)")
            
            # Size check - early exit if too big
            if total_size > max_size:
                print("❌ File too large - stopping stream")
                raise HTTPException(
                    status_code=400,
                    detail=f"File too large. Maximum size is {max_size//1024//1024}MB"
                )
            
            # GET CSV PREVIEW FROM FIRST CHUNK ONLY
            if csv_preview_data is None and total_size <= chunk_size:
                try:
                    # Decode first chunk to get CSV structure
                    csv_string = chunk.decode('utf-8')
                    df = pd.read_csv(StringIO(csv_string), nrows=5)
                    
                    if df.empty or len(df.columns) == 0:
                        raise HTTPException(status_code=400, detail="Invalid CSV format")
                    
                    # Extract metadata from first chunk
                    total_columns = len(df.columns)
                    column_names = df.columns.tolist()
                    
                    # Create preview data
                    csv_preview_data = []
                    for index, row in df.iterrows():
                        row_dict = {}
                        for column in df.columns:
                            value = row[column]
                            if pd.isna(value):
                                row_dict[column] = None
                            elif isinstance(value, (int, float)):
                                row_dict[column] = value.item() if hasattr(value, 'item') else value
                            else:
                                row_dict[column] = str(value)
                        csv_preview_data.append(row_dict)
                    
                    print(f"✅ CSV preview extracted from first chunk: {total_columns} columns")
                    del df, csv_string  # Free memory immediately
                    
                except Exception as e:
                    print(f"❌ Error processing CSV preview: {e}")
                    raise HTTPException(status_code=400, detail="Invalid CSV format")
            
            # ADD CHUNK TO CURRENT AZURE BLOCK
            current_block_data.extend(chunk)
            
            # UPLOAD BLOCK WHEN IT REACHES 4MB OR END OF FILE
            if len(current_block_data) >= azure_block_size:
                # Upload this block to Azure immediately
                block_id = base64.b64encode(f"block-{block_counter:06d}".encode()).decode()
                
                print(f"📤 Uploading Azure block {block_counter + 1}: {len(current_block_data)} bytes")
                
                blob_client.stage_block(
                    block_id=block_id,
                    data=bytes(current_block_data)
                )
                
                block_list.append(BlobBlock(block_id=block_id))
                block_counter += 1
                
                print(f"✅ Block uploaded. Memory freed. Total blocks: {len(block_list)}")
                
                # CLEAR BLOCK DATA - FREE MEMORY!
                current_block_data = bytearray()
        
        # Upload final block if there's remaining data
        if len(current_block_data) > 0:
            block_id = base64.b64encode(f"block-{block_counter:06d}".encode()).decode()
            
            print(f"📤 Uploading final Azure block: {len(current_block_data)} bytes")
            
            blob_client.stage_block(
                block_id=block_id,
                data=bytes(current_block_data)
            )
            
            block_list.append(BlobBlock(block_id=block_id))
            print(f"✅ Final block uploaded")
        
        # Commit all blocks to create final blob
        print(f"🔗 Committing {len(block_list)} blocks to create final blob...")
        
        blob_client.commit_block_list(
            block_list=block_list,
            content_type="text/csv"
        )
        
        file_url = blob_client.url
        
        print(f"✅ TRUE STREAMING COMPLETE!")
        print(f"📊 Total file size: {total_size} bytes")
        print(f"📊 Azure blocks created: {len(block_list)}")
        print(f"💾 Max memory used: ~{azure_block_size//1024//1024}MB (one block)")
        print(f"🔗 File URL: {file_url}")
        
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error during streaming: {e}")
        raise HTTPException(status_code=500, detail="Error during file processing")
    
    # Validate that we got CSV preview
    if csv_preview_data is None:
        raise HTTPException(status_code=400, detail="Could not extract CSV preview")
    
    # 3. Save session data to MongoDB
    try:
        session_document = {
            "session_id": session_id,
            "user_email": current_user["email"],
            "user_id": str(current_user["_id"]),
            "file_info": {
                "original_filename": file.filename,
                "blob_name": blob_name,
                "container_name": container_name,
                "file_url": file_url,
                "file_size": total_size,
                "content_type": "text/csv"
            },
            "csv_info": {
                "total_columns": total_columns,
                "column_names": column_names,
                "preview_data": csv_preview_data
            },
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
            "status": "active"
        }
        
        sessions_collection = db["csv_sessions"]
        result = await sessions_collection.insert_one(session_document)
        
        if not result.inserted_id:
            raise Exception("Failed to create session in database")
        
        print(f"✅ MongoDB session created: {session_id}")
        
    except Exception as e:
        # Clean up blob if database fails
        try:
            blob_client.delete_blob()
            print(f"🗑️ Cleaned up blob after database error")
        except:
            pass
        
        await log_error(
            error=e,
            location="upload_csv_mongodb",
            additional_info={"session_id": session_id, "user_email": current_user["email"]}
        )
        raise HTTPException(status_code=500, detail="Failed to create session in database")
    
    # 4. Return response
    return {
        "session_id": session_id,
        "file_url": file_url,
        "file_name": file.filename,
        "preview_data": csv_preview_data,
        "file_info": {
            "original_filename": file.filename,
            "blob_name": blob_name,
            "container_name": container_name,
            "file_url": file_url,
            "file_size": total_size,
            "content_type": "text/csv"
        },
        "message": "CSV file uploaded successfully with true streaming",
        "success": True
    }
    