# Install Spark, PySpark and Java

In [None]:
print("Starting Python Notebook")

In [None]:
!brew install apache-spark

In [None]:
print("Completed installing apache-spark")


In [None]:
!brew install openjdk@11


The Java installation above does not work immediately. If you run a "reinstall", Homebrew prints the commands needed to fix the PATH variable and related settings:

(echo; echo 'eval "$(/opt/homebrew/bin/brew shellenv)"') >> /Users/ganapathychidambaram/.zprofile

   
eval "$(/opt/homebrew/bin/brew shellenv)"

In [None]:
!which java


In [None]:
import os

# A `!export ...` line runs in a throwaway subshell, so the variable never
# reaches the notebook kernel. Assigning to os.environ changes the kernel's
# own environment, which later `!` commands and child processes inherit.
java_home = "/usr"
os.environ["JAVA_HOME"] = java_home
# Put the Java binaries first on PATH, keeping whatever was there before.
os.environ["PATH"] = f"{java_home}/bin{os.pathsep}{os.environ.get('PATH', '')}"

In [None]:
!java --version

In [None]:
import os

# `!export ...` only affects a short-lived subshell; set the variables on the
# kernel process itself so that later cells actually see them.
# NOTE: the path is specific to the Homebrew apache-spark 3.3.0 keg; adjust it
# if a different Spark version is installed.
spark_root = "/opt/homebrew/Cellar/apache-spark/3.3.0"
os.environ["SPARK_HOME"] = f"{spark_root}/libexec"
os.environ["PATH"] = f"{spark_root}/bin{os.pathsep}{os.environ.get('PATH', '')}"

# It looks like the case of the JAVA_HOME variable made all the difference.

# Run Spark locally with three worker threads.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[3] pyspark-shell"

In [None]:
!pip3 install pyspark
!pip3 install findspark

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark
from pyspark.sql import SparkSession
# Obtain the session used by every later cell (builder defaults, no extra config set here).
spark = SparkSession.builder.getOrCreate()

# Start Loading Data

### Load the applications data

In [None]:
# Read the tab-separated applications file; the first line holds the column names.
apps_path = "/Users/ganapathychidambaram/Desktop/predixions/apps.tsv"
applications_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(apps_path)
)

In [None]:
applications_df.show()

In [None]:
applications_df.describe().show()

Pre-processing: drop rows that contain null values.

In [None]:
applications_df = applications_df.dropna()

In [None]:
applications_df.describe().show()

### Load the users data

In [None]:
# Read the tab-separated users file; the first line holds the column names.
users_path = "/Users/ganapathychidambaram/Desktop/predixions/users.tsv"
users_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(users_path)
)

In [None]:
users_df.show()

##### Exploratory Data Analysis (EDA) on the users data

In [None]:
unique_values = users_df.select('DegreeType').distinct()
unique_values.show()

In [None]:
unique_values = users_df.select('Major').distinct()
print(unique_values.count())

In [None]:
from pyspark.sql.functions import col, when
users_df.filter(col('Major') == "Not Applicable").count()

In [None]:
users_df.filter(col('Major') == "None").count()

In [None]:
users_df.filter(col('Major').isNull()).count()

In [None]:
users_df.filter(col('Major') == "null").count()

In [None]:
users_df.filter(col('CurrentlyEmployed').isNull()).count()

Pre-processing: convert "None" to "Not Applicable", then apply category indexing, normalization, etc.

In [None]:
# Replace null values with 'No' in the 'CurrentlyEmployed' and 'ManagedOthers' columns

users_df = users_df.na.fill('No', ['CurrentlyEmployed', 'ManagedOthers'])

In [None]:
unique_values = users_df.select('CurrentlyEmployed').distinct()
unique_values.show()

unique_values = users_df.select('ManagedOthers').distinct()
unique_values.show()


In [None]:
# Rewrite the literal string 'None' in Major as 'Not Applicable'; every other value is kept as-is.
major = col('Major')
users_df = users_df.withColumn(
    'Major',
    when(major == 'None', 'Not Applicable').otherwise(major),
)

