In [9]:
import os
# Print the kernel's current working directory; the movie CSV is later read
# through a relative path ('data/ml-32m/movies.csv'), so this location matters.
print("Working Directory:", os.getcwd())


Working Directory: d:\SKRIPSI\Code\poisoning_detection\Src


In [12]:
# IPython `cd` magic: move the kernel one directory up (per the captured output,
# from ...\Src to the project root). NOTE: this is relative to the current
# directory, so re-running the cell moves up one more level each time.
cd ..

d:\SKRIPSI\Code\poisoning_detection


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [14]:
# File: poison_detector.py
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import random

# Define missing classes
class PoisonSimulator:
    """Builds a synthetic "poisoned" copy of a movie table for detector experiments.

    A poisoned sample is a movie row whose description is its title with one
    promotional keyword appended (``"<title> - <keyword>!"``).
    """

    def __init__(self):
        # Phrases appended to titles to simulate injected promotional content.
        self.poison_keywords = [
            'free', 'download', 'watch now', 'click here', 'limited time',
            'exclusive', 'secret', 'hidden', 'special offer', 'buy now',
        ]

    def generate_poisoned_dataset(self, movies, poison_ratio=0.2, seed=None):
        """Generate a clean frame and a frame of keyword-poisoned samples.

        Args:
            movies: DataFrame with at least a ``'title'`` column. It is not
                modified; both returned frames are copies.
            poison_ratio: Fraction of rows (0..1) to duplicate as poisoned
                samples. The count is ``int(len(movies) * poison_ratio)``.
            seed: Optional seed for a private random generator. When ``None``
                (the default) the module-level ``random`` state is used, exactly
                as before, so ``random.seed(...)`` in the caller still applies.

        Returns:
            ``(clean_df, poisoned_df)``. ``clean_df`` holds every input row with
            ``is_poisoned=False`` and ``poisoned_description`` equal to the title.
            ``poisoned_df`` holds the sampled rows with ``is_poisoned=True`` and
            the keyword-suffixed description.

        Raises:
            ValueError: If ``poison_ratio`` is outside ``[0, 1]``.
        """
        # Fail early with a clear message instead of an opaque error from
        # random.sample (ratio > 1) or a silent truncation to zero rows.
        if not 0 <= poison_ratio <= 1:
            raise ValueError(
                f"poison_ratio must be between 0 and 1, got {poison_ratio!r}"
            )

        # A private generator keeps seeded runs from disturbing global state.
        rng = random if seed is None else random.Random(seed)

        clean_df = movies.copy()
        clean_df['is_poisoned'] = False
        clean_df['poisoned_description'] = clean_df['title']  # Using title as placeholder

        # Pick the rows (by position) that will be duplicated as poisoned samples.
        n_poison = int(len(movies) * poison_ratio)
        poison_indices = rng.sample(range(len(movies)), n_poison)

        poisoned_df = movies.iloc[poison_indices].copy()
        poisoned_df['is_poisoned'] = True

        # Only the title is needed, so iterate that column instead of iterrows().
        poisoned_df['poisoned_description'] = [
            f"{title} - {rng.choice(self.poison_keywords)}!"
            for title in poisoned_df['title']
        ]

        return clean_df, poisoned_df

