From fbe868d5a5b221f58a982a11710d790a279d13f9 Mon Sep 17 00:00:00 2001
From: coderpro1102
Date: Sun, 23 Feb 2025 01:09:19 +0530
Subject: [PATCH 1/2] feat: add single token feature detection and logging

- Add is_single_token field to LatentRecord
- Implement single token detection in constructors
- Update scorer and analysis to track single token metrics
- Log single token ratio in verbose mode

Closes #87
---
 delphi/latents/constructors.py | 45 ++++++++++++++++++++++++++++++++++
 delphi/latents/latents.py      |  3 +++
 delphi/log/result_analysis.py  | 11 +++++++++
 delphi/scorers/scorer.py       |  9 +++++++
 4 files changed, 68 insertions(+)

diff --git a/delphi/latents/constructors.py b/delphi/latents/constructors.py
index ffc42c82..2f65c211 100644
--- a/delphi/latents/constructors.py
+++ b/delphi/latents/constructors.py
@@ -108,6 +108,40 @@ def pool_max_activation_windows(
     return token_windows, activation_windows
 
 
+def is_single_token_feature(
+    activations: Float[Tensor, "examples ctx_len"],
+    quantile_threshold: float = 0.5,
+    activation_ratio_threshold: float = 0.8
+) -> bool:
+    """
+    Determine if a feature is primarily activated by single tokens.
+
+    Args:
+        activations: Activation values across context windows
+        quantile_threshold: Threshold for considering top activations (0.5 means top 50%)
+        activation_ratio_threshold: Ratio of single-token activations needed (0.8 means 80%)
+
+    Returns:
+        bool: True if the feature is primarily single-token activated
+    """
+    # For each example, check if activation is concentrated in a single position
+    max_activations = activations.max(dim=1).values
+    top_k = int(len(max_activations) * quantile_threshold)
+    top_indices = max_activations.topk(top_k).indices
+
+    # For top activating examples, check if activation is concentrated in single token
+    top_examples = activations[top_indices]
+
+    # Count positions where activation is significant
+    threshold = top_examples.max(dim=1).values.unsqueeze(1) * 0.5
+    significant_activations = (top_examples > threshold).sum(dim=1)
+
+    # Calculate ratio of single token activations
+    single_token_ratio = (significant_activations == 1).float().mean().item()
+
+    return single_token_ratio >= activation_ratio_threshold
+
+
 def constructor(
     record: LatentRecord,
@@ -125,6 +159,17 @@
     max_examples = constructor_cfg.max_examples
     min_examples = constructor_cfg.min_examples
 
+    token_windows, act_windows = pool_max_activation_windows(
+        activations=activations,
+        tokens=reshaped_tokens,
+        ctx_indices=ctx_indices,
+        index_within_ctx=index_within_ctx,
+        ctx_len=example_ctx_len,
+        max_examples=max_examples,
+    )
+
+    record.is_single_token = is_single_token_feature(act_windows)
+
     # Get all positions where the latent is active
     flat_indices = (
         activation_data.locations[:, 0] * cache_ctx_len
diff --git a/delphi/latents/latents.py b/delphi/latents/latents.py
index a68f146b..7cf734df 100644
--- a/delphi/latents/latents.py
+++ b/delphi/latents/latents.py
@@ -143,6 +143,9 @@ class LatentRecord:
     explanation: str = ""
     """Explanation of the latent."""
 
+    is_single_token: bool = False
+    """Whether this latent primarily activates on single tokens."""
+
     @property
     def max_activation(self) -> float:
         """
diff --git a/delphi/log/result_analysis.py b/delphi/log/result_analysis.py
index 241733a9..045796e4 100644
--- a/delphi/log/result_analysis.py
+++ b/delphi/log/result_analysis.py
@@ -34,8 +34,10 @@ def latent_balanced_score_metrics(
         "true_negative_rate": np.average(df["true_negative_rate"], weights=weights),
         "false_positive_rate": np.average(df["false_positive_rate"], weights=weights),
         "false_negative_rate": np.average(df["false_negative_rate"], weights=weights),
+        "single_token_ratio": df["is_single_token"].mean() if "is_single_token" in df.columns else None
     }
+
     if verbose:
         print(f"\n--- {score_type.title()} Metrics ---")
         print(f"Accuracy: {metrics['accuracy']:.3f}")
@@ -54,6 +56,11 @@ def latent_balanced_score_metrics(
             {sum(fractions_failed) / len(fractions_failed):.3f}"""
         )
 
+        if metrics["single_token_ratio"] is not None:
+            print(f"\nSingle Token Features:")
+            print(f"Ratio of single token features: {metrics['single_token_ratio']:.3f}")
+
+
         print("\nConfusion Matrix:")
         print(f"True Positive Rate: {metrics['true_positive_rate']:.3f}")
         print(f"True Negative Rate: {metrics['true_negative_rate']:.3f}")
@@ -77,6 +84,7 @@ def parse_score_file(file_path):
     with open(file_path, "rb") as f:
         data = orjson.loads(f.read())
+    is_single_token = data.get("is_single_token", False)
     df = pd.DataFrame(
         [
             {
@@ -87,6 +95,7 @@
                 "probability": example["probability"],
                 "correct": example["correct"],
                 "activations": example["activations"],
+                "is_single_token": is_single_token,
             }
             for example in data
         ]
     )
@@ -158,6 +167,7 @@
             total_negatives / total_examples if total_examples > 0 else 0
         ),
         "failed_count": failed_count,
+        "is_single_token": is_single_token,
     }
 
     for key, value in metrics.items():
@@ -187,6 +197,7 @@ def build_scores_df(path: Path, target_modules: list[str], range: Tensor | None
         "positive_class_ratio",
         "negative_class_ratio",
         "failed_count",
+        "is_single_token",
     ]
     df_data = {
         col: []
diff --git a/delphi/scorers/scorer.py b/delphi/scorers/scorer.py
index fa5a0ae5..9769c579 100644
--- a/delphi/scorers/scorer.py
+++ b/delphi/scorers/scorer.py
@@ -11,6 +11,15 @@ class ScorerResult(NamedTuple):
     score: Any
     """Generated score for latent."""
 
+    def to_dict(self):
+        """Convert the scorer result to a dictionary for serialization."""
+        return {
+            **asdict(self.record),
+            "score": self.score,
+            "is_single_token": self.record.is_single_token
+        }
+
+
 class Scorer(ABC):
     @abstractmethod

From a192a282548280c9652ab4ed44ac398b109fd300 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Sat, 22 Feb 2025 19:39:52 +0000
Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 delphi/latents/constructors.py | 18 +++++++++---------
 delphi/latents/neighbours.py   | 16 +++++++++++-----
 delphi/log/result_analysis.py  | 14 ++++++++------
 delphi/scorers/scorer.py       |  3 +--
 4 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/delphi/latents/constructors.py b/delphi/latents/constructors.py
index 2f65c211..f288f06d 100644
--- a/delphi/latents/constructors.py
+++ b/delphi/latents/constructors.py
@@ -108,6 +108,40 @@ def pool_max_activation_windows(
     return token_windows, activation_windows
 
 
+
 def is_single_token_feature(
     activations: Float[Tensor, "examples ctx_len"],
     quantile_threshold: float = 0.5,
-    activation_ratio_threshold: float = 0.8
+    activation_ratio_threshold: float = 0.8,
 ) -> bool:
     """
     Determine if a feature is primarily activated by single tokens.
-    
+
     Args:
         activations: Activation values across context windows
         quantile_threshold: Threshold for considering top activations (0.5 means top 50%)
         activation_ratio_threshold: Ratio of single-token activations needed (0.8 means 80%)
-    
+
     Returns:
         bool: True if the feature is primarily single-token activated
     """
     # For each example, check if activation is concentrated in a single position
     max_activations = activations.max(dim=1).values
     top_k = int(len(max_activations) * quantile_threshold)
     top_indices = max_activations.topk(top_k).indices
-    
+
     # For top activating examples, check if activation is concentrated in single token
     top_examples = activations[top_indices]
-    
+
     # Count positions where activation is significant
     threshold = top_examples.max(dim=1).values.unsqueeze(1) * 0.5
     significant_activations = (top_examples > threshold).sum(dim=1)
-    
+
     # Calculate ratio of single token activations
     single_token_ratio = (significant_activations == 1).float().mean().item()
-
-    return single_token_ratio >= activation_ratio_threshold
+    return single_token_ratio >= activation_ratio_threshold
 
 
 def constructor(
@@ -167,7 +167,7 @@
         ctx_len=example_ctx_len,
         max_examples=max_examples,
     )
-    
+
     record.is_single_token = is_single_token_feature(act_windows)
 
     # Get all positions where the latent is active
diff --git a/delphi/latents/neighbours.py b/delphi/latents/neighbours.py
index 747b9bad..f80ef7b0 100644
--- a/delphi/latents/neighbours.py
+++ b/delphi/latents/neighbours.py
@@ -177,14 +177,16 @@ def _compute_cooccurrence_neighbours(self) -> dict[int, list[tuple[int, float]]]
         latent_index = latent_index[idx_cantor_sorted_idx]
 
         n_tokens = int(idx_cantor.max().item())
-        
+
         token_batch_size = 20_000
         done = False
         while not done:
             try:
                 print("Trying with batch size", token_batch_size)
                 # Find indices where idx_cantor crosses each batch boundary
-                bounday_values = torch.arange(token_batch_size, n_tokens, token_batch_size)
+                bounday_values = torch.arange(
+                    token_batch_size, n_tokens, token_batch_size
+                )
                 batch_boundaries_tensor = torch.searchsorted(idx_cantor, bounday_values)
                 batch_boundaries = [0] + batch_boundaries_tensor.tolist()
 
@@ -192,10 +194,14 @@
                 if batch_boundaries[-1] != len(idx_cantor):
                     batch_boundaries.append(len(idx_cantor))
 
-                co_occurrence_matrix = torch.zeros((n_latents, n_latents), dtype=torch.int32)
-                #co_occurrence_matrix = co_occurrence_matrix.cuda()
+                co_occurrence_matrix = torch.zeros(
+                    (n_latents, n_latents), dtype=torch.int32
+                )
+                # co_occurrence_matrix = co_occurrence_matrix.cuda()
 
-                for start, end in tqdm(zip(batch_boundaries[:-1], batch_boundaries[1:])):
+                for start, end in tqdm(
+                    zip(batch_boundaries[:-1], batch_boundaries[1:])
+                ):
                     # get all ind_cantor values between start and start + token_batch_size
                     selected_idx_cantor = idx_cantor[start:end]
                     selected_latent_index = latent_index[start:end]
diff --git a/delphi/log/result_analysis.py b/delphi/log/result_analysis.py
index 045796e4..075a81ad 100644
--- a/delphi/log/result_analysis.py
+++ b/delphi/log/result_analysis.py
@@ -34,10 +34,11 @@
         "true_negative_rate": np.average(df["true_negative_rate"], weights=weights),
         "false_positive_rate": np.average(df["false_positive_rate"], weights=weights),
         "false_negative_rate": np.average(df["false_negative_rate"], weights=weights),
-        "single_token_ratio": df["is_single_token"].mean() if "is_single_token" in df.columns else None
+        "single_token_ratio": (
+            df["is_single_token"].mean() if "is_single_token" in df.columns else None
+        ),
     }
-
     if verbose:
         print(f"\n--- {score_type.title()} Metrics ---")
         print(f"Accuracy: {metrics['accuracy']:.3f}")
@@ -57,9 +58,10 @@
         )
 
         if metrics["single_token_ratio"] is not None:
-            print(f"\nSingle Token Features:")
-            print(f"Ratio of single token features: {metrics['single_token_ratio']:.3f}")
-
+            print("\nSingle Token Features:")
+            print(
+                f"Ratio of single token features: {metrics['single_token_ratio']:.3f}"
+            )
 
         print("\nConfusion Matrix:")
         print(f"True Positive Rate: {metrics['true_positive_rate']:.3f}")
@@ -197,7 +199,7 @@ def build_scores_df(path: Path, target_modules: list[str], range: Tensor | None
         "positive_class_ratio",
         "negative_class_ratio",
         "failed_count",
-        "is_single_token", 
+        "is_single_token",
     ]
     df_data = {
         col: []
diff --git a/delphi/scorers/scorer.py b/delphi/scorers/scorer.py
index 9769c579..8d172b4e 100644
--- a/delphi/scorers/scorer.py
+++ b/delphi/scorers/scorer.py
@@ -16,11 +16,10 @@ def to_dict(self):
         return {
             **asdict(self.record),
             "score": self.score,
-            "is_single_token": self.record.is_single_token
+            "is_single_token": self.record.is_single_token,
         }
 
-
 class Scorer(ABC):
     @abstractmethod
     def __call__(self, record: LatentRecord) -> ScorerResult:
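
A quick, self-contained way to sanity-check the detection heuristic added in
PATCH 1/2 is to call is_single_token_feature on toy tensors. This snippet is
illustrative and not part of the series: it assumes a working delphi install
for the import, and the tensors are made up.

    import torch

    from delphi.latents.constructors import is_single_token_feature

    # A "single token" feature: each example's activation mass sits on exactly
    # one position, so every top example has one significant position.
    single = torch.zeros(10, 8)
    single[torch.arange(10), torch.randint(0, 8, (10,))] = 5.0

    # A distributed feature: activations are spread roughly evenly, so every
    # position clears the 50%-of-row-max significance threshold.
    spread = torch.rand(10, 8) + 1.0

    print(is_single_token_feature(single))  # True: single-token ratio is 1.0
    print(is_single_token_feature(spread))  # False: single-token ratio is 0.0

The 0.5 factor on each row's maximum mirrors the threshold inside the
function; only examples where exactly one position exceeds it count as
single-token, and the feature is flagged once that ratio reaches
activation_ratio_threshold.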