### Read data into PySpark Dataframe.

In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

spark = SparkSession.builder.getOrCreate()

In [0]:
companies = spark.read.parquet('/dbfs/linkedin_train_data')
profiles = spark.read.parquet('/dbfs/linkedin_people_train_data')

In [0]:
meta_industries_12 = {
    'Furniture and Home Furnishings Manufacturing': 'Manufacturing',
    'Investment Banking': 'Financial and Investment',
    'Architecture and Planning': 'Services',
    'Wholesale': 'Services',
    'Travel Arrangements': 'Services',
    'Ranching': 'Miscellaneous',
    'Hospitals and Health Care': 'Healthcare and Medical',
    'Book and Periodical Publishing': 'Services',
    'Printing Services': 'Services',
    'Professional Training and Coaching': 'Services',
    'Computers and Electronics Manufacturing': 'Manufacturing',
    'Shipbuilding': 'Manufacturing',
    'Public Policy Offices': 'Government and Public Policy',
    'Software Development': 'Technology',
    'Outsourcing and Offshoring Consulting': 'Services',
    'Retail Groceries': 'Retail and Consumer Goods',
    'Education Administration Programs': 'Education and Training',
    'Plastics Manufacturing': 'Manufacturing',
    'Renewable Energy Semiconductor Manufacturing': 'Manufacturing',
    'Computer Networking Products': 'Technology',
    'Events Services': 'Services',
    'Information Services': 'Services',
    'Food and Beverage Services': 'Services',
    'Semiconductor Manufacturing': 'Manufacturing',
    'Business Consulting and Services': 'Services',
    'Insurance': 'Services',
    'Financial Services': 'Services',
    'Wireless Services': 'Services',
    'Computer Hardware Manufacturing': 'Technology',
    'Public Safety': 'Services',
    'Maritime Transportation': 'Transportation and Logistics',
    'Tobacco Manufacturing': 'Manufacturing',
    'Writing and Editing': 'Services',
    'Veterinary Services': 'Services',
    'Staffing and Recruiting': 'Services',
    'Accounting': 'Services',
    'International Affairs': 'Government and Public Policy',
    'Spectator Sports': 'Miscellaneous',
    'Glass, Ceramics and Concrete Manufacturing': 'Manufacturing',
    'Chemical Manufacturing': 'Manufacturing',
    'Mining': 'Miscellaneous',
    'E-Learning Providers': 'Technology',
    'Security and Investigations': 'Services',
    'Translation and Localization': 'Services',
    'Automation Machinery Manufacturing': 'Technology',
    'Computer and Network Security': 'Technology',
    'Political Organizations': 'Government and Public Policy',
    'Environmental Services': 'Government and Public Policy',
    'Oil and Gas': 'Miscellaneous',
    'Real Estate': 'Real Estate and Construction',
    'Think Tanks': 'Government and Public Policy',
    'Executive Offices': 'Miscellaneous',
    'Law Practice': 'Services',
    'Nanotechnology Research': 'Miscellaneous',
    'International Trade and Development': 'Government and Public Policy',
    'Personal Care Product Manufacturing': 'Manufacturing',
    'Philanthropic Fundraising Services': 'Services',
    'Entertainment Providers': 'Media and Entertainment',
    'Market Research': 'Media and Entertainment',
    'Movies, Videos, and Sound': 'Media and Entertainment',
    'Sporting Goods Manufacturing': 'Manufacturing',
    'Graphic Design': 'Services',
    'Technology, Information and Internet': 'Technology',
    'IT Services and IT Consulting': 'Technology',
    'Retail Office Equipment': 'Retail and Consumer Goods',
    'Wholesale Import and Export': 'Services',
    'Capital Markets': 'Financial and Investment',
    'Law Enforcement': 'Services',
    'Freight and Package Transportation': 'Transportation and Logistics',
    'Industrial Machinery Manufacturing': 'Manufacturing',
    'Non-profit Organizations': 'Miscellaneous',
    'Retail Art Supplies': 'Retail and Consumer Goods',
    'Animation and Post-production': 'Media and Entertainment',
    'Transportation, Logistics, Supply Chain and Storage': 'Transportation and Logistics',
    'Aviation and Aerospace Component Manufacturing': 'Transportation and Logistics',
    'Fundraising': 'Financial and Investment',
    'Railroad Equipment Manufacturing': 'Transportation and Logistics',
    'Construction': 'Real Estate and Construction',
    'Investment Management': 'Financial and Investment',
    'Utilities': 'Miscellaneous',
    'Retail Luxury Goods and Jewelry': 'Retail and Consumer Goods',
    'Warehousing and Storage': 'Transportation and Logistics',
    'Media Production': 'Media and Entertainment',
    'Gambling Facilities and Casinos': 'Media and Entertainment',
    'Defense and Space Manufacturing': 'Manufacturing',
    'Facilities Services': 'Services',
    'Government Relations Services': 'Government and Public Policy',
    'Advertising Services': 'Media and Entertainment',
    'Paper and Forest Product Manufacturing': 'Manufacturing',
    'Packaging and Containers Manufacturing': 'Manufacturing',
    'Telecommunications': 'Technology',
    'Medical Equipment Manufacturing': 'Healthcare and Medical',
    'Beverage Manufacturing': 'Manufacturing',
    'Restaurants': 'Retail and Consumer Goods',
    'Leasing Non-residential Real Estate': 'Real Estate and Construction',
    'Newspaper Publishing': 'Media and Entertainment',
    'Armed Forces': 'Miscellaneous',
    'Appliances, Electrical, and Electronics Manufacturing': 'Manufacturing',
    'Hospitality': 'Services',
    'Pharmaceutical Manufacturing': 'Healthcare and Medical',
    'Research Services': 'Services',
    'Retail Apparel and Fashion': 'Retail and Consumer Goods',
    'Photography': 'Media and Entertainment',
    'Wellness and Fitness Services': 'Services',
    'Truck Transportation': 'Transportation and Logistics',
    'Consumer Services': 'Services',
    'Wholesale Building Materials': 'Services',
    'Human Resources Services': 'Services',
    'Airlines and Aviation': 'Transportation and Logistics',
    'Machinery Manufacturing': 'Manufacturing',
    'Individual and Family Services': 'Services',
    'Motor Vehicle Manufacturing': 'Manufacturing',
    'Performing Arts': 'Media and Entertainment',
    'Museums, Historical Sites, and Zoos': 'Media and Entertainment',
    'Broadcast Media Production and Distribution': 'Media and Entertainment',
    'Banking': 'Financial and Investment',
    'Recreational Facilities': 'Miscellaneous',
    'Government Administration': 'Government and Public Policy',
    'Public Relations and Communications Services': 'Media and Entertainment',
    'Fisheries': 'Miscellaneous',
    'Medical Practices': 'Healthcare and Medical',
    'Religious Institutions': 'Miscellaneous',
    'Online Audio and Video Media': 'Media and Entertainment',
    'Artists and Writers': 'Miscellaneous',
    'Biotechnology Research': 'Healthcare and Medical',
    'Legal Services': 'Services',
    'Retail': 'Retail and Consumer Goods',
    'Civil Engineering': 'Services',
    'Libraries': 'Miscellaneous',
    'Alternative Dispute Resolution': 'Miscellaneous',
    'Manufacturing': 'Miscellaneous',
    'Design Services': 'Services',
    'Dairy Product Manufacturing': 'Manufacturing',
    'Higher Education': 'Education and Training',
    'Civic and Social Organizations': 'Miscellaneous',
    'Textile Manufacturing': 'Manufacturing',
    'Venture Capital and Private Equity Principals': 'Financial and Investment',
    'Mental Health Care': 'Healthcare and Medical',
    'Musicians': 'Media and Entertainment',
    'Farming': 'Miscellaneous',
    'Computer Games': 'Media and Entertainment',
    'Strategic Management Services': 'Services',
    'Food and Beverage Manufacturing': 'Manufacturing',
    'Primary and Secondary Education': 'Education and Training',
    'Alternative Medicine': 'Healthcare and Medical',
    'Legislative Offices': 'Services',
    'Administration of Justice': 'Services',
    'Mobile Gaming Apps': 'Media and Entertainment'
}

