In [1]:
!pip install fastapi uvicorn nest-asyncio pyngrok scikit-learn instaloader


Collecting pyngrok
  Downloading pyngrok-7.5.0-py3-none-any.whl.metadata (8.1 kB)
Collecting instaloader
  Downloading instaloader-4.15-py3-none-any.whl.metadata (6.8 kB)
Downloading pyngrok-7.5.0-py3-none-any.whl (24 kB)
Downloading instaloader-4.15-py3-none-any.whl (68 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.3/68.3 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyngrok, instaloader
Successfully installed instaloader-4.15 pyngrok-7.5.0


In [2]:
import pandas as pd
import numpy as np

# Seed the legacy global NumPy generator so every run draws the same profiles.
np.random.seed(42)

N = 3000  # number of synthetic profiles

# Half-open [low, high) integer range for each raw column. The dict order is
# significant: columns are drawn from the seeded generator in this order.
_COLUMN_RANGES = {
    "followers": (10, 500000),
    "following": (5, 5000),
    "posts": (0, 2000),
    "account_age": (30, 4000),
    "bio_length": (0, 250),
    "story_count": (0, 40),
    "has_profile_picture": (0, 2),
    "avg_likes": (0, 10000),
    "avg_comments": (0, 300),
    "is_private": (0, 2),
    "suspicious_hashtags": (0, 2),
}

data = {
    column: np.random.randint(low, high, N)
    for column, (low, high) in _COLUMN_RANGES.items()
}

df = pd.DataFrame(data)

# Interactions per follower; the +1 keeps the denominator non-zero.
interactions = df["avg_likes"] + df["avg_comments"]
df["engagement_rate"] = interactions / (df["followers"] + 1)

# Rule-based ground truth: a profile is labelled fake (1) as soon as any one
# of these signals fires.
fake_signals = [
    (df["followers"] < 150) & (df["following"] > 600),
    df["posts"] < 3,
    df["has_profile_picture"] == 0,
    df["engagement_rate"] < 0.003,
    df["suspicious_hashtags"] == 1,
    df["bio_length"] < 5,
]
is_fake = fake_signals[0]
for signal in fake_signals[1:]:
    is_fake = is_fake | signal
df["label"] = is_fake.astype(int)

df.head()


Unnamed: 0,followers,following,posts,account_age,bio_length,story_count,has_profile_picture,avg_likes,avg_comments,is_private,suspicious_hashtags,engagement_rate,label
0,121968,654,220,3963,220,21,0,4021,144,1,1,0.034148,1
1,146877,973,1891,3415,209,28,0,2364,220,0,1,0.017593,1
2,131942,2642,533,2570,174,32,0,5230,63,0,0,0.040116,1
3,365848,1105,691,952,37,37,1,5970,125,1,1,0.01666,1
4,259188,445,1971,2205,230,34,0,8954,217,1,0,0.035383,1


In [3]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, classification_report
from sklearn.ensemble import RandomForestClassifier
import pickle

# Column order of the feature matrix. api_server.py builds its input rows in
# this same order (its FEATURE_ORDER list), so the two must be kept in sync.
features = [
    "followers", "following", "posts", "account_age", "bio_length",
    "story_count", "has_profile_picture", "avg_likes",
    "avg_comments", "is_private", "suspicious_hashtags",
    "engagement_rate"
]

X = df[features]
y = df["label"]

# Split BEFORE fitting the scaler: fitting MinMaxScaler on the full dataset
# would let the held-out rows influence the min/max used for training.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Persist with context managers so the files are flushed and closed before
# the API process tries to read them.
with open("scaler.pkl", "wb") as scaler_file:
    pickle.dump(scaler, scaler_file)

model = RandomForestClassifier(n_estimators=300, random_state=42)
model.fit(X_train, y_train)

with open("model.pkl", "wb") as model_file:
    pickle.dump(model, model_file)

y_pred = model.predict(X_test)

# NOTE: the labels are produced by fixed rules over these same columns, so the
# score below measures how well the forest recovers those rules, not how well
# it would separate real and fake profiles in the wild.
print("Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n", classification_report(y_test, y_pred))


Accuracy: 0.9983333333333333

Classification Report:
               precision    recall  f1-score   support

           0       0.99      1.00      1.00       142
           1       1.00      1.00      1.00       458

    accuracy                           1.00       600
   macro avg       1.00      1.00      1.00       600
weighted avg       1.00      1.00      1.00       600



In [4]:
%%writefile api_server.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import pickle, re, logging, traceback
import numpy as np
import instaloader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("fake-profile-api")

# Artifacts written by the training cell; paths are relative to the directory
# the server is started from.
MODEL_PATH = "model.pkl"
SCALER_PATH = "scaler.pkl"


def _load_pickle(path):
    """Unpickle the object stored at ``path`` and close the file afterwards."""
    # SECURITY: pickle.load can execute arbitrary code. Only point these paths
    # at artifacts produced by this project, never at user-supplied files.
    with open(path, "rb") as handle:
        return pickle.load(handle)


# Load both artifacts at import time. On any failure the server still starts,
# but `model` and `scaler` are None and the prediction endpoints answer 500.
try:
    model = _load_pickle(MODEL_PATH)
    scaler = _load_pickle(SCALER_PATH)
    logger.info("Loaded model and scaler.")
except Exception as e:
    logger.exception("Failed to load model/scaler: %s", e)
    model = None
    scaler = None

app = FastAPI(title="Instagram Fake Profile Detection")

# Any origin may call this API. Credentials stay disabled: a wildcard origin
# combined with allow_credentials=True is not a valid CORS configuration, and
# none of the endpoints defined in this file read cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=False,
)

