# Compute Metrics
Precomputes every (model viz, benchmark viz) pair and performs an analysis.
For visualization similarity, uses the structural similarity index measure (SSIM). 

In [1]:
from server.model_setup import get_ncNetInstance, get_nl4dv_instance
from server.scripts import config

# Parallel processing
from dask.distributed import Client, LocalCluster
import multiprocessing
import dask

# Logging
import logging
import warnings
warnings.filterwarnings("ignore")

# Image processing
import skimage.metrics as skm
from PIL import Image
import numpy as np
import subprocess
import time

import json
import os
import uuid


c:\Users\casil\Documents\Spring_2022\6.S079\final-project\server\model_setup.py


Define the process to get the visualization comparison metrics. It is kind of a
pain to convert a VegaLite spec to an image in Python, so we first have to save
the spec as a JSON object, then use the `vega-lite` CLI (Node) to convert the spec into
a png.

In [2]:
def get_viz_metrics(spec1, spec2, cleanup=True):
    if spec1 is None:
        return {"metrics": {"ssim": 0}, "errors": []}

    spec1["autosize"] = "fit"
    spec1["width"] = 500
    spec1["height"] = 500
    spec1["background"] = "#fafafa"

    spec2["autosize"] = "fit"
    spec2["width"] = 500
    spec2["height"] = 500
    spec2["background"] = "#fafafa"

    pair_id = str(uuid.uuid4())

    try:
        os.listdir("tmp")
    except FileNotFoundError:
        os.mkdir("tmp")

    tmp_dir = lambda x: os.path.join("tmp", x)

    # Model result spec
    spec1_json_path = tmp_dir(f"spec1-{pair_id}.json")
    with open(spec1_json_path, "w") as f:
        json.dump(spec1, f)

    # Benchmark spec
    spec2_json_path = tmp_dir(f"spec2-{pair_id}.json")
    with open(spec2_json_path, "w") as f:
        json.dump(spec2, f)

    # Convert the specs to pngs using the vega-lite CLI
    spec1_png_path = tmp_dir(f"spec1-{pair_id}.png")
    subprocess.run(
        f"npx -p vega -p vega-lite vl2png {spec1_json_path} {spec1_png_path}".split(),
        shell=True,
    )
    spec2_png_path = tmp_dir(f"spec2-{pair_id}.png")
    subprocess.run(
        f"npx -p vega -p vega-lite vl2png {spec2_json_path} {spec2_png_path}".split(),
        shell=True,
    )

    # Wait a little bit to let the png files get written
    time.sleep(2)

    # Load the pngs into PIL and compute the metrics
    try:
        viz1 = np.array(Image.open(spec1_png_path).convert("RGB"))
        viz2 = np.array(Image.open(spec2_png_path).convert("RGB"))
    except FileNotFoundError:
        time.sleep(10)
        try:
            viz1 = np.array(Image.open(spec1_png_path).convert("RGB"))
            viz2 = np.array(Image.open(spec2_png_path).convert("RGB"))
        except FileNotFoundError:
            return {"metrics": {"ssim": -1}, "errors": ["FileNotFoundError"]}

    try:
        score_ssim = skm.structural_similarity(viz1, viz2, multichannel=True)
        errors = []
    except Exception as e:
        score_ssim = -1
        errors = [f"Exception when finding similarity - {type(e)}: {e}"]

    result = {
        "metrics": {
            "ssim": score_ssim,
        },
        "errors": errors,
    }

    if not cleanup:
        return result

    os.remove(spec1_json_path)
    os.remove(spec2_json_path)
    os.remove(spec1_png_path)
    os.remove(spec2_png_path)

    return result


Here we create a JSON lookup mapping datasets to the NL queries from the benchmark.

In [3]:
with open(config.BENCHMARK_META_PATH, "r") as file:
    benchmark_metadata: dict = json.load(file)

with open(config.TABLE_TO_BENCHMARK_LOOKUP_PATH, "r") as file:
    lookup = json.load(file)

dataset_to_queries_lookup = {}
for dataset_name, benchmark_ids in lookup.items():
    # Get all of the NL queries for the dataset
    nl_queries = [
        nl_query
        for benchmark_id in benchmark_ids
        for nl_query in benchmark_metadata[benchmark_id]["nl_queries"]
    ]
    
    dataset_to_queries_lookup[dataset_name] = nl_queries
    