In [0]:
from pyspark.sql.functions import col

meta_industry = udf(lambda x: meta_industries_12[x])
companies = companies.filter(companies.industries.isNotNull())
companies = companies.withColumn('meta_industry', meta_industry(col('industries')))

In [0]:
companies.select("industries", "meta_industry").display()

### Question 1: Data processing 

In [0]:
from pyspark.sql.functions import explode, col, when, split

companies_exploded = companies.withColumn("similar_exploded", explode(col("similar"))) \
    .withColumn("similar_company_name", col("similar_exploded.title")) \
    .withColumn("similar_company_link", split(col("similar_exploded.links"), r"\?")[0]) \
    .drop("similar_exploded", "similar")

# Create the new DataFrame with company_A_name and company_B_name
companies_ordered = companies_exploded.withColumn(
    "company_A_name", when(col("name") < col("similar_company_name"), col("name")).otherwise(col("similar_company_name"))
).withColumn(
    "company_B_name", when(col("name") < col("similar_company_name"), col("similar_company_name")).otherwise(col("name"))
).withColumn(
    "company_A_url", when(col("name") < col("similar_company_name"), col("url")).otherwise(col("similar_company_link"))
).withColumn(
    "company_B_url", when(col("name") < col("similar_company_name"), col("similar_company_link")).otherwise(col("url"))
).select("company_A_name", "company_B_name", "company_A_url", "company_B_url").dropDuplicates() \
    .filter(col("company_A_name") != col("company_B_name"))

companies_ordered.cache()
filtered_companies = companies.drop("similar")

companies_A_ordered = companies_ordered.join(
    filtered_companies,
    ((col("company_A_name") == col("name")) & (col("company_A_url") == col("url"))),
    "inner"
).drop("company_A_name", "company_A_url")

companies_A_ordered.cache()

def rename_columns(df, rename_func, letter):
    for column in df.columns:
        df = df.withColumnRenamed(column, rename_func(column, letter))
    return df

def change_col_name(name, letter):
    # Add prefix "company_A_" only if the column name doesn't start with "company_A" or "company_B"
    if not name.startswith("company_A") and not name.startswith("company_B"):
        return f"company_{letter}_{name}"
    return name

companies_A_ordered = rename_columns(companies_A_ordered, change_col_name, "A")

companies_B_ordered = companies_A_ordered.join(
    filtered_companies,
    ((col("company_B_name") == col("name")) & (col("company_B_url") == col("url"))),
    "inner"
).drop("company_B_name", "company_B_url")
companies_B_ordered.cache()

companies_AB_ordered = rename_columns(companies_B_ordered, change_col_name, "B")
companies_AB_ordered.cache()
companies_AB_ordered.printSchema()

In [0]:
# Define the list of company_A_names to filter
filter_list = [
    'Altium Leadership',
    'American College of Music - ACM Online',
    'Arieca Inc.',
    'Bango Bowls',
    'Blue Llamas, Inc.'
]

# Filter the DataFrame and order by company_A_name and company_B_name
filtered_df = companies_AB_ordered.filter(col("company_A_name").isin(filter_list)) \
                                  .orderBy(["company_A_name", "company_B_name"])

