In [66]:
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import  *
from pyspark.sql.types import StringType
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler

from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import silhouette_score, calinski_harabasz_score


## Data Preprocessing

In [2]:
# load dataframe from raw data
gcp_storage_path = 'gs://lance-bucket/all_reviews.csv'
sdf = spark.read.csv(gcp_storage_path,
                        sep=",",                # Specify the delimiter (default is comma)
                        header=True,           # Use the first line as header
                        inferSchema=True,      # Automatically infer the schema of the DataFrame
                        escape='"',            # Character used to escape quotes
                        nullValue="NULL",      # Specify what represents a null value
                        quote='"',             # Character used for quoting
                        mode="DROPMALFORMED"   # Ignore malformed lines
                    )

# create clean date & month column
sdf = sdf.withColumn("created_date", to_date(trim(sdf['date']), 'MMM d, yyyy'))
sdf = sdf.drop('date')
sdf = sdf.withColumn("created_month", date_format("created_date", "yyyy-MM"))

# Extract firm from firm_link using regex
sdf = sdf.withColumn("extracted_text", regexp_extract("firm_link", r"Reviews/(.*?)\.htm", 1))
sdf = sdf.withColumn("firm", regexp_extract("extracted_text", r"^(.*?)-Reviews", 1))
sdf = sdf.drop('extracted_text')

                                                                                

In [6]:
# Filtering to useful records
clean_sdf = sdf.filter(
                (sdf['rating'].isNotNull())
                & (sdf['pros'].isNotNull())
                & (sdf['cons'].isNotNull())
                & (sdf['firm'].isNotNull())
            )

In [9]:
# write parquet
clean_sdf.write.format("parquet").mode("overwrite").save("gs://lance-bucket/clean_reviews_processed_parquet")

                                                                                

## Data Modelling

In [7]:
# load parquet
gcp_storage_path = 'gs://lance-bucket/clean_reviews_processed_parquet'
clean_sdf = spark.read.parquet(gcp_storage_path,
                        sep=",",                # Specify the delimiter (default is comma)
                        header=True,           # Use the first line as header
                        inferSchema=True,      # Automatically infer the schema of the DataFrame
                        escape='"',            # Character used to escape quotes
                        nullValue="NULL",      # Specify what represents a null value
                        quote='"',             # Character used for quoting
                        mode="DROPMALFORMED"   # Ignore malformed lines
                    )

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

In [None]:
# aggregating by firm for average scores
agg_sdf = clean_sdf.groupBy('firm').agg(count('rating').alias('review_count')
                              , round(avg('Career Opportunities'),2).alias('opportunities')
                              , round(avg('Compensation and Benefits'),2).alias('compensation')
                              , round(avg('Senior Management'),2).alias('management')
                              , round(avg('Work/Life Balance'),2).alias('worklife_balance')
                              , round(avg('Culture & Values'),2).alias('culture')
                              , round(avg('Diversity & Inclusion'),2).alias('diversity')
                             ).orderBy('review_count', ascending=False)

# keep only firms with at least 100 reviews, and dop any firms without any of the scores
agg_sdf = agg_sdf.filter(col('review_count')>=100).dropna()

In [None]:
## Performing BKM clustering

# Select specific columns for clustering
X_sdf = agg_sdf.select('opportunities', 'compensation', 'management', 'worklife_balance', 'culture', 'diversity')

# Create a VectorAssembler to combine the features into a single column
assembler = VectorAssembler(inputCols=['opportunities', 'compensation', 'management', 'worklife_balance', 'culture', 'diversity'], outputCol='features')
features_df = assembler.transform(X_sdf)

# Initialize Bisecting K-Means
bkm = BisectingKMeans(k=4, minDivisibleClusterSize=1.0, maxIter=20)

# Fit the model on the features DataFrame
model = bkm.fit(features_df)

# Get the cluster predictions
predictions = model.transform(features_df)

In [60]:
# converting spark dataframe to pandas
df = agg_sdf.toPandas()

# creating training dataset
X = df.drop(columns='review_count').set_index('firm')

# K-Means Clustering
kmeans = KMeans(n_clusters=4, random_state=42)
df['kmeans_labels'] = kmeans.fit_predict(X)


# Agglomerative Clustering
agg_clustering = AgglomerativeClustering(n_clusters=4, linkage='ward')
df['agg_labels'] = agg_clustering.fit_predict(X)


# Append bkm predictions to pandas
bkm_labels = predictions.toPandas()
df['bkm_labels'] = bkm_labels['prediction']

  super()._check_params_vs_input(X, default_n_init=10)


In [70]:
# model evaluation via silhouette scores
kmeans_silhouette = silhouette_score(X, df['kmeans_labels'])
agg_silhouette = silhouette_score(X, df['agg_labels'])
bkm_silhouette = silhouette_score(X, df['bkm_labels'])

# model evaluation via calinski harabasz scores
kmeans_ch_score = calinski_harabasz_score(X, df['kmeans_labels'])
agg_ch_score = calinski_harabasz_score(X, df['agg_labels'])
bkm_ch_score = calinski_harabasz_score(X, df['bkm_labels'])


print(f"""
**Silhouette Scores**

Measures how similar each point is to its cluster compared to other clusters.
Values range from -1 to 1 (1 indicates dense, well-separated clusters).

kmeans: {kmeans_silhouette}
agg: {agg_silhouette}
bkm: {bkm_silhouette}

**Clinski-Harabasz Scores**

The ratio of the sum of between-cluster dispersion to within-cluster dispersion.
A higher score indicates better-defined clusters.

kmeans: {kmeans_ch_score}
agg: {agg_ch_score}
bkm: {bkm_ch_score}

""")



**Silhouette Scores**

Measures how similar each point is to its cluster compared to other clusters.
Values range from -1 to 1 (1 indicates dense, well-separated clusters).

kmeans: 0.24338014396503294
agg: 0.19155056420430813
bkm: 0.033337252036037254

**Clinski-Harabasz Scores**

The ratio of the sum of between-cluster dispersion to within-cluster dispersion.
A higher score indicates better-defined clusters.

kmeans: 4079.72302470041
agg: 3623.716263747032
bkm: 938.123266908293


