In [None]:
import os
import re
import sys
from collections import Counter
from datetime import datetime
from itertools import islice
from operator import contains

import matplotlib.pyplot as plt
import nltk
import numpy as np
import spacy
from gensim import corpora, models
from nltk.corpus import stopwords
from wordcloud import WordCloud, STOPWORDS

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, LogisticRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Word2Vec, Tokenizer, StandardScaler
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF, IDF
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower, col, udf, asc, desc, explode
from pyspark.sql.functions import split, size, array_join
from pyspark.sql.types import StringType, ArrayType

# Needed on RaaS: make the Spark Python worker processes use the same interpreter
# as this kernel, so they see the same installed packages.
os.environ['PYSPARK_PYTHON'] = sys.executable

# Not needed
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# (translated from Romanian) "I think it wants to be run anyway" — presumably refers
# to the commented-out PYSPARK_DRIVER_PYTHON line above; TODO confirm.

In [None]:
def init_spark(app_name="BigData", driver_memory="5g"):
    """Create (or reuse) a SparkSession and return it together with its SparkContext.

    Parameters:
    app_name (str): Spark application name. Default "BigData".
    driver_memory (str): Value for ``spark.driver.memory``. Default "5g".
        This setting only affects a newly started driver JVM; ``getOrCreate``
        returns an already running session unchanged.

    Returns:
    tuple: ``(spark, sc)`` — the SparkSession and its SparkContext.
    """
    spark = (
        SparkSession.builder
        .config("spark.driver.memory", driver_memory)
        .appName(app_name)
        .getOrCreate()
    )
    return spark, spark.sparkContext

## Preprocess dataset for training and testing.

In [None]:
def get_sample_from_file(file_name, output_file, sample_size = 10_000):
    """
    Copy the first ``sample_size`` lines of a file into another file.

    If the input has fewer than ``sample_size`` lines, all of them are copied.
    Only the sampled lines are held in memory; they are collected as a list
    instead of being concatenated into one growing string.

    Parameters:
    file_name (str): The name of the input file.
    output_file (str): The name of the output file where the sample will be written.
    sample_size (int): The number of lines to extract from the input file. Default is 10,000.
        Zero or negative values produce an empty output file.
    """
    # Explicit UTF-8 so the result does not depend on the platform's default
    # encoding (the input is presumably UTF-8 JSON lines — TODO confirm).
    with open(file_name, "r", encoding="utf-8") as src:
        sample_lines = list(islice(src, max(sample_size, 0)))

    # The input is fully read and closed before the output is opened, so passing
    # the same path for both files cannot truncate the input mid-read.
    with open(output_file, "w", encoding="utf-8") as dst:
        dst.writelines(sample_lines)

In [None]:
def load_data_from_json(spark, file_name):
    """Load ``file_name`` into a DataFrame via ``spark.read.json``.

    ``spark`` must be a SparkSession (anything exposing ``.read.json``).
    """
    reader = spark.read
    return reader.json(file_name)

### Initialize Context and Load Sample

In [None]:
# Generate a sample from the input file: copy the first lines (10,000 by default)
# of data.json into arxiv-sample.json, which all later cells load.
get_sample_from_file("data.json", "arxiv-sample.json")

In [None]:
# NOTE: despite its name, `sc` holds the SparkSession (element 0 of init_spark's
# return value), not the SparkContext; later cells pass it to load_data_from_json.
sc = init_spark()[0]
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

## Clean dataset

### Searching for missing values

In [None]:
# search for missing values: one NULL count per metadata column, printed in a fixed order
for column_name in ["abstract", "title", "categories", "id", "submitter", "authors",
                    "report-no", "comments", "doi", "journal-ref", "versions"]:
    null_count = arxiv_dataset.filter(arxiv_dataset[column_name].isNull()).count()
    print(f"Number of missing values in {column_name} column: ", null_count)


In [None]:
# count rows whose field is an empty string (as opposed to NULL, checked above).
# Each count is printed explicitly: a notebook only displays the value of the LAST
# expression in a cell, so bare `.count()` lines would be silently discarded.
for column_name in ["abstract", "title", "authors", "categories"]:
    empty_count = arxiv_dataset.filter(arxiv_dataset[column_name] == "").count()
    print(f"Number of empty values in {column_name} column: ", empty_count)

### Basic preprocessing functions

In [None]:
# fill ALL NULL values with empty string : use after removing rows with empty fields
def fill_na_with_empty_string(df):
    """Return ``df`` with NULLs replaced by "" (string-typed columns only)."""
    return df.na.fill("")

# remove rows with empty fields
def remove_empty_fields(df, field_name):
    """Keep only rows where ``field_name`` is a non-empty string.

    Rows where the field is NULL are dropped too: ``NULL != ""`` evaluates to
    NULL, and ``filter`` discards rows whose predicate is NULL.
    """
    is_non_empty = df[field_name] != ""
    return df.filter(is_non_empty)

# remove "\n" from text
def remove_empty_newlines(df, field_name):
    """Replace every newline character in ``field_name`` with a single space."""
    without_newlines = regexp_replace(col(field_name), "\n", " ")
    return df.withColumn(field_name, without_newlines)

# remove math formulas and latex
def remove_math_formula(df, field_name):
    """Delete inline ``$...$`` math spans from ``field_name``.

    The pattern is non-greedy, so each ``$...$`` pair is removed separately.
    ``.`` does not match newlines, so apply this after newlines are replaced.
    """
    # Raw string: "\$" in a normal literal is an invalid escape sequence
    # (DeprecationWarning; SyntaxWarning since Python 3.12). The regex is unchanged.
    return df.withColumn(field_name, regexp_replace(col(field_name), r"\$.*?\$", ""))

# convert to lowercase
def convert_to_lowercase(df, field_name):
    """Return ``df`` with ``field_name`` replaced by its lower-cased value.

    NOTE: a later cell redefines ``convert_to_lowercase`` as a one-argument Spark
    UDF, which shadows this helper once that cell has run.
    """
    lowered = lower(col(field_name))
    return df.withColumn(field_name, lowered)

# remove extra spaces
def remove_extra_spaces(df, field_name):
    """Collapse runs of spaces in ``field_name`` to one space and strip them from both ends.

    Only the space character is handled; tabs and newlines are left as-is
    (use ``remove_empty_newlines`` first for newlines).
    """
    df = df.withColumn(field_name, regexp_replace(col(field_name), " +", " "))
    # Strip both ends; a trailing space would otherwise survive the collapse above.
    df = df.withColumn(field_name, regexp_replace(col(field_name), "^ +| +$", ""))
    return df

def remove_punctuation_except_dot(df, field_name):
    """Remove every character that is not an ASCII letter, digit, whitespace, '.' or ','.

    Despite the name, commas are kept as well as dots. Non-ASCII letters
    (e.g. accented characters) are removed too.
    """
    # Raw string avoids the invalid "\s" / "\." escape warnings; the regex is unchanged.
    pattern = r"[^a-zA-Z0-9\s\.,]"
    return df.withColumn(field_name, regexp_replace(col(field_name), pattern, ""))

# remove punctuation
def remove_punctuation(df, field_name):
    """Delete every character in ``field_name`` that is neither a word character nor whitespace.

    In Spark's (Java) regex, ``\\w`` is ASCII-only by default, so non-ASCII
    letters are removed as well.

    NOTE: a later cell redefines ``remove_punctuation`` as a one-argument Spark
    UDF, which shadows this helper once that cell has run.
    """
    # Raw string avoids the invalid "\w" / "\s" escape warnings; the regex is unchanged.
    return df.withColumn(field_name, regexp_replace(col(field_name), r"[^\w\s]", ""))

def remove_text_in_braces(df, field_name):
    """Delete bracketed spans — (...), {...}, [...] — from ``field_name``.

    The match is non-greedy and does not require the opening and closing
    brackets to be of the same kind. It does not handle nesting either:
    "(a (b) c)" becomes " c)".
    """
    return df.withColumn(
        field_name,
        regexp_replace(col(field_name), r'[\(\{\[].*?[\)\}\]]', ''),
    )

def remove_text_between_parentheses(strings_list):
    """Return a new list where innermost "(...)" groups are deleted from each string.

    Only groups with no parentheses inside are removed, in a single pass:
    "x(y(z))" becomes "x(y)".
    """
    innermost_group = re.compile(r'\([^()]*\)')
    return [innermost_group.sub('', text) for text in strings_list]

