### Required imports

In [0]:
from pyspark.sql.types import *
import pyspark
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from datetime import datetime, timedelta
import json

from pyspark.sql import SparkSession
# getOrCreate() returns the already-active Spark session if there is one
# (e.g. the session a hosted notebook provides); otherwise it builds a new one.
spark = SparkSession.builder.getOrCreate()



## Loading LinkedIn tables

In [0]:
# Raw LinkedIn snapshots stored as Parquet. Note: `companies` is loaded here but
# is not referenced again further down in this notebook.
profiles, companies = (
    spark.read.parquet(parquet_path)
    for parquet_path in ('/linkedin/people', '/linkedin/companies')
)

In [0]:
# Company review data: read the CSV with pandas, convert it to a Spark
# DataFrame and keep only the columns used in the rest of the notebook.
company_reviews = spark.createDataFrame(
    pd.read_csv("/dbfs/FileStore/g45/company_reviews.csv")
).select("name", "industry", "rating", "happiness", "ratings", "roles", "employees")
display(company_reviews)

## Map industries to meta-industries

In [0]:
# Raw `industry` label (presumably spelled exactly as in company_reviews.csv)
# -> one of 12 meta-industries:
#   Media and Entertainment, Real Estate and Construction, Manufacturing,
#   Retail and Consumer Goods, Healthcare and Medical, Education and Training,
#   Financial and Investment, Services, Transportation and Logistics,
#   Government and Public Policy, Technology, Miscellaneous.
# Several raw labels are variants of the same industry (e.g. "Healthcare" and
# "Health Care"), so they map to the same target.
meta_industries_12 = {
    # NOTE(review): "Accomodation" and "Eleoctronics" look misspelled. Presumably
    # they mirror the spelling in the source data; confirm before correcting them,
    # because a corrected key would no longer match the raw label.
    "Hotels & Travel Accomodation": "Services",
    "Computers and Eleoctronics": "Technology",
    "Telecommunications": "Technology",
    "Healthcare": "Healthcare and Medical",
    "Financial Services": "Financial and Investment",
    "Manufacturing": "Manufacturing",
    "Education": "Education and Training",
    "Transportation & Logistics": "Transportation and Logistics",
    "Human Resources & Staffing": "Services",
    "Human Resources and Staffing": "Services",
    "Information Technology": "Technology",
    "Real Estate": "Real Estate and Construction",
    "Energy, Mining & Utilities": "Miscellaneous",
    "Consulting and Business Services": "Services",
    "Restaurants, Travel and Leisure": "Services",
    "Restaurants & Food Service": "Services",
    "Construction & Facilities Services": "Real Estate and Construction",
    "Retail & Wholesale": "Retail and Consumer Goods",
    "Nonprofit & NGO": "Miscellaneous",
    "Food and Beverages": "Retail and Consumer Goods",
    "Consumer Goods and Services": "Retail and Consumer Goods",
    "Aerospace & Defense": "Manufacturing",
    "Media & Communication": "Media and Entertainment",
    "Pharmaceutical & Biotechnology": "Healthcare and Medical",
    "Government & Public Administration": "Government and Public Policy",
    "Agriculture and Extraction": "Miscellaneous",
    "Personal Consumer Services": "Services",
    "Legal": "Services",
    "Arts, Entertainment & Recreation": "Media and Entertainment",
    "Health Care": "Healthcare and Medical",
    "Education and Schools": "Education and Training",
    "Internet and Software": "Technology",
    "Agriculture": "Miscellaneous",
    "Construction": "Real Estate and Construction",
    "Industrial Manufacturing": "Manufacturing",
    "Management & Consulting": "Services",
    # Additional mappings based on variations and combinations
    "Automotive": "Manufacturing",  # Adjusted to "Manufacturing" for clearer context
    "Auto": "Manufacturing",
    "Transport and Freight": "Transportation and Logistics",
    "Retail": "Retail and Consumer Goods",
    "Pharmaceuticals": "Healthcare and Medical",
    'Insurance': 'Services'
}


