# Task B: Stack Overflow Developer Survey 2025 Analytics

You are provided with the latest developer survey results from Stack Overflow. Your task is to perform analytics on the survey to extract insights about the programming industry.

## Setup
If you are in google colab, you should just be able to run the cell below. Otherwise find the conda `environment.yml` file provided with all the dependencies.

In [2]:
%pip install pandas
import pandas as pd



## Reading the data

Find a utility class below to read the data for you.

In [3]:
import csv
from typing import List, Dict, Any, Optional
from pathlib import Path

RESPONSE_ID_FIELD_NAME = "ResponseId"
QUESTION_ID_FIELD_NAME = "qid"

class SurveyDataReader:
    """
    Read and process Stack Overflow Developer Survey data.
    """

    def __init__(self, schema_file: str, data_file: str):
        self.schema = self._parse_schema(schema_file)
        self.data = self._parse_data(data_file)

    def _parse_schema(self, schema_file: str) -> List[Dict[str, str]]:
        schema = []
        schema_path = Path(schema_file).resolve()
        with open(schema_path, mode="r") as file:
            reader = csv.DictReader(file)
            schema = [row for row in reader]
        return schema

    def _parse_data(self, data_file: str) -> List[Dict[str, Any]]:
        data = []
        data_path = Path(data_file).resolve()
        with open(data_path, mode="r") as file:
            reader = csv.DictReader(file)
            data = [row for row in reader]
        return data

    def get_schema(self) -> List[Dict[str, str]]:
        return self.schema

    def get_data(self) -> List[Dict[str, Any]]:
        return self.data

    def get_question_by_id(self, qid: str) -> Optional[Dict[str, str]]:
        for question in self.schema:
            if question[QUESTION_ID_FIELD_NAME] == qid:
                return question
        return None

    def get_responses_for_question(self, qname: str) -> List[Any]:
        return [response[qname] for response in self.data if qname in response]

    def get_response_by_id(self, response_id: str | int) -> Optional[Dict[str, Any]]:
        response_id_str = str(response_id)
        for response in self.data:
            if response[RESPONSE_ID_FIELD_NAME] == response_id_str:
                return response
        return None

## Getting to know the data reader

In [9]:
# Use your absolute paths in Jupyter/Colab
SCHEMA_PATH = "/survey_results_schema.csv"
DATA_PATH   = "/survey_results_public_cleaned.csv"

reader = SurveyDataReader(SCHEMA_PATH, DATA_PATH)


In [10]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
print(reader.get_schema())

print(len(reader.get_data()))

print(reader.get_data()[0:10]) # Be careful when trying to output the data, there's lots of it!

