**Imports**

In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, when, array_contains, size, desc, asc, collect_set, explode, collect_list
from pyspark.sql.types import ArrayType, StringType


# Return the active SparkSession if one exists, otherwise create a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [0]:
import sparknlp                                                                                                              
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, AlbertEmbeddings, BertEmbeddings, BertSentenceEmbeddings
from pyspark.sql.functions import concat_ws
from pyspark.ml import Pipeline


# Print the installed Spark NLP version (the pretrained models below depend on it).
print(sparknlp.version())

**Load data**

In [0]:
# Raw LinkedIn datasets, both stored as parquet.
profiles = spark.read.format("parquet").load("/linkedin/people")
companies = spark.read.format("parquet").load("/linkedin/companies")

**Info about the data**

In [0]:
# Number of rows in the people dataset (count() is a Spark action, so this executes the read).
profiles.count()

In [0]:
# Number of rows in the companies dataset (count() is a Spark action, so this executes the read).
companies.count()

In [0]:
# Profiles whose current employer name matches a company in the companies table.
matched_profiles = profiles.join(
    companies,
    on=profiles["current_company:name"] == companies["name"],
    how="inner",
)
matched_profiles.count()

In [0]:
# Column names of the people dataset.
profiles.columns

In [0]:
# Column names of the companies dataset.
companies.columns

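**Run the cell below to build the current-jobs DataFrame** (run either this cell or the past-jobs cell below; both assign `chosen_rel_profiles`, so whichever runs last is used downstream):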

In [0]:
# Current-jobs variant: one row per profile, keyed by the profile's current employer.
# Keep only profiles whose current employer appears in the companies table.
matched_profiles = profiles.join(companies, profiles["current_company:name"] == companies["name"], how="inner")
rel_profiles = matched_profiles.select(profiles["*"])
rel_profiles = rel_profiles.withColumnRenamed("current_company:name", "companies")
# The source column name is not plain-ASCII "courses" (its first letter appears to be a
# Cyrillic "с"); rename it so later cells can refer to "courses".
rel_profiles = rel_profiles.withColumnRenamed("сourses", "courses")

# Require at least one course, one experience entry and one education entry per profile.
rel_profiles = rel_profiles.filter(
    (size("courses") > 0) & (size("experience") > 0) & (size("education") > 0)
)

chosen_rel_profiles = (
    rel_profiles
    .select("id", "education", "courses", "experience", "companies")
    # dropna() returns a new DataFrame; previously its result was discarded, so it did nothing.
    .dropna(subset=["courses", "education", "experience"])
    # The join above can repeat a profile (one row per matching company row), so
    # de-duplicate BEFORE limiting; the sample then holds up to 1000 distinct profiles.
    .dropDuplicates(["id"])
    .limit(1000)
)
chosen_rel_profiles.printSchema()

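**Run the cell below to build the past-jobs DataFrame** (one row per profile and company, covering the current employer and every company in the profile's experience):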

In [0]:
# Past-jobs variant: one row per (profile id, company), covering both the profile's current
# employer and every company named in its experience entries.
companies_df = profiles.select(
    "id",
    "current_company:name",
    explode("experience").alias("experience_exploded"),
).select(
    "id",
    "current_company:name",
    col("experience_exploded.company").alias("company"),
)

# stack() turns the two company columns into rows of (CompanyType, companies).
reshaped_df = (
    companies_df
    .select(
        "id",
        expr("stack(2, 'current_company:name', `current_company:name`, 'company', company) as (CompanyType, companies)"),
    )
    .select("id", "companies")
    .dropna()
    .dropDuplicates()
)

# Keep only (id, company) pairs whose company is in the companies table, then bring back
# the full profile columns.
matched_profiles = reshaped_df.join(companies, reshaped_df["companies"] == companies["name"], how="inner")
reshaped_profiles = matched_profiles.select(reshaped_df["*"])
rel_profiles = (
    reshaped_profiles
    .join(profiles, reshaped_profiles["id"] == profiles["id"], how="inner")
    .drop(profiles["id"])
    # The source column name is not plain-ASCII "courses" (its first letter appears to be
    # a Cyrillic "с"); rename it so later cells can refer to "courses".
    .withColumnRenamed("сourses", "courses")
)

# Require at least one course, one experience entry and one education entry per profile.
rel_profiles = rel_profiles.filter(
    (size("courses") > 0) & (size("experience") > 0) & (size("education") > 0)
)

chosen_rel_profiles = (
    rel_profiles
    .select("id", "companies", "education", "courses", "experience")
    # dropna() returns a new DataFrame; previously its result was discarded, so it did nothing.
    .dropna(subset=["courses", "education", "experience"])
    .dropDuplicates(["id", "companies"])
)
chosen_rel_profiles.printSchema()

# **Profiles and companies augmentation:**

**This is for the profiles part:**

**process courses**

This code replaces each profile's `courses` array in `chosen_rel_profiles` with the list of course titles, then displays the distinct title lists.

In [0]:
# Replace each profile's array of course structs with the array of their titles.
course_transform = (
    chosen_rel_profiles
    .withColumn("courses", F.expr("transform(courses, x -> x.title)"))
)

# Inspect the distinct course-title lists.
(
    course_transform
    .select("courses")
    .distinct()
    .show(truncate=False)
)

**process education level**

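This code extracts the degree name from each entry of the `education` array in `course_transform` into a new `education_degrees` column. It then drops the original `education` column and displays the distinct degree lists.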

In [0]:
# Pull the degree name out of every education entry, then drop the raw education structs.
deg_transform = (
    course_transform
    .withColumn("education_degrees", F.expr("transform(education, x -> x.degree)"))
    .drop("education")
)

# Inspect the distinct degree lists.
(
    deg_transform
    .select("education_degrees")
    .distinct()
    .show(truncate=False)
)

This code assigns a numeric feature, `deg_feat`, based on the highest education level found in `education_degrees`:

- 5 for Postdoc
- 4 for Ph.D.
- 3 for Master's
- 2 for Bachelor's
- 0 if there are no degrees or only "N/A" values
- 1 for all other cases

It then displays `education_degrees` alongside the `deg_feat` scores.

In [0]:
def _any_degree_matches(predicate):
    """Return a boolean Column that is true when some element `d` of `education_degrees`
    satisfies `predicate`, a Spark SQL boolean expression written in terms of `d`."""
    return size(expr(f"filter(education_degrees, d -> {predicate})")) > 0


# Full words are matched case-insensitively via lower(). The previous case-sensitive
# patterns missed common spellings such as "Bachelor of Science", "PhD" and "master".
# Short abbreviations stay case-sensitive so that e.g. "MS"/"BA" do not match ordinary
# lowercase text. "MBA" is listed as a master's degree; previously '%BA%' scored it as bachelor's.
POSTDOC_PRED = "lower(d) LIKE '%postdoc%'"
PHD_PRED = (
    "lower(d) LIKE '%phd%' OR lower(d) LIKE '%ph.d%' "
    "OR lower(d) LIKE '%doctor of philosophy%'"
)
MASTERS_PRED = (
    "lower(d) LIKE '%master%' "
    "OR d LIKE '%MS%' OR d LIKE '%MBA%' OR d LIKE '%BEME%'"
)
BACHELORS_PRED = (
    "lower(d) LIKE '%bachelor%' OR lower(d) LIKE '%bachlor%' "
    "OR d LIKE '%BA%' OR d LIKE '%BS%' OR d LIKE '%BCa%' OR d LIKE '%B.A.%'"
)

# deg_feat = highest level found (checked from highest to lowest):
# 5 postdoc, 4 PhD, 3 master's, 2 bachelor's, 0 no usable degree (null / only "N/A"), 1 other.
deg_df = deg_transform.withColumn(
    "deg_feat",
    when(_any_degree_matches(POSTDOC_PRED), 5)
    .when(_any_degree_matches(PHD_PRED), 4)
    .when(_any_degree_matches(MASTERS_PRED), 3)
    .when(_any_degree_matches(BACHELORS_PRED), 2)
    .when(
        expr("education_degrees IS NULL OR size(filter(education_degrees, degree -> degree IS NOT NULL AND degree != 'N/A')) = 0"),
        0,
    )
    .otherwise(1),
)

deg_df.select("education_degrees", "deg_feat").show(truncate=False)

**process experience count**

This code explodes each person's experience entries and drops entries whose title is null or "retired". It then counts each person's distinct job titles (a proxy for the number of roles held) as `count_different_titles`.

In [0]:
# Step 1: one row per (profile row, experience entry).
exploded_df = deg_df.withColumn("exp", explode(col("experience")))

# Step 2: keep only entries with a usable title. "retired" is compared case-insensitively
# and ignoring surrounding whitespace, so "Retired" / " retired " are excluded as well.
filtered_df = exploded_df.filter(
    col("exp.title").isNotNull()
    & (F.lower(F.trim(col("exp.title"))) != "retired")
)

# Step 3: collect each profile's distinct titles. A profile whose titles are all null or
# "retired" has no row here.
unique_titles_df = filtered_df.groupBy("id").agg(
    collect_set("exp.title").alias("unique_titles")
)

# Step 4: number of distinct titles per profile.
counted_df = unique_titles_df.withColumn(
    "count_different_titles",
    size(col("unique_titles"))
)

counted_df.display()

**Embed courses data**

In [0]:
# Column names used by the courses embedding pipeline.
input_col = "courses"
concat_col = input_col + "_concat"
token_col = input_col + "_tokenized"
de_col = input_col + "_doc_assembled"
embed_col = input_col + "_embeddings"
embed_str_col = embed_col + "_str"

# Join each profile's course titles into one string, separated by " ; ", so the whole
# course list is embedded as a single document.
course_transform = course_transform.withColumn(concat_col, concat_ws(" ; ", input_col))

# Wrap the concatenated string in a Spark NLP document annotation.
documentAssembler = (
    DocumentAssembler()
    .setInputCol(concat_col)
    .setOutputCol(de_col)
)

# Sentence embeddings from the pretrained small BERT model. The input is the assembled
# document itself (the pipeline has no sentence-splitting stage).
embeddings = (
    BertSentenceEmbeddings.pretrained("sent_small_bert_L2_128")
    .setInputCols([de_col])
    .setOutputCol(embed_col)
)

pipeline = Pipeline(stages=[documentAssembler, embeddings])
course_embeddings_df = pipeline.fit(course_transform).transform(course_transform)

# Keep a string copy of the annotations column.
course_embeddings_df = course_embeddings_df.withColumn(embed_str_col, course_embeddings_df[embed_col].cast("string"))

This code joins the education-level feature (`deg_feat`) with the job-title count (`count_different_titles`) on the profile `id`.

In [0]:
# Education level and distinct-title count, joined on the profile id (inner join).
fin_deg_df, fin_counted_df = (
    deg_df.select("id", "deg_feat", "companies"),
    counted_df.select("id", "count_different_titles"),
)
counts_df = fin_deg_df.join(fin_counted_df, on="id")

In [0]:
# Keep one courses-embedding row per profile before joining. In the past-jobs variant,
# both course_embeddings_df and counts_df carry one row per (id, company), so joining on
# id alone would multiply rows quadratically per profile. All rows of one id carry the
# same courses (assuming `id` is unique in the raw profiles table), so nothing is lost.
course_embeddings_df = (
    course_embeddings_df
    .select("id", "courses_embeddings")
    .dropDuplicates(["id"])
)
pre_final_df = course_embeddings_df.join(counts_df, "id")

Extract the embedding vectors from the courses annotations, convert them to strings, and keep only the columns needed downstream.

In [0]:
# Each courses annotation stores its vector under `.embeddings`. Explode to one row per
# annotation, serialise the vector to a string for CSV export, and keep the final columns.
pre_final_df = (
    pre_final_df
    .withColumn("embedding", explode(col("courses_embeddings.embeddings")))
    .withColumn("emb_est_courses", col("embedding").cast("string"))
    .select("id", "emb_est_courses", "deg_feat", "count_different_titles", "companies")
    .dropDuplicates()
)

**Final process:**


This code creates `augmented_people_profiles` by joining the people data in `pre_final_df` with the `companies` data on company name. It selects the key columns (renaming `id` to `people_id`), drops rows containing nulls, and displays the enriched profiles with people and selected company details.

In [0]:
# Company attributes to attach to each profile.
comps = companies.select("name", "id", "industries", "company_id", "organization_type")

augmented_people_profiles = (
    pre_final_df
    .join(comps, on=pre_final_df["companies"] == comps["name"], how="inner")
    .select(
        # Both sides have an `id` column; take the person's id and rename it.
        pre_final_df["id"].alias("people_id"),
        "emb_est_courses",
        "deg_feat",
        "count_different_titles",
        "companies",
        "company_id",
        "industries",
        "organization_type",
    )
    .dropna()
)
augmented_people_profiles.display()

In [0]:
# Number of rows in the augmented people-profiles DataFrame.
augmented_people_profiles.count()

**Work on companies' industries data:**

**Embed industries data**

In [0]:
# Distinct industry values to embed.
ind = companies.select("industries").distinct()

# Column names for the industries pipeline (the "2" suffix distinguishes them from the
# courses-pipeline variables).
input_col2 = "industries"
concat_col2, de_col2, token_col2, embed_col2 = (
    input_col2 + suffix
    for suffix in ("_concat", "_doc_assembled", "_tokenized", "_embeddings")
)
embed_str_col2 = embed_col2 + "_str"

# Wrap each industry string in a Spark NLP document annotation.
documentAssembler2 = (
    DocumentAssembler()
    .setInputCol(input_col2)
    .setOutputCol(de_col2)
)

# Sentence embeddings from the pretrained small BERT model.
embeddings2 = (
    BertSentenceEmbeddings.pretrained("sent_small_bert_L2_128")
    .setInputCols([de_col2])
    .setOutputCol(embed_col2)
)

pipeline2 = Pipeline(stages=[documentAssembler2, embeddings2])
industry_embeddings_df = pipeline2.fit(ind).transform(ind)

# Keep a string copy of the annotations column.
industry_embeddings_df = industry_embeddings_df.withColumn(embed_str_col2, F.col(embed_col2).cast("string"))
industry_embeddings_df.printSchema()

Extract the embedding vectors from the industries annotations, convert them to strings, and keep only the industry name and its embedding.

In [0]:
# Serialise each industry's embedding vector to a string and keep distinct,
# non-null (industries, emb_est_industries) pairs.
industry_embeddings_df = (
    industry_embeddings_df
    .select("industries", "industries_embeddings")
    .withColumn("embedding", explode(col("industries_embeddings.embeddings")))
    .withColumn("emb_est_industries", col("embedding").cast("string"))
    .select("industries", "emb_est_industries")
    .dropDuplicates()
    .dropna()
)
industry_embeddings_df.display()

In [0]:
# Number of distinct (industries, emb_est_industries) rows.
industry_embeddings_df.count()

Note:
1) We display the DataFrames so they can be saved as CSVs via the display output's "Download all rows (up to 5GB compressed)" option.
2) To optimize file size, we separate downloads into two DataFrames: one for profile data and one for industry embeddings, reducing the load on the profiles DataFrame.

Another Note:

If you run the industries embeddings code first, some cells may not rerun correctly and could cause errors. We recommend running the profiles augmentation code before the company-industries augmentation to avoid this issue.