### Please notice the following before running the notebook:
1. Import National_Ocupational.xlsx into your workspace, under your user
2. Run every cell until "Creating tables for the Demo" and then skip to "Pipeline"
3. Continue to run cells as writen


### Read data into PySpark Dataframe.

In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

spark = SparkSession.builder.getOrCreate()

In [0]:
companies = spark.read.parquet('/dbfs/linkedin_train_data')
profiles=spark.read.parquet('/dbfs/linkedin_people_train_data')

### Preprocessing
The table **profiles_with_positions_exploded_full**__ contain in each row the following info about some preson: his current role, his education and one job from his expereince alogside the company and duration of that job.

In [0]:
from pyspark.sql.functions import explode, col, to_date, datediff, lit, when

profiles_with_experience_exploded = profiles.select("*",
    F.explode(F.col("experience")).alias("experience_entry")
).withColumn(
    "experience_duration_short", F.col("experience_entry.duration_short"))


# positions array exploded
profiles_with_positions_exploded = profiles_with_experience_exploded.withColumn(
    "position_entry", 
    F.explode(F.col("experience_entry.positions"))).withColumn(
    "company_subtitle", F.col("position_entry.subtitle")).withColumn(
    "company_title", F.col("position_entry.title")).withColumn(
    "company", F.col("experience_entry.company")).withColumn(
    "company_id", F.col("experience_entry.company_id")).withColumn(
    "start_date", F.col("position_entry.start_date")).withColumn(
    "duration_short", F.col("position_entry.duration_short")).withColumn(
    "end_date", F.col("position_entry.end_date")).select(
        col("id"),
        col('name'),
        col("current_company.company_id").alias('current_company_id'),
        col("current_company.name").alias('current_company_name'),
        col('company'),
        col('company_id'),
        col('company_title').alias('job_title'),
        col('start_date'),
        col('end_date'),
        col('duration_short'))
    
# experience array exploded 
profiles_with_experience_exploded = profiles_with_experience_exploded.filter(
    (F.col("experience_entry.positions").isNull()) | (F.size(F.col("experience_entry.positions")) == 0)
).withColumnRenamed("experience_entry", "position_entry").withColumnRenamed("experience_duration_short", "duration_short").withColumn(
    "company_subtitle", F.col("position_entry.subtitle")).withColumn(
    "company_title", F.col("position_entry.title")).withColumn(
    "company", F.col("position_entry.company")).withColumn(
    "company_id", F.col("position_entry.company_id")).withColumn(
    "start_date", F.col("position_entry.start_date")).withColumn(
    "end_date", F.col("position_entry.end_date")
    ).select(
        col("id"),
        col('name'),
        col("current_company.company_id").alias('current_company_id'),
        col("current_company.name").alias('current_company_name'),
        col('company'),
        col('company_id'),
        col('company_title').alias('job_title'),
        col('start_date'),
        col('end_date'),
        col('duration_short'),
        )
    
profiles_with_positions_exploded_full = profiles_with_positions_exploded.union(
    profiles_with_experience_exploded)
""".withColumn(
    "start_date_normalized",
    when(
        col("start_date").rlike("^[A-Za-z]+ \\d{4}$"),  # For month year format
        to_date(col("start_date"), "MMM yyyy")
    ).otherwise(
        to_date(col("start_date"), "yyyy")  # For year only format
    ))"""



'.withColumn(\n    "start_date_normalized",\n    when(\n        col("start_date").rlike("^[A-Za-z]+ \\d{4}$"),  # For month year format\n        to_date(col("start_date"), "MMM yyyy")\n    ).otherwise(\n        to_date(col("start_date"), "yyyy")  # For year only format\n    ))'

In [0]:
from pyspark.sql.functions import col, explode, to_date, when, lit, first
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Explode the education column and add a rank
education_exploded = profiles.select(
    col("id"),
    col("name"),
    F.explode(F.col("education")).alias("education_entry")
).withColumn(
    "degree", col("education_entry.degree")
).withColumn(
    "education_title", col("education_entry.title")
).withColumn(
    "field_of_study", col("education_entry.field")
).withColumn(
    "edu_rank",
    F.row_number().over(Window.partitionBy("id").orderBy(lit(1)))  # Assign rank to entries
).filter(
    col("edu_rank") == 1  # Only keep the first entry
).select("id", "degree", 'field_of_study', "education_title")


# Join the education data
profiles_with_positions_exploded_full = profiles_with_positions_exploded_full.join(
    education_exploded,
    on="id",
    how="left"  # Use a left join to keep all profiles
)

# Filter out rows where the company column is null
profiles_with_positions_exploded_full = profiles_with_positions_exploded_full.filter(col("company").isNotNull())

