In [None]:
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'


Importing all necessary libraries


In [None]:
import numpy as np
from ultralytics import YOLO
from tensorflow.keras.models import model_from_json
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse,HTMLResponse, JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from sklearn.metrics import precision_score, recall_score, f1_score
from starlette.requests import Request
import logging
import requests
import random
import shutil
import nest_asyncio
import uvicorn
import asyncio
nest_asyncio.apply()

Splitting the valid data into training and val of Widerface

In [None]:


# Source dirs 
source_images_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/test/images"
source_labels_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/test/labels"

# Target dirs
train_images_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/train/images"
train_labels_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/train/labels"
valid_images_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/val/images"
valid_labels_dir = "D:/Project/Emotion_detection-using-YOLOv8/WIDER-FACE-1/val/labels"

# Create directories
for d in [train_images_dir, train_labels_dir, valid_images_dir, valid_labels_dir]:
    os.makedirs(d, exist_ok=True)

# Get all images
all_images = [f for f in os.listdir(source_images_dir) if f.endswith(".jpg")]

# Shuffle and split
random.shuffle(all_images)
train_images = all_images[:int(0.8 * len(all_images))]
val_images = all_images[int(0.8 * len(all_images)):]

# Move training set
for img in train_images:
    shutil.move(os.path.join(source_images_dir, img), os.path.join(train_images_dir, img))
    label_file = img.replace(".jpg", ".txt")
    if os.path.exists(os.path.join(source_labels_dir, label_file)):
        shutil.move(os.path.join(source_labels_dir, label_file), os.path.join(train_labels_dir, label_file))

# Move validation set
for img in val_images:
    shutil.move(os.path.join(source_images_dir, img), os.path.join(valid_images_dir, img))
    label_file = img.replace(".jpg", ".txt")
    if os.path.exists(os.path.join(source_labels_dir, label_file)):
        shutil.move(os.path.join(source_labels_dir, label_file), os.path.join(valid_labels_dir, label_file))

print(f"Train set has {len(train_images)} images, and validation set has {len(val_images)} images.")


Fine Tuning Yolo with WIDER-Face

In [None]:
!pip uninstall torch torchvision torchaudio -y
!pip cache purge



In [None]:


# Load the pre-trained YOLO model
yolo = YOLO('yolov8n.pt')

yolo.train(
    data='WIDER-FACE-1/data.yaml',
    epochs=20,                  
    batch=8,                     
    imgsz=416,                       
    project='runs/train',          
    name='best',                 
    save=True,                   
    device='cuda',                  
    lr0=0.01,                      
    lrf=0.1,                        
    conf=0.001,                    
    half=True                       
)




In [None]:
yolo = YOLO('yolov8n.pt')
# After training, evaluate the model
valid_results = yolo.val()

# Print evaluation results 
print(valid_results) 

Evaluating Results and post-processing using Non maxima Suppression

In [None]:


# Load the trained YOLO model
yolo = YOLO('best.pt')  


results = yolo.predict(
    source='uploads/',  
    conf=0.5,              
    iou=0.4,               
    save=True,           
    device='cuda'           
)

# Print detailed results
print(results)


In [None]:
import os

print("Files in uploads/:", os.listdir("uploads"))


TRAINING USING FER 2013 datasets

In [None]:

# Initialize image data generator with rescaling and augmentation
train_data_gen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=30,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

validation_data_gen = ImageDataGenerator(rescale=1./255)

# Preprocess all train images
train_generator = train_data_gen.flow_from_directory(
    'data/train',
    target_size=(48, 48),
    batch_size=64,
    color_mode="grayscale",
    class_mode='categorical'
)

# Preprocess all test images
validation_generator = validation_data_gen.flow_from_directory(
    'data/test',
    target_size=(48, 48),
    batch_size=64,
    color_mode="grayscale",
    class_mode='categorical'
)

# Create model structure
emotion_model = Sequential()

# Specify input shape using Input layer
emotion_model.add(Input(shape=(48, 48, 1))) 