# Look up each raw industry label in `meta_industries_12`. Labels missing from
# the dict come back as null (dict.get returns None).
# `F.udf` is used because a bare `udf` is never imported in this notebook; it
# only resolves when the runtime happens to pre-define it.
meta_industry = F.udf(lambda industry: meta_industries_12.get(industry), returnType="string")

company_reviews = (
    company_reviews
    .filter(F.col("industry").isNotNull())
    .withColumn('meta_industry', meta_industry(F.col('industry')))
    .drop('industry')
)

## Convert JSON-encoded columns of numeric values into their average (float)

In [0]:
def calc_avg_in_json(json_col):
    """Average the numeric values of a dict that is serialized as text.

    The text may be JSON or a Python-style dict literal with single quotes,
    e.g. "{'a': '3.5', 'b': '4'}" -> 3.8.

    Args:
        json_col: The serialized dict (str). Any non-string input (None, or a
            float NaN coming from pandas) is treated as missing.

    Returns:
        The mean of the dict's values rounded to one decimal place, or None
        when the input is missing, does not parse to a dict, is an empty dict,
        or contains a value that float() cannot convert. Only floats or None
        are returned, which matches the FloatType the UDF wrapper declares
        (strings such as "null" or an error message do not).
    """
    import ast  # only needed for the single-quote fallback below

    if not isinstance(json_col, str):
        return None
    try:
        # Fast path: swap single quotes for double quotes and parse as JSON.
        values_by_key = json.loads(json_col.replace("'", '"'))
    except ValueError:
        # The quote swap breaks on apostrophes inside keys or values
        # (e.g. "Women's"), so parse the original text as a Python literal.
        # literal_eval only accepts literals and never executes code.
        try:
            values_by_key = ast.literal_eval(json_col)
        except (ValueError, SyntaxError):
            return None
    if not isinstance(values_by_key, dict) or not values_by_key:
        return None
    try:
        total = sum(float(value) for value in values_by_key.values())
    except (TypeError, ValueError):
        return None
    return round(total / len(values_by_key), 1)

# Spark UDF wrapping calc_avg_in_json, declared as FloatType.
# `F.udf` is used because a bare `udf` is never imported in this notebook.
calc_avg_udf = F.udf(calc_avg_in_json, FloatType())

# Replace each dict-as-text column in place with the average of its values.
for json_col_name in ("happiness", "ratings", "roles"):
    company_reviews = company_reviews.withColumn(json_col_name, calc_avg_udf(F.col(json_col_name)))

## Merge the similar `rating` and `ratings` columns

In [0]:
from pyspark.sql.functions import col

# `rating` and `ratings` (the latter averaged from its JSON form above)
# presumably measure the same thing. Average them when both are present,
# otherwise keep whichever one exists; round the result to 2 decimals and
# drop `ratings`.
merged_rating = F.coalesce((col("rating") + col("ratings")) / 2, col("rating"), col("ratings"))
company_reviews = (
    company_reviews
    .withColumn("rating", F.round(merged_rating, 2))
    .drop("ratings")
)

In [0]:
# Inspect company_reviews after the JSON-averaging and rating-merge steps.
display(company_reviews)

In [0]:
# NOTE(review): `profiles` presumably holds personal profile data (names,
# work history); review or clear this output before sharing the notebook.
display(profiles)

## Manipulating "experience" column

In [0]:
# Profiles that list at least one experience entry, restricted to the columns
# used below.
filtered_profiles = (
    profiles
    .filter(F.col("experience").isNotNull() & (F.size("experience") > 0))
    .select("name", "experience", "education")
)
# One row per experience entry.
exploded_df = filtered_profiles.withColumn("experience_item", F.explode("experience"))

