# Multi‑Year Data Analysis for Data‑Intensive Jobs

This notebook provides a clean and executable version of the job‑posting pipeline originally found in `Testing_OECD_PRE_RUN.ipynb`. It allows you to analyse job advertisements to identify data‑intensive roles across occupations and time. Input data can be supplied either as year‑partitioned CSV files under the `csv_data` folder or as a single Parquet file containing all records. All processing is contained within the `OECD_DATA` root to ensure safe file operations.

## 1. Spark and Utility Setup

We configure a single Spark session for the entire pipeline and attach a `safe_write_parquet` helper to the `SparkSession` class. This helper writes to a temporary location and then renames it into place, optionally backing up any existing output first, to prevent accidental data loss.

In [None]:

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F
import os
import datetime
from tqdm.auto import tqdm

# Spark configuration: settings are kept in one table and applied in order
_SPARK_SETTINGS = {
    'spark.executor.memory': '13g',
    'spark.driver.memory': '3g',
    'spark.executor.cores': '4',
    'spark.driver.cores': '4',
    'spark.sql.adaptive.enabled': 'true',
    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    'spark.sql.adaptive.skewJoin.enabled': 'true',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.execution.arrow.pyspark.enabled': 'true',
    'spark.sql.parquet.compression.codec': 'snappy',
}

auto_config = SparkSession.builder.appName('BGT_Data_Processing_Clean')
for _setting, _value in _SPARK_SETTINGS.items():
    auto_config = auto_config.config(_setting, _value)

# One shared session for the whole notebook; only errors are logged
spark = auto_config.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# Safe write function
def _remove_path(path):
    '''Delete *path* whether it is a single file or a directory tree; do nothing if absent.'''
    import shutil

    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    elif os.path.lexists(path):
        os.remove(path)


def safe_write_parquet(df, output_path, root_path, create_backup=True):
    '''Safely write a DataFrame to parquet with multiple safety checks.

    The data is first written to a temporary sibling path and read back, and
    only then renamed over ``output_path``.

    Args:
        df: Spark DataFrame to write.
        output_path: Destination path; must be ``root_path`` or lie below it.
        root_path: Project root that the output is not allowed to leave.
        create_backup: When true and ``output_path`` already exists, copy it to
            ``<output_path>_backup_<timestamp>`` first (best effort: a failed
            backup only prints a warning).

    Raises:
        ValueError: If ``output_path`` is outside ``root_path``.
        Exception: Any error from the write or the rename is re-raised after
            the temporary output has been cleaned up.

    NOTE(review): existence checks, renames and deletes go through ``os`` and
    therefore act on the local filesystem, while the data itself is written by
    Spark. Presumably this is only meant for local paths -- confirm before
    using it with ``s3a://`` locations.
    '''
    # Imported locally because a later cell runs `from datetime import datetime`,
    # which rebinds the global name `datetime` to the class and would break
    # `datetime.datetime.now()` here.
    import datetime as _dt

    norm_output = os.path.normpath(output_path)
    norm_root = os.path.normpath(root_path)
    # Compare against the root *with* a trailing separator so that a sibling
    # such as '<root>_other' is not mistaken for a child of '<root>'.
    root_prefix = os.path.join(norm_root, '')
    if norm_output != norm_root and not norm_output.startswith(root_prefix):
        raise ValueError(f'Output path {output_path} is outside project root {root_path}')

    parent_dir = os.path.dirname(output_path)
    if parent_dir:  # a bare relative name has no parent to create
        os.makedirs(parent_dir, exist_ok=True)

    timestamp = _dt.datetime.now().strftime('%Y%m%d_%H%M%S')
    if create_backup and os.path.exists(output_path):
        backup_path = f'{output_path}_backup_{timestamp}'
        try:
            spark.read.parquet(output_path).write.mode('overwrite').parquet(backup_path)
            print(f'Created backup at: {backup_path}')
        except Exception as e:
            # Deliberately best effort: a failed backup must not block the write.
            print(f'Warning: could not create backup: {e}')

    temp_path = f'{output_path}_temp_{timestamp}'
    old_path = f'{output_path}_old'
    try:
        df.write.mode('overwrite').option('compression', 'snappy').parquet(temp_path)
        # Read the fresh output back once to make sure it is a usable dataset.
        spark.read.parquet(temp_path).count()
        # A leftover '_old' from an interrupted run would make the rename fail.
        _remove_path(old_path)
        if os.path.exists(output_path):
            os.rename(output_path, old_path)
        try:
            os.rename(temp_path, output_path)
        except OSError:
            # Put the previous output back so a failed swap loses nothing.
            if os.path.exists(old_path) and not os.path.exists(output_path):
                os.rename(old_path, output_path)
            raise
        # Spark normally writes a parquet dataset as a directory of part files,
        # so a plain os.remove is not enough to delete it.
        _remove_path(old_path)
        print(f'Successfully wrote data to: {output_path}')
    except Exception:
        _remove_path(temp_path)
        raise