# Convolutional layers with Batch Normalization and filter count
emotion_model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
emotion_model.add(BatchNormalization())
emotion_model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
emotion_model.add(BatchNormalization())
emotion_model.add(MaxPooling2D(pool_size=(2, 2)))
emotion_model.add(Dropout(0.3))

emotion_model.add(Conv2D(256, kernel_size=(3, 3), activation='relu'))
emotion_model.add(BatchNormalization())
emotion_model.add(MaxPooling2D(pool_size=(2, 2)))
emotion_model.add(Conv2D(256, kernel_size=(3, 3), activation='relu'))
emotion_model.add(BatchNormalization())
emotion_model.add(MaxPooling2D(pool_size=(2, 2)))
emotion_model.add(Dropout(0.3))

emotion_model.add(Flatten())

# Fully connected layers
emotion_model.add(Dense(1024, activation='relu'))
emotion_model.add(Dropout(0.4))
emotion_model.add(Dense(7, activation='softmax'))

cv2.ocl.setUseOpenCL(False)

# Apply learning rate decay with ExponentialDecay
lr_schedule = ExponentialDecay(initial_learning_rate=0.001, 
                               decay_steps=100000, 
                               decay_rate=0.96, 
                               staircase=True)

# Compile the model
emotion_model.compile(loss='categorical_crossentropy',
                      optimizer=Adam(learning_rate=lr_schedule),
                      metrics=['accuracy'])

# Early stopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Train the neural network/model with early stopping and learning rate scheduling
emotion_model_info = emotion_model.fit(
    train_generator,
    steps_per_epoch=28709 // 64,
    epochs=100,
    validation_data=validation_generator,
    validation_steps=7178 // 64,
    callbacks=[early_stopping]
)

# Save model structure in JSON file
model_json = emotion_model.to_json()
with open("emotion_model.json", "w") as json_file:
    json_file.write(model_json)

# Save trained model weights in .h5 file
emotion_model.save_weights('emotion_model.weights.h5')


Testing and evaluating Matrix using Face++

In [None]:


# Set up logging
logging.basicConfig(level=logging.DEBUG)

# Initialize FastAPI app
app = FastAPI(title="Emotion Detector")

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  
    allow_methods=["*"],
    allow_headers=["*"],
)
# Face++ API credentials
API_KEY = "mi11HLqpDSpqFj8lWz6eC82EwHlPICOh"
API_SECRET = "l7s7lwVVkO7CDN5oS3s5uT27tjlNlE40"
FACE_PLUS_PLUS_URL = "https://api-us.faceplusplus.com/facepp/v3/detect"

UPLOAD_DIR = 'uploads'
TEMP_DIR = 'temp'
OUTPUT_DIR = 'output'
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Serve uploaded files from the 'uploads' directory
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads")



# Load YOLOv8 face detection model
face_model = YOLO('best.pt')  # Replace with your YOLOv8 model path

# Load emotion classification model
with open('emotion_model.json', 'r') as json_file:
    loaded_model_json = json_file.read()
emotion_model = model_from_json(loaded_model_json)
emotion_model.load_weights("emotion_model.weights.h5")
print("Loaded emotion model from disk")

# Emotion labels
emotion_dict = {
    0: "Angry",
    1: "Disgust",
    2: "Fear",
    3: "Happy",
    4: "Neutral",
    5: "Sad",
    6: "Surprise"
}

def map_predicted_label(predicted_emotion):
    if predicted_emotion == "Sad":
        return "sadness"
    elif predicted_emotion == "Neutral":
        return "neutral"

    return predicted_emotion

# Function to detect emotion using Face++ API
def detect_emotion(image_path):
    with open(image_path, 'rb') as f:
        img_data = f.read()
    data = {
        'api_key': API_KEY,
        'api_secret': API_SECRET,
        'return_attributes': 'emotion'
    }
    files = {'image_file': img_data}
    response = requests.post(FACE_PLUS_PLUS_URL, data=data, files=files)
    if response.status_code == 200:
        result = response.json()
        if 'faces' in result:
            emotions = result['faces'][0]['attributes']['emotion']
            predicted_emotion = max(emotions, key=emotions.get)
            return predicted_emotion
        else:
            logging.warning("No faces detected")
            return None
    else:
        logging.error(f"Error in API request: {response.status_code}")
        return None

