In [29]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [30]:
from pyspark.sql import SparkSession

# Initialize Spark session with increased memory
spark = SparkSession.builder \
    .appName("WARK Data Pipeline - Predict Subject Area") \
    .master("local[*]") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()


# Load CSV data into a Spark DataFrame
data_path = "C:/Users/Achita/Documents/DSDE_Final_Project_WARK/Final/joined_2017-2023.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Show the data schema
df.printSchema()
df.show(5)

root
 |-- citation_title: string (nullable = true)
 |-- abstracts: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_with_location_department: string (nullable = true)
 |-- affiliations: string (nullable = true)
 |-- classifications: string (nullable = true)
 |-- subject_area_name: string (nullable = true)
 |-- subject_area_code: string (nullable = true)
 |-- date: string (nullable = true)
 |-- citedby_count: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------------------+--------------------+---------------+--------------------+-----------------+----------+-------------+--------------------+
|      citation_title|           abstracts|             authors|authors_with_location_department|        affiliations|classifications|   subject_area_name|subject_area_code|      date|citedby_count|            category|
+--------------------+--------------------+-----------

In [31]:
# Drop the redundant column
# df = df.drop('authors')
df = df.drop('authors_with_location_department')
df = df.drop('affiliations')
df = df.drop('classifications')
df = df.drop('subject_area_name')
# df = df.drop('subject_area_code')
df = df.drop('date')
df = df.drop('citedby_count')

# Show the data schema
df.printSchema()
df.show(5)


root
 |-- citation_title: string (nullable = true)
 |-- abstracts: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- subject_area_code: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+--------------------+--------------------+-----------------+--------------------+
|      citation_title|           abstracts|             authors|subject_area_code|            category|
+--------------------+--------------------+--------------------+-----------------+--------------------+
|Mödruvallabók, AM...|The ultimate goal...|Arkel-de Leeuw va...|                -|               Other|
|  Energy and society|                   -|                   -|                -|               Other|
|PAHs in polystyre...|Eight low-ring PA...|Li Si-Qi; Ni Hong...|                -|Medicine and Heal...|
|Techniques in int...|This article trac...| Sivakumaran Sandesh|                -|               Other|
|The responsibilit...|While histories o...|      Gl

In [33]:
from pyspark.sql.functions import col, concat_ws
%pip install setuptools

# Replace null or missing values
df_cleaned = df.fillna({
    'citation_title': '-',
    'abstracts': '-',
    'authors': '-',
    'subject_area_code' : '-',
    'category': '-',
})

# Combine relevant fields into a single feature
df_cleaned = df_cleaned.withColumn(
    "features_combined", 
    concat_ws("||", col("citation_title"), col("abstracts"), col("authors"), col("subject_area_code"))
)

# Show the combined features
df_cleaned.select("features_combined", "category").show(5, truncate=False)


# Ensure no null values in required columns
df_new = df_cleaned.fillna({"features_combined": "-", "category": "-"})

df_new.toPandas().to_csv("C:/Users/Achita/Documents/DSDE_Final_Project_WARK/DE-ML/dataTrain2.csv", index=False)

Note: you may need to restart the kernel to use updated packages.
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [39]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col



# Tokenize the combined features
tokenizer = Tokenizer(inputCol="features_combined", outputCol="tokens")

# Convert tokens into numerical features
vectorizer = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=30000)

# Encode target labels into numerical format
label_indexer = StringIndexer(inputCol="category", outputCol="label")

# Create a preprocessing pipeline
preprocessing_pipeline = Pipeline(stages=[tokenizer, vectorizer, label_indexer])

# Fit and transform the data
preprocessed_data = preprocessing_pipeline.fit(df_cleaned).transform(df_cleaned)

# Verify the transformed data
preprocessed_data.select("features", "label").show(5)



