In [1]:
# imports
import pandas as pd
import numpy as np
import os
import json
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand

# setup paths and detect project root
cwd = os.getcwd()
if 'notebooks' in cwd:
    PROJECT_ROOT = os.path.dirname(os.path.dirname(cwd))  # TWO levels up
else:
    PROJECT_ROOT = cwd
sys.path.insert(0, PROJECT_ROOT)

print('imports loaded')
print(f'project root: {PROJECT_ROOT}')

imports loaded
project root: /home/developer/project


In [2]:
# spark needs more memory for embeddings with 768 floats each
spark = SparkSession.builder \
    .appName('TrainValTestSplits') \
    .config('spark.driver.memory', '16g') \
    .config('spark.executor.memory', '8g') \
    .config('spark.driver.maxResultSize', '4g') \
    .getOrCreate()
print(f'spark started: {spark.version}')

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/27 01:33:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


spark started: 4.1.1


In [3]:
cv_data_dir = os.path.join(PROJECT_ROOT, 'ingest_cv', 'output')

print('loading CV splits')
train_cv_ids = pd.read_parquet(cv_data_dir / 'training_set_cv_ids.parquet')
val_cv_ids = pd.read_parquet(cv_data_dir / 'validation_set_cv_ids.parquet')
test_cv_ids = pd.read_parquet(cv_data_dir / 'test_set_cv_ids.parquet')

print(f'\nCV split sizes:')
print(f' training: {len(train_cv_ids):,} CVs')
print(f' validation: {len(val_cv_ids):,} CVs')
print(f' test: {len(test_cv_ids):,} CVs')
total_cvs = len(train_cv_ids) + len(val_cv_ids) + len(test_cv_ids)
print(f'\nsplit percentages:')
print(f' training: {len(train_cv_ids)/total_cvs*100:.1f}%')
print(f' validation: {len(val_cv_ids)/total_cvs*100:.1f}%')
print(f' test: {len(test_cv_ids)/total_cvs*100:.1f}%')

loading CV splits

CV split sizes:
 training: 5,839 CVs
 validation: 730 CVs
 test: 730 CVs

split percentages:
 training: 80.0%
 validation: 10.0%
 test: 10.0%


In [4]:
# check for overlaps
train_cv_set = set(train_cv_ids.iloc[:, 0].values)
val_cv_set = set(val_cv_ids.iloc[:, 0].values)
test_cv_set = set(test_cv_ids.iloc[:, 0].values)
print(f'\ntrain n val: {len(train_cv_set & val_cv_set)}')
print(f'train n test: {len(train_cv_set & test_cv_set)}')
print(f'val n test: {len(val_cv_set & test_cv_set)}')
no_overlap = (
    len(train_cv_set & val_cv_set) == 0 and
    len(train_cv_set & test_cv_set) == 0 and
    len(val_cv_set & test_cv_set) == 0
)
if no_overlap:
    print('\nCV splits validation: PASSED (no overlap)')


train n val: 0
train n test: 0
val n test: 0

CV splits validation: PASSED (no overlap)


In [5]:
# load job embeddings with Spark
jobs_path = str(os.path.join(PROJECT_ROOT, 'training', 'output', 'embeddings', 'jobs_embedded.parquet'))

print('loading job embeddings with Spark')
jobs_df = spark.read.parquet(jobs_path)
total_jobs = jobs_df.count()
print(f'loaded {total_jobs:,} jobs')
print(f'columns: {jobs_df.columns}')
print('\nsample:')
jobs_df.show(3, truncate=60)

loading job embeddings with Spark
loaded 165,193 jobs
columns: ['job_id', 'embedding_text', 'embedding', 'isco_code']