# Select final columns
profiles_with_positions_exploded_full = profiles_with_positions_exploded_full.select(
    col("id"),
    col("name"),
    #col("current_company_id"),
    col("current_company_name"),
    col("company"),
    #col("company_id"),
    col("job_title"),
    col("start_date"),
    col("end_date"),
    col("duration_short"),
    col("degree"),  
    col("education_title"),
    col("field_of_study")
)


In [0]:
profiles_with_positions_exploded_full.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_company_name: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- duration_short: string (nullable = true)
 |-- degree: string (nullable = true)
 |-- education_title: string (nullable = true)
 |-- field_of_study: string (nullable = true)



In [0]:
# Count the occurrences of each job title
job_title_counts = profiles_with_positions_exploded_full.groupBy("job_title").count()

# Filter titles where count > 3
job_title_counts_filtered = job_title_counts.filter(job_title_counts['count'] > 3)

# Convert the filtered job titles to Pandas DataFrame
profiles_jobs = job_title_counts_filtered.select("job_title").toPandas().dropna(subset=['job_title'])

profiles_jobs = profiles_jobs['job_title'].tolist()

Web Scrapping of jobs titles from **Planit**

In [0]:
# Install necessary dependencies
!pip install requests beautifulsoup4 pandas openpyxl

# Import required libraries
import requests
from bs4 import BeautifulSoup
import pandas as pd
import string
import os

# Base URL for the job profiles
base_url = "https://www.planitplus.net/JobProfiles?letter="

# List to store all job roles
all_job_roles = []

# Loop through each letter (A-Z)
for letter in string.ascii_uppercase:
    # Construct the URL for the current letter
    url = base_url + letter

    # Send an HTTP GET request to the page
    response = requests.get(url)

    # Check if the request was successful
    if response.status_code == 200:
        # Parse the HTML content using BeautifulSoup
        soup = BeautifulSoup(response.content, "html.parser")

        # Find all `a` tags
        a_tags = soup.find_all("a")

        # Filter and clean the job roles
        for tag in a_tags:
            text = tag.text.strip()
            if " - " in text:  # Split by ' - ' and take only the first part
                job = text.split(" - ")[0].strip()
                all_job_roles.append(job)
    else:
        print(f"Failed to fetch the page for letter '{letter}'. Status code: {response.status_code}")

# Remove duplicates
unique_job_roles = sorted(set(all_job_roles))


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Specify the path in DBFS (Databricks File System)
dbfs_path = '/dbfs/tmp/All_job_roles_scrapped.csv'

# Save the unique job roles to a CSV file
df = pd.DataFrame(unique_job_roles, columns=["Job Role"])
df.to_csv(dbfs_path, index=False)

Standardization of roles according to the scrapped data from planit

In [0]:
import pandas as pd
import os

# Use dbutils to fetch the current Databricks username
try:
    username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("user").get()
except:
    # Fallback to "Yoni Email" if dbutils is not available
    username = "jonathanh@campus.technion.ac.il"

# Construct the file path dynamically
#file_path = f"/Workspace/Users/{username}/All_job_roles_scrapped.xlsx"
title_list = pd.read_csv(dbfs_path)['Job Role'].tolist()

In [0]:
file_path = f"/Workspace/Users/{username}/National_Occupational.xlsx"
# Read the Excel file
national_jobs = pd.read_excel(file_path)['Title']
# Filter out titles that do NOT end with 'Occupations'
national_jobs = [title for title in national_jobs if not title.endswith("Occupations")]

In [0]:
pip install -U sentence-transformers


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import numpy as np
from sentence_transformers import SentenceTransformer
import pandas as pd
from tqdm import tqdm

# Load the sentence embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Generate embeddings for the main title list, profiles_jobs, and national_jobs
title_list_embeddings = model.encode(title_list)
profiles_jobs_embeddings = model.encode(profiles_jobs)
national_jobs_embeddings = model.encode(national_jobs)

# Normalize embeddings to unit vectors (important for cosine similarity)
title_list_embeddings = title_list_embeddings / np.linalg.norm(title_list_embeddings, axis=1, keepdims=True)
profiles_jobs_embeddings = profiles_jobs_embeddings / np.linalg.norm(profiles_jobs_embeddings, axis=1, keepdims=True)
national_jobs_embeddings = national_jobs_embeddings / np.linalg.norm(national_jobs_embeddings, axis=1, keepdims=True)

# Compute cosine similarities between profiles_jobs and title_list embeddings
# Use matrix multiplication for cosine similarity
cos_sim_profiles = np.dot(profiles_jobs_embeddings, title_list_embeddings.T)

# Compute cosine similarities between national_jobs and title_list embeddings
cos_sim_national = np.dot(national_jobs_embeddings, title_list_embeddings.T)

