In [None]:
!pip install transformers torch pandas scikit-learn pymongo flask flask-cors requests --quiet

In [None]:
pip install waitress

In [1]:
# --- Imports ---
import pandas as pd
import torch
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from flask import Flask, request, jsonify
from flask_cors import CORS
from pymongo import MongoClient
from datetime import datetime, timedelta
import os
import sys
from functools import wraps
import bcrypt
import jwt
import re
from werkzeug.security import generate_password_hash, check_password_hash

In [2]:
# --- Configuration ---
PORT = 5000
app = Flask(__name__)
CORS(app)
app.config['SECRET_KEY'] = 's1a7q0i7b2i0s0h2faq'  # Change this in production!

In [3]:
# --- MongoDB Setup ---
def connect_to_mongodb():
    try:
        client = MongoClient('mongodb+srv://saqibishfaq23:saqib1707@cluster0.db9wlz3.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0', serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client["sample_mflix"]
        print("Connected to MongoDB successfully!")
        return db
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        return None

db = connect_to_mongodb()
if db is None:
    print("FATAL: MongoDB connection failed")
    raise Exception("MongoDB connection failed")  
print("Available collections:", db.list_collection_names())

users_collection = db["Users"]
text_analysis_collection = db["Text_Tone_Classification"]

# Initialize with custom users if none exist
if users_collection.count_documents({}) == 0:
    custom_users = [
        {
            "username": "saqib",  # Your desired username
            "password": generate_password_hash("Saqib@123"),  # Strong password
            "role": "admin",
            "created_at": datetime.utcnow()
        },
        {
            "username": "zain",    # Another username
            "password": generate_password_hash("Zain@456"),
            "role": "user",
            "created_at": datetime.utcnow()
        }
    ]
    
    # Validate before insertion
    for user in custom_users:
        if users_collection.find_one({"username": user['username']}):
            print(f"User {user['username']} already exists - skipping")
            continue
        
        users_collection.insert_one(user)
        print(f"Created user: {user['username']}")

Connected to MongoDB successfully!
Available collections: ['theaters', 'movies', 'Users', 'embedded_movies', 'Text_Tone_Classification', 'sessions', 'comments']


In [4]:
# Initialize with admin/user if not exists and the passwords are encrypted here
if users_collection.count_documents({}) == 0:
    users_collection.insert_many([
        {
            "username": "admin",
            "password": generate_password_hash("admin123"),
            "email": "admin@example.com",
            "phone": "+1234567890",
            "role": "admin",
            "created_at": datetime.utcnow()
        },
        {
            "username": "user",
            "password": generate_password_hash("user123"),
            "email": "user@example.com",
            "phone": "+0987654321",
            "role": "user",
            "created_at": datetime.utcnow()
        }
    ])
    print("Created default admin and user accounts")

In [5]:
# --- JWT Authentication Decorators ---
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        
        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            if auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]
        
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401
        
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user = users_collection.find_one({"username": data['username']})
            if not current_user:
                return jsonify({'message': 'User not found!'}), 401
        except:
            return jsonify({'message': 'Token is invalid!'}), 401
            
        return f(current_user, *args, **kwargs)
    
    return decorated

def admin_required(f):
    @wraps(f)
    def decorated(current_user, *args, **kwargs):
        if current_user['role'] != 'admin':
            return jsonify({'message': 'Admin access required!'}), 403
        return f(current_user, *args, **kwargs)
    return decorated

In [6]:
# --- Load Model ---
local_model_path = r"C:\Users\saqib\Desktop\Internship\Text_Tone_Classification_with_MongoDB\model\content\drive\MyDrive\text_tone_classification" 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

try:
    model = RobertaForSequenceClassification.from_pretrained(local_model_path).to(device)
    tokenizer = RobertaTokenizer.from_pretrained(local_model_path)
    tone_columns = pd.read_csv(os.path.join(local_model_path, "tone_columns.csv")).iloc[:, 0].tolist()
    print("Model and tokenizer loaded successfully.")
except Exception as e:
    print(f"Error loading model from {local_model_path}: {e}")
    sys.exit(1)

Using device: cpu
Model and tokenizer loaded successfully.


In [7]:
# --- Helper Functions ---
def analyze_text(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=64).to(device)
    with torch.no_grad():
        logits = model(**inputs).logits
    probs = torch.sigmoid(logits)[0].cpu().numpy()
    tone_predictions = {tone: float(prob) for tone, prob in zip(tone_columns, probs)}
    return sorted(tone_predictions.items(), key=lambda x: x[1], reverse=True)

