<a href="https://colab.research.google.com/github/Shauny123/byword-intake-api/blob/main/copy_of_flamingo_server_update.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# %%capture
# Install the web-server (flask, flask-cors, uvicorn), model (transformers),
# Google API client, and portpicker packages imported by the cells below.
# -q keeps pip's output short.
!pip install -q flask flask-cors uvicorn[standard] transformers \
               google-api-python-client google-auth-httplib2 google-auth-oauthlib \
               portpicker

In [None]:
%%writefile flamingo_server.py
import asyncio
import json
import logging
import os
import sys

from flask import Flask, request, jsonify
from flask_cors import CORS
from transformers import pipeline
import uvicorn
from uvicorn.config import Config

# Emit INFO-and-above log records on stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)


def _load_sentiment_pipeline():
    """Build the default sentiment-analysis pipeline, or return None on failure."""
    try:
        loaded = pipeline("sentiment-analysis")
    except Exception as e:
        logger.error(f"Error loading sentiment analysis model: {e}")
        return None
    logger.info("Sentiment analysis model loaded successfully.")
    return loaded


# None tells the request handler that the model is unavailable.
sentiment_pipeline = _load_sentiment_pipeline()

# Flask application with CORS enabled for all origins.
app = Flask(__name__)
CORS(app)

@app.route('/analyze_sentiment', methods=['POST'])
def analyze_sentiment():
    """Classify the sentiment of the ``text`` field of a JSON POST body.

    Returns:
        * 200 and the first pipeline result as JSON on success.
        * 400 and ``{"error": ...}`` when the body is not a JSON object
          carrying a non-empty string under ``text``.
        * 500 and ``{"error": ...}`` when the model is not loaded or the
          pipeline raises.
    """
    if sentiment_pipeline is None:
        return jsonify({"error": "Sentiment analysis model not loaded"}), 500

    # silent=True yields None for malformed JSON or a non-JSON content type,
    # so the client receives the JSON 400 below rather than Flask's default
    # HTML error page.
    data = request.get_json(silent=True)

    # A JSON array such as ["text"] would satisfy a bare `'text' in data`
    # check and then fail on indexing, so require an object explicitly.
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({"error": "Invalid input, please provide 'text'"}), 400

    text = data['text']
    if not isinstance(text, str) or not text.strip():
        return jsonify({"error": "Invalid input, please provide 'text'"}), 400

    try:
        result = sentiment_pipeline(text)
        logger.info("Sentiment analysis result for '%s': %s", text, result)
        return jsonify(result[0])
    except Exception:
        # logger.exception keeps the traceback for debugging; the client
        # only sees a generic message.
        logger.exception("Error during sentiment analysis")
        return jsonify({"error": "Error analyzing sentiment"}), 500

# Ask portpicker for a free port; fall back to 8000 when that fails for any
# reason (including portpicker not being installed).
try:
    import portpicker
    PORT = portpicker.pick_unused_port()
except Exception as e:
    logger.error(f"Error picking port, using default 8000: {e}")
    PORT = 8000
else:
    logger.info(f"Picked unused port: {PORT}")

@app.route('/')
def home():
    """Return a plain-text banner confirming the server is up."""
    banner = "Flamingo Server is running!"
    return banner

class Server(uvicorn.Server):
    """uvicorn.Server that records when startup has completed.

    Only ``startup`` is extended. ``run``, ``serve``, ``main_loop`` and
    ``shutdown`` are deliberately inherited from ``uvicorn.Server``:
    a ``serve`` override that waits on ``_startup_event`` before delegating
    to the base ``serve`` can never proceed, because the event is only set
    from ``startup``, and ``startup`` is invoked by the base ``serve``.
    Re-implementing ``main_loop``/``shutdown`` would also skip uvicorn's own
    tick and connection-closing logic.
    """

    def __init__(self, config):
        """Create the server for *config* with an unset startup event."""
        super().__init__(config)
        # Set once startup() has returned; nothing in this class waits on it.
        self._startup_event = asyncio.Event()

    async def startup(self, sockets=None):
        """Run uvicorn's normal startup, then mark the startup event as set."""
        await super().startup(sockets=sockets)
        self._startup_event.set()

def run_server():
    """Serve the Flask ``app`` with uvicorn on ``0.0.0.0:PORT``.

    Blocks the calling thread until the server exits.
    """
    config = Config(
        app=app,
        host="0.0.0.0",
        port=PORT,
        log_level="info",
        # Flask applications are WSGI callables. uvicorn's default "auto"
        # interface only distinguishes ASGI2/ASGI3, so WSGI must be
        # requested explicitly or every request fails inside the app call.
        interface="wsgi",
    )
    server = Server(config=config)
    server.run()