# Find the closest title for each profiles_job and national_job
# Get the index of the closest match for each title
closest_profiles_idx = np.argmax(cos_sim_profiles, axis=1)
closest_national_idx = np.argmax(cos_sim_national, axis=1)

# Map the titles to their closest matches
profiles_to_closest = {profiles_jobs[i]: title_list[closest_profiles_idx[i]] for i in range(len(profiles_jobs))}
national_to_closest = {national_jobs[i]: title_list[closest_national_idx[i]] for i in range(len(national_jobs))}

# Create DataFrames to hold the mappings
profiles_mapping_df = pd.DataFrame(list(profiles_to_closest.items()), columns=["profiles_job", "mapped_title"])
national_mapping_df = pd.DataFrame(list(national_to_closest.items()), columns=["national_job", "mapped_title"])

# Combine both DataFrames
mapping_df = pd.merge(profiles_mapping_df, national_mapping_df, left_on="mapped_title", right_on="mapped_title", how="outer")

# Save the mapping DataFrame to a Parquet file
#mapping_df.to_parquet("/Workspace/Users/anna.zavin@campus.technion.ac.il/job_title_mapping.parquet", engine='pyarrow', compression='snappy')
mapping_df.to_parquet(f"/Workspace/Users/{username}/job_title_mapping.parquet", engine='pyarrow', compression='snappy')

# If you want to ensure the file is saved successfully, you can print the path
print("Mapping saved to 'job_title_mapping.parquet'")



`resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.



Mapping saved to 'job_title_mapping.parquet'


In [0]:
import numpy as np
from sentence_transformers import SentenceTransformer
import pandas as pd
from tqdm import tqdm

# Load the sentence embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')


`resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.



In [0]:
# Generate embeddings for the main title list, profiles_jobs, and national_jobs
title_list_embeddings = model.encode(title_list)
profiles_jobs_embeddings = model.encode(profiles_jobs)
national_jobs_embeddings = model.encode(national_jobs)

# Debugging: Output sizes of embeddings
print(f"Title list embeddings shape: {title_list_embeddings.shape}")
print(f"Profiles jobs embeddings shape: {profiles_jobs_embeddings.shape}")
print(f"National jobs embeddings shape: {national_jobs_embeddings.shape}")

# Normalize embeddings to unit vectors (important for cosine similarity)
title_list_embeddings = title_list_embeddings / np.linalg.norm(title_list_embeddings, axis=1, keepdims=True)
profiles_jobs_embeddings = profiles_jobs_embeddings / np.linalg.norm(profiles_jobs_embeddings, axis=1, keepdims=True)
national_jobs_embeddings = national_jobs_embeddings / np.linalg.norm(national_jobs_embeddings, axis=1, keepdims=True)


# Compute cosine similarities between profiles_jobs and title_list embeddings
cos_sim_profiles = np.dot(profiles_jobs_embeddings, title_list_embeddings.T)

# Compute cosine similarities between national_jobs and title_list embeddings
cos_sim_national = np.dot(national_jobs_embeddings, title_list_embeddings.T)

# Debugging: Output the shape of the cosine similarity matrices
print(f"Cosine similarity matrix for profiles jobs shape: {cos_sim_profiles.shape}")
print(f"Cosine similarity matrix for national jobs shape: {cos_sim_national.shape}")

# Find the closest title for each profiles_job and national_job
closest_profiles_idx = np.argmax(cos_sim_profiles, axis=1)
closest_national_idx = np.argmax(cos_sim_national, axis=1)

# Debugging: Output the length of closest indices
print(f"Closest profiles indices length: {len(closest_profiles_idx)}")
print(f"Closest national indices length: {len(closest_national_idx)}")

# Map the titles to their closest matches and store the cosine similarity values
profiles_to_closest = {
    profiles_jobs[i]: (title_list[closest_profiles_idx[i]], cos_sim_profiles[i, closest_profiles_idx[i]]) 
    for i in range(len(profiles_jobs))
}

national_to_closest = {
    national_jobs[i]: (title_list[closest_national_idx[i]], cos_sim_national[i, closest_national_idx[i]]) 
    for i in range(len(national_jobs))
}


# Create DataFrames to hold the mappings with cosine similarity
profiles_mapping_df = pd.DataFrame(
    [(job, title, sim) for job, (title, sim) in profiles_to_closest.items()],
    columns=["profiles_job", "mapped_title", "cosine_similarity"]
)

national_mapping_df = pd.DataFrame(
    [(job, title, sim) for job, (title, sim) in national_to_closest.items()],
    columns=["national_job", "mapped_title", "cosine_similarity"]
)

national_mapping_df = national_mapping_df.loc[
    national_mapping_df.groupby('mapped_title')['cosine_similarity'].idxmax()
]