In [8]:
# --- Token Validation Decorator ---
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        if 'Authorization' in request.headers:
            token = request.headers['Authorization'].split(" ")[1]  # Bearer <token>
        if not token:
            return jsonify({'error': 'Token is missing', 'status': 401}), 401
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user = users_collection.find_one({"username": data['username']})
            if not current_user:
                return jsonify({'error': 'User not found', 'status': 404}), 404
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired', 'status': 401}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token', 'status': 401}), 401
        return f(current_user, *args, **kwargs)
    return decorated

# --- Admin Role Decorator ---
def admin_required(f):
    @wraps(f)
    def decorated(current_user, *args, **kwargs):
        if current_user.get("role") != "admin":
            return jsonify({'error': 'Admin access required', 'status': 403}), 403
        return f(current_user, *args, **kwargs)
    return decorated

# --- Home Route ---
@app.route('/')
def home():
    return "Flask server is running"

# --- Signup Route ---
@app.route('/signup', methods=['POST'])
def signup():
    data = request.get_json()
    required_fields = ['username', 'email', 'password']
    if not all(field in data and data[field] for field in required_fields):
        return jsonify({
            "error": "Missing required fields",
            "required_fields": required_fields,
            "status": 400
        }), 400

    username = data['username']
    email = data['email']
    password = data['password']

    # === Password Validation ===
    errors = []

    if len(password) != 8:
        errors.append("Password must be exactly 8 characters long.")
    if not re.search(r'[A-Z]', password):
        errors.append("You must have to enter at least an uppercase letter.")
    if not re.search(r'[a-z]', password):
        errors.append("You must have to enter at least a lowercase letter.")
    if not re.search(r'\d', password):
        errors.append("You must have to enter at least a number.")
    if not re.search(r'[^A-Za-z0-9]', password):
        errors.append("You must have to enter at least a special character.")

    if errors:
        return jsonify({"password_errors": errors, "status": 400}), 400

    # Check if user exists
    if users_collection.find_one({"$or": [{"username": username}, {"email": email}]}):
        return jsonify({"error": "Username or email already exists", "status": 409}), 409

    # Hash the password
    hashed_password = generate_password_hash(password)

    # Create user document
    user_doc = {
        "username": username,
        "email": email,
        "password": hashed_password,
        "role": data.get("role", "user"),
        "created_at": datetime.utcnow()
    }

    users_collection.insert_one(user_doc)

    # Generate JWT token
    token = jwt.encode({
        'username': username,
        'email': email,
        'role': user_doc['role'],
        'exp': datetime.utcnow() + timedelta(hours=24)
    }, app.config['SECRET_KEY'], algorithm="HS256")

    return jsonify({
        "message": "User registered successfully",
        "user": {
            "username": username,
            "email": email,
            "role": user_doc["role"]
        },
        "token": token,
        "expires_in": "24 hours"
    }), 201

# --- Signin Route ---
@app.route('/signin', methods=['POST'])
def signin():
    auth = request.get_json()

    if not auth or not auth.get('login') or not auth.get('password'):
        return jsonify({
            "error": "Login credential (username/email) and password required",
            "required_fields": ["login", "password"],
            "status": 400
        }), 400

    login = auth['login']
    password = auth['password']

    user = users_collection.find_one({
        "$or": [
            {"username": login},
            {"email": login}
        ]
    })

    if not user or not check_password_hash(user['password'], password):
        return jsonify({"error": "Invalid credentials", "status": 401}), 401

    token = jwt.encode({
        'username': user['username'],
        'email': user['email'],
        'role': user['role'],
        'exp': datetime.utcnow() + timedelta(hours=24)
    }, app.config['SECRET_KEY'], algorithm="HS256")

    return jsonify({
        "message": "Login successful",
        "user": {
            "username": user['username'],
            "email": user['email'],
            "role": user['role']
        },
        "token": token,
        "expires_in": "24 hours"
    }), 200

