In [25]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.types import *
import pyspark.sql.functions as psf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, StructType
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse import csr_matrix
from operator import attrgetter
from scipy.sparse import vstack
import numpy as np
from pathlib import Path
import shutil

In [28]:
def as_matrix(vec):
    """Wrap one sparse vector as a single-row SciPy CSR matrix.

    Args:
        vec: object exposing ``values`` (array of stored entries), ``indices``
            (their column positions) and ``size`` (total vector length).
            Presumably a ``pyspark.ml.linalg.SparseVector`` -- confirm
            against the caller.

    Returns:
        scipy.sparse.csr_matrix of shape ``(1, vec.size)``.
    """
    stored_entries = vec.values.size
    # A single row means the row-pointer array is just [start, end].
    row_pointer = np.array([0, stored_entries])
    return csr_matrix((vec.values, vec.indices, row_pointer), shape=(1, vec.size))

def broadcast_matrix(mat):
    """Broadcast a CSR matrix's raw arrays and rebuild the matrix from them.

    Args:
        mat: matrix exposing ``data``, ``indices``, ``indptr`` and ``shape``
            (the CSR components).

    Returns:
        scipy.sparse.csr_matrix with the same shape as ``mat``, rebuilt from
        the broadcast value.

    NOTE(review): the broadcast handle is read back immediately (``.value``)
    and then discarded, so callers receive an ordinary local matrix rather
    than a broadcast variable -- confirm this is intended.
    """
    components = (mat.data, mat.indices, mat.indptr)
    handle = sc.broadcast(components)
    values, col_indices, row_pointers = handle.value
    return csr_matrix((values, col_indices, row_pointers), shape=mat.shape)

def parallelize_matrix(scipy_mat, rows_per_chunk=100):
    """Split a sparse matrix into row chunks and distribute them as an RDD.

    Args:
        scipy_mat: row-sliceable SciPy sparse matrix whose slices expose
            ``data``, ``indices`` and ``indptr`` (i.e. CSR).
        rows_per_chunk: maximum number of rows per chunk; must be >= 1.

    Returns:
        RDD of ``(start_row, (data, indices, indptr), (n_rows, n_cols))``
        tuples, one per chunk, in row order.

    Raises:
        ValueError: if ``rows_per_chunk`` is smaller than 1. Previously such a
            value made the chunking loop spin forever because the row cursor
            never advanced.
    """
    if rows_per_chunk < 1:
        raise ValueError(f'rows_per_chunk must be >= 1, got {rows_per_chunk}')

    rows, cols = scipy_mat.shape
    submatrices = []
    for start in range(0, rows, rows_per_chunk):
        # The last chunk may be shorter than rows_per_chunk.
        chunk_rows = min(rows_per_chunk, rows - start)
        submat = scipy_mat[start:start + chunk_rows]
        submatrices.append(
            (start, (submat.data, submat.indices, submat.indptr), (chunk_rows, cols))
        )
    return sc.parallelize(submatrices)

def calculated_cosine_similarity(sources, targets, inputs_start_index, threshold=.2):
    """Count, per source row, how many other target rows are similar to it.

    Args:
        sources: matrix of source vectors, one per row (a SciPy sparse matrix
            in the visible caller).
        targets: matrix of vectors every source row is compared against.
        inputs_start_index: zero-based offset of ``sources``' first row; the
            yielded index is this offset plus the row position plus one.
        threshold: a target counts as similar when its cosine similarity,
            rounded to 4 decimals, is strictly greater than this value.

    Yields:
        ``(source_index, similar_count)`` tuples. ``similar_count`` is the
        number of scores above the threshold minus one, never below zero.
        The subtraction presumably removes the row's match with itself --
        this assumes ``sources`` are rows of ``targets``; confirm against
        the caller.
    """
    # cosine_similarity accepts sparse input and (dense_output=True by
    # default) still returns a dense array, so there is no need to densify
    # the inputs with .toarray() first.
    cosimilarities = cosine_similarity(sources, targets)
    for i, cosimilarity in enumerate(cosimilarities):
        above_threshold = int(np.count_nonzero(np.round(cosimilarity, 4) > threshold))
        source_index = inputs_start_index + i + 1
        # Clamp at zero: a row with no score above the threshold (e.g. an
        # all-zero vector) previously produced a count of -1.
        yield (source_index, max(above_threshold - 1, 0))

[Stage 117:>                                                        (0 + 1) / 1]

In [26]:
# Start (or reuse) the Spark session and keep a handle on its SparkContext.
# NOTE(review): spark.driver.memory set through the builder may be ignored if
# the driver JVM is already running -- confirm for this deployment.
spark = (
    SparkSession.builder
    .appName('TF-IDF')
    .config("spark.sql.analyzer.maxIterations", "500")
    .config('spark.executor.memory', '8g')
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)
sc = spark.sparkContext


def _parse_article(line):
    """Turn one tab-separated line into (id without double quotes, list of words)."""
    fields = line.split('\t')
    return fields[0].replace('"', ''), fields[1].split(" ")


# Articles as an RDD of (id, words) pairs.
rdd = sc.textFile('hdfs://namenode:9000/arxiv_dataset/cleaned_train.txt').map(_parse_article)

# DataFrame schema matching the pairs above: a string id and an array of
# string tokens. The DataFrame itself is built from `rdd` and `schema` in a
# later cell.
schema = (
    StructType()
    .add('id', StringType())
    .add('words', ArrayType(elementType=StringType()))
)