# Final merge for a strict 1:1:1 mapping based on the highest similarity match
final_mapping_df = pd.merge(profiles_mapping_df, national_mapping_df, on="mapped_title", how="outer")

# Debugging: Output the shape of the final merged DataFrame
print(f"Final mapping DataFrame shape: {final_mapping_df.shape}")

# Save the mapping DataFrame to a Parquet file
final_mapping_df.to_parquet(f"/Workspace/Users/{username}/job_title_mapping.parquet", engine='pyarrow', compression='snappy')

# Print the path to confirm the file was saved
print("Mapping saved to 'job_title_mapping.parquet'")


Title list embeddings shape: (634, 384)
Profiles jobs embeddings shape: (25477, 384)
National jobs embeddings shape: (1073, 384)
Cosine similarity matrix for profiles jobs shape: (25477, 634)
Cosine similarity matrix for national jobs shape: (1073, 634)
Closest profiles indices length: 25477
Closest national indices length: 1073
Final mapping DataFrame shape: (25489, 5)
Mapping saved to 'job_title_mapping.parquet'


In [0]:
mapping_spark_df = spark.createDataFrame(final_mapping_df)
new_data = profiles_with_positions_exploded_full.join(
    mapping_spark_df, 
    profiles_with_positions_exploded_full.job_title == mapping_spark_df.profiles_job,
    how="inner"
)

# Step 4: Reorder the columns to place mapped_title next to job_title
new_data = new_data.select(
    col("id"),
    col("name"),
    col("current_company_name"),
    col("company"),
    col("job_title"),
    col("mapped_title"), 
    col('national_job'),
    col("start_date"),
    col("end_date"),
    col("duration_short"),
    col("degree"),
    col("education_title"),
    col("field_of_study")
)


In [0]:
mapping_inner_df = pd.merge(profiles_mapping_df, national_mapping_df, left_on="mapped_title", right_on="mapped_title", how="inner")

### Creating tables for the Demo - No need to run this section
Extract only main jobs from profiles_with_positions_exploded_full due to computational limitations in streamlit. The full use of Big Data continues in this notebook.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, to_date, lead, lag, count

# Filter out rows where 'company' is null
demo_profiles_with_positions_exploded_full = profiles_with_positions_exploded_full.filter(col("company").isNotNull())

# Convert start_date to a proper date type
demo_profiles_with_positions_exploded_full = demo_profiles_with_positions_exploded_full.withColumn(
    "start_date_format", to_date(col("start_date"), "MMM yyyy")
)

# Calculate the frequency of each job title
job_title_counts = (
    demo_profiles_with_positions_exploded_full.groupBy("job_title")
    .agg(count("job_title").alias("job_count"))
    .filter((col("job_count") >= 500) & (col("job_count") < 2000))  # Filter for job_count between 500 and 2000
)

# Collect the frequent job titles
frequent_jobs = [row["job_title"] for row in job_title_counts.collect()]

# Filter for people who have worked in one of the frequent jobs
relevant_ids = (
    demo_profiles_with_positions_exploded_full.filter(col('job_title').isin(frequent_jobs))
    .select("id")  # Select only the user IDs
    .distinct()    # Get distinct IDs
)

# Keep all jobs for those individuals
filtered_df = demo_profiles_with_positions_exploded_full.join(relevant_ids, on="id", how="inner")

# Define the window and add the job_index column
window_spec = Window.partitionBy("id").orderBy(col("start_date_format").asc())
final_result = filtered_df.withColumn("job_index", row_number().over(window_spec))

# Add the "next_role" column based on the job index using the lead function
final_result = final_result.withColumn("next_role", lead("job_title", 1).over(window_spec))

# Add the "previous_role" column based on the job index using the lag function
final_result = final_result.withColumn("previous_role", lag("job_title", 1).over(window_spec))

# Filter by the frequent job titles
filtered_by_jobs = final_result.filter(col('job_title').isin(frequent_jobs))


# Pipeline
In the following code, we will present the results of our analyses.<br> All of these have been integrated into our real interface, as can be seen in the demo. Before each section, we will briefly state what we analyzed.

### Creating temporary interface in Data Bricks
The goal is to use Streamlit (or some other interface), where we built the demo. Here you can use the selection boxes made by Databricks, which mimic the behaviour of the final app.

Creating scrolling lists for selecting a role and a company directly from DataBricks in the upper left corner. <br>First choose a role and then a company (You will only be shown those that have the role you selected).

In [0]:
# Get unique job titles
role_options = new_data.select("mapped_title").distinct()

# Collect roles into a list of strings
role_options = [row['mapped_title'] for row in role_options.collect()]

role_options.sort()