def remove_substring_from_list(input_list, substring_to_remove):
    """Return a new list with every occurrence of ``substring_to_remove`` deleted from each string."""
    cleaned = []
    for element in input_list:
        cleaned.append(element.replace(substring_to_remove, ""))
    return cleaned

def remove_stop_words(df, field_name, output_field, stop_words):
    """Remove ``stop_words`` from the whitespace-separated text in ``field_name``.

    Returns a DataFrame with only three columns: ``field_name``, ``categories``
    (which must exist in ``df``), and ``output_field``, which holds the remaining
    words joined with single spaces. Matching is done by Spark's StopWordsRemover.
    """
    with_tokens = df.withColumn("tokens", split(field_name, "\\s+"))
    remover = StopWordsRemover(stopWords=stop_words, inputCol="tokens", outputCol=output_field)
    without_stop_words = remover.transform(with_tokens)
    rejoined = array_join(output_field, " ").alias(output_field)
    return without_stop_words.select(field_name, "categories", rejoined)

def split_function(sep, authors_list):
    """Split each string in ``authors_list`` on ``sep`` and return the distinct pieces.

    Side effect: every element of ``authors_list`` is replaced in place by its
    list of pieces. The returned list is built from a set, so its order is arbitrary.
    """
    for position, element in enumerate(authors_list):
        authors_list[position] = element.split(sep)
    unique_authors = {name for pieces in authors_list for name in pieces}
    return list(unique_authors)

def delete_new_line_and_spaces(author_list):
    """Strip surrounding whitespace from each author name and drop empty names.

    The list is modified in place and also returned.

    Parameters:
    author_list (list[str]): Author names, possibly starting with a newline and space.

    Returns:
    list[str]: The same list object, now holding only non-empty stripped names.
    """
    # str.strip() already removes a leading newline + space, so no separate pass is
    # needed. Rebuilding via slice assignment (instead of list.remove() inside a loop
    # over the same list) avoids skipping consecutive empty strings.
    author_list[:] = [author.strip() for author in author_list if author.strip()]
    return author_list

def delete_duplicate(author_list):
    """Return a new list with duplicate names removed, keeping first-occurrence order.

    The input list is not modified. Elements must be hashable (author names are strings).
    """
    # dict preserves insertion order, giving O(n) order-preserving de-duplication
    # instead of the O(n^2) "not in list" scan.
    return list(dict.fromkeys(author_list))

def delete_letter(author_list):
    """Remove bare initials — one-character names or a letter followed by "." — in place.

    Returns the same list object.
    """
    def _is_initial(author):
        # "A" or "A." style entries
        return len(author) == 1 or (len(author) == 2 and author[-1] == ".")

    # Slice assignment instead of list.remove() inside the loop, which skipped
    # the element following each removed one.
    author_list[:] = [author for author in author_list if not _is_initial(author)]
    return author_list

def remove_characters_before_substring(original_list, target_substring):
    """For each string, drop everything up to and including the first
    case-insensitive occurrence of ``target_substring``.

    Strings without a match are kept unchanged. Returns a new list.
    """
    needle = target_substring.lower()
    cut_length = len(target_substring)

    def _cut(text):
        position = text.lower().find(needle)
        return text if position == -1 else text[position + cut_length:]

    return [_cut(text) for text in original_list]

def remove_characters(original_string, target_substring):
    """Drop everything up to and including the first case-insensitive match of ``target_substring``.

    Returns ``original_string`` unchanged when there is no match.
    """
    match_at = original_string.lower().find(target_substring.lower())
    if match_at == -1:
        return original_string
    return original_string[match_at + len(target_substring):]

def delete_element(input_list, substr):
    """Remove, in place, every element that contains ``substr`` (case-insensitive).

    Returns the same list object.
    """
    needle = substr.lower()
    # Slice assignment instead of list.remove() inside the loop, which skipped
    # the element following each removed one.
    input_list[:] = [element for element in input_list if needle not in element.lower()]
    return input_list


### Clean the 'abstract' column

In [None]:
# Clean the abstracts with the DataFrame helpers above: drop rows whose abstract is
# empty or NULL, turn newlines into spaces, delete inline $...$ math, lower-case,
# and collapse repeated spaces.
arxiv_dataset = remove_empty_fields(arxiv_dataset, "abstract")
arxiv_dataset = remove_empty_newlines(arxiv_dataset, "abstract")
arxiv_dataset = remove_math_formula(arxiv_dataset, "abstract")
arxiv_dataset = convert_to_lowercase(arxiv_dataset, "abstract")
arxiv_dataset = remove_extra_spaces(arxiv_dataset, "abstract")

In [None]:
# Show some changed rows (first 5 cleaned abstracts, not truncated)
arxiv_dataset.select("abstract").show(5, truncate=False)

## Convert Abstract to Vector Representation

In [None]:
# create a Word2Vec model: 100-dimensional word embeddings learned from the
# abstracts' tokens; words that occur fewer than 5 times are ignored.
input_col = "abstract"
output_col = "abstract_vector"

words2vec_model = Word2Vec(
    inputCol="words",
    outputCol=output_col,
    vectorSize=100,
    minCount=5,
    seed=42,  # fixed seed so the learned embeddings are reproducible across runs
)

# Word2Vec expects an array-of-strings column: split the abstracts into "words".
tokenized = Tokenizer(inputCol=input_col, outputCol="words")
tokenized_dataset = tokenized.transform(arxiv_dataset)
model = words2vec_model.fit(tokenized_dataset)

In [None]:
# Peek at the learned vocabulary: the first 2 (word, vector) rows.
model.getVectors().show(n=2, truncate=False)

### Visualize the most popular categories, ranked by how many distinct authors contributed to each

In [None]:
# loading back the data: start the author analysis from the raw sample, since the
# cells above rewrote the `abstract` column.
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

In [None]:
@udf(returnType=ArrayType(StringType()))
def get_authors_list(parsed_authors):
    """Build one display name per author from the ``authors_parsed`` column.

    Each entry of ``parsed_authors`` is expected to be a list of name parts
    (presumably [last name, first names, suffix] — TODO confirm). The first two
    parts are joined with a space; an entry with a single part yields just that part.

    Returns an empty list for a NULL ``authors_parsed`` value, and skips NULL or
    empty entries, instead of failing the whole Spark task.
    """
    if parsed_authors is None:
        return []
    authors = []
    for author in parsed_authors:
        if not author:
            continue
        authors.append(f"{author[0]} {author[1]}" if len(author) > 1 else author[0])
    return authors

In [None]:
# Add `authors_list`: one "first-part second-part" name per entry of `authors_parsed`.
arxiv_dataset = arxiv_dataset.withColumn("authors_list", get_authors_list("authors_parsed"))

In [None]:
# Inspect the new column (first 5 rows, not truncated).
arxiv_dataset.select("authors_list").show(5, truncate=False)

In [None]:
# preprocess authors column : turn the " and " separator into a comma so authors are comma-separated
def preprocess_authors(df):
    """Replace every " and " in ``authors`` with ", " (case-sensitive)."""
    and_to_comma = regexp_replace(col("authors"), " and ", ", ")
    return df.withColumn("authors", and_to_comma)

# preprocess authors column : collapse repeated commas
def remove_authors_extra_commas(df):
    """Collapse each run of consecutive commas in ``authors`` into a single comma."""
    single_commas = regexp_replace(col("authors"), ",+", ",")
    return df.withColumn("authors", single_commas)
    
# preprocess authors column : collapse repeated spaces (more than one space)
def remove_authors_extra_spaces(df):
    """Collapse each run of spaces in ``authors`` into a single space."""
    single_spaces = regexp_replace(col("authors"), " +", " ")
    return df.withColumn("authors", single_spaces)

# preprocess authors column : remove parentheses and their content
def remove_authors_parentheses(df):
    """Delete "(...)" groups from ``authors``.

    The match is non-greedy and not nesting-aware: it runs from an opening
    parenthesis to the first closing one after at least one character.
    """
    # Raw string avoids the invalid "\(" / "\)" escape warnings; the regex is unchanged.
    return df.withColumn("authors", regexp_replace(col("authors"), r"\(.+?\)", ""))
    

In [None]:
# Normalize the raw `authors` string: " and " becomes ", ", then repeated commas and
# spaces are collapsed. remove_authors_parentheses is defined above but not applied
# here, and `authors_list` (built from `authors_parsed`) is not affected by these steps.
arxiv_dataset = preprocess_authors(arxiv_dataset)
arxiv_dataset = remove_authors_extra_commas(arxiv_dataset)
arxiv_dataset = remove_authors_extra_spaces(arxiv_dataset)

