# arXiv Paper Embedding


## Multi GPU w/ Dask + CUDF
Using Dask and CuDF to orchestrate sentence embedding over multiple GPU workers.

![Rapids and Dask Logos](https://saturn-public-assets.s3.us-east-2.amazonaws.com/example-resources/rapids_dask.png "doc-image")

### Important Imports

* [`dask_saturn`](https://github.com/saturncloud/dask-saturn) and [`dask_distributed`](http://distributed.dask.org/en/stable/): Set up and run the Dask cluster in Saturn Cloud.
* [`dask-cudf`](https://docs.rapids.ai/api/cudf/stable/basics/dask-cudf.html): Create distributed `cudf` dataframes using Dask.

In [2]:
import dask_cudf
import cudf
import json
import os
import re
import string
from categories import cat_mapper
import pandas as pd

import spacy
from spacy.lang.en.stop_words import STOP_WORDS
import en_core_sci_lg


from dask_saturn import SaturnCluster
from dask.distributed import Client, wait


pd.set_option('display.max_colwidth', None)

DATA_PATH = "/home/jovyan/arxiv/arxiv-metadata-oai-snapshot.json"

# DATA_PATH = "arxiv-metadata-oai-snapshot.json"
YEAR_CUTOFF = 2000
YEAR_PATTERN = r"(19|20[0-9]{2})"
ML_CATEGORY = "cs.LG"

In [3]:
client.close()

NameError: name 'client' is not defined

### Start the Dask Cluster

The template resource you are running has a Dask cluster already attached to it with three workers. The `dask-saturn` code below creates two important objects: a cluster and a client.

* `cluster`: knows about and manages the scheduler and workers
    - can be used to create, resize, reconfigure, or destroy those resources
    - knows how to communicate with the scheduler, and where to find logs and diagnostic dashboards
* `client`: tells the cluster to do things
    - can send work to the cluster
    - can restart all the worker processes
    - can send data to the cluster or pull data back from the cluster

In [4]:
n_workers = 4
cluster = SaturnCluster(n_workers=n_workers)
client = Client(cluster, timeout='60s')

INFO:dask-saturn:Cluster is ready
INFO:dask-saturn:Registering default plugins
INFO:dask-saturn:Success!


If you already started the Dask cluster on the resource page, then the code above will run much more quickly since it will not have to wait for the cluster to turn on.

>**Pro tip**: Create and start the cluster in the Saturn Cloud UI before opening JupyterLab if you want to get a head start!

The last command ensures the kernel waits until all the desired workers are online before continuing.

In [5]:
client.wait_for_workers(n_workers=n_workers)

In [6]:
# def clean_description(description: str):
#     if not description:
#         return ""
#     # remove unicode characters
#     description = description.encode('ascii', 'ignore').decode()

#     # remove punctuation
#     description = re.sub('[%s]' % re.escape(string.punctuation), ' ', description)

#     # clean up the spacing
#     description = re.sub('\s{2,}', " ", description)

#     # remove urls
#     #description = re.sub("https*\S+", " ", description)

#     # remove newlines
#     description = description.replace("\n", " ")

#     # remove all numbers
#     #description = re.sub('\w*\d+\w*', '', description)

#     # split on capitalized words
#     description = " ".join(re.split('(?=[A-Z])', description))

#     # clean up the spacing again
#     description = re.sub('\s{2,}', " ", description)

#     # make all words lowercase
#     description = description.lower()

#     return description

In [7]:
# def process(paper: dict):
#     paper = json.loads(paper)
#     if paper['journal-ref']:
#         years = [int(year) for year in re.findall(YEAR_PATTERN, paper['journal-ref'])]
#         years = [year for year in years if (year <= 2022 and year >= 1991)]
#         year = min(years) if years else None
#     else:
#         year = None
#     return {
#         'id': paper['id'],
#         'title': paper['title'],
#         'year': year,
#         'authors': paper['authors'],
#         'categories': ','.join(paper['categories'].split(' ')),
#         'abstract': paper['abstract'],
#         'input': clean_description(paper['title'] + ' ' + paper['abstract'])
#     }

# def papers():
#     with open(DATA_PATH, 'r') as f:
#         for paper in f:
#             paper = process(paper)
#             if paper['year']:
#                 yield paper


def process(paper: dict):
    paper = json.loads(paper)
    if paper['journal-ref']:
        years = [int(year) for year in re.findall(YEAR_PATTERN, paper['journal-ref'])]
        years = [year for year in years if (year <= 2022 and year >= 1991)]
        year = min(years) if years else None
    else:
        year = None
    return {
        'id': paper['id'],
        'title': paper['title'],
        'year': year,
        'authors': paper['authors'],
        'categories': ','.join(paper['categories'].split(' ')),
        'abstract': paper['abstract']
    }


def papers():
    with open(DATA_PATH, 'r') as f:
        for paper in f:
            paper = process(paper)
            if paper['year']:
                # if paper['year'] >= YEAR_CUTOFF and ML_CATEGORY in paper['categories']:
                if paper['year'] >= YEAR_CUTOFF:
                    yield paper

def readpaper():
    with open(DATA_PATH, 'r') as f:
        for paper in f:
            yield process(paper)


In [8]:
cdf = cudf.DataFrame(list(papers()))

In [11]:
## Create the categories raw strings (Combine all encoded categories to strings)

def split_cats(cats):
    cats = cats.split(',')
    return [cat_mapper[cat] for cat in cats if cat in cat_mapper.keys()]

def get_all_categories_as_string(cats):
    return " ".join(cats)

In [13]:
df = cdf.to_pandas()

df["split_categories"] = df["categories"].map(split_cats)
df["categories_combined"] = df["split_categories"].map(get_all_categories_as_string)

cdf = cudf.from_pandas(df)

In [14]:
import pickle
# Pro Tip: Pickle the dataframe
# This might save you time in the future so you don't have to do all of that again
with open('cdf_1.pkl', 'wb') as f:
    pickle.dump(cdf, f)

In [12]:
# cdf.head()

In [None]:
# cdf["count_cats"] = cdf.categories.str.count(',') + 1

In [10]:

    
# Load pickle
# with open('cdf.pkl', 'rb') as f:
#     cdf = pickle.load(f)

In [15]:
# Still going to downsample here
len(cdf)

717846

In [13]:
# cdf['categories'].value_counts()


In [23]:
# cdf['categories_combined'].head()

0                High Energy Physics - Phenomenology
1    Classical Analysis and ODEs Functional Analysis
2           General Relativity and Quantum Cosmology
3                                  Materials Science
4                                       Astrophysics
Name: categories_combined, dtype: object

## Using Dask to parallelize things

In [16]:
# Convert our CuDF to a Dask-CuDF
ddf = dask_cudf.from_cudf(cdf, npartitions=n_workers).persist()

In [17]:
import copy

In [18]:
import string

punctuations = string.punctuation #list of punctuation to remove from text
stopwords = list(STOP_WORDS)

# Parser
parser = en_core_sci_lg.load()
parser.max_length = 700000


def spacy_tokenizer(sentence):
    ''' Function to preprocess text of scientific papers 
        (e.g Removing Stopword and puntuations)'''
    mytokens = parser(sentence)
    mytokens = [ word.lemma_.lower().strip() if word.lemma_ != "-PRON-" else word.lower_ for word in mytokens ] # transform to lowercase and then split the scentence
    mytokens = [ word for word in mytokens if word not in stopwords and word not in punctuations ] #remove stopsword an punctuation
    mytokens = " ".join([i for i in mytokens]) 
    return mytokens

In [19]:

word_model = spacy.load('en_core_sci_lg')
# word_model_2 = copy.deepcopy(word_model)

In [20]:
from dask.distributed import get_worker
import numpy as np



def get_embeddings(df):
    # model = word_model
    # cat = cats_str.categories_combined
    worker = get_worker()
    if hasattr(worker, "model"):
        model = worker.model
    else:
        # from 
        # model = spacy.load('en_core_sci_lg')
        # model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
        worker.model = word_model.model
        
    sentences = [spacy_tokenizer(x) for x in df['categories_combined'].tolist()]
    # sentences = spacy_tokenizer(df)
    try:
        sentence_vector = [model(sent) for sent in sentences]
        # sentence_vector = model(sentences)
        df['category_embeddings'] = cudf.Series(sentence_vector, index=df.index)
    #     return [sentence_vector.vector]
    except:
        print("Error: ", flush=True)
        print(sentence, flush=True)
    #     # sentence_vector     
        # return [np.array([])]
    return df[['id', 'category_embeddings']]
    

def embed_partition(df: dask_cudf.DataFrame):
    """
    Create embeddings on single partition of DF (one dask worker)
    """
    worker = get_worker()
    if hasattr(worker, "model"):
        model = worker.model
    else:
        from sentence_transformers import SentenceTransformer

        # model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
        model = spacy.load('en_core_sci_lg')
        worker.model = model

    print("embedding input", flush=True)
        
    # embed the input      
    vectors = model.encode(
        sentences = df.input.values_host,
        normalize_embeddings = True,
        show_progress_bar = True
    )
    
    # Convert to cudf series and return
    df['vector'] = cudf.Series(vectors.tolist(), index=df.index)
    return df[['id', 'vector']]

def clear_workers():
    """
    Deletes model attribute, freeing up memory on the Dask workers
    """
    import torch
    import gc

    worker = get_worker()
    if hasattr(worker, "model"):
        del worker.model
    torch.cuda.empty_cache()
    gc.collect()
    return

In [21]:
output_df = ddf[["id", "categories_combined"]].map_partitions(
    func = get_embeddings,
    meta = {
      "id": object,
      "vector": cudf.ListDtype('float32')
    }
)

# Gather results
output_df = output_df.persist()
%time _ = wait(output_df)

CancelledError: [('get_embeddings-557b83a316b87de336289891cc6315cb', 1), ('get_embeddings-557b83a316b87de336289891cc6315cb', 2), ('get_embeddings-557b83a316b87de336289891cc6315cb', 0), ('get_embeddings-557b83a316b87de336289891cc6315cb', 3)]

In [22]:
output_df.head()

CancelledError: ('get_embeddings-557b83a316b87de336289891cc6315cb', 0)

2022-10-28 13:57:19,539 - distributed.client - ERROR - Failed to reconnect to scheduler after 60.00 seconds, closing client


In [50]:
type(output_df)

dask_cudf.core.DataFrame

In [29]:
# Check output
len(output_df)

ModuleNotFoundError: No module named 'spacy'

In [31]:
# Check output
output_df.category_embeddings.isna().sum().compute()

AttributeError: 'DataFrame' object has no attribute 'category_embeddings'

In [33]:
# Merge and then take a sample of all ML papers AND papers older than 2015
full_ddf = ddf.merge(output_df)
full_ddf = full_ddf[(full_ddf.categories.str.contains(ML_CATEGORY)) | (full_ddf.year >= 2015)]

In [34]:
len(full_ddf)

ModuleNotFoundError: No module named 'spacy'

In [33]:
# Store as pickled pandas df

with open('arxiv_embeddings_300000.pkl', 'wb') as f:
    pickle.dump(full_ddf.compute().to_pandas(), f)

In [None]:
# Cleanup dask worker RAM
#client.run(clear_workers)