In [16]:
!pip install opencv-python
!pip install flask-socketio
!pip install pydicom
!pip install flask_sqlalchemy
!pip install mysql-connector-python




In [17]:
import mysql.connector
import numpy as np
import os
from PIL import Image
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from tqdm import tqdm
import torchvision.models as models
import torchvision.transforms as transforms
import cv2
import timm
import time
from torch.optim.lr_scheduler import ReduceLROnPlateau
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_socketio import SocketIO, emit, join_room, leave_room  # Fixed: Added missing imports
from datetime import datetime
import sys
import logging
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import secrets
from datetime import timedelta


# Ajouter le répertoire patterns au path
sys.path.append('patterns')

# Import des design patterns
from patterns import (
    ModelSingleton,
    PredictionObserver,
    WebObserver,
    MedicalImageAdapter,
    BasicPreprocessing,
    EnhancedContrastPreprocessing,
    PreprocessingContext,
    PredictCommand,
    HistoryCommand,
    MelanomaDetectionFacade
)

from utils.config import DatabaseConfig
from models import db, Utilisateur, Analyse, Notification
from database_service import DatabaseService



In [18]:
def seed_everything(seed=42):
    """Initialisation des seeds pour la reproductibilité"""
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

seed_everything()

In [19]:
LR    = 4e-5
H     = 224
W     = 224

In [None]:
# Configure logging
logging.basicConfig(level=logging.INFO)

# Vérification CUDA
print(f"CUDA disponible: {torch.cuda.is_available()}")

# Initialisation de l'application Flask avec SocketIO
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'your-secret-key-here')
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuration de la base de données
app.config.from_object(DatabaseConfig)

# Initialisation de la base de données
db.init_app(app)

# Création des tables au démarrage
with app.app_context():
    try:
        db.create_all()
        print("✅ Tables créées avec succès!")
    except Exception as e:
        print(f"❌ Erreur lors de la création des tables: {e}")

# Configuration des dossiers
UPLOAD_FOLDER = "static"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Configuration pour les sessions utilisateur
app.config['SESSION_PERMANENT'] = False
app.config['SESSION_TYPE'] = 'filesystem'

# Créer le dossier d'upload s'il n'existe pas
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Database Service for handling database operations
class DatabaseService:
    """Service pour gérer les opérations de base de données"""
    
    @staticmethod
    def create_user(nom, email, mot_de_passe):
        """Créer un nouvel utilisateur"""
        try:
            # Vérifier si l'utilisateur existe déjà
            existing_user = Utilisateur.query.filter_by(email=email).first()
            if existing_user:
                return None, "Email déjà utilisé"
            
            # Créer le nouvel utilisateur
            new_user = Utilisateur(nom=nom, email=email, mot_de_passe=mot_de_passe)
            db.session.add(new_user)
            db.session.commit()
            
            return new_user, "Utilisateur créé avec succès"
            
        except Exception as e:
            db.session.rollback()
            logging.error(f"Erreur lors de la création de l'utilisateur: {e}")
            return None, f"Erreur lors de la création: {str(e)}"
    
    @staticmethod
    def authenticate_user(email, mot_de_passe):
        """Authentifier un utilisateur"""
        try:
            user = Utilisateur.query.filter_by(email=email).first()
            if user and user.check_password(mot_de_passe):
                return user, "Authentification réussie"
            return None, "Email ou mot de passe incorrect"
            
        except Exception as e:
            logging.error(f"Erreur lors de l'authentification: {e}")
            return None, f"Erreur d'authentification: {str(e)}"
    
    @staticmethod
    def save_analysis(id_utilisateur, image_path, resultat, probabilite):
        """Sauvegarder une analyse"""
        try:
            analyse = Analyse(
                id_utilisateur=id_utilisateur,
                image_path=image_path,
                resultat=resultat,
                probabilite=probabilite
            )
            db.session.add(analyse)
            db.session.commit()
            return analyse
            
        except Exception as e:
            db.session.rollback()
            logging.error(f"Erreur lors de la sauvegarde de l'analyse: {e}")
            return None
    
    @staticmethod
    def get_user_analyses(id_utilisateur):
        """Récupérer les analyses d'un utilisateur"""
        try:
            analyses = Analyse.query.filter_by(id_utilisateur=id_utilisateur)\
                                  .order_by(Analyse.date_analyse.desc()).all()
            return [analyse.to_dict() for analyse in analyses]
            
        except Exception as e:
            logging.error(f"Erreur lors de la récupération des analyses: {e}")
            return []

