In [None]:
import pandas as pd
import numpy as np

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_recall,
    context_precision,
)
from datasets import Dataset
import faiss
from tqdm import tqdm

from dotenv import load_dotenv
load_dotenv()

In [None]:
import os

os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY2")
os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGSMITH_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "Game Bot Detection Framework"
os.environ["LANGCHAIN_TRACING_V2"] = "true"

In [None]:
def load_index(player_df, embedding_file="../model/player_embeddings_4000.npy"):
    """Loads pre-computed embeddings from a pickle file and builds FAISS index."""
    try:
        # with open(embedding_file, "rb") as f:
        #     embeddings = pickle.load(f)
        embeddings = np.load(embedding_file)
        player_ids = player_df['Actor'].astype(str).tolist()
        # Ensure loaded embeddings are a numpy array and have the correct shape
        if isinstance(embeddings, list):
            embeddings = np.array(embeddings).astype('float32')  # Convert to numpy array
        if isinstance(embeddings, tuple):
            print("Tuple detected. Converting the embeddings to numpy array")
            embeddings = np.array(embeddings).astype('float32') 
        if not isinstance(embeddings, np.ndarray):
            raise ValueError("Loaded embeddings are not a numpy array.")

        if len(embeddings.shape) != 2:
            raise ValueError(f"Embeddings must be 2D array, got shape {embeddings.shape}")

        # Build FAISS index
        dimension = embeddings.shape[1]
        faiss_index = faiss.IndexFlatL2(dimension)
        faiss_index.add(embeddings)
    except FileNotFoundError:
        print(f"Error: Embedding file not found at {embedding_file}")
        raise
    except Exception as e:
        print(f"Error loading and building FAISS index: {e}")
        raise
    return faiss_index
def get_embedding_for_player(player_id: str, embedding_file="../model/player_embeddings_4000.npy"):
    """
    Retrieves the pre-computed embedding for a specific player from the embeddings file.
    """
    try:
        embeddings = np.load(embedding_file)
        player_df = pd.read_csv('../../data/sample/sample_player_data.csv')
        player_ids = player_df['Actor'].astype(str).tolist()

        player_index = player_ids.index(player_id)
        return np.array(embeddings[player_index]).astype('float32')
    except ValueError:
        print(f"Player ID {player_id} not found in embeddings.")
        return None
    except FileNotFoundError:
            print(f"Error: Embedding file not found at {embedding_file}")
            return None
    except Exception as e:
        print(f"Error loading embeddings: {e}")
        return None

In [None]:
# ml/anomaly_scoring_agent.py
import os
from typing import List
from langchain_community.graphs import Neo4jGraph
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq


# Initialize LLM and Neo4j Graph
llm = ChatGroq(model="llama-3.3-70b-versatile")
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")

graph = Neo4jGraph(
    url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD
)

# ml/prompts.py
def anomaly_scoring_prompt():
    return """
<|system|>
You are an expert game analyst tasked with identifying bots in an online game by analyzing player statistics. Your role is to generate an anomaly score (0-100, higher score means more likely to be a bot) and provide concise reasoning based on the player data.

Here is how to interpret each player statistic. Provide context for how each statistic informs your score. Focus on how these characteristics point to a bot.

- **Actor**: The player's unique identifier.
- **A_Acc**: Account ID of the player; may provide clues about account creation patterns.
- **Login_day_count**: Number of distinct days the player has logged in. Focus on if low compared to other statitics, this will indicate bot behaviour.
- **Logout_day_count**: Number of days the player logged out. Should closely align with login days; inconsistencies could be a sign.
- **Playtime**: Total playtime in seconds. An unusually high total compared to login days could signal a bot farming.
- **playtime_per_day**: Average playtime per day in seconds. High values here are key identifiers of 24/7 bots.
- **avg_money**: Average amount of in-game currency. Focus on this, as extreme values here could mean exploits or excessive farming.
- **Login_count**: Total number of logins; high activity would indicate a bot constantly repeating logins.
- **ip_count**: Number of unique IP addresses used. A very low counts or a high could signal an anomaly
- **Max_level**: Maximum level achieved by the player. This, with the above information would provide an indicator on its anomaly.

Consider:

1.  Is the "Login_day_count" very low, which would indicate little human input.
2.  Examine Playtime, playtime_per_day, and avg_money to uncover indications of farming or exploiting.
3. Look at the ip_count. An extremely large number or low number would indicate bot behaviour

Based on those descriptions, you will be given the following information and need to determine these two things

1.  Determine the **anomaly score** (0-100): A higher score indicates a greater likelihood of the player being a bot. This MUST be provided

2.  Provide a **detailed explanation** of your reasoning. This MUST be provided

Your final output **MUST** follow this structure:

Anomaly Score: [your anomaly score 0-100]
Reasoning: [Your reasoning based on patterns from the statistics. Be as clear as possible]

Respond concisely and directly.

<|user|>
Here is the player data:

Actor: {actor}
A_Acc: {a_acc}
Login_day_count: {login_day_count}
Logout_day_count: {logout_day_count}
Playtime: {playtime}
playtime_per_day: {playtime_per_day}
avg_money: {avg_money}
Login_count: {login_count}
ip_count: {ip_count}
Max_level: {max_level}

Anomaly Score:
Reasoning:
"""
def extract_player_features(player_id: str) -> dict:
    """Extracts features from the knowledge graph for a given player."""
    query = f"""
    MATCH (p:Player {{Actor: toInteger('{player_id}')}})
    RETURN
        p.Actor AS player_id,
        p.A_Acc AS a_acc,
        p.Login_day_count AS login_day_count,
        p.Logout_day_count AS logout_day_count,
        p.Playtime AS playtime,
        p.playtime_per_day AS playtime_per_day,
        p.avg_money AS avg_money,
        p.Login_count AS login_count,
        p.ip_count AS ip_count,
        p.Max_level AS max_level
    """
    results = graph.query(query)
    if results:
        return results[0]
    else:
        return {}