In [None]:
# see the first 5 rows of the preprocessed `authors` column. The cell above rewrites
# `authors`; `authors_list` is derived from `authors_parsed` and is shown for comparison.
arxiv_dataset.select("authors", "authors_list").show(5, truncate=False)

In [None]:
# Get all distinct categories (`categories` is a space-separated string per paper)
categories = (
    arxiv_dataset.select("categories").rdd
    .flatMap(lambda row: row.categories.split(" "))
    .distinct()
)

print("Number of categories: ", categories.count())

# Get all distinct authors
authors = (
    arxiv_dataset.select("authors_list").rdd
    .flatMap(lambda row: row.authors_list)
    .distinct()
)

print("Number of authors: ", authors.count())

# get all distinct authors for each category: emit (category, set of authors) per
# paper and merge the sets, so duplicates are removed during the shuffle instead of
# concatenating ever-growing lists.
auth_categories = arxiv_dataset.select("categories", "authors_list")
categories_authors = (
    auth_categories.rdd
    .flatMap(lambda row: [(category, set(row.authors_list)) for category in row.categories.split(" ")])
    .reduceByKey(lambda left, right: left | right)
    .mapValues(list)
)

# categories_authors to dict: {category: [distinct authors]}
categories_authors = dict(categories_authors.collect())

# get number of distinct authors for each category
count_authors = {category: len(names) for category, names in categories_authors.items()}

# sort categories by number of authors, largest first
sorted_categories = sorted(count_authors.items(), key=lambda item: item[1], reverse=True)

# get top 10 categories
top_categories = sorted_categories[:10]

In [None]:
# Top 10 (category, number of distinct authors) pairs, most authors first.
top_categories

### Topic extraction and stats (most popular topic per category)

Candidate approaches: BERTopic, or LDA via Gensim. The cells below use Gensim's LDA.

In [None]:
# loading back the data: topic extraction starts again from the raw sample
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

In [None]:
# https://arxiv.org/help/api/user-manual
# Maps arXiv category codes (as they appear in the space-separated `categories`
# field) to human-readable names.
# NOTE(review): the list may not be exhaustive (e.g. legacy archive codes on older
# papers); prefer category_map.get(code, code) when looking codes up.
category_map = {'astro-ph': 'Astrophysics',
'astro-ph.CO': 'Cosmology and Nongalactic Astrophysics',
'astro-ph.EP': 'Earth and Planetary Astrophysics',
'astro-ph.GA': 'Astrophysics of Galaxies',
'astro-ph.HE': 'High Energy Astrophysical Phenomena',
'astro-ph.IM': 'Instrumentation and Methods for Astrophysics',
'astro-ph.SR': 'Solar and Stellar Astrophysics',
'cond-mat.dis-nn': 'Disordered Systems and Neural Networks',
'cond-mat.mes-hall': 'Mesoscale and Nanoscale Physics',
'cond-mat.mtrl-sci': 'Materials Science',
'cond-mat.other': 'Other Condensed Matter',
'cond-mat.quant-gas': 'Quantum Gases',
'cond-mat.soft': 'Soft Condensed Matter',
'cond-mat.stat-mech': 'Statistical Mechanics',
'cond-mat.str-el': 'Strongly Correlated Electrons',
'cond-mat.supr-con': 'Superconductivity',
'cs.AI': 'Artificial Intelligence',
'cs.AR': 'Hardware Architecture',
'cs.CC': 'Computational Complexity',
'cs.CE': 'Computational Engineering, Finance, and Science',
'cs.CG': 'Computational Geometry',
'cs.CL': 'Computation and Language',
'cs.CR': 'Cryptography and Security',
'cs.CV': 'Computer Vision and Pattern Recognition',
'cs.CY': 'Computers and Society',
'cs.DB': 'Databases',
'cs.DC': 'Distributed, Parallel, and Cluster Computing',
'cs.DL': 'Digital Libraries',
'cs.DM': 'Discrete Mathematics',
'cs.DS': 'Data Structures and Algorithms',
'cs.ET': 'Emerging Technologies',
'cs.FL': 'Formal Languages and Automata Theory',
'cs.GL': 'General Literature',
'cs.GR': 'Graphics',
'cs.GT': 'Computer Science and Game Theory',
'cs.HC': 'Human-Computer Interaction',
'cs.IR': 'Information Retrieval',
'cs.IT': 'Information Theory',
'cs.LG': 'Machine Learning',
'cs.LO': 'Logic in Computer Science',
'cs.MA': 'Multiagent Systems',
'cs.MM': 'Multimedia',
'cs.MS': 'Mathematical Software',
'cs.NA': 'Numerical Analysis',
'cs.NE': 'Neural and Evolutionary Computing',
'cs.NI': 'Networking and Internet Architecture',
'cs.OH': 'Other Computer Science',
'cs.OS': 'Operating Systems',
'cs.PF': 'Performance',
'cs.PL': 'Programming Languages',
'cs.RO': 'Robotics',
'cs.SC': 'Symbolic Computation',
'cs.SD': 'Sound',
'cs.SE': 'Software Engineering',
'cs.SI': 'Social and Information Networks',
'cs.SY': 'Systems and Control',
'econ.EM': 'Econometrics',
'eess.AS': 'Audio and Speech Processing',
'eess.IV': 'Image and Video Processing',
'eess.SP': 'Signal Processing',
'gr-qc': 'General Relativity and Quantum Cosmology',
'hep-ex': 'High Energy Physics - Experiment',
'hep-lat': 'High Energy Physics - Lattice',
'hep-ph': 'High Energy Physics - Phenomenology',
'hep-th': 'High Energy Physics - Theory',
'math.AC': 'Commutative Algebra',
'math.AG': 'Algebraic Geometry',
'math.AP': 'Analysis of PDEs',
'math.AT': 'Algebraic Topology',
'math.CA': 'Classical Analysis and ODEs',
'math.CO': 'Combinatorics',
'math.CT': 'Category Theory',
'math.CV': 'Complex Variables',
'math.DG': 'Differential Geometry',
'math.DS': 'Dynamical Systems',
'math.FA': 'Functional Analysis',
'math.GM': 'General Mathematics',
'math.GN': 'General Topology',
'math.GR': 'Group Theory',
'math.GT': 'Geometric Topology',
'math.HO': 'History and Overview',
'math.IT': 'Information Theory',
'math.KT': 'K-Theory and Homology',
'math.LO': 'Logic',
'math.MG': 'Metric Geometry',
'math.MP': 'Mathematical Physics',
'math.NA': 'Numerical Analysis',
'math.NT': 'Number Theory',
'math.OA': 'Operator Algebras',
'math.OC': 'Optimization and Control',
'math.PR': 'Probability',
'math.QA': 'Quantum Algebra',
'math.RA': 'Rings and Algebras',
'math.RT': 'Representation Theory',
'math.SG': 'Symplectic Geometry',
'math.SP': 'Spectral Theory',
'math.ST': 'Statistics Theory',
'math-ph': 'Mathematical Physics',
'nlin.AO': 'Adaptation and Self-Organizing Systems',
'nlin.CD': 'Chaotic Dynamics',
'nlin.CG': 'Cellular Automata and Lattice Gases',
'nlin.PS': 'Pattern Formation and Solitons',
'nlin.SI': 'Exactly Solvable and Integrable Systems',
'nucl-ex': 'Nuclear Experiment',
'nucl-th': 'Nuclear Theory',
'physics.acc-ph': 'Accelerator Physics',
'physics.ao-ph': 'Atmospheric and Oceanic Physics',
'physics.app-ph': 'Applied Physics',
'physics.atm-clus': 'Atomic and Molecular Clusters',
'physics.atom-ph': 'Atomic Physics',
'physics.bio-ph': 'Biological Physics',
'physics.chem-ph': 'Chemical Physics',
'physics.class-ph': 'Classical Physics',
'physics.comp-ph': 'Computational Physics',
'physics.data-an': 'Data Analysis, Statistics and Probability',
'physics.ed-ph': 'Physics Education',
'physics.flu-dyn': 'Fluid Dynamics',
'physics.gen-ph': 'General Physics',
'physics.geo-ph': 'Geophysics',
'physics.hist-ph': 'History and Philosophy of Physics',
'physics.ins-det': 'Instrumentation and Detectors',
'physics.med-ph': 'Medical Physics',
'physics.optics': 'Optics',
'physics.plasm-ph': 'Plasma Physics',
'physics.pop-ph': 'Popular Physics',
'physics.soc-ph': 'Physics and Society',
'physics.space-ph': 'Space Physics',
'q-bio.BM': 'Biomolecules',
'q-bio.CB': 'Cell Behavior',
'q-bio.GN': 'Genomics',
'q-bio.MN': 'Molecular Networks',
'q-bio.NC': 'Neurons and Cognition',
'q-bio.OT': 'Other Quantitative Biology',
'q-bio.PE': 'Populations and Evolution',
'q-bio.QM': 'Quantitative Methods',
'q-bio.SC': 'Subcellular Processes',
'q-bio.TO': 'Tissues and Organs',
'q-fin.CP': 'Computational Finance',
'q-fin.EC': 'Economics',
'q-fin.GN': 'General Finance',
'q-fin.MF': 'Mathematical Finance',
'q-fin.PM': 'Portfolio Management',
'q-fin.PR': 'Pricing of Securities',
'q-fin.RM': 'Risk Management',
'q-fin.ST': 'Statistical Finance',
'q-fin.TR': 'Trading and Market Microstructure',
'quant-ph': 'Quantum Physics',
'stat.AP': 'Applications',
'stat.CO': 'Computation',
'stat.ME': 'Methodology',
'stat.ML': 'Machine Learning',
'stat.OT': 'Other Statistics',
'stat.TH': 'Statistics Theory'}