# Request body for POST /predict_features: raw profile statistics supplied by
# the caller. engagement_rate is not a field here; prepare_features_row derives
# it from followers, avg_likes and avg_comments.
class FeatureInput(BaseModel):
    followers: int
    following: int
    posts: int
    account_age: int  # scrape_profile reports this in days
    bio_length: int  # scrape_profile reports len() of the biography text
    story_count: int
    has_profile_picture: int  # 0/1 flag
    avg_likes: int
    avg_comments: int
    is_private: int  # 0/1 flag
    suspicious_hashtags: int  # 0/1 flag

# Request body for POST /predict_url.
class URLInput(BaseModel):
    url: str  # Instagram profile URL; extract_username parses the username out of it

# First path segments Instagram uses for non-profile pages (posts, reels,
# explore, ...). A URL whose first segment is one of these names no user.
_NON_PROFILE_SEGMENTS = frozenset(
    {"p", "reel", "reels", "tv", "explore", "stories", "accounts", "direct"}
)

# "instagram.com/<segment>" anywhere in the string. The look-behind rejects
# look-alike hosts such as "notinstagram.com" while still allowing a scheme,
# "www." or another sub-domain in front.
_PROFILE_URL_RE = re.compile(
    r"(?<![A-Za-z0-9-])instagram\.com/([^/?#\s]+)", re.IGNORECASE
)


def extract_username(insta_url: str):
    """Return the username from an Instagram profile URL, or None.

    Args:
        insta_url: A profile URL such as ``https://www.instagram.com/name/``.
            The scheme and ``www.`` are optional, the host is matched
            case-insensitively, and surrounding whitespace is ignored.

    Returns:
        The first path segment after ``instagram.com/``, or ``None`` when the
        input is empty, is not an Instagram URL, or points at a non-profile
        page (for example ``/p/<shortcode>/``).
    """
    if not insta_url:
        return None
    match = _PROFILE_URL_RE.search(insta_url.strip())
    if match is None:
        return None
    username = match.group(1)
    if username.lower() in _NON_PROFILE_SEGMENTS:
        return None
    return username