The next cell will automatically rerun for each new selection of role or company (take around 30 sec). 
<br>If not, run the cell again to see the updated options.

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Create a combobox for role selection
dbutils.widgets.combobox("role", "", role_options, "Select a Role")

# Get the selected role from the widget
selected_role = dbutils.widgets.get("role")

filtered_df = new_data.select("job_title", "company")
filtered_companies = filtered_df.filter(col("mapped_title") == selected_role).select("company").distinct()

# Select distinct companies and limit to 1023
frequent_companies_df = (
    filtered_df
    .filter(col("job_title") == selected_role)
    .select("company")
    .distinct()
    .limit(1023)  # Combobox limitation
)

# Collect the company names as a Python list
frequent_companies = [row["company"] for row in frequent_companies_df.collect()]

# Remove empty strings
frequent_companies = [name for name in frequent_companies if name is not None and name.strip() != "" and name.strip() != "-"]

# Sort the results
frequent_companies = sorted(frequent_companies)

# Company combox
# Add combobox widgets with auto-completion
dbutils.widgets.combobox("company", "", frequent_companies, "Select a Company")

# Retrieve selected widget values
selected_company = dbutils.widgets.get("company")


In [0]:
selected_role , selected_company


('Data Analyst', 'Amazon')

#### Notice  
You have to choose a role and a company before running the following cells.
<br>If you changed at least one of them - run the following cells again to see updated analyses.

###Statistical analyses
The number of employees in the selected company, followed by the number of employees in the company in the selected position.

In [0]:
from pyspark.sql import functions as F

# Replace with the chosen company name
chosen_company_name = selected_company  # Example: "Google"

# Count the number of employees in the chosen company
employees_count = new_data.filter(
    F.col("current_company_name") == chosen_company_name
).select("id").distinct().count()  # Count unique profiles (by 'id')

# Display results
print(f"{chosen_company_name} has {employees_count} employees in the collected data.")

Amazon has 1286 employees in the collected data.


In [0]:
from pyspark.sql import functions as F

# Replace with the chosen company name and role
chosen_company_name = selected_company  # Example: "Google"
chosen_role = selected_role  # Example: "Senior System Software Engineer"

# Count employees with the chosen company name and role
employees_with_role_count = new_data.filter(
    (F.col("current_company_name") == chosen_company_name) & (F.col("job_title") == chosen_role)
).count()

# Display results
print(f"The collected data has {employees_with_role_count} {chosen_role}s at {chosen_company_name}.")


The collected data has 16 Data Analysts at Amazon.


**Academic background analysis of the selected role**

In [0]:
from pyspark.sql.functions import explode, col, when

# Explode the experience array and positions array
selected_profiles = new_data.filter(col("job_title")==selected_role)

# Remove rows with null degree
filtered_education = selected_profiles.filter(col("degree").isNotNull())

# Standardize the degree column
standardized_education = filtered_education.withColumn(
    "degree",
    when(col("degree").rlike("(?i)bachelor|ba|bs"), "Bachelor")  # Merge all Bachelor's variants
    .when(col("degree").rlike("(?i)master|ma|ms"), "Master")     # Merge all Master's variants
    .when(col("degree").rlike("(?i)high school"), "High School Diploma")  # High School
    .when(col("degree").rlike("(?i)doctor|phd|md|dvm|jd|dr"), "Doctor")  # Doctoral degrees
    .when(col("degree").rlike("(?i)associate|aa|as"), "Associate Degree")   # Associate degrees
    .when(col("degree").rlike("(?i)diploma|certificate"), "Certificate/Diploma")  # Diplomas
    .when(col("degree").rlike("(?i)college"), "Unspecified Degree")  # College but no specific degree
)
# Group by the standardized degree and field, then count occurrences
grouped_standardized_education = standardized_education.groupBy(
    col("degree"), col("field_of_study")
).count().filter(col('count')>5).orderBy(col("count").desc())


In [0]:
import pandas as pd
import plotly.express as px
top_education_pd = grouped_standardized_education.toPandas()
# Normalize count to percentage
total_count = top_education_pd['count'].sum()
top_education_pd['percentage'] = (top_education_pd['count'] / total_count) * 100

# Identify the top N fields for each degree and aggregate the rest as "Others"
def process_top_fields(group, top_n=4):
    # Sort by count in descending order
    group = group.sort_values(by='count', ascending=False)
    
    # Get the top N fields
    top_fields = group.head(top_n)  # Top N fields
    
    # Get the remaining fields (those after the top N)
    others = group.iloc[top_n:]  # Remaining fields after top N
    
    # Add field_label for top N fields
    top_fields['field_label'] = top_fields.apply(
        lambda x: f"{x['field_of_study']} ({x['percentage']:.1f}%)", axis=1
    )
    
    # If there are other fields, aggregate them into "Others"
    if not others.empty:
        # Aggregate the "Others" group per degree
        others_row = pd.DataFrame({
            'degree': [group['degree'].iloc[0]],
            'field_of_study': ['Others'],
            'count': [others['count'].sum()],
            'percentage': [others['percentage'].sum()],
            'field_label': [f"Others ({others['percentage'].sum():.1f}%)"]
        })
        
        # Concatenate top fields and "Others"
        return pd.concat([top_fields, others_row], ignore_index=True)
    else:
        return top_fields