#### Distribution of the number of categories per paper
- most papers have 1 or 2 categories

In [None]:
# Number of categories per paper (`categories` is a space-separated string).
categories_length = arxiv_dataset.select(size(split(arxiv_dataset.categories, " ")).alias("categories_length"))

In [None]:
# print max, min, average, standard deviation and median number of categories per paper.
# All five statistics are computed in a single Spark job instead of five separate ones.
# `percentile(x, 0.5)` is the exact (interpolated) median and, unlike the `median`
# aggregate, is also available on Spark versions older than 3.4.
category_stats = categories_length.selectExpr(
    "max(categories_length)",
    "min(categories_length)",
    "avg(categories_length)",
    "stddev(categories_length)",
    "percentile(categories_length, 0.5)",
).first()
maxim, minim, avg, std, median = category_stats

print("Max number of categories: ", maxim)
print("Min number of categories: ", minim)
print("Average number of categories: ", avg)
print("Standard deviation of categories: ", std)
print("Median number of categories: ", median)

In [None]:
# Histogram of the number of categories per paper, one bar per integer value.
categories_length_list = [row.categories_length for row in categories_length.collect()]
bin_edges = range(min(categories_length_list), max(categories_length_list) + 2)

fig, ax = plt.subplots()
ax.hist(categories_length_list, bins=bin_edges, align="left", rwidth=0.8)
ax.set(title="Categories per paper", xlabel="Number of categories", ylabel="Number of papers")
plt.show()

#### Most/Least popular category 

In [None]:
# most/least frequently occurring category: flatten every paper's space-separated
# categories into one list and count all of them in a single pass (Counter),
# instead of calling list.count() once per distinct category.
categories = [
    category
    for row in arxiv_dataset.select("categories").collect()
    for category in row.categories.split(" ")
]
category_counts = Counter(categories)

# most frequently occurring category (on ties, the first one encountered wins)
most_common_category, most_common_count = max(category_counts.items(), key=lambda item: item[1])
print(
    "Most frequently occuring category: ",
    most_common_category,
    " (", most_common_count, " times)"
)

# least frequently occurring category (on ties, the first one encountered wins)
least_common_category, least_common_count = min(category_counts.items(), key=lambda item: item[1])
print(
    "Least frequently occuring category: ",
    least_common_category,
    " (", least_common_count, " times)"
)

#### Analyzing the abstracts

In [None]:
# abstract length in words (histogram below). The raw abstracts may contain leading
# spaces and newlines (the cleaning helpers above exist to strip them), so trim and
# split on any whitespace run; splitting on a single " " would count empty strings
# and merge words joined by a newline.
abstract_lengths = arxiv_dataset.select(
    size(split(regexp_replace("abstract", r"^\s+|\s+$", ""), r"\s+")).alias('abstract_lengths')
)

In [None]:
# All five statistics in a single Spark job. `percentile(x, 0.5)` is the exact
# (interpolated) median and also works on Spark versions older than 3.4.
abstract_stats = abstract_lengths.selectExpr(
    "max(abstract_lengths)",
    "min(abstract_lengths)",
    "avg(abstract_lengths)",
    "stddev(abstract_lengths)",
    "percentile(abstract_lengths, 0.5)",
).first()
max_length, min_length, avg_length, std_length, median_length = abstract_stats

# In number of words
print("Max length of abstract: ", max_length)
print("Min length of abstract: ", min_length)
print("Average length of abstract: ", avg_length)
print("Standard deviation of abstract length: ", std_length)
print("Median length of abstract: ", median_length)

In [None]:
# Distribution of abstract lengths (in words).
abstract_lengths_list = [row.abstract_lengths for row in abstract_lengths.collect()]

fig, ax = plt.subplots()
ax.hist(abstract_lengths_list, bins=100)
ax.set(title="Abstract length distribution", xlabel="Abstract length (words)", ylabel="Number of papers")
plt.show()

### LDA preparations

In [None]:
# NLTK corpora: stop-word lists (for the `stopwords` import) and WordNet, which the
# NLTK-based `lemmatization` helper below needs.
nltk.download('stopwords')
nltk.download('wordnet')

In [None]:
# after some research : spacy is a bit faster than nltk on large datasets
def lemmatization(text):
    """Lemmatize each space-separated word of ``text`` with NLTK's WordNet lemmatizer.

    WordNet's default part of speech (noun) is used for every word. The cleaning
    cells of this notebook use the spaCy-based ``lemmatization_spacy`` UDF instead.
    """
    lemmatizer = nltk.stem.WordNetLemmatizer()
    lemmas = map(lemmatizer.lemmatize, text.split(" "))
    return " ".join(lemmas)

In [None]:
# Load the small English spaCy pipeline. The UDFs below only use tokens, stop-word and
# punctuation flags, and lemmas, so the dependency parser and NER are disabled to make
# the per-row spaCy calls cheaper (the lemmatizer keeps the tagger it depends on).
nlp = spacy.load('en_core_web_sm', disable=["parser", "ner"])

In [None]:
@udf(returnType=StringType())
def lemmatization_spacy(text):
    """Replace every token of ``text`` with its spaCy lemma, joined by single spaces.

    Uses the module-level spaCy pipeline ``nlp``. A NULL input is returned as NULL
    instead of failing the Spark task.
    """
    if text is None:
        return None
    return " ".join(token.lemma_ for token in nlp(text))

In [None]:
@udf(returnType=StringType())
def remove_stopwords(text):
    """Drop spaCy stop-word tokens from ``text``; the rest are joined by single spaces.

    Uses the module-level spaCy pipeline ``nlp``. A NULL input is returned as NULL
    instead of failing the Spark task.
    """
    if text is None:
        return None
    return " ".join(token.text for token in nlp(text) if not token.is_stop)

In [None]:
@udf(returnType=StringType())
def remove_punctuation(text):
    """Drop spaCy punctuation tokens from ``text``; the rest are joined by single spaces.

    NOTE: this Spark UDF takes a single column and shadows the earlier
    ``remove_punctuation(df, field_name)`` DataFrame helper of the same name.
    A NULL input is returned as NULL instead of failing the Spark task.
    """
    if text is None:
        return None
    return " ".join(token.text for token in nlp(text) if not token.is_punct)

In [None]:
@udf(returnType=StringType())
def convert_to_lowercase(text):
    """Lower-case ``text`` (Python ``str.lower``).

    NOTE: this Spark UDF takes a single column and shadows the earlier
    ``convert_to_lowercase(df, field_name)`` DataFrame helper of the same name.
    A NULL input is returned as NULL instead of failing the Spark task.
    """
    return None if text is None else text.lower()

