In [1]:
import pandas as pd
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

# ============ Setup logging ============
log_dir = Path("./logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / "processing.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger(__name__)


# ============ Constants ============
START_DATE = pd.to_datetime("2021-01-01")
# FIX: Hardcoded Census Date for reproducibility (matches your proposal end date)
CENSUS_DATE = pd.to_datetime("2025-09-30") 


# Formating company data for regression

**Company DF**

- rcid               → firm_id
- user_id            → worker_id
- onet_code          → SOC occupation
- seniority          → seniority group
- startdate/enddate  → spells
- weight             → Revelio weight

**Needed table**

Job → DWA probability



## Merging with task -> Job

### Non vectorized

In [None]:
# ============ Load processed/failed tracking ============
processed_file = Path("./logs/processed.txt")
failed_file = Path("./logs/failed.txt")

processed = set()
failed = set()

if processed_file.exists():
    with open(processed_file) as f:
        processed = set(line.strip() for line in f if line.strip())

if failed_file.exists():
    with open(failed_file) as f:
        failed = set(line.strip() for line in f if line.strip())

logger.info(f"Starting run: {len(processed)} already processed, {len(failed)} known failures")


In [None]:
# ============ Process function ============

def expand_months(row):
    # FIX: Snap start date to the 1st of the month
    # Without this, an employee starting Jan 15th would result in an empty range
    # because 'MS' (Month Start) would look for the next 1st (Feb 1st).
    start_snapped = row["startdate"].replace(day=1)
    
    months = pd.date_range(start_snapped, row["enddate"], freq="MS")
    
    return pd.DataFrame({
        "firm_id":   int(row["rcid"]),
        "onet_code": row["onet_code"],
        "seniority": int(row["seniority"]),
        "user_id":   int(row["user_id"]),
        "weight":    float(row["weight"]),
        "month":     months,
    })

def process_company_data(file_path, task_matrix):
    file_stem = file_path.stem
    try:
        df = pd.read_parquet(file_path, engine='pyarrow')
        
        # 1. Date Cleaning
        df['startdate'] = pd.to_datetime(df['startdate'])
        # FIX: Use CENSUS_DATE instead of now()
        df['enddate'] = pd.to_datetime(df['enddate']).fillna(CENSUS_DATE)
        
        # 2. Filter Valid Window
        # Keep anyone who worked AT ALL during the window
        df = df[df['enddate'] >= START_DATE].copy()
        
        # Clip start dates to window start so we don't expand months prior to 2021
        df.loc[df['startdate'] < START_DATE, 'startdate'] = START_DATE
        
        # 3. Expand to monthly panel (Non-Vectorized Loop)
        # Note: This is slower than vectorization but logically easier to read
        expanded_list = df.apply(expand_months, axis=1).tolist()
        
        if not expanded_list:
             # Handle case where file might be empty after filtering
             logger.warning(f"File {file_stem} resulted in empty dataframe after filtering")
             return (file_stem, "success") # Mark as success to avoid infinite retries
             
        expanded = pd.concat(expanded_list, ignore_index=True)

        # 4. Pre-Aggregation (Memory Fix)
        # Sum weights by Occupation first.
        expanded = expanded.groupby(
            ['firm_id', 'onet_code', 'month', 'seniority'], 
            as_index=False
        )['weight'].sum()

        expanded.rename(columns={'weight': 'headcount'}, inplace=True)

        # 5. Merge Tasks and Calculate FTE
        expanded = expanded.merge(task_matrix, left_on='onet_code', right_on='job_profile_id')
        expanded["FTE"] = expanded["FTE"] * expanded["headcount"]
        
        # 6. Final Aggregation
        panel = expanded.groupby(
            ['firm_id', 'dwa_id', 'month', 'seniority'],
            as_index=False
        )['FTE'].sum()
        
        output_dir = Path("./data/company_data_processed")
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f"processed_{file_stem}.parquet"
        panel.to_parquet(output_file, index=False, compression='snappy')
        
        logger.info(f"Successfully processed {file_stem}")
        return (file_stem, "success")
    
    except Exception as e:
        logger.error(f"Failed to process {file_stem}: {str(e)}", exc_info=True)
        return (file_stem, "failed")

# ============ Main processing loop ============
task_matrix = pd.read_csv('./data/job_DWA_FTE_distribution_30_0.csv')
task_matrix = task_matrix.dropna()
task_matrix = task_matrix[["job_profile_id", "dwa_id", "FTE"]]

data_dir = Path("./data/company_data")
parquet_files = list(data_dir.glob("*.parquet"))

# Filter to only unprocessed files
files_to_process = [
    f for f in parquet_files 
    if f.stem not in processed and f.stem not in failed
]

logger.info(f"Processing {len(files_to_process)} files (skipped {len(processed) + len(failed)} already done)")

# Process in parallel
futures_map = {}
with ThreadPoolExecutor(max_workers=5) as executor:
    for file_path in files_to_process:
        future = executor.submit(process_company_data, file_path, task_matrix)
        futures_map[future] = file_path.stem

    # Collect results
    for future in as_completed(futures_map):
        file_stem, status = future.result()
        
        if status == "success":
            processed.add(file_stem)
            with open(processed_file, "a") as f:
                f.write(f"{file_stem}\n")
        else:
            failed.add(file_stem)
            with open(failed_file, "a") as f:
                f.write(f"{file_stem}\n")

logger.info(f"Run complete: {len(processed)} successful, {len(failed)} failed")

2025-12-02 10:04:31,035 [MainThread] INFO: Starting run: 0 already processed, 0 known failures
2025-12-02 10:04:31,082 [MainThread] INFO: Processing 4092 files (skipped 0 already done)
2025-12-02 10:04:33,891 [ThreadPoolExecutor-0_2] INFO: Successfully processed 349767
2025-12-02 10:04:33,891 [ThreadPoolExecutor-0_3] INFO: Successfully processed 530946
2025-12-02 10:04:39,157 [ThreadPoolExecutor-0_1] INFO: Successfully processed 346494
2025-12-02 10:04:40,758 [ThreadPoolExecutor-0_2] INFO: Successfully processed 22251539
2025-12-02 10:04:40,842 [ThreadPoolExecutor-0_0] INFO: Successfully processed 778963
2025-12-02 10:04:40,905 [ThreadPoolExecutor-0_4] INFO: Successfully processed 22144010
2025-12-02 10:04:40,935 [ThreadPoolExecutor-0_1] INFO: Successfully processed 22217929
2025-12-02 10:04:45,632 [ThreadPoolExecutor-0_2] INFO: Successfully processed 196042
2025-12-02 10:04:45,852 [ThreadPoolExecutor-0_0] INFO: Successfully processed 22150778
2025-12-02 10:04:45,882 [ThreadPoolExecuto

# Merging on one seniority level

In [None]:
data_dir = Path("./data/company_data_processed")
parquet_files_processed = list(data_dir.glob("*.parquet"))

In [None]:
parquet_files_processed[0]

PosixPath('data/company_data_processed/processed_7924585.parquet')

In [None]:
SENIORITY = 1

# Use list + pd.concat instead of deprecated DataFrame.append
frames = []
for file_path in parquet_files_processed:
    tmp = pd.read_parquet(file_path)
    tmp = tmp[tmp["seniority"] == SENIORITY]
    if not tmp.empty:
        frames.append(tmp)

if frames:
    full_df = pd.concat(frames, ignore_index=True)
else:
    full_df = pd.DataFrame(columns=["firm_id", "dwa_id", "month", "seniority", "FTE"])

full_df.reset_index(drop=True, inplace=True)

In [None]:
full_df

Unnamed: 0,firm_id,dwa_id,month,seniority,FTE
0,7924585,4.A.1.a.1.I01.D01,2021-01-01,1,1.677072
1,7924585,4.A.1.a.1.I01.D01,2021-02-01,1,1.677072
2,7924585,4.A.1.a.1.I01.D01,2021-03-01,1,1.525569
3,7924585,4.A.1.a.1.I01.D01,2021-04-01,1,1.525569
4,7924585,4.A.1.a.1.I01.D01,2021-05-01,1,1.820836
...,...,...,...,...,...
307090,77744,4.A.4.c.3.I07.D01,2025-05-01,1,0.040740
307091,77744,4.A.4.c.3.I07.D01,2025-06-01,1,0.040740
307092,77744,4.A.4.c.3.I07.D01,2025-07-01,1,0.040740
307093,77744,4.A.4.c.3.I07.D01,2025-08-01,1,0.040740