In [None]:
users_df.count()

In [None]:
!pip3 install numpy

In [None]:
from pyspark.ml.feature import StringIndexer

# Map each DegreeType label to a numeric index, stored in DegreeType_encoded.
string_indexer = StringIndexer(
    inputCol='DegreeType',
    outputCol='DegreeType_encoded',
    handleInvalid='skip',
)
degree_indexer_model = string_indexer.fit(users_df)
users_df_encoded = degree_indexer_model.transform(users_df)

users_df_encoded.show()

In [None]:

# Index every remaining categorical user column into a "<name>_encoded" column.
categorical_columns = ['City', 'State', 'Country', 'CurrentlyEmployed', 'ManagedOthers']

for cat_col in categorical_columns:
    string_indexer = StringIndexer(
        inputCol=cat_col,
        outputCol=cat_col + "_encoded",
        handleInvalid='skip',
    )
    # Fit and apply on the running frame so the encoded columns accumulate.
    fitted_indexer = string_indexer.fit(users_df_encoded)
    users_df_encoded = fitted_indexer.transform(users_df_encoded)

# Display the frame with all encoded columns appended
users_df_encoded.show()

In [None]:
columns_to_drop = ['City', 'State', 'Country', 'CurrentlyEmployed', 'ManagedOthers', 'DegreeType']
users_df_encoded = users_df_encoded.drop(*columns_to_drop)
users_df_encoded.show()

In [None]:
from pyspark.sql.functions import count, lit

# Frequency-encode 'Major': count how many rows share each distinct value.
major_frequencies = users_df_encoded.groupBy('Major').agg(
    count(lit(1)).alias('major_frequency')
)

# Attach the counts to every user row, default unmatched rows to 0,
# then discard the raw 'Major' text.
users_df_encoded = (
    users_df_encoded
    .join(major_frequencies, on='Major', how='left')
    .na.fill(0, ['major_frequency'])
    .drop('Major')
)
users_df_encoded.show()

In [None]:
# Prefix the user-side location columns with "u_".
user_column_renames = {
    "ZipCode": "u_ZipCode",
    "City_encoded": "u_City_encoded",
    "State_encoded": "u_State_encoded",
    "Country_encoded": "u_Country_encoded",
}
for old_name, new_name in user_column_renames.items():
    users_df_encoded = users_df_encoded.withColumnRenamed(old_name, new_name)
users_df_encoded.printSchema()

### Load User History file

In [None]:
# Read the tab-separated user history file; the first line holds the column names.
user_history_path = "/Users/ganapathychidambaram/Desktop/predixions/user_history.tsv"
user_history_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(user_history_path)
)

In [None]:
user_history_df.show()


In [None]:
unique_values = user_history_df.select('JobTitle').distinct()
unique_values.count()

In [None]:
user_history_df.count()

#### Do Category encoding on the Job Titles - Encode them by Frequency

In [None]:
job_title_frequencies = user_history_df.groupBy('JobTitle').agg(count(lit(1)).alias('job_title_freq'))
job_title_frequencies.show()

In [None]:
# Attach each title's frequency to the history rows, default unmatched rows
# to 0, and discard the raw 'JobTitle' text now that it is encoded.
user_history_df = (
    user_history_df
    .join(job_title_frequencies, on='JobTitle', how='left')
    .na.fill(0, ['job_title_freq'])
    .drop('JobTitle')
)

# Display the encoded history
user_history_df.show()

#### Flatten the data frame, such that there is one user per row with multiple job titles

In [None]:
from pyspark.sql.functions import collect_list, first

# Collapse the history to one row per UserID: the per-job Sequence and
# job_title_freq values become lists, while WindowID and Split keep a single value.
# NOTE(review): first() keeps one WindowID/Split per user — this presumes each
# UserID belongs to exactly one window and split; confirm against the data.
compressed_user_hist_df = user_history_df.groupBy('UserID').agg(
    collect_list('Sequence').alias('Sequence'),
    collect_list('job_title_freq').alias('job_title_freq'),
    first('WindowID').alias('WindowID'),
    first('Split').alias('Split')
)