if __name__ == '__main__':
    # Runs the server in the foreground: run_server() blocks until the server
    # exits. For simplicity in Colab it is started directly; a real
    # application could start it in a separate thread or process, or use the
    # uvicorn CLI: `uvicorn flamingo_server:app --host 0.0.0.0 --port 8000`
    # NOTE(review): `app` is a Flask (WSGI) app, so the CLI form presumably
    # also needs `--interface wsgi` -- confirm before relying on it.
    logger.info("Starting Flamingo Server...")
    run_server()

Writing flamingo_server.py


In [None]:
!python flamingo_server.py

2025-07-21 06:00:55.694360: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1753077655.723057    2010 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1753077655.732103    2010 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-07-21 06:00:55.761679: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://

In [None]:
#!/usr/bin/env python3
"""
flamingo_server.py – GPU-optimized legal-intake backend for Google Colab
"""

# -----------------------------------------------------------
# 1. Colab one-time installs (quiet)
# -----------------------------------------------------------
try:
    import google.colab  # noqa: F401 -- imported only to detect Colab
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True
    import IPython

    # Install BEFORE importing the helper packages: importing portpicker /
    # nest_asyncio / uvicorn first would raise ImportError on a runtime that
    # lacks them, which would skip the install and wrongly flag the session
    # as non-Colab.
    IPython.get_ipython().system(
        "pip install --quiet "
        "torch transformers flask flask-cors flask-socketio openai "
        "google-api-python-client google-auth-httplib2 google-auth-oauthlib "
        # Quoted so the shell cannot treat the brackets as a glob pattern.
        "'uvicorn[standard]' portpicker nest-asyncio"
    )

    import portpicker
    import nest_asyncio
    import uvicorn
    import threading

# -----------------------------------------------------------
# 2. Standard library
# -----------------------------------------------------------
import os, json, time, logging, threading, smtplib, torch
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# -----------------------------------------------------------
# 3. 3rd-party
# -----------------------------------------------------------
import numpy as np
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_socketio import SocketIO, emit
from transformers import AutoTokenizer, AutoModelForCausalLM
import openai

# -----------------------------------------------------------
# 4. Logging
# -----------------------------------------------------------
# Record layout: timestamp - logger name - level - message, at INFO and above.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# -----------------------------------------------------------
# 5. Configuration (no secrets in code)
# -----------------------------------------------------------
class FlamingoConfig:
    """Settings resolved once, when this class body executes.

    Credentials and file locations are read from environment variables;
    the model name, generation length and device depend on whether CUDA
    is available.
    """

    # Credentials and external-service settings, read from the environment.
    HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    EMAIL_USER = os.environ.get("EMAIL_USER", "bywordofmouthcatering@gmail.com")
    EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "")
    SERVICE_ACCOUNT_FILE = os.environ.get(
        "GOOGLE_APPLICATION_CREDENTIALS", "credentials.json"
    )
    SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

    # Hardware-dependent model settings: the larger checkpoint and longer
    # generation length are selected only when a CUDA device is present.
    USE_GPU = torch.cuda.is_available()
    if USE_GPU:
        DEVICE = "cuda"
        MODEL_NAME = "microsoft/DialoGPT-medium"
        MAX_LENGTH = 512
    else:
        DEVICE = "cpu"
        MODEL_NAME = "microsoft/DialoGPT-small"
        MAX_LENGTH = 256

# Shared settings object read by the rest of this script.
cfg = FlamingoConfig()
# Pass the key to the openai module (None when OPENAI_API_KEY is unset).
openai.api_key = cfg.OPENAI_API_KEY

# -----------------------------------------------------------
# 6. Optional Google Drive service
# -----------------------------------------------------------
# Stays None unless a service-account file exists and the client builds.
drive_service = None
if not os.path.isfile(cfg.SERVICE_ACCOUNT_FILE):
    logger.warning("No credentials.json – Drive API disabled")
else:
    try:
        # Imported lazily so the Google client libraries are only required
        # when a credentials file is actually present.
        from google.oauth2 import service_account
        from googleapiclient.discovery import build

        creds = service_account.Credentials.from_service_account_file(
            cfg.SERVICE_ACCOUNT_FILE, scopes=cfg.SCOPES
        )
        drive_service = build("drive", "v3", credentials=creds)
    except Exception as e:
        logger.warning("Google Drive init failed: %s", e)
    else:
        logger.info("Google Drive service ready")

# -----------------------------------------------------------
# 7. Load HuggingFace model once
# -----------------------------------------------------------
logger.info("Loading %s on %s", cfg.MODEL_NAME, cfg.DEVICE)
# `token` replaces the deprecated `use_auth_token` keyword.
# trust_remote_code is left at its default (False): the DialoGPT checkpoints
# use the GPT-2 architecture that ships with transformers, so there is no
# reason to allow code from the Hub repository to execute here.
tokenizer = AutoTokenizer.from_pretrained(
    cfg.MODEL_NAME,
    token=cfg.HF_TOKEN,
)
model = AutoModelForCausalLM.from_pretrained(
    cfg.MODEL_NAME,
    token=cfg.HF_TOKEN,
).to(cfg.DEVICE)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
logger.info("Model loaded")

