# Run the following code in Google Colab to start the server side.

In [None]:
!pip install -qU pyngrok nest_asyncio fastapi uvicorn

import nest_asyncio
from pyngrok import ngrok
import uvicorn

# Patch asyncio so a server can later be started from inside the notebook,
# where an event loop is presumably already running.
nest_asyncio.apply()

port = 7860

# Stop any ngrok process left over from an earlier run of this cell, then
# register the account token before opening a new tunnel.
ngrok.kill()
ngrok.set_auth_token("Your Private Key from NGROK")

# ngrok.connect() returns an NgrokTunnel object, not a string; formatting the
# object directly prints its repr instead of a usable address, so read the
# address from its `public_url` attribute.
tunnel = ngrok.connect(port, bind_tls=True)
public_url = tunnel.public_url
print(f"🔗 Public FastAPI URL: {public_url}/docs")

In [None]:
from fastapi import FastAPI, Form, UploadFile, File
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import uuid, os, torch, traceback, asyncio
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline
from PIL import Image
import nest_asyncio
import uvicorn
from typing import Optional
import base64
from io import BytesIO
# ========== setting FastAPI ==========
app = FastAPI()

OUTPUT_DIR = "stable_diffusion_output"
EDITED_DIR = "edited_images"
# Create the directories up front so the static mounts below have a
# directory to serve from.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(EDITED_DIR, exist_ok=True)

# Files saved into these directories become reachable under these URL prefixes.
app.mount("/generated", StaticFiles(directory=OUTPUT_DIR), name="generated")
app.mount("/edited", StaticFiles(directory=EDITED_DIR), name="edited")

device = "cuda" if torch.cuda.is_available() else "cpu"

# ========== Load Models ==========
MODEL_ID = "runwayml/stable-diffusion-v1-5"
# Use half precision on GPU and full precision on CPU, and apply the same
# choice to both pipelines so text-to-image and img2img are loaded consistently.
model_dtype = torch.float16 if device == "cuda" else torch.float32

# Text-to-image pipeline.
pipe = StableDiffusionPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=model_dtype,
).to(device)
# Image-to-image pipeline used by the editing endpoint.
pipe2 = StableDiffusionImg2ImgPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=model_dtype,
).to(device)

# ========== Concurrency Control ==========
# NOTE(review): the pipeline calls guarded by this semaphore are plain
# synchronous calls inside coroutines, so each one runs to completion on the
# event loop — confirm whether two holders can ever actually overlap.
pipe_semaphore = asyncio.Semaphore(2)  # allow 2 concurrent generations

# ========== Prompt structure ==========
# Request body for the text-to-image endpoints.
class Prompt(BaseModel):
    text: str  # prompt string handed to the text-to-image pipeline

# Request body for the /edit_image endpoint.
class EditImageRequest(BaseModel):
    prompt: str  # text prompt that guides the img2img edit
    image_base64: str  # source image, base64-encoded; decoded with base64.b64decode


# ========== Generate one image (single) ==========
@app.post("/generate_image")
async def generate_image_endpoint(prompt: Prompt):
    """Generate one image from a text prompt and return it as a base64 PNG.

    Args:
        prompt: Request body whose ``text`` field is passed to the
            text-to-image pipeline.

    Returns:
        On success, a dict holding the base64-encoded PNG under
        ``"image_base64"`` (the key ``/edit_image`` also uses) and, for
        clients written against the earlier misspelling, under
        ``"image_base6"`` as well. On any exception, a ``JSONResponse`` with
        status 500 and an ``"error"`` message.
    """
    try:
        async with pipe_semaphore:
            pipe.scheduler.set_timesteps(50)
            result = pipe(prompt.text, num_inference_steps=50)
            image = result.images[0]

        # Serialize the image to PNG bytes in memory.
        buffered = BytesIO()
        image.save(buffered, format="PNG")

        # Encode the PNG bytes as base64 text so they fit in a JSON response.
        img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")

        return {
            "image_base64": img_base64,
            # Deprecated: the original, misspelled key. Kept so existing
            # clients keep working; new clients should read "image_base64".
            "image_base6": img_base64,
        }

    except Exception as e:
        traceback.print_exc()
        return JSONResponse(content={"error": str(e)}, status_code=500)

