# Dask Experiments

This notebook showcases the Dask implementation of the pipeline and the experiments run with it.


In [1]:

from erpub.pipeline.preprocessing_dask import all_lowercase_and_stripped_dask
from experiment_utils import (
    plot_blocking_comparison,
    evaluate_blocking_method,
    plot_matching_accs,
    get_accuracy_of_matches,
)
import numpy as np
from erpub.pipeline.pipeline_dask import DaskPipeline
from erpub.pipeline.matching_dask import jaccard_similarity, specific_name_matcher_dask
from dask.distributed import Client


In [3]:
def run_matching_experiment(pipeline, name, thresholds, plot=True,
                            output_dir="experiments", accuracy_fn=None):
    """Run ``pipeline`` once per threshold and report the matching accuracy.

    For every threshold, the pipeline writes its results to
    ``{output_dir}/{name}_threshold_{threshold}``. The accuracy is then computed
    from the ``matched_entities.csv`` file inside that directory.

    Args:
        pipeline: Object exposing ``run(output_path, threshold)``, e.g. a
            ``DaskPipeline``.
        name: Experiment name; used as the prefix of each output directory and
            in the plot title.
        thresholds: Iterable of thresholds to evaluate, in order.
        plot: If True, plot accuracy against threshold via ``plot_matching_accs``.
        output_dir: Parent directory for the per-threshold results.
        accuracy_fn: Callable that maps the path of a ``matched_entities.csv``
            file to an accuracy. Defaults to ``get_accuracy_of_matches``.

    Returns:
        List of accuracies, one per threshold, in the order of ``thresholds``.
    """
    # Resolve the default lazily so a custom accuracy_fn needs no project helper.
    if accuracy_fn is None:
        accuracy_fn = get_accuracy_of_matches
    accs = []
    for threshold in thresholds:
        # Build the run directory once so run() and the accuracy read agree.
        run_dir = f"{output_dir}/{name}_threshold_{threshold}"
        pipeline.run(run_dir, threshold)
        accs.append(accuracy_fn(f"{run_dir}/matched_entities.csv"))
    if plot:
        plot_matching_accs(f"Matching accuracy of experiment: {name}", thresholds, accs)
    print(f"Matching accuracies: {accs}")
    return accs

We start with a simple setup: `jaccard_similarity` matching plus data preprocessing (`all_lowercase_and_stripped_dask`).

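**Disclaimer:** unfortunately there are still pickle serialization issues that break part of the functionality. The run in the next cell currently fails with `Could not serialize object of type HighLevelGraph`.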

In [4]:
# Cell code already runs at module scope, so no `global` declaration is needed.
# Start a distributed client (a LocalCluster, per the repr displayed below).
client = Client()
pipe = DaskPipeline(
    "data/prepared/",
    client=client,
    matching_fns=jaccard_similarity,
    similarity_threshold=0.8,
    preprocess_data_fn=all_lowercase_and_stripped_dask,
)
# NOTE(review): this call currently raises
# "TypeError: Could not serialize object of type HighLevelGraph" (see the
# disclaimer above) — TODO fix the pickling issue in DaskPipeline.
ddf = pipe.run("experiments", np.linspace(0.3, 0.9, num=5))

2024-02-02 21:58:26,175 - The pipeline will be built with these files: ['data/prepared/ACM_1995_2004_rep_9x.csv', 'data/prepared/DBLP_1995_2004_rep_4x.csv', 'data/prepared/ACM_1995_2004_rep_7x.csv', 'data/prepared/DBLP_1995_2004_rep_6x.csv', 'data/prepared/DBLP_1995_2004_rep_8x.csv', 'data/prepared/DBLP_1995_2004_rep_3x.csv', 'data/prepared/ACM_1995_2004_rep_4x.csv', 'data/prepared/DBLP_1995_2004_rep_9x.csv', 'data/prepared/DBLP_1995_2004_rep_10x.csv', 'data/prepared/DBLP_1995_2004_rep_7x.csv', 'data/prepared/ACM_1995_2004_rep_8x.csv', 'data/prepared/DBLP_1995_2004.csv', 'data/prepared/ACM_1995_2004.csv', 'data/prepared/ACM_1995_2004_rep_6x.csv', 'data/prepared/DBLP_1995_2004_rep_5x.csv', 'data/prepared/DBLP_1995_2004_rep_2x.csv', 'data/prepared/ACM_1995_2004_rep_3x.csv', 'data/prepared/ACM_1995_2004_rep_5x.csv', 'data/prepared/ACM_1995_2004_rep_10x.csv', 'data/prepared/ACM_1995_2004_rep_2x.csv']
2024-02-02 21:58:27,569 - Loaded csv successfully into Dask dataframe
2024-02-02 21:58:27,

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 28 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7ff6ac286910>\n 0. to_pyarrow_string-ca5ad36d3ac56196ffac587b4dbb6551\n 1. to_pyarrow_string-9a92e43894348896cef5608546fd2a72\n 2. to_pyarrow_string-a7472f2d73f9551a46e10a70c3bfe777\n 3. to_pyarrow_string-53f887dd3bafa991bd923bd1156f8e7b\n 4. to_pyarrow_string-f5bcdd2af0c4269025256ac2efa242d3\n 5. to_pyarrow_string-2f205110f49eb1bb31d93dc4556c034b\n 6. to_pyarrow_string-435a3d7101b7a019301a1eb579413a25\n 7. to_pyarrow_string-c417ee8820c70dbdfb729b68c0af8274\n 8. to_pyarrow_string-0277849ef86ac7128c2de66b781c5786\n 9. to_pyarrow_string-6dd202d086595101e1e3ecd982473e1c\n 10. to_pyarrow_string-0f3e36ec8b0bc7101679c0b1e4045403\n 11. to_pyarrow_string-6f60988dd003135f67bd26c2af27dead\n 12. to_pyarrow_string-b805aa60394bc3ed7eec68dec1146232\n 13. to_pyarrow_string-86a2e9055c0155746ffa580783f7348a\n 14. to_pyarrow_string-452ab10006bc6f68cf3203e3cb26faad\n 15. to_pyarrow_string-0b1c7b0d0998437db9168b984b1ba6db\n 16. to_pyarrow_string-d41fa5bb107a9395759bb7c0c6b82387\n 17. to_pyarrow_string-95403128544765933f1f8a9e33994d31\n 18. to_pyarrow_string-6a9a391ca17a710058931acc2b44e165\n 19. to_pyarrow_string-6252b374a4cfa6bde6f7929fa43c008b\n 20. concat-e2985aa29649aa2672866e5dddd7c00c\n 21. assign-4ee02c8e323dea2e8ae489904518c524\n 22. apply-76f10778cce5791e68b959d0bb99a6a1\n 23. apply-1776ec33bfabd3eb2a1e50e021eaa97f\n 24. apply-a2ab79be091920fb0bd49c8c20041eba\n 25. apply-d7eed26ce4f6cfbe66e87719b729d131\n 26. concat-f5b37b578f6b23483874c3d51dd83087\n 27. to_pyarrow_string-fee3fbd6ada3f0a3b087d856eaebb2ce\n>')

To view the Dask dashboard and get an overview of running workers and jobs, open the dashboard link shown below:

In [5]:
# Display the client: its rich repr shows the dashboard URL and the worker list.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 7.67 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:32831,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 7.67 GiB

0,1
Comm: tcp://127.0.0.1:35301,Total threads: 2
Dashboard: http://127.0.0.1:36395/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:37631,
Local directory: /tmp/dask-scratch-space/worker-0v8wmpdj,Local directory: /tmp/dask-scratch-space/worker-0v8wmpdj

0,1
Comm: tcp://127.0.0.1:46585,Total threads: 2
Dashboard: http://127.0.0.1:41777/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:32827,
Local directory: /tmp/dask-scratch-space/worker-5bdlx2mh,Local directory: /tmp/dask-scratch-space/worker-5bdlx2mh

0,1
Comm: tcp://127.0.0.1:39555,Total threads: 2
Dashboard: http://127.0.0.1:36863/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:38395,
Local directory: /tmp/dask-scratch-space/worker-cuwsbajj,Local directory: /tmp/dask-scratch-space/worker-cuwsbajj

0,1
Comm: tcp://127.0.0.1:36645,Total threads: 2
Dashboard: http://127.0.0.1:41033/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:42477,
Local directory: /tmp/dask-scratch-space/worker-9p9uvsuv,Local directory: /tmp/dask-scratch-space/worker-9p9uvsuv


## Advanced pipeline
Our best-performing configuration, now running with Dask.

In [6]:
from erpub.pipeline.matching_dask import jaccard_similarity
from erpub.pipeline.preprocessing_dask import all_lowercase_and_stripped_dask

In [None]:
# Evaluate `author_names_initials` blocking with `jaccard_similarity` on paper
# titles and `equality_matcher` on publication year, presumably against a
# baseline configuration (going by the helper's name).
# NOTE(review): `compare_to_baseline`, `author_names_initials`, `equality_matcher`
# and `thresholds` are not imported or defined in any earlier cell of this
# notebook, so on a fresh kernel this cell raises NameError — TODO import/define them.
compare_to_baseline(
    blocking_fn=author_names_initials,
    matching_fns={"paper_title": jaccard_similarity,
                  "year_of_publication": equality_matcher},
    thresholds=thresholds,
)

In [None]:
# Best-performing configuration: `author_names_initials` blocking,
# `jaccard_similarity` on paper titles and `equality_matcher` on publication year.
# NOTE(review): `author_names_initials`, `equality_matcher` and `thresholds` are
# not imported or defined in any earlier cell of this notebook — TODO
# import/define them, otherwise this cell raises NameError on a fresh kernel.
pipeline = DaskPipeline(
    file_dir="data/prepared/",
    # Pass the client created earlier explicitly, as the first DaskPipeline in
    # this notebook does, so both experiments use the same cluster.
    client=client,
    preprocess_data_fn=all_lowercase_and_stripped_dask,
    blocking_fn=author_names_initials,
    matching_fns={
        "paper_title": jaccard_similarity,
        "year_of_publication": equality_matcher,
    },
    verbose=False,
)
run_matching_experiment(pipeline, "author_names_initials_blocking", thresholds)