In [None]:
compressed_user_hist_df.show()

### Load jobs file

In [None]:
# Read the tab-separated jobs file; the first line holds the column names.
jobs_path = "/Users/ganapathychidambaram/Desktop/predixions/jobs.tsv"
jobs_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(jobs_path)
)

In [None]:
jobs_df.show()

In [None]:
# A missing 'Title' becomes the empty string.
title = jobs_df["Title"]
jobs_df = jobs_df.withColumn("Title", when(title.isNull(), "").otherwise(title))

# Missing 'Description' / 'Requirements' get a fixed HTML placeholder.
placeholder_text = {
    "Description": "<p>No description available</p>",
    "Requirements": "<p>No requirements available</p>",
}
jobs_df = jobs_df.fillna(placeholder_text)

# Display the cleaned frame
jobs_df.show()

Since Description and Requirements are long texts, use TF-IDF encoding to represent them. TBD: check whether this is a good representation.

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql.functions import col, concat, lit

# Build one document per job: Description and Requirements separated by a space.
combined_text = concat(col("Description"), lit(" "), col("Requirements"))
jobs_df = jobs_df.withColumn("combined_text", combined_text)

# Tokenize the document, then strip stop words from the token list.
tokenizer = Tokenizer(inputCol="combined_text", outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
jobs_df = stopwords_remover.transform(tokenizer.transform(jobs_df))

# Term frequencies over the filtered tokens.
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="tf_features")
count_vectorizer_model = count_vectorizer.fit(jobs_df)
jobs_df = count_vectorizer_model.transform(jobs_df)

# Re-weight the term frequencies by inverse document frequency.
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")
idf_model = idf.fit(jobs_df)
jobs_df = idf_model.transform(jobs_df)

# Only 'tfidf_features' is kept: remove the intermediate columns and the
# raw text columns they were derived from.
jobs_df = jobs_df.drop(
    "combined_text", "tokens", "filtered_tokens", "tf_features",
    "Title", "Description", "Requirements",
)

# Display the vectorized jobs frame
jobs_df.show()


In [None]:
# Report every jobs_df column that still contains nulls (one count per column).
for col_name in jobs_df.columns:
    is_missing = col(col_name).isNull()
    null_count = jobs_df.filter(is_missing).count()
    if null_count <= 0:
        continue
    print(f"Column '{col_name}' has {null_count} null value(s).")

In [None]:
# `coalesce` is otherwise first imported in a later cell, so import it here to
# keep the notebook runnable from top to bottom.
from pyspark.sql.functions import coalesce, lit

# Replace null zip codes with a default zip code ('00000') and null end dates
# with a default date ('1970-01-01').
jobs_df = jobs_df.withColumn('Zip5', coalesce('Zip5', lit('00000')))
jobs_df = jobs_df.withColumn('EndDate', coalesce('EndDate', lit('1970-01-01')))

In [None]:
# Numerically encode the job location columns ['City', 'State', 'Country']
categorical_columns = ['City', 'State', 'Country']

for cat_col in categorical_columns:
    string_indexer = StringIndexer(
        inputCol=cat_col,
        outputCol=cat_col + "_encoded",
        handleInvalid='skip',
    )
    fitted_indexer = string_indexer.fit(jobs_df)
    jobs_df = fitted_indexer.transform(jobs_df)

# The raw text columns are no longer needed once their indices exist.
columns_to_drop = ['City', 'State', 'Country']
jobs_df = jobs_df.drop(*columns_to_drop)
# Display the frame with the encoded columns
jobs_df.show()

# Merge the Dataframes

In [None]:
# Left-join the encoded user profiles with the flattened job history on
# (UserID, WindowID, Split).
columns_to_drop = ['UserID', 'WindowID', 'Split']