+--------------------+-----+
|            features|label|
+--------------------+-----+
|(30000,[0,1,2,5,7...|  1.0|
|(30000,[2,166],[1...|  1.0|
|(30000,[0,1,2,5,7...|  5.0|
|(30000,[0,1,2,5,7...|  1.0|
|(30000,[0,1,2,5,7...|  1.0|
+--------------------+-----+
only showing top 5 rows



In [40]:
# Check if features and label columns are present
preprocessed_data.select("features", "label").show(5)

# Split the dataset into training and testing sets
train_data, test_data = preprocessed_data.randomSplit([0.7, 0.3], seed=42)

# # Sample a fraction of the data
# train_data = train_data.sample(fraction=1, seed=42)
# test_data = test_data.sample(fraction=1, seed=42)


print(f"Training Data Count: {train_data.count()}, Test Data Count: {test_data.count()}")


+--------------------+-----+
|            features|label|
+--------------------+-----+
|(30000,[0,1,2,5,7...|  1.0|
|(30000,[2,166],[1...|  1.0|
|(30000,[0,1,2,5,7...|  5.0|
|(30000,[0,1,2,5,7...|  1.0|
|(30000,[0,1,2,5,7...|  1.0|
+--------------------+-----+
only showing top 5 rows

Training Data Count: 15041, Test Data Count: 6287


In [41]:
from pyspark.ml.classification import LogisticRegression

# Initialize the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=40)

# Train the model
lr_model = lr.fit(train_data)

# Evaluate the model on the test data
predictions = lr_model.transform(test_data)

# Display predictions
predictions.select("features", "label", "prediction").show(5)


+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(30000,[0,1,9,15,...|  1.0|       1.0|
|(30000,[17880],[1...|  1.0|       1.0|
|(30000,[17880],[1...|  1.0|       1.0|
|(30000,[17880],[1...|  1.0|       1.0|
|(30000,[17880],[1...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 5 rows



In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.2f}")


Accuracy: 0.52


In [None]:
# from pyspark.ml.classification import LogisticRegressionModel

# # Ensure lr_model is an instance of LogisticRegressionModel
# assert isinstance(lr_model, LogisticRegressionModel), "lr_model is not a LogisticRegressionModel!"

# # Save the trained model with overwrite
# lr_model.write().overwrite().save("logistic_regression_model")
# print("Logistic Regression model successfully saved.")

# # Save the preprocessing pipeline with overwrite
# preprocessing_pipeline.write().overwrite().save("preprocessing_pipeline")
# print("Preprocessing pipeline successfully saved.")



Py4JJavaError: An error occurred while calling o1313.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
	at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:265)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:79)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.classification.LogisticRegressionModel$LogisticRegressionModelWriter.saveImpl(LogisticRegression.scala:1311)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [None]:
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.classification import LogisticRegressionModel

In [None]:
# from pyspark.ml import Pipeline
# from pyspark.ml.classification import LogisticRegression
# from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
# from pyspark.sql import SparkSession

# # Initialize Spark session
# spark = SparkSession.builder.getOrCreate()

# # Example: Define your data (this should be your actual data)
# training_data = spark.createDataFrame([{
#     "citation_title": "Research in AI",
#     "abstracts": "Deep learning advances in AI...",
#     "authors": "Author A; Author B",
#     "affiliations": "University of XYZ",
#     "classifications": "ASJC: 1700; SUBJABBR: COMP",
#     "subject_area_name": "AI"
# }])

# # Define your feature transformations and pipeline
# tokenizer = Tokenizer(inputCol="abstracts", outputCol="words")
# vectorizer = CountVectorizer(inputCol="words", outputCol="features")
# indexer = StringIndexer(inputCol="subject_area_name", outputCol="label")

# # Create the pipeline
# preprocessing_pipeline = Pipeline(stages=[tokenizer, vectorizer, indexer])

# # Fit the pipeline to your data (this is the model that should be saved)
# trained_pipeline_model = preprocessing_pipeline.fit(training_data)

# # Save the trained pipeline model with overwrite
# trained_pipeline_model.write().overwrite().save("preprocessing_pipeline")

# # Use the pipeline to transform the data into the format expected by logistic regression
# training_data_transformed = trained_pipeline_model.transform(training_data)

# # Now, train your logistic regression model on the transformed data
# lr_model = LogisticRegression(featuresCol="features", labelCol="label")
# lr_model_trained = lr_model.fit(training_data_transformed)

# # Save the trained logistic regression model with overwrite
# lr_model_trained.write().overwrite().save("logistic_regression_model")

# # --- Later, when you want to load and use the saved models for new data ---

# # Load the trained preprocessing pipeline model
# loaded_pipeline = PipelineModel.load("preprocessing_pipeline")

# # Load the trained logistic regression model
# loaded_model = LogisticRegressionModel.load("logistic_regression_model")

# # Process new data
# new_data = spark.createDataFrame([{
#     "citation_title": "New Research in AI",
#     "abstracts": "Deep learning advances...",
#     "authors": "Author A; Author B",
#     "affiliations": "University of XYZ",
#     "classifications": "ASJC: 1700; SUBJABBR: COMP",
#     "subject_area_name": "Unknown"
# }])

# # Transform new data using the loaded preprocessing pipeline
# new_data_cleaned = loaded_pipeline.transform(new_data)

# # Get predictions using the loaded model
# new_predictions = loaded_model.transform(new_data_cleaned)

# # Show the predictions
# new_predictions.select("features", "prediction").show()


24/12/08 02:54:39 WARN Instrumentation: [be91165c] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.


+-------------------+----------+
|           features|prediction|
+-------------------+----------+
|(5,[2,3],[1.0,1.0])|       0.0|
+-------------------+----------+



24/12/08 04:36:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 898946 ms exceeds timeout 120000 ms
24/12/08 04:36:24 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/08 04:36:33 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o