[{'\ufeff"qid"': 'QID18', 'qname': 'TechEndorse_1', 'question': 'What attracts you to a technology or causes you to endorse it (most to least important)?', 'type': 'RO', 'sub': 'AI integration or AI Agent capabilities', 'sq_id': '1'}, {'\ufeff"qid"': 'QID18', 'qname': 'TechEndorse_2', 'question': 'What attracts you to a technology or causes you to endorse it (most to least important)?', 'type': 'RO', 'sub': 'Easy-to-use API', 'sq_id': '2'}, {'\ufeff"qid"': 'QID18', 'qname': 'TechEndorse_3', 'question': 'What attracts you to a technology or causes you to endorse it (most to least important)?', 'type': 'RO', 'sub': 'Robust and complete API', 'sq_id': '3'}, {'\ufeff"qid"': 'QID18', 'qname': 'TechEndorse_4', 'question': 'What attracts you to a technology or causes you to endorse it (most to least important)?', 'type': 'RO', 'sub': 'Customizable and manageable codebase', 'sq_id': '4'}, {'\ufeff"qid"': 'QID18', 'qname': 'TechEndorse_5', 'question': 'What attracts you to a technology or cause

## Questions

1. Print all of the questions asked in the developer survey

In [41]:
import pandas as pd

# Load schema and clean up column headers
schema = pd.DataFrame(reader.get_schema())
schema.columns = [
    str(col).strip().strip('"').strip("'").lstrip('\ufeff')
    for col in schema.columns
]

# Build a case-insensitive column lookup
column_map = {col.lower(): col for col in schema.columns}

# Try to find the question ID column
id_options = ["qid", "sq_id", "id"]
question_id_col = next((column_map[opt] for opt in id_options if opt in column_map), None)

# Try to find the question text column
text_options = ["questiontext", "question", "text", "questionlabel", "label", "prompt", "qname"]
question_text_col = next((column_map[opt] for opt in text_options if opt in column_map), None)

if not question_id_col or not question_text_col:
    raise ValueError(f"Missing required columns. Available: {list(schema.columns)}")

# Extract and clean the questions
questions = (
    schema[[question_id_col, question_text_col]]
    .dropna()
    .drop_duplicates()
    .sort_values(question_id_col, key=lambda x: pd.to_numeric(x, errors="ignore"))
)

print(f"Total questions: {len(questions)}")
for question_id, question_text in questions.itertuples(index=False):
    print(f"{question_id}: {question_text}")

Total questions: 137
1: What attracts you to a technology or causes you to endorse it (most to least important)?
1: When visiting Stack Overflow, which following activities are you most interested in?  Please rank the following so the first activity is what most interests you and last is your least interested activity.
1: Rank the following attributes of your current professional job in technology according to those that contribute your job satisfaction so that the first is the most important, last is least important (if you just started a new job, consider the job you spent the most time at in the past year):
1: What would turn you off or cause you to reject it (most to least important)?
10: Rank the following attributes of your current professional job in technology according to those that contribute your job satisfaction so that the first is the most important, last is least important (if you just started a new job, consider the job you spent the most time at in the past year):
10: 

  .sort_values(question_id_col, key=lambda x: pd.to_numeric(x, errors="ignore"))


2. Which age range has the most responses in the survey?

In [42]:
import os
import subprocess
import tempfile
import shutil
import pandas as pd

SCHEMA_FILE = "/survey_results_schema.csv"
DATA_FILE = "/survey_results_public_cleaned.csv"

def is_lfs_pointer(filepath):
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        return "git-lfs.github.com/spec/v1" in f.read(200)

# Pull actual files from Git LFS if needed
if not os.path.exists(DATA_FILE) or is_lfs_pointer(DATA_FILE):
    working_dir = tempfile.mkdtemp()

    subprocess.run(["apt-get", "update", "-qq"], check=True)
    subprocess.run(["apt-get", "install", "-y", "git-lfs"], check=True)
    subprocess.run(["git", "clone", "https://github.com/MarshallWace/warwick-25.git", working_dir], check=True)
    subprocess.run(["git", "-C", working_dir, "lfs", "install"], check=True)
    subprocess.run(["git", "-C", working_dir, "lfs", "pull"], check=True)
    subprocess.run(["git", "-C", working_dir, "lfs", "checkout"], check=True)

    data_source = os.path.join(working_dir, "stack-overflow-developer-survey-2025", "survey_results_public_cleaned.csv")
    schema_source = os.path.join(working_dir, "stack-overflow-developer-survey-2025", "survey_results_schema.csv")

    shutil.copyfile(data_source, DATA_FILE)
    shutil.copyfile(schema_source, SCHEMA_FILE)

# Find the most common age range
survey_data = pd.read_csv(DATA_FILE, low_memory=False)

age_candidates = [col for col in survey_data.columns
                  if col.lower() in {"age", "agerange", "agegroup", "age_bracket"}
                  or "age" in col.lower()]

if not age_candidates:
    raise KeyError(f"No age column found. Available columns: {list(survey_data.columns)[:20]}")

age_column = age_candidates[0]
age_counts = survey_data[age_column].replace({"": pd.NA}).dropna().value_counts()

print(f"Age column: {age_column}")
print(f"Most common age range: {age_counts.index[0]} ({int(age_counts.iloc[0])} responses)")

Age column: Age
Most common age range: 25-34 years old (16485 responses)


3. How many survey respondents do we know definitely work for a company larger than Marshall Wace? (Feel free to ask one of us if you don't know how large Marshall Wace is!)

In [44]:
import os
import re
import tempfile
import shutil
import subprocess
import pandas as pd

SCHEMA_FILE = "/survey_results_schema.csv"
DATA_FILE = "/survey_results_public_cleaned.csv"
REPO_URL = "https://github.com/MarshallWace/warwick-25.git"
SURVEY_FOLDER = "stack-overflow-developer-survey-2025"
EMPLOYEE_THRESHOLD = 800

def is_lfs_pointer(filepath):
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
            return "git-lfs.github.com/spec/v1" in f.read(200)
    except FileNotFoundError:
        return True

# Download actual survey files if we only have LFS pointers
if is_lfs_pointer(DATA_FILE) or is_lfs_pointer(SCHEMA_FILE):
    subprocess.run(["apt-get", "update", "-qq"], check=True)
    subprocess.run(["apt-get", "install", "-y", "git-lfs"], check=True)

    clone_directory = tempfile.mkdtemp()
    subprocess.run(["git", "clone", REPO_URL, clone_directory], check=True)
    subprocess.run(["git", "-C", clone_directory, "lfs", "install"], check=True)
    subprocess.run(["git", "-C", clone_directory, "lfs", "pull"], check=True)
    subprocess.run(["git", "-C", clone_directory, "lfs", "checkout"], check=True)

    survey_data_source = os.path.join(clone_directory, SURVEY_FOLDER, "survey_results_public_cleaned.csv")
    schema_source = os.path.join(clone_directory, SURVEY_FOLDER, "survey_results_schema.csv")

    shutil.copyfile(survey_data_source, DATA_FILE)
    shutil.copyfile(schema_source, SCHEMA_FILE)

survey_data = pd.read_csv(DATA_FILE, low_memory=False)
schema = pd.read_csv(SCHEMA_FILE)

# Look for company size column
size_column_candidates = [
    col for col in survey_data.columns
    if re.search(r'(org|company).*size|employees', col, flags=re.I)
]

# Fall back to searching the schema if we didn't find it
if not size_column_candidates:
    schema.columns = [str(col).strip().strip('"').strip("'").lstrip("\ufeff") for col in schema.columns]
    column_lookup = {col.lower(): col for col in schema.columns}

    question_name_col = column_lookup.get("qname")
    question_text_col = column_lookup.get("question", column_lookup.get("questiontext"))

    size_related_questions = []
    if question_name_col:
        size_related_questions += schema[question_name_col][
            schema[question_name_col].str.contains("size", case=False, na=False)
        ].tolist()
    if question_text_col:
        size_related_questions += schema[question_text_col][
            schema[question_text_col].str.contains("size", case=False, na=False)
        ].tolist()

    for question in map(str, size_related_questions):
        matching_columns = [col for col in survey_data.columns if col.lower() == question.lower()]
        if matching_columns:
            size_column_candidates.append(matching_columns[0])
            break

if not size_column_candidates:
    raise KeyError(f"No company size column found. Sample columns: {list(survey_data.columns)[:25]}")

company_size_column = size_column_candidates[0]

def extract_lower_bound(value):
    if pd.isna(value):
        return pd.NA

    text = str(value).replace(",", "").lower()

    # Handle "1000+" or "1000 or more" format
    open_ended_match = re.search(r"(\d+)\s*(\+|or more)", text)
    if open_ended_match:
        return int(open_ended_match.group(1))

    # Extract first number from range like "100-500"
    numbers = re.findall(r"\d+", text)
    return int(numbers[0]) if numbers else pd.NA

lower_bounds = survey_data[company_size_column].apply(extract_lower_bound)
respondents_above_threshold = int((lower_bounds > EMPLOYEE_THRESHOLD).sum())

print(f"Column: {company_size_column}")
print(f"Larger than {EMPLOYEE_THRESHOLD} employees: {respondents_above_threshold}")

Column: OrgSize
Larger than 800 employees: 10715


4. How many survey respondents had less than 1 year of coding experience before (or outside of) coding for their profession?

In [45]:
import re
import pandas as pd

SCHEMA_FILE = "/survey_results_schema.csv"
DATA_FILE = "/survey_results_public_cleaned.csv"

survey_data = pd.read_csv(DATA_FILE, low_memory=False)
schema = pd.read_csv(SCHEMA_FILE)
schema.columns = [str(col).strip().strip('"').strip("'").lstrip("\ufeff") for col in schema.columns]

def find_column(target_name):
    # Check schema question names first
    matching_questions = schema[schema["qname"].str.lower().eq(target_name.lower())]
    if not matching_questions.empty:
        question_name = matching_questions.iloc[0]["qname"]
        for col in survey_data.columns:
            if col.lower() == str(question_name).lower():
                return col

    # Fall back to direct column match
    for col in survey_data.columns:
        if col.lower() == target_name.lower():
            return col
    return None

# Try to find a dedicated non-professional years column
non_pro_column = next((
    col for col in survey_data.columns
    if "year" in col.lower() and "code" in col.lower()
    and ("before" in col.lower() or "outside" in col.lower())
), None)

# Otherwise look for total and professional years columns
total_years_column = find_column("YearsCode") or next((
    col for col in survey_data.columns
    if "yearscode" in col.lower() and "pro" not in col.lower()
), None)

pro_years_column = find_column("YearsCodePro") or next((
    col for col in survey_data.columns
    if "yearscodepro" in col.lower()
), None)

def parse_years(value):
    if pd.isna(value):
        return pd.NA

    text = str(value).strip().lower()
    if text in {"", "na", "nan"}:
        return pd.NA

    if "less than 1" in text or "<1" in text or "under 1" in text:
        return 0.5

    numeric_match = re.search(r"(\d+(\.\d+)?)", text)
    return float(numeric_match.group(1)) if numeric_match else pd.NA

# Calculate non-professional experience
if non_pro_column:
    non_pro_years = survey_data[non_pro_column].map(parse_years)
    beginner_count = int((non_pro_years.dropna() < 1).sum())
else:
    total_years = survey_data[total_years_column].map(parse_years) if total_years_column else pd.Series(pd.NA, index=survey_data.index)
    professional_years = survey_data[pro_years_column].map(parse_years) if pro_years_column else pd.Series(pd.NA, index=survey_data.index)

    has_both = total_years.notna() & professional_years.notna()
    non_pro_years = (total_years - professional_years).where(has_both).clip(lower=0)

    from_subtraction = (non_pro_years < 1).sum()
    from_total_only = (total_years.notna() & professional_years.isna() & (total_years < 1)).sum()
    beginner_count = int(from_subtraction + from_total_only)

print(f"Respondents with <1 year non-professional coding: {beginner_count}")

Respondents with <1 year non-professional coding: 0


5. Of the people who had 1 or more years of coding experience outside of coding professionally, what is the average number of years they spent coding outside of work? For simplicity, you can consider only the people who have given an exact number of years they have spent coding in both columns (i.e. excluding those with over 50 or less than 1 year)

In [46]:
# Average years coding outside work (>=1 year), exact numeric answers only

df = pd.read_csv(DATA_PATH, low_memory=False)

# detect columns
total_col = next((c for c in df.columns if c.lower() == "yearscode"), None)
pro_col   = next((c for c in df.columns if c.lower() in ("yearscodepro", "workexp")), None)
if total_col is None or pro_col is None:
    raise KeyError(f"Missing YearsCode/WorkExp columns. Found: {list(df.columns)[:20]}")

def to_exact_years(x):
    s = str(x).strip().lower()
    if not s or any(k in s for k in ["less than", "more than", "over", "<", ">", "+"]):
        return pd.NA
    return float(s) if re.fullmatch(r"\d+(\.\d+)?", s) else pd.NA

years_total = df[total_col].map(to_exact_years)
years_pro   = df[pro_col].map(to_exact_years)

mask_exact  = years_total.notna() & years_pro.notna()
mask_range  = years_total.between(1, 50) & years_pro.between(0, 50)
non_prof    = (years_total - years_pro).where(mask_exact & mask_range)

eligible = non_prof[non_prof >= 1]
print(f"Rows used: {int(eligible.count())}")
print(f"Average years outside work: {eligible.mean():.2f}")


Rows used: 30832
Average years outside work: 5.83


6. What is the median annual total compensation of those who specified their compensation in USD

In [47]:
# Median annual total compensation for respondents who specified USD

df = pd.read_csv(DATA_PATH, low_memory=False)

# Detect currency and compensation columns
cur_col = next((c for c in df.columns if c.lower() in (
    "currency","compcurrency","comp_total_currency","currencycode")), None)
if cur_col is None:
    cur_col = next((c for c in df.columns if "curren" in c.lower()), None)

comp_col = next((c for c in df.columns if c.lower() in (
    "convertedcompyearly","comptotalusd","comptotal","compusd","totalcompensation","annualcompensation","annualsalary","salaryusd","salary")), None)
if comp_col is None:
    # best-effort fallback
    comp_candidates = [c for c in df.columns if ("comp" in c.lower() or "salary" in c.lower()) and ("year" in c.lower() or "total" in c.lower())]
    comp_col = comp_candidates[0] if comp_candidates else None

if cur_col is None or comp_col is None:
    raise KeyError(f"Missing columns. Found sample: {list(df.columns)[:20]}")

# Filter USD rows
cur = df[cur_col].astype(str).str.strip().str.upper()
usd_mask = cur.isin({"USD","US DOLLAR","UNITED STATES DOLLAR","US DOLLARS","UNITED STATES DOLLARS"}) | cur.str.contains(r"\bUSD\b", na=False)

# Numeric compensation, sensible bounds
comp_usd = pd.to_numeric(df.loc[usd_mask, comp_col], errors="coerce")
comp_usd = comp_usd[(comp_usd > 0) & (comp_usd < 1_000_000_000)]

median_usd = comp_usd.median()
count_usd = int(comp_usd.notna().sum())

print(f"Currency column: {cur_col}")
print(f"Compensation column: {comp_col}")
print(f"USD respondents with numeric comp: {count_usd}")
print(f"Median annual compensation (USD): {median_usd:,.2f}")


Currency column: Currency
Compensation column: CompTotal
USD respondents with numeric comp: 6320
Median annual compensation (USD): 135,000.00


7. Which programming language has respondents with the highest annual compensation in USD? If a response lists multiple languages, you can attribute the compensation to each language in the response.

In [48]:
# Language with highest annual compensation in USD (attributes comp to every listed language)

df = pd.read_csv(DATA_PATH, low_memory=False)

# compensation in USD
if "ConvertedCompYearly" in df.columns:
    df["_CompUSD"] = pd.to_numeric(df["ConvertedCompYearly"], errors="coerce")
else:
    cur_col = next((c for c in df.columns if "curren" in c.lower()), None)
    comp_col = next((c for c in df.columns if ("comp" in c.lower() or "salary" in c.lower()) and ("total" in c.lower() or "year" in c.lower() or "annual" in c.lower() or "usd" in c.lower())), None)
    if cur_col is None or comp_col is None:
        raise KeyError(f"Missing currency/comp columns. Sample: {list(df.columns)[:20]}")
    cur = df[cur_col].astype(str).str.upper()
    usd_mask = cur.str.contains(r"\bUSD\b|UNITED STATES", na=False)
    df = df.loc[usd_mask].copy()
    df["_CompUSD"] = pd.to_numeric(df[comp_col], errors="coerce")

df = df[df["_CompUSD"].between(1, 1_000_000_000)]

# language column
lang_col = next((c for c in df.columns if "language" in c.lower() and ("worked" in c.lower() or "have" in c.lower())), None)
if lang_col is None:
    lang_col = next((c for c in df.columns if "language" in c.lower()), None)
if lang_col is None:
    raise KeyError(f"Language column not found. Sample: {list(df.columns)[:20]}")

def split_langs(s):
    if pd.isna(s): return []
    return [p.strip() for p in re.split(r"[;,\|]", str(s)) if p.strip()]

exploded = (
    df[[lang_col, "_CompUSD"]]
    .assign(Language=lambda d: d[lang_col].apply(split_langs))
    .explode("Language")
    .dropna(subset=["Language"])
)

stats = (
    exploded.groupby("Language")["_CompUSD"]
    .agg(median="median", max="max", count="count")
    .sort_values("median", ascending=False)
)

top_median_lang = stats.index[0]
top_median_val = stats.iloc[0]["median"]
top_max_lang = stats["max"].idxmax()
top_max_val = stats.loc[top_max_lang, "max"]

print(f"Language column: {lang_col}")
print(f"Top by median USD comp: {top_median_lang} ({top_median_val:,.2f})")
print(f"Top by highest single USD comp: {top_max_lang} ({top_max_val:,.2f})")
print("\nTop 10 by median:")
print(stats.head(10).to_string(formatters={'median': '{:,.0f}'.format, 'max': '{:,.0f}'.format}))


Language column: LanguageHaveWorkedWith
Top by median USD comp: Ruby (102,961.00)
Top by highest single USD comp: TypeScript (33,552,715.00)

Top 10 by median:
                         median       max  count
Language                                        
Ruby                    102,961 6,890,299   1550
Erlang                  100,000 6,000,000    313
Perl                     99,886 6,000,000    806
Elixir                   99,194 6,371,285    615
Scala                    96,524 6,000,000    600
Swift                    92,000 6,000,000   1198
Groovy                   90,362 6,000,000   1170
Lisp                     90,000 6,000,000    459
Go                       89,815 6,371,285   3782
Bash/Shell (all shells)  83,531 6,371,285  11102


## Bonus Task: SurveyDataReader

`SurveyDataReader` is a basic class that allows you to access the underlying survey data programmatically. The class is implemented with basic data structures and no external dependencies hence there is plenty of room for optimisation. Try to improve the speed of basic operations and add some of your own by potentially leveraging a package such as [NumPy](https://numpy.org/).

In [49]:
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Iterable

RESPONSE_ID_FIELD_NAME = "ResponseId"
QUESTION_ID_FIELD_NAME = "qid"

class SurveyDataReader:
    """
    Fast survey data reader using pandas/numpy under the hood.

    Compatible with original reader API:
      get_schema(), get_data(), get_question_by_id(),
      get_responses_for_question(), get_response_by_id()

    Extra utility methods:
      value_counts, unique, filter_equals, filter_in, rows_for,
      groupby_agg, to_numpy, search_schema
    """

    def __init__(self, schema_file: str, data_file: str, memory_map: bool = True):
        # Load schema - keep everything as strings
        self.schema = pd.read_csv(
            schema_file,
            dtype=str,
            encoding="utf-8-sig",
            memory_map=memory_map,
            engine="c"
        )
        self.schema.columns = [col.strip().strip('"').strip("'") for col in self.schema.columns]

        # Load main survey data
        self.data = pd.read_csv(
            data_file,
            low_memory=False,
            encoding="utf-8-sig",
            memory_map=memory_map,
            engine="c"
        )
        self.data.columns = [col.strip().strip('"').strip("'") for col in self.data.columns]

        # Pre-convert to numpy arrays for faster access
        self._columns = {col: self.data[col].to_numpy(copy=False) for col in self.data.columns}

        # Build question ID lookup
        self._question_index = {}
        if QUESTION_ID_FIELD_NAME in self.schema.columns:
            question_ids = self.schema[QUESTION_ID_FIELD_NAME].astype(str)
            for i in range(len(self.schema)):
                qid = question_ids.iat[i]
                self._question_index[qid] = self.schema.iloc[i].to_dict()

        # Build response ID lookup
        self._response_index = {}
        if RESPONSE_ID_FIELD_NAME in self.data.columns:
            response_ids = self.data[RESPONSE_ID_FIELD_NAME].astype(str)
            for i in range(len(self.data)):
                self._response_index[response_ids.iat[i]] = i

        # For optional column indexes (built on demand)
        self._column_indexes = {}

    # Original API methods

    def _parse_schema(self, _: str) -> List[Dict[str, str]]:
        # Legacy compatibility
        return self.get_schema()

    def _parse_data(self, _: str) -> List[Dict[str, Any]]:
        return self.get_data()

    def get_schema(self) -> List[Dict[str, str]]:
        return self.schema.to_dict("records")

    def get_data(self) -> List[Dict[str, Any]]:
        return self.data.to_dict("records")

    def get_question_by_id(self, qid: str) -> Optional[Dict[str, str]]:
        return self._question_index.get(str(qid))

    def get_responses_for_question(self, qname: str) -> List[Any]:
        column = self._find_column(qname)
        if column is None:
            return []
        return self._columns[column].tolist()

    def get_response_by_id(self, response_id: str | int) -> Optional[Dict[str, Any]]:
        if not self._response_index:
            return None
        row_index = self._response_index.get(str(response_id))
        if row_index is None:
            return None
        return self.data.iloc[int(row_index)].to_dict()

    # Utility methods

    def value_counts(self, col: str, top: Optional[int] = None, normalise: bool = False) -> pd.Series:
        actual_column = self._get_column(col)
        counts = self.data[actual_column].value_counts(dropna=True, normalize=normalise)
        return counts.head(top) if top else counts

    def unique(self, col: str) -> np.ndarray:
        actual_column = self._get_column(col)
        return pd.unique(self._columns[actual_column])

    def filter_equals(self, col: str, value: Any, as_df: bool = True):
        actual_column = self._get_column(col)
        matches = self._columns[actual_column] == value
        result = self.data.loc[matches]
        return result if as_df else result.to_dict("records")

    def filter_in(self, col: str, values: Iterable[Any], as_df: bool = True):
        actual_column = self._get_column(col)
        result = self.data[self.data[actual_column].isin(values)]
        return result if as_df else result.to_dict("records")

    def make_index(self, col: str) -> None:
        """Build an inverted index for faster lookups on this column."""
        actual_column = self._get_column(col)
        index = {}
        values = self._columns[actual_column]
        for row_num, val in enumerate(values):
            if val not in index:
                index[val] = []
            index[val].append(row_num)
        self._column_indexes[actual_column] = index

    def rows_for(self, col: str, value: Any, as_df: bool = True):
        """Fast lookup using inverted index (builds one if needed)."""
        actual_column = self._get_column(col)
        if actual_column not in self._column_indexes:
            self.make_index(actual_column)

        row_indices = self._column_indexes[actual_column].get(value, [])
        result = self.data.iloc[row_indices]
        return result if as_df else result.to_dict("records")

    def groupby_agg(self, by: str | List[str], agg_spec: Dict[str, Any]) -> pd.DataFrame:
        group_cols = [by] if isinstance(by, str) else by
        return self.data.groupby(group_cols).agg(agg_spec).reset_index()

    def to_numpy(self, col: str) -> np.ndarray:
        actual_column = self._get_column(col)
        return self._columns[actual_column]

    def search_schema(self, text: str) -> pd.DataFrame:
        """Search for text across all schema columns."""
        search_term = str(text).lower()
        matching_rows = []

        for col in self.schema.columns:
            if self.schema[col].dtype == object:
                col_matches = self.schema[col].astype(str).str.lower().str.contains(search_term, na=False)
                matching_rows.append(col_matches)

        if not matching_rows:
            return self.schema.iloc[:0].copy()

        combined_mask = np.logical_or.reduce(matching_rows)
        return self.schema.loc[combined_mask].copy()

    # Internal helpers

    def _find_column(self, name: str) -> Optional[str]:
        """Find column by exact or case-insensitive match."""
        if name in self.data.columns:
            return name

        lowercase_name = name.lower()
        for col in self.data.columns:
            if col.lower() == lowercase_name:
                return col
        return None

    def _get_column(self, name: str) -> str:
        """Like _find_column but raises if not found."""
        column = self._find_column(name)
        if column is None:
            raise KeyError(f"Column not found: {name}")
        return column