prompt_template = anomaly_scoring_prompt()

if prompt_template:
    prompt = ChatPromptTemplate.from_template(prompt_template)
else:
    prompt = None  # Handle the case where the prompt couldn't be loaded

def assess_bot_likelihood(player_data: dict, similar_player_ids: List[str] = []) -> tuple[int, str, str]:
    """Assesses the likelihood of a player being a bot using LLM, considering player statistics and insights from similar players."""
    if prompt is None:
        return None, "Prompt could not be loaded", None

    # Get insights from similar players
    similar_player_insights = ""
    if similar_player_ids:
        similar_player_data = [extract_player_features(pid) for pid in similar_player_ids]
        # Combine insights (e.g., summarize their behaviors or anomaly scores)
        similar_player_insights = f"Similar players: {', '.join(similar_player_ids)}. " \
                                  f"Insights: {similar_player_data}"  # Simple concatenation for now

    # Format the prompt
    formatted_prompt = prompt.format_messages(
        actor=player_data['player_id'],
        a_acc=player_data['a_acc'],
        login_day_count=player_data['login_day_count'],
        logout_day_count=player_data['logout_day_count'],
        playtime=player_data['playtime'],
        playtime_per_day=player_data['playtime_per_day'],
        avg_money=player_data['avg_money'],
        login_count=player_data['login_count'],
        ip_count=player_data['ip_count'],
        max_level=player_data['max_level'],
        similar_player_insights=similar_player_insights  # Pass insights
    )

    # Call the LLM directly
    response = llm.invoke(formatted_prompt)
    full_analysis = response.content

    # Extract Anomaly Score and Reasoning
    try:
        score_line = next(line for line in full_analysis.split('\n') if "Anomaly Score" in line)
        anomaly_score = int(score_line.split(":")[1].strip())

        reasoning = "\n".join(full_analysis.split('\n')[1:])  # All lines after the score
    except Exception as e:
            anomaly_score = None
            reasoning = f"Could not reliably parse LLM response: {str(e)}"

    return anomaly_score, reasoning, full_analysis

def generate_bot_report(player_ids: list[str], faiss_index) -> list[dict]:
    """Generates a report for a list of player IDs, incorporating insights from semantically similar players."""
    report = []
    for player_id in player_ids:
        player_data = extract_player_features(player_id)
        if player_data:
            # Get the player embedding (Assuming 1-1 correspondence between player_id and embeddings)
            try:
                query_embedding = get_embedding_for_player(player_id)

                if query_embedding is None:
                    print(f"No embedding found for player ID: {player_id}")
                    continue # Skip to the next player

                similar_player_ids = None

                anomaly_score, reasoning, full_analysis = assess_bot_likelihood(player_data, similar_player_ids)  # Pass similar IDs

                report.append({
                    "player_id": player_id,
                    "anomaly_score": anomaly_score,
                    "reasoning": reasoning,
                    "full_analysis": full_analysis,
                    "similar_player_ids": similar_player_ids,
                })
            except Exception as e:
                print(f"Error generating report for player {player_id}: {e}")
                # You might want to continue to the next player or re-raise the exception
    return report


In [None]:
evaluation_df = pd.read_csv('../../data/sample/sample_player_data.csv')
if evaluation_df is None:
    print("Evaluation data loading failed")

In [None]:
def prepare_ragas_dataset(evaluation_df):
    ragas_data = []
    for _, row in tqdm(evaluation_df.iterrows()):
        player_info = f"Actor: {row['Actor']}, A_Acc: {row['A_Acc']}, Login_day_count: {row['Login_day_count']}, Logout_day_count: {row['Logout_day_count']}, Playtime: {row['Playtime']}, playtime_per_day: {row['playtime_per_day']}, avg_money: {row['avg_money']}, Login_count: {row['Login_count']}, ip_count: {row['ip_count']}, Max_level: {row['Max_level']}"
        faiss_index = load_index(evaluation_df)
        # Generate bot report for this player
        bot_report = generate_bot_report([str(row['Actor'])], faiss_index)[0]
        
        ragas_data.append({
            "user_input": f"Analyze the following player data and determine if it's likely a bot: {player_info}",
            "retrieved_contexts": player_info,
            "response": f"Anomaly Score: {bot_report['anomaly_score']}\nReasoning: {bot_report['reasoning']}",
            "reference": f"Type: {row['Type']}"  # Assuming 'Type' indicates bot or human
        })
    return ragas_data

In [None]:
evaluation_df = evaluation_df.iloc[5 : 10]
evaluation_df

In [None]:
ragas_dataset = prepare_ragas_dataset(evaluation_df)
ragas_dataset

In [None]:
from ragas import EvaluationDataset
evaluation_dataset = EvaluationDataset.from_list(ragas_dataset)
evaluation_dataset

In [None]:
from ragas import evaluate
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import LLMContextRecall, Faithfulness, FactualCorrectness

evaluator_llm = LangchainLLMWrapper(llm)
metrics = [Faithfulness(), LLMContextRecall(), FactualCorrectness()]

with tqdm(desc= 'Evaluating Queries') as pbar:
    result = evaluate(dataset=evaluation_dataset,metrics=metrics,llm=evaluator_llm)
    pbar.update(1)
result