# Basic Blocking Program Example

In this example we will show how to create and run a simple blocking program locally. We assume you have already installed Delex on a single machine, using [this guide](https://github.com/anhaidgroup/delex/blob/docs/doc/installation-guides/install-single-machine.md). This example proceeds in the following steps,

1. Download Datasets
2. Setup
3. Initialize Spark
4. Read the Data
5. Create a Blocking Program
6. Execute a Blocking Program
7. Compute Recall


## 1. Download Datasets + Python Notebook

To begin, we need to download three datasets from GitHub. Navigate to the dblp_acm folder [here](https://github.com/anhaidgroup/delex/tree/main/examples/data/dblp_acm). Click on 'gold.parquet' and click the download icon at the top. Repeat this for 'table_a.parquet' and 'table_b.parquet'. Now move all these into one directory on your local machine called 'dblp_acm'. Then, download this Python notebook [here](https://github.com/anhaidgroup/delex/tree/main/examples/data/basic_example.ipynb), and move it into the 'dblp_acm' folder you previously created.


## 2. Setup

In [None]:
from pathlib import Path

import sys
sys.path.append(str(Path().resolve().parent))
import os
os.environ['PYTHONPATH'] = str(Path().resolve().parent)

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


from delex.lang.predicate import (
        BM25TopkPredicate,
        JaccardPredicate,
        EditDistancePredicate,
        SmithWatermanPredicate,
        JaroPredicate, 
        JaroWinklerPredicate, 
        CosinePredicate, 
        ExactMatchPredicate
)

from delex.lang import BlockingProgram, DropRule, KeepRule
from delex.tokenizer import StrippedWhiteSpaceTokenizer, QGramTokenizer
from delex.execution.plan_executor import PlanExecutor
import operator
import psutil

## 3. Initialize Spark

Next we need to initialize Spark. For this example we are doing everything in a local setup, in particular, all files are stored on the local file system and we are running Spark in local mode. In order to run on a cluster the SparkSession should be initialized with the correct master and the files must be stored where all nodes have access to them (e.g. HDFS, PostgresSQL, etc.).

In [None]:
# enable pyarrow execution, recommended for better performance
conf = SparkConf()\
        .set('spark.sql.execution.arrow.pyspark.enabled',  'true')

# initialize a local spark context
spark = SparkSession.builder\
                    .master('local[*]')\
                    .config(conf=conf)\
                    .appName('Basic Example')\
                    .getOrCreate()

## Data 

The data downloaded in step 1 is a small dataset of paper citations with about 1000 rows per table. This section loads in the paths to the data.

In [None]:
# path to the test data directory
data_path = (Path().resolve()).absolute()

# table to be indexed, generally this should be the table with fewer rows
index_table_path = data_path / 'table_a.parquet'
# table for searching
search_table_path = data_path / 'table_b.parquet'
# the ground truth, i.e. the correct matching pairs
gold_path = data_path / 'gold.parquet'

## 4. Read the Data

Once Spark is initialized, we can then read all of our data into Spark dataframes.

In [None]:
# read all the data as spark dataframes
index_table = spark.read.parquet(f'file://{str(index_table_path)}')
search_table = spark.read.parquet(f'file://{str(search_table_path)}')
gold = spark.read.parquet(f'file://{str(gold_path)}')

index_table.printSchema()

## 5. Create a Blocking Program

Next we need to define our blocking program. For this basic example, we will define a very simple blocking program that returns all pairs where the Jaccard scores using a 3gram tokenizer are greater than or equal to .6. To do this we define a `BlockingProgram` with a single `KeepRule` which has a single `JaccardPredicate`.

In [None]:
prog = BlockingProgram(
        keep_rules = [
                KeepRule([ JaccardPredicate('title', 'title', QGramTokenizer(3), operator.ge, .6)])
            ],
        drop_rules = [],
    )

In terms of SQL this program is equivalent to

```SQL
SELECT A.id, B.id
FROM index_table as A, search_table as B
WHERE jaccard_3gram(A.title, B.title) >= .6
```

## 6. Execute a Blocking Program

Next, we create a `PlanExecutor` and execute the `BlockingProgram` by calling `.execute()`. Notice, that we passed `optimize=False` and `estimate_cost=False` as arguments, these parameters control the plan that is generated, which will be explained in a separate example.

In [None]:
executor = PlanExecutor(
        index_table=index_table, 
        search_table=search_table,
        optimize=False,
        estimate_cost=False,
)

candidates, stats = executor.execute(prog, ['_id'])
candidates = candidates.persist()
candidates.show()

## 7. Compute Recall

Finally, we can compute recall. As you can see, the output of the `PlanExecutor` is actually grouped by the id of `search_table`. This is done for space and computation effeicency reasons. To compute recall we first need to 'unroll' the output and then do a set intersection with the gold pairs to get the number of true positives.

In [None]:
# unroll the output
pairs = candidates.select(
                    F.explode('ids').alias('a_id'),
                    F.col('_id').alias('b_id')
                )
# total number 
gold = gold.drop('__index_level_0__')
n_pairs = pairs.count()
true_positives = gold.intersect(pairs).count()
recall = true_positives / gold.count()
print(f'n_pairs : {n_pairs}')
print(f'true_positives : {true_positives}')
print(f'recall : {recall}')

In [None]:
# remove the dataframe from the cache
candidates.unpersist()