# CORD-19 Project: Analysis of COVID-19 Papers
This notebook performs a parallelized analysis of a dataset of COVID-19 related papers. The dataset is part of a real-world research effort known as the COVID-19 Open Research Dataset Challenge (CORD-19). The datasets and related information can be retrieved here: https://www.kaggle.com/allen-institute-for-ai/CORD-19-research-challenge

For this project we use two versions of the dataset: version 30 (13.63 GB) and version 50 (20.03 GB). The dataset is updated periodically and currently consists of a staggering 87 GB of papers. We use the resources available on CloudVeneto to create a cluster, and Dask to run the analysis in parallel.

# 0: Setup of the cluster
We created a cluster on CloudVeneto using 4 machines: one is the ```dask-scheduler``` and the other 3 are ```dask-workers```. In addition, to give every machine consistent access to the dataset, we set up an NFS server. We access the machines via ```ssh```:

```bash
ssh -L port:127.0.0.1:port -J username@gate.cloudveneto.it -i ~/.ssh/key.pem ubuntu@ip
```

We changed ```port```, ```username``` and ```ip``` as needed. The IP addresses of our machines are:
- ```10.67.22.173``` for the scheduler
- ```10.67.22.150``` for worker 1
- ```10.67.22.153``` for worker 2
- ```10.67.22.183``` for worker 3
- ```10.67.22.227``` for NFS server

## Cluster creation

We shared keys between the workers and the scheduler so they could communicate on their own.

We used `miniconda` to replicate exactly the same Python environment on each VM.

After some trial and error we decided to store both dataset versions on a 60 GB volume. To grant access to every worker, we mounted the volume on a VM and configured it as an NFS server, following the CloudVeneto user's guide.

To connect our machines to the NFS server we ran the following command on each of them:

```bash
sudo mount -t nfs 10.67.22.227:/home/ubuntu/nfs /home/ubuntu/data
```

Now we are ready to set up Dask with Python.

# 1: Libraries import and Cluster turn-on
In this section we import all the libraries used in the notebook. Then we start the cluster via Dask.

## 1.1: Libraries import

In [None]:
from dask.distributed import Client, SSHCluster, default_client, PipInstall, performance_report, wait
import ClusterManager as cm  # custom Python script that starts the cluster
import dask
import dask.bag as db
import dask.dataframe as dd
import numpy as np
import pandas as pd
import json
import gc  # used for the explicit garbage collection on the workers
import glob as gl
import os
import re
import socket
import time
from collections import Counter
from IPython.display import clear_output
import nltk
from nltk.corpus import stopwords
from nltk import word_tokenize
import seaborn as sns
import matplotlib.pyplot as plt

## 1.2: Cluster setup
The cell below starts the cluster configured above. To access the dashboard, we first create an ```SSH``` tunnel from the command line:

```bash
ssh -L 8797:localhost:8797 -J ncognome@gate.cloudveneto.it -i ~/.ssh/chiave.pem ubuntu@10.67.22.173
```

We can then access the Dask dashboard at http://localhost:8797

In [None]:
client = cm.ClusterStarter()

We can check the status of the cluster by submitting a simple task. The activity should show up in the dashboard.

In [None]:
def who_am_i():
    import socket
    return socket.gethostname()

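# pure=False gives each call its own key, so the ten tasks are not collapsed into one
futures = [client.submit(who_am_i, pure=False) for _ in range(10)]
results = client.gather(futures)
print(Counter(results))  # number of tasks executed on each host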



# 2: Dataset setup
In this part of the Notebook we focus on the available datasets and try to retrieve the correct information from each record. First off, we decide which version of the dataset to analyze and inspect a single JSON's structure:

In [None]:
dataset_version = 30  # or 50, whichever we prefer
directory_path = f"data/{dataset_version}/document_parses/pdf_json/"
filenames = gl.glob(directory_path + '*.json')

print(f"Found {len(filenames)} JSON files. \n")

example = filenames[1]
with open(example, 'r') as f:
    data = json.load(f)

print('Each JSON is a dictionary with some keys:')
print(data.keys())

All of our files should have the following structure:
```json
{
    "paper_id": <str>,                      # 40-character sha1 of the PDF
    "metadata": {
        "title": <str>,
        "authors": [                        # list of author dicts, in order
            {
                "first": <str>,
                "middle": <list of str>,
                "last": <str>,
                "suffix": <str>,
                "affiliation": <dict>,
                "email": <str>
            },
            ...
        ],
        "abstract": [                       # list of paragraphs in the abstract
            {
                "text": <str>,
                "cite_spans": [             # list of character indices of inline citations
                                            # e.g. citation "[7]" occurs at positions 151-154 in "text"
                                            #      linked to bibliography entry BIBREF3
                    {
                        "start": 151,
                        "end": 154,
                        "text": "[7]",
                        "ref_id": "BIBREF3"
                    },
                    ...
                ],
                "ref_spans": <list of dicts similar to cite_spans>,     # e.g. inline reference to "Table 1"
                "section": "Abstract"
            },
            ...
        ],
        "body_text": [                      # list of paragraphs in full body
                                            # paragraph dicts look the same as above
            {
                "text": <str>,
                "cite_spans": [],
                "ref_spans": [],
                "eq_spans": [],
                "section": "Introduction"
            },
            ...
            {
                ...,
                "section": "Conclusion"
            }
        ],
        "bib_entries": {
            "BIBREF0": {
                "ref_id": <str>,
                "title": <str>,
                "authors": <list of dict>       # same structure as earlier,
                                                # but without `affiliation` or `email`
                "year": <int>,
                "venue": <str>,
                "volume": <str>,
                "issn": <str>,
                "pages": <str>,
                "other_ids": {
                    "DOI": [
                        <str>
                    ]
                }
            },
            "BIBREF1": {},
            ...
            "BIBREF25": {}
        },
        "ref_entries": {
            "FIGREF0": {
                "text": <str>,                  # figure caption text
                "type": "figure"
            },
            ...
            "TABREF13": {
                "text": <str>,                  # table caption text
                "type": "table"
            }
        },
        "back_matter": <list of dict>           # same structure as body_text
    }
}
```
We have to handle them carefully, since some fields can be missing or empty.
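
One possible way to read nested fields without raising on missing keys is a small helper that walks the structure and falls back to a default. This is only a sketch: `nested_get` and the toy `record` are hypothetical names introduced for illustration and are not part of the notebook.

```python
def nested_get(record, *keys, default=None):
    """Follow `keys` through nested dicts/lists; return `default` on any miss.

    An explicit null (None) value is treated like a missing one.
    """
    current = record
    for key in keys:
        if isinstance(current, dict):
            if key not in current:
                return default
            current = current[key]
        elif isinstance(current, list) and isinstance(key, int):
            if not -len(current) <= key < len(current):
                return default
            current = current[key]
        else:
            return default
    return default if current is None else current


# Toy record: no title, and one author without affiliation (made-up data)
record = {
    "paper_id": "abc123",
    "metadata": {"authors": [{"first": "Ada", "last": "Lovelace"}]},
    "body_text": [],
}

title = nested_get(record, "metadata", "title", default="")
first_author = nested_get(record, "metadata", "authors", 0, "last")
country = nested_get(record, "metadata", "authors", 0,
                     "affiliation", "location", "country")
```

With a helper like this, a missing `title` or `affiliation` yields the default instead of a `KeyError` or `IndexError`.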

## 2.1: Dask bag
Dask bags are one of the data structures available in Dask. They support very general computations on arbitrary Python objects and behave like unordered collections that may contain duplicate elements. For the first algorithm, this is our data structure of choice.

In [None]:
n_partitions = 200
files_to_take = len(filenames)
filenames = filenames[:files_to_take]

# function to load files
def load_json_file(path):
    with open(path) as f:
        return json.load(f)

json_bag = db.from_sequence(filenames, npartitions=n_partitions).map(load_json_file)
count = json_bag.count().compute()
print('JSON bag contains',count,'files')

# 3: Word counter distributed algorithm
Our first real task is a word counter. Our goal is to determine how many times each word appears in every document, and then in the full dataset. To achieve this we set up an algorithm divided into two phases:

**Map phase**: For each document $D_i$ we produce a set of intermediate pairs $(w,cp(w))$, one for each word $w$ in $D_i$, where $cp(w)$ is the number of occurrences of $w$ in $D_i$.

**Reduce phase**: For each word $w$, gather all pairs previously computed and return a final pair $(w,c(w))$ where $c(w)$ refers to the **total** number of occurrences of $w$ across all documents.  

The algorithm is executed on the full text of all the papers. To get the full text, we first access the ```body_text``` attribute of each JSON and concatenate the strings it contains.
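
The two phases can be sketched in plain Python on a pair of toy documents. This is a single-machine illustration of the idea, not the distributed implementation used in the notebook; the document texts and the names `map_phase` and `reduce_phase` are made up for the example.

```python
from collections import Counter
from functools import reduce

# Toy corpus (made-up text)
documents = {
    "D1": "virus spreads fast virus mutates",
    "D2": "vaccine stops virus",
}


def map_phase(text):
    """Return the per-document pairs (w, cp(w)) as a Counter."""
    return Counter(text.split())


def reduce_phase(total, partial):
    """Merge one document's counts into the running totals c(w)."""
    total.update(partial)
    return total


per_document = [map_phase(text) for text in documents.values()]
totals = reduce(reduce_phase, per_document, Counter())
```

Here `per_document` holds the intermediate pairs of the map phase, and `totals` holds the final pairs $(w,c(w))$ of the reduce phase.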

In [None]:
# english common stopwords
en_stopwords = stopwords.words("english")

# function to compute the words in a text
def words_in_body(body_text):

    # clean text
    string = " ".join([txt['text'] for txt in body_text]).lower() # join all paragraphs in lowercase

    string = re.sub(r"[^a-z\s]", " ", string) # remove numbers and punctuations

    tokens = string.split() # tokenize on whitespaces (avoids words splitting)

    # remove stopwords, common words and single letters
    stop_words = set(en_stopwords) | {"fig","figure","et","al","results","also",
                                      "used","using","may","one","two","de","however"}
    
    stop_words |= set("p o i u y t r e w q l k j h g f d s a z x c v b n m".split())
    filtered_words = [w for w in tokens if w not in stop_words]

    words, words_counts = np.unique(filtered_words, return_counts=True)

    return [{"word": str(i) , "n_counts": int(j)} for i,j in zip(words,words_counts)]
    

We define the map and reduce phases separately in the following cell.

In [None]:
# extract each document's body text
body_texts = json_bag.pluck("body_text")

# count words (map phase)
words_counts = body_texts.map(words_in_body)

# increment counters through all documents
def increment(tot, x):
    return tot + x['n_counts']

# using foldby to get all occurrences of each 'word' (reduce phase)
words_counted_reduce = words_counts.flatten().foldby('word', 
                                                     binop=increment, 
                                                     initial=0, 
                                                     combine = lambda x,y: x+y, 
                                                     combine_initial=0
                                                    )
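
The roles of `binop` and `combine` in the reduction above can be illustrated with a plain-Python emulation: `binop` folds the items of one partition into a partial result per key, and `combine` merges the partial results of different partitions. The function `fold_by_key` is a hypothetical helper written for this sketch and does not reproduce how Dask implements `foldby` internally.

```python
def fold_by_key(partitions, key, binop, initial, combine, combine_initial):
    """Emulate a partition-wise fold-by-key on a list of lists of dicts."""
    # Step 1: fold each partition independently into {key: partial_result}
    partials = []
    for partition in partitions:
        acc = {}
        for item in partition:
            k = item[key]
            acc[k] = binop(acc.get(k, initial), item)
        partials.append(acc)

    # Step 2: merge the partial results coming from different partitions
    merged = {}
    for acc in partials:
        for k, value in acc.items():
            merged[k] = combine(merged.get(k, combine_initial), value)
    return sorted(merged.items())


def increment(tot, x):
    return tot + x["n_counts"]


# Two toy partitions with made-up word counts
partitions = [
    [{"word": "virus", "n_counts": 2}, {"word": "cell", "n_counts": 1}],
    [{"word": "virus", "n_counts": 3}],
]

result = fold_by_key(partitions, "word", increment, 0, lambda x, y: x + y, 0)
```

Note that `binop` receives a running total and a record, while `combine` receives two running totals, which is why two separate functions are needed.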


Dask is **lazy**: it does not perform any computation until it is explicitly told to do so. We compute the task and show the results in a bar chart:

In [None]:
# convert to a list of (word, count) tuples and compute
word_counts = words_counted_reduce.compute()

# sort and get highest 20
sorted_words = sorted(word_counts, key=lambda x: x[1], reverse=True)[:20]
words, counts = zip(*sorted_words)

# plot logic
plt.figure(figsize=(10, 6))
plt.bar(words, counts)
plt.xticks(rotation=45, ha='right')
plt.ylabel("Count")
plt.title("Top 20 Most Frequent Words")
plt.tight_layout()
plt.show()

## 3.1: Grid search on hyperparameters
We want to perform tasks as fast as possible. The best hyperparameter configuration depends strictly on the dataset and its structure, and it is hard to infer a priori what values the hyperparameters should take. For this reason, we repeat the previous analysis while changing the number of available workers and the number of partitions, to see which combination performs best. We expect better results with 3 workers and with a number of partitions small enough to be manageable by our workers, but large enough that each partition still contains a high number of files.
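
The measurement pattern behind such a grid search can be sketched on a single machine. The example below times a stand-in workload for several chunk counts and keeps the fastest setting; `time_call` and `toy_task` are hypothetical helpers, and the workload has nothing to do with the real word count.

```python
import time


def time_call(func, *args, repeats=3):
    """Return the best wall-clock time of `func(*args)` over `repeats` runs."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        best = min(best, time.perf_counter() - start)
    return best


def toy_task(n_chunks, n_items=20_000):
    """Stand-in workload: sum a range of integers split into `n_chunks` chunks."""
    size = max(1, n_items // n_chunks)
    return sum(sum(range(i, min(i + size, n_items)))
               for i in range(0, n_items, size))


# One timing per setting, then pick the fastest
grid = {n_chunks: time_call(toy_task, n_chunks) for n_chunks in (1, 10, 100)}
best_setting = min(grid, key=grid.get)
```

Repeating each measurement and keeping the best time reduces the influence of transient load, at the price of a longer benchmark.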

In [None]:
def configuration_time(n_partitions):
    # take the starting time
    start_time = time.time()
    # configure the bag
    json_bag = db.from_sequence(filenames, npartitions=n_partitions).map(load_json_file)
    # perform the count(map phase)
    words_counts = json_bag.pluck("body_text").map(words_in_body)
    # merge the counts (reduce phase)
    words_counts_reduce = words_counts.flatten().foldby('word', 
                                                     binop=increment, 
                                                     initial=0, 
                                                     combine = lambda x,y: x+y, 
                                                     combine_initial=0
                                                    ).compute() # here we compute to really perform the operation
    end_time = time.time()
    diff_time = end_time - start_time
    print(f'With {n_partitions} partitions, computation time: {round(diff_time,2)}s')

    # delete the objects (save space)
    del json_bag
    del words_counts
    del words_counts_reduce
    
    return diff_time
    

In [None]:
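# make sure the previous cluster is shut down (scheduler and workers)
client.shutdown()
# grid of parameters to explore
partitions_list = [1, 10, 20, 50, 100, 500]
possible_workers = [1, 2, 3]
# DataFrame collecting the execution times (rows: partitions, columns: workers)
df_times = pd.DataFrame(np.nan, index=partitions_list, columns=possible_workers)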

We ran the following cell a number of times to get all the data needed for completing the grid

In [None]:
# parameters
partitions_list = [1,10, 20, 50, 100, 500]
possible_workers = [1,2,3]  
chosen_n_workers = possible_workers[0]

client = cm.ClusterManager(nworkers=chosen_n_workers)

# get the time for each partition count
times_list = []
print(f'\nTrying {chosen_n_workers} workers...')
for partitions in partitions_list:
    t = configuration_time(partitions)
    times_list.append(t)

df_times[chosen_n_workers] = times_list

# shut down the scheduler and the workers before the next run
client.shutdown()

When the grid is complete, we plot it:

In [None]:
plt.figure(figsize=(6, 6))
sns.heatmap(df_times, annot=True, fmt=".2f", cmap="jet", alpha=0.8)
plt.title("Execution Time Heatmap")
plt.xlabel("Number of Workers")
plt.ylabel("Number of Partitions")
plt.tight_layout()
#plt.savefig("Grid_search_execution_times.png")
plt.show()

# 4: Identifying the worst and best represented countries/institutes in the research
For this second task we convert our dataset into a ```DataFrame``` structure. Our aim is to identify the most and least active countries in this research field. The analysis is based on the institutes the authors are affiliated with.
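
The core of the analysis, counting the countries found in the authors' affiliations, can be sketched in plain Python on a few toy records. The records below are made up, and `countries_of` is a hypothetical helper; it assumes the `affiliation` / `location` / `country` nesting used later in this section.

```python
from collections import Counter


def countries_of(paper):
    """Yield the country of every author that declares one."""
    for author in paper.get("metadata", {}).get("authors", []):
        affiliation = author.get("affiliation") or {}
        location = affiliation.get("location") or {}
        country = location.get("country")
        if country:
            yield country


# Toy records (made-up authors and affiliations)
papers = [
    {"metadata": {"authors": [
        {"last": "Rossi", "affiliation": {"location": {"country": "Italy"}}},
        {"last": "Smith", "affiliation": {}},
    ]}},
    {"metadata": {"authors": [
        {"last": "Bianchi", "affiliation": {"location": {"country": "Italy"}}},
        {"last": "Lee", "affiliation": {"location": {"country": "South Korea"}}},
    ]}},
    {"metadata": {}},
]

toy_country_counts = Counter(c for paper in papers for c in countries_of(paper))
```

Each author contributes one count, so a paper with several authors from the same country counts several times for that country.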

In [None]:
# taking data
client = cm.ClusterManager(nworkers=chosen_n_workers)

directory_path = f"data/{dataset_version}/document_parses/pdf_json/"
filenames = gl.glob(directory_path + '*.json')

print(f"Found {len(filenames)} JSON files. \n")

with open(example, 'r') as f:
    data = json.load(f)


In [None]:
%%time

def extract_key_fields(filepath):
    with open(filepath) as f:
        data = json.load(f)

    metadata = data.get('metadata', {})
    authors = metadata.get('authors', [])
    # extract the fields of interest into a flat dict
    return {
        'paper_id': data.get('paper_id'),
        'title': metadata.get('title'),
        'author_count': len(authors),
        'affiliations': [author.get('affiliation') for author in authors],
        'first_author': authors[0].get('last') if authors else None,
    }

# create the dataframe
npartitions = max(1, len(filenames) // 50)
json_bag2 = db.from_sequence(filenames, npartitions=npartitions).map(extract_key_fields)
json_df = json_bag2.to_dataframe()

# len() triggers the computation and returns the number of rows (papers)
count = len(json_df)
print('JSON Data Frame contains', count, 'papers')



In [None]:
json_df.head()

In [None]:
%%time

def process_partition(partition):
    all_institutions = []
    all_countries = []
    
    for affiliations in partition['affiliations']:
        if isinstance(affiliations, list):
            for aff in affiliations:
                if isinstance(aff, dict):
                    if 'institution' in aff:
                        all_institutions.append(aff['institution'])
                    if 'location' in aff and isinstance(aff['location'], dict):
                        if 'country' in aff['location']:
                            all_countries.append(aff['location']['country'])
    
    return pd.DataFrame({
        'institutions': [all_institutions],
        'countries': [all_countries]
    })

# Apply to each partition (lazy: nothing is computed yet)
results = json_df.map_partitions(process_partition, meta={'institutions': object, 'countries': object})

# Compute and aggregate
computed = results.compute()

all_institutions = [item for sublist in computed['institutions'] for item in sublist]
all_countries = [item for sublist in computed['countries'] for item in sublist]

institution_counts = Counter(all_institutions)
country_counts = Counter(all_countries)


In [None]:
%%time
institution_counts = Counter(all_institutions)
country_counts = Counter(all_countries)

In [None]:
results.visualize()  # task graph of the lazy Dask DataFrame (requires graphviz)

In [None]:
institution_counts = Counter(all_institutions)
country_counts = Counter(all_countries)

# Removing null values

institution_counts = Counter({k: v for k, v in institution_counts.items() 
                           if k and str(k).strip() and k != 'null'})
country_counts = Counter({k: v for k, v in country_counts.items() 
                           if k and str(k).strip() and k != 'null'})

We noticed that a fair number of countries occur under different names (for example United States of America and USA). We want to merge the counts that refer to the same country but are recorded under different names.
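
One way to keep such a merge manageable is to normalize spacing and letter case before looking a name up in an alias table, so that variants like `Usa` or `CANADA` do not need their own entries. The sketch below uses a small made-up alias table and made-up counts; `normalize_country` and `ALIASES` are hypothetical names.

```python
import re
from collections import Counter

# Alias table keyed by the case-folded name (illustrative subset)
ALIASES = {
    "usa": "United States",
    "us": "United States",
    "united states of america": "United States",
    "uk": "United Kingdom",
    "canada": "Canada",
}


def normalize_country(raw, aliases=ALIASES):
    """Collapse whitespace, strip stray punctuation, then apply the alias table."""
    cleaned = re.sub(r"\s+", " ", raw).strip(" .,;")
    return aliases.get(cleaned.casefold(), cleaned)


# Made-up raw counts with several spellings of the same country
raw_counts = Counter({"USA": 5, "Usa": 1, "United States of America": 2,
                      "CANADA": 3, "Canada": 4, "Italy": 7})

merged = Counter()
for name, n in raw_counts.items():
    merged[normalize_country(name)] += n
```

Names that are not in the alias table are kept as they are, so unexpected values remain visible for manual inspection.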

In [None]:
df2 = pd.DataFrame.from_dict(country_counts, orient='index', columns=['count'])
df2['country'] = df2.index
df2 = df2.reset_index(drop=True)

# mapping duplicate countries
country_mapping = {
    'USA': 'United States',
    'United States of America': 'United States',
    'US': 'United States',
    'USA, USA': 'United States',
    'Usa': 'United States',
    'Alabama': 'United States',
    'New Jersey': 'United States',
    'UK': 'United Kingdom',
    'UK A R': 'United Kingdom',
    'Great Britain': 'United Kingdom',
    'PR China': 'China',
    'People\'s Republic of China': 'China',
    'P.R. China': 'China',
    'China, China': 'China',
    'P. R. China': 'China',
    'China A R': 'China',
    'Republic of Korea': 'South Korea',
    'Korea': 'South Korea',
    'España': 'Spain',
    'the Netherlands': 'The Netherlands',
    'Taiwan, ROC': 'Taiwan',
    'Taiwan ROC': 'Taiwan',
    'Italia': 'Italy',
    'Russian Federation': 'Russia', 
    'CANADA': 'Canada',
    'australia': 'Australia',
    'UAE': 'United Arab Emirates',
    'France Key Words': 'France',
    'Democratic Republic of the Congo': 'Democratic Republic of Congo', 
    'Congo': 'Democratic Republic of Congo', 
    'The Gambia': 'Republic of The Gambia'
}

df2['country_clean'] = df2['country'].replace(country_mapping)
df2 = df2[df2['country_clean'].str.match(r'^[a-zA-Z\s]+$')]

df2 = df2.groupby('country_clean')['count'].sum().reset_index()
df2 = df2.sort_values('count', ascending=False).reset_index(drop=True)



# keep only the countries that appear at least 12 times
df2 = df2[df2['count'] >= 12]

# Top 30 countries (DataFrame.plot creates its own figure)
df2.head(30).plot(x='country_clean', y='count', kind='bar', figsize=(15, 8))
plt.title('Top 30 Countries by Count')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Bottom 30 countries (least represented first)
df2.tail(30).iloc[::-1].plot(x='country_clean', y='count', kind='bar', figsize=(15, 8))
plt.title('Bottom 30 Countries by Count')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()


Finally, we plot the institution counts as a bar chart.

In [None]:
bottom_n = 20
top_n = 20
top_institutions = institution_counts.most_common(top_n)
bottom_institutions = institution_counts.most_common()[:-(bottom_n+1):-1]

names_t, counts_t = zip(*top_institutions)
names_b, counts_b = zip(*bottom_institutions)

# Create bar plot
plt.figure(figsize=(12, 8))
plt.bar(range(len(names_t)), counts_t)
plt.xticks(range(len(names_t)), names_t, rotation=45, ha='right')
plt.xlabel('Institution')
plt.ylabel('Count')
plt.title(f'Top {top_n} Institutions')
plt.tight_layout()
plt.show()


# 5: Obtaining embeddings for Paper Titles
In Natural Language Processing, a common technique for text analysis is to transform text into numerical vectors, where each vector represents a word in a document. At the end of this operation, the document is transformed into a list of vectors, or a matrix ```n x m``` where ```n``` is the number of words in the document and ```m``` is the size of the vector representing each word.

For this part, we took a pre-trained English model. The model is a large dictionary whose entries are in the form ```key:vector```, where each ```key``` corresponds to a word. This way we can easily link each word to its own vector. We map the dataset into a DataFrame composed of ```paper-id``` (```pid```) and ```title-embedding``` (```vec```), so that each document's id is linked to the representation of its title in the vector space.
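
The mapping from a title to its ```n x m``` representation can be sketched with a tiny hand-made model. The three-dimensional vectors and the titles below are invented for illustration, and `embed_title` is a hypothetical helper; unlike the distributed version, this sketch simply skips the words that are not in the model.

```python
# Toy "model": word -> vector of size m = 3 (made-up numbers)
toy_model = {
    "covid": [0.1, 0.3, 0.5],
    "vaccine": [0.2, 0.1, 0.4],
    "trial": [0.9, 0.0, 0.2],
}


def embed_title(title, model):
    """Return one vector per title word that is present in the model."""
    return [model[word] for word in title.lower().split() if word in model]


# Toy titles indexed by paper id
titles = {"pid-1": "COVID vaccine trial", "pid-2": "Unknown covid outcomes"}
embedded = {pid: embed_title(title, toy_model) for pid, title in titles.items()}
```

Each entry of `embedded` is a list of `n` vectors of length `m`, where `n` is the number of title words found in the model.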

In [None]:
#  helper functions:
def split_like_oleg(inputi:str)->list:
    """
    splits and lowers string
    """
    if not isinstance(inputi, str): return []
    return inputi.lower().split()

def master_splinter(inputi:str) -> dict:
    """
    Splits strings and turns them in dicts made like {'word': [vector]}
    """
    line = inputi.rstrip().split(' ')
    if len(line) < 301: return None # just to be sure
    elif len(line[0]) <= 2: return None # remove header
    else:
        return {'word': str(line[0]), 'vec': list(map(float, line[1:]))}

def usgarrista(inputi):
    """
    Returns True if  NoneType
    """
    return inputi is None 

def list_or_nothing(objecto): # some words do not appear in model: annihilated
    if objecto is None: return []
    if isinstance(objecto, list): return objecto
    return []

# functions for this part
def nlp_embedding(num_workers: int,
                  title_pid_partitions: int,
                  wiki_blksize: float,
                  filt_partitions: int,
                  v_dataset: int,
                  benchmark: bool):
    """
    This function takes the metadata.csv file, selects title and paper ID, and extracts the list of unique words from all the titles.
    Then it filters the model wiki.en.vec (the biggest one from FastText) to keep only the words we need.
    Then it maps each word to the model to retrieve its embedding. A group-by on the paper ID gives a DataFrame with ID + list of vectors.
    """
    client = cm.ClusterStarter(nworkers=num_workers)
    dataset_version= v_dataset
    modelpath = '/home/ubuntu/data/NLP_model/wiki.en.vec' ## Model location
    metadata_path = f'/home/ubuntu/data/{dataset_version}/metadata.csv' # metadata location

    metadatacsv = dd.read_csv(metadata_path,   # Reading metadata.csv
                            blocksize=32e6, ## so it is divided in 3,more partitions make no sense
                            dtype={'title': str, 'sha': str, 'pmcid': str, 'arxiv_id': str, 'pubmed_id': str, 'who_covidence_id': str}
                            ).loc[:,['sha', 'title','pmcid']]

    metadatacsv['pid'] = metadatacsv['sha'].fillna(value=metadatacsv['pmcid']).astype(str) # sometimes pdf_jsonID is missing, we will use from PMC_json
    title_pid = metadatacsv.drop(columns=['sha','pmcid']) # don't need those
    title_pid['title'] = title_pid['title'].apply(split_like_oleg, meta=('title', 'object')) # turning every title-string into title-list
    title_pid = title_pid.explode('title').repartition(npartitions=title_pid_partitions) # exploding every list + repartitioning


    # unique words in all titles, kept lazy as a one-column Dask DataFrame
    uni_words = title_pid['title'].dropna().unique().to_frame(name='word').astype({'word': str})

    del metadatacsv 
    client.run(gc.collect) # naive memory management

    wiki_vec = db.read_text(modelpath,# loading model
                            blocksize=wiki_blksize,
                            encoding='utf-8',
                            errors='strict',
                            linedelimiter='\n',
                            collection=True
                            ).map(
                                master_splinter
                                ).remove(
                                    usgarrista
                                    ).to_dataframe(
                                        meta= {'word': str, 'vec': 'object'}
                                    )
    wiki_vec['word'] = wiki_vec['word'].astype(str)

    filtered_wiki = wiki_vec.merge(uni_words, # intersecting model with unique words, so we will use just what we need
                                    on='word',
                                    how='inner'
                                    ).repartition(
                                        npartitions=filt_partitions)
    del wiki_vec, uni_words # memory management; the helper functions must stay defined, they are used above
    client.run(gc.collect)

    the_word_and_friends = title_pid.merge(filtered_wiki, how='left', left_on='title',right_on='word') # merging exploded titles w/ filtered wiki
    the_word_and_the_vec = the_word_and_friends[['pid','vec']].copy() # some cleaning/refining
    the_word_and_the_vec['vec'] = the_word_and_the_vec['vec'].apply(list_or_nothing, meta=('vec', 'object')) # words missing from the model become []
    del the_word_and_friends # memory management
    client.run(gc.collect)

    nlp_bible = the_word_and_the_vec.groupby('pid')['vec'].apply(list, meta=('vec','object')).reset_index().persist()
    
    if benchmark:
        wait(nlp_bible) # wait for persist to compute the df
        del nlp_bible, the_word_and_the_vec # memory management of the cluster if benchmarking
        client.run(gc.collect)
        client.shutdown()
        return 'worked'
    else:
        wait(nlp_bible) ## this is not really necessary but maybe it's better to have the dataset ready before starting the next task. We will decide after doing the last task.
        return client, nlp_bible


def benchmark():
    worker_list = [3, 2, 1] # reversed because is faster to shut down a worker than waking it up
    title_pid_parts = 3 # it does not makes sense to split this in more partitions than 3
    wiki_blksize = [32e6, 64e6, 128e6] #,512e6,1e9] bigger partitions were not manageable by workers
    filt_wiki = [3, 6, 9] # filt_wiki is like a third of wiki_blk size so ~ 2 GB, so these number of partitions should be ok. According to our resources and Dask docs.
    times = [] 
    check = [] # check it not really needed but it makes me feel calmer during benchmarking.
    param_list = []
    # things to add? CPU Load? Data transfer? # not enough time left to implement it im afraid
    for worker in worker_list:        
        for blksz in wiki_blksize:
            for filt_wikiii in filt_wiki:
                start = time.time()
                a = nlp_embedding(num_workers=worker,
                                  title_pid_partitions=title_pid_parts,
                                  wiki_blksize=blksz,
                                  filt_partitions=filt_wikiii,
                                  v_dataset=dataset_version,
                                  benchmark=True)
                end = time.time()
                diff_time = end - start
                #times.append(diff_time)
                #check.append(a)

                params = {
                    'workers': worker,
                    'wiki_blksize': blksz,
                    'title_pid_parts': title_pid_parts,
                    'filt_wiki': filt_wikiii,
                    'time': diff_time,
                    'check': a
                }
                param_list.append(params)
    
    return param_list #times, check, params


## 5.1: Grid search on hyperparameters
As previously done for the word counter algorithm, we look for the configuration that performs the task as quickly as possible. The functions above accept a ```benchmark``` flag that turns a run into a timed benchmark. Our parameter space is the following:

```python
worker_list = [3,2,1]
title_pid_parts = 3  # fixed, as in benchmark()
wiki_blksize = [32e6, 64e6, 128e6] #,512e6,1e9] 
filt_wiki = [3, 6, 9]

```
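
The nested loops of `benchmark()` visit every combination of these values. As a sketch, the same enumeration can be written with `itertools.product`; here `fake_run` is a hypothetical stand-in that returns a made-up cost instead of timing a real run, and `title_pid_parts` is fixed at 3 as in the body of `benchmark()`.

```python
from itertools import product

worker_list = [3, 2, 1]
wiki_blksize = [32e6, 64e6, 128e6]
filt_wiki = [3, 6, 9]
title_pid_parts = 3  # fixed, as in the body of benchmark()


def fake_run(workers, blksz, filt):
    """Hypothetical stand-in for a timed run; returns a made-up cost."""
    return blksz / 1e6 / workers + filt


# One record per combination, in the same order as three nested for loops
records = [
    {"workers": w, "wiki_blksize": b, "filt_wiki": f,
     "title_pid_parts": title_pid_parts, "time": fake_run(w, b, f)}
    for w, b, f in product(worker_list, wiki_blksize, filt_wiki)
]
best = min(records, key=lambda r: r["time"])
```

With three values per parameter the grid contains 27 combinations, so every additional parameter value has a direct cost in benchmark time.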

In [None]:
params = benchmark()

Then we retrieve the time and the parameters of the best run.

In [None]:
times = [x['time'] for x in params]
times = np.array(times)
best_time_index = np.argmin(times)
best_time = times[best_time_index]
best_params = params[best_time_index]

print(f'We got best time: {best_time}s with params: \n{best_params}')

## 5.2: Setup the best parameters and see the results
We want to set up the cluster with the best parameters for the last part and check the representation of the titles as vectors.

In [None]:
n_workers = best_params['workers']
title_pid_partitions = best_params['title_pid_parts']
wiki_blksize = best_params['wiki_blksize']
filt_partitions = best_params['filt_wiki']

# benchmark=False keeps the cluster alive and returns the persisted DataFrame
client, df = nlp_embedding(num_workers=n_workers,
                           title_pid_partitions=title_pid_partitions,
                           wiki_blksize=wiki_blksize,
                           filt_partitions=filt_partitions,
                           v_dataset=dataset_version,
                           benchmark=False)


In [None]:
df.head(1).iloc[0]