### Internal Mobility

In this section, we have created a metric to evaluate internal mobility within companies.
Due to the large amount of data, we decided to calculate the score only for companies that currently have a LinkedIn user working in the user's dream position.

For example, if the user selected "Data Scientist" as their dream job, and the company Dagan & Dror has never hired a Data Scientist, then we would not calculate the metric for this company.

The metric for a company is calculated as follows:

Average number of users who changed their position within the company in the last two years
÷
Number of LinkedIn users currently working at this company


Assumptions:
1. Companies with fewer than 50 employees are not relevant to this metric.
2. The number of employees in a company is estimated based on the number of employees listed on LinkedIn. For some companies, this number may be incomplete.

In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import sparknlp
from pyspark.sql.functions import regexp_extract, col, when, explode, collect_list, struct, map_from_entries, size, lower, regexp_replace, to_date, sum, avg, split, lit, expr
from pyspark.sql import Window

spark.catalog.clearCache()
spark = sparknlp.start()
profiles = spark.read.parquet('/dbfs/linkedin_people_train_data')
companies = spark.read.parquet('/dbfs/linkedin_train_data')



In [0]:
cols_to_drop = ["about", "avatar", "city", "followers", "following", "groups", "id", "languages", "people_also_viewed", "posts", "recommendations", "recommendations_count", "timestamp", "url", "certifications", "country_code", "education", "education_details", "courses"]
profiles = profiles.drop(*cols_to_drop)
profiles_exploded = profiles.withColumn("exploded_company_id", explode(col("experience.company_id"))).filter(col("exploded_company_id").isNotNull())
profiles_exploded.limit(10).display()

In [0]:
#dream_job = 'data scientist'
#dream_job = 'data engineer'
dream_job = 'hrbp'
#dream_job = 'data analyst'

companies_dream_job = profiles_exploded.filter(
    col("position").like("%" + dream_job + "%")
).select("current_company.name", "current_company.company_id")

# Use `companies_dream_job` to filter relevant profiles
relevant_profiles = profiles_exploded.join(
    companies_dream_job,
    profiles_exploded["exploded_company_id"] == companies_dream_job["company_id"],
    "inner"
).select(profiles_exploded["*"])  # Select all columns from `profiles_exploded`
relevant_profiles.count()


0

In [0]:
relevant_profiels = relevant_profiles.select("name", "experience")
filtered_profiles = relevant_profiels.withColumn("exploded_company_id", explode(col("experience.company_id")))

# Step 1: Explode the 'experience' array to get each experience entry as a separate row
exploded_df = profiles.withColumn("exploded_experience", explode(col("experience")))

# Step 2: Extract relevant fields (name, company, and positions)
extracted_df = exploded_df.select(
    col("name"),
    col("exploded_experience.company").alias("company"),
    explode(col("exploded_experience.positions")).alias("position_details")
)

# Step 3: Extract position title and duration (or tenure) for mapping
positions_with_start_date = extracted_df.select(
    col("name"),
    col("company"),
    col("position_details.title").alias("position_title"),
    col("position_details.start_date").alias("position_start_date")  # You can use another field if needed
)

relevant_companies = companies.select("name", "company_size", "employees_in_linkedin")

companies_size = relevant_companies.dropna(subset =["company_size", "employees_in_linkedin"])

# Define regex patterns to extract X (minimum value) and Y (maximum value)
min_pattern = r"(\d[\d,]*)"         # Matches the first number (X)
max_pattern = r"-(\d[\d,]*)"        # Matches the second number (Y) after '-'
plus_pattern = r"(\d[\d,]*)\+"      # Matches the number (X) before '+'

# Extract min_employees
companies_size = companies_size.withColumn(
    "min_employees",
    when(col("company_size").rlike(r"\+"),  # Check if the value has '+'
         regexp_extract(col("company_size"), plus_pattern, 1))
    .otherwise(regexp_extract(col("company_size"), min_pattern, 1))
    .cast("int")  # Convert to integer
)

# Extract max_employees
companies_size = companies_size.withColumn(
    "max_employees",
    when(col("company_size").rlike(r"\+"),  # If there's '+', max_employees is null
         None)
    .otherwise(regexp_extract(col("company_size"), max_pattern, 1).cast("int"))
)

# Remove commas from min_employees and max_employees
companies_size = companies_size.withColumn("min_employees", regexp_extract(col("min_employees"), r"(\d+)", 0).cast("int"))
companies_size = companies_size.withColumn("max_employees", regexp_extract(col("max_employees"), r"(\d+)", 0).cast("int"))

#keep only the rows where min_employees<=employees_in_linkedin<=max_employees
companies_size = companies_size.filter((companies_size["min_employees"] <= companies_size["employees_in_linkedin"]) & (companies_size["employees_in_linkedin"] <= companies_size["max_employees"]))

relevant_companies = companies_size.select("name","employees_in_linkedin")


# Step 2: Filter relevant companies
relevant_companies = relevant_companies.filter(
    (col("employees_in_linkedin").isNotNull()) & (col("employees_in_linkedin") > 50)
)

relevant_companies = relevant_companies.withColumnRenamed("name", "company_name")

print("There are", relevant_companies.count(), "companies with more than 50 employees in LinkedIn.")

positions_with_start_date = positions_with_start_date.join(
    relevant_companies,
    lower(positions_with_start_date["company"]) == lower(relevant_companies["company_name"]),
    "inner"
)