# Attach helper as a static method. A plain function stored on the class would
# be bound on instance access, so `spark.safe_write_parquet(df, path, root)`
# would receive the session itself as `df` and shift every other argument.
SparkSession.safe_write_parquet = staticmethod(safe_write_parquet)

print('Spark configured. Safe write helper registered.')


## 2. Data Path Configuration

Configure file locations. Set `YEARS` to specify which year(s) to process. To load from a single Parquet dataset instead of CSV, set `PARQUET_PATH` to its full path, which must start with `BASE_S3A_PATH` (i.e. lie within `OECD_DATA`). To read one unified CSV folder for all years, set `CSV_PATH` in the same way; `PARQUET_PATH` takes precedence if both are set. If both are `None`, the notebook reads year‑specific CSV files from `csv_data/<year>`. All paths are validated to ensure they remain within the `OECD_DATA` root.

In [None]:

from datetime import datetime

# Root directory; do not specify a path outside of OECD_DATA.
# Every configured or constructed path is checked against this prefix.
BASE_S3A_PATH = 's3a://em_sources/_SDD/OECD_DATA'

# Year selection: 'ALL', a single year, a range '2020-2023' or a list [2020,2022]
YEARS = '2025'

# Optional: specify a Parquet file within the OECD_DATA root to bypass CSV loading.
# The value must be a full path that starts with BASE_S3A_PATH.
# Example: PARQUET_PATH = os.path.join(BASE_S3A_PATH, 'abc.parquet')
PARQUET_PATH = None
# Optional: specify a CSV folder containing all partitions directly under OECD_DATA.
# If provided, this path will be used for all years instead of constructing year-specific paths under
# the default `csv_data/<year>` folder. It is only consulted when PARQUET_PATH is not set.
CSV_PATH = None

# ------------------------------------------------------------------------------
# Helper functions for year parsing and path construction
# ------------------------------------------------------------------------------

def get_available_years():
    """Return the list of years the pipeline accepts (2020 through 2025)."""
    return list(range(2020, 2026))


