# Assignment 3 - All Pair Documents Similarity Search

## Setup
Follow the instructions in README.md to set up the local standalone cluster, which is required to run this notebook.

In [1]:
import json
import math
import sys
import os
from typing import Tuple, Type, List, Dict, Any

import pandas as pd
import seaborn as sns
from loguru import logger
from matplotlib import pyplot as plt

from pyspark import SparkFiles

# Needed to correctly set the python executable of the current conda env:
#   both variables are pointed at the interpreter running this kernel
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Use the loopback address for Spark (the cluster is a local standalone one)
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"

# UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set.
# It is required to set this environment variable to '1' in both driver and executor
#   sides if you use pyarrow>=2.0.0.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Host name used to build the spark:// master URL when a session is created
MASTER_HOST = "localhost"  # master host in local standalone cluster

import pyspark.sql as psql

# IMPORTANT: create session prior to importing pyspark.pandas, else
#   spark won't use all specified cores
from src.utilities.utils import AVAILABLE_CORES, AVAILABLE_RAM_GB

import src.tokenization as tok
import src.apdss.map_reduce as mr
import src.apdss.sequential as seq
import src.utilities.io as io
from src.apdss.core import Results

  from tqdm.autonotebook import tqdm
[nltk_data] Downloading package punkt to /Users/a/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /Users/a/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/a/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## APDSS - Computing Results

Notes:
- The driver is allocated 2 cores and 2 GB of RAM
- The executors share the remaining 64 GB of RAM and 30 cores
- The similarity threshold is fixed at 0.95 because it didn't seem to impact computation times
- The trec-covid-small corpus is a custom sample of the first 13.5k docs from the trec-covid corpus (170k docs)

In [None]:
# Keys used to index the result dictionaries of the sequential and MapReduce runs
SeqSetting: Type = Tuple[str, float]
"""(dataset_name, threshold)"""

MRSetting: Type = Tuple[str, int, float]
"""(dataset_name, n_executors, threshold)"""

# Full settings: every dataset is run with every (threshold, n_executors) combination
datasets: List[str] = ["SciFact"]
n_executors: List[int] = [1, 2, 5, 10, 20]
thresholds: List[float] = [0.5, 0.6, 0.7, 0.8, 0.9]

# Since the datasets used are relatively small, one partition per core is sufficient
# The rule of thumb would be "numPartitions = numWorkers * cpuCoresPerWorker"
# In my case, local standalone cluster, there is just 1 worker with AVAILABLE_CORES cores
# See this answer for a useful discussion about how to determine numPartitions
#   https://stackoverflow.com/a/39398750/19582401
N_PARTITIONS = AVAILABLE_CORES

In [None]:
# Custom samples from trec-covid dataset: dataset name -> pickle file name
TREC_COVID_SAMPLES: Dict[str, str] = {
    "trec-covid-long": "longest_docs.pkl",
    "trec-covid-short": "shortest_docs.pkl",
    "trec-covid-random": "random_docs.pkl",
}

# Standard corpora are resolved by name. The custom samples are excluded here so
# that re-running this cell (after they were appended to `datasets`) does not try
# to resolve them through io.get_corpus_path.
corpus_paths: Dict[str, str] = {
    d: io.get_corpus_path(d)
    for d in datasets
    if d not in TREC_COVID_SAMPLES
}

# Register each custom sample under its OWN name, both in the list of datasets
# to benchmark and in the name -> path mapping.
for sample_name, sample_file in TREC_COVID_SAMPLES.items():
    if sample_name not in datasets:  # keeps the cell idempotent across re-runs
        datasets.append(sample_name)
    corpus_paths[sample_name] = os.path.join(io.DATA_DIR, "trec-covid", "samples", sample_file)

