In [1]:
import logging
# Set the logging level to ERROR
logging.getLogger("dask").setLevel(logging.ERROR)

import shutil
from functools import wraps
import numpy as np
from dask import delayed
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
from multiprocessing import Manager
from cluster_config import create_local_cluster

### Clean up previous run ###
# Scratch directory that a previous Dask run may have left behind
dask_worker_space_path = "./dask-worker-space"
# Delete it; ignore_errors=True makes this a no-op when the folder is absent
shutil.rmtree(dask_worker_space_path, ignore_errors=True)

# Start the cluster through the project helper imported from cluster_config
# (its worker/process configuration is defined there, not in this notebook)
cluster = create_local_cluster()

# Connect a Dask client to the cluster
client = Client(cluster)

# Intended data types for the preprocessed columns.
# NOTE(review): parquet files carry their own schema and 'datetime' is not a
# standard pandas dtype string — confirm whether read_parquet honours this
# mapping at all (the printed dtypes show update_date as datetime64[ns]).
dtypes = {
    'title':'object',
    'categories': 'object',
    'abstract':'object',
    'update_date':'datetime',
}

# Lazily load the stage-1 preprocessed data set
df = dd.read_parquet(
    './processing/stage_1_text_preprocessed', 
    blocksize='8MB', 
    dtype=dtypes,
    )

# Replace the stored index with a fresh numerical one.
# NOTE(review): Dask documents that reset_index restarts at 0 in every
# partition, so the result is only globally unique if the data was read as a
# single partition — confirm, because later cells use the index labels as row
# positions into the TF-IDF matrix.
df = df.reset_index(drop=True)

# Repartition the DataFrame into smaller partitions (e.g., 16 partitions for 16 cores)
df = df.repartition(npartitions=16)

# Show the column dtypes and the first rows of the preprocessed arXiv DataFrame
print(df.dtypes)
df.head()

title                        object
abstract                     object
categories                   object
update_date          datetime64[ns]
categories_encode             int64
dtype: object


Unnamed: 0,title,abstract,categories,update_date,categories_encode
0,calculation prompt diphoton production cross s...,fully differential calculation perturbative qu...,hep-ph,2008-11-26,0
1,sparsitycertifying graph decomposition,describe new algorithm kellpebble game color u...,math.CO cs.CG,2008-12-13,1
2,evolution earthmoon system based dark matter f...,evolution earthmoon system described dark matt...,physics.gen-ph,2008-01-13,2
3,determinant stirling cycle number count unlabe...,show determinant stirling cycle number count u...,math.CO,2007-05-23,3
4,dyadic lambdaalpha lambdaalpha,paper show compute lambdaalpha norm alphage 0 ...,math.CA math.FA,2013-10-15,4


In [2]:
# Derive calendar 'year' and 'month' columns from 'update_date' in one step
df = df.assign(
    year=df['update_date'].dt.year,
    month=df['update_date'].dt.month,
)

# Earliest and latest year present in the data (each .compute() runs the graph)
year_min = df['year'].min().compute()
year_max = df['year'].max().compute()

In [3]:
# Create a month-start timestamp column ('dt') for month groupings.
# 'update_date' is truncated to the first day of its month with vectorised
# pandas period arithmetic inside each partition, instead of formatting a
# "year-month-01" string for every row in Python and parsing it back.
df['dt'] = df['update_date'].map_partitions(
    lambda dates: dates.dt.to_period('M').dt.to_timestamp().rename('dt'),
    meta=('dt', 'datetime64[ns]'),
)

# Order the rows chronologically by month; the original index labels are kept
df = df.sort_values(by='dt')
df.head()

Unnamed: 0,title,abstract,categories,update_date,categories_encode,year,month,dt
3,determinant stirling cycle number count unlabe...,show determinant stirling cycle number count u...,math.CO,2007-05-23,3,2007,5,2007-05-01
2223743,difference differential equation colored jones...,colored jones function knot sequence laurent p...,math.GT math.QA,2007-05-23,111,2007,5,2007-05-01
2223742,characterization freeness hyperplane arrangement,consider characterization freeness hyperplane ...,math.CO math.AG,2007-05-23,334,2007,5,2007-05-01
2223740,limiting distribution additive functionals cat...,additive tree functionals represent cost many ...,math.PR math.CO,2007-05-23,926,2007,5,2007-05-01
2223739,singularity analysis hadamard product tree rec...,present toolbox extracting asymptotic informat...,math.CO math.PR,2007-05-23,405,2007,5,2007-05-01


In [4]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Materialise the titles ordered by index label. `df` has been re-sorted by
# 'dt', but `scores` uses index labels as row positions into the TF-IDF
# matrix, so row i of the matrix must belong to the document labelled i.
titles = df['title'].compute().sort_index()
if not np.array_equal(titles.index.to_numpy(), np.arange(len(titles))):
    raise ValueError(
        "TF-IDF rows are looked up by index label; the DataFrame index must be "
        "exactly 0..N-1 (unique and contiguous) for labels to match row positions."
    )

# Calculate the TF-IDF matrix (at most 500 terms, English stop words removed)
tfidf_vectorizer = TfidfVectorizer(max_features=500, stop_words='english')
tfidf_matrix = da.from_array(tfidf_vectorizer.fit_transform(titles))

# Terms in feature order: column j of the matrix holds scores for vocabulary[j]
vocabulary = tfidf_vectorizer.get_feature_names_out()

print('Vocabulary length: ',len(vocabulary))
print('TFIDF Matrix Row length: ',len(tfidf_matrix[0]))

Vocabulary length:  500
TFIDF Matrix Row length:  500


In [None]:
from datetime import datetime
# Define a function to calculate most common topics for a given group of documents
def scores(g):
    """Sum the TF-IDF rows that belong to one group of documents.

    Args:
        g: A lazy (Dask) group whose ``index.compute()`` yields the labels of
            its documents. The labels are used directly as row positions in
            the module-level ``tfidf_matrix``.

    Returns:
        The first row of the array built from the column-wise sum of the
        selected rows — presumably one total per vocabulary term when the
        matrix is sparse (TODO confirm for dense input).
    """
    # Index labels of the group's documents, materialised as a plain list
    row_ids = g.index.compute().tolist()
    # Pull only this group's rows out of the shared TF-IDF matrix
    group_rows = tfidf_matrix[row_ids, :].compute()
    # Collapse the rows into per-column totals
    column_totals = group_rows.sum(axis=0)
    return np.array(column_totals)[0]

# Resolve the output path once: re-evaluating datetime.now() for every write
# could switch to a new, header-less file if the run crosses midnight.
scores_csv_path = f'./processing/stage_2_text_vectorized/tfidf_scores-{datetime.now().strftime("%Y-%m-%d")}.csv'

# Group the documents by month-start timestamp and list the months present
group_dt = df.groupby('dt')
unique_dates = df['dt'].unique().compute()

# Write the header, then one row of summed TF-IDF scores per month, through a
# single file handle that stays open for the whole run.
with open(scores_csv_path, 'w') as f:
    # Header: 'dt' followed by every vocabulary term
    columns = vocabulary.tolist()
    columns = np.concatenate((['dt'], columns))
    f.write(','.join(map(str, columns)) + '\n')

    for key in unique_dates:
        gdt = group_dt.get_group(key)
        topic_scores = scores(gdt)
        joined_string = ','.join([str(x) for x in topic_scores])
        line = str(key) + ',' + joined_string
        print(f"\nTopics scores in {key}:{line}")
        f.write(line + '\n')
        # Flush per month so finished rows survive an interrupted run
        f.flush()