In [None]:
# clear abstract column for topic modelling. The first four steps are the DataFrame
# helpers (drop empty/NULL abstracts, newlines become spaces, strip $...$ math, collapse
# spaces). `convert_to_lowercase` and `remove_punctuation` below now refer to the
# single-column UDFs redefined in the cells above, not the earlier DataFrame helpers.
arxiv_dataset = remove_empty_fields(arxiv_dataset, "abstract")
arxiv_dataset = remove_empty_newlines(arxiv_dataset, "abstract")
arxiv_dataset = remove_math_formula(arxiv_dataset, "abstract")
arxiv_dataset = remove_extra_spaces(arxiv_dataset, "abstract")
# convert to lowercase
arxiv_dataset = arxiv_dataset.withColumn("abstract", convert_to_lowercase("abstract"))
# remove punctuation
arxiv_dataset = arxiv_dataset.withColumn("abstract", remove_punctuation("abstract"))
# remove stopwords
arxiv_dataset = arxiv_dataset.withColumn("abstract", remove_stopwords("abstract"))

In [None]:
# Lemmatize with spaCy (a Python UDF: the spaCy pipeline runs once per row).
arxiv_dataset = arxiv_dataset.withColumn("abstract", lemmatization_spacy("abstract"))

### Gensim

In [None]:
# Collect the cleaned abstracts to the driver as lists of words for gensim
# (the whole sample must fit in driver memory).
abstracts = arxiv_dataset.select("abstract").rdd.map(lambda x: x.abstract.split()).collect()

In [None]:
# Build the gensim vocabulary, then keep only tokens that appear in at least 20
# abstracts and in no more than 5% of them (drops both rare and very common terms).
dictionary = corpora.Dictionary(abstracts)
dictionary.filter_extremes(no_below=20, no_above=0.05)

In [None]:
# The 5 most frequent tokens left after filtering.
dictionary.most_common(5)

In [None]:
# Bag-of-words representation: one list of (token_id, count) pairs per abstract.
corpus = [dictionary.doc2bow(text) for text in abstracts]

In [None]:
# Train a 10-topic LDA model on multiple cores; fixed random_state for
# reproducibility, 100 passes over the corpus.
lda_model = models.LdaMulticore(corpus=corpus, id2word=dictionary, num_topics=10, random_state=42, passes=100)

In [None]:
# Top weighted words of each topic.
lda_model.print_topics()

In [None]:
# Topic distribution per abstract: a list of (topic_id, probability) pairs; topics
# below the model's minimum probability are omitted.
topics = [
    lda_model.get_document_topics(c)
    for c in corpus
]

In [None]:
# Example: topic distribution of the second abstract.
topics[1]

In [None]:
def most_probable_topic(t):
    """Return the id of the highest-probability topic in a gensim topic list.

    ``t`` is a non-empty sequence of (topic_id, probability) pairs, as returned by
    ``get_document_topics``. On ties, the earliest pair wins.
    """
    pairs = np.asarray(t)
    best_row = np.argmax(pairs[:, 1])
    return int(pairs[best_row, 0])

# Dominant topic id for every abstract, in the same order as `abstracts` / `corpus`.
final_topics = [most_probable_topic(t) for t in topics]

### Query for similar articles (based on the same topic/category/common authors/other criteria)

In [None]:
# loading back the data: the similarity search works on the raw abstracts
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

In [None]:
# Split the raw abstracts into lower-cased, whitespace-separated tokens ("words").
tokenizer = Tokenizer(inputCol="abstract", outputCol="words")
words_data = tokenizer.transform(arxiv_dataset)

In [None]:
# TF-IDF: hash tokens into 1000 term-frequency buckets (unrelated words can share a
# bucket), then reweight by inverse document frequency fitted on the whole sample.
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
featurized_data = hashing_tf.transform(words_data)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

In [None]:
# Calculate TF-IDF similarity between articles: first fetch the TF-IDF vector of a
# reference article. NOTE: `.first()` returns None if the id is not in the sample,
# and the ["features"] lookup then fails.
article_id = "0704.0001"  # Example article ID
article_tfidf = tfidf_data.filter(col("id") == article_id).select("features").first()["features"]
article_tfidf

In [None]:
# TF-IDF vector of a second example article (same caveat: the id must be in the sample).
article_id2 = "0704.0002"  # Example article ID
article_tfidf2 = tfidf_data.filter(col("id") == article_id2).select("features").first()["features"]
article_tfidf2

In [None]:
def cosine_similarity(a, b):
    """Cosine similarity of two vectors that expose ``dot`` and ``norm`` (e.g. Spark ML vectors).

    Returns 0.0 when either vector has zero norm (e.g. an abstract whose terms all
    received zero TF-IDF weight). Without this check the result would be NaN or a
    ZeroDivisionError, and sorting by similarity would be ill-defined.
    """
    denominator = a.norm(2) * b.norm(2)
    if denominator == 0:
        return 0.0
    return float(a.dot(b) / denominator)

In [None]:
# calculate cosine similarity between article_id and every article (including itself)
# and sort the (id, similarity) pairs from most to least similar
similarity = tfidf_data.rdd.map(lambda x: (x.id, cosine_similarity(article_tfidf, x.features))).sortBy(lambda x: x[1], ascending=False)

In [None]:
# Top 10 (id, similarity) pairs.
similarity.take(10)

In [None]:
# take the second article from the similarity list: the first entry is normally the
# reference article itself (similarity 1.0)
article_id2 = similarity.take(2)[1][0]
article_id2

In [None]:
# get the information of the second article (the most similar other article)
arxiv_dataset.filter(arxiv_dataset.id == article_id2).select("title", "abstract", "authors", "categories").show(truncate=False)

In [None]:
# get the information of the first (reference) article, for comparison
arxiv_dataset.filter(arxiv_dataset.id == article_id).select("title", "abstract", "authors", "categories").show(truncate=False)

### Query for similar articles based on category

In [None]:
### Similar article based on category
def get_similar_articles_by_category(article_id, dataset=None):
    """Return ids of articles whose categories include every category of ``article_id``.

    Parameters:
    article_id (str): id of the reference article.
    dataset (DataFrame, optional): articles with ``id`` and space-separated
        ``categories`` columns. Defaults to the global ``arxiv_dataset``.

    Returns:
    list[str]: ids of matching articles, excluding the reference article itself;
    an empty list if ``article_id`` is not found.
    """
    if dataset is None:
        dataset = arxiv_dataset
    # categories of the reference article
    categories_search = set(
        dataset.filter(dataset.id == article_id)
        .select("categories")
        .rdd.flatMap(lambda row: row.categories.split(" ") if row.categories else [])
        .collect()
    )
    if not categories_search:  # invalid article id
        return []
    # keep every other article whose category set is a superset of the reference one
    return (
        dataset.select("id", "categories").rdd
        .filter(lambda row: row.id != article_id
                and row.categories is not None
                and categories_search.issubset(row.categories.split(" ")))
        .map(lambda row: row.id)
        .collect()
    )

# get a random article from the list (fixed seed so re-running picks the same article)
article_ = arxiv_dataset.rdd.takeSample(False, 1, seed=42)[0]
# show the article category
print("categories to search after:",  article_.categories)
similar_articles = get_similar_articles_by_category(article_.id)
if len(similar_articles) > 0:
    # display the first matching article
    test_id = similar_articles[0]
    arxiv_dataset.filter(arxiv_dataset.id == test_id).select("title", "abstract", "authors", "categories").show(truncate=False)


### Query for similar articles based on common authors

In [None]:
def get_similar_articles_by_common_authors(article_id, dataset=None):
    """Return ids of articles that share at least one author with ``article_id``.

    Parameters:
    article_id (str): id of the reference article.
    dataset (DataFrame, optional): articles with ``id`` and ``authors_list``
        (array of names) columns. Defaults to the global ``arxiv_dataset``.

    Returns:
    list[str]: ids of matching articles, excluding the reference article itself;
    an empty list if ``article_id`` is not found or has no authors.
    """
    if dataset is None:
        dataset = arxiv_dataset
    # get the set of authors of the reference article
    authors_search = set(
        dataset.filter(dataset.id == article_id)
        .select("authors_list")
        .rdd.flatMap(lambda row: row.authors_list or [])
        .collect()
    )
    if not authors_search:
        return []
    # keep every other article whose author list overlaps the reference set
    return (
        dataset.select("id", "authors_list").rdd
        .filter(lambda row: row.id != article_id
                and not authors_search.isdisjoint(row.authors_list or []))
        .map(lambda row: row.id)
        .collect()
    )