def parse_year_input(year_input):
    """Turn a year selection into a sorted list of distinct, available years.

    Args:
        year_input: One of
            * the string ``'ALL'`` (case-insensitive) for every available year,
            * a single year as a string or number (``'2023'``, ``2023``),
            * an inclusive range string ``'2020-2023'``,
            * a list or tuple of years (``[2020, 2022]``).

    Returns:
        Sorted list of unique ``int`` years.

    Raises:
        ValueError: If the input is malformed, selects no year (empty list or a
            reversed range), or names a year that is not available.
    """
    available_years = get_available_years()
    if isinstance(year_input, str):
        text = year_input.strip().upper()
        if text == 'ALL':
            years = list(available_years)
        elif '-' in text:
            bounds = text.split('-')
            if len(bounds) != 2:
                raise ValueError(
                    f"Year range {year_input!r} is malformed; expected 'START-END'"
                )
            start, end = (int(bound) for bound in bounds)
            if start > end:
                # range(start, end + 1) would be empty and silently process nothing
                raise ValueError(
                    f'Year range {year_input!r} is reversed: start {start} is after end {end}'
                )
            years = list(range(start, end + 1))
        else:
            years = [int(text)]
    elif isinstance(year_input, (list, tuple)):
        years = [int(x) for x in year_input]
    else:
        years = [int(year_input)]
    if not years:
        raise ValueError(f'No years selected. Available years: {available_years}')
    invalid = [y for y in years if y not in available_years]
    if invalid:
        raise ValueError(f'Years {invalid} not available. Available years: {available_years}')
    # Drop duplicates so that no year is processed twice downstream.
    return sorted(set(years))

# Parse year input: resolve the YEARS setting into the sorted list of years
# that the path-building code and the per-year loops below iterate over
years_to_process = parse_year_input(YEARS)
print(f'Years to process: {years_to_process}')

# Validate path does not go above OECD_DATA

def validate_oecd_path(path: str) -> bool:
    """Check that *path* is the OECD_DATA root itself or lies below it.

    Args:
        path: Path or URI to check against ``BASE_S3A_PATH``.

    Returns:
        ``True`` when the path is acceptable.

    Raises:
        ValueError: If the path is not a string, does not sit under
            ``BASE_S3A_PATH``, or contains a ``..`` segment.
    """
    if not isinstance(path, str):
        raise ValueError(f'Path {path} is outside OECD_DATA folder!')
    root = BASE_S3A_PATH.rstrip('/')
    # Treat backslashes as separators too, so a path assembled with
    # os.path.join on Windows is judged like its forward-slash form.
    normalised = path.replace('\\', '/')
    # Require a separator right after the root: a bare prefix test would also
    # accept siblings such as '<root>_other/...'.
    inside_root = normalised == root or normalised.startswith(root + '/')
    # '..' segments could climb back out of the root after the prefix matched.
    if not inside_root or '..' in normalised.split('/'):
        raise ValueError(f'Path {path} is outside OECD_DATA folder!')
    return True

# Build paths for each year

def get_paths_for_year(year):
    """Return the input and output locations used for one year.

    Args:
        year: Year whose paths are wanted; it is converted with ``str``.

    Returns:
        Dict with the keys ``input_csv``, ``noun_chunks``, ``reduced_data``,
        ``job_categories`` and ``occupation_summary``. ``input_csv`` is
        ``CSV_PATH`` when that is set, otherwise ``csv_data/<year>`` under
        ``BASE_S3A_PATH``; the other entries live under
        ``processed_data/<year>``.
    """
    # These are URIs, not local paths: always join with '/' instead of the
    # platform separator that os.path.join would use.
    import posixpath

    base_year_path = posixpath.join(BASE_S3A_PATH, 'processed_data', str(year))
    return {
        'input_csv': CSV_PATH or posixpath.join(BASE_S3A_PATH, 'csv_data', str(year)),
        'noun_chunks': posixpath.join(base_year_path, 'noun_chunks'),
        'reduced_data': posixpath.join(base_year_path, 'reduced_data'),
        'job_categories': posixpath.join(base_year_path, 'job_categories'),
        'occupation_summary': posixpath.join(base_year_path, 'occupation_summary'),
    }

# Path table: one entry per selected year
ALL_PATHS = {}
for year in years_to_process:
    ALL_PATHS[year] = get_paths_for_year(year)

# Validate all constructed paths; the first offending path raises ValueError
for paths in ALL_PATHS.values():
    for path in paths.values():
        validate_oecd_path(path)
print('Path validation complete.')


## 3. Data Loading