# Mock classes for the design patterns (replace these with your actual implementations)
class ModelSingleton:
    _instance = None
    _model = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    @property
    def model(self):
        if self._model is None:
            # Load your model here
            # self._model = load_your_model()
            print("Mock model loaded")
            self._model = "mock_model"  # Replace with actual model
        return self._model

class PreprocessingStrategyFactory:
    """Factory pour créer les bonnes stratégies selon le type d'image"""
    
    @staticmethod
    def create_strategy(filename):
        """Créer la stratégie appropriée selon le type de fichier"""
        filename_lower = filename.lower()
        
        if filename_lower.endswith('.dcm'):
            print(f"Stratégie DICOM sélectionnée pour: {filename}")
            return "EnhancedContrastPreprocessing"
        elif filename_lower.endswith(('.tiff', '.tif')):
            print(f"Stratégie haute résolution sélectionnée pour: {filename}")
            return "EnhancedContrastPreprocessing"
        else:
            print(f"Stratégie basique sélectionnée pour: {filename}")
            return "BasicPreprocessing"

class MelanomaDetectionFacade:
    """Facade pour la détection de mélanome"""
    
    def __init__(self, socketio=None):
        self.model_singleton = ModelSingleton()
        self.socketio = socketio
        print("MelanomaDetectionFacade initialisée")
    
    def process_image(self, image_file):
        """Traiter une image pour la prédiction"""
        try:
            # Mock processing - replace with your actual implementation
            result = {
                'status': 'success',
                'prediction': 'Lésion probablement bénigne (confiance: 85%)',
                'confidence': 0.85,
                'risk_level': 'low',
                'processing_strategy': 'BasicPreprocessing',
                'timestamp': datetime.now().isoformat()
            }
            
            if self.socketio:
                self.socketio.emit('prediction_complete', result)
            
            return result
            
        except Exception as e:
            error_result = {
                'status': 'error',
                'error': f"Erreur lors du traitement: {str(e)}",
                'timestamp': datetime.now().isoformat()
            }
            return error_result

# Initialisation de la facade
try:
    facade = MelanomaDetectionFacade(socketio)
    print("Facade initialisée avec succès")
except Exception as e:
    print(f"Erreur lors de l'initialisation de la facade: {e}")
    facade = None

# Décorateurs d'authentification
def login_required(f):
    """Décorateur pour les routes qui exigent une authentification"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'id_utilisateur' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

def login_required_optional(f):
    """Décorateur pour les routes qui bénéficient de l'authentification mais ne l'exigent pas"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function

# Routes Flask
@app.route("/", methods=["GET", "POST"])
def upload_predict():
    """Route principale pour l'upload et la prédiction d'images"""
    is_authenticated = 'id_utilisateur' in session
    nom = session.get('nom') if is_authenticated else None
    
    if request.method == "POST":
        try:
            image_file = request.files.get("image")
            if not image_file or image_file.filename == '':
                return render_template("index.html", 
                                     error="Aucun fichier sélectionné",
                                     prediction=None, 
                                     image_url=None,
                                     nom=nom,
                                     is_authenticated=is_authenticated)

            # Utilisation de la facade pour traiter l'image
            result = facade.process_image(image_file)
            
            if result['status'] == 'success':
                # Sauvegarder le fichier pour affichage
                timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
                user_prefix = session.get('id_utilisateur', 'guest')
                filename = f"{user_prefix}_{timestamp_str}_{image_file.filename}"
                image_location = os.path.join(UPLOAD_FOLDER, filename)
                
                # Réinitialiser le curseur du fichier
                image_file.seek(0)
                image_file.save(image_location)
                
                # Générer l'URL de l'image
                image_url = url_for('static', filename=filename)
                
                # Sauvegarder dans la base de données si l'utilisateur est connecté
                if is_authenticated:
                    DatabaseService.save_analysis(
                        id_utilisateur=session.get('id_utilisateur'),
                        image_path=image_url,
                        resultat=result['prediction'],
                        probabilite=result.get('confidence', 0.0)
                    )
                
                return render_template("index.html", 
                                     prediction=result['prediction'],
                                     image_url=image_url,
                                     timestamp=result['timestamp'],
                                     nom=nom,
                                     is_authenticated=is_authenticated,
                                     confidence=result.get('confidence'),
                                     risk_level=result.get('risk_level'))
            else:
                return render_template("index.html", 
                                     error=result.get('error', 'Erreur inconnue'),
                                     prediction=None, 
                                     image_url=None,
                                     nom=nom,
                                     is_authenticated=is_authenticated)

        except Exception as e:
            logging.error(f"Erreur lors du traitement: {str(e)}")
            return render_template("index.html", 
                                 error=f"Erreur lors du traitement: {str(e)}",
                                 prediction=None, 
                                 image_url=None,
                                 nom=nom,
                                 is_authenticated=is_authenticated)

    return render_template("index.html", 
                         prediction=None, 
                         image_url=None,
                         nom=nom,
                         is_authenticated=is_authenticated)