In [None]:
# preprocess dataset authors column: rebuild `authors_list` (the dataset was reloaded
# above without it), which get_similar_articles_by_common_authors reads
arxiv_dataset = arxiv_dataset.withColumn("authors_list", get_authors_list("authors_parsed"))

In [None]:
# get a random article from the list (fixed seed so re-running picks the same article)
article_ = arxiv_dataset.rdd.takeSample(False, 1, seed=42)[0]

authors_to_search = article_.authors_list

print("authors to search after:", authors_to_search)
similar_articles = get_similar_articles_by_common_authors(article_.id)

if len(similar_articles) > 0:
    # display the first article that shares an author
    test_id = similar_articles[0]
    arxiv_dataset.filter(arxiv_dataset.id == test_id).select("title", "abstract", "authors", "categories").show(truncate=False)


### Classification (determining the category of a new article)

### Approaches:
- TF-IDF features: tokenize the abstracts with `Tokenizer`, then vectorize them with TF-IDF (first approach)
- Count features: tokenize the abstracts with `RegexTokenizer`, remove stop words, then build `CountVectorizer` document-term vectors (second approach)

Models to use:
- Naive Bayes
- Logistic Regression
- Random Forest

Finally: use cross-validation to tune the hyperparameters.

In [None]:
# loading back the data: classification starts again from the raw sample
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

In [None]:
# drop columns not used for classification (presumably leaving id, abstract and
# categories — check with the show() below). DataFrame.drop ignores names that are
# not in the schema, e.g. `authors_list`, which only exists after the UDF is applied.
drop_colums = ["authors_parsed", "authors_list", "comments", "doi", "journal-ref", "license", "report-no", "submitter", "title", "versions", "authors", "update_date"]
arxiv_dataset = arxiv_dataset.drop(*drop_colums)

In [None]:
# Check the remaining columns.
arxiv_dataset.show(5)

In [None]:
# from the categories column, get the first category: it becomes the single class
# label for each paper
arxiv_dataset = arxiv_dataset.withColumn("category", split("categories", " ")[0])
# drop the categories column
arxiv_dataset = arxiv_dataset.drop("categories")

In [None]:
# Check the new `category` column.
arxiv_dataset.show(5)

In [None]:
# Map each category string to a numeric `label` (by default the most frequent category
# gets 0.0); `labels[i]` is the category name of label i. handleInvalid="skip" drops
# rows whose category cannot be indexed.
string_indexer = StringIndexer(inputCol="category", outputCol="label", handleInvalid="skip")
indexer_fitted = string_indexer.fit(arxiv_dataset)
labels = indexer_fitted.labels # retrieve labels in order to use them later
arxiv_dataset = indexer_fitted.transform(arxiv_dataset)

In [None]:
# Check the numeric `label` column.
arxiv_dataset.show(5)

In [None]:
# Clean the abstracts with the single-column UDFs from the LDA preparation cells.
# NOTE(review): the order differs from the LDA section (there: lower-case, then
# punctuation, then stop words), and newlines / $...$ math are not stripped here.
# remove stopwords
arxiv_dataset = arxiv_dataset.withColumn("abstract", remove_stopwords("abstract"))
# convert to lowercase
arxiv_dataset = arxiv_dataset.withColumn("abstract", convert_to_lowercase("abstract"))
# remove punctuation
arxiv_dataset = arxiv_dataset.withColumn("abstract", remove_punctuation("abstract"))
# apply lemmatization
arxiv_dataset = arxiv_dataset.withColumn("abstract", lemmatization_spacy("abstract"))

### Prepare the models: Logistic Regression, Random Forest, Naive Bayes

In [None]:
# Three classifiers trained on the same `features` / `label` columns.
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=30, regParam=0.3, elasticNetParam=0)
# Fixed seed: random forests sample rows and features randomly.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, seed=42)
nb = NaiveBayes(labelCol="label", featuresCol="features", smoothing=1.0, modelType="multinomial")

Desired approach: prepare the training data once, then train whichever model you want on it.

In [None]:
# Define a pipeline with stages for tokenization and TF-IDF conversion
# (the classifiers are trained separately in the next cells).
tokenizer = Tokenizer(inputCol="abstract", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)

# minDocFreq: terms found in fewer than 5 documents get an IDF of 0 (sparse terms ignored)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])

# Split the data into training and testing sets BEFORE fitting: the IDF weights
# must be learned from the training split only, otherwise document frequencies of
# the test abstracts leak into the features used for training.
(training_raw, testing_raw) = arxiv_dataset.randomSplit([0.8, 0.2], seed=123)

# fit the pipeline on the training split, then transform both splits with it
pipeline_model = pipeline.fit(training_raw)
training_data = pipeline_model.transform(training_raw)
testing_data = pipeline_model.transform(testing_raw)

In [None]:
# Inspect a few rows of the held-out split.
testing_data.show(5)

In [None]:
# Train the three classifiers on the same TF-IDF training features.
model_lr = lr.fit(training_data)
model_rf = rf.fit(training_data)
model_nb = nb.fit(training_data)

In [None]:
# Make predictions on the testing data (each model adds a `prediction` column,
# plus its probability columns)
predictions_lr = model_lr.transform(testing_data)
predictions_rf = model_rf.transform(testing_data)
predictions_nb = model_nb.transform(testing_data)

In [None]:
# show predictions: abstract, true label and predicted label for 5 test rows per model
predictions_lr.select("abstract", "label", "prediction").show(5)
predictions_rf.select("abstract", "label", "prediction").show(5)
predictions_nb.select("abstract", "label", "prediction").show(5)

In [None]:
# Accuracy = fraction of test rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy_lr = evaluator.evaluate(predictions_lr)
accuracy_rf = evaluator.evaluate(predictions_rf)
accuracy_nb = evaluator.evaluate(predictions_nb)

print("Accuracy of Logistic Regression: ", accuracy_lr)
print("Accuracy of Random Forest: ", accuracy_rf)
print("Accuracy of Naive Bayes: ", accuracy_nb)

## Use CountVectorizer instead of HashingTF and regexTokenizer instead of Tokenizer

In [None]:
# loading back the data: the CountVectorizer experiment starts from the raw sample
arxiv_dataset = load_data_from_json(sc, "arxiv-sample.json")

In [None]:
# Same preparation as in the TF-IDF section (minus the spaCy cleaning): drop unused
# columns, keep the first category as the class, and index it into `label`.
# drop columns
drop_colums = ["authors_parsed", "authors_list", "comments", "doi", "journal-ref", "license", "report-no", "submitter", "title", "versions", "authors", "update_date"]
arxiv_dataset = arxiv_dataset.drop(*drop_colums)
# from the categories column, get the first category
arxiv_dataset = arxiv_dataset.withColumn("category", split("categories", " ")[0])
# drop the categories column
arxiv_dataset = arxiv_dataset.drop("categories")
string_indexer = StringIndexer(inputCol="category", outputCol="label", handleInvalid="skip")
indexer_fitted = string_indexer.fit(arxiv_dataset)
labels = indexer_fitted.labels # retrieve labels in order to use them later
arxiv_dataset = indexer_fitted.transform(arxiv_dataset)


In [None]:
# Count-based features: split abstracts on non-word characters, drop stop words
# (StopWordsRemover's default list), then build term-count vectors over a vocabulary
# of at most 10,000 terms that each appear in at least 5 documents.
# NOTE(review): the pipeline is fitted on the full dataset before the train/test
# split in the next cell, so the vocabulary (and the minDF filter) also sees test
# documents. Consider splitting first and fitting on the training split only.
regexTokenizer = RegexTokenizer(inputCol="abstract", outputCol="words", pattern="\\W")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

# fit the pipeline
pipeline_fit = pipeline.fit(arxiv_dataset)
dataset = pipeline_fit.transform(arxiv_dataset)

In [None]:
# Split the data into training and testing sets (80/20, fixed seed)
(training_data, testing_data) = dataset.randomSplit([0.8, 0.2], seed=123)

In [None]:
# Train the models: the same three classifier definitions, now on count features
model_lr = lr.fit(training_data)
model_rf = rf.fit(training_data)
model_nb = nb.fit(training_data)

In [None]:
# Make predictions on the testing data (count-feature models)
predictions_lr = model_lr.transform(testing_data)
predictions_rf = model_rf.transform(testing_data)
predictions_nb = model_nb.transform(testing_data)

In [None]:
# evaluate accuracy (fraction of test rows predicted correctly) of the count-feature models
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_lr = evaluator.evaluate(predictions_lr)
accuracy_rf = evaluator.evaluate(predictions_rf)
accuracy_nb = evaluator.evaluate(predictions_nb)

