In [8]:
# ==========================================
# PHASE 1: HEAVY NLP EXTRACTION (RUN ONCE)
# ==========================================
import os
import sys
import gc
import datetime
import pandas as pd
import spacy
import plotly.express as px
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# --- CONFIGURATION (PHASE 1) ---
BASE_PATH = "/Users/saurabhkumar/Desktop/OECD_PYSPARK_LOCAL/data/parquet_OECD"
PARQUET_SOURCE = "/Users/saurabhkumar/Desktop/OECD_PYSPARK_LOCAL/data/parquet_OECD/part-00000-6f2787d8-9f9c-4b9b-9903-fc9d83e3d0c0-c000.snappy.parquet"
YEARS = [2020, 2021, 2022, 2023, 2024, 2025]
SAMPLE_FRACTION = 1.0  
FORCE_RECOMPUTE_NLP = True 

def get_nlp_path(year):
    return os.path.join(BASE_PATH, "processed_data", str(year), "noun_chunks")

# --- SPARK SETUP ---
spark = (SparkSession.builder 
    .appName("OECD_Phase1_NLP") 
    .config("spark.executor.memory", "13g") 
    .config("spark.driver.memory", "4g") 
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") 
    .getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
print("✓ Phase 1 Spark Session Created.")

# --- NLP UDF ---
noun_schema = StructType([
    StructField("doc_date", StringType()), StructField("doc_JobID", StringType()),
    StructField("doc_BGTOcc", StringType()), StructField("noun_chunk", StringType()),
    StructField("sim_data", DoubleType())
])

def extract_noun_chunks_worker(iterator):
    try: nlp = spacy.load("en_core_web_lg", disable=["lemmatizer", "ner"])
    except: nlp = spacy.load("en_core_web_sm")
    target = nlp("data")
    
    for pdf in iterator:
        rows = []
        texts = pdf["full_text"].fillna("").astype(str).tolist()
        meta = list(zip(pdf["date"].astype(str), pdf["job_id"].astype(str), pdf["soc_2020"].astype(str)))
        
        for i, doc in enumerate(nlp.pipe(texts, batch_size=50)):
            for chunk in doc.noun_chunks:
                if chunk.has_vector:
                    cleaned = "".join(c for c in chunk.text if not c.isdigit()).strip()
                    if cleaned:
                        rows.append({
                            'doc_date': meta[i][0], 'doc_JobID': meta[i][1], 'doc_BGTOcc': meta[i][2],
                            'noun_chunk': cleaned.lower(), 'sim_data': float(chunk.similarity(target)),
                        })
        yield pd.DataFrame(rows) if rows else pd.DataFrame(columns=noun_schema.fieldNames())

# --- EXECUTION ---
for year in YEARS:
    print(f"\n--- EXTRACTING TEXT FOR {year} ---")
    out_path = get_nlp_path(year)
    
    try:
        if not FORCE_RECOMPUTE_NLP and spark.read.parquet(out_path).limit(1).count() > 0:
            print(f"[SKIP] Data already extracted for {year}.")
            continue
    except: pass

    df_raw = spark.read.parquet(PARQUET_SOURCE).withColumn("date", F.to_date("date")).filter(F.year("date") == year)
    if SAMPLE_FRACTION < 1.0: df_raw = df_raw.sample(False, SAMPLE_FRACTION, seed=42)
    if df_raw.rdd.isEmpty(): continue

    df_raw = df_raw.repartition(max(8, int(df_raw.count()/10000)))
    chunks = df_raw.mapInPandas(extract_noun_chunks_worker, schema=noun_schema)
    chunks.write.mode("overwrite").partitionBy("doc_date").parquet(out_path)
    
    print(f"[DONE] Extracted NLP features for {year}.")
    del df_raw, chunks; gc.collect()

✓ Phase 1 Spark Session Created.

--- EXTRACTING TEXT FOR 2020 ---


                                                                                

[DONE] Extracted NLP features for 2020.

--- EXTRACTING TEXT FOR 2021 ---


                                                                                

[DONE] Extracted NLP features for 2021.

--- EXTRACTING TEXT FOR 2022 ---


                                                                                

[DONE] Extracted NLP features for 2022.

--- EXTRACTING TEXT FOR 2023 ---


                                                                                

[DONE] Extracted NLP features for 2023.

--- EXTRACTING TEXT FOR 2024 ---


                                                                                

[DONE] Extracted NLP features for 2024.

--- EXTRACTING TEXT FOR 2025 ---


                                                                                

[DONE] Extracted NLP features for 2025.


In [9]:
# ==========================================
# CELL 2: PHASE 2 SETUP & CONFIGURATION 
# (Run this on a fresh server boot)
# ==========================================
import os
import pandas as pd
import plotly.express as px
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# --- 1. INITIALIZE SPARK (Needed after a server restart) ---
spark = (SparkSession.builder 
    .appName("OECD_Phase2_Analytics") 
    .config("spark.executor.memory", "8g") # Doesn't need as much memory as NLP!
    .config("spark.driver.memory", "4g") 
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") 
    .getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
print("✓ Phase 2 Spark Session Ready.")

# --- 2. FILE PATHS ---
BASE_PATH = "/Users/saurabhkumar/Desktop/OECD_PYSPARK_LOCAL/data/parquet_OECD"
CENSUS_CSV = "/Users/saurabhkumar/Desktop/OECD_PYSPARK_LOCAL/data/Census.csv"
SUT_CSV = "/Users/saurabhkumar/Desktop/OECD_PYSPARK_LOCAL/data/SUT_TABLE.csv"
YEARS = [2020, 2021, 2022, 2023, 2024, 2025]

def get_nlp_path(year):
    # This tells Phase 2 where to find the data saved by Phase 1
    return os.path.join(BASE_PATH, "processed_data", str(year), "noun_chunks")

# --- 3. DYNAMIC CONFIGURATION (Tweak these freely!) ---
SIM_THRESHOLD = 0.50      
DATA_THRESHOLD = 3         

SUT_YEAR = 2023
ALPHA_LOW = 1.58
ALPHA_ECONOMY_AVG = 3.62
ALPHA_MAP = {
    "A": 3.62, "B-E": 6.45, "F": 6.64, "G-I": 2.95, "J": 2.97,
    "K": 3.91, "L": 3.62, "M-N": 2.79, "O-Q": 2.07, "R-T": 3.06, "U": 3.62
}

SOC_GROUPS = {
    "data_entry": ["4111","4112","4113","4114","4121","4131","4132","4150"],
    "database":   ["2423","2136"],
    "analytics":  ["2421","2424","2133","2135"]
}

print(f"✓ Phase 2 Config: Sim>{SIM_THRESHOLD}, Chunks>={DATA_THRESHOLD}")

# --- 4. RUN CLASSIFICATION ---
def run_classification_for_year(year):
    in_path = get_nlp_path(year)
    try: chunks = spark.read.parquet(in_path)
    except: 
        print(f"No NLP data found for {year}. Did Phase 1 finish?")
        return None
        
    features = chunks.filter(F.col("sim_data") >= SIM_THRESHOLD) \
        .groupBy("doc_date", "doc_JobID", "doc_BGTOcc").agg(F.count("*").alias("n_chunks_data")) \
        .withColumnRenamed("doc_BGTOcc", "soc_2020")

    features = features.withColumn("soc4", F.substring("soc_2020", 1, 4))
    is_intensive = (F.col("n_chunks_data") >= DATA_THRESHOLD)
    
    classified = features \
        .withColumn("data_entry", (F.col("soc4").isin(SOC_GROUPS["data_entry"]) & is_intensive).cast("int")) \
        .withColumn("database", (F.col("soc4").isin(SOC_GROUPS["database"]) & is_intensive).cast("int")) \
        .withColumn("data_analytics", (F.col("soc4").isin(SOC_GROUPS["analytics"]) & is_intensive).cast("int")) \
        .withColumn("any_data_intensive", is_intensive.cast("int"))

    occ_sum = classified.groupBy("soc4").agg(
        F.count("*").alias("total_jobs"),
        F.sum("data_entry").alias("data_entry_jobs"),
        F.sum("database").alias("database_jobs"),
        F.sum("data_analytics").alias("data_analytics_jobs"),
        F.sum("any_data_intensive").alias("any_data_intensive_jobs")
    ).withColumn("year", F.lit(year))
    
    occ_sum = occ_sum \
        .withColumn("total_data_share", 100 * F.col("any_data_intensive_jobs") / F.col("total_jobs")) \
        .withColumn("data_entry_share", 100 * F.col("data_entry_jobs") / F.col("total_jobs")) \
        .withColumn("database_share", 100 * F.col("database_jobs") / F.col("total_jobs")) \
        .withColumn("data_analytics_share", 100 * F.col("data_analytics_jobs") / F.col("total_jobs"))
        
    return occ_sum

print("Classifying Occupations from saved NLP data...")
occupation_summaries = {y: run_classification_for_year(y) for y in YEARS}
print("✓ Occupations Classified.")

✓ Phase 2 Spark Session Ready.
✓ Phase 2 Config: Sim>0.5, Chunks>=3
Classifying Occupations from saved NLP data...
✓ Occupations Classified.


In [10]:
# ==========================================
# CELL 3: SECTOR MAPPING
# ==========================================
print("Applying Census Weights...")

df_census = spark.read.option("header", True).csv(CENSUS_CSV)
desc_col = df_census.columns[0]
df_census = df_census.withColumn("soc4", F.regexp_extract(F.col(desc_col), r"^(\d{4})", 1))

sic_cols = [c for c in df_census.columns if c != desc_col and c != "soc4"]
stack_expr = f"stack({len(sic_cols)}, " + ", ".join([f"'{c}', `{c}`" for c in sic_cols]) + ") as (sic_col, count_raw)"
long_df = df_census.select("soc4", F.expr(stack_expr))

long_df = long_df.withColumn("sic2", F.regexp_extract("sic_col", r"^(\d{2})", 1).cast("int")) \
                 .withColumn("n", F.regexp_replace("count_raw", ",", "").cast("long")).filter(F.col("n") > 0)

long_df = long_df.withColumn("SIC_Code", F.expr("""
    CASE WHEN sic2 BETWEEN 1 AND 3 THEN 'A' WHEN sic2 BETWEEN 5 AND 39 THEN 'B-E' WHEN sic2 BETWEEN 41 AND 43 THEN 'F' WHEN sic2 BETWEEN 45 AND 56 THEN 'G-I' WHEN sic2 BETWEEN 58 AND 63 THEN 'J' WHEN sic2 BETWEEN 64 AND 66 THEN 'K' WHEN sic2 = 68 THEN 'L' WHEN sic2 BETWEEN 69 AND 82 THEN 'M-N' WHEN sic2 BETWEEN 84 AND 88 THEN 'O-Q' WHEN sic2 BETWEEN 90 AND 98 THEN 'R-T' WHEN sic2 = 99 THEN 'U' ELSE NULL END
""")).filter(F.col("SIC_Code").isNotNull())

totals = long_df.groupBy("soc4").agg(F.sum("n").alias("total_soc"))
weights_df = long_df.groupBy("soc4", "SIC_Code").agg(F.sum("n").alias("n_sic")) \
                 .join(totals, "soc4").withColumn("w_soc4_SIC", F.col("n_sic") / F.col("total_soc")).cache()

sector_summaries = []
for year, occ_df in occupation_summaries.items():
    if occ_df is None: continue
    
    joined = occ_df.join(weights_df, "soc4", "left").fillna(0, subset=["w_soc4_SIC"])
    
    weighted = joined.select("SIC_Code",
        (F.col("total_jobs") * F.col("w_soc4_SIC")).alias("w_total"),
        (F.col("data_entry_jobs") * F.col("w_soc4_SIC")).alias("w_entry"),
        (F.col("database_jobs") * F.col("w_soc4_SIC")).alias("w_db"),
        (F.col("data_analytics_jobs") * F.col("w_soc4_SIC")).alias("w_ana"),
        (F.col("any_data_intensive_jobs") * F.col("w_soc4_SIC")).alias("w_any")
    )
    
    sec_sum = weighted.groupBy("SIC_Code").agg(
        F.sum("w_total").alias("total_jobs"), F.sum("w_entry").alias("data_entry_jobs"),
        F.sum("w_db").alias("database_jobs"), F.sum("w_ana").alias("data_analytics_jobs"), 
        F.sum("w_any").alias("any_data_intensive_jobs")
    )
    
    sec_sum = sec_sum \
        .withColumn("total_data_share", F.when(F.col("total_jobs") > 0, 100 * F.col("any_data_intensive_jobs") / F.col("total_jobs")).otherwise(0.0)) \
        .withColumn("data_entry_share", F.when(F.col("total_jobs") > 0, 100 * F.col("data_entry_jobs") / F.col("total_jobs")).otherwise(0.0)) \
        .withColumn("database_share", F.when(F.col("total_jobs") > 0, 100 * F.col("database_jobs") / F.col("total_jobs")).otherwise(0.0)) \
        .withColumn("data_analytics_share", F.when(F.col("total_jobs") > 0, 100 * F.col("data_analytics_jobs") / F.col("total_jobs")).otherwise(0.0)) \
        .withColumn("year", F.lit(year))
        
    sector_summaries.append(sec_sum)

print("✓ Sectors Mapped.")

Applying Census Weights...
✓ Sectors Mapped.


In [11]:
# ==========================================
# CELL 4: ECONOMIC VALUATION
# ==========================================
print("Calculating Economic Valuation (Component Sum Method)...")

if not sector_summaries: raise ValueError("No sector data generated.")
full_sector_df = sector_summaries[0]
for d in sector_summaries[1:]: full_sector_df = full_sector_df.unionByName(d)

sut_df = spark.read.option("header", True).csv(SUT_CSV).filter(F.col("year") == SUT_YEAR) \
    .select(F.upper(F.trim("SIC_Code")).alias("SIC_Code"), F.col("GVA_basic_prices").cast("double"), F.col("COMP_EMP").cast("double"))

alpha_expr = F.create_map([F.lit(x) for i in ALPHA_MAP.items() for x in i])
valued = full_sector_df.withColumn("SIC_Code", F.upper(F.trim("SIC_Code"))) \
    .join(sut_df, "SIC_Code", "inner") \
    .withColumn("alpha_low", F.lit(ALPHA_LOW)) \
    .withColumn("alpha_sector", F.coalesce(alpha_expr[F.col("SIC_Code")], F.lit(ALPHA_ECONOMY_AVG)))

# THE FIX: Multiply specific categories by Alpha, leaving uncategorized out of the monetary valuation
valued = valued \
    .withColumn("inv_entry", F.col("alpha_sector") * F.col("COMP_EMP") * (F.col("data_entry_share")/100)) \
    .withColumn("inv_db",    F.col("alpha_sector") * F.col("COMP_EMP") * (F.col("database_share")/100)) \
    .withColumn("inv_ana",   F.col("alpha_sector") * F.col("COMP_EMP") * (F.col("data_analytics_share")/100)) \
    .withColumn("inv_low_tot", F.col("alpha_low") * F.col("COMP_EMP") * (F.col("total_data_share")/100))

valued = valued.withColumn("total_investment_sector", F.col("inv_entry") + F.col("inv_db") + F.col("inv_ana"))

valued = valued \
    .withColumn("inv_share_gva_sector", F.when(F.col("GVA_basic_prices")>0, (F.col("total_investment_sector")/F.col("GVA_basic_prices"))*100).otherwise(0.0)) \
    .withColumn("inv_share_gva_low", F.when(F.col("GVA_basic_prices")>0, (F.col("inv_low_tot")/F.col("GVA_basic_prices"))*100).otherwise(0.0))

valued.cache()
print("✓ Valuation Complete. Ready for Visualizations.")

Calculating Economic Valuation (Component Sum Method)...
✓ Valuation Complete. Ready for Visualizations.


In [12]:
# ==========================================
# CELL 5: DETAILED VISUALIZATIONS
# ==========================================
print("Generating Detailed Visualizations...")
pdf = valued.toPandas().sort_values(["year", "SIC_Code"])

# --- 1. Economy-Wide Trend (% of GVA) ---
econ = pdf.groupby("year")[["inv_low_tot", "total_investment_sector", "GVA_basic_prices"]].sum().reset_index()
econ["Low Scenario %"] = (econ["inv_low_tot"] / econ["GVA_basic_prices"]) * 100
econ["Sector Scenario %"] = (econ["total_investment_sector"] / econ["GVA_basic_prices"]) * 100
fig1 = px.line(econ, x="year", y=["Low Scenario %", "Sector Scenario %"], 
               title="1. Economy-wide Data Investment as % of GVA", markers=True)
fig1.show()

# --- 2. Absolute Levels (Total Currency Value) ---
fig2 = px.line(econ, x="year", y=["inv_low_tot", "total_investment_sector"], 
               title="2. Absolute Data Investment Value (Units of SUT values)", markers=True)
fig2.show()

# --- 3. Sector Trends (Sector Alpha Scenario) ---
fig3 = px.line(pdf, x="year", y="inv_share_gva_sector", color="SIC_Code", 
               title="3. Sector Data Investment % of GVA (Sector Specific Alphas)", markers=True)
fig3.show()

# --- 4. Top Data-Intensive Occupations (Stacked Facet Grid) ---
# Combine all occupation summaries into one Pandas DF
occ_frames = []
for y, df in occupation_summaries.items():
    if df is not None:
        occ_frames.append(df.toPandas())

if occ_frames:
    all_occ_df = pd.concat(occ_frames, ignore_index=True)
    top_n = 20
    
    # Get top occupations for each year based on total data share
    top_each_year = all_occ_df.sort_values(["year","total_data_share"], ascending=[True,False]).groupby("year").head(top_n)
    
    melted = top_each_year.melt(
        id_vars=["year","soc4"], 
        value_vars=["data_entry_share", "database_share", "data_analytics_share"],
        var_name="Category", value_name="Share"
    )
    
    # Map labels to be cleaner
    label_map = {"data_entry_share": "Data Entry", "database_share": "Database", "data_analytics_share": "Data Analytics"}
    melted["Category"] = melted["Category"].map(label_map)

    fig4 = px.bar(melted, x="soc4", y="Share", color="Category", facet_col="year", facet_col_wrap=2,
                  title=f"4. Top {top_n} Occupations by Data Intensity (Stacked Components)", 
                  height=900, barmode="stack")
    fig4.show()

Generating Detailed Visualizations...


                                                                                

In [13]:
# ==========================================
# CELL 6: DATA DUMP FOR VERIFICATION
# ==========================================
LOG_FILE = "pipeline_audit_log_FINAL.txt"

def dump_df(df, name, f):
    f.write(f"\n{'='*50}\nDATASET: {name}\n{'='*50}\n")
    if df is None:
        f.write("[MISSING OR EMPTY]\n")
        return
    f.write(f"Columns: {df.columns}\n")
    try:
        # Dump 30 rows to text
        f.write(df.limit(30).toPandas().to_string())
        f.write("\n")
    except Exception as e:
        f.write(f"Error dumping: {e}\n")

with open(LOG_FILE, "w") as f:
    f.write(f"OECD PIPELINE FINAL AUDIT - {datetime.datetime.now()}\n")
    
    dump_df(weights_df, "Census Weights", f)
    
    # Dump 2024 as the primary sample year
    sample_year = 2024
    f.write(f"\n\n>>> YEAR {sample_year} SNAPSHOTS <<<\n")
    
    if sample_year in occupation_summaries:
        dump_df(occupation_summaries[sample_year], f"Occupation Summary {sample_year}", f)
        
    # Find the corresponding sector summary
    # sector_summaries is a list, so we filter by year
    sec_2024 = None
    for sec in sector_summaries:
        if sec.filter(F.col("year") == sample_year).count() > 0:
            sec_2024 = sec.filter(F.col("year") == sample_year)
            break
    dump_df(sec_2024, f"Sector Summary {sample_year}", f)

    # Dump the final valuation table
    dump_df(valued, "Final Valuation Table (With Corrected Logic)", f)

print(f"Data successfully dumped to: {LOG_FILE}")
print("Please review the final valuation table in the log to confirm 'inv_share_gva_sector' is now accurate.")



Data successfully dumped to: pipeline_audit_log_FINAL.txt
Please review the final valuation table in the log to confirm 'inv_share_gva_sector' is now accurate.


                                                                                

In [14]:
!jupyter nbconvert --to script Test_1.ipynb --output OECD_PySpark_Local.py

[NbConvertApp] Converting notebook Test_1.ipynb to script
[NbConvertApp] Writing 16595 bytes to OECD_PySpark_Local.py.py