# Display the filtered DataFrame
display(filtered_df)


### Question 2: Missingness Mechanisms

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, expr, lit
from pyspark.sql.types import ArrayType, StructType, StringType, LongType, IntegerType, FloatType, DoubleType
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
import seaborn as sns

companies = spark.read.parquet('/dbfs/linkedin_train_data')

# Define null-like values
null_identifiers = ["unknown", "[]", "None", "null", ""]

# Detect missing values for each column
missing_stats = []
total_rows = companies.count()

for col_name in companies.columns:
    # Get the data type of the column
    col_type = dict(companies.dtypes)[col_name]
    
    if isinstance(companies.schema[col_name].dataType, StringType):  # String columns
        missing_count = companies.filter(
            (col(col_name).isNull()) | (col(col_name).isin(null_identifiers))
        ).count()
    elif isinstance(companies.schema[col_name].dataType, ArrayType):  # Array columns
        missing_count = companies.filter(col(col_name).isNull() | (col(col_name) == lit([]))).count()
    elif isinstance(companies.schema[col_name].dataType, StructType):  # Struct columns
        missing_count = companies.filter(col(col_name).isNull()).count()
    else:  # Numeric or other simple types
        missing_count = companies.filter(col(col_name).isNull()).count()
    
    missing_percentage = (missing_count / total_rows) * 100
    missing_stats.append((col_name, missing_count, missing_percentage))

# Create a DataFrame for missing stats
missing_df = spark.createDataFrame(missing_stats, ["Column", "Missing_Count", "Missing_Percentage"])

# Filter columns with missing percentage at least 2% and less than 95%
columns_to_analyze = missing_df.filter(
    (col("Missing_Percentage") >= 2) & (col("Missing_Percentage") < 95)
)

summary_df = columns_to_analyze

# Show the summary table
summary_df.display()

# Save the summary as a CSV file
summary_df.write.csv("/dbfs/missing_values_summary", header=True, mode="overwrite")

# Visualization (convert to Pandas for external plotting)
summary_pd = summary_df.toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x='Missing_Percentage', y='Column', data=summary_pd.sort_values(by='Missing_Percentage'))
plt.title('Missing Data Distribution')
plt.xlabel('Missing Percentage')
plt.ylabel('Columns')
plt.tight_layout()
plt.show()

filtered_columns = summary_df.select("Column").rdd.flatMap(lambda x: x).collect()
filtered_companies = companies.select(*filtered_columns)

# Calculating correlation of missing values
missing_correlation = filtered_companies.select([F.col(c).isNull().cast("int").alias(c) for c in filtered_columns])
missing_correlation_df = missing_correlation.toPandas()
correlation_matrix = missing_correlation_df.corr()

# Plot the heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", center=0, fmt=".1f")
plt.title("Correlation Heatmap of Missing Values", fontsize=16)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

### Question 3: Evaluating Similarity 

In [0]:
from pyspark.sql.functions import col

# using the df from question 1 here the good and bad companies matches

#GOOD
print("2 GOOD matches:")
good_filtered_AB_companies = companies_AB_ordered.filter((col("company_A_industries") == col("company_B_industries")) 
                                                         & (col("company_A_meta_industry") == col("company_B_meta_industry"))
                                                         & (col("company_A_company_size") == col("company_B_company_size"))) \
                                    .orderBy(["company_A_name", "company_B_name"])
good_filtered_AB_companies.limit(2).display()

# BAD
print("2 BAD matches:")
bad_filtered_AB_companies = companies_AB_ordered.filter((col("company_A_industries") != col("company_B_industries")) 
                                                        & (col("company_A_type") != col("company_B_type"))
                                                        & (col("company_A_meta_industry") != col("company_B_meta_industry")) 
                                                        & (col("company_A_company_size") != col("company_B_company_size"))) \
                                .orderBy(["company_A_name", "company_B_name"])
bad_filtered_AB_companies.limit(2).display()

In [0]:

# more from the dfs so we can obsarve the matches
print("GOOD matches:")
good_filtered_AB_companies.limit(10).display()
print("BAD matches:")
bad_filtered_AB_companies.limit(10).display()

In [0]:
paired_companies = companies_AB_ordered

# Calculate percentage of pairs with matching industries
industry_match_rate = paired_companies.filter(
    col("company_A_industries") == col("company_B_industries")
).count() / paired_companies.count()

print(f"Industry Match Rate: {industry_match_rate * 100:.2f}%")

# Calculate percentage of pairs with matching types
type_match_rate = paired_companies.filter(
    col("company_A_type") == col("company_B_type")
).count() / paired_companies.count()

print(f"Type Match Rate: {type_match_rate * 100:.2f}%")

# Calculate percentage of pairs with matching companies sizes
size_match_rate = paired_companies.filter(
    col("company_A_company_size") == col("company_B_company_size")
).count() / paired_companies.count()

print(f"Company size Match Rate: {size_match_rate * 100:.2f}%")

# Check overlap in specialties
from pyspark.sql.functions import split, array_intersect, size

# Tokenize specialties into keywords
paired_companies = paired_companies.withColumn(
    "specialties_A_keywords", split(col("company_A_specialties"), ", ")
).withColumn(
    "specialties_B_keywords", split(col("company_B_specialties"), ", ")
)

# Calculate overlap
paired_companies = paired_companies.withColumn(
    "specialties_overlap",
    size(array_intersect(col("specialties_A_keywords"), col("specialties_B_keywords")))
)

# Calculate overlap rate
specialty_match = paired_companies.filter(col("specialties_overlap") > 0)
specialty_match_rate = specialty_match.count() / paired_companies.count()

print(f"Specialty Match Rate: {specialty_match_rate * 100:.2f}%")