# Save the dataset_to_queries_lookup to a file
with open(os.path.join(config.BENCHMARK_DIR_PATH, "dataset_to_queries_lookup.json"), "w") as file:
    json.dump(dataset_to_queries_lookup, file, indent=4)
    

Create the Dask client for multiprocessing

In [4]:
# Get the number of cores
n_cores = multiprocessing.cpu_count()
print("Number of cores we have: ", n_cores)

# Create a cluster and client
print("> Creating a cluster and client...")
cluster = LocalCluster(
    ip=None,
    n_workers=n_cores,
    processes=True,
    silence_logs=logging.ERROR,
    # interface="lo",
)
client = Client(cluster)
client

Number of cores we have:  12
> Creating a cluster and client...


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 12
Total threads: 12,Total memory: 15.79 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:59569,Workers: 12
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 15.79 GiB

0,1
Comm: tcp://127.0.0.1:59675,Total threads: 1
Dashboard: http://127.0.0.1:59676/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59573,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-qoxch_h3,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-qoxch_h3

0,1
Comm: tcp://127.0.0.1:59690,Total threads: 1
Dashboard: http://127.0.0.1:59691/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59576,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-kqm339s1,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-kqm339s1

0,1
Comm: tcp://127.0.0.1:59678,Total threads: 1
Dashboard: http://127.0.0.1:59679/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59574,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-yz3rh7ry,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-yz3rh7ry

0,1
Comm: tcp://127.0.0.1:59660,Total threads: 1
Dashboard: http://127.0.0.1:59661/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59579,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-eprartjv,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-eprartjv

0,1
Comm: tcp://127.0.0.1:59669,Total threads: 1
Dashboard: http://127.0.0.1:59670/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59581,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-ee0rjugz,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-ee0rjugz

0,1
Comm: tcp://127.0.0.1:59684,Total threads: 1
Dashboard: http://127.0.0.1:59685/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59577,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-a8zcs956,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-a8zcs956

0,1
Comm: tcp://127.0.0.1:59681,Total threads: 1
Dashboard: http://127.0.0.1:59682/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59583,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-mi0zfbsb,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-mi0zfbsb

0,1
Comm: tcp://127.0.0.1:59647,Total threads: 1
Dashboard: http://127.0.0.1:59650/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59575,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-htukm44m,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-htukm44m

0,1
Comm: tcp://127.0.0.1:59687,Total threads: 1
Dashboard: http://127.0.0.1:59688/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59582,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-hmkfox4z,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-hmkfox4z

0,1
Comm: tcp://127.0.0.1:59663,Total threads: 1
Dashboard: http://127.0.0.1:59664/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59578,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-hmqjqwfk,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-hmqjqwfk

0,1
Comm: tcp://127.0.0.1:59672,Total threads: 1
Dashboard: http://127.0.0.1:59673/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59580,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-x162ehta,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-x162ehta

0,1
Comm: tcp://127.0.0.1:59666,Total threads: 1
Dashboard: http://127.0.0.1:59667/status,Memory: 1.32 GiB
Nanny: tcp://127.0.0.1:59572,
Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-lb2nrtls,Local directory: c:\Users\casil\Documents\Spring_2022\6.S079\final-project\dask-worker-space\worker-lb2nrtls


