# Getting Started With Sparkly

This tutorial describes how to perform blocking for entity matching using Sparkly. Specifically, we will see how to build an index and then use the index to perform blocking efficiently. We do this with the following steps:

0. Setup
1. Reading in Data
2. Creating an Index Config
3. Building an Index
4. Creating a Query Spec
5. Performing Blocking
6. Saving Blocking Output

# Step 0 : Setup

Before getting started we first need to install the requirements for Sparkly. 

## Install PyLucene

To install PyLucene, see the [PyLucene docs](https://lucene.apache.org/pylucene/install.html).

## Install this Library

To install Sparkly, simply run the following pip command in the root directory of this repo.    
    
```bash
$ python3 -m pip install .
``` 
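
Before moving on, it can help to confirm that the required packages are visible from the current Python environment. The following is a minimal sketch using only the standard library. The helper name `check_dependencies` is hypothetical, and the module names assume that PyLucene is importable as `lucene`, PySpark as `pyspark`, and this library as `sparkly`.

```python
from importlib.util import find_spec


def check_dependencies(modules=('lucene', 'pyspark', 'sparkly')):
    """Return {module_name: bool} saying whether each top-level module can be found.

    Note: `check_dependencies` is a hypothetical helper, not part of Sparkly.
    """
    return {name: find_spec(name) is not None for name in modules}


# report which of the assumed module names are importable
for name, found in check_dependencies().items():
    print(f"{name}: {'found' if found else 'MISSING'}")
```

If any module is reported as missing, revisit the corresponding installation step above before continuing.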

Now that we have the dependencies installed, we can import the libraries.

In [None]:
# spark imports for reading data
from pyspark.sql import SparkSession    
import pyspark.sql.functions as F
# sparkly imports 
from sparkly.index import LuceneIndex    
from sparkly.index_config import IndexConfig  
from sparkly.query_generator import QuerySpec    
from sparkly.search import Searcher 
# other python utilities
from pathlib import Path  

# Step 1 : Reading in Data

Now that we have all of the necessary packages imported, we can read in the data using PySpark. Note that for this example, we are running the SparkContext locally and reading the data from local files included in the repo. 
For production applications, Spark will likely be run in distributed mode and the data will be read from HDFS or a database.

In [None]:
# path to the test data    
data_path = Path('./data/abt_buy/').absolute()    
# table to be indexed    
table_a_path = data_path / 'table_a.parquet'    
# table for searching    
table_b_path = data_path / 'table_b.parquet'    
# the ground truth    
gold_path = data_path / 'gold.parquet'  

# initialize a local spark context    
spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('Sparkly Example')\
                    .getOrCreate()    
# read all the data as spark dataframes    
table_a = spark.read.parquet(f'file://{str(table_a_path)}')    
table_b = spark.read.parquet(f'file://{str(table_b_path)}')    
gold = spark.read.parquet(f'file://{str(gold_path)}') 

For this example, our data is stored in parquet files. However, Sparkly operates on PySpark and Pandas dataframes, so any file format that can be read by PySpark or Pandas can be used. For example, if your data is in csv format, it can be read as follows:
```python
table_a = spark.read.csv('file:///ABSOLUTE/PATH/TO/CSV/FILE.csv')
```

or by reading it into a Pandas dataframe and then converting to a PySpark dataframe.

```python
import pandas as pd

# read the csv with Pandas, then convert it to a PySpark dataframe
pdf = pd.read_csv('/ABSOLUTE/PATH/TO/CSV/FILE.csv')
table_a = spark.createDataFrame(pdf)
```

Note that PySpark will try to infer the schema of `pdf` when calling `createDataFrame`. In some cases this may fail, and the schema must then be provided manually as an argument; see the [SparkSession.createDataFrame docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.createDataFrame.html) for more details. See the [DataFrameReader docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html#pyspark.sql.DataFrameReader) and [DataFrameWriter docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html) for more information on reading and writing data with PySpark, and the [Pandas input/output docs](https://pandas.pydata.org/docs/reference/io.html) for reading and writing data with Pandas.

In general, if your data is stored in files (as opposed to in a database), we strongly recommend that you use parquet. Compared to text formats such as csv, it is significantly faster to read and write, has a more expressive data model, and is strongly typed.

# Step 2 : Creating an Index Config

Next, we need to define how we want to index the data. To do this, we define subindexes, each of which is specified by a field in the indexed dataframe paired with an analyzer. Here we index two fields, each with two analyzers, to create a total of four searchable subindexes. First, we define the index config as follows:

In [None]:
# the analyzers used to convert the text into tokens for indexing    
# see LuceneIndex.ANALYZERS.keys() for currently implemented analyzers    
analyzers = ['3gram', 'standard']
# the index config, '_id' column will be used as the unique     
# id column in the index. Note id_col must be an integer (32 or 64 bit)    
config = IndexConfig(id_col='_id')    
# add the 'name' column to be indexed with analyzers above
# note that this will create two subindexes, name.3gram and name.standard,
# which can be searched independently
config.add_field('name', analyzers)
# do the same for the description
config.add_field('description', analyzers)
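
To give some intuition for what the two analyzers contribute, the sketch below approximates their tokenization in plain Python. This is an assumption-laden illustration, not Sparkly's or Lucene's implementation: the helper names `standard_tokens` and `char_3grams` are hypothetical, and the real analyzers may treat whitespace, punctuation, and word boundaries differently.

```python
import re


def standard_tokens(text):
    """Rough stand-in for a word-level analyzer: lowercase, split on non-alphanumerics."""
    return [t for t in re.split(r'[^a-z0-9]+', text.lower()) if t]


def char_3grams(text):
    """Rough stand-in for a 3-gram analyzer: lowercase, drop non-alphanumerics,
    then emit every overlapping window of three characters."""
    s = re.sub(r'[^a-z0-9]', '', text.lower())
    return [s[i:i + 3] for i in range(len(s) - 2)]


print(standard_tokens('Sony DVD-Player'))  # ['sony', 'dvd', 'player']
print(char_3grams('Sony DVD'))             # ['son', 'ony', 'nyd', 'ydv', 'dvd']
```

Word-level tokens match only when whole words agree, while character 3-grams still overlap when words are misspelled or abbreviated, which is one reason to index the same field with both analyzers.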

# Step 3 : Building an Index

Now that we have defined the index config, we can build the index by specifying the location (on the local filesystem) where we want to build the index along with how we want to index the data using the config we created above.

In [None]:
# create a new index stored at /tmp/example_index/                                                                                                          
index = LuceneIndex('/tmp/example_index/', config)                                                                                                          
# index the records from table A according to the config we created above                                                                                   
index.upsert_docs(table_a)                                                                                                                                  
# this index now has 4 searchable subindexes each named '<FIELD_NAME>.<ANALYZER>', specifically                                                             
# 'name.3gram', 'name.standard', 'description.3gram', and 'description.standard'                                                                            
                                                                                  

# Step 4 : Creating a Query Spec

Next we need to define how we are going to block using the index that we just built. To do this we create a `QuerySpec`. Notice that we don't need to use all of the subindexes in the index that we created above.

In [None]:
# Pass a mapping of {<SEARCH FIELD> -> {<SUBINDEX NAME>, ...}}                                                                                              
# to create a QuerySpec which will specify how queries should be created for documents                                                                      
query_spec = QuerySpec({                                                                                                                                    
                # use name from table_b to search name.3gram and description.standard in the index
                # notice that you can use any field to search any subindex, although
                # typically you will just want to search the subindexes created from
                # the same column
                'name' : {'name.3gram', 'description.standard'},                                                                                            
                # use description from table_b to search description.standard in the index                                                                  
                'description' : {'description.standard'}                                                                                                    
            })                                                                                                                                              
                                                                                                                                                            
# kwargs can also be used like a python dict                                                                                                                
# this is equivalent to the spec above                                                                                                                      
query_spec = QuerySpec(                                                                                                                                     
                name = {'name.3gram', 'description.standard'},                                                                                              
                description = {'description.standard'}                                                                                                      
            )   

If we simply want to use all the subindexes we created, we can use the following method:

In [None]:
# use all subindexes
query_spec = index.get_full_query_spec()
# Equivalent to (for this index)
query_spec = QuerySpec({
                'name' : {'name.3gram', 'name.standard'},
                'description' : {'description.3gram', 'description.standard'}
            })
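
To make the `'<FIELD_NAME>.<ANALYZER>'` naming convention concrete, the sketch below builds a mapping that pairs each field with its own subindexes using plain Python dicts. It only illustrates the convention and is not how Sparkly constructs the spec internally; `full_spec_mapping` is a hypothetical helper.

```python
def full_spec_mapping(field_to_analyzers):
    """Map each field to the set of subindex names built from that same field.

    Note: hypothetical helper for illustration, not part of Sparkly.
    """
    return {
        field: {f'{field}.{analyzer}' for analyzer in analyzers}
        for field, analyzers in field_to_analyzers.items()
    }


toy_analyzers = ['3gram', 'standard']
mapping = full_spec_mapping({'name': toy_analyzers, 'description': toy_analyzers})
for field in sorted(mapping):
    print(field, sorted(mapping[field]))
```

The resulting dict has the same shape as the mapping passed to `QuerySpec`: search field on the left, set of subindex names on the right.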

# Step 5 : Performing Blocking

Now that we have read in the data, built an index on the data, and defined how we want to block using the index, we can perform blocking. We do this using the `Searcher` class, which handles most of the boilerplate code for blocking. The last things that we need to specify are the id column for the search dataframe and the maximum number of candidates to return per search record.

In [None]:
# the number of candidates returned per record    
limit = 50    
# create a searcher for doing bulk search using our index
searcher = Searcher(index)
# search the index with table b
candidates = searcher.search(table_b, query_spec, id_col='_id', limit=limit).cache()

candidates.show()
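
Conceptually, each record in the search table is turned into a query, and the `limit` highest-scoring indexed records are kept as its candidates. The toy sketch below mimics that idea with a hand-rolled inverted index and a simple shared-token count. It is a deliberately simplified assumption for illustration: Sparkly delegates indexing and relevance scoring to Lucene, and the names `build_inverted_index` and `top_k_candidates` are hypothetical.

```python
from collections import Counter, defaultdict
import heapq


def build_inverted_index(records):
    """records: {id: text}. Returns token -> set of ids of records containing it."""
    inverted = defaultdict(set)
    for rid, text in records.items():
        for token in set(text.lower().split()):
            inverted[token].add(rid)
    return inverted


def top_k_candidates(inverted, query_text, limit):
    """Score indexed records by number of shared tokens and keep the best `limit`."""
    scores = Counter()
    for token in set(query_text.lower().split()):
        for rid in inverted.get(token, ()):
            scores[rid] += 1
    # highest score first, ties broken by id so the output is deterministic
    return heapq.nsmallest(limit, scores.items(), key=lambda kv: (-kv[1], kv[0]))


toy_table_a = {1: 'sony dvd player', 2: 'sony bravia tv', 3: 'canon camera'}
toy_index = build_inverted_index(toy_table_a)
print(top_k_candidates(toy_index, 'sony dvd recorder', limit=2))  # [(1, 2), (2, 1)]
```

Record 3 never appears because it shares no token with the query, which is the point of blocking: most non-matching pairs are never considered.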

Next, we can compute the recall of the candidate set, that is, the fraction of true matches in `gold` that appear among the candidates.

In [None]:
# output is rolled up as 
# search record id -> (indexed ids + scores + search time)
#
# explode the results to compute recall
pairs = candidates.select(
                    F.explode('ids').alias('a_id'),
                    F.col('_id').alias('b_id')
                )
# number of matches found
true_positives = gold.intersect(pairs).count()
# fraction of matches found
recall = true_positives / gold.count()

print(f'true_positives : {true_positives}')
print(f'recall : {recall}')
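
The same recall computation can be sketched on a tiny example with plain Python sets, which makes the definition easy to check by hand. The pairs below are made-up toy data, and `compute_recall` is a hypothetical helper rather than part of Sparkly.

```python
def compute_recall(gold_pairs, candidate_pairs):
    """Fraction of gold (a_id, b_id) pairs that also appear in the candidate set."""
    gold_set = set(gold_pairs)
    if not gold_set:
        raise ValueError('gold set is empty, recall is undefined')
    return len(gold_set & set(candidate_pairs)) / len(gold_set)


# toy data: 4 true matches, of which blocking retained 2
toy_gold = {(1, 10), (2, 20), (3, 30), (4, 40)}
toy_candidates = {(1, 10), (1, 11), (2, 20), (3, 31)}
print(compute_recall(toy_gold, toy_candidates))  # 0.5
```

Extra candidate pairs that are not in the gold set, such as `(1, 11)`, do not affect recall; they only increase the amount of work for the downstream matcher.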

# Step 6 : Saving Blocking Output

Finally, we can save the output of blocking to the local filesystem. To do this we simply convert the Spark DataFrame into a Pandas DataFrame and write it using a Pandas DataFrame method.

In [None]:
df = candidates.toPandas()
df.to_parquet('./out.parquet')
# remove candidates from Spark cache
candidates.unpersist()