Load job advertisements from a Parquet file (`PARQUET_PATH`), from a single unified CSV folder (`CSV_PATH`), or from year‑partitioned CSV files. When reading from Parquet or from a unified CSV folder, the entire dataset is loaded once and filtered per year. Otherwise, each year's CSV files are read separately. A progress bar tracks progress across years.

In [None]:

from pyspark.sql import DataFrame

# Per-year DataFrames keyed by year, plus a running total of loaded rows
all_data = {}
total_jobs = 0


def _read_job_csv(csv_location):
    """Read job-ad CSV files and keep the pipeline columns of non-empty postings."""
    raw = (
        spark.read
        .option('header', True)
        .option('multiline', True)
        .csv(csv_location)
    )
    wanted = raw.select(
        'date',
        'job_id',
        'soc_2020',
        'job_title',
        F.col('full_text').cast('string'),
    )
    has_text = F.col('full_text').isNotNull() & (F.length('full_text') > 0)
    return wanted.filter(has_text).withColumn('date', F.to_date('date', 'yyyy-MM-dd'))


# Progress-bar label of the unified source in use; stays None for per-year CSVs
unified_label = None
if PARQUET_PATH:
    # Validate Parquet path within OECD_DATA
    validate_oecd_path(PARQUET_PATH)
    print(f'Loading unified Parquet dataset from {PARQUET_PATH}...')
    full_df = spark.read.parquet(PARQUET_PATH)
    full_df = full_df.withColumn('date', F.to_date('date'))
    unified_label = 'Reading data (Parquet)'
elif CSV_PATH:
    # Validate unified CSV path within OECD_DATA
    validate_oecd_path(CSV_PATH)
    print(f'Loading unified CSV dataset from {CSV_PATH}...')
    full_df = _read_job_csv(CSV_PATH)
    unified_label = 'Reading data (CSV unified)'

if unified_label is not None:
    # One dataset holds every year: slice it by the year of the posting date
    for yr in tqdm(years_to_process, desc=unified_label):
        year_df = full_df.filter(F.year('date') == yr)
        count = year_df.count()
        total_jobs += count
        all_data[yr] = year_df
        print(f'Year {yr}: {count:,} records')
else:
    # Each year has its own CSV folder
    for yr in tqdm(years_to_process, desc='Reading data (CSV)'):
        print(f'Loading CSV data for year {yr}...')
        df = _read_job_csv(ALL_PATHS[yr]['input_csv'])
        cnt = df.count()
        total_jobs += cnt
        all_data[yr] = df
        print(f'\tRows read: {cnt:,}')
print(f'Total jobs loaded: {total_jobs:,}')


## 4. NLP Processing – Noun Chunk Extraction

Extract noun chunks from job descriptions using spaCy and compute their similarity to the word `data`. The extraction runs per year with a progress bar.  Results are partitioned by date and written to disk. Similarity scores below a threshold are removed in the next stage.

In [None]:

# Output schema of the noun chunk extraction: four string columns followed by
# the similarity score as a double
_noun_string_fields = ('doc_date', 'doc_JobID', 'doc_BGTOcc', 'noun_chunk')
noun_schema = StructType(
    [StructField(field_name, StringType()) for field_name in _noun_string_fields]
    + [StructField('sim_data', DoubleType())]
)

# Batch function for DataFrame.mapInPandas

