In [83]:
from pyspark.sql.functions import lit, col, isnan, when, expr, year, count
from pyspark.sql import SparkSession

In [85]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Data Cleansing") \
    .getOrCreate()


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [70]:
# Load the CSV file into a Spark DataFrame
file_path = "2scopus_300_search.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show initial data
df.show()


+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+
|Subject|                 URL|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|
+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+
|   MULT|https://api.elsev...|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|
|   MULT|https://api.elsev...|SCOPUS

In [71]:
# Drop rows where any of the specified columns have missing values
columns_to_check = [
    "Subject", "URL", "Identifier", "EID", "Title", 
    "Creator", "Publication Name", "DOI", "Affiliation Name", 
    "Affiliation City", "Affiliation Country"
]

df = df.na.drop(subset=columns_to_check)

# Show the resulting DataFrame
df.show()

# Count the number of rows after dropping
print(f"Number of rows after dropping missing values: {df.count()}")


+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+
|Subject|                 URL|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|
+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+
|   MULT|https://api.elsev...|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|
|   MULT|https://api.elsev...|SCOPUS

In [72]:
# Add a new column 'year' with the value 2024
df = df.withColumn("Year", lit(2024))

# Show the updated DataFrame
df.show()

+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|Subject|                 URL|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|Year|
+-------+--------------------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|   MULT|https://api.elsev...|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|2024|
|   MULT|https:/

In [73]:
# Count all unique values in the 'Affiliation Country' column
unique_countries_count = df.select("Affiliation Country").distinct().count()

print(f"Number of unique countries: {unique_countries_count}")


Number of unique countries: 108


In [74]:
# Drop the 'URL' column from the DataFrame
df = df.drop("URL")

# Show the updated DataFrame
df.show()


+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|Subject|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|Year|
+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|   MULT|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|2024|
|   MULT|SCOPUS_ID:8521030...|2-s2.0-85210305527|BMAL1 upregulates...|             Zhou X.|        i

In [75]:
# Ensure 'Publication Date' is in a proper date format
df = df.withColumn("Publication Date", col("Publication Date").cast("date"))
print(f"Number of rows before filtering: {df.count()}")

# Drop rows where 'Publication Date' is not in 2024
df = df.filter(year(col("Publication Date")) == 2024)

# Show the filtered DataFrame
df.show()

# Count the number of remaining rows
print(f"Number of rows remaining after filtering: {df.count()}")

Number of rows before filtering: 7544
+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|Subject|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|Year|
+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|   MULT|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|2024|
|   MULT|SCOPUS_ID:8521030...|2-s2.0-85210305527|BMAL1 upregul

In [76]:
# Count NULLs in 'Publication Date'
null_pub_date = df.filter(col("Publication Date").isNull()).count()

# Count NULLs in 'Cited By Count'
null_cited_by_count = df.filter(col("Cited By Count").isNull()).count()

print(f"Number of NULLs in 'Publication Date': {null_pub_date}")
print(f"Number of NULLs in 'Cited By Count': {null_cited_by_count}")

Number of NULLs in 'Publication Date': 0
Number of NULLs in 'Cited By Count': 0


In [77]:
df.show()

+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|Subject|          Identifier|               EID|               Title|             Creator|Publication Name|Publication Date|                 DOI|Cited By Count|    Affiliation Name|Affiliation City|Affiliation Country|Year|
+-------+--------------------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------+--------------------+----------------+-------------------+----+
|   MULT|SCOPUS_ID:8521031...|2-s2.0-85210310976|Emx2 is an essent...|         Nguyen T.K.|        iScience|      2024-12-20|10.1016/j.isci.20...|             0|University of Not...|      Notre Dame|      United States|2024|
|   MULT|SCOPUS_ID:8521030...|2-s2.0-85210305527|BMAL1 upregulates...|             Zhou X.|        i

In [79]:
# Save DataFrame to CSV
import os

output_path = os.path.abspath("cleaned_308_researches.csv")
df.write.csv(output_path, header=True, mode="overwrite")

print(f"DataFrame saved to {output_path}")


Py4JJavaError: An error occurred while calling o338.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:860)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