# Apply to group by degree
top_education_pd = (
    top_education_pd.groupby('degree', group_keys=False)
    .apply(lambda group: process_top_fields(group, top_n=4))  # Change top_n to 4
    .reset_index(drop=True)
)

# Define the custom order for the degrees
degree_order = ['Bachelor', 'Master', 'Doctor']

# Define a categorical colormap
categorical_colors = px.colors.qualitative.Prism  # Choose a Plotly categorical color scheme
unique_fields = top_education_pd['field_of_study'].unique()
field_color_map = dict(zip(unique_fields, categorical_colors[:len(unique_fields)]))  # Map colors to fields

# Create the Plotly bar chart
fig = px.bar(
    top_education_pd,
    x='degree',  # X-axis: Degrees
    y='percentage',  # Y-axis: Percentage
    color='field_of_study',  # Grouped by field_of_study
    title='Top Degrees and Fields by Percentage (Stacked)',
    labels={'percentage': 'Percentage (%)', 'degree': 'Degree', 'field_of_study': 'Field of Study'},
    color_discrete_map=field_color_map,  # Apply the categorical color map
    text='field_label',  # Add percentage labels inside the bars
    category_orders={'degree': degree_order}  # Manual order for x-axis labels
)

# Customize the layout
fig.update_layout(
    barmode='stack',  # Stacked bars
    title=dict(
        text=f"What did people who work as {selected_role} study?",
        font=dict(size=14, color="black")
    ),
    font=dict(size=14),  # General font settings
    legend=dict(
        title="Field of Study",
        font=dict(size=12),
        bgcolor="rgba(255,255,255,0.8)",  # Transparent legend background
        bordercolor="rgba(0,0,0,0.1)",
        borderwidth=1
    ),
    plot_bgcolor='white',  # White background
    xaxis=dict(
        title="Degree",
        titlefont=dict(size=14),
        tickfont=dict(size=12),
        showgrid=False,  # Disable vertical gridlines
        zeroline=False  # Disable x-axis baseline
    ),
    yaxis=dict(
        title="Percentage (%)",
        titlefont=dict(size=14),
        tickfont=dict(size=12),
        gridcolor="rgba(200,200,200,0.5)"  # Subtle horizontal gridlines
    )
)



# Improve text label appearance
fig.update_traces(
    texttemplate='%{text}',  # Show field label with percentage
    textposition='inside',  # Position text inside bars
    insidetextanchor='middle',
    insidetextfont=dict(size=12, color="white"),  # White text inside bars
    textfont=dict(size=12, color="black")  # Black text for outside labels
)

# Display the chart
fig.show()


**Analysis of the distribution of the durations in the selected role**

In [0]:
from pyspark.sql.functions import explode, col, to_date, datediff, lit, when
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import when, col, sum as _sum

# Step 1: Explode the experience array

# Step 3: Filter for the selected role in position title
profiles_with_role = new_data.filter(
    F.col("job_title").contains(selected_role)
)

# Define a function to categorize durations
def categorize_duration(duration_in_years):
    if duration_in_years is None or duration_in_years < 1:
        return "less than 1 year"
    elif 1 <= duration_in_years < 2:
        return "1-2 years"
    elif 2 <= duration_in_years < 5:
        return "2-5 years"
    elif 5 <= duration_in_years < 10:
        return "5-10 years"
    elif 10 <= duration_in_years < 20:
        return "10-20 years"
    else:
        return "more than 20 years"

categorize_duration_udf = udf(categorize_duration, StringType())

# Convert duration_short to years (replace with your logic to parse correctly)
profiles_with_role = profiles_with_role.withColumn(
    "duration_in_years",
    when(col("duration_short").rlike(r"\d+ year"), col("duration_short").substr(1, 2).cast("int"))
    .otherwise(0)
)

# Aggregate durations per user per title
aggregated_profiles = profiles_with_role.groupBy("id").agg(
    _sum("duration_in_years").alias("total_duration_in_years")
)

# Apply categorization to the aggregated durations
aggregated_profiles_with_bins = aggregated_profiles.withColumn(
    "duration_bin",
    categorize_duration_udf(col("total_duration_in_years"))
)

# Group by bins and count
histogram_data = aggregated_profiles_with_bins.groupBy("duration_bin").count()