def extract_noun_chunks(iterator):
    """Yield one frame of noun chunks per incoming pandas batch.

    Each batch must provide the columns ``full_text``, ``job_id``, ``date`` and
    ``soc_2020``. For every noun chunk that has a vector, the similarity to the
    word ``data`` is computed, digits are stripped from the chunk text, and
    chunks that are empty afterwards are dropped. The yielded frames have the
    columns ``doc_date``, ``doc_JobID``, ``doc_BGTOcc``, ``noun_chunk``
    (lower-cased) and ``sim_data``; a batch without any chunk yields an empty
    frame with those columns.
    """
    import spacy
    import pandas as pd

    out_columns = ['doc_date', 'doc_JobID', 'doc_BGTOcc', 'noun_chunk', 'sim_data']

    # Prefer the large model; fall back to the small one if it is not installed
    try:
        nlp = spacy.load('en_core_web_lg', exclude=['lemmatizer','ner'])
    except OSError:
        nlp = spacy.load('en_core_web_sm')
    target = nlp('data')

    for pdf in iterator:
        texts = pdf['full_text'].fillna('').astype(str).tolist()
        jobids = pdf['job_id'].astype(str).tolist()
        dates = pdf['date'].astype(str).tolist()
        socs = pdf['soc_2020'].astype(str).tolist()
        docs = nlp.pipe(texts, batch_size=50, n_process=1)

        records = []
        for doc_date, job_id, soc, doc in zip(dates, jobids, socs, docs):
            for chunk in doc.noun_chunks:
                if not chunk.has_vector:
                    continue
                sim = float(chunk.similarity(target))
                without_digits = [ch for ch in chunk.text if not ch.isdigit()]
                cleaned = ''.join(without_digits).strip()
                if not cleaned:
                    continue
                records.append((doc_date, job_id, soc, cleaned.lower(), sim))

        if records:
            yield pd.DataFrame(records, columns=out_columns)
        else:
            yield pd.DataFrame(columns=out_columns)

# Minimum similarity to 'data' a noun chunk needs to be kept; applied by the
# reduction stage, not here.
SIM_THRESHOLD = 0.45

# Partition sizing heuristic: assume ~1000 bytes per posting and aim for
# roughly 128 MiB per partition, but never fewer than 8 partitions.
estimated_row_size = 1000
target_partition_size = 128 * 1024 * 1024

for yr in tqdm(years_to_process, desc='Extracting noun chunks'):
    print(f'\nProcessing noun chunks for {yr}...')
    df = all_data[yr]
    total_rows = df.count()
    num_partitions = max(8, int((total_rows * estimated_row_size) / target_partition_size))
    df_part = df.repartition(num_partitions)
    nc_df = df_part.mapInPandas(extract_noun_chunks, schema=noun_schema)
    # Cache so the write and the two summary counts do not each re-run spaCy
    nc_df.cache()
    try:
        out_path = ALL_PATHS[yr]['noun_chunks']
        nc_df.write.mode('overwrite').partitionBy('doc_date').parquet(out_path)
        total_chunks = nc_df.count()
        uniq_jobs = nc_df.select('doc_JobID').distinct().count()
        print(f'\tSaved noun chunks to {out_path} (chunks: {total_chunks:,}, jobs: {uniq_jobs:,})')
    finally:
        # Release the cached data even when the write or a count fails
        nc_df.unpersist()


## 5. Data Reduction

Filter noun chunks by similarity, aggregate duplicate chunks and compute average similarity and counts. Results are partitioned by occupation code for efficient downstream processing.

In [None]:

# For each year: keep noun chunks at or above SIM_THRESHOLD, collapse repeated
# (chunk, job, occupation) rows into one row with the mean similarity and the
# number of occurrences, and write the result partitioned by occupation code.
for yr in tqdm(years_to_process, desc='Reducing noun chunks'):
    print(f'\nReducing data for {yr}...')
    nc_path = ALL_PATHS[yr]['noun_chunks']
    noun_chunks = spark.read.parquet(nc_path)
    filtered = noun_chunks.filter(F.col('sim_data') >= SIM_THRESHOLD)
    reduced = filtered.groupBy('noun_chunk', 'doc_JobID', 'doc_BGTOcc').agg(
        F.avg('sim_data').alias('avg_sim'),
        # Row count per group; equivalent to summing a constant 1 per row
        F.count('*').alias('count'),
    )
    red_path = ALL_PATHS[yr]['reduced_data']
    reduced.write.mode('overwrite').partitionBy('doc_BGTOcc').parquet(red_path)
    tot = reduced.count()
    uniq = reduced.select('noun_chunk').distinct().count()
    print(f'\tSaved reduced data to {red_path} (records: {tot:,}, unique chunks: {uniq:,})')