print("Accuracy of Logistic Regression: ", accuracy_lr)
print("Accuracy of Random Forest: ", accuracy_rf)
print("Accuracy of Naive Bayes: ", accuracy_nb)

### Predict the category of a single abstract (new article)


In [None]:
# predict the categories of a single abstract from the testing data with the
# logistic-regression model trained on the count features above. (In the visible
# cells, `model` is the Word2Vec model from the embedding section, which has no
# `probability` output, so a classifier must be used here.)
single_article = testing_data.filter(testing_data.id == "0704.1072")
prediction = model_lr.transform(single_article)

# take the probability array of the prediction
prediction.select("probability").show(truncate=False)
probability_rows = prediction.select("probability").collect()

# print every category whose predicted probability exceeds the threshold
threshold = 0.2
if not probability_rows:
    # the random split decides which ids end up in the testing data
    print("Article 0704.1072 is not in the testing split; choose another id.")
else:
    # convert the probability vector to a list; index i corresponds to labels[i]
    list_prediction = probability_rows[0][0].toArray().tolist()
    print(list_prediction)
    for label_index, probability in enumerate(list_prediction):
        if probability > threshold:
            print(labels[label_index])

### Articles Distribution

In [None]:
def filter_by_category(dataset, category, exact=False):
  """
  Keep only the rows of `dataset` whose `categories` value mentions `category`.

  Parameters:
  dataset (DataFrame): Spark DataFrame with a string `categories` column holding
      whitespace-separated category codes (e.g. "math.CO cs.CG").
  category (str): category code to look for.
  exact (bool): if False (default), a plain substring match is used, so e.g.
      "astro-ph" also matches "astro-ph.CO". If True, `categories` is split on
      whitespace and only rows containing `category` as a whole token are kept.

  Returns:
  DataFrame: the filtered rows.
  """
  if not exact:
    return dataset.filter(dataset.categories.contains(category))
  from pyspark.sql.functions import array_contains, split
  category_tokens = split(dataset.categories, "\\s+")
  return dataset.filter(array_contains(category_tokens, category))

In [None]:
def convert_data(rfc_1123_date):
  """
  Convert an RFC 1123 timestamp such as "Mon, 2 Apr 2007 19:18:42 GMT" to "YYYY-MM".

  This is wrapped in a Spark UDF, so a missing value (None or "") returns None.
  That row then gets a null month instead of failing the whole Spark job.
  Note: `%a` and `%b` are locale-dependent; parsing assumes English day/month names.

  Parameters:
  rfc_1123_date (str or None): the timestamp string.

  Returns:
  str or None: the "YYYY-MM" month, or None for a missing value.
  """
  if not rfc_1123_date:
    return None
  parsed = datetime.strptime(rfc_1123_date, "%a, %d %b %Y %H:%M:%S %Z")
  return parsed.strftime("%Y-%m")

In [None]:
def create_graph(dataset, category):
  """
  Plot the `count` column against `createDate` as a line chart.

  Parameters:
  dataset (DataFrame): Spark DataFrame with `createDate` and `count` columns.
      Points are joined in row order, so it should already be sorted by
      `createDate` (the caller in this notebook does an orderBy).
  category (str): category name, used only in the plot title.
  """
  # Collected to the driver; in this notebook it is a per-month aggregate.
  dataset_panda = dataset.select("createDate", "count").toPandas()

  fig, ax = plt.subplots()
  ax.plot(dataset_panda["createDate"], dataset_panda["count"])
  ax.set_xlabel('Date')
  ax.set_ylabel('Count')
  plt.setp(ax.get_xticklabels(), rotation=60, ha='right')
  ax.set_title('Paper over time for {} category'.format(category))
  # Leave room for the rotated date labels so they are not clipped.
  fig.tight_layout()
  plt.show()

In [None]:
def show_papers_over_time(category, df):
  """
  Plot how many papers of `category` fall into each month.

  The month is derived from the first entry of the `versions` array
  (`versions[0]['created']`) via convert_data.

  Parameters:
  category (str): category to keep (matched by filter_by_category).
  df (DataFrame): arXiv metadata with `versions` and `categories` columns.
  """
  # Filter first so the Python UDF only runs on rows of the requested category.
  category_df = filter_by_category(df, category)

  convert_data_f = udf(convert_data, StringType())
  dated_df = category_df.withColumn('createDate', convert_data_f(category_df.versions[0]['created']))

  monthly_counts = dated_df.groupBy('createDate').count().orderBy(asc('createDate'))
  create_graph(monthly_counts, category)

# `df` is a required argument; pass the notebook's dataset explicitly.
show_papers_over_time('hep-th', arxiv_dataset)

In [None]:
# NOTE(review): this cell re-defines show_papers_over_time with the same
# behaviour as the previous cell; one of the two definitions can be removed.
def show_papers_over_time(category, df):
  """
  Plot how many papers of `category` fall into each month.

  The month comes from `versions[0]['created']`, converted by convert_data.

  Parameters:
  category (str): category to keep (matched by filter_by_category).
  df (DataFrame): arXiv metadata with `versions` and `categories` columns.
  """
  # Filter first so the Python UDF only runs on rows of the requested category.
  df = filter_by_category(df, category)

  convert_data_f = udf(convert_data, StringType())
  df = df.withColumn('createDate', convert_data_f(df.versions[0]['created']))

  df = df.groupBy('createDate').count().orderBy(asc('createDate'))
  create_graph(df, category)

# `df` is a required argument; pass the notebook's dataset explicitly.
show_papers_over_time('hep-th', arxiv_dataset)

### Clustering and top words

In [None]:
def tokenize_dataset(dataset, input_col, output_col):
  """Return `dataset` with `input_col` split into tokens (Spark Tokenizer) stored in `output_col`."""
  return Tokenizer(inputCol=input_col, outputCol=output_col).transform(dataset)

def vectorize_dataset(dataset, input_col, output_col):
  """
  Fit a Word2Vec model on the tokens of `input_col`.

  Parameters:
  dataset (DataFrame): input rows.
  input_col (str): text column that is cleaned (remove_extra_spaces),
      tokenized into a temporary "words" column, and embedded.
  output_col (str): output column name given to Word2Vec.

  Returns:
  Word2VecModel: the fitted model (100-dimensional vectors, minCount=1).
  """
  # Clean the same column that is tokenized below (was hard-coded to "categories").
  dataset = remove_extra_spaces(dataset, input_col)
  words2vec_model = Word2Vec(inputCol="words",
                             outputCol=output_col,
                             vectorSize=100,
                             minCount=1)
  return words2vec_model.fit(tokenize_dataset(dataset, input_col, "words"))

def prepare_data(dataset, output_col='scaled_categories', input_col="categories"):
  """
  Build scaled Word2Vec vocabulary vectors for clustering.

  Fits Word2Vec on `input_col` (via vectorize_dataset), takes the model's
  vocabulary vectors (`getVectors()`, whose vector column is "vector") and
  scales them to unit standard deviation, without mean centring, into `output_col`.
  """
  word_vectors = vectorize_dataset(dataset, input_col, "vector").getVectors()
  scaler_model = StandardScaler(
      inputCol="vector",
      outputCol=output_col,
      withStd=True,
      withMean=False,
  ).fit(word_vectors)
  return scaler_model.transform(word_vectors)

In [None]:
def show_silhouette_score_graph(dataset, k_min=2, k_max=10, seed=None):
  """
  Fit KMeans for every k in range(k_min, k_max) and plot the silhouette scores.

  The rows are first turned into scaled vectors by prepare_data(). Each
  clustering is scored with the silhouette metric (squared Euclidean distance).

  Parameters:
  dataset (DataFrame): input rows passed to prepare_data().
  k_min (int): smallest k tried (inclusive). Default 2.
  k_max (int): upper bound for k (exclusive). Default 10.
  seed (int or None): if given, set on every KMeans via setSeed for
      reproducible clusterings; None keeps KMeans' default seed.
  """
  evaluator = ClusteringEvaluator(predictionCol="prediction",
                                  featuresCol='scaled_categories',
                                  metricName='silhouette',
                                  distanceMeasure='squaredEuclidean')
  dataset = prepare_data(dataset)
  # One list drives both the loop and the x axis, so the two cannot drift apart.
  k_values = list(range(k_min, k_max))
  silhouette_score = []
  for k in k_values:
    kmeans = KMeans(featuresCol='scaled_categories', k=k)
    if seed is not None:
      # Only set when requested: passing seed=None to KMeans() would store None as the param value.
      kmeans.setSeed(seed)
    model = kmeans.fit(dataset)
    score = evaluator.evaluate(model.transform(dataset))
    silhouette_score.append(score)
    print('Silhouette Score for k =', k, 'is', score)

  # The notebook picks k = 4 from this curve (create_clusters(..., 4) in run()).
  plt.plot(k_values, silhouette_score)
  plt.xlabel('k')
  plt.ylabel('silhouette score')
  plt.title('Silhouette Score')
  plt.show()