class CharacteristicVectorExtractor:
    """Turns a text description into a small fixed-size feature record."""

    def __init__(self, vector_size=50):
        # Number of positions in every embedding produced by this extractor.
        self.vector_size = vector_size

    @staticmethod
    def _as_text(value):
        """Return ``value`` as a str; ``None`` and float NaN become ``""``."""
        if isinstance(value, str):
            return value
        if value is None or (isinstance(value, float) and np.isnan(value)):
            return ""
        return str(value)

    def simple_embedding(self, text):
        """Create a positional character-code embedding of ``text``.

        The text is lower-cased. Position ``i`` of the result holds
        ``(ord(text[i]) % 100) / 100`` for the first ``vector_size`` characters.
        If the text is shorter than ``vector_size``, the remaining positions
        are filled with the deterministic pattern ``((i * 13) % 100) / 100``.
        Despite the name, this is neither a frequency count nor a hash; it is
        a placeholder for demonstration.

        Args:
            text: Description text. ``None``/NaN are treated as an empty
                string; other non-string values are converted with ``str``.

        Returns:
            1-D ``numpy`` float array of length ``vector_size``.
        """
        text = self._as_text(text).lower()
        size = self.vector_size
        embedding = np.zeros(size)

        # Slicing already bounds the index, so no extra length guard is needed.
        for i, char in enumerate(text[:size]):
            embedding[i] = ord(char) % 100 / 100.0  # Normalize

        # Fill the tail for short texts; the range is empty when the text is long enough.
        for i in range(len(text), size):
            embedding[i] = (i * 13) % 100 / 100.0  # Deterministic filler

        return embedding

    def extract(self, description):
        """Extract the characteristic record for one description.

        Returns:
            dict with keys ``'embedding'`` (see ``simple_embedding``),
            ``'length'`` (character count of the description) and
            ``'has_special_chars'`` (True if any of ``!@#$%^&*()`` occurs).
        """
        # Normalise once so length and special-char checks cannot crash on None/NaN.
        description = self._as_text(description)
        return {
            'embedding': self.simple_embedding(description),
            'length': len(description),
            'has_special_chars': any(c in description for c in '!@#$%^&*()')
        }

    def batch_extract(self, descriptions):
        """Extract characteristic records for an iterable of descriptions."""
        return [self.extract(desc) for desc in descriptions]

class SimplePoisonDetector:
    """Flags items whose embedding is far (cosine distance) from their genre centroid."""

    def __init__(self, threshold=0.8):
        # Items with cosine distance strictly greater than this are flagged.
        self.threshold = threshold
        # Maps genre label -> mean embedding of the samples used for fitting.
        self.centroids = {}
        self.is_fitted = False

    def compute_centroids(self, characteristics, genres):
        """Compute one centroid (mean embedding) per genre label.

        Any centroids from a previous call are discarded, so refitting never
        leaves stale genres behind.

        Args:
            characteristics: Iterable of dicts, each with an ``'embedding'`` entry.
            genres: Iterable of genre labels, paired positionally with
                ``characteristics``.

        Raises:
            ValueError: If no samples are supplied (nothing to fit on).
        """
        genre_vectors = {}
        for char_vec, genre in zip(characteristics, genres):
            genre_vectors.setdefault(genre, []).append(char_vec['embedding'])

        if not genre_vectors:
            raise ValueError("Cannot compute centroids from empty input.")

        # Rebuild the mapping from scratch instead of updating it in place.
        self.centroids = {
            genre: np.mean(vectors, axis=0)
            for genre, vectors in genre_vectors.items()
        }

        self.is_fitted = True
        print(f"Computed centroids for {len(self.centroids)} genres")

    @staticmethod
    def _cosine_distance(vec_a, vec_b):
        """Return ``1 - cosine_similarity`` of two 1-D vectors as a Python float.

        A zero-length vector is given similarity 0 (distance 1.0).
        """
        a = np.asarray(vec_a, dtype=float)
        b = np.asarray(vec_b, dtype=float)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 1.0
        # Clip so rounding noise cannot produce a slightly negative distance.
        similarity = min(1.0, max(-1.0, float(np.dot(a, b) / denom)))
        return 1.0 - similarity

    def detect_poison(self, characteristics, genres):
        """Detect poisoned items based on cosine distance to the genre centroid.

        Args:
            characteristics: Iterable of dicts, each with an ``'embedding'`` entry.
            genres: Iterable of genre labels, paired positionally with
                ``characteristics``.

        Returns:
            ``(predictions, distances)``: two lists aligned with the input.
            ``predictions[i]`` is a ``bool`` (True = flagged) and
            ``distances[i]`` a ``float``. Items whose genre has no centroid are
            flagged with distance ``1.0``.

        Raises:
            ValueError: If ``compute_centroids`` has not been called.
        """
        if not self.is_fitted:
            raise ValueError("Detector not fitted. Call compute_centroids first.")

        predictions = []
        distances = []

        for char_vec, genre in zip(characteristics, genres):
            if genre not in self.centroids:
                # Unknown genre, mark as suspicious
                predictions.append(True)
                distances.append(1.0)
                continue

            # Plain NumPy here avoids one library call (with full input
            # validation) per row, which dominates runtime on large inputs.
            distance = self._cosine_distance(
                char_vec['embedding'], self.centroids[genre]
            )

            distances.append(distance)
            predictions.append(bool(distance > self.threshold))

        return predictions, distances