## 6. Category Analysis and Occupation Summary

Classify jobs into high‑level categories based on keyword lists.  Jobs with at least `DATA_THRESHOLD` mentions of category‑specific terms are marked accordingly.  A progress bar tracks classification across years.

In [None]:

# Minimum number of category keyword mentions for a job to be flagged
DATA_THRESHOLD = 3

# Keyword lists; a noun chunk belongs to a category only if it equals one of
# the terms exactly (after lower-casing)
data_entry_terms = ['data entry','data-input','manual data','typing data','keying data']
database_terms = ['database','sql','mysql','postgres','oracle','db2','nosql']
data_analytics_terms = ['analytics','analysis','statistical','machine learning','data science','insights']

# Category names in output column order
category_names = ['data_entry', 'database', 'data_analytics']

# Native Spark expression instead of a Python UDF: the first matching list
# wins, and a null or unmatched noun chunk yields a null category rather than
# raising inside the UDF.
_chunk_lower = F.lower(F.col('noun_chunk'))
category_expr = (
    F.when(_chunk_lower.isin(data_entry_terms), 'data_entry')
    .when(_chunk_lower.isin(database_terms), 'database')
    .when(_chunk_lower.isin(data_analytics_terms), 'data_analytics')
)

for yr in tqdm(years_to_process, desc='Classifying jobs'):
    print(f'\nClassifying jobs for {yr}...')
    red_path = ALL_PATHS[yr]['reduced_data']
    reduced = spark.read.parquet(red_path)
    categorized = reduced.withColumn('category', category_expr)

    # Total keyword mentions per job and category
    job_cat_counts = (
        categorized.filter(F.col('category').isNotNull())
        .groupBy('doc_JobID', 'doc_BGTOcc', 'category')
        .agg(F.sum('count').alias('mentions'))
    )

    # One row per job with a boolean flag per category: true when that
    # category reaches DATA_THRESHOLD mentions
    meets_threshold = F.col('mentions') >= DATA_THRESHOLD
    job_categories = job_cat_counts.groupBy('doc_JobID', 'doc_BGTOcc').agg(
        *[
            F.max((F.col('category') == name) & meets_threshold).alias(name)
            for name in category_names
        ]
    )
    jc_path = ALL_PATHS[yr]['job_categories']
    job_categories.write.mode('overwrite').parquet(jc_path)

    # NOTE(review): job_categories only contains jobs with at least one keyword
    # match, so 'total_jobs' (and every share derived from it) excludes jobs
    # without any match -- confirm this denominator is the intended one.
    occ_summary = job_categories.groupBy('doc_BGTOcc').agg(
        F.count('*').alias('total_jobs'),
        *[
            F.sum(F.col(name).cast('int')).alias(f'{name}_jobs')
            for name in category_names
        ],
    )
    for name in category_names:
        occ_summary = occ_summary.withColumn(
            f'{name}_share', F.col(f'{name}_jobs') / F.col('total_jobs') * 100
        )
    os_path = ALL_PATHS[yr]['occupation_summary']
    occ_summary.write.mode('overwrite').parquet(os_path)
    print(f'\tSaved job categories to {jc_path} and occupation summary to {os_path}')


## 7. Visualisations

The following plots help interpret the results of the data‑intensive job analysis for the United Kingdom.  You may rerun this cell after processing different years to update the charts.  The figures show the share of jobs that are data‑intensive by occupation and the distribution of data‑related categories across all processed jobs.

In [None]:

import plotly.express as px
import pandas as pd

def _load_yearly_frames(path_key):
    """Read the per-year parquet output stored under *path_key* into one pandas frame.

    A 'year' column is added to every row so that rows from different years
    stay distinguishable after concatenation.
    """
    frames = []
    for yr in years_to_process:
        frame = spark.read.parquet(ALL_PATHS[yr][path_key]).toPandas()
        frame['year'] = yr
        frames.append(frame)
    return pd.concat(frames, ignore_index=True)