# Function to detect faces using YOLO
def detect_faces(frame, face_model):
    results = face_model(frame)
    boxes = results[0].boxes.xyxy.cpu().numpy()
    confidences = results[0].boxes.conf.cpu().numpy()
    return boxes, confidences

def classify_emotion(face_img, emotion_model):
    gray_face = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
    resized_face = cv2.resize(gray_face, (48, 48))
    processed_face = np.expand_dims(resized_face, axis=-1) 
    processed_face = np.expand_dims(processed_face, axis=0)  
    processed_face = processed_face / 255.0  
    prediction = emotion_model.predict(processed_face)
    return map_predicted_label(emotion_dict[np.argmax(prediction)])

# Function to process video frames and assign emotions

def process_video_for_emotions(video_path):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    ground_truth = {}
    predictions = {}
    frame_interval = 5  

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if frame_count % frame_interval != 0:
            continue
        temp_frame_path = os.path.join(TEMP_DIR, f"frame_{frame_count}.jpg")
        cv2.imwrite(temp_frame_path, frame)
        
        predicted_emotion = detect_emotion(temp_frame_path)
        if predicted_emotion:
            ground_truth[frame_count] = [predicted_emotion]
        
        boxes, confidences = detect_faces(frame, face_model)
        for box, confidence in zip(boxes, confidences):
            if confidence > 0.7:
                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                face_roi = frame[y1:y2, x1:x2]

                predicted_emotion_model = classify_emotion(face_roi, emotion_model)
                font_scale = 1
                thickness = 2
                (text_width, text_height), baseline = cv2.getTextSize(predicted_emotion, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)

                text_x = x1
                text_y = y1 - 10
                if text_x + text_width > frame.shape[1]:
                    text_x = frame.shape[1] - text_width - 10
                if text_y - text_height < 0:
                    text_y = y1 + 10

                # Display the emotion label above the detected face
                cv2.putText(frame, predicted_emotion, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), thickness)
                predictions[frame_count] = [predicted_emotion_model]
        
        os.remove(temp_frame_path)
        cv2.imshow('Emotion Recognition with YOLO', frame)

        # Exit on pressing 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()

    return ground_truth, predictions



# Function to evaluate model
def evaluate_model(ground_truth, predictions):
    flat_predictions = [label for frame_preds in predictions.values() for label in frame_preds]
    flat_true_labels = [label for frame_labels in ground_truth.values() for label in frame_labels]
    
    min_length = min(len(flat_true_labels), len(flat_predictions))
    flat_true_labels = flat_true_labels[:min_length]
    flat_predictions = flat_predictions[:min_length]

    precision = precision_score(flat_true_labels, flat_predictions, average='macro')
    recall = recall_score(flat_true_labels, flat_predictions, average='macro')
    f1 = f1_score(flat_true_labels, flat_predictions, average='macro')

    logging.info(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1-score: {f1:.2f}")


@app.post("/process_video")
async def process_video(file: UploadFile = File(...)):
    file_path = os.path.join(UPLOAD_DIR, file.filename)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    ground_truth, predictions = process_video_for_emotions(file_path)
    evaluate_model(ground_truth, predictions)
    return {"filename": file.filename, "message": "Processing complete"}

@app.get("/uploads/{filename}")
async def stream_full_video(filename: str):
    video_path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.exists(video_path):
        return {"error": "Video not found"}
    return FileResponse(video_path, media_type="video/mp4")

@app.get("/")
async def get_html():
    return HTMLResponse(content=open("index.html").read(), status_code=200)


Integrating YOLOv8 with Emotion classification and deploying it to Web using FastAPI

In [None]:
# Set up logging
logging.basicConfig(level=logging.DEBUG)

# Initialize FastAPI app
app = FastAPI(title="Emotion Detector")

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], 
    allow_methods=["*"],
    allow_headers=["*"],
)

# Directory setup
UPLOAD_DIR = 'uploads'
os.makedirs(UPLOAD_DIR, exist_ok=True)


# Serve uploaded files from the 'uploads' directory
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads")