# Test detector
if __name__ == "__main__":
    # Fix the seed so the sampled rows, the chosen keywords and therefore the
    # printed metrics are the same on every run.
    random.seed(42)

    # Load the movie table; fall back to synthetic data only when the file is
    # missing. Any other failure (e.g. a malformed CSV) is allowed to surface.
    try:
        movies = pd.read_csv('data/ml-32m/movies.csv')
    except FileNotFoundError:
        print("CSV file not found, creating sample data...")
        # Create sample movie data
        movies_data = {
            'movieId': range(1, 101),
            'title': [f'Movie {i}' for i in range(1, 101)],
            'genres': ['Action'] * 25 + ['Comedy'] * 25 + ['Drama'] * 25 + ['Horror'] * 25
        }
        movies = pd.DataFrame(movies_data)

    # Generate poisoned data
    simulator = PoisonSimulator()
    clean_df, poisoned_df = simulator.generate_poisoned_dataset(movies, poison_ratio=0.2)

    # Extract characteristics
    extractor = CharacteristicVectorExtractor()

    # Combine clean and poisoned rows into one evaluation set.
    test_df = pd.concat([clean_df, poisoned_df[poisoned_df['is_poisoned'].astype(bool)]])
    descriptions = test_df['poisoned_description'].tolist()
    genres = test_df['genres'].tolist()
    true_labels = test_df['is_poisoned'].tolist()

    characteristics = extractor.batch_extract(descriptions)

    # Train detector on clean data only
    clean_chars = [c for c, label in zip(characteristics, true_labels) if not label]
    clean_genres = [g for g, label in zip(genres, true_labels) if not label]

    detector = SimplePoisonDetector(threshold=0.3)
    detector.compute_centroids(clean_chars, clean_genres)

    # Test detection. NOTE: the clean rows scored here are the same rows the
    # centroids were computed from, so this is not a held-out evaluation.
    predictions, distances = detector.detect_poison(characteristics, genres)

    # Evaluate. NOTE: with poison_ratio=0.2 only about 1 in 6 rows is poisoned,
    # so predicting CLEAN for everything already scores about 0.833 accuracy.
    accuracy = np.mean([p == tl for p, tl in zip(predictions, true_labels)])
    print(f"Detection Accuracy: {accuracy:.3f}")

    # Print some examples
    print("\nSample predictions:")
    for i in range(min(5, len(predictions))):
        status = "POISONED" if predictions[i] else "CLEAN"
        actual = "POISONED" if true_labels[i] else "CLEAN"
        print(f"Movie: {descriptions[i][:30]}... | Prediction: {status} | Actual: {actual} | Distance: {distances[i]:.3f}")

Computed centroids for 1798 genres
Detection Accuracy: 0.816

Sample predictions:
Movie: Toy Story (1995)... | Prediction: CLEAN | Actual: CLEAN | Distance: 0.027
Movie: Jumanji (1995)... | Prediction: CLEAN | Actual: CLEAN | Distance: 0.069
Movie: Grumpier Old Men (1995)... | Prediction: CLEAN | Actual: CLEAN | Distance: 0.037
Movie: Waiting to Exhale (1995)... | Prediction: CLEAN | Actual: CLEAN | Distance: 0.070
Movie: Father of the Bride Part II (1... | Prediction: CLEAN | Actual: CLEAN | Distance: 0.145