# Load occupation summaries for all years into a single pandas DataFrame
occupation_df = _load_yearly_frames('occupation_summary')

# Plot share of data-intensive jobs by occupation code (data_entry + database + data_analytics share)
# NOTE(review): the three shares are added, so a job flagged in several
# categories is counted once per category -- confirm this is intended.
share_columns = ['data_entry_share', 'database_share', 'data_analytics_share']
occupation_df['total_data_share'] = occupation_df[share_columns].sum(axis=1)
fig_occ = px.bar(
    occupation_df.sort_values('total_data_share', ascending=False),
    x='doc_BGTOcc',
    y='total_data_share',
    color='total_data_share',
    hover_data=['year'],  # tells bars of the same occupation apart across years
    title='Data‑intensive jobs share by occupation (UK)',
    labels={'doc_BGTOcc': 'Occupation Code','total_data_share': 'Share of data‑intensive jobs (%)'},
    height=450
)
fig_occ.update_layout(showlegend=False)
fig_occ.show()

# Load job categories for all years; each category column is a per-job flag
job_cat_df = _load_yearly_frames('job_categories')

# Count number of jobs per category
counts = {
    name: job_cat_df[name].sum()
    for name in ('data_entry', 'database', 'data_analytics')
}

fig_cat = px.pie(
    names=list(counts.keys()),
    values=list(counts.values()),
    title='Distribution of data‑related categories across jobs',
    height=450
)
fig_cat.update_traces(textposition='inside', textinfo='percent+label')
fig_cat.show()


## Visualising data intensity results

The following charts summarise the distribution of data-intensive work across occupations for the UK dataset. The first chart plots the top 20 occupations (by Standard Occupational Classification code) sorted by the **share of jobs that are data‑intensive**, with the contributions of the three categories (data‑entry, database, and data‑analytics) stacked to show which activities drive data intensity. The second chart shows how the overall pool of data‑intensive jobs is distributed across those three categories. These visualisations are inspired by Figures 6 and 8 in the OECD report; they help interpret the aggregate metrics by illustrating which occupations and activities dominate the data‑intensive landscape.


In [None]:
# Stacked bar chart of the highest-ranked occupations (similar to OECD Figure 8)
top_n = 20
ranked_occupations = occupation_df.sort_values('total_data_share', ascending=False)
top_occ_df = ranked_occupations.head(top_n).reset_index(drop=True)

# Long format: one row per (occupation, category share) pair
share_columns = ['data_entry_share', 'database_share', 'data_analytics_share']
top_melt = pd.melt(
    top_occ_df,
    id_vars=['doc_BGTOcc'],
    value_vars=share_columns,
    var_name='category',
    value_name='share',
)

# Human-readable legend entries for the share columns
category_labels = dict(zip(share_columns, ['Data entry', 'Database', 'Data analytics']))
legend_values = top_melt['category'].map(category_labels)

axis_labels = {
    'doc_BGTOcc': 'Occupation code',
    'share': 'Share of data‑intensive jobs (%)',
    'category': 'Category',
}
fig_top = px.bar(
    top_melt,
    x='doc_BGTOcc',
    y='share',
    color=legend_values,
    title=f'Top {top_n} occupations by data intensity (UK)',
    labels=axis_labels,
    height=450,
)
fig_top.update_layout(barmode='stack')
fig_top.show()


## 8. Final Notes

This rewritten notebook honours the original methodology while fixing undefined variables and syntax errors. It adds support for loading data from either partitioned CSV files or a single Parquet file and introduces progress bars via `tqdm` so you can monitor long‑running operations. All paths remain under the `OECD_DATA` root. Feel free to customise the keyword lists and thresholds to suit your analysis.