# Names with more than one experience entry in total. The join also attaches
# the per-name `count` column to every row.
# NOTE(review): grouping by display name merges different people who share a
# name; a profile id would be a safer key if the table provides one.
atleast_one_trans = (
    exploded_df
    .groupBy("name")
    .count()
    .filter(F.col("count") > 1)
)
exploded_df = exploded_df.join(atleast_one_trans, on="name", how="inner")

# Promote the fields of each experience struct to top-level columns.
for new_col_name, struct_field in (
    ("exp_title", "experience_item.title"),
    ("exp_positions", "experience_item.positions"),
    ("exp_subtitle", "experience_item.subtitle"),
    ("exp_company_id", "experience_item.company_id"),
    ("exp_company_name", "experience_item.company"),
    ("exp_duration", "experience_item.duration"),
    ("exp_start_date", "experience_item.start_date"),
    ("exp_end_date", "experience_item.end_date"),
):
    exploded_df = exploded_df.withColumn(new_col_name, F.col(struct_field))

## Cleaning invalid records (overlapping employment durations)

In [0]:
from pyspark.sql.functions import col, current_timestamp, when, to_timestamp, lag, expr, count
from pyspark.sql.window import Window

# Experience dates are parsed with the pattern "MMM yyyy" (e.g. "Jan 2020");
# an ongoing role carries the literal end date "Present".
EXP_DATE_FORMAT = "MMM yyyy"

# Parse both ends of every entry once; "Present" maps to the current time.
exploded_df = (
    exploded_df
    .withColumn("start_date_timestamp", to_timestamp("exp_start_date", EXP_DATE_FORMAT))
    .withColumn(
        "end_date_timestamp",
        when(col("exp_end_date") == "Present", current_timestamp())
        .otherwise(to_timestamp("exp_end_date", EXP_DATE_FORMAT)),
    )
)

# Keep entries whose start date parses AND whose end date either parses or is
# "Present". Requiring a valid start for "Present" rows matters: a null start
# makes every comparison in the overlap check below null, so such a row could
# never be flagged.
exploded_df = exploded_df.filter(
    col("start_date_timestamp").isNotNull() & col("end_date_timestamp").isNotNull()
)

# Sequential row index used to order each person's entries. This is an
# unpartitioned window, so all rows pass through a single partition.
# NOTE(review): this assumes monotonically_increasing_id() follows the original
# order of the `experience` array. The shuffle join in the previous cell does
# not guarantee that; capturing the array position with F.posexplode would.
exploded_df = exploded_df.withColumn(
    "index", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)

# Keep only people with at most one ongoing ("Present") role.
per_person = Window.partitionBy("name")
exploded_df = (
    exploded_df
    .withColumn("present_end_count", count(when(col("exp_end_date") == "Present", True)).over(per_person))
    .filter(col("present_end_count") <= 1)
)

# Self-join every pair of entries of the same person where `e` is listed before `h`.
# A person is flagged if any pair matches. When each entry has start <= end,
# the four clauses reduce to `e.start < h.end`. In other words, every
# earlier-listed entry must start no earlier than each later-listed entry
# ends: entries must be in reverse-chronological order and must not overlap.
# The remaining clauses only make a difference for entries that end before
# they start.
e = exploded_df.alias("e")
higher_ind = exploded_df.alias("h")
join_condition = (
    (col("e.name") == col("h.name"))
    & (col("e.index") < col("h.index"))
    & ((col("e.start_date_timestamp") < col("h.end_date_timestamp"))
       | (col("e.start_date_timestamp") < col("h.start_date_timestamp"))
       | (col("e.end_date_timestamp") < col("h.start_date_timestamp"))
       | (col("e.end_date_timestamp") < col("h.end_date_timestamp")))
)
invalid_df = e.join(higher_ind, join_condition, "inner")
names_to_remove = invalid_df.select("e.name").distinct()

# Anti-join on the column name. An expression condition between two DataFrames
# derived from the same plan (`exploded_df.name == names_to_remove.name`) is
# exposed to Spark's ambiguous self-join resolution.
exploded_df = exploded_df.join(names_to_remove, on="name", how="left_anti")
display(exploded_df)

