In [33]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark_session = SparkSession.builder \
    .master("yarn") \
    .config('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1') \
    .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1') \
    .getOrCreate()

spark = spark_session


# Load the dataset
df = spark.read.json("./dblp-ref/*.json", multiLine=True)

# Show the schema to understand the structure
df.printSchema()

# Display a sample of the data
df.show(5)

# Count the number of records
print(f"Number of records: {df.count()}")

# Display summary statisticss
df.describe().show()

# Check for missing values (excluding `isnan`)
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Distribution of citations
df.select("n_citation").describe().show()

# Check for null abstracts and titles
df.filter(df.abstract.isNull() | df.title.isNull()).show()


root
 |-- abstract: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- n_citation: long (nullable = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- year: long (nullable = true)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+----+
|            abstract|             authors|                  id|n_citation|          references|               title|               venue|year|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+----+
|Based on biologic...|[Guoping Pang, La...|4aa69add-3978-480...|         8|[04754a28-6bf4-4d...|Dynamic analysis ...|Mathematics and C...|2008|
|In this paper, a ...

In [34]:
!pip install langid



In [35]:
import langid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Function to detect language using langid
def detect_language(text):
    if text is None:
        return None
    lang, _ = langid.classify(text)
    return lang

# Registering UDF
lang_detect_udf = udf(detect_language, StringType())

# Add a new column for language detection
df = df.withColumn("language", lang_detect_udf(df.abstract))

# Filter only English documents
df = df.filter(df.language == 'en')


In [36]:
from pyspark.sql.functions import col, lower, regexp_replace, split
from pyspark.ml.feature import StopWordsRemover

# Custom stop words
custom_stop_words = ['doi', 'preprint', 'copyright', 'peer', 'reviewed', 'org', 'https', 'et', 'al', 
                     'author', 'figure', 'rights', 'reserved', 'permission', 'used', 'using', 
                     'biorxiv', 'medrxiv', 'license', 'fig', 'fig.', 'al.', 'Elsevier', 'PMC', 
                     'CZI', 'www']

# Lowercase and remove punctuation
df_cleaned = df.withColumn("cleaned_abstract", lower(col("abstract")))
df_cleaned = df_cleaned.withColumn("cleaned_abstract", regexp_replace(col("cleaned_abstract"), r'[!()\-\[\]{};:\'",<>./?@#$%^&*_~]', ''))

# Tokenize the text
df_tokenized = df_cleaned.withColumn("tokenized_abstract", split(col("cleaned_abstract"), " "))

# Remove stop words
remover = StopWordsRemover(inputCol="tokenized_abstract", outputCol="filtered_abstract", 
                           stopWords=StopWordsRemover().getStopWords() + custom_stop_words)
df_filtered = remover.transform(df_tokenized)

# Show the cleaned dataframe
df_filtered.select("abstract", "cleaned_abstract", "tokenized_abstract", "filtered_abstract").show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [37]:
from pyspark.ml.feature import HashingTF, IDF

# Apply TF
hashingTF = HashingTF(inputCol="filtered_abstract", outputCol="rawFeatures", numFeatures=20000)
df_featurized = hashingTF.transform(df_filtered)

# Apply IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df_featurized)
df_vectorized = idf_model.transform(df_featurized)

# Select only the columns we need
df_vectorized = df_vectorized.select("id", "title", "features")
df_vectorized.show(5)


+--------------------+--------------------+--------------------+
|                  id|               title|            features|
+--------------------+--------------------+--------------------+
|4aa69add-3978-480...|Dynamic analysis ...|(20000,[28,42,274...|
|4ab3735c-80f1-472...|A new approach of...|(20000,[78,274,46...|
|00127ee2-cb05-48c...|Preliminary Desig...|(20000,[1072,1241...|
|001eef4f-1d00-4ae...|A Heterogeneous S...|(20000,[193,274,2...|
+--------------------+--------------------+--------------------+



In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PCA_KMeans") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()


In [39]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import PCA
import matplotlib.pyplot as plt

# Check if the sampled DataFrame is empty
if df_sampled.isEmpty():
    print("Sampled DataFrame is empty. Unable to perform PCA.")
else:
    # Apply PCA to reduce dimensions (for example, to 20 components)
    pca = PCA(k=20, inputCol="features", outputCol="pca_features")
    pca_model = pca.fit(df_sampled)
    df_pca = pca_model.transform(df_sampled)

    # Determine the optimal number of clusters using the elbow method
    costs = []
    for k in range(2, 21):
        kmeans = KMeans(k=k, seed=1, featuresCol="pca_features")
        model = kmeans.fit(df_pca)
        cost = model.summary.trainingCost
        costs.append(cost)

    # Plot the elbow curve
    plt.figure(figsize=(10, 6))
    plt.plot(range(2, 21), costs, marker='o')
    plt.xlabel('Number of Clusters')
    plt.ylabel('Cost')
    plt.title('Elbow Method For Optimal k')
    plt.show()


Sampled DataFrame is empty. Unable to perform PCA.


In [40]:
# Assume the optimal number of clusters is determined to be 10
optimal_k = 10

# Train the K-means model
kmeans = KMeans(k=optimal_k, seed=1, featuresCol="pca_features")
model = kmeans.fit(df_pca)

# Make predictions
df_clusters = model.transform(df_pca)
df_clusters.select("id", "title", "prediction").show(5)


NameError: name 'df_pca' is not defined

In [None]:
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Function to calculate cosine similarity
def cosine_similarity(v1, v2):
    dot_product = np.dot(v1, v2)
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    return float(dot_product / (norm_v1 * norm_v2))

cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# Build search engine function
def recommend_papers(title, top_n=5):
    # Find the cluster of the given paper title
    paper_cluster = df_clusters.filter(df_clusters.title == title).select("prediction").collect()[0][0]
    
    # Get all papers in the same cluster
    cluster_papers = df_clusters.filter(df_clusters.prediction == paper_cluster)
    
    # Get the features of the given paper
    paper_features = df_clusters.filter(df_clusters.title == title).select("features").collect()[0][0]
    
    # Calculate cosine similarity and recommend top N papers
    cluster_papers = cluster_papers.withColumn("similarity", cosine_similarity_udf(cluster_papers.features, Vectors.dense(paper_features.toArray())))
    recommendations = cluster_papers.orderBy("similarity", ascending=False).limit(top_n)
    
    return recommendations.select("title", "similarity").collect()

# Example usage
recommended_papers = recommend_papers("A new approach of....", top_n=5)
for rec in recommended_papers:
    print(f"Title: {rec['title']}, Similarity: {rec['similarity']}")