In [None]:
def create_spark_session(n_executors: int, app_name: str) -> psql.SparkSession:
    """Create (or fetch) a Spark session on the standalone master with `n_executors` executors.

    The driver gets a fixed share of the machine (2 cores, 2 GB); the remaining
    cores and memory are split evenly among the executors. The zipped project
    sources are shipped to the SparkContext so that executors can import them.

    NOTE: the session is obtained through `getOrCreate()`, so if a session is
    still active it is returned as-is; stop the previous session before calling
    this function with a different `n_executors`.

    :param n_executors: number of executors to request (must be >= 1)
    :param app_name: application name shown by the Spark master
    :return: the Spark session
    :raises ValueError: if `n_executors` is < 1 or the machine cannot give at least
        1 core and 1 GB of RAM to each executor
    """
    if n_executors < 1:
        raise ValueError(f"n_executors must be >= 1, got {n_executors}")

    driver_ram_gb = 2
    driver_cores = 2
    mem_per_executor = (AVAILABLE_RAM_GB - driver_ram_gb) // n_executors
    cores_per_executor = (AVAILABLE_CORES - driver_cores) // n_executors

    # Fail early with a readable message instead of handing "0g" / 0 cores to Spark
    if mem_per_executor < 1 or cores_per_executor < 1:
        raise ValueError(
            f"Not enough resources for {n_executors} executors: "
            f"{cores_per_executor} cores and {mem_per_executor}g per executor"
        )

    logger.debug(f"Executor memory: {mem_per_executor}")
    logger.debug(f"AVAILABLE_RAM_GB: {AVAILABLE_RAM_GB}")
    logger.debug(f"Total executor memory: {(AVAILABLE_RAM_GB - driver_ram_gb)}")
    logger.debug(f"Executor cores: {cores_per_executor}")

    # On a standalone cluster the number of executors launched follows from
    #   spark.cores.max / spark.executor.cores, so cap the total at exactly
    #   n_executors * cores_per_executor (the integer division above may leave
    #   spare cores that would otherwise be turned into extra executors).
    total_executor_cores = cores_per_executor * n_executors

    spark: psql.SparkSession = (
        psql.SparkSession.builder
        .master(f"spark://{MASTER_HOST}:7077")  # connect to previously started master host
        .appName(f"{app_name}")
        .config("spark.driver.cores", driver_cores)
        .config("spark.driver.memory", f"{driver_ram_gb}g")
        .config("spark.executor.instances", n_executors)
        .config("spark.executor.cores", cores_per_executor)
        .config("spark.executor.memory", f"{mem_per_executor}g")
        .config("spark.default.parallelism", AVAILABLE_CORES)
        .config("spark.cores.max", total_executor_cores)
        .getOrCreate()
    )

    # Add local dependencies (local python source files) to SparkContext and sys.path
    src_zip_path = os.path.abspath("../../src.zip")
    logger.debug(f"Adding {src_zip_path} to SparkContext")

    spark.sparkContext.addPyFile(src_zip_path)

    # This function is called once per benchmark run: avoid piling up duplicates
    spark_files_root = SparkFiles.getRootDirectory()
    if spark_files_root not in sys.path:
        sys.path.insert(0, spark_files_root)

    return spark

In [None]:
# Results of every run, indexed by the setting tuple that produced them
mr_results: Dict[MRSetting, Results] = {}
seq_results: Dict[SeqSetting, Results] = {}

# Directory where the JSON summaries are written
OUT_DIR = "../../out"
os.makedirs(OUT_DIR, exist_ok=True)

# Keys of each record in the JSON summaries. They must all be distinct,
# otherwise one field silently overwrites another when a record is built.
DATASET_NAME_KEY = "d_name"
N_EXECS_KEY = "n_execs"
THRESHOLD_KEY = "threshold"
TIME_KEY = "time"
SIM_DOCS_KEY = "sim_docs"

def save_mr_results():
    """Write a JSON summary of all MapReduce results to `{OUT_DIR}/mr_results.json`.

    One record is written per entry of `mr_results`, keyed by its position, holding
    the dataset name, the number of executors, the threshold, the number of similar
    documents found and the run time.

    The payload is built and serialized to a temporary file first and only then
    moved over the previous summary, so a failure never leaves a truncated file
    in place of the last good save.
    """
    data = {
        i: {
            DATASET_NAME_KEY: d_name,
            N_EXECS_KEY: n_execs,
            THRESHOLD_KEY: t,
            SIM_DOCS_KEY: len(res.similar_docs),
            TIME_KEY: res.time
        }
        for i, ((d_name, n_execs, t), res) in enumerate(mr_results.items())
    }

    out_path = f"{OUT_DIR}/mr_results.json"
    tmp_path = f"{out_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f)
    os.replace(tmp_path, out_path)