sample:
+------+------------------------------------------------------------+------------------------------------------------------------+---------+
|job_id|                                              embedding_text|                                                   embedding|isco_code|
+------+------------------------------------------------------------+------------------------------------------------------------+---------+
|    B7|passage: Role of $18.00 Assistant Manager at McDonald's i...|[-0.030975342, -0.018707275, -0.029586792, -0.03857422, 0...|        1|
|   B16|passage: Role of $4,500 Sign on Bonus - MDS Coordinator a...|[-0.013710022, -0.016983032, 0.0032138824, -0.0390625, 0....|        1|
|   B18|passage: Role of $45/hr - School RN at Maxim Healthcare S...|[-0.014793396, -0.046905518, -0.0357666, -0.013916016, 0....|        2|
+------+-------------------

In [6]:
# check if isco_code column exists from 05 stratified sampling
has_isco = 'isco_code' in jobs_df.columns
print(f'has isco_code column: {has_isco}')

if has_isco:
    print('\nISCO distribution in embeddings:')
    isco_counts = jobs_df.groupBy('isco_code').count().orderBy('isco_code').collect()
    for row in isco_counts:
        print(f'  {row["isco_code"]}: {row["count"]:,}')

has isco_code column: True

ISCO distribution in embeddings:
  0: 639
  1: 59,897
  2: 84,320
  3: 12,146
  4: 1,220
  5: 3,051
  6: 157
  7: 1,255
  8: 1,418
  9: 1,090


In [7]:
# ISCO names for display
ISCO_NAMES = {
    0: 'Military',
    1: 'Managers',
    2: 'Professionals', 
    3: 'Technicians',
    4: 'Clerical',
    5: 'Service/Sales',
    6: 'Agriculture',
    7: 'Craft/Trade',
    8: 'Operators',
    9: 'Elementary'
}

if has_isco:
    # stratified split using sampleBy with fractions per ISCO code
    print('creating stratified job splits by ISCO domain...')
    
    # get counts per domain for fraction calculation
    domain_counts = {row['isco_code']: row['count'] for row in isco_counts}
    
    # calculate fractions for 80/10/10 split per domain
    # train: 0.80, val: 0.10, test: 0.10
    train_fractions = {code: 0.80 for code in domain_counts.keys()}
    val_fractions = {code: 0.10 for code in domain_counts.keys()}
    test_fractions = {code: 0.10 for code in domain_counts.keys()}
    
    # shuffle first for randomness
    jobs_shuffled = jobs_df.orderBy(rand(seed=42))
    
    # sample train set
    train_jobs_df = jobs_shuffled.sampleBy('isco_code', train_fractions, seed=42)
    
    # get remaining jobs (not in train)
    train_ids = set(train_jobs_df.select('job_id').rdd.flatMap(lambda x: x).collect())
    remaining_df = jobs_shuffled.filter(~col('job_id').isin(list(train_ids)))
    
    # split remaining 50/50 for val/test
    val_test_fractions = {code: 0.50 for code in domain_counts.keys()}
    val_jobs_df = remaining_df.sampleBy('isco_code', val_test_fractions, seed=43)
    
    # test is everything not in train or val
    val_ids = set(val_jobs_df.select('job_id').rdd.flatMap(lambda x: x).collect())
    test_jobs_df = remaining_df.filter(~col('job_id').isin(list(val_ids)))
    
    print('stratified sampling complete')
    
else:
    # fallback to random split if no isco_code
    print('WARNING: no isco_code column, falling back to random split')
    jobs_shuffled = jobs_df.orderBy(rand(seed=42))
    train_jobs_df, val_jobs_df, test_jobs_df = jobs_shuffled.randomSplit([0.8, 0.1, 0.1], seed=42)

creating stratified job splits by ISCO domain...


26/01/27 01:34:16 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:17 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:17 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:17 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


stratified sampling complete


In [8]:
# cache and count
train_jobs_df.cache()
val_jobs_df.cache()
test_jobs_df.cache()

train_count = train_jobs_df.count()
val_count = val_jobs_df.count()
test_count = test_jobs_df.count()
total_split = train_count + val_count + test_count

print(f'\njob split sizes:')
print(f'  training: {train_count:,} jobs ({train_count/total_split*100:.1f}%)')
print(f'  validation: {val_count:,} jobs ({val_count/total_split*100:.1f}%)')
print(f'  test: {test_count:,} jobs ({test_count/total_split*100:.1f}%)')
print(f'  total: {total_split:,}')

26/01/27 01:34:25 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:26 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:26 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:27 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:28 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:30 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
26/01/27 01:34:31 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
26/01/27 01:34:31 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
26/01/27 01:34:32 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
26/01/27 01:34:33 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
                                                                                


job split sizes:
  training: 132,170 jobs (80.1%)
  validation: 16,378 jobs (9.9%)
  test: 16,428 jobs (10.0%)
  total: 164,976


In [9]:
# verify stratification maintained in each split
if has_isco:
    train_isco = train_jobs_df.groupBy('isco_code').count().orderBy('isco_code').collect()
    val_isco = val_jobs_df.groupBy('isco_code').count().orderBy('isco_code').collect()
    test_isco = test_jobs_df.groupBy('isco_code').count().orderBy('isco_code').collect()
    
    print(f'\n{"ISCO":15} {"Train":>10} {"Val":>10} {"Test":>10}')
    print('-' * 50)
    
    train_dict = {r['isco_code']: r['count'] for r in train_isco}
    val_dict = {r['isco_code']: r['count'] for r in val_isco}
    test_dict = {r['isco_code']: r['count'] for r in test_isco}
    
    all_codes = sorted(set(train_dict.keys()) | set(val_dict.keys()) | set(test_dict.keys()))
    for code in all_codes:
        name = ISCO_NAMES.get(code, 'Unknown')
        t = train_dict.get(code, 0)
        v = val_dict.get(code, 0)
        ts = test_dict.get(code, 0)
        print(f'{code} {name:12} {t:>10,} {v:>10,} {ts:>10,}')
    
    print('\nstratification: VERIFIED')

26/01/27 01:34:36 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
26/01/27 01:34:39 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
                                                                                


ISCO                 Train        Val       Test
--------------------------------------------------
0 Military            517         50         62
1 Managers         47,874      5,937      5,977
2 Professionals     67,617      8,421      8,388
3 Technicians       9,677      1,174      1,222
4 Clerical            980        109        113
5 Service/Sales      2,423        324        303
6 Agriculture         128         16         15
7 Craft/Trade         967        114        125
8 Operators         1,121        144        118
9 Elementary          866         89        105

stratification: VERIFIED


In [10]:
output_dir = os.path.join(PROJECT_ROOT, 'training', 'output', 'splits')
output_dir.mkdir(parents=True, exist_ok=True)

train_jobs_path = output_dir / 'train_jobs.parquet'
val_jobs_path = output_dir / 'val_jobs.parquet'
test_jobs_path = output_dir / 'test_jobs.parquet'

train_jobs_df.write.mode('overwrite').parquet(str(train_jobs_path))
val_jobs_df.write.mode('overwrite').parquet(str(val_jobs_path))
test_jobs_df.write.mode('overwrite').parquet(str(test_jobs_path))

print(f'saved job splits:')
print(f'  {train_jobs_path}: {train_count:,} jobs')
print(f'  {val_jobs_path}: {val_count:,} jobs')
print(f'  {test_jobs_path}: {test_count:,} jobs')

26/01/27 01:34:45 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB
26/01/27 01:34:48 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
                                                                                

saved job splits:
  /home/developer/project/output/splits/train_jobs.parquet: 132,170 jobs
  /home/developer/project/output/splits/val_jobs.parquet: 16,378 jobs
  /home/developer/project/output/splits/test_jobs.parquet: 16,428 jobs


In [11]:
cvs_path = str(os.path.join(PROJECT_ROOT, 'training', 'output', 'embeddings', 'cvs_embedded.parquet'))

print('loading CV embeddings with Spark')
cvs_df = spark.read.parquet(cvs_path)

print(f'loaded {cvs_df.count():,} CVs')
print(f'\ncolumns: {cvs_df.columns}')
print('\nsample:')
cvs_df.show(3, truncate=60)

loading CV embeddings with Spark
loaded 7,299 CVs

columns: ['cv_id', 'embedding_text', 'embedding']

sample:
+-----+------------------------------------------------------------+------------------------------------------------------------+
|cv_id|                                              embedding_text|                                                   embedding|
+-----+------------------------------------------------------------+------------------------------------------------------------+
|   A1|Query: I am a Python Developer with 0 years of experience...|[0.016677856, -0.032989502, -0.033813477, 0.004070282, 0....|
|   A2|Query: I am a Operations Manager with 11 years of experie...|[0.005622864, -0.026641846, -0.05456543, 0.0016298294, 0....|
|   A3|Query: I am a DevOps Engineer with 0 years of experience,...|[0.008331299, -0.030975342, -0.05633545, 0.013511658, 0.0...|
+-----+------------------------------------------------------------+------------------------------------------

In [12]:
# get CV ID column name
cv_id_col = train_cv_ids.columns[0]
print(f'CV ID column: {cv_id_col}')
# convert to lists for filtering
train_cv_list = train_cv_ids[cv_id_col].tolist()
val_cv_list = val_cv_ids[cv_id_col].tolist()
test_cv_list = test_cv_ids[cv_id_col].tolist()

print(f'\nfiltering CVs based on existing splits')

CV ID column: anchor

filtering CVs based on existing splits


In [13]:
# filter with Spark
train_cvs_df = cvs_df.filter(col('cv_id').isin(train_cv_list))
val_cvs_df = cvs_df.filter(col('cv_id').isin(val_cv_list))
test_cvs_df = cvs_df.filter(col('cv_id').isin(test_cv_list))

# cache and count
train_cvs_df.cache()
val_cvs_df.cache()
test_cvs_df.cache()

train_cv_count = train_cvs_df.count()
val_cv_count = val_cvs_df.count()
test_cv_count = test_cvs_df.count()

print(f'\nCV split sizes:')
print(f' training: {train_cv_count:,} CVs')
print(f' validation: {val_cv_count:,} CVs')
print(f' test: {test_cv_count:,} CVs')

if train_cv_count == len(train_cv_ids):
    print('\nall CV IDs found in embeddings')


CV split sizes:
 training: 5,839 CVs
 validation: 730 CVs
 test: 730 CVs

all CV IDs found in embeddings


In [14]:
train_cvs_path = output_dir / 'train_cvs.parquet'
val_cvs_path = output_dir / 'val_cvs.parquet'
test_cvs_path = output_dir / 'test_cvs.parquet'

train_cvs_df.write.mode('overwrite').parquet(str(train_cvs_path))
val_cvs_df.write.mode('overwrite').parquet(str(val_cvs_path))
test_cvs_df.write.mode('overwrite').parquet(str(test_cvs_path))

print(f'saved CV splits:')
print(f'  {train_cvs_path}: {train_cv_count:,} CVs')
print(f'  {val_cvs_path}: {val_cv_count:,} CVs')
print(f'  {test_cvs_path}: {test_cv_count:,} CVs')

saved CV splits:
  /home/developer/project/output/splits/train_cvs.parquet: 5,839 CVs
  /home/developer/project/output/splits/val_cvs.parquet: 730 CVs
  /home/developer/project/output/splits/test_cvs.parquet: 730 CVs


In [15]:
print(f'\njobs (stratified by ISCO):')
print(f' training: {train_count:,} ({train_count/total_split*100:.1f}%)')
print(f' validation: {val_count:,} ({val_count/total_split*100:.1f}%)')
print(f' test: {test_count:,} ({test_count/total_split*100:.1f}%)')
print(f' total: {total_split:,}')

print(f'\nCVs (from colleague splits):')
print(f' training: {train_cv_count:,} ({train_cv_count/total_cvs*100:.1f}%)')
print(f' validation: {val_cv_count:,} ({val_cv_count/total_cvs*100:.1f}%)')
print(f' test: {test_cv_count:,} ({test_cv_count/total_cvs*100:.1f}%)')
print(f' total: {total_cvs:,}')

print(f'\nvalidation checks:')
print(f' CV splits no overlap: {no_overlap}')
if has_isco:
    print(f' job splits stratified: True')

print(f'\nratios (jobs:CVs):')
print(f' training: {train_count}:{train_cv_count} = 1:{train_cv_count//max(1,train_count//train_cv_count if train_cv_count else 1)}')
print(f' validation: {val_count}:{val_cv_count}')
print(f' test: {test_count}:{test_cv_count}')

print(f'\nall splits ready for training')


jobs (stratified by ISCO):
 training: 132,170 (80.1%)
 validation: 16,378 (9.9%)
 test: 16,428 (10.0%)
 total: 164,976

CVs (from colleague splits):
 training: 5,839 (80.0%)
 validation: 730 (10.0%)
 test: 730 (10.0%)
 total: 7,299

validation checks:
 CV splits no overlap: True
 job splits stratified: True

ratios (jobs:CVs):
 training: 132170:5839 = 1:265
 validation: 16378:730
 test: 16428:730

all splits ready for training


In [16]:
print(f'output directory: {output_dir}/')
print(f'\njob splits (stratified by ISCO):')
print(f' train_jobs.parquet: {train_count:,} jobs (80%)')
print(f' val_jobs.parquet: {val_count:,} jobs (10%)')
print(f' test_jobs.parquet: {test_count:,} jobs (10%)')
print(f'\nCV splits (from colleague):')
print(f' train_cvs.parquet: {train_cv_count:,} CVs (80%)')
print(f' val_cvs.parquet: {val_cv_count:,} CVs (10%)')
print(f' test_cvs.parquet: {test_cv_count:,} CVs (10%)')
print(f'\nall splits created with Spark')

output directory: /home/developer/project/output/splits/

job splits (stratified by ISCO):
 train_jobs.parquet: 132,170 jobs (80%)
 val_jobs.parquet: 16,378 jobs (10%)
 test_jobs.parquet: 16,428 jobs (10%)

CV splits (from colleague):
 train_cvs.parquet: 5,839 CVs (80%)
 val_cvs.parquet: 730 CVs (10%)
 test_cvs.parquet: 730 CVs (10%)

all splits created with Spark


In [17]:
spark.stop()
print('spark stopped')

spark stopped