@app.route("/login", methods=["GET", "POST"])
def login():
    """Route pour la connexion utilisateur"""
    if request.method == "POST":
        try:
            email = request.form.get("email", "").strip()
            # CORRECTION: Utiliser 'mot_de_passe' pour correspondre au HTML
            mot_de_passe = request.form.get("mot_de_passe", "")
            
            if not email or not mot_de_passe:
                # Retourner une réponse JSON pour AJAX
                if request.headers.get('Content-Type') == 'application/json':
                    return jsonify({
                        'success': False, 
                        'message': 'Veuillez remplir tous les champs'
                    }), 400
                return render_template("login.html", 
                                     error="Veuillez remplir tous les champs")
            
            user, message = DatabaseService.authenticate_user(email, mot_de_passe)
            if user:
                session['id_utilisateur'] = user.id_utilisateur
                session['nom'] = user.nom
                session['email'] = user.email
                
                # Rediriger vers la page d'origine ou le dashboard
                next_page = request.args.get('next')
                redirect_url = next_page if next_page else url_for('dashboard')
                
                # Réponse pour AJAX
                if request.headers.get('Content-Type') == 'application/json':
                    return jsonify({
                        'success': True,
                        'message': 'Connexion réussie !',
                        'redirect_url': redirect_url
                    })
                
                return redirect(redirect_url)
            else:
                # Réponse pour AJAX
                if request.headers.get('Content-Type') == 'application/json':
                    return jsonify({
                        'success': False,
                        'message': message
                    }), 401
                return render_template("login.html", error=message)
                
        except Exception as e:
            logging.error(f"Erreur lors de la connexion: {str(e)}")
            error_message = "Erreur lors de la connexion"
            
            # Réponse pour AJAX
            if request.headers.get('Content-Type') == 'application/json':
                return jsonify({
                    'success': False,
                    'message': error_message
                }), 500
            return render_template("login.html", error=error_message)
    
    return render_template("login.html")