# Convert to Pandas DataFrame for Plotly (to plot)
histogram_df = histogram_data.toPandas()

In [0]:
import plotly.express as px
import pandas as pd

# Define the bin order and categorical mapping
bin_order = [
    "less than 1 year",
    "1-2 years",
    "2-5 years",
    "5-10 years",
    "10-20 years",
    "more than 20 years",
]

# Make sure the "duration_bin" column is treated as a categorical type
histogram_df["duration_bin"] = pd.Categorical(histogram_df["duration_bin"], categories=bin_order, ordered=True)
histogram_df = histogram_df.sort_values("duration_bin")
#print(histogram_df)
# Create a Plotly bar chart

fig = px.bar(
    histogram_df,
    x='duration_bin',  # X-axis: Duration Bin
    y='count',  # Y-axis: Count
    title=f"How long do people typically stay in a {selected_role} role?",  # Title of the chart
    labels={"duration_bin": "Duration Bin", "count": "Count"},  # Axis labels
    color='duration_bin',  # Color bars by duration bin
    color_discrete_sequence=px.colors.sequential.Burg,
    )

# Customize the layout
fig.update_layout(
    title=dict(
        font=dict(size=18, family="Arial, sans-serif", color="black")
    ),
    xaxis=dict(
        title="Duration Bin",
        title_font=dict(size=14),
        tickfont=dict(size=12),
    ),
    yaxis=dict(
        title="Count",
        title_font=dict(size=14),
        tickfont=dict(size=12),
    ),
    font=dict(
        family="Arial, sans-serif",
        size=14,
        color="black",
    ),
    plot_bgcolor="white",  # White background
    paper_bgcolor="white",  # White background for paper
    xaxis_tickangle=-45,  # Rotate x-axis labels for better readability
)

# Show the plot
fig.show()


####SCRAPPING
We have collected employment data with an emphasis on salary and present the data for the selected role.
<br>We will match the collected data to the sellected role

In [0]:
pip install openpyxl


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import pandas as pd

# Read the Excel file

national_df = pd.read_excel(f"/Workspace/Users/{username}/National_Occupational.xlsx")

# Display the DataFrame
national_df.head()


Unnamed: 0,Occupation_code,Title,Occupation_title_click_on_the_occupation_title_to_view_its_profile_URL,Level,Employment,Employment_RSE,Employment_per_1_000_jobs,Median_hourly_wage,Mean_hourly_wage,Annual_mean_wage,Mean_wage_RSE
0,00-0000,All Occupations,,total,151853870,0.0%,1000.0,$23.11,$31.48,"$65,470",0.3%
1,11-0000,Management Occupations,https://www.bls.gov/oes/current/oes110000.htm,major,10495770,0.4%,69.118,$56.19,$66.23,"$137,750",0.5%
2,11-1000,Top Executives,,minor,3751510,0.4%,24.705,$49.74,$65.43,"$136,100",0.9%
3,11-1011,Chief Executives,https://www.bls.gov/oes/current/oes111011.htm,detail,211230,1.4%,1.391,$99.37,$124.47,"$258,900",0.7%
4,11-1021,General and Operations Managers,https://www.bls.gov/oes/current/oes111021.htm,detail,3507810,0.4%,23.1,$48.69,$62.18,"$129,330",0.9%


In [0]:
# Find the closest match using fuzzywuzzy
closest_match = profiles_with_role.select('national_job').distinct().collect()[0][0]
print(f"Closest match to '{selected_role}': {closest_match}")


Closest match to 'Data Analyst': Computer and Information Analysts


In [0]:
# Define the mapping of old column names to new, readable ones
column_mapping = {
    "Title": "Title",
    "Employment": "Employment",
    "Employment_per_1_000_jobs": "Employment per 1,000 Jobs",
    "Median_hourly_wage": "Median Hourly Wage",
    "Mean_hourly_wage": "Mean Hourly Wage",
    "Annual_mean_wage": "Annual Mean Wage",
}

# Rename the columns for readability
filtered_row = national_df[national_df['Title'] == closest_match]
#print(filtered_row)
# Select only the desired columns
#filtered_row = filtered_row[list(column_mapping.values())]

# Format the row as text
formatted_text = "\n".join([f"{col}: {filtered_row.iloc[0][col]}" for col in filtered_row.columns])

# Print the formatted text
print("National Occupational Details from 2023:\n")
print(formatted_text)

National Occupational Details from 2023:

Occupation_code: 15-1210
Title: Computer and Information Analysts
Occupation_title_click_on_the_occupation_title_to_view_its_profile_URL: nan
Level: broad
Employment: 674,170
Employment_RSE: 0.6%
Employment_per_1_000_jobs: 4.44
Median_hourly_wage: $50.88
Mean_hourly_wage: $55.01
Annual_mean_wage: $114,420
Mean_wage_RSE: 0.4%