# --- Change Password Route ---
@app.route('/change-password', methods=['POST'])
@token_required
def change_password(current_user):
    data = request.json

    if not data or not data.get('current_password') or not data.get('new_password'):
        return jsonify({"error": "Current and new password required", "status": 400}), 400

    if not check_password_hash(current_user['password'], data['current_password']):
        return jsonify({"error": "Current password is incorrect", "status": 401}), 401

    new_password_hash = generate_password_hash(data['new_password'])
    users_collection.update_one(
        {"username": current_user['username']},
        {"$set": {"password": new_password_hash}}
    )

    return jsonify({"message": "Password updated successfully"}), 200

# --- Analyze Text Route ---
@app.route('/analyze', methods=['POST'])
@token_required
def analyze_endpoint(current_user):
    if not request.json or 'text' not in request.json:
        return jsonify({"error": "Please provide 'text' in the request body", "status": 400}), 400

    user_text = request.json['text']
    user_id = text_analysis_collection.count_documents({}) + 1
    sorted_tones = analyze_text(user_text)

    document = {
        "UserID": current_user['username'],
        "Text": user_text,
        "Timestamp": datetime.utcnow(),
        "DominantTone": {
            "tone": sorted_tones[0][0],
            "confidence": round(sorted_tones[0][1] * 100, 2)
        },
        "Top3Tones": [
            {"tone": tone, "confidence": round(conf * 100, 2)}
            for tone, conf in sorted_tones[:3]
        ]
    }

    text_analysis_collection.insert_one(document)
    return jsonify({"message": "Analysis saved successfully", "data": document}), 201

# --- Users (Admin Only) ---
@app.route('/users', methods=['GET'])
@token_required
@admin_required
def get_users(current_user):
    user_id = request.args.get('user_id')

    if user_id:
        try:
            user_data = text_analysis_collection.find_one({"UserID": user_id})
            if user_data:
                user_data['Timestamp'] = user_data['Timestamp'].isoformat()
                return jsonify(user_data)
            return jsonify({"error": "User not found", "status": 404}), 404
        except ValueError:
            return jsonify({"error": "Invalid UserID format", "status": 400}), 400

    all_users = list(text_analysis_collection.find({}, {'_id': 0}).sort("Timestamp", -1))
    for user in all_users:
        if 'Timestamp' in user:
            user['Timestamp'] = user['Timestamp'].isoformat()

    return jsonify(all_users)

# --- Development Token Generator ---
@app.route('/token', methods=['GET'])
def get_test_token():
    username = request.args.get('username', 'admin')
    user = users_collection.find_one({"username": username})
    if not user:
        return jsonify({"error": "User not found", "status": 404}), 404

    token = jwt.encode({
        'username': user['username'],
        'role': user['role'],
        'exp': datetime.utcnow() + timedelta(hours=24)
    }, app.config['SECRET_KEY'])

    return jsonify({
        "message": "Test token generated",
        "token": token,
        "user": username,
        "role": user['role'],
        "expires_in": "24 hours"
    }), 200


In [None]:
if __name__ == '__main__':
    try:
        print(f"\n=== Starting Server ===")
        print(f"Access URLs:")
        print(f"http://localhost:{PORT}")
        print(f"http://127.0.0.1:{PORT}")
        
        # Development server
        app.run(
            host='0.0.0.0', 
            port=PORT, 
            debug=True,
            use_reloader=False  # Disable reloader to prevent double init
        )
        
    except KeyboardInterrupt:
        print("\nServer stopped by user")
    except Exception as e:
        print(f"\n!!! Server failed !!!")
        print(f"Error: {str(e)}")
        print("Check:")
        print("- MongoDB connection")
        print("- Port availability")
        print("- Model file paths")


=== Starting Server ===
Access URLs:
http://localhost:5000
http://127.0.0.1:5000
 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.18.41:5000
Press CTRL+C to quit
127.0.0.1 - - [05/Aug/2025 12:05:04] "POST /signup HTTP/1.1" 400 -
127.0.0.1 - - [05/Aug/2025 12:05:17] "POST /signup HTTP/1.1" 400 -
127.0.0.1 - - [05/Aug/2025 12:05:31] "POST /signup HTTP/1.1" 400 -
  "created_at": datetime.utcnow()
  'exp': datetime.utcnow() + timedelta(hours=24)
POST /signup HTTP/1.1" 201 -2:05:38] "
127.0.0.1 - - [05/Aug/2025 12:06:02] "POST /signup HTTP/1.1" 400 -
127.0.0.1 - - [05/Aug/2025 12:06:14] "POST /signup HTTP/1.1" 400 -