In [0]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, StopWordsRemover, IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Replace nulls with empty strings to ensure proper text processing
companies_df = companies.fillna({
    "about": "",
    "industries": "",
    "type": "",
    "specialties": ""
})

# Tokenize "about", specialties", "type", and "industries" columns
tokenizer = Tokenizer(inputCol="about", outputCol="about_words")
companies_df = tokenizer.transform(companies_df)

tokenizer = Tokenizer(inputCol="specialties", outputCol="specialties_words")
companies_df = tokenizer.transform(companies_df)

tokenizer = Tokenizer(inputCol="industries", outputCol="industries_words")
companies_df = tokenizer.transform(companies_df)

tokenizer = Tokenizer(inputCol="type", outputCol="type_words")
companies_df = tokenizer.transform(companies_df)

# Remove stop words from the columns
stop_words_remover = StopWordsRemover(inputCol="about_words", outputCol="about_clean")
companies_df = stop_words_remover.transform(companies_df)

stop_words_remover = StopWordsRemover(inputCol="specialties_words", outputCol="specialties_clean")
companies_df = stop_words_remover.transform(companies_df)

count_vectorizer = CountVectorizer(inputCol="about_clean", outputCol="about_features", vocabSize=500)
companies_df = count_vectorizer.fit(companies_df).transform(companies_df)

count_vectorizer = CountVectorizer(inputCol="specialties_clean", outputCol="specialties_features", vocabSize=500)
companies_df = count_vectorizer.fit(companies_df).transform(companies_df)

count_vectorizer = CountVectorizer(inputCol="industries_words", outputCol="industries_features", vocabSize=500)
companies_df = count_vectorizer.fit(companies_df).transform(companies_df)

count_vectorizer = CountVectorizer(inputCol="type_words", outputCol="type_features", vocabSize=500)
companies_df = count_vectorizer.fit(companies_df).transform(companies_df)

# Create the final features vector
companies_with_features = companies_df

assembler = VectorAssembler(
    inputCols=["about_features", "specialties_features", "type_features", "industries_features"],
    outputCol="features_vector"
)

companies_with_features = assembler.transform(companies_with_features)


In [0]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=5, seed=1, featuresCol="features_vector", predictionCol="cluster")
kmeans_model = kmeans.fit(companies_with_features)
clustered_df = kmeans_model.transform(companies_with_features)

# Add cluster information for company_A
companies_AB = companies_AB_ordered.join(
    clustered_df.select(col("name").alias("company_A"), col("cluster").alias("cluster_A")),
    (col("company_A_name") == col("company_A")),
    "left"
)

# Add cluster information for company_B
companies_AB = companies_AB.join(
    clustered_df.select(col("name").alias("company_B"), col("cluster").alias("cluster_B")),
    (col("company_B_name") == col("company_B")),
    "left"
)

# Add a column indicating whether the clusters match
companies_AB = companies_AB.withColumn(
    "same_cluster",
    col("cluster_A") == col("cluster_B")
)

# Count matches and mismatches
validation_results = companies_AB.groupBy("same_cluster").count()
validation_results.show()


# Filter true and false cases
tp = companies_AB.filter((col("same_cluster") == True)).count()
fn = companies_AB.filter((col("same_cluster") == False)).count()
fp = companies_AB.filter(
    (col("same_cluster") == True) & (col("cluster_A").isNotNull()) & (col("cluster_B").isNotNull())
).count() - tp

# Calculate precision, recall, and F1 score
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Precision: {precision}, Recall: {recall}, F1 Score: {f1_score}")


### Question 4: Supervised Learning 

In [0]:
companies_test = spark.read.parquet('/dbfs/linkedin_companies_test')

In [0]:
from pyspark.sql.functions import col, size, when, count, lit

# Selecting relevant columns
relevant_columns = [
    "id", "about", "affiliated", "company_size", "country_code", "employees",
    "employees_in_linkedin", "followers", "founded", "funding",
    "headquarters", "name", "slogan", "specialties", "type", "updates"
]
train_data = companies.select(relevant_columns + ["industries", "meta_industry"]).dropna(subset=["industries", "meta_industry"])
test_data = companies_test.select(relevant_columns)


""" 
# Calculating missing data percentage for each column:

total_rows = train_data.count()
def calculate_missing_percentage(df, columns):
    missing_percentages = {}
    
    for column in columns:
        # Handle array columns
        if isinstance(df.schema[column].dataType, ArrayType):
            missing_count = df.filter(size(col(column)) == 0).count()
        # Handle string or other non-array columns
        elif isinstance(df.schema[column].dataType, StringType):
            missing_count = df.filter(col(column).isNull() | (col(column) == "")).count()
        # Handle numeric or other columns
        elif isinstance(df.schema[column].dataType, LongType) or isinstance(df.schema[column].dataType, IntegerType):
            missing_count = df.filter(col(column).isNull()).count()
        else:
            missing_count = df.filter(col(column).isNull()).count()
        
        missing_percentage = (missing_count / total_rows) * 100
        missing_percentages[column] = missing_percentage
        
    return missing_percentages

# Calculate missing percentages
missing_percentages = calculate_missing_percentage(train_data, relevant_columns)

# Display results
for column, percentage in missing_percentages.items():
    print(f"Column: {column}, Missing Data Percentage: {percentage:.2f}%")

# The results printed:
Column: about, Missing Data Percentage: 71.19%
Column: affiliated, Missing Data Percentage: 99.20%
Column: company_size, Missing Data Percentage: 7.09%
Column: country_code, Missing Data Percentage: 0.00%
Column: employees, Missing Data Percentage: 70.22%
Column: employees_in_linkedin, Missing Data Percentage: 70.79%
Column: followers, Missing Data Percentage: 0.00%
Column: founded, Missing Data Percentage: 0.00%
Column: funding, Missing Data Percentage: 99.49%
Column: headquarters, Missing Data Percentage: 0.22%
Column: name, Missing Data Percentage: 0.00%
Column: slogan, Missing Data Percentage: 86.25%
Column: specialties, Missing Data Percentage: 87.41%
Column: type, Missing Data Percentage: 78.29%
Column: updates, Missing Data Percentage: 93.17%

"""

