# 🧠 Fragment Detection Using Machine Learning

### 📌 Objective


---

## 📚 Table of Contents
1. [Introduction](#introduction)
2. [Imports and Setup](#imports-and-setup)
3. [Data Loading](#data-loading)
4. [Exploratory Data Analysis (EDA)](#eda)
5. [Data Preprocessing](#data-preprocessing)
6. [Modeling](#modeling)
7. [Evaluation](#evaluation)
8. [Conclusion](#conclusion)


In [1]:
# --------------------------------------------------
# 📦 Imports and Setup
# --------------------------------------------------
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import KFold, cross_val_score,train_test_split
from dataclasses import dataclass
from typing import Tuple , List, Callable
import random
from  matplotlib import pyplot as plt
from matplotlib import animation, cm
from sklearn.preprocessing import StandardScaler





## 📁 Data Loading

Load and preview the dataset.


In [2]:

PROCESSEDPATH = 'processed_dataset.csv'
PROCESSEDPATH_URL = "https://gist.githubusercontent.com/alaamer12/49c5d930b52f189d7fd195ca8c66b7d1/raw/68cdd6fca9709247a9d497a4f7102f42bef4d040/processed_dataset.csv"


def download_gist_file(raw_url, output):
  import requests

  output_filename = output

  # Perform the download
  response = requests.get(raw_url)

  # Check if the request was successful
  if response.status_code == 200:
      with open(output_filename, 'wb') as f:
          f.write(response.content)
      print(f"Gist file downloaded successfully as '{output_filename}'")
  else:
      print(f"Failed to download file. Status code: {response.status_code}")



In [3]:
download_gist_file(PROCESSEDPATH_URL , PROCESSEDPATH )


Gist file downloaded successfully as 'processed_dataset.csv'


In [5]:
df = pd.read_csv('processed_dataset.csv')

## 📊 Exploratory Data Analysis (EDA)

Look for trends, distributions, and outliers.


## 🛠️ Data Preprocessing

Clean, transform, and prepare the dataset for modeling.

In [6]:
#preprocessing encode boolean values to numerical to be safer and more consistent
df.replace({True: 1, False: 0})


  df.replace({True: 1, False: 0})


Unnamed: 0,Sentence Fragment,is_fragment,has_auxiliary,has_fullstop,has_question_mark,has_exclamation_mark,has_comma,has_semicolon,has_colon,has_quotation,has_expression,has_conjunction,has_temporal,has_opinion_adverb,has_adverb,has_starter,has_past_verb,has_gerund,starts_capitalized,Processed Text
0,Why isn't everyone talking about Daybreak?,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,1,1,everyone talking daybreak
1,The CGI in Tiny Pretty Things is groundbreaking.,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,cgi tiny pretty things groundbreaking
2,The characters in Gentefied are wonderfully de...,0,1,1,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,characters gentefied wonderfully developed
3,I wish the supporting character from Ragnarok ...,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,wish supporting character ragnarok screen time
4,Netflix really knows how to tell a story with ...,0,0,1,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1,netflix really knows tell story last kingdom
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
45336,Netflix's original content like Warrior Nun is...,1,1,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,netflix 's original content like warrior nun b...
45337,episode of Locke & Key was instantly hooked.,1,1,1,0,0,0,0,0,0,0,0,1,0,1,1,1,0,0,episode locke key instantly hooked
45338,outdid themselves with 13 Reasons Why.,1,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,outdid 13 reasons
45339,Vikings: Valhalla make me feel anxious?,1,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,1,vikings valhalla make feel anxious


In [7]:
X = df.drop(columns=["Sentence Fragment", "is_fragment"])
y = df["is_fragment"]

In [9]:
# structured features
structured_features = ['has_fullstop', 'has_question_mark', 'has_auxiliary','has_exclamation_mark','has_semicolon','has_quotation']

# preprocessing step
preprocessor = ColumnTransformer([
    ('text', TfidfVectorizer(ngram_range=(1,2)), 'Processed Text'),
    ('struct', 'passthrough', structured_features)
])

# Create pipeline
pipeline = Pipeline([
    ('preprocess', preprocessor),
    ('clf', RandomForestClassifier(random_state=42))
])

# Set up K-Fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Perform cross-validation and get scores
cross_val_score(pipeline, X, y, cv=kf, scoring='accuracy', error_score='raise')

# Print results
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean accuracy: {np.mean(cv_scores):.4f} (±{np.std(cv_scores):.4f})")


ValueError: np.nan is an invalid document, expected byte or unicode string.

In [None]:
"""
Particle Swarm Optimization (PSO) Algorithm Implementation

This module implements the Particle Swarm Optimization algorithm for solving continuous optimization problems.
PSO is inspired by social behavior of bird flocking or fish schooling, where particles move
in the search space based on their own experience and the experience of the swarm.

Key components to implement:
1. Particle representation (position, velocity)
2. Personal best memory
3. Global best memory
4. Velocity update rule
5. Position update rule

Usage:
    from _pso import ParticleSwarmOptimization

    # Define objective function to minimize
    def objective_function(x):
        return sum(x**2)  # Example: minimize sum of squares

    # Define bounds for each parameter
    bounds = [(-5, 5), (-5, 5)]  # Example: 2D problem with bounds [-5, 5]

    # Create PSO instance
    config = PSOConfig(objective_function=objective_function, bounds=bounds)
    pso = ParticleSwarmOptimization(config)

    # Run optimization
    best_position, best_fitness = pso.optimize()
"""
from dataclasses import dataclass
from typing import Tuple , List, Callable
import numpy as np
import random
from  matplotlib import pyplot as plt
from matplotlib import animation, cm

@dataclass
class AlgConfig:
    """Base class for algorithm configuration"""
    def __post_init__(self) -> None:
        pass

@dataclass
class PSOConfig(AlgConfig):
    objective_function: Callable[[List[float]], float]
    bounds: List[tuple[float, float]]
    n_dimensions: int = None
    n_particles: int = 30
    iterations: int = 100
    w: float = 0.5
    c1: float = 1.5
    c2: float = 1.5

    def __post_init__(self) -> None:
        super().__post_init__()
        if self.n_dimensions is None:
            self.n_dimensions = len(self.bounds)


class Particle:
    """
    Particle in the PSO algorithm.

    Each particle has a position, velocity, and memory of its best position.
    """
    def __init__(self, bounds: List[Tuple[float, float]], n_dimensions: int):

        self.position = [random.uniform(bounds[i][0], bounds[i][1]) for i in range(n_dimensions)]


        self.velocity = [random.uniform(-1, 1) for _ in range(n_dimensions)]


        self.best_position = self.position.copy()
        self.best_fitness = float('inf')
        self.current_fitness = float('inf')

    def update_velocity(self, global_best_position: List[float], w: float, c1: float, c2: float) -> None:
        """Update the velocity of the particle"""
        for i in range(len(self.velocity)):
            # Inertia component
            inertia = w * self.velocity[i]

            # Cognitive component (personal best)
            r1 = random.random()
            cognitive = c1 * r1 * (self.best_position[i] - self.position[i])

            # Social component (global best)
            r2 = random.random()
            social = c2 * r2 * (global_best_position[i] - self.position[i])

            # Update velocity
            self.velocity[i] = inertia + cognitive + social

    def update_position(self, bounds: List[Tuple[float, float]]) -> None:
        """Update the position of the particle and handle boundary conditions"""
        for i in range(len(self.position)):
            # Update position
            self.position[i] += self.velocity[i]

            if self.position[i] < bounds[i][0]:
                self.position[i] = bounds[i][0]
                self.velocity[i] *= -0.5  # Bounce back with reduced velocity
            elif self.position[i] > bounds[i][1]:
                self.position[i] = bounds[i][1]
                self.velocity[i] *= -0.5  # Bounce back with reduced velocity

    def evaluate(self, objective_function: Callable[[List[float]], float]) -> float:
        """Evaluate the particle's current position and update best if improved"""
        self.current_fitness = objective_function(self.position)

        # Update personal best if current position is better
        if self.current_fitness < self.best_fitness:
            self.best_fitness = self.current_fitness
            self.best_position = self.position.copy()

        return self.current_fitness


class ParticleSwarmOptimization:
    """
    Main PSO algorithm implementation.

    Manages the swarm of particles and the optimization process.
    """
    def __init__(self, config: PSOConfig):
        self.config = config
        self.particles = []
        self.global_best_position = None
        self.global_best_fitness = float('inf')
        self.fitness_history = []
        self.avg_fitness_history = []


        for _ in range(config.n_particles):
            self.particles.append(Particle(config.bounds, config.n_dimensions))

    def optimize(self) -> Tuple[List[float], float, List[float], List[float]]:
        """Run the optimization algorithm"""

        for particle in self.particles:
            fitness = particle.evaluate(self.config.objective_function)

            # Update global best if this particle is better
            if fitness < self.global_best_fitness:
                self.global_best_fitness = fitness
                self.global_best_position = particle.position.copy()

        # Main optimization loop
        for i in range(self.config.iterations):
            current_best_fitness = self.global_best_fitness
            total_fitness = 0


            for particle in self.particles:
                particle.update_velocity(
                    self.global_best_position,
                    self.config.w,
                    self.config.c1,
                    self.config.c2
                )
                particle.update_position(self.config.bounds)

                fitness = particle.evaluate(self.config.objective_function)
                total_fitness += fitness


                if fitness < self.global_best_fitness:
                    self.global_best_fitness = fitness
                    self.global_best_position = particle.position.copy()


            self.fitness_history.append(self.global_best_fitness)
            self.avg_fitness_history.append(total_fitness / len(self.particles))



        return (
            self.global_best_position,
            self.global_best_fitness,
            self.fitness_history,
            self.avg_fitness_history
        )


# Visualization functions
def plot_convergence(fitness_history, avg_fitness_history=None, title="PSO Convergence"):
    """Plot the convergence of the PSO algorithm"""
    plt.figure(figsize=(10, 6))
    plt.plot(fitness_history, 'b-', linewidth=2, label='Global Best Fitness')
    if avg_fitness_history:
        plt.plot(avg_fitness_history, 'r--', linewidth=1, label='Average Swarm Fitness')
    plt.xlabel('Iteration')
    plt.ylabel('Fitness Value')
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.yscale('log')  # Use log scale for better visualization
    plt.show()



In [None]:
def optimize_random_forest_classification():
    print("Starting Random Forest Classification Hyperparameter Optimization...")

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Define the objective function for PSO
    def objective_function(params):
        # Extract parameters and convert to appropriate types
        n_estimators = int(params[0])
        max_depth = int(params[1]) if params[1] > 0 else None
        min_samples_split = int(params[2])
        min_samples_leaf = int(params[3])

        # Create Random Forest model
        rf = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=42
        )

        # Use cross-validation to evaluate the model
        try:
            scores = cross_val_score(pipeline, X, y, cv=kf, scoring='accuracy')
            # We want to maximize accuracy, but PSO minimizes, so return negative accuracy
            return -np.mean(scores)
        except Exception as e:
            # Return a high value in case of errors
            print(f"Error in evaluation: {e}")
            return 0

    # Define parameter bounds
    # n_estimators: 10 to 200
    # max_depth: 1 to 30 (None is represented by 0)
    # min_samples_split: 2 to 20
    # min_samples_leaf: 1 to 10
    bounds = [(10, 200), (0, 30), (2, 20), (1, 10)]

    # Create PSO config and initialize optimizer
    config = PSOConfig(
        objective_function=objective_function,
        bounds=bounds,
        n_particles=20,
        iterations=50,
        w=0.5,
        c1=1.5,
        c2=1.5
    )
    pso = ParticleSwarmOptimization(config)

    # Run optimization
    best_params, best_fitness, fitness_history, avg_fitness_history = pso.optimize()

    # Convert parameters to appropriate types
    best_n_estimators = int(best_params[0])
    best_max_depth = int(best_params[1]) if best_params[1] > 0 else None
    best_min_samples_split = int(best_params[2])
    best_min_samples_leaf = int(best_params[3])

    print(f"Best Parameters:")
    print(f"  n_estimators: {best_n_estimators}")
    print(f"  max_depth: {best_max_depth}")
    print(f"  min_samples_split: {best_min_samples_split}")
    print(f"  min_samples_leaf: {best_min_samples_leaf}")
    print(f"Best Fitness (negative accuracy): {best_fitness}")

    # Train final model with best parameters
    final_model = RandomForestClassifier(
        n_estimators=best_n_estimators,
        max_depth=best_max_depth,
        min_samples_split=best_min_samples_split,
        min_samples_leaf=best_min_samples_leaf,
        random_state=42
    )
    final_model.fit(X_train, y_train)

    # Evaluate on test set
    accuracy = final_model.score(X_test, y_test)
    print(f"Test Accuracy with Best Parameters: {accuracy:.4f}")

    # Plot convergence
    plot_convergence(fitness_history, avg_fitness_history)

    return best_params, best_fitness

In [None]:
best_params, best_fitness = optimize_random_forest_classification()
print(best_params,best_fitness)