def _summarize_posts(profile, sample_size=10):
    """Walk the profile's post feed once and return (avg_likes, avg_comments, account_age).

    The averages are taken over the first ``sample_size`` posts yielded by
    ``profile.get_posts()``. ``account_age`` is the number of days since the
    last post yielded, which is assumed to be the oldest one (feed assumed
    newest-first -- TODO confirm against the instaloader docs).

    This is best effort: if the feed cannot be read, the affected values fall
    back to 0 and a warning is logged. The whole feed is traversed, so the
    cost grows with the number of posts.
    """
    from datetime import datetime, timezone

    likes, comments = [], []
    last_post = None
    walked_whole_feed = False
    account_age = 0
    try:
        for post in profile.get_posts():
            if len(likes) < sample_size:
                # Read both counters before storing either, so a failure
                # cannot leave the two lists with different lengths.
                like_count, comment_count = post.likes or 0, post.comments or 0
                likes.append(like_count)
                comments.append(comment_count)
            last_post = post
        walked_whole_feed = True
        if last_post is not None and hasattr(last_post, "date_utc"):
            # date_utc is subtracted from a naive UTC "now".
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            account_age = (now_utc - last_post.date_utc).days
    except Exception as exc:
        logger.warning("Could not read the full post feed: %s", exc)
        account_age = 0

    avg_likes = avg_comments = 0
    # Only trust the sample if it was filled, or the feed simply ended early.
    sample_complete = walked_whole_feed or len(likes) >= sample_size
    if likes and sample_complete:
        avg_likes = int(sum(likes) / len(likes))
        avg_comments = int(sum(comments) / len(comments))
    return avg_likes, avg_comments, account_age


def scrape_profile(username: str, loader=None):
    """Collect the raw model features for an Instagram profile.

    Args:
        username: Instagram username to look up.
        loader: Optional ``instaloader.Instaloader`` to reuse. When omitted, a
            new one is created with all download options disabled.

    Returns:
        dict with integer values for ``followers``, ``following``, ``posts``,
        ``account_age``, ``bio_length``, ``story_count``,
        ``has_profile_picture``, ``avg_likes``, ``avg_comments``,
        ``is_private`` and ``suspicious_hashtags``.

    Raises:
        RuntimeError: If the profile itself cannot be loaded.
    """
    if loader is None:
        loader = instaloader.Instaloader(download_pictures=False, download_videos=False,
                                         download_comments=False, save_metadata=False, compress_json=False)
    try:
        profile = instaloader.Profile.from_username(loader.context, username)
    except Exception as e:
        # Chain the cause so the original traceback survives in the logs.
        raise RuntimeError(f"Instaloader error: {e}") from e

    followers = int(profile.followers or 0)
    following = int(profile.followees or 0)
    posts = int(profile.mediacount or 0)
    bio = profile.biography or ""
    bio_length = len(bio)
    is_private = 1 if profile.is_private else 0
    has_profile_pic = 0 if (profile.profile_pic_url is None or profile.profile_pic_url == "") else 1
    # Stories are not fetched; the feature is always reported as 0.
    story_count = 0

    # One pass over the feed yields both the engagement sample and the age.
    avg_likes, avg_comments, account_age = _summarize_posts(profile)

    # Plain substring match against the lower-cased biography.
    suspicious_keywords = ["follow", "like4like", "follow4follow", "free", "giveaway", "earn money"]
    bio_lower = bio.lower()
    suspicious_hashtags = 1 if any(k in bio_lower for k in suspicious_keywords) else 0

    return {
        "followers": followers,
        "following": following,
        "posts": posts,
        "account_age": int(account_age),
        "bio_length": bio_length,
        "story_count": int(story_count),
        "has_profile_picture": int(has_profile_pic),
        "avg_likes": int(avg_likes),
        "avg_comments": int(avg_comments),
        "is_private": int(is_private),
        "suspicious_hashtags": int(suspicious_hashtags)
    }

# Column order expected by the scaler and model. It must match the `features`
# list used when they were fitted; engagement_rate is derived, not supplied.
FEATURE_ORDER = [
    "followers", "following", "posts", "account_age", "bio_length",
    "story_count", "has_profile_picture", "avg_likes",
    "avg_comments", "is_private", "suspicious_hashtags", "engagement_rate"
]