In [0]:
from pyspark.sql.functions import col, lit, current_date, when, size, split, array_contains, expr, explode, avg
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline
from datetime import datetime


# Remove nulls for string and numeric columns
train_data_no_nulls = train_data.na.fill({
    "about": "", 'company_size': 'Unknown', 'country_code': 'Unknown',
    "employees_in_linkedin": 0, "followers": 0, "founded": 0,
    "headquarters": "", "name": "", "slogan": "", "specialties": "", "type": ""
})
test_data_no_nulls = test_data.na.fill({
    "about": "", 'company_size': 'Unknown', 'country_code': 'Unknown',
    "employees_in_linkedin": 0, "followers": 0, "founded": 0,
    "headquarters": "", "name": "", "slogan": "", "specialties": "", "type": ""
})

# Remove nulls for array columns
train_data_no_nulls = train_data_no_nulls.withColumn("updates", when(col("updates").isNull(), lit([])).otherwise(col("updates"))) \
    .withColumn("employees", when(col("employees").isNull(), lit([])).otherwise(col("employees"))) \
    .withColumn("affiliated", when(col("affiliated").isNull(), lit([])).otherwise(col("affiliated")))
test_data_no_nulls = test_data_no_nulls.withColumn("updates", when(col("updates").isNull(), lit([])).otherwise(col("updates"))) \
    .withColumn("employees", when(col("employees").isNull(), lit([])).otherwise(col("employees"))) \
    .withColumn("affiliated", when(col("affiliated").isNull(), lit([])).otherwise(col("affiliated")))


""" Feature Engineering """

# Company size indexing
size_map_expr = expr("""
    CASE
        WHEN split(company_size, " ")[0] = 'Unknown' THEN 0
        WHEN split(company_size, " ")[0] = '1' THEN 1
        WHEN split(company_size, " ")[0] = '2-10' THEN 2
        WHEN split(company_size, " ")[0] = '11-50' THEN 3
        WHEN split(company_size, " ")[0] = '51-200' THEN 4
        WHEN split(company_size, " ")[0] = '201-500' THEN 5
        WHEN split(company_size, " ")[0] = '501-1,000' THEN 6
        WHEN split(company_size, " ")[0] = '1,001-5,000' THEN 7
        WHEN split(company_size, " ")[0] = '5,001-10,000' THEN 8
        WHEN split(company_size, " ")[0] = '10,001+' THEN 9
        ELSE 0
    END
""")
train_data_company_size = train_data_no_nulls.withColumn("company_size_encoded", size_map_expr).drop("company_size")
test_data_company_size = test_data_no_nulls.withColumn("company_size_encoded", size_map_expr).drop("company_size")

# One-hot encoding of company size
one_hot_encoder = OneHotEncoder(inputCol="company_size_encoded", outputCol="company_size_one_hot")
one_hot_model = one_hot_encoder.fit(train_data_company_size)
train_data_company_size = one_hot_model.transform(train_data_company_size).drop("company_size_encoded")
test_data_company_size = one_hot_model.transform(test_data_company_size).drop("company_size_encoded")

# Founding year encoding
current_year = datetime.now().year
train_data_founding = train_data_company_size.withColumn("num_years", lit(current_year) - col("founded")).drop("founded").cache()
test_data_founding = test_data_company_size.withColumn("num_years", lit(current_year) - col("founded")).drop("founded").cache()

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import ArrayType, FloatType
import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT


# Specialties vectorization using Word2Vec
specialties_tokenizer = Tokenizer(inputCol="specialties", outputCol="specialties_tokens")
specialties_stopwords_remover = StopWordsRemover(inputCol="specialties_tokens", outputCol="filtered_specialties")
specialties_word2vec = Word2Vec(
    inputCol="filtered_specialties", 
    outputCol="specialties_vector", 
    vectorSize=50, 
    minCount=2
)

specialties_pipeline = Pipeline(stages=[specialties_tokenizer, specialties_stopwords_remover, specialties_word2vec])
specialties_model = specialties_pipeline.fit(train_data_founding)
train_data_specialties = specialties_model.transform(train_data_founding) \
    .drop("specialties", "specialties_tokens", "filtered_specialties").cache()
test_data_specialties = specialties_model.transform(test_data_founding) \
    .drop("specialties", "specialties_tokens", "filtered_specialties").cache()


# Description embeddings from "about", "name", "slogan"
def create_text_pipeline(input_col, token_output_col, filtered_output_col):
    # Define tokenizers and stopwords removers for each column
    tokenizer = Tokenizer(inputCol=input_col, outputCol=token_output_col)
    stopwords_remover = StopWordsRemover(inputCol=token_output_col, outputCol=filtered_output_col)
    return tokenizer, stopwords_remover

about_tokenizer, about_stopwords_remover = create_text_pipeline("about", "about_tokens", "filtered_about")
name_tokenizer, name_stopwords_remover = create_text_pipeline("name", "name_tokens", "filtered_name")
slogan_tokenizer, slogan_stopwords_remover = create_text_pipeline("slogan", "slogan_tokens", "filtered_slogan")

