In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, count, desc, trim, array_contains
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [2]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("PublicationTrendsAnalysis") \
    .getOrCreate()

In [3]:
# Load JSON data
df = spark.read.json("dblp_v14.json")

In [4]:
# Remove corrupt records
df_cleaned = df.filter(col("_corrupt_record").isNull())

In [5]:
# Drop rows where authors, year, n_citation, or title is null
df_cleaned = df_cleaned.dropna(subset=["authors", "year", "n_citation", "title"])

In [6]:
# Drop duplicate rows
df_cleaned = df_cleaned.dropDuplicates()

In [7]:
# Ensure 'year' is an integer
df_cleaned = df_cleaned.withColumn("year", col("year").cast("int"))

In [8]:
# Trim whitespace from 'title' and 'abstract'
df_cleaned = df_cleaned.withColumn("title", trim(col("title")))
df_cleaned = df_cleaned.withColumn("abstract", trim(col("abstract")))

In [12]:
# Check for duplicate records for 2009 "data mining"
df_2009_datamining = df_cleaned.filter((col("year") == 2009) & array_contains(col("keywords"), "data mining"))


In [13]:
# Count the total number of records
total_count = df_2009_datamining.count()

In [14]:
# Drop duplicates based on relevant columns (e.g., authors, title)
df_2009_datamining_deduped = df_2009_datamining.dropDuplicates(["authors", "title"])


In [15]:
# Count the number of records after dropping duplicates
deduped_count = df_2009_datamining_deduped.count()

In [16]:
# Calculate the number of duplicate records
duplicate_count = total_count - deduped_count

In [17]:
print(f"Total records for 2009 'data mining': {total_count}")
print(f"Records after removing duplicates: {deduped_count}")
print(f"Number of duplicate records: {duplicate_count}")

Total records for 2009 'data mining': 17271
Records after removing duplicates: 17271
Number of duplicate records: 0


In [None]:
# If duplicates are found, use the deduped dataframe for further analysis
if duplicate_count > 0:
    df_cleaned = df_cleaned.subtract(df_2009_datamining).union(df_2009_datamining_deduped)


In [18]:
# Explode keywords and select relevant columns
df_keywords = df_cleaned.select(
    col("year"),
    explode(col("keywords")).alias("keyword")
).filter(col("keyword").isNotNull())

In [19]:
# Group by year and keyword, then count the number of publications
df_trends = df_keywords.groupBy("year", "keyword").agg(count("*").alias("publication_count"))


In [20]:
# Define a window specification to rank keywords by publication count within each year
window_spec = Window.partitionBy("year").orderBy(desc("publication_count"))

In [21]:
# Add a row number to each keyword within each year based on the publication count
df_ranked = df_trends.withColumn("rank", row_number().over(window_spec))

In [22]:
# Filter to get the top keyword for each year
df_top_keywords = df_ranked.filter(col("rank") == 1).select("year", "keyword", "publication_count")


In [23]:
# Sort the result by year in descending order
df_sorted = df_top_keywords.orderBy(desc("year"))

In [24]:
df_sorted = df_sorted.filter(col("keyword") != "Conferences")

In [25]:
# Limit the result to 25 rows
df_limited = df_sorted.limit(25)

In [17]:
df_limited.show()

+----+------------------+-----------------+
|year|           keyword|publication_count|
+----+------------------+-----------------+
|2023|     deep learning|               28|
|2022|     Deep learning|             2867|
|2020|     Deep learning|             2144|
|2019|      Optimization|             2607|
|2018|      Optimization|             2671|
|2017|      Optimization|             2040|
|2016|      Optimization|             1535|
|2015|      optimization|             1334|
|2014|feature extraction|             4259|
|2013|feature extraction|             3556|
|2012|feature extraction|             3739|
|2011|feature extraction|             3683|
|2010|       data mining|             3635|
|2009|       data mining|            17361|
|2008|       data mining|             4681|
|2007|       data mining|             2752|
|2006|  computer science|             2758|
|2005|          internet|             2766|
|2004|          internet|             2138|
|2003|          internet|       

In [26]:
df_limited.write.json("publication_trends25.json")