# -----------------------------------------------------------
# 8. Flask + SocketIO app
# -----------------------------------------------------------
# Flask application; CORS(app) is applied with no per-route restrictions.
app = Flask(__name__)
CORS(app)
# Socket.IO layer on the same app, accepting connections from any origin.
socketio = SocketIO(app, cors_allowed_origins="*")

@app.route("/health")
def health_check():
    """Report liveness plus the GPU flag and model name from the config."""
    payload = {"status": "ok", "gpu": cfg.USE_GPU, "model": cfg.MODEL_NAME}
    return jsonify(payload)

def _generate_reply(prompt):
    """Return the model's sampled reply to *prompt*, stripped of whitespace.

    Shared by the HTTP and Socket.IO chat handlers so both use the same
    generation settings.
    """
    input_ids = tokenizer.encode(
        prompt + tokenizer.eos_token, return_tensors="pt"
    ).to(cfg.DEVICE)

    # Keep prompt + reply within the tokenizer's declared maximum length by
    # dropping the oldest prompt tokens. When the tokenizer declares no
    # practical limit, `room` is huge and nothing is truncated.
    room = tokenizer.model_max_length - cfg.MAX_LENGTH
    if 0 < room < input_ids.shape[-1]:
        input_ids = input_ids[:, -room:]

    prompt_len = input_ids.shape[-1]
    output_ids = model.generate(
        input_ids,
        # Single unpadded sequence: every position is a real token. Passing
        # the mask explicitly avoids ambiguity because pad and EOS share an id.
        attention_mask=torch.ones_like(input_ids),
        max_length=prompt_len + cfg.MAX_LENGTH,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=True,
        top_p=0.9,
    )
    # Decode only the newly generated tokens, not the echoed prompt.
    return tokenizer.decode(
        output_ids[0, prompt_len:], skip_special_tokens=True
    ).strip()


@app.route("/chat", methods=["POST"])
def chat():
    """Generate a reply for the ``prompt`` field of a JSON POST body.

    Returns:
        * 200 and ``{"reply": ...}`` on success.
        * 400 and ``{"error": "prompt required"}`` when the body is not a
          JSON object with a non-empty string ``prompt``.
        * 500 and ``{"error": "generation failed"}`` when generation raises.
    """
    data = request.get_json(force=True, silent=True)
    # The body may legally be a JSON array or scalar, which has no .get().
    prompt = data.get("prompt") if isinstance(data, dict) else None
    if not isinstance(prompt, str) or not prompt:
        return jsonify({"error": "prompt required"}), 400

    try:
        reply = _generate_reply(prompt)
    except Exception:
        logger.exception("Chat generation failed")
        return jsonify({"error": "generation failed"}), 500
    return jsonify({"reply": reply})


@socketio.on("connect")
def on_connect(auth=None):
    """Acknowledge a new Socket.IO connection with a ``status`` event."""
    emit("status", {"msg": "connected"})


@socketio.on("chat")
def on_chat(json_msg):
    """Handle a Socket.IO ``chat`` event and emit a ``reply`` event.

    Emits ``{"prompt": ..., "reply": ...}`` on success, or
    ``{"error": ...}`` when the payload lacks a non-empty string ``prompt``
    or generation raises.
    """
    # Clients can send any JSON value as the payload, not only objects.
    prompt = json_msg.get("prompt") if isinstance(json_msg, dict) else None
    if not isinstance(prompt, str) or not prompt:
        emit("reply", {"error": "prompt required"})
        return

    try:
        reply = _generate_reply(prompt)
    except Exception:
        logger.exception("Chat generation failed")
        emit("reply", {"error": "generation failed"})
        return
    emit("reply", {"prompt": prompt, "reply": reply})

# -----------------------------------------------------------
# 9. Non-blocking launch (Colab)
# -----------------------------------------------------------
if __name__ == "__main__" and IN_COLAB:
    nest_asyncio.apply()  # allow uvicorn inside Jupyter
    port = portpicker.pick_unused_port()
    logger.info("Starting uvicorn on port %d", port)

    def run():
        """Serve the in-memory Flask app with uvicorn; blocks its thread."""
        uvicorn.run(
            # Pass the app object itself. The import string
            # "flamingo_server:app" would make uvicorn import a module of
            # that name from disk, which is not this notebook cell's app.
            app,
            host="0.0.0.0",
            port=port,
            log_level="info",
            reload=False,
            # Flask is a WSGI application; uvicorn's default interface
            # detection only covers ASGI.
            interface="wsgi",
        )
        # NOTE(review): the Socket.IO WebSocket transport is presumably not
        # available through uvicorn's WSGI adapter -- confirm, and consider
        # socketio.run() if WebSocket upgrades are required.

    # Daemon thread so the server does not keep the process alive on exit.
    threading.Thread(target=run, daemon=True).start()
    # The server speaks plain HTTP (no TLS is configured), hence http://.
    print(f"🚀 Ready – server should be reachable at http://localhost:{port}/health")