# Word2Vec models of each column
about_word2vec = Word2Vec(inputCol="filtered_about", outputCol="about_vector", vectorSize=100, minCount=2)
name_word2vec = Word2Vec(inputCol="filtered_name", outputCol="name_vector", vectorSize=100, minCount=2)
slogan_word2vec = Word2Vec(inputCol="filtered_slogan", outputCol="slogan_vector", vectorSize=100, minCount=2)

# Pipelines of each column
about_pipeline = Pipeline(stages=[about_tokenizer, about_stopwords_remover, about_word2vec])
name_pipeline = Pipeline(stages=[name_tokenizer, name_stopwords_remover, name_word2vec])
slogan_pipeline = Pipeline(stages=[slogan_tokenizer, slogan_stopwords_remover, slogan_word2vec])

# Fit the models to the data
about_model = about_pipeline.fit(train_data_specialties)
name_model = name_pipeline.fit(train_data_specialties)
slogan_model = slogan_pipeline.fit(train_data_specialties)

train_data_with_vectors = about_model.transform(train_data_specialties) \
    .drop("about", "about_tokens", "filtered_about")
test_data_with_vectors = about_model.transform(test_data_specialties) \
    .drop("about", "about_tokens", "filtered_about")
train_data_with_vectors = name_model.transform(train_data_with_vectors) \
    .drop("name", "name_tokens", "filtered_name")
test_data_with_vectors = name_model.transform(test_data_with_vectors) \
    .drop("name", "name_tokens", "filtered_name")
train_data_with_vectors = slogan_model.transform(train_data_with_vectors) \
    .drop("slogan", "slogan_tokens", "filtered_slogan")
test_data_with_vectors = slogan_model.transform(test_data_with_vectors) \
    .drop("slogan", "slogan_tokens", "filtered_slogan")

train_data_with_vectors.write.parquet("dbfs:/tmp/train_data_with_vectors.parquet", mode="overwrite")
test_data_with_vectors.write.parquet("dbfs:/tmp/test_data_with_vectors.parquet", mode="overwrite")

train_data_description = spark.read.parquet("dbfs:/tmp/train_data_with_vectors.parquet")
test_data_description = spark.read.parquet("dbfs:/tmp/test_data_with_vectors.parquet")

def average_vectors(about_vec, name_vec, slogan_vec):
    about_vec = about_vec.toArray().tolist() if isinstance(about_vec, DenseVector) else about_vec
    name_vec = name_vec.toArray().tolist() if isinstance(name_vec, DenseVector) else name_vec
    slogan_vec = slogan_vec.toArray().tolist() if isinstance(slogan_vec, DenseVector) else slogan_vec
    
    # Filter out None vectors
    vectors = [v for v in [about_vec, name_vec, slogan_vec] if v is not None]
    if not vectors:
        return None
    
    # Initialize an empty vector
    vector_length = len(vectors[0])
    avg_vector = [0.0] * vector_length
    # Sum all vectors element-wise
    for vec in vectors:
        for i in range(vector_length):
            avg_vector[i] += vec[i]
    # Divide by the number of vectors
    num_vectors = len(vectors)
    avg_vector = [x / num_vectors for x in avg_vector]
    return avg_vector

average_udf = udf(average_vectors, ArrayType(FloatType()))

train_data_description = train_data_description.withColumn(
    "description_vector", average_udf("about_vector", "name_vector", "slogan_vector")
).drop("about_vector", "name_vector", "slogan_vector")
test_data_description = test_data_description.withColumn(
    "description_vector", average_udf("about_vector", "name_vector", "slogan_vector")
).drop("about_vector", "name_vector", "slogan_vector")

# Use UDF to convert description_vector to DenseVector
def array_to_vector(arr):
    if arr is not None:
        return Vectors.dense(arr)
    return None
array_to_vector_udf = udf(array_to_vector, VectorUDT())
train_data_description = train_data_description.withColumn("description_vector", array_to_vector_udf("description_vector"))
test_data_description = test_data_description.withColumn("description_vector", array_to_vector_udf("description_vector"))

In [0]:
from pyspark.sql.functions import udf, when, size, col, array, lit, concat_ws
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.feature import Word2Vec, Tokenizer, StringIndexer
from pyspark.ml.linalg import Vectors
import numpy as np


# Affiliated column processing
def extract_subtitles(affiliated):
    return [item['subtitle'] for item in affiliated if 'subtitle' in item and item['subtitle']]
extract_subtitles_udf = udf(extract_subtitles, ArrayType(StringType()))

# Extract non-null subtitles from the array of dictionaries
train_data_affiliated = train_data_description.withColumn("affiliated_subtitles", extract_subtitles_udf(train_data_description["affiliated"])).drop("affiliated")
test_data_affiliated = test_data_description.withColumn("affiliated_subtitles", extract_subtitles_udf(test_data_description["affiliated"])).drop("affiliated")

# Cocatenating all subtitles into a single string
train_data_affiliated = train_data_affiliated.withColumn("affiliated_subtitles", 
    when(size(train_data_affiliated["affiliated_subtitles"]) == 0, array(lit("missing")))
    .otherwise(train_data_affiliated["affiliated_subtitles"]))
train_data_affiliated = train_data_affiliated.withColumn("affiliated_subtitles_string", concat_ws(" ", train_data_affiliated["affiliated_subtitles"])).drop("affiliated_subtitles")

test_data_affiliated = test_data_affiliated.withColumn("affiliated_subtitles",
    when(size(test_data_affiliated["affiliated_subtitles"]) == 0, array(lit("missing")))
    .otherwise(test_data_affiliated["affiliated_subtitles"]))
test_data_affiliated = test_data_affiliated.withColumn("affiliated_subtitles_string", concat_ws(" ", test_data_affiliated["affiliated_subtitles"])).drop("affiliated_subtitles")