@app.route("/register", methods=["GET", "POST"])
@app.route("/api/register", methods=["POST"])
def register():
    """Route pour l'inscription utilisateur"""
    if request.method == "POST":
        try:
            # Handle both form data and JSON data
            if request.content_type == 'application/json':
                data = request.get_json()
                if not data:
                    return jsonify({"error": "Données JSON invalides"}), 400
                
                # Handle firstName and lastName for JSON
                first_name = data.get("firstName", "").strip()
                last_name = data.get("lastName", "").strip()
                nom = f"{first_name} {last_name}".strip()
                
                email = data.get("email", "").strip()
                mot_de_passe = data.get("password", "")
                confirm_password = data.get("confirmPassword", "")
                is_api_call = True
            else:
                # Handle form data - check both possible field names
                first_name = request.form.get("firstName", "").strip()
                last_name = request.form.get("lastName", "").strip()
                
                # If firstName/lastName not found, try alternative field names
                if not first_name and not last_name:
                    nom = request.form.get("nom", "").strip()
                else:
                    nom = f"{first_name} {last_name}".strip()
                
                email = request.form.get("email", "").strip()
                # Try both 'password' and 'mot_de_passe'
                mot_de_passe = request.form.get("password", "") or request.form.get("mot_de_passe", "")
                confirm_password = request.form.get("confirmPassword", "") or request.form.get("confirm_password", "")
                
                # Check if this is an AJAX request (API call)
                is_api_call = request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'
            
            # Validation
            if not all([nom.strip(), email, mot_de_passe, confirm_password]):
                error_msg = "Veuillez remplir tous les champs obligatoires"
                if is_api_call:
                    return jsonify({"success": False, "error": error_msg}), 400
                return render_template("register.html", error=error_msg)
            
            if mot_de_passe != confirm_password:
                error_msg = "Les mots de passe ne correspondent pas"
                if is_api_call:
                    return jsonify({"success": False, "error": error_msg}), 400
                return render_template("register.html", error=error_msg)
            
            if len(mot_de_passe) < 6:
                error_msg = "Le mot de passe doit contenir au moins 6 caractères"
                if is_api_call:
                    return jsonify({"success": False, "error": error_msg}), 400
                return render_template("register.html", error=error_msg)
            
            # Simple email validation
            if "@" not in email or "." not in email:
                error_msg = "Format d'email invalide"
                if is_api_call:
                    return jsonify({"success": False, "error": error_msg}), 400
                return render_template("register.html", error=error_msg)
            
            # Create user using DatabaseService
            user, message = DatabaseService.create_user(nom, email, mot_de_passe)
            if user:
                # Set session for successful registration
                session['id_utilisateur'] = user.id_utilisateur
                session['nom'] = user.nom
                session['email'] = user.email
                session.permanent = True  # Make session permanent
                
                logging.info(f"Utilisateur créé avec succès: {user.email}")
                
                if is_api_call:
                    return jsonify({
                        "success": True, 
                        "message": "Utilisateur créé avec succès",
                        "redirect_url": url_for('dashboard'),
                        "user": {
                            "id": user.id_utilisateur,
                            "nom": user.nom,
                            "email": user.email
                        }
                    }), 201
                else:
                    # For form submission, redirect to dashboard
                    flash("Inscription réussie! Bienvenue!", "success")
                    return redirect(url_for('dashboard'))
            else:
                logging.warning(f"Échec de création d'utilisateur: {message}")
                if is_api_call:
                    return jsonify({"success": False, "error": message}), 409
                return render_template("register.html", error=message)
                
        except Exception as e:
            logging.error(f"Erreur lors de l'inscription: {str(e)}")
            error_msg = f"Erreur lors de l'inscription: {str(e)}"
            if is_api_call:
                return jsonify({"success": False, "error": error_msg}), 500
            return render_template("register.html", error=error_msg)
    
    # GET request
    return render_template("register.html")

# Also add this helper route to debug sessions
@app.route("/debug/session")
def debug_session():
    """Debug route to check session content (remove in production)"""
    if app.debug:  # Only available in debug mode
        return jsonify({
            "session_data": dict(session),
            "is_authenticated": 'id_utilisateur' in session
        })
    else:
        return "Debug disabled", 404
@app.route("/logout")
def logout():
    """Route pour la déconnexion utilisateur"""
    session.clear()
    return redirect(url_for('upload_predict'))

@app.route("/dashboard")
@login_required
def dashboard():
    """Route pour le tableau de bord utilisateur"""
    try:
        id_utilisateur = session.get('id_utilisateur')
        nom = session.get('nom')
        
        user_analyses = DatabaseService.get_user_analyses(id_utilisateur)
        
        total_predictions = len(user_analyses)
        recent_predictions = [a for a in user_analyses 
                            if datetime.fromisoformat(a['date_analyse']) > 
                            datetime.now() - timedelta(days=7)]
        
        benign_count = sum(1 for a in user_analyses 
                          if 'bénigne' in a['resultat'].lower())
        malignant_count = sum(1 for a in user_analyses 
                             if 'maligne' in a['resultat'].lower())
        
        stats = {
            'total_predictions': total_predictions,
            'recent_predictions_count': len(recent_predictions),
            'benign_count': benign_count,
            'malignant_count': malignant_count,
            'success_rate': 100.0 if total_predictions > 0 else 0.0
        }
        
        latest_predictions = user_analyses[:5]  # Already sorted by date desc
        
        return render_template("dashboard.html", 
                             nom=nom,
                             stats=stats,
                             latest_predictions=latest_predictions)
        
    except Exception as e:
        logging.error(f"Erreur dashboard: {str(e)}")
        return render_template("error.html", 
                             error="Erreur lors du chargement du tableau de bord")