# Alias both frames so the shared key columns can be addressed unambiguously.
users_df_encoded = users_df_encoded.alias('users_df_encoded')
compressed_user_hist_df = compressed_user_hist_df.alias('compressed_user_hist_df')

user_key_matches = [
    col(f'users_df_encoded.{key}') == col(f'compressed_user_hist_df.{key}')
    for key in columns_to_drop
]
merged_users_df_encoded = users_df_encoded.join(
    compressed_user_hist_df,
    on=user_key_matches,
    how='left',
)

# The join yields two copies of each key column; remove the history-side copies.
for col_name in columns_to_drop:
    merged_users_df_encoded = merged_users_df_encoded.drop(compressed_user_hist_df[col_name])

# Display the merged frame
merged_users_df_encoded.show()

In [None]:
from pyspark.sql.functions import coalesce, array

# Default the scalar columns: missing experience and missing major frequency become 0.
scalar_defaults = {'TotalYearsExperience': 0, 'major_frequency': 0}
merged_users_df_encoded = merged_users_df_encoded.fillna(scalar_defaults)

# Users without any history rows have null list columns; give them empty lists instead.
for list_column in ('Sequence', 'job_title_freq'):
    merged_users_df_encoded = merged_users_df_encoded.withColumn(
        list_column, coalesce(list_column, array())
    )

merged_users_df_encoded.show()


In [None]:
# Report every merged_users_df_encoded column that still contains nulls.
for col_name in merged_users_df_encoded.columns:
    is_missing = col(col_name).isNull()
    null_count = merged_users_df_encoded.filter(is_missing).count()
    if null_count <= 0:
        continue
    print(f"Column '{col_name}' has {null_count} null value(s).")

In [None]:
# Substitute fixed defaults for nulls: '00000' for the user zip code and
# '1970-01-01' for the graduation date.
null_defaults = {'u_ZipCode': '00000', 'GraduationDate': '1970-01-01'}
for column_name, default_value in null_defaults.items():
    merged_users_df_encoded = merged_users_df_encoded.withColumn(
        column_name, coalesce(column_name, lit(default_value))
    )

In [None]:
# Report every applications_df column that still contains nulls.
for col_name in applications_df.columns:
    is_missing = col(col_name).isNull()
    null_count = applications_df.filter(is_missing).count()
    if null_count <= 0:
        continue
    print(f"Column '{col_name}' has {null_count} null value(s).")

In [None]:
applications_df.printSchema()
merged_users_df_encoded.printSchema()

In [None]:
# Left-join every application with its user's merged profile on
# (UserID, WindowID, Split).
columns_to_drop = ['UserID', 'WindowID', 'Split']

# Alias both frames so the shared key columns can be addressed unambiguously.
applications_df = applications_df.alias('applications_df')
merged_users_df_encoded = merged_users_df_encoded.alias('merged_users_df_encoded')

app_key_matches = [
    col(f'applications_df.{key}') == col(f'merged_users_df_encoded.{key}')
    for key in columns_to_drop
]
merged_app_df = applications_df.join(
    merged_users_df_encoded,
    on=app_key_matches,
    how='left',
)

# The join yields two copies of each key column; remove the user-side copies.
for col_name in columns_to_drop:
    merged_app_df = merged_app_df.drop(merged_users_df_encoded[col_name])

# Display the merged frame
merged_app_df.show()


In [None]:
# Left-join the application/user rows with the job features on (JobID, WindowID).
columns_to_drop = ['JobID', 'WindowID']

# Alias both frames so the shared key columns can be addressed unambiguously.
merged_app_df = merged_app_df.alias('merged_app_df')
jobs_df = jobs_df.alias('jobs_df')

job_key_matches = [
    col(f'merged_app_df.{key}') == col(f'jobs_df.{key}')
    for key in columns_to_drop
]
total_df = merged_app_df.join(jobs_df, on=job_key_matches, how='left')

# The join yields two copies of each key column; remove the jobs-side copies.
for col_name in columns_to_drop:
    total_df = total_df.drop(jobs_df[col_name])