def save_seq_results():
    """Write a JSON summary of all sequential results to `{OUT_DIR}/seq_results.json`.

    One record is written per entry of `seq_results`, keyed by its position, holding
    the dataset name, the threshold, the number of similar documents found and the
    run time.

    The payload is built and serialized to a temporary file first and only then
    moved over the previous summary, so a failure never leaves a truncated file
    in place of the last good save.
    """
    data = {
        i: {
            DATASET_NAME_KEY: d_name,
            THRESHOLD_KEY: t,
            SIM_DOCS_KEY: len(res.similar_docs),
            TIME_KEY: res.time
        }
        for i, ((d_name, t), res) in enumerate(seq_results.items())
    }

    out_path = f"{OUT_DIR}/seq_results.json"
    tmp_path = f"{out_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f)
    os.replace(tmp_path, out_path)

In [None]:
# spark.stop()

In [None]:
# Fixed executor count for the sessions that are not part of the executor sweep
#   (document feature extraction and the sequential baseline)
AUX_N_EXECUTORS = 4

for d_name in datasets:
    # Create "single-use" spark session to parse the text
    spark = create_spark_session(n_executors=AUX_N_EXECUTORS, app_name="Doc Features")
    try:
        docs_scores_df = tok.get_document_features(
            spark=spark,
            corpus_json_path=corpus_paths[d_name],
            n_partitions=N_PARTITIONS
        )
        # Materialize on the driver so the features outlive this session
        docs_scores_pandas: pd.DataFrame = docs_scores_df.toPandas()
    finally:
        spark.stop()

    for t in thresholds:
        for n_execs in n_executors:
            # ----- MR -----------------------------------
            spark = create_spark_session(n_executors=n_execs, app_name="MR-APDSS")
            try:
                # Create docs df from pandas instance with new context
                docs_scores_df = spark.createDataFrame(docs_scores_pandas)
                docs_scores_df.cache()

                mr_res = mr.MapReduceAPDSS().apdss(
                    spark=spark,
                    docs_scores_df=docs_scores_df,
                    threshold=t,
                    num_partitions=N_PARTITIONS
                )

                mr_setting = (d_name, n_execs, t)
                logger.info(f"MR::({mr_setting}):time -> {mr_res.time}")
                mr_results[mr_setting] = mr_res
                save_mr_results()  # Overwrite each time to make sure data isn't lost
            finally:
                # Stop context so that next iteration creates a new one with
                #   a different number of executors
                spark.stop()

        # ----- SEQ -----------------------------------
        # Create docs df from pandas instance with new context
        spark = create_spark_session(n_executors=AUX_N_EXECUTORS, app_name="SEQ-APDSS")
        try:
            docs_scores_df = spark.createDataFrame(docs_scores_pandas)

            seq_res = seq.SequentialAPDSS().apdss(
                spark=spark,
                docs_scores_df=docs_scores_df,
                threshold=t
            )

            seq_setting = (d_name, t)
            logger.info(f"SEQ::({seq_setting}):time -> {seq_res.time}")
            seq_results[seq_setting] = seq_res
            save_seq_results()  # Overwrite each time to make sure data isn't lost
        finally:
            # Without this stop the session stays active and the next
            #   create_spark_session call gets it back from getOrCreate(),
            #   i.e. the next run would not get the executors it asked for
            spark.stop()

## Saving Results

In [None]:
# Display the in-memory MapReduce results, keyed by (dataset_name, n_executors, threshold)
mr_results

In [None]:
# Explicit final save (the benchmark loop also saves after every MapReduce run)
save_mr_results()

In [None]:
# Display the in-memory sequential results, keyed by (dataset_name, threshold)
seq_results

In [None]:
# Explicit final save (the benchmark loop also saves after every sequential run)
save_seq_results()