@app.route("/history")
@login_required
def get_history():
    """Récupérer l'historique des prédictions"""
    try:
        id_utilisateur = session.get('id_utilisateur')
        nom = session.get('nom')
        
        history = DatabaseService.get_user_analyses(id_utilisateur)
        
        return render_template("history.html", 
                             history=history, 
                             nom=nom)
    
    except Exception as e:
        logging.error(f"Erreur historique: {str(e)}")
        return render_template("error.html", error=str(e))

@app.route("/profile")
@login_required
def profile():
    """Route pour afficher le profil utilisateur"""
    try:
        id_utilisateur = session.get('id_utilisateur')
        user = Utilisateur.query.get(id_utilisateur)
        
        if not user:
            return redirect(url_for('logout'))
        
        return render_template("profile.html", user=user.to_dict())
        
    except Exception as e:
        logging.error(f"Erreur profil: {str(e)}")
        return render_template("error.html", error=str(e))

@app.route("/health")
def health_check():
    """Endpoint de vérification de l'état du service"""
    try:
        model_instance = ModelSingleton()
        model_loaded = model_instance.model is not None
        
        return jsonify({
            "status": "healthy",
            "model_loaded": model_loaded,
            "cuda_available": torch.cuda.is_available(),
            "timestamp": datetime.now().isoformat()
        })
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }), 500

# WebSocket handlers
@socketio.on('connect')
def handle_connect():
    emit('status', {'msg': 'Connecté au serveur de prédictions'})
    logging.info('Client connecté via WebSocket')

@socketio.on('disconnect')
def handle_disconnect():
    logging.info('Client déconnecté')

@socketio.on('join_user_room')
def handle_join_user_room():
    if 'id_utilisateur' in session:
        join_room(session['id_utilisateur'])
        emit('status', {'msg': f'Connecté à la room utilisateur'})

@socketio.on('leave_user_room')
def handle_leave_user_room():
    if 'id_utilisateur' in session:
        leave_room(session['id_utilisateur'])
        emit('status', {'msg': 'Déconnecté de la room utilisateur'})

if __name__ == "__main__":
    try:
        logging.info("Démarrage de l'application SkinSafe...")
        logging.info("Chargement du modèle...")
        
        # Pré-chargement du modèle via le Singleton
        model_instance = ModelSingleton()
        logging.info("Modèle chargé avec succès!")
        
        # Démarrage du serveur
        socketio.run(app, host='0.0.0.0', port=12000, debug=False, allow_unsafe_werkzeug=True)
        
    except Exception as e:
        logging.error(f"Erreur lors du démarrage: {str(e)}")
        sys.exit(1)

2025-05-28 17:58:00,638 - root - INFO - Démarrage de l'application SkinSafe...
2025-05-28 17:58:00,638 - root - INFO - Chargement du modèle...
2025-05-28 17:58:00,639 - root - INFO - Modèle chargé avec succès!


CUDA disponible: True
✅ Tables créées avec succès!
MelanomaDetectionFacade initialisée
Facade initialisée avec succès
 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:12000
 * Running on http://192.168.100.159:12000
2025-05-28 17:58:00,650 - werkzeug - INFO - [33mPress CTRL+C to quit[0m
2025-05-28 17:58:00,750 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:00] "GET /dashboard HTTP/1.1" 200 -
2025-05-28 17:58:03,908 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:03] "[32mGET /logout HTTP/1.1[0m" 302 -
2025-05-28 17:58:03,917 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:03] "GET / HTTP/1.1" 200 -
2025-05-28 17:58:05,463 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:05] "GET /login HTTP/1.1" 200 -
2025-05-28 17:58:09,309 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:09] "[32mPOST /login HTTP/1.1[0m" 302 -
2025-05-28 17:58:09,317 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:09] "GET /dashboard HTTP/1.1" 200 -
2025-05-28 17:58:09,342 - werkzeug - INFO - 127.0.0.1 - - [28/May/2025 17:58:09] "GET /dashboard HTTP/1.1" 200 -
2025-05-28 17:58:1