# Drop the helper columns.
exploded_df = exploded_df.drop(
    "index", "exp_start_date", "exp_end_date", "present_end_count",
    "start_date_timestamp", "end_date_timestamp",
)


## Taking the title from the first entry of the `positions` column when the `title` column is null

In [0]:
# When the top-level title is missing, fall back to the title of the first
# entry in `exp_positions`; then the positions array is no longer needed.
first_position_title = F.element_at(F.col("exp_positions"), 1).getField("title")
exploded_df = (
    exploded_df
    .withColumn("exp_title", F.coalesce(F.col("exp_title"), first_position_title))
    .drop("exp_positions")
)
display(exploded_df)

In [0]:
# Columns needed for the feature engineering below.
relevant_cols = [
    "name", "exp_title", "exp_subtitle", "exp_company_name",
    "exp_company_id", "exp_duration", "education",
]
relevant_df = exploded_df.select(*relevant_cols)

## Extracting academic degrees from education column

In [0]:
# Lower-cased text rendering of the (array-typed) `education` column. The
# degree flags below are substring heuristics over this text.
# NOTE(review): "master" also matches e.g. "scrum master" / "webmaster".
education_text = F.lower(F.col("education").cast("string"))

# "No education listed" means either a null column or an empty array, which
# casts to "[]". This is an exact match: the previous substring match
# `like("%[]%")` also fired on any nested empty array inside a non-empty
# education list. Null education used to yield 0 while an empty list yielded
# None; both now yield None.
no_education = education_text.isNull() | (education_text == "[]")


def _degree_flag(keyword, *implied_by):
    """Build a 1 / 0 / null indicator column for one academic degree.

    1 if `keyword` occurs in the education text or any flag column named in
    `implied_by` is 1 (a higher degree implies the lower ones); null when no
    education is listed; 0 otherwise.
    """
    mentioned = education_text.like(f"%{keyword}%")
    for higher_flag in implied_by:
        mentioned = mentioned | (F.col(higher_flag) == 1)
    return F.when(mentioned, 1).when(no_education, F.lit(None)).otherwise(0)


relevant_df = (
    relevant_df
    .withColumn("has_doctor", _degree_flag("doctor"))
    .withColumn("has_master", _degree_flag("master", "has_doctor"))
    .withColumn("has_bachelor", _degree_flag("bachelor", "has_master", "has_doctor"))
    .drop("education")
)
display(relevant_df)

## Making sure each record has a company name in a single column

In [0]:
import json

# Single company column per experience row, taking the first non-null of:
# subtitle, company name, company id. The three source columns are then dropped.
relevant_df = (
    relevant_df
    .withColumn(
        "final_company_id",
        F.coalesce(F.col("exp_subtitle"), F.col("exp_company_name"), F.col("exp_company_id")),
    )
    .drop("exp_subtitle", "exp_company_name", "exp_company_id")
)


In [0]:
# Inspect relevant_df after collapsing the company columns into final_company_id.
display(relevant_df)

## Convert the duration column to the number of months spent at a company