# ========== Core async image generation ==========
async def generate_image_async(prompt_text: str) -> str:
    """Generate one image for *prompt_text* and save it under ``OUTPUT_DIR``.

    Returns:
        ``"generated/<hex>.png"`` for the saved file on success,
        ``"error: no image generated"`` when the pipeline returns no images,
        or ``"error: exception"`` when anything raises (the traceback is
        printed rather than propagated).
    """
    async with pipe_semaphore:
        try:
            pipe.scheduler.set_timesteps(50)
            output = pipe(prompt_text, num_inference_steps=50)

            # Guard clause: nothing to save if the pipeline produced no images.
            if not output.images:
                return "error: no image generated"

            # A random hex name keeps files from separate calls apart.
            png_name = f"{uuid.uuid4().hex}.png"
            output.images[0].save(os.path.join(OUTPUT_DIR, png_name))
            return f"generated/{png_name}"
        except Exception:
            traceback.print_exc()
            return "error: exception"

# ========== Generate multiple images in parallel ==========
# Upper bound on images per request: every image is a full pipeline run, and
# `count` arrives straight from the client.
MAX_IMAGES_PER_REQUEST = 16


@app.post("/multi_image_generation")
async def multi_image_generation(prompt: Prompt, count: int = 3):
    """Generate ``count`` images for one prompt and return their results.

    Args:
        prompt: Request body whose ``text`` field is the generation prompt.
        count: Number of images to generate; must be between 1 and
            ``MAX_IMAGES_PER_REQUEST``. Defaults to 3.

    Returns:
        ``{"urls": [...]}`` with one entry per requested image, each being
        whatever ``generate_image_async`` returned for it. A ``JSONResponse``
        with status 400 is returned for a blank prompt or an out-of-range
        ``count``, and with status 500 if anything raises.
    """
    try:
        if not prompt.text.strip():
            return JSONResponse(content={"error": "Prompt is required."}, status_code=400)

        # Reject non-positive counts (which used to yield a silent empty
        # list) and unbounded counts (which would queue arbitrary GPU work).
        if not 1 <= count <= MAX_IMAGES_PER_REQUEST:
            return JSONResponse(
                content={"error": f"count must be between 1 and {MAX_IMAGES_PER_REQUEST}."},
                status_code=400,
            )

        # Schedule all generations at once; gather returns the results in
        # the same order the coroutines were passed in.
        urls = await asyncio.gather(
            *(generate_image_async(prompt.text) for _ in range(count))
        )

        return {"urls": urls}

    except Exception as e:
        traceback.print_exc()
        return JSONResponse(content={"error": str(e)}, status_code=500)

# ========== Image Editing using img2img ==========
@app.post("/edit_image")
async def edit_image(request: EditImageRequest):
    """Edit a client-supplied image with img2img, guided by a text prompt.

    Args:
        request: Request body carrying the ``prompt`` and the source image
            as a base64 string in ``image_base64``.

    Returns:
        On success, ``{"image_base64": <base64 PNG of the edited image>}``.
        A ``JSONResponse`` with status 400 when ``image_base64`` cannot be
        decoded into an image, and with status 500 for any other exception.
    """
    try:
        # Decode the base64 payload and normalize it to a 512x512 RGB image.
        # A payload that fails here is the client's mistake, so it is
        # reported as 400 rather than as a server error.
        try:
            image_data = base64.b64decode(request.image_base64)
            img = Image.open(BytesIO(image_data)).convert("RGB").resize((512, 512))
        except (ValueError, OSError):
            # binascii.Error (bad base64) is a ValueError; PIL's
            # UnidentifiedImageError (not an image) is an OSError.
            traceback.print_exc()
            return JSONResponse(
                content={"error": "image_base64 is not a valid base64-encoded image."},
                status_code=400,
            )

        # Generate the edited image with the Stable Diffusion img2img pipeline.
        async with pipe_semaphore:
            pipe2.scheduler.set_timesteps(50)
            result = pipe2(
                prompt=request.prompt,
                image=img,
                strength=0.75,
                guidance_scale=7.5,
                num_inference_steps=50,
            )

        edited_image = result.images[0]

        # Save to an in-memory buffer as PNG, then encode it as base64.
        buffered = BytesIO()
        edited_image.save(buffered, format="PNG")
        img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")

        return {"image_base64": img_base64}

    except Exception as e:
        traceback.print_exc()
        return JSONResponse(content={"error": str(e)}, status_code=500)

# Availability probe: always answers with a fixed "ready" payload.
@app.post("/is_available")
async def check():
    readiness = {"ready": True}
    return readiness

# ========== Run FastAPI Server ==========
# Patch asyncio first so uvicorn can start from inside the notebook, where an
# event loop is presumably already running.
nest_asyncio.apply()

# Listen on every interface, on port 7860.
uvicorn.run(
    app,
    host="0.0.0.0",
    port=7860,
)