def prepare_features_row(feat_dict):
    """Build the (1, 12) float feature matrix for one profile.

    Args:
        feat_dict: Mapping with the raw feature names in ``FEATURE_ORDER``.
            Missing keys default to 0; any supplied ``engagement_rate`` is
            ignored and recomputed.

    Returns:
        ``numpy.ndarray`` of shape ``(1, len(FEATURE_ORDER))`` and dtype float,
        with columns in ``FEATURE_ORDER`` order.
    """
    values = {
        name: int(feat_dict.get(name, 0))
        for name in FEATURE_ORDER
        if name != "engagement_rate"
    }
    interactions = values["avg_likes"] + values["avg_comments"]
    # Same formula as training: interactions / (followers + 1). Clamping at 0
    # stops a negative follower count (-1 in particular) from zeroing or
    # flipping the sign of the denominator.
    values["engagement_rate"] = interactions / (max(values["followers"], 0) + 1)
    return np.array([[values[name] for name in FEATURE_ORDER]], dtype=float)

# Liveness probe: answers with a fixed payload so callers can confirm the
# service is reachable.
@app.get("/")
def root():
    health = dict(status="ok", message="Fake profile detection API is up.")
    return health

@app.post("/predict_features")
def predict_features(payload: FeatureInput):
    """Classify a profile from caller-supplied statistics.

    Responds with the label ("FAKE" or "REAL"), the model's probability for
    the fake class, and the features exactly as received. Answers HTTP 500 if
    the model or scaler is not loaded, or if feature preparation or inference
    fails.
    """
    if model is None or scaler is None:
        raise HTTPException(status_code=500, detail="Model or scaler not loaded on server.")
    features = payload.dict()
    # Same handling as /predict_url: log the failure and return a described
    # 500 instead of letting the exception escape the handler unlogged.
    try:
        row = prepare_features_row(features)
        scaled = scaler.transform(row)
        # Column 1 of predict_proba is read as the probability of label 1 (fake).
        prob = float(model.predict_proba(scaled)[0][1])
        pred = int(model.predict(scaled)[0])
    except Exception as e:
        logger.exception("Prediction error: %s", e)
        raise HTTPException(status_code=500, detail=f"Prediction failed: {e}") from e
    return {
        "prediction": "FAKE" if pred == 1 else "REAL",
        "probability_fake": round(prob, 4),
        "features_used": features
    }

# Classify the profile behind an Instagram URL: parse the username, scrape its
# statistics, then scale and score them. Returns 400 for an unparseable URL
# and 500 when the artifacts are missing or scraping/prediction fails.
@app.post("/predict_url")
def predict_url(payload: URLInput):
    artifacts_missing = model is None or scaler is None
    if artifacts_missing:
        raise HTTPException(status_code=500, detail="Model or scaler not loaded on server.")

    username = extract_username(payload.url)
    if not username:
        raise HTTPException(status_code=400, detail="Invalid Instagram URL")

    # Step 1: pull the raw statistics from Instagram.
    try:
        feat_dict = scrape_profile(username)
    except Exception as scrape_err:
        logger.error("Scrape failure: %s", traceback.format_exc())
        raise HTTPException(
            status_code=500,
            detail=f"Failed scraping Instagram profile: {str(scrape_err)}",
        )

    # Step 2: turn them into a model row and score it.
    feature_row = prepare_features_row(feat_dict)
    try:
        scaled_row = scaler.transform(feature_row)
        fake_probability = float(model.predict_proba(scaled_row)[0][1])
        label = int(model.predict(scaled_row)[0])
    except Exception as predict_err:
        logger.exception("Prediction error: %s", predict_err)
        raise HTTPException(status_code=500, detail=f"Prediction failed: {predict_err}")

    verdict = "FAKE" if label == 1 else "REAL"
    return {
        "username": username,
        "prediction": verdict,
        "probability_fake": round(fake_probability, 4),
        "instagram_stats": feat_dict
    }


Writing api_server.py


In [5]:

from pyngrok import ngrok
ngrok.set_auth_token("35Vy6OQIw1o0psFsS8XBxo9JvEr_5WwLvcgit8Yx7dbMxhY86")
import nest_asyncio
nest_asyncio.apply()
!nohup uvicorn api_server:app --host 0.0.0.0 --port 8000 &>/content/uvicorn.log &
from pyngrok import ngrok
public_url = ngrok.connect(8000)
print("Public URL:", public_url)



Public URL: NgrokTunnel: "https://kyndall-punctate-liane.ngrok-free.dev" -> "http://localhost:8000"