# Face++ API credentials
API_KEY = "mi11HLqpDSpqFj8lWz6eC82EwHlPICOh"
API_SECRET = "l7s7lwVVkO7CDN5oS3s5uT27tjlNlE40"
FACE_PLUS_PLUS_URL = "https://api-us.faceplusplus.com/facepp/v3/detect"

# Load YOLOv8 face detection model
face_model = YOLO('best.pt')  # Replace with your YOLOv8 model path

# Load emotion classification model
with open('emotion_model.json', 'r') as json_file:
    loaded_model_json = json_file.read()
emotion_model = model_from_json(loaded_model_json)
emotion_model.load_weights("emotion_model.weights.h5")
print("Loaded emotion model from disk")

# Emotion labels
emotion_dict = {
    0: "Angry",
    1: "Disgust",
    2: "Fear",
    3: "Happy",
    4: "Neutral",
    5: "Sad",
    6: "Surprise"
}

# Function to detect emotion using Face++ API
def detect_emotion(image_path):
    with open(image_path, 'rb') as f:
        img_data = f.read()
    data = {
        'api_key': API_KEY,
        'api_secret': API_SECRET,
        'return_attributes': 'emotion'
    }
    files = {'image_file': img_data}
    response = requests.post(FACE_PLUS_PLUS_URL, data=data, files=files)
    if response.status_code == 200:
        result = response.json()
        if 'faces' in result:
            emotions = result['faces'][0]['attributes']['emotion']
            predicted_emotion = max(emotions, key=emotions.get)
            return predicted_emotion
        else:
            logging.warning("No faces detected")
            return None
    else:
        logging.error(f"Error in API request: {response.status_code}")
        return None

# Function to classify emotion using a local model
def classify_emotion(face_img, emotion_model):
    gray_face = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
    resized_face = cv2.resize(gray_face, (48, 48))
    processed_face = np.expand_dims(resized_face, axis=-1)  # Add channel dimension
    processed_face = np.expand_dims(processed_face, axis=0)  # Add batch dimension
    processed_face = processed_face / 255.0  # Normalize pixel values
    prediction = emotion_model.predict(processed_face)
    return emotion_dict[np.argmax(prediction)]

# Function to detect faces using YOLO
def detect_faces(frame, face_model):
    results = face_model(frame)
    boxes = results[0].boxes.xyxy.cpu().numpy()
    confidences = results[0].boxes.conf.cpu().numpy()
    return boxes, confidences