# Display the fully merged frame
total_df.show()


# Select the features and do Test vs Train Split

In [None]:
# TODO: The application date is omitted as a feature. Check whether it can be
# used as a difference from the window start date.
features_list = [
    # Keys and split label
    'UserID', 'WindowID', 'Split', 'JobID',
    # User attributes
    'u_ZipCode', 'GraduationDate', 'WorkHistoryCount', 'TotalYearsExperience',
    'ManagedHowMany', 'DegreeType_encoded', 'u_City_encoded', 'u_State_encoded',
    'u_Country_encoded', 'CurrentlyEmployed_encoded', 'ManagedOthers_encoded',
    'major_frequency',
    # User job history
    'Sequence', 'job_title_freq',
    # Job attributes
    'Zip5', 'StartDate', 'EndDate', 'tfidf_features',
    'City_encoded', 'State_encoded', 'Country_encoded',
]

In [None]:
# Keep only the selected feature columns.
features_df = total_df[features_list]

#TODO: Casting to Integer seems too hacky - consider encoding this
for id_column in ("UserID", "JobID"):
    features_df = features_df.withColumn(id_column, features_df[id_column].cast("integer"))

#features_df = features_df.withColumn("UserWindowID", concat(col("UserID"), lit("_"), col("WindowID")))

# Partition on the 'Split' label, then discard the label from each partition.
is_train = features_df['Split'] == 'Train'
is_test = features_df['Split'] == 'Test'
train_features_df = features_df.filter(is_train).drop('Split')
test_features_df = features_df.filter(is_test).drop('Split')

# Run Collaborative filtering at the Window level

In [None]:
# Collect the distinct WindowID values of the training split into a Python list.
unique_windows = train_features_df.select('WindowID').distinct().rdd.flatMap(lambda x: x).collect()

for window_id in unique_windows:
    # Restrict both splits to the rows that belong to the current window.
    window_train_df = train_features_df.filter(train_features_df['WindowID'] == window_id)
    window_test_df = test_features_df.filter(test_features_df['WindowID'] == window_id)
    # NOTE: no model is trained yet; the string literal that follows is only a
    # sketch of the planned per-window approach and has no runtime effect.
    """
    Do Collaborative filtering here

    Create indexers for user and item IDs
    user_indexer = StringIndexer(inputCol=“userId”, outputCol=“userIndex”) 
    item_indexer = StringIndexer(inputCol=“jobId”, outputCol=“itemIndex”)

    Create one-hot encoders for user and item indices
    user_encoder = OneHotEncoder(inputCol=“userIndex”, outputCol=“userVec”) 
    item_encoder = OneHotEncoder(inputCol=“itemIndex”, outputCol=“itemVec”)

    Create a vector assembler for user and item features
    assembler = VectorAssembler(inputCols=[“userVec”, “user_feat1”, “user_feat2”, “user_feat3”, “itemVec”, “job_feat1”, “job_feat2”, “job_feat3”], outputCol=“features”)

    Create an FM regressor
    fm = FMRegressor(featuresCol=“features”, labelCol=“rating”)
    TODO: Do we need a default rating of 1 here?

    Create a pipeline to process the data and fit the model
    from pyspark.ml import Pipeline pipeline = Pipeline(stages=[user_indexer, item_indexer, user_encoder, item_encoder, assembler, fm]) model = pipeline.fit(train_df)

    Make predictions on the test data
    predictions = model.transform(test_df)

    Evaluate the model using RMSE metric
    evaluator = RegressionEvaluator(metricName=“rmse”, labelCol=“rating”, predictionCol=“prediction”) rmse = evaluator.evaluate(predictions) print(f"Root-mean-square error = {rmse}")

    # Generate recommendations for all users
    userRecs = model.recommendForAllUsers(5)

    # Generate top 5 recommendations for a specific user (replace 'userId' with the actual user ID)
    user_id = 1
    userRecs.filter(userRecs['UserID'] == user_id).select('recommendations').show()

    """