In [None]:
def create_clusters(dataset, k_value, seed=None):
  """
  Fit a KMeans model with `k_value` clusters on prepare_data(dataset).

  Parameters:
  dataset (DataFrame): input rows passed to prepare_data().
  k_value (int): number of clusters.
  seed (int or None): if given, set via KMeans.setSeed for a reproducible
      clustering; None keeps KMeans' default seed.

  Returns:
  KMeansModel: the fitted model.
  """
  kmeans = KMeans(featuresCol='scaled_categories', k=k_value)
  if seed is not None:
    kmeans.setSeed(seed)
  return kmeans.fit(prepare_data(dataset))

def create_predictions(model, dataset):
  """
  Assign every prepared vector of `dataset` to a cluster of the fitted `model`.

  NOTE(review): prepare_data re-fits Word2Vec here, so the vectors match the
  ones the model was trained on only if that fit is deterministic — confirm.
  """
  prepared = prepare_data(dataset)
  return model.transform(prepared)

In [None]:
def run():
  """Cluster the categories of part1.json: silhouette plot, then k = 4 KMeans."""
  # init_spark() returns (SparkSession, SparkContext); only the session is needed.
  spark, _ = init_spark()
  arxiv_dataset = load_data_from_json(spark, "part1.json")

  show_silhouette_score_graph(arxiv_dataset)

  model = create_clusters(arxiv_dataset, 4)
  print("Cluster Centers: ")
  for cluster_center in model.clusterCenters():
    print(cluster_center)

  cluster_predictions = create_predictions(model, arxiv_dataset)
  cluster_predictions.select('prediction').show(10)

run()

In [None]:
def prepare_dataset(dataset):
  """
  Clean the `abstract` column by applying the cleaning helpers in a fixed order.

  The order is: remove_empty_fields, remove_empty_newlines, remove_math_formula,
  convert_to_lowercase, remove_extra_spaces. Each helper receives the output of
  the previous one.
  """
  cleaning_steps = (
    remove_empty_fields,
    remove_empty_newlines,
    remove_math_formula,
    convert_to_lowercase,
    remove_extra_spaces,
  )
  for step in cleaning_steps:
    dataset = step(dataset, "abstract")
  return dataset

In [None]:
def show_word_cloud(dataset, category):
  """
  Render a word cloud from all abstracts of papers in `category`.

  Rows with a null abstract are skipped, because str.join cannot join None.
  If the category yields no text, a message is printed instead of calling
  WordCloud.generate on an empty string, which raises.

  Parameters:
  dataset (DataFrame): rows with `categories` and `abstract` columns.
  category (str): category matched by filter_by_category.
  """
  abstracts = (filter_by_category(dataset, category)
               .where(col("abstract").isNotNull())
               .select("abstract")
               .rdd.flatMap(lambda row: row)
               .collect())
  text = " ".join(abstracts)
  if not text.strip():
    print("No abstract text found for category {}".format(category))
    return

  wordcloud = WordCloud().generate(text)

  plt.imshow(wordcloud, interpolation='bilinear')
  plt.axis("off")
  plt.show()

In [None]:
def show_word_top(dataset, category, count):
  """
  Print the `count` most frequent words in the stop-word-filtered abstracts of `category`.

  Stop words are wordcloud's STOPWORDS, removed by remove_stop_words into
  `filtered_abstract`. That column is split on single spaces, which can yield
  empty tokens. Those are dropped before counting, so the printed dict holds up
  to `count` real words, and there is no KeyError when "" is not among them.

  Parameters:
  dataset (DataFrame): rows with `categories` and `abstract` columns.
  category (str): category matched by filter_by_category.
  count (int): number of top words to print.
  """
  dataset = remove_stop_words(dataset, "abstract", "filtered_abstract", list(STOPWORDS))
  dataset = filter_by_category(dataset, category)
  word_counts = (dataset
                 .withColumn('filtered_abstract', explode(split('filtered_abstract', ' ')))
                 .filter(col('filtered_abstract') != '')
                 .groupBy('filtered_abstract')
                 .count()
                 .orderBy(desc('count')))
  top_val_dict = {r['filtered_abstract']: r['count'] for r in word_counts.head(count)}
  print(top_val_dict)


In [None]:
def run():
  """
  Show the word cloud and the top-10 words for the stat.TH category of arxiv-sample.json.

  NOTE(review): this re-defines the clustering run() from an earlier cell.
  """
  # Obtain the session here (init_spark uses getOrCreate) instead of relying on
  # an undefined global `sc`; this matches the clustering run(), which passes init_spark()[0].
  spark = init_spark()[0]
  arxiv_dataset = load_data_from_json(spark, "arxiv-sample.json")
  show_word_cloud(arxiv_dataset, "stat.TH")
  show_word_top(arxiv_dataset, "stat.TH", 10)

run()

## **Prolific authors**

### **Article categories**

In [None]:
categories_df = arxiv_dataset.select("categories")
distinct_categories_df = categories_df.distinct()

# Each `categories` value is a whitespace-separated list of codes; flatten all
# distinct values into one set of unique codes. (Despite its name,
# categories_list is a set.)
categories_list = {
    code
    for categories_string in distinct_categories_df.rdd.flatMap(lambda x: x).collect()
    for code in categories_string.split()
}

print(categories_list)

### **Article authors**

In [None]:
# Build a flat list of individual author names from the raw `authors` strings.
# The helpers used below are defined elsewhere; the order of the steps matters.
authors_df = arxiv_dataset.select("authors")
distinct_authors_df = authors_df.distinct()
# One Python string per distinct `authors` value.
authors_list = distinct_authors_df.rdd.flatMap(lambda x: x).collect()

# NOTE(review): called twice — presumably the second pass handles one level of
# nested parentheses; confirm against remove_text_between_parentheses.
authors_list = remove_text_between_parentheses(authors_list)
authors_list = remove_text_between_parentheses(authors_list)

# Strip the "et al" marker and collaboration prefixes (text before "Collaboration:").
authors_list = remove_substring_from_list(authors_list, "et al")
authors_list = remove_characters_before_substring(authors_list, "Collaboration:")

# Split multi-author strings on "," and then on " and ".
authors_list = split_function(",", authors_list)
authors_list = split_function(" and ", authors_list)

# Final normalisation; presumably trims whitespace/newlines, deduplicates and
# drops single-letter or "Collaboration" leftovers (helpers defined elsewhere).
authors_list = delete_new_line_and_spaces(authors_list)
authors_list = delete_duplicate(authors_list)
authors_list = delete_letter(authors_list)
authors_list = delete_element(authors_list, "Collaboration")

print(authors_list)
print(len(authors_list))

### **Find the most prolific author in each category**

In [None]:
# For every category, print the author name (from authors_list) that appears in
# the most papers of that category.
# NOTE(review): matching is a case-insensitive substring test, so a short name
# can also match inside a longer one (e.g. "Li" inside "Lin").
authors_list_lower = [(author_name, author_name.lower()) for author_name in authors_list]

for category in categories_list:
  filtered_df = filter_by_category(arxiv_dataset, category)
  # One entry per paper (no .distinct()): identical author strings from
  # different papers must each count towards the author's total.
  authors_per_category = filtered_df.select("authors").rdd.flatMap(lambda x: x).collect()

  prolific_authors = {}
  for author in authors_per_category:
    if not author:
      continue
    author_lower = author.lower()
    for author_name, author_name_lower in authors_list_lower:
      if author_name_lower in author_lower:
        prolific_authors[author_name] = prolific_authors.get(author_name, 0) + 1
  # default=None: a category with no matching author prints "None" instead of raising ValueError.
  max_author = max(prolific_authors, key=prolific_authors.get, default=None)
  print("Category: " + category + " Author: " + str(max_author))