positions_with_start_date = positions_with_start_date.select("name", "company", "position_title", "position_start_date", "employees_in_linkedin")
positions_with_start_date.limit(10).display()



In [0]:


# Extract year based on the number of words in position_start_date
positions_with_start_year = positions_with_start_date.withColumn(
    "start_year",
    when(size(split(col("position_start_date"), " ")) == 1, col("position_start_date").cast("int"))  # Single word (Year)
    .when(size(split(col("position_start_date"), " ")) == 2, split(col("position_start_date"), " ")[1].cast("int"))  # Two words (Month Year)
)
positions_with_start_month_year = positions_with_start_year.withColumn(
    "start_month",
    when(size(split(col("position_start_date"), " ")) == 1, "Jan")  # Single word (Only Year) → Default to "Jan"
    .when(size(split(col("position_start_date"), " ")) == 2, split(col("position_start_date"), " ")[0])  # Two words → Extract Month
)

max_year = positions_with_start_month_year.agg({"start_year": "max"}).collect()[0]["max(start_year)"]
# Show results
positions_with_start_month_year.limit(10).display()
print("Max year:", max_year)


In [0]:


# Define mapping for month names to numeric values
month_mapping = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05", "Jun": "06",
    "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12"
}

# Convert start_month from text to numeric values
positions_with_start_month_year = positions_with_start_month_year.withColumn(
    "start_month_numeric",
    when(col("start_month") == "Jan", "01")
    .when(col("start_month") == "Feb", "02")
    .when(col("start_month") == "Mar", "03")
    .when(col("start_month") == "Apr", "04")
    .when(col("start_month") == "May", "05")
    .when(col("start_month") == "Jun", "06")
    .when(col("start_month") == "Jul", "07")
    .when(col("start_month") == "Aug", "08")
    .when(col("start_month") == "Sep", "09")
    .when(col("start_month") == "Oct", "10")
    .when(col("start_month") == "Nov", "11")
    .when(col("start_month") == "Dec", "12")
)

# Construct proper date using numeric month values
positions_with_start_date_final = positions_with_start_month_year.withColumn(
    "start_date_date",
    to_date(concat_ws("-", "start_year", "start_month_numeric", lit("01")), "yyyy-MM-dd")
)

# Show results
positions_with_start_date_final.limit(10).display()


In [0]:
# Define a window specification to sort positions by start date
windowSpec = Window.partitionBy("name", "company").orderBy("start_date_date")

# Group by name and company, collecting sorted positions
result_df = (
    positions_with_start_date_final.withColumn(
        "sorted_positions", struct(col("start_date_date"), col("position_title"))
    )
    .groupBy("name", "company")
    .agg(
        collect_list("sorted_positions").alias("positions"),
        collect_list("start_date_date").alias("start_dates")  # Collect all start dates in a list
    )
)


# Extract only position_title from the struct list
result_df = result_df.withColumn(
    "positions",
    col("positions").getField("position_title")
)


# Keep only results where the length of start_dates list is at least 2
result_df = result_df.where(size(col("start_dates")) >= 2)

# Ensure at least one of the dates is in max_year or max_year - 1
result_df = result_df.where(
    expr(f"exists(start_dates, x -> year(x) = {max_year} OR year(x) = {max_year - 1})")
)

# Show the final DataFrame
result_df.display()





In [0]:
# Define a window specification to sort positions by start date
windowSpec = Window.partitionBy("name", "company").orderBy("position_start_date")

# Group by name and company, collecting sorted positions
result_df = (
    positions_with_start_date.withColumn("sorted_positions", struct(col("position_start_date"), col("position_title")))
    .groupBy("name", "company")
    .agg(collect_list("sorted_positions").alias("positions"))
)

# Extract only position_title from the struct list
result_df = result_df.withColumn(
    "positions",
    col("positions").getField("position_title")
)

# Show the resulting DataFrame
result_df.limit(10).display()

In [0]:
result_df.count()

2584

In [0]:
result_df = result_df.select("company", "positions")
results_df = result_df.withColumn("num_positions_changes", size(col("positions"))-1)
results_df_grouped = results_df.select("company", "num_positions_changes")
results_df_grouped_sum = results_df_grouped.groupBy("company").agg(sum("num_positions_changes").alias("total_num_positions_changes"))

results_df_grouped_avg = results_df_grouped.groupBy("company").agg(avg("num_positions_changes").alias("avg_num_positions_changes"))

# Join the two DataFrames on the "company" column
results_df_grouped = results_df_grouped_sum.join(results_df_grouped_avg, on="company")
results_df_grouped = results_df_grouped.withColumnRenamed("avg_num_positions_changes", "average positions change")
display(results_df_grouped)


In [0]:
# Perform the join by comparing company names in lowercase
final_IM = results_df_grouped.join(
    relevant_companies,
    lower(relevant_companies["company_name"]) == lower(results_df_grouped["company"]),
    "inner"
)

final_IM = final_IM.select("company", "total_num_positions_changes", "employees_in_linkedin", "average positions change")
final_IM = final_IM.withColumn("percentage_change", col("total_num_positions_changes") / col("employees_in_linkedin") * 100)

# Show the result
final_IM.display()


In [0]:
companies_amazon = companies.filter((companies["name"]) == "Amazon")

companies_amazon.display()