In [None]:
# Build the articles DataFrame from the parsed RDD and preview five rows.
data = spark.createDataFrame(data=rdd, schema=schema)
data.show(n=5)

In [7]:
# Read categories into dataframe
categories = spark.read.csv('arxiv-dataset/categories.csv', header=True, inferSchema=True)

# Distinct category labels: the `categories` column holds space-separated
# labels, so split each value and explode the pieces.
unique_cats = categories.select('categories').distinct()
cats_split = unique_cats.select(psf.split(psf.col('categories'), ' ').alias('categories'))
cats_list = cats_split.select(psf.explode(psf.col('categories')).alias('category')).distinct()

# One 0/1 indicator column per label. Membership is tested on the split
# tokens rather than with a substring match: `contains` would also flag a
# label that is merely a prefix/substring of another one (for example a
# label "astro-ph" inside "astro-ph.IM").
category_tokens = psf.split(psf.col('categories'), ' ')
flag_columns = [
    psf.when(psf.array_contains(category_tokens, row.category), 1)
       .otherwise(0)
       .alias(row.category)
    for row in cats_list.collect()
]
# Add all indicators in a single projection instead of one withColumn call
# per label, which keeps the query plan shallow.
categories = categories.select('*', *flag_columns)

joined_data = data.join(categories, ["id"])

                                                                                

In [None]:
# Write one JSON dataset per category for the first MAX_CATEGORIES labels.
# The previous `if i == 10: break` processed 11 labels, while the cell that
# reads these folders back talks about the first 10.
MAX_CATEGORIES = 10
for row in cats_list.collect()[:MAX_CATEGORIES]:
    print(row.category)
    # Backticks are needed because the column name is interpolated verbatim
    # and may contain characters such as dots.
    cat_df = joined_data.filter(joined_data[f'`{row.category}`'] == 1)
    print('filtered')
    # Overwrite so the cell can be re-run; the default save mode raises once
    # the target folder already exists.
    cat_df.write.mode('overwrite').json(f'hdfs://namenode:9000/arxiv_dataset/categories/{row.category}')

In [22]:
# Loop over the first 10 category folders stored under
# hdfs://namenode:9000/arxiv_dataset/categories
MAX_CATEGORIES = 10
cats = ['arxiv_dataset/categories/astro-ph.IM']

# The estimators do not depend on the category, so build them once.
hashingTF = HashingTF(inputCol="words", outputCol='features', numFeatures=2**17)
idf = IDF(inputCol='features', outputCol='idf')

for cat in cats[:MAX_CATEGORIES]:
    df = spark.read.json(f'hdfs://namenode:9000/{cat}')
    n_articles = df.count()
    print(n_articles)
    if n_articles == 0:
        # Nothing to stack or compare; report and move on.
        print(f'Category: {cat}; No. articles checked: 0; Similar articles detected: 0')
        continue

    # TF-IDF vector per article.
    tf = hashingTF.transform(df)
    model = idf.fit(tf)
    tf_idf = model.transform(tf)
    tf_idf = tf_idf.drop('words', 'features', 'categories')

    # One single-row CSR matrix per article, stacked once on the driver.
    # (A pairwise reduce with vstack is not commutative and re-copies the
    # growing matrix at every step.)
    vectors = tf_idf.rdd.map(attrgetter('idf'))
    matrix = vectors.map(as_matrix)
    matrix_reduced = vstack(matrix.collect(), format='csr')

    matrix_parallelized = parallelize_matrix(matrix_reduced, rows_per_chunk=1000)
    matrix_broadcast = broadcast_matrix(matrix_reduced)
    print('Calculating result')
    res = matrix_parallelized.flatMap(
        lambda submatrix: calculated_cosine_similarity(
            csr_matrix(submatrix[1], shape=submatrix[2]),
            matrix_broadcast,
            submatrix[0],
        )
    )
    # `res` holds (source_index, count) tuples; sum only the counts, since
    # summing the tuples themselves raises TypeError.
    final = res.values().sum()
    # Report the size of the DataFrame processed in this iteration.
    print(f'Category: {cat}; No. articles checked: {n_articles}; Similar articles detected: {final}')

output/nlin.CD/part-00000-450b8f31-803a-4c3c-b355-42f59fbcda21-c000.csv
output/cs.MS/part-00000-579407b2-635a-4773-9c84-d08fd64ea073-c000.csv
output/q-fin.CP/part-00000-78fd9394-527d-4acd-b44a-fc2b5aac964c-c000.csv
output/cs.LG/part-00000-27723eed-4325-4d32-b62f-601821b48d5c-c000.csv
+--------+--------------------+
|      id|               words|
+--------+--------------------+
|806.3537|"suppose,we,are,t...|
+--------+--------------------+



In [None]:
# List the category folders under hdfs://namenode:9000/arxiv_dataset/categories/.
# pathlib.Path only walks the local filesystem and cannot open an hdfs:// URI,
# so the listing goes through the Hadoop FileSystem API exposed by the Spark
# JVM gateway. Entries use the 'arxiv_dataset/categories/<name>' form that the
# TF-IDF loop appends to 'hdfs://namenode:9000/'.
hadoop_fs = sc._jvm.org.apache.hadoop.fs
categories_dir = hadoop_fs.Path('hdfs://namenode:9000/arxiv_dataset/categories/')
hdfs = categories_dir.getFileSystem(sc._jsc.hadoopConfiguration())
cats = [
    f'arxiv_dataset/categories/{status.getPath().getName()}'
    for status in hdfs.listStatus(categories_dir)
    if status.isDirectory()
]