In [5]:
def compute_metrics(model_name, dataset_name):
    """The primary computation function to be parallelized."""

    save_path = os.path.join(
        config.BENCHMARK_EVAL_DIR_PATH,
        f"eval_{model_name}_{dataset_name}.json",
    )
    if os.path.exists(save_path):
        # We already computed this, so skip it
        return

    dataset = dataset_name.replace(".csv", "") + ".csv"
    data_path = os.path.join(config.BENCHMARK_DATA_DIR_PATH, dataset)

    # Get the queries for the dataset
    nl_queries = dataset_to_queries_lookup[dataset_name]

    # Get the benchmarks for this dataset
    with open(config.BENCHMARK_META_PATH, "r") as file:
        benchmark_metadata: dict = json.load(file)
    with open(config.TABLE_TO_BENCHMARK_LOOKUP_PATH, "r") as file:
        lookup = json.load(file)
    b_ids = lookup[dataset_name]
    benchmarks_with_dataset = [
        benchmark for b_id, benchmark in benchmark_metadata.items() if b_id in b_ids
    ]

    result = {
        "model_name": model_name,
        "dataset_name": dataset_name,
        "results": [],
        "errors": [],
    }

    # Create the model isntances
    try:
        if model_name == "nl4dv":
            model = get_nl4dv_instance(data_path=data_path)
        elif model_name == "ncNet":
            try:
                model = get_ncNetInstance(data_path=data_path, table_name=dataset_name)
            except Exception as e:
                result['errors'].append(f"Exception when creating model - {type(e)}: {e}")
                with open(save_path, "w", encoding="utf-8") as file:
                    json.dump(result, file, indent=4)    
                return result    
                
    except FileNotFoundError:
        result["errors"].append("Data path not found when trying to set up model")
        with open(save_path, "w", encoding="utf-8") as file:
            json.dump(result, file, indent=4)
        return result

    # For each query, execute it and get the similarity metrics between the
    # model output and the benchmark
    for i, nl_query in enumerate(nl_queries):
        # Get the benchmark for this query, and get its spec
        benchmarks_with_query = [
            benchmark
            for benchmark in benchmarks_with_dataset
            if nl_query in benchmark["nl_queries"]
        ]
        num_benchmarks = len(benchmarks_with_query)
        if num_benchmarks != 1:
            result["errors"].append(
                f'"{nl_query}" has {num_benchmarks} benchmarks associated with it'
            )
        if num_benchmarks == 0:
            # No benchmark, there is no point in executing the query
            continue
        benchmark_spec = benchmarks_with_query[0]["vega_spec"]

        # Execute the query
        produced_spec = False
        model_vl_spec = None
        if model_name == "nl4dv":
            print(f'EXECUTING QUERY: "{nl_query}" on nl4dv')
            model_result = model.analyze_query(nl_query)
            # Get the first VegaLite spec
            vis_list = model_result["visList"]
            if len(vis_list) > 0:
                produced_spec = True
                model_vl_spec = vis_list[0]["vlSpec"]

        elif model_name == "ncNet":
            print(f'EXECUTING QUERY: "{nl_query}" on ncNet')
            try:
                viz = model.nl2vis(nl_query)[
                    0
                ]  # nl2vis will return a list a [Vis, VegaLiteSpec]
            except Exception as e:
                result["errors"].append(f'Error when executing "{nl_query}" - {type(e)}: {e}')
                produced_spec = False
            else:
                model_vl_spec = viz.spec
                produced_spec = True

        metrics = get_viz_metrics(model_vl_spec, benchmark_spec)
        result['errors'].extend(metrics['errors'])

        result["results"].append(
            {
                "query": nl_query,
                "produced_spec": produced_spec,
                "metrics": metrics['metrics'],
            }
        )

    # Write the results to a file
    with open(save_path, "w", encoding="utf-8") as file:
        json.dump(result, file, indent=4)

    return result


In [6]:
parameters = [
    (model_name, dataset_name)
    for model_name in ["ncNet", "nl4dv"]
    for dataset_name in dataset_to_queries_lookup.keys()
]

parameters = [
    ("ncNet", "airlines"),
    ("ncNet", "company"),
    ("ncNet", "customer"),
    ("nl4dv", "game"),
    ("ncNet", "list"),
    ("ncNet", "mill"),
    ("ncNet", "movie"),
    ("ncNet", "train"),
    ("nl4dv", "university"),
]
lazy_results = []
for i, (model_name, dataset_name) in enumerate(parameters):
    # Temporary stopping measure
    lazy_result = dask.delayed(compute_metrics)(model_name, dataset_name)
    lazy_results.append(lazy_result)
    print(lazy_result)

# for result in dask.compute(*lazy_results):
#     print(result)
result = dask.compute(*lazy_results)
# with open(os.path.join(config.BENCHMARK_DIR_PATH, "evaluation.json"), "w", encoding='utf-8') as f:
#     json.dump(result, f, indent=4)


Delayed('compute_metrics-be53284c-95c1-4674-81b9-b01fc5756f3d')
Delayed('compute_metrics-1db7e52e-3a43-4136-b05c-bd428863c978')
Delayed('compute_metrics-52c44577-309e-441d-b363-47811832c957')
Delayed('compute_metrics-f903d969-5a23-4b22-a3a9-da6c3d3281c2')
Delayed('compute_metrics-6be0d347-07de-426b-910e-73099f87e0a5')
Delayed('compute_metrics-ed94148e-226f-44ab-b516-1f7f3cd726c0')
Delayed('compute_metrics-df1885a5-6f28-427b-827c-ff05104acb2e')
Delayed('compute_metrics-3338a1e6-bf8b-4d0f-9c6d-31cd7d290e45')
Delayed('compute_metrics-2368b927-1c0b-492e-b855-12e3a711ebd4')


In [None]:
client.close()