####Mentor

First we creat current and past employees tables

In [0]:
from pyspark.sql.functions import col

# Filter rows where company matches selected_company and job_title contains selected_role
filtered_profiles = new_data.filter(
    (col("job_title") == selected_role) & (col("company") == selected_company))

# Split into two DataFrames based on end_date
# 1. Current employees (end_date is 'Present' or null)
current_employees = filtered_profiles.filter(
    (col("current_company_name") == selected_company)               # Current company matches
)

# 2. Past employees (end_date is not 'Present' and not null)
past_employees = filtered_profiles.filter(
    (col("end_date").isNotNull()) & (col("end_date") != "Present")
)

Show Mentors - Experienced employee and recent former employee + relevant information

In [0]:
from pyspark.sql.functions import col

# Check if current_employees is not empty
if current_employees.count() > 0:
    # Sort current_employees by duration_short in descending order and take the top row
    longest_duration_employee = current_employees.orderBy(
        col("duration_short").desc()
    ).limit(1)

    # Collect the row and extract the name and duration
    longest_employee_data = longest_duration_employee.collect()[0]
    employee_name = longest_employee_data["name"]
    employee_duration = longest_employee_data["duration_short"]

    # Display the result
    print(f"Try reaching out to {employee_name}! experience of {employee_duration} in {selected_company} as {selected_role} can be interesting!")
else:
    print("No current employees found.")


Try reaching out to Curtis Owens! experience of 9 months in Amazon as Data Analyst can be interesting!


In [0]:
from pyspark.sql.functions import col, to_date, unix_timestamp, when

# Convert the end_date column to a proper date format
# Handle formats like "Jul 2018", "2007", and "Oct 2010"
past_employees = past_employees.withColumn(
    "parsed_end_date",
    when(
        col("end_date").rlike("^[A-Za-z]{3} \\d{4}$"),  # Check if format is "MMM yyyy"
        to_date(unix_timestamp(col("end_date"), "MMM yyyy").cast("timestamp"))
    ).when(
        col("end_date").rlike("^\\d{4}$"),  # Check if format is just a year (e.g., "2007")
        to_date(unix_timestamp(col("end_date"), "yyyy").cast("timestamp"))
    )
)

# Check if past_employees is not empty
if past_employees.count() > 0:
    # Sort past_employees by parsed_end_date in descending order and take the top row
    latest_past_employee = past_employees.orderBy(
        col("parsed_end_date").desc()
    ).limit(1)

    # Collect the row and extract the name and end_date
    latest_employee_data = latest_past_employee.collect()[0]
    employee_name = latest_employee_data["name"]
    employee_end_date = latest_employee_data["end_date"]

    # Display the friendly message
    print(f"The most recent past employee is {employee_name}, who left in {employee_end_date}. Might have some insights for you")
else:
    print("No past employees found.")


The most recent past employee is Kevin E. Nigg, who left in Apr 2023. Might have some insights for you


####Description of the selected Role using Gemini LM - notice you need gemini api key
In the demo there is also part of offering tips depending on the text a user enters

In [0]:
selected_role , selected_company

('Data Analyst', 'Amazon')

In [0]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
%pip install transformers
%pip install python-dotenv 


Looking in indexes: https://download.pytorch.org/whl/cu118
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


#### To run the next cell you need a GEMINI API KEY

In [0]:
# Use Gemini 1.5 for Job description

import requests
import json
GEMINI_API_KEY = "Enter your GEMINI API KEY here"
# API Endpoint
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"

# Request headers
headers = {
    "Content-Type": "application/json",
}

# Request body with your prompt
data = {
    "contents": [
        {
            "parts": [
                {"text": f"Describe the role of a {selected_role} in 3 sentences "}
            ]
        }
    ]
}

# Make the POST request
response = requests.post(url, headers=headers, data=json.dumps(data))

# print(response)
# Check the response status
if response.status_code == 200:
    # Parse the response JSON
    result = response.json()
    candidates = result.get("candidates", [])
    
    # Extract the generated text from the first candidate
    if candidates:
        generated_content = candidates[0].get("content", {}).get("parts", [{}])[0].get("text", "No content generated.")
        print("Generated Content:")
        print(generated_content)
    else:
        print("No candidates found in the response.")
else:
    # Print the error details
    print(f"Error: {response.status_code}")
    print(response.json())


Generated Content:
Data analysts collect, clean, and organize large datasets to identify patterns and trends.  They then use statistical methods and data visualization techniques to interpret this data and provide actionable insights.  Ultimately, data analysts help organizations make better decisions by transforming raw data into understandable information.