tokenizer = Tokenizer(inputCol="affiliated_subtitles_string", outputCol="affiliated_words")
train_data_affiliated = tokenizer.transform(train_data_affiliated).drop("affiliated_subtitles_string")
test_data_affiliated = tokenizer.transform(test_data_affiliated).drop("affiliated_subtitles_string")

# Train a Word2Vec model on the tokenized words
word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="affiliated_words", outputCol="affiliated_embeddings")
model = word2Vec.fit(train_data_affiliated)
train_data_affiliated = model.transform(train_data_affiliated).drop("affiliated_words")
test_data_affiliated = model.transform(test_data_affiliated).drop("affiliated_words")


# Employees column processing
train_data_employees = train_data_affiliated.withColumn("employee_subtitles", extract_subtitles_udf(train_data_affiliated["employees"])).drop("employees")
test_data_employees = test_data_affiliated.withColumn("employee_subtitles", extract_subtitles_udf(test_data_affiliated["employees"])).drop("employees")

# Cocatenating all subtitles into a single string
train_data_employees = train_data_employees.withColumn("employee_subtitles",
    when(size(train_data_employees["employee_subtitles"]) == 0, array(lit("missing")))
    .otherwise(train_data_employees["employee_subtitles"]))
train_data_employees = train_data_employees.withColumn("employee_subtitles_string", concat_ws(" ", train_data_employees["employee_subtitles"])).drop("employee_subtitles")

test_data_employees = test_data_employees.withColumn("employee_subtitles",
    when(size(test_data_employees["employee_subtitles"]) == 0, array(lit("missing")))
    .otherwise(test_data_employees["employee_subtitles"]))
test_data_employees = test_data_employees.withColumn("employee_subtitles_string", concat_ws(" ", test_data_employees["employee_subtitles"])).drop("employee_subtitles")

tokenizer = Tokenizer(inputCol="employee_subtitles_string", outputCol="employee_words") 
train_data_employees = tokenizer.transform(train_data_employees).drop("employee_subtitles_string")
test_data_employees = tokenizer.transform(test_data_employees).drop("employee_subtitles_string")

# Train the Word2Vec model on the tokenized words
word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="employee_words", outputCol="employee_embeddings")
model = word2Vec.fit(train_data_employees)
train_data_employees = model.transform(train_data_employees).drop("employee_words")
test_data_employees = model.transform(test_data_employees).drop("employee_words")


# Headquarters location
location_indexer = StringIndexer(inputCol="headquarters", outputCol="location_index", handleInvalid="keep")
train_data_location = location_indexer.fit(train_data_employees).transform(train_data_employees) \
    .drop("headquarters")
test_data_location = location_indexer.fit(train_data_employees).transform(test_data_employees) \
    .drop("headquarters")

In [0]:
from pyspark.sql.functions import expr, col, to_date, datediff, year, lit, udf, coalesce, struct, log1p
from pyspark.ml import Pipeline
import numpy as np
from datetime import datetime
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import ArrayType, DoubleType


# Popularity calculation according to the number of comments and likes
train_data_popularity = train_data_location.withColumn(
    "popularity",
    expr("aggregate(updates, cast(0 as BIGINT), (acc, x) -> acc + coalesce(x.comments_count, 0) + coalesce(x.likes_count, 0))")
).drop("updates")
test_data_popularity = test_data_location.withColumn(
    "popularity", 
    expr("aggregate(updates, cast(0 as BIGINT), (acc, x) -> acc + coalesce(x.comments_count, 0) + coalesce(x.likes_count, 0))")
).drop("updates")


# Log-transform the number of followers
train_data_followers = train_data_popularity.withColumn("followers_log", log1p(col("followers"))).drop("followers")
test_data_followers = test_data_popularity.withColumn("followers_log", log1p(col("followers"))).drop("followers")


# Organization type encoding
train_data_org_types = train_data_followers.withColumn("type", col("type").cast("string"))
test_data_org_types = test_data_followers.withColumn("type", col("type").cast("string"))

organization_type_indexer = StringIndexer(inputCol="type", outputCol="organization_type_index", handleInvalid="keep")
types_model = organization_type_indexer.fit(train_data_org_types)
train_data_org_types = types_model.transform(train_data_org_types).drop("type")
test_data_org_types = types_model.transform(test_data_org_types).drop("type")


# Funding column
train_data_funding = train_data_org_types.withColumn(
    "funding",
    struct(
        coalesce(col("funding.last_round_type"), lit("Unknown")).alias("last_round_type"),  # Replace NULL with 'Unknown'
        coalesce(col("funding.last_round_date"), to_date(lit("1900-01-01"), "yyyy-MM-dd")).alias("last_round_date"),
        coalesce(col("funding.rounds"), lit(0)).alias("rounds")
    )
)
test_data_funding = test_data_org_types.withColumn(
    "funding", 
    struct(
        coalesce(col("funding.last_round_type"), lit("Unknown")).alias("last_round_type"),  # Replace NULL with 'Unknown'
        coalesce(col("funding.last_round_date"), to_date(lit("1900-01-01"), "yyyy-MM-dd")).alias("last_round_date"),
        coalesce(col("funding.rounds"), lit(0)).alias("rounds")
    )
)

# Encode last round type
train_data_funding = train_data_funding.withColumn("last_round_type", col("funding.last_round_type"))
test_data_funding = test_data_funding.withColumn("last_round_type", col("funding.last_round_type"))

round_type_indexer = StringIndexer(inputCol="last_round_type", outputCol="last_round_type_index", handleInvalid="keep")
round_type_model = round_type_indexer.fit(train_data_funding)
train_data_funding = round_type_model.transform(train_data_funding).drop("last_round_type")
test_data_funding = round_type_model.transform(test_data_funding).drop("last_round_type")