# Function to detect and classify emotions in video
def detect_and_classify_emotions(video_path, output_path):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (640, 480))
        frame_count += 1

        boxes, confidences = detect_faces(frame, face_model)
        for box, confidence in zip(boxes, confidences):
            if confidence > 0.6:
                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                face_roi = frame[y1:y2, x1:x2]
                if face_roi.size == 0:
                    continue
                predicted_emotion = classify_emotion(face_roi, emotion_model)

                # Draw label inside frame
                cv2.putText(
                    frame, predicted_emotion, (x1, max(20, y1 - 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2
                )

        out.write(frame)

    cap.release()
    out.release()
    logging.info(f"Video processing complete. Processed frames: {frame_count}")


def process_video_for_emotions(video_path):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    ground_truth = {}
    frame_interval = 5  # Process every 5th frame
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if frame_count % frame_interval != 0:
            continue
        temp_frame_path = os.path.join(UPLOAD_DIR, f"frame_{frame_count}.jpg")
        cv2.imwrite(temp_frame_path, frame)
        predicted_emotion = detect_emotion(temp_frame_path)
        if predicted_emotion:
            ground_truth[frame_count] = [predicted_emotion]
        os.remove(temp_frame_path)
    cap.release()
    return ground_truth

def evaluate_model(ground_truth, predictions):
    flat_predictions = [label for frame_preds in predictions.values() for label in frame_preds]
    flat_true_labels = [label for frame_labels in ground_truth.values() for label in frame_labels]
    min_length = min(len(flat_true_labels), len(flat_predictions))
    flat_true_labels = flat_true_labels[:min_length]
    
    flat_predictions = flat_predictions[:min_length]
    precision = precision_score(flat_true_labels, flat_predictions, average='macro')
    recall = recall_score(flat_true_labels, flat_predictions, average='macro')
    f1 = f1_score(flat_true_labels, flat_predictions, average='macro')
    logging.info(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1-score: {f1:.2f}")

def gen_frames(video_path):
    cap = cv2.VideoCapture(video_path)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Resize for consistency
        frame = cv2.resize(frame, (640, 480))

        # Detect faces + classify emotions
        boxes, confidences = detect_faces(frame, face_model)
        for box, confidence in zip(boxes, confidences):
            if confidence > 0.6:
                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                face_roi = frame[y1:y2, x1:x2]
                if face_roi.size == 0:
                    continue
                predicted_emotion = classify_emotion(face_roi, emotion_model)
                cv2.putText(
                    frame, predicted_emotion, (x1, max(20, y1 - 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2
                )

        # Encode frame as JPEG
        _, buffer = cv2.imencode('.jpg', frame)
        frame_bytes = buffer.tobytes()

        # Yield frame for MJPEG
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')

    cap.release()


@app.get("/webcam_feed")
async def webcam_feed(request: Request):
    async def generate_webcam():
        cap = cv2.VideoCapture(0)  # 0 = default webcam
        try:
            while True:
                # Stop if the client disconnected (e.g. user clicked "Stop Webcam")
                if await request.is_disconnected():
                    break

                success, frame = cap.read()
                if not success:
                    break

                # Face detection + emotion classification
                boxes, confidences = detect_faces(frame, face_model)
                for box, confidence in zip(boxes, confidences):
                    if confidence > 0.6:
                        x1, y1, x2, y2 = map(int, box)
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        face_roi = frame[y1:y2, x1:x2]
                        if face_roi.size > 0:
                            predicted_emotion = classify_emotion(face_roi, emotion_model)
                            cv2.putText(frame, predicted_emotion, (x1, max(20, y1 - 10)),
                                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                # Encode frame as JPEG for MJPEG streaming
                _, buffer = cv2.imencode('.jpg', frame)
                frame_bytes = buffer.tobytes()
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
        finally:
            cap.release()
            logging.info("✅ Webcam released after client disconnected.")

    return StreamingResponse(generate_webcam(),
                             media_type="multipart/x-mixed-replace; boundary=frame")


# Streaming endpoint
@app.get("/video_feed")
async def video_feed(file: str):
    video_path = os.path.join(UPLOAD_DIR, file)
    if not os.path.exists(video_path):
        return {"error": "Video not found"}
    return StreamingResponse(gen_frames(video_path),
                             media_type="multipart/x-mixed-replace; boundary=frame")

@app.post("/process_video")
async def process_video(file: UploadFile = File(...)):
    # Save uploaded video
    file_path = os.path.join(UPLOAD_DIR, file.filename)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    logging.info(f"Video uploaded: {file.filename}")

    # Instead of saving processed video, just return a stream URL
    return {
        "filename": file.filename,
        "stream_url": f"/video_feed?file={file.filename}"
    }


@app.get("/uploads/{filename}")
async def stream_full_video(filename: str):
    video_path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.exists(video_path):
        return {"error": "Video not found"}
    return FileResponse(video_path, media_type="video/mp4")

@app.get("/")
async def get_html():
    with open("index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(content=f.read(), status_code=200)
    
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    file_path = os.path.join(UPLOAD_DIR, file.filename)
    with open(file_path, "wb") as buffer:
        buffer.write(await file.read())
    
    return JSONResponse({"url": f"/uploads/{file.filename}"})    


In [None]:
config = uvicorn.Config(app, host="127.0.0.1", port=8000, reload=False, log_level="info")
server = uvicorn.Server(config)
try:
    server_task.cancel()
except NameError:
    pass
except Exception as e:
    print("No previous server task to cancel:", e)

# Start new server in background
server_task = asyncio.create_task(server.serve())
print("🚀 Server started at http://127.0.0.1:8000")

In [None]:
try:
    server_task.cancel()
    print("🛑 Server stopped")
except NameError:
    print("⚠️ No server task found")