# OEWS Code Standardization

This notebook documents the end-to-end process to standardize occupational codes (OCC_CODE) in BLS's State Occupational Employment and Wage Estimates (OEWS) data to 6-digit SOC 2019 codes. The standardized OEWS table was used for enriching the Career229K dataset with occupational wage information as described in the paper "Leveraging Large Language Models for Career Mobility Analysis: A Study of Gender, Race, and Job Change Using U.S. Online Resume Profiles."

**Important Note on Reproducibility**

* This pipeline relies on public BLS OEWS data and O*NET SOC crosswalks.
* The codebase reproduces the data processing steps described in the paper.
* Minor divergences from the wage tables used in the paper exist (e.g., due to code mapping and aggregation differences), but downstream effects are minimal.
* The file names used in the code are placeholders. To run this notebook, users must substitute them with their own files.

## Overview of Pipeline

The pipeline has three main stages, following the steps used in the paper:

### 1. Generate SOC Code Mapping Table

Construct a lookup table mapping all legacy occupational codes (``OCC_CODE``) in the OEWS tables from 1999 to 2022 to O*NET-SOC 2019 6-digit codes, using multiple O*NET-SOC crosswalk files. The mapping accounts for SOC system updates as well as code splits and merges over time.

### 2. Standardize Occupational Codes in OEWS Data

Apply the mapping table to standardize all ``OCC_CODE`` entries in the OEWS tables. If multiple old codes map to the same SOC 2019 code, wage values are aggregated by mean within each group.
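
As a minimal sketch of this step, the toy example below merges two legacy codes into one target code and averages their wages. The codes, the `mapped_code` column name, and the wage values are made up for illustration; the notebook's own function appears in the standardization section.

```python
import pandas as pd

# Toy wage rows: three hypothetical legacy codes in the same state and year.
wages = pd.DataFrame({
    "year": [2005, 2005, 2005],
    "AREA_TITLE": ["California", "California", "California"],
    "OCC_CODE": ["11-1111", "11-2222", "11-3333"],
    "A_MEAN": [50000.0, 60000.0, 70000.0],
})

# Hypothetical mapping: the first two legacy codes merge into one target code.
mapping = pd.DataFrame({
    "original_code": ["11-1111", "11-2222"],
    "mapped_code": ["11-9999", "11-9999"],
})

merged = wages.merge(mapping, left_on="OCC_CODE", right_on="original_code", how="left")
# Codes without a mapping entry are assumed to be current already and are kept as-is.
merged["OCC_CODE"] = merged["mapped_code"].fillna(merged["OCC_CODE"])

# Average the wage values of all rows that now share the same standardized code.
standardized = merged.groupby(
    ["year", "AREA_TITLE", "OCC_CODE"], as_index=False
)["A_MEAN"].mean()
print(standardized)
```

Here the two merged codes collapse into a single row whose `A_MEAN` is the mean of 50000 and 60000, while the unmapped code passes through unchanged.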

### 3. Global Imputation of Standardized OEWS Data

Remaining missing wage values within each `(OCC_CODE, AREA_TITLE)` group are filled using linear interpolation over time.
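
The sketch below illustrates group-wise linear interpolation on toy data. Two points are assumptions of this example rather than statements about the paper's tables: each group has one row per year (pandas' `method="linear"` treats rows as equally spaced), and `limit_area="inside"` is used so that only gaps bounded by observed values are filled. Without that argument, pandas would also carry the last observed value forward into trailing gaps.

```python
import numpy as np
import pandas as pd

# Toy standardized table with one gap inside the series and one at the end.
df = pd.DataFrame({
    "OCC_CODE": ["11-9999"] * 5,          # made-up code
    "AREA_TITLE": ["California"] * 5,
    "year": [2001, 2002, 2003, 2004, 2005],
    "A_MEAN": [100.0, np.nan, 120.0, 130.0, np.nan],
})

df = df.sort_values(["OCC_CODE", "AREA_TITLE", "year"])

# Interpolate within each (OCC_CODE, AREA_TITLE) group; leading and trailing
# gaps are left as NaN because of limit_area="inside".
df["A_MEAN_filled"] = df.groupby(["OCC_CODE", "AREA_TITLE"])["A_MEAN"].transform(
    lambda s: s.interpolate(method="linear", limit_area="inside")
)
print(df)
```

The 2002 value is filled with the midpoint of its neighbors, and the 2005 value stays missing because no later observation bounds it.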

## Input Files

### 1. OEWS CSV File

An OEWS wage table compiled from the original annual OEWS data files. This file contains the raw occupational wage information for all areas and occupations from 1997 to 2022, prior to code standardization. Download it [here](https://drive.google.com/file/d/1bifxmMD-IldLH2OwweHg1s18w9aYwTnv/view?usp=drive_link).

Required columns:

| Column       | Description                                                                                                             |
| ------------ | ----------------------------------------------------------------------------------------------------------------------- |
| `year`       | Survey year (1997–2022, though only 1999+ is used)                                                                      |
| `AREA_TITLE` | Geographic area (e.g., “California”)                                                                                    |
| `OCC_CODE`   | Original SOC code (may be legacy or SOC 2019)                                                                           |
| `OCC_TITLE`  | Occupation title                                                                                                        |
| `O_GROUP`    | BLS occupation group (filtered to `"detailed"`)                                                                         |
| Wage fields  | `A_MEAN`, `H_MEAN`, `A_MEDIAN`, `H_MEDIAN`, and so on.                                                                  |
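
Because the file names in this notebook are placeholders, it can help to verify that a substituted file has the expected columns before running the pipeline. The helper below is a hypothetical sketch, not part of the original code, and its column list covers only the fields named in the table above.

```python
import pandas as pd

# Columns named in the table above; extend with any further wage fields you use.
REQUIRED_COLUMNS = [
    "year", "AREA_TITLE", "OCC_CODE", "OCC_TITLE", "O_GROUP",
    "A_MEAN", "H_MEAN", "A_MEDIAN", "H_MEDIAN",
]

def missing_oews_columns(df, required=REQUIRED_COLUMNS):
    """Return the required columns that are absent from df (hypothetical helper)."""
    return [col for col in required if col not in df.columns]

# Toy frame standing in for a user-supplied OEWS file.
toy = pd.DataFrame(columns=["year", "AREA_TITLE", "OCC_CODE", "OCC_TITLE", "A_MEAN"])
print(missing_oews_columns(toy))
```

An empty list means every required column is present.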

### 2. SOC Crosswalk CSV Files

These files are used to construct the mapping from older SOC versions to SOC 2019. They can be downloaded from [the O*NET Taxonomy website](https://www.onetcenter.org/taxonomy.html).

### 3. SOC Mapping CSV File

A complete SOC crosswalk table, created in Step 1, that maps all 8-digit legacy SOC codes to 8-digit SOC 2019 codes using multiple O*NET crosswalk files. The mapping table is constructed via depth-first search through the crosswalk graph, tracing all possible paths from legacy codes to their SOC 2019 equivalents. For convenience, a pre-generated mapping file is available at `/data/soc_non2019_to_2019_mapping.csv`.
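
The traversal can be sketched on a toy graph as follows. All node names are made up, and the function is a simplified illustration of depth-first path collection rather than the notebook's exact implementation, which also tracks taxonomy versions.

```python
# Toy crosswalk graph: each key maps to the codes it becomes in the next
# taxonomy version. All codes are invented for illustration.
crosswalk = {
    "A-2000": ["B-2006"],
    "B-2006": ["C-2010", "D-2010"],   # one code splits into two
    "C-2010": ["X-2019"],
    "D-2010": ["X-2019", "Y-2019"],
}
targets = {"X-2019", "Y-2019"}        # stand-in for the SOC 2019 code set

def paths_to_targets(code, visited=frozenset()):
    """Return (target_code, path_length) for every path from code to a target."""
    if code in visited:               # guard against cycles
        return []
    if code in targets:               # reached the final taxonomy
        return [(code, 0)]
    found = []
    for nxt in crosswalk.get(code, []):
        for target, length in paths_to_targets(nxt, visited | {code}):
            found.append((target, length + 1))
    return found

print(paths_to_targets("A-2000"))
```

Note that the same target can be reached along more than one path, so the result may contain repeated `(target, length)` pairs; wrapping the result in `set(...)` removes them if only distinct targets matter.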

**Required columns:**

| Column                   | Description                                                                                     |
| ------------------------ | ----------------------------------------------------------------------------------------------- |
| `original_code`          | Original SOC code as it appeared in legacy OEWS data                                            |
| `normalized_code`        | Canonicalized SOC code (e.g., `151030` → `15-1030.00`) used for consistent merging              |
| `mapped_soc2019_code`    | Final 8-digit SOC 2019 code reached via crosswalk traversal (truncated to 6 digits in Step 2)   |
| `mapped_soc2019_title`   | Official SOC 2019 occupation title corresponding to `mapped_soc2019_code`                       |
| `path_length`            | Number of crosswalk steps required to reach SOC 2019 (e.g., 0 if already SOC 2019, 3 if legacy) |
| `first_detected_version` | Earliest SOC taxonomy version in which the original code appeared                               |
| `last_detected_version`  | Last SOC taxonomy version encountered along the mapping path                                    |
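
As an illustration of how these columns can be used, the sketch below counts splits (one legacy code with several SOC 2019 targets) and merges (several legacy codes sharing one target) in a toy mapping table. The codes and values are made up; only the column names come from the table above.

```python
import pandas as pd

# Toy mapping rows using the column names above; the codes are invented.
mapping = pd.DataFrame({
    "original_code": ["11-1111.00", "11-1111.00", "11-2222.00"],
    "mapped_soc2019_code": ["11-9991.00", "11-9992.00", "11-9991.00"],
    "path_length": [2, 2, 1],
})

# Distinct SOC 2019 targets per legacy code (more than 1 indicates a split).
targets_per_code = mapping.groupby("original_code")["mapped_soc2019_code"].nunique()
splits = targets_per_code[targets_per_code > 1]

# Distinct legacy codes per SOC 2019 target (more than 1 indicates a merge).
sources_per_target = mapping.groupby("mapped_soc2019_code")["original_code"].nunique()
merges = sources_per_target[sources_per_target > 1]

print(splits.to_dict(), merges.to_dict())
```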

### 4. O*NET-SOC 2018 CSV File

Used for mapping SOC codes to their corresponding official occupation titles (``OCC_TITLE``) after standardization.
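
A minimal sketch of the title lookup is shown below, using the column names `OCC_CODE` and `OCC_TITLE_SOC2018` that the standardization function expects. The codes and the title are placeholders. Codes without a title receive `NaN`, and it is worth checking for them because pandas' `groupby` drops rows with missing group keys by default.

```python
import pandas as pd

# Hypothetical title table and standardized wage rows (made-up codes).
titles = pd.DataFrame({
    "OCC_CODE": ["11-9991"],
    "OCC_TITLE_SOC2018": ["Example Occupation"],
})
wages = pd.DataFrame({"OCC_CODE": ["11-9991", "11-0000"]})

# Build a code -> title dictionary and attach titles to the wage rows.
title_map = dict(zip(titles["OCC_CODE"], titles["OCC_TITLE_SOC2018"]))
wages["OCC_TITLE"] = wages["OCC_CODE"].map(title_map)

# Rows whose code has no official title in the lookup table.
untitled = wages[wages["OCC_TITLE"].isna()]
print(untitled["OCC_CODE"].tolist())
```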

# Generate SOC Code Mapping Table

In [None]:
import pandas as pd
import re

# -----------------------------
# Paths
# -----------------------------
CROSSWALK_FILES = {
    "2000_2006": "../data/2000_to_2006_Crosswalk.csv",
    "2006_2009": "../data/2006_to_2009_Crosswalk.csv",
    "2009_2010": "../data/2009_to_2010_Crosswalk.csv",
    "2010_2019": "../data/2010_to_2019_Crosswalk.csv",
}
SOC2019_PATH = "../data/onet-soc_2019.csv"
OUT_MAPPING_CSV = "../data/soc_non2019_to_2019_mapping.csv"

# -----------------------------
# Utilities
# -----------------------------
def normalize_soc_code(code):
    if pd.isna(code):
        return None
    s = str(code).strip()
    if s.lower() in {"", "na", "nan", "n/a", "none"}:
        return None

    # Remove quotes
    s = s.strip('"').strip("'")

    # Remove non-digit/non-dot characters except dash
    s = re.sub(r'[^0-9\.-]', '', s)

    # Split decimal if exists
    if '.' in s:
        main, suffix = s.split('.', 1)  # split on the first dot only
        suffix = re.sub(r'\D', '', suffix)[:2].ljust(2, '0')  # keep exactly 2 suffix digits
    else:
        main = s
        suffix = '00'

    # Insert dash in main if needed
    main_digits = re.sub(r'\D', '', main)
    if len(main_digits) != 6:
        return None  # invalid
    main_formatted = main_digits[:2] + '-' + main_digits[2:]

    return f"{main_formatted}.{suffix}"


# -----------------------------
# Load SOC 2019 taxonomy
# -----------------------------
soc2019_df = pd.read_csv(SOC2019_PATH, dtype=str)
soc2019_df['soc_norm'] = soc2019_df['code'].apply(normalize_soc_code)
soc2019_codes = set(soc2019_df['soc_norm'].dropna())
soc2019_title_map = dict(zip(soc2019_df['soc_norm'], soc2019_df['title']))
print(f"Loaded {len(soc2019_codes)} SOC 2019 codes")

# -----------------------------
# Build crosswalk adjacency with version tracking
# -----------------------------
version_map = {
    "2000_2006": ("O*NET-SOC 2000 Code", "O*NET-SOC 2006 Code", "O*NET-SOC 2006 Title", "2000"),
    "2006_2009": ("O*NET-SOC 2006 Code", "O*NET-SOC 2009 Code", "O*NET-SOC 2009 Title", "2006"),
    "2009_2010": ("O*NET-SOC 2009 Code", "O*NET-SOC 2010 Code", "O*NET-SOC 2010 Title", "2009"),
    "2010_2019": ("O*NET-SOC 2010 Code", "O*NET-SOC 2019 Code", "O*NET-SOC 2019 Title", "2010"),
}

code_to_next = dict()
code_to_version = dict()  # node -> version

for ver, path in CROSSWALK_FILES.items():
    df = pd.read_csv(path, dtype=str)
    src_col, tgt_col, tgt_title_col, ver_label = version_map[ver]
    for _, row in df.iterrows():
        src_norm = normalize_soc_code(row[src_col])
        tgt_norm = normalize_soc_code(row[tgt_col])
        tgt_title = row[tgt_title_col]
        if src_norm and tgt_norm:
            code_to_next.setdefault(src_norm, []).append((tgt_norm, tgt_title))
            code_to_version[src_norm] = ver_label
            code_to_version[tgt_norm] = ver_label

print(f"Built stepwise crosswalk with {len(code_to_next)} source codes")

# -----------------------------
# DFS to collect all paths to SOC 2019
# -----------------------------
def dfs_collect_2019(code, first_ver=None, visited=None):
    if visited is None:
        visited = set()
    if code in visited:
        return []
    visited.add(code)

    # Stop only if truly SOC 2019
    if code in soc2019_codes:
        return [(code, 0, first_ver or code_to_version.get(code, "unknown"),
                 code_to_version.get(code, "unknown"))]

    results = []
    for next_code, _ in code_to_next.get(code, []):
        next_ver = code_to_version.get(next_code, "unknown")
        new_first_ver = first_ver or code_to_version.get(code, "unknown")
        sub_paths = dfs_collect_2019(next_code, new_first_ver, visited.copy())
        for fc, path_len, first_detected, last_detected in sub_paths:
            results.append((fc, path_len + 1, first_detected, next_ver))
    return results

# -----------------------------
# Map all non-2019 codes
# -----------------------------
all_old_codes = set(code_to_next.keys()) - soc2019_codes
mapping_rows = []
unmapped_codes = []

for i, orig_code in enumerate(sorted(all_old_codes)):
    orig_norm = normalize_soc_code(orig_code)
    final_paths = dfs_collect_2019(orig_norm)
    if final_paths:
        for fc, path_len, first_ver, last_ver in final_paths:
            mapping_rows.append({
                "original_code": orig_code,
                "normalized_code": orig_norm,
                "mapped_soc2019_code": fc,
                "mapped_soc2019_title": soc2019_title_map.get(fc),
                "path_length": path_len,
                "first_detected_version": first_ver,
                "last_detected_version": last_ver
            })
    else:
        unmapped_codes.append(orig_code)

# -----------------------------
# Save mapping
# -----------------------------
mapping_df = pd.DataFrame(mapping_rows)
mapping_df.to_csv(OUT_MAPPING_CSV, index=False)
print(f"\nStepwise crosswalk mapping saved to {OUT_MAPPING_CSV}")
print(f"Total mapping rows: {len(mapping_df)}")  # one row per (legacy code, path)
print(f"Total unmapped codes: {len(unmapped_codes)}")
if unmapped_codes:
    print("Some codes could not be mapped to 2019:")
    print(", ".join(unmapped_codes[:20]) + ("..." if len(unmapped_codes) > 20 else ""))


# Standardize Occupational Codes in OEWS Data

In [None]:
import pandas as pd
import time

def standardize_wage_soc2019(wage_df, mapping_df, soc2018_titles_df, verbose=True):
    """
    Standardize BLS wage data to SOC 2019 codes using stepwise crosswalk mapping.
    - Aggregates wage rows by mean for each (year, AREA_TITLE, OCC_CODE, OCC_TITLE).
    - Flags rows as 'is_aggregate' if multiple legacy codes are collapsed.
    - Returns aggregated standardized DataFrame only.
    """
    start_total = time.time()

    # --- 1. Filter wage data ---
    start = time.time()
    wage_df = wage_df[wage_df['year'].astype(int) >= 1999].copy()
    wage_df = wage_df[wage_df['O_GROUP'].isna() | (wage_df['O_GROUP'] == 'detailed')].copy()
    wage_df['OLD_OCC_CODE'] = wage_df['OCC_CODE']
    wage_df['OLD_OCC_TITLE'] = wage_df['OCC_TITLE']
    wage_df = wage_df.drop(columns=['O_GROUP'])
    wage_df['OCC_CODE_TRUNC'] = wage_df['OLD_OCC_CODE'].str[:7]

    if verbose:
        print(f"Filtering and truncating done in {time.time() - start:.2f} seconds")

    # --- 2. Prepare mapping ---
    mapping_df = mapping_df.copy()
    mapping_df['original_code_trunc'] = mapping_df['original_code'].str[:7]
    mapping_df['mapped_soc2019_code_trunc'] = mapping_df['mapped_soc2019_code'].str[:7]

    # --- 3. Merge wage data with mapping ---
    start = time.time()
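    # Truncation can yield identical (source, target) pairs; drop them so each
    # wage row is matched at most once per distinct SOC 2019 target.
    mapping_pairs = mapping_df[
        ['original_code_trunc', 'mapped_soc2019_code_trunc']
    ].drop_duplicates()
    merged = wage_df.merge(
        mapping_pairs,
        left_on='OCC_CODE_TRUNC',
        right_on='original_code_trunc',
        how='left'
    )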
    merged['OCC_CODE'] = merged['mapped_soc2019_code_trunc'].fillna(merged['OCC_CODE_TRUNC'])

    if verbose:
        print(f"Merging wage with mapping done in {time.time() - start:.2f} seconds")
        print(f"# unique OCC_CODE after merging: {merged['OCC_CODE'].nunique()}")

    # --- 4. Map SOC2018 titles ---
    start = time.time()
    soc2018_title_map = dict(zip(soc2018_titles_df['OCC_CODE'], soc2018_titles_df['OCC_TITLE_SOC2018']))
    merged['OCC_TITLE'] = merged['OCC_CODE'].map(soc2018_title_map)

    if verbose:
        print(f"Mapping SOC2018 titles done in {time.time() - start:.2f} seconds")

    # --- 5. Mean aggregation ---
    start = time.time()
    wage_cols = [
        'TOT_EMP', 'H_MEAN', 'A_MEAN', 'H_MEDIAN', 'A_MEDIAN',
        'H_PCT10', 'A_PCT10', 'H_PCT25', 'A_PCT25', 'H_PCT90', 'A_PCT90'
    ]

    # Ensure numeric
    for col in wage_cols:
        merged[col] = pd.to_numeric(merged[col], errors='coerce')

    # Mean aggregation
    aggregated = merged.groupby(
        ['year', 'AREA_TITLE', 'OCC_CODE', 'OCC_TITLE'],
        as_index=False
    )[wage_cols].mean()

    # Compute group sizes to flag aggregates
    group_sizes = merged.groupby(
        ['year', 'AREA_TITLE', 'OCC_CODE', 'OCC_TITLE']
    ).size().reset_index(name='group_size')
    aggregated = aggregated.merge(group_sizes, on=['year', 'AREA_TITLE', 'OCC_CODE', 'OCC_TITLE'])
    aggregated['is_aggregate'] = aggregated['group_size'] > 1
    aggregated = aggregated.drop(columns=['group_size'])

    if verbose:
        print(f"Mean aggregation and aggregate flag done in {time.time() - start:.2f} seconds")

    merged = None  # free memory

    if verbose:
        print(f"Total pipeline runtime: {time.time() - start_total:.2f} seconds")

    return aggregated

# -----------------------------
# Run standardization
# -----------------------------
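wage_df = pd.read_csv("../data/wage_1997_2022.csv", low_memory=False)
# Placeholder path: supply your own SOC 2018 title table with the columns
# OCC_CODE and OCC_TITLE_SOC2018 (it is not loaded elsewhere in this notebook).
soc2018_titles_df = pd.read_csv("../data/soc2018_titles.csv", dtype=str)
unique_file = "../data/wage_1999_2022_soc2019_unique.csv.gz"
unique_df = standardize_wage_soc2019(wage_df, mapping_df, soc2018_titles_df, verbose=True)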
unique_df.to_csv(unique_file, index=False, compression='gzip')
print(f"Final unique wage table saved: {unique_file}")


# Global Imputation of Standardized OEWS Data

In [None]:
import pandas as pd
import time

def impute_wage_timeseries(df, progress_every=5000):
    """
    Impute all wage columns via linear interpolation between years within each (OCC_CODE, AREA_TITLE) group.
    - Only interpolates, no forward/backward fill.
    - Includes progress printout for large datasets.
    """
    wage_cols = ['H_MEAN', 'A_MEAN', 'H_MEDIAN', 'A_MEDIAN',
                 'H_PCT10', 'A_PCT10', 'H_PCT25', 'A_PCT25',
                 'H_PCT90', 'A_PCT90']

    # Ensure numeric
    df[wage_cols] = df[wage_cols].apply(pd.to_numeric, errors='coerce')

    # Sort for interpolation
    df = df.sort_values(['OCC_CODE', 'AREA_TITLE', 'year'])

    grouped = df.groupby(['OCC_CODE', 'AREA_TITLE'], group_keys=False)
    total_groups = len(grouped)
    print(f"Starting imputation for {total_groups:,} groups...")

    result = []
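    for i, (_, group) in enumerate(grouped):
        group = group.copy()  # work on a copy, not a slice of the original frame
        # Interpolate only gaps bounded by observed values on both sides;
        # limit_area='inside' keeps trailing gaps from being forward-filled.
        group[wage_cols] = group[wage_cols].interpolate(
            method='linear', axis=0, limit_area='inside'
        )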

        if (i + 1) % progress_every == 0:
            print(f"  → Processed {i + 1:,}/{total_groups:,} groups...")

        result.append(group)

    df_imputed = pd.concat(result)
    print("Imputation complete.")
    return df_imputed

# Load and impute
wage_std_df = pd.read_csv("../data/wage_1999_2022_soc2019_unique.csv.gz")
start = time.time()
wage_interp_df = impute_wage_timeseries(wage_std_df)
end = time.time()
print(f"Done in {end-start:.2f} sec")

# Save
wage_interp_df.to_csv(
    "../data/wage_interpolated_1999_2022_soc2019_unique.csv.gz",
    index=False, compression='gzip'
)
print("Imputation complete and saved")