# Calculate years since last funding round
current_year = datetime.now().year
train_data_funding = train_data_funding.withColumn("years_since_last_round", 
    (current_year - year(col("funding.last_round_date"))).cast("double"))
test_data_funding = test_data_funding.withColumn("years_since_last_round", 
    (current_year - year(col("funding.last_round_date"))).cast("double"))

# Use funding rounds as-is
train_data_funding = train_data_funding.withColumn("funding_rounds", col("funding.rounds")).drop("funding")
test_data_funding = test_data_funding.withColumn("funding_rounds", col("funding.rounds")).drop("funding")


# Country code indexing
country_code_indexer = StringIndexer(inputCol="country_code", outputCol="country_code_index", handleInvalid="keep")
train_data_country = country_code_indexer.fit(train_data_funding).transform(train_data_funding) \
    .drop("country_code")
test_data_country = country_code_indexer.fit(test_data_funding).transform(test_data_funding) \
    .drop("country_code")


In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Apply VectorAssembler
vector_assembler = VectorAssembler(
    inputCols=[
        "country_code_index", "employees_in_linkedin", "followers_log", "location_index",
        "company_size_one_hot", "num_years", "specialties_vector", "description_vector",
        "affiliated_embeddings", "employee_embeddings", "popularity", 
        "organization_type_index", "last_round_type_index", "years_since_last_round", 
        "funding_rounds"
    ],
    outputCol="features"
)

train_transformed = vector_assembler.transform(train_data_country)
test_transformed = vector_assembler.transform(test_data_country)

indexer = StringIndexer(inputCol="meta_industry", outputCol="label_idx")
train_transformed = indexer.fit(train_transformed).transform(train_transformed)

# Final dataset
train_final = train_transformed.select("id", "features", "meta_industry", "label_idx")
test_final = test_transformed.select("id", "features")

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import pandas as pd

# Split data into training and val sets
train, validation = train_final.randomSplit([0.8, 0.2], seed=1234)

# LogisticRegression classifier
lr = LogisticRegression(labelCol="label_idx", featuresCol="features", maxIter=10, regParam=0.1)
lr_model = lr.fit(train)

# Make predictions on the validation dataset
predictions = lr_model.transform(validation)

# Evaluate the model, and compute weighted precision, recall, and F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label_idx", predictionCol="prediction")
weighted_precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
weighted_recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Weighted Precision: {weighted_precision:.3f}")
print(f"Weighted Recall: {weighted_recall:.3f}")
print(f"F1 Score: {f1_score:.3f}")  # Best F1 Score on val: 0.503

In [0]:
"""
# Plot to show the influence of every feature on the prediction of each class.

# Retrieve class names (meta-industry) corresponding to each label
class_mapping = train_final.select("label_idx", "meta_industry").distinct().toPandas()
class_mapping = class_mapping.sort_values(by="label_idx")  # Ensure classes are sorted by label
class_names = class_mapping["meta_industry"].tolist()

# Coefficient matrix
coefficient_matrix = lr_model.coefficientMatrix

# Convert it to a DataFrame
coefficients_df = pd.DataFrame(
    coefficient_matrix.toArray(),
    columns=["Feature " + str(i) for i in range(coefficient_matrix.numCols)]
)
coefficients_df["Class"] = class_names  # Use class names instead of indices
transposed_df = coefficients_df.set_index("Class").transpose()

# Plot the coefficients
plt.figure(figsize=(12, 8))
for class_name in class_names:
    plt.plot(transposed_df.index, transposed_df[class_name], label=class_name)

plt.title("Feature Coefficients by Meta-Industry Class (Logistic Regression)")
plt.xlabel("Features")
plt.ylabel("Coefficient Value")
plt.xticks(
    ticks=range(0, len(transposed_df.index), max(1, len(transposed_df.index) // 20)),  # Show every 20th feature
    labels=transposed_df.index[::max(1, len(transposed_df.index) // 20)],  # Corresponding labels
    rotation=45,  # Rotate labels for readability
    ha="right"  # Align labels to the right
)
plt.legend(title="Meta-Industry", loc='lower right')
plt.tight_layout()
plt.show()

"""

In [0]:
"""
# Plot confusion matrix

import seaborn as sns
import numpy as np

# Create confusion matrix
pred_labels = predictions.selectExpr("CAST(label_idx AS INT) AS label_idx", "CAST(prediction AS INT) AS prediction")
conf_matrix = pred_labels.groupBy("label_idx", "prediction").count().toPandas()

# Pivot the confusion matrix and replace NaN with 0
conf_matrix_pivot = conf_matrix.pivot(index="label_idx", columns="prediction", values="count").fillna(0).astype(int)

# Create heatmap
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_pivot, annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.show()

"""


In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


# Retrieve class names (meta-industry) corresponding to each label
class_mapping = train_final.select("label_idx", "meta_industry").distinct()
label_to_meta_industry = {row["label_idx"]: row["meta_industry"] for row in class_mapping.collect()}

def get_meta_industry(label):
    # Map label indices to meta-industry names
    return label_to_meta_industry.get(label, "Unknown")
meta_industry_udf = udf(get_meta_industry, StringType())

# Predict on the test set
test_predictions = lr_model.transform(test_final)

final_results = test_predictions.withColumn("label", meta_industry_udf(col("prediction"))) \
                                .select("id", "label")

final_results.display()


#### Analysis Report 

###### Write here a textual summary of all the steps you implemented:
- Feature engineering and selection methods 
- Preprocessing pipeline 
- Model selection
- Error analysis and Class-wise Performance Metrics 