In [0]:
import re
def convert_to_total_months(exp_duration):
    """Convert a free-text employment duration into a whole number of months.

    Recognized forms (matching is case-insensitive):
      * None, or text containing "null"            -> None
      * text containing "less than a year"         -> 6 (heuristic midpoint)
      * text containing "present" and a "Mon YYYY" date
                                                   -> months elapsed from that
                                                      date until now (never negative)
      * otherwise "<n> year(s)/yr(s)" and "<n> month(s)/mo(s)" counts,
        e.g. "2 years 3 months" -> 27, "1 yr 2 mos" -> 14; 0 if neither appears.

    Args:
        exp_duration: Duration text, or None.

    Returns:
        Number of months (int), or None.
    """
    # Check for None before any regex call: re.search on None raises TypeError.
    if exp_duration is None:
        return None
    text = exp_duration.lower()
    if 'null' in text:
        return None
    if 'less than a year' in text:
        return 6

    if 'present' in text:
        date_match = re.search(r'(\w{3}) (\d{4})', exp_duration)
        start_date = None
        if date_match:
            try:
                start_date = datetime.strptime(date_match.group(0), '%b %Y')
            except ValueError:
                # Three word characters + four digits that are not a month/year
                # pair: use the explicit year/month counts instead of failing the UDF.
                start_date = None
        if start_date is not None:
            now = datetime.now()
            elapsed = (now.year - start_date.year) * 12 + (now.month - start_date.month)
            return max(0, elapsed)  # a start date in the future counts as 0 months
        # No usable start date: fall through so both years AND months are counted.

    years_match = re.search(r'(\d+)\s*(?:year|yr)', text)
    months_match = re.search(r'(\d+)\s*(?:month|mo)', text)
    years = int(years_match.group(1)) if years_match else 0
    months = int(months_match.group(1)) if months_match else 0
    return years * 12 + months

# Register the duration parser as a Spark UDF returning IntegerType.
# `F.udf` is used because a bare `udf` is never imported in this notebook.
convert_to_total_months_udf = F.udf(convert_to_total_months, IntegerType())

# Replace the free-text duration with the number of months.
relevant_df = (
    relevant_df
    .withColumn('exp_months', convert_to_total_months_udf(F.col('exp_duration')))
    .drop("exp_duration")
)
display(relevant_df)

## Normalizing company names before the join

In [0]:
def clean_company_name_before_matching(company_name_df, company_name_col: str, remove_words=None):
    """Add a normalized `clean_company_name` column to use as a join key.

    Steps:
      1. remove every character other than ASCII letters, digits and whitespace
         (note: non-ASCII letters are removed too);
      2. lower-case;
      3. delete the given words where they appear as whole words;
      4. collapse runs of whitespace to one space and trim the ends.

    Args:
        company_name_df: Spark DataFrame that holds the raw company names.
        company_name_col: Name of the column with the raw company name.
        remove_words: Lower-case words to delete. They are matched after
            punctuation removal, so pass them without punctuation. Defaults to
            common legal-entity suffixes (llc, inc, corp, ...).

    Returns:
        `company_name_df` with an added string column `clean_company_name`.
    """
    if remove_words is None:
        remove_words = ['llc', 'inc', 'corp', 'corporation', 'ltd', 'limited', 'gmbh', 'pty', 'sarl', 'nv', 'sa']

    # Whole-word alternation, e.g. \b(?:llc|inc|...)\b
    remove_words_pattern = r'\b(?:' + '|'.join(remove_words) + r')\b'

    cleaned = F.lower(F.regexp_replace(F.col(company_name_col), r"[^a-zA-Z0-9\s]+", ""))
    cleaned = F.regexp_replace(cleaned, remove_words_pattern, "")
    # Removing a word leaves its surrounding whitespace behind ("acme inc" ->
    # "acme "). Collapse and trim it so that "Acme Inc" and "Acme" produce
    # the same key.
    cleaned = F.trim(F.regexp_replace(cleaned, r"\s+", " "))
    return company_name_df.withColumn("clean_company_name", cleaned)

In [0]:
# Normalize the company names on both sides of the upcoming join; the raw name
# columns are no longer needed afterwards.
relevant_df = clean_company_name_before_matching(
    company_name_df=relevant_df, company_name_col="final_company_id"
).drop("final_company_id")
company_reviews = clean_company_name_before_matching(
    company_name_df=company_reviews, company_name_col="name"
).drop("name")

In [0]:
# Inspect relevant_df with its normalized clean_company_name join key.
display(relevant_df)

In [0]:
# Inspect company_reviews with its normalized clean_company_name join key.
display(company_reviews)

## Join the datasets and keep only records with at least one company transition

In [0]:
# Inner join on the normalized company name. Blank keys are excluded on both
# sides first: names made only of punctuation, non-ASCII characters or legal
# suffixes normalize to an empty/blank string, and those rows would otherwise
# all match each other many-to-many.
# NOTE(review): if several company_reviews rows share a clean name, each
# experience row is duplicated once per matching review row.
has_join_key = F.trim(F.col("clean_company_name")) != ""
joined_df = relevant_df.filter(has_join_key).join(
    company_reviews.filter(has_join_key), on="clean_company_name", how="inner"
)

# Keep only people linked to more than one distinct reviewed company,
# i.e. people with at least one company transition.
atleast_one_trans = (
    joined_df
    .groupBy("name")
    .agg(F.countDistinct("clean_company_name").alias("company_count"))
    .filter(F.col("company_count") > 1)
)
joined_df = joined_df.join(atleast_one_trans, on="name", how="inner")
n_rows = joined_df.count()
print(f"n rows {n_rows}")
display(joined_df)
joined_df = joined_df.drop("company_count")

## Merge multiple records of a person in the same company 

In [0]:
from pyspark.sql import Window

# All rows of one person at one company.
window_spec = Window.partitionBy('name', 'clean_company_name')

# Collapse them into a single row. Keep the row of the longest single stint
# (ties broken by exp_title) and set its exp_months to the total across all
# stints at that company. The previous version ranked rows by the already
# summed value, which is identical within the partition, so the surviving row
# (and its title/flags) was arbitrary.
joined_df = (
    joined_df
    .withColumn('total_company_months', F.sum('exp_months').over(window_spec))
    .withColumn(
        'row_number',
        F.row_number().over(window_spec.orderBy(F.desc('exp_months'), F.asc('exp_title'))),
    )
    .filter(F.col('row_number') == 1)
    .withColumn('exp_months', F.col('total_company_months'))
    .drop('row_number', 'total_company_months')
)

display(joined_df)

## Creating features based on a person's history

In [0]:
# Row index that defines each person's "history" order.
# NOTE(review): monotonically_increasing_id() reflects the current physical row
# order. The joins and window shuffles above do not tie that order to the
# chronology of a person's experience, so "previous" rows may not be the
# previous jobs. A real date/position column would make these features meaningful.
joined_df = joined_df.withColumn("index", F.monotonically_increasing_id())

# Rows of the same person that precede the current one in descending-index
# order (the current row excluded), and the plain ordering used for lag().
exp_history_window = (
    Window.partitionBy("name").orderBy(F.desc("index"))
    .rowsBetween(Window.unboundedPreceding, -1)
)
exp_order_window = Window.partitionBy("name").orderBy(F.desc("index"))

joined_df = (
    joined_df
    # count() skips nulls, the same as size(collect_list(...)); cast keeps the int type.
    .withColumn("company_count", F.count("exp_months").over(exp_history_window).cast("int"))
    .withColumn("total_exp", F.sum("exp_months").over(exp_history_window))
    # The first row of each person has company_count == 0. Guard the division:
    # it yields null without ANSI mode but raises with ANSI mode on.
    .withColumn(
        "avg_exp",
        F.when(F.col("company_count") > 0, F.col("total_exp") / F.col("company_count")),
    )
    .withColumn("last_exp", F.lag("exp_months").over(exp_order_window))
    .fillna({"total_exp": 0, "avg_exp": 0, "last_exp": 0})
    .drop("index")
)

display(joined_df)

In [0]:
# Replace remaining nulls with the literal string "None". A string fill value
# only applies to string-typed columns, so nulls in numeric columns stay null.
joined_df = joined_df.na.fill("None")

## Saving the dataframe to enable reading it from other notebooks

In [0]:
# Persist the final feature table as Parquet, replacing any previous output,
# so that other notebooks can read it.
joined_df.write.parquet("/FileStore/g45/joined_df.parquet", mode="overwrite")