# EDS-NLP with Pandas or PySpark DataFrames

How you use EDS-NLP depends on how many documents you are working with. From about a thousand documents upwards,
parallelising the processing pays off (about 20x faster on 10,000 documents in the benchmark at the end of this notebook), at the cost of a (tiny) bit more work.  
This tutorial shows four ways to analyse texts, depending on your needs:

- [Testing / Using on a single string](#single_pipe)  

- [Using on DataFrames](#df_pipes)

   - [On a small Pandas DataFrame](#simple_pipe)
   - [On a larger Pandas DataFrame](#parallel_pipe)
   - [On a Spark DataFrame](#spark_pipe)

A [wrapper](#wrapper) is available to switch easily between these use cases.

Finally, you can check some [time benchmarks here](#timing).

In [1]:
%load_ext autoreload
%autoreload 2
%load_ext lab_black

In [2]:
import spacy
import pandas as pd
import sys

import pyspark.sql.types as T

import time
import datetime
from datetime import timedelta

from tqdm import tqdm

from loguru import logger

logger.remove()
logger.add(sys.stderr, level="CRITICAL")

# One-shot import of all declared spaCy components
import edsnlp.components

Here we import the necessary pipe functions for the sake of this tutorial:

In [3]:
from edsnlp.processing import (
    simple_pipe,
    parallel_pipe,
    spark_pipe,
    pipe,
    pyspark_type_finder,
)

For the sake of testing, let us create a simple NLP pipe:

In [None]:
nlp = spacy.blank("fr")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.normalizer")

regex = dict(
    patient=[
        "patient",
        "malade",
    ]
)

nlp.add_pipe("eds.matcher", config=dict(regex=regex, attr="NORM"))
nlp.add_pipe("eds.negation")
nlp.add_pipe("eds.hypothesis")
nlp.add_pipe("eds.family")
nlp.add_pipe("eds.dates")

## <a name="single_pipe"></a>Pipeline on a single string

In [5]:
text = """
    Patient admis le 25 Septembre 2021 pour suspicion de Covid.
    Pas de cas de coronavirus dans ce service.
    Le père du patient est atteind du covid.
"""

Simply apply `nlp()` to the piece of text:

In [6]:
doc = nlp(text)

We can have a quick look at what was extracted here:

In [7]:
def pretty_ents_printer(doc):

    headers = "{:<25} {:<20} {:<30} {:<6} {:<6} {:<6}"

    print(headers.format("Text", "Label", "Span", "Neg", "Par", "Hyp"))
    for entite in doc.ents:
        print(
            headers.format(
                entite.text,
                entite.label_,
                f"({entite.start_char},{entite.end_char})",
                entite._.negation,
                entite._.family,
                entite._.hypothesis,
            )
        )

    for date in doc.spans["dates"]:
        print(
            headers.format(
                date.text,
                date.label_,
                f"({date.start_char},{date.end_char})",
                date._.negation,
                date._.family,
                date._.hypothesis,
            )
        )

In [8]:
pretty_ents_printer(doc)

Text                      Label                Span                           Neg    Par    Hyp   
Patient                   patient              (5,12)                         0      0      0     
patient                   patient              (127,134)                      0      1      0     
25 Septembre 2021         absolute             (22,39)                        0      0      0     


## <a name="df_pipes"></a>Pipelines on DataFrames

We provide methods to apply a spaCy pipeline directly to a Pandas or Spark DataFrame.
All methods share a few arguments:

- `note`: a Pandas or Spark DataFrame containing at least the `note_id` and `note_text` columns
- `nlp`: the spaCy pipeline
- `additional_spans`: each spaCy `Doc` has a `doc.spans` attribute, a dictionary whose values are lists of spans. For example, the `dates` pipe used above populates the `doc.spans['dates']` list. Use the `additional_spans` argument to tell the function which lists you want to extract, in addition to the default `doc.ents` list.
- `extensions`: by default, each method extracts the following information from each `span`:
   - "note_id": span.doc._.note_id
   - "lexical_variant": span.text
   - "label": span.label_
   - "start": span.start_char
   - "end": span.end_char  
   
Depending on your pipeline, you may want to extract other extensions. To do so, pass their names (without the leading underscore) to the `extensions` argument.
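
To make the default fields and the `extensions` argument more concrete, here is a minimal sketch of how one span could be flattened into a row. It is only an illustration, not the library's code: `span_to_record` is a hypothetical helper, and the span is a stand-in built with `SimpleNamespace` (with made-up values) so that the snippet runs without spaCy.

```python
from types import SimpleNamespace


def span_to_record(span, extensions=()):
    """Flatten one span into a dict: default fields plus requested extensions."""
    record = {
        "note_id": span.doc._.note_id,
        "lexical_variant": span.text,
        "label": span.label_,
        "start": span.start_char,
        "end": span.end_char,
    }
    for name in extensions:
        # Custom extensions live under the `_` namespace of the span
        record[name] = getattr(span._, name)
    return record


# Stand-in objects (hypothetical values) mimicking the attributes used above
doc = SimpleNamespace(_=SimpleNamespace(note_id=1))
span = SimpleNamespace(
    doc=doc,
    text="patient",
    label_="patient",
    start_char=127,
    end_char=134,
    _=SimpleNamespace(negation=False, family=True),
)

record = span_to_record(span, extensions=["family"])
print(record)
```

Only the extensions that are explicitly requested end up in the record, which is why `negation` is absent here.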

### <a name="simple_pipe"></a>1. Pipeline on a small Pandas DataFrame

Here we fetch documents from the cluster.  
Adjust the following parameters to match your access rights:

In [9]:
DB_NAME = "edsomop_prod_a"
TABLE_NAME = "orbis_note"
NOTE_ID_COL = "note_id"
NOTE_TEXT_COL = "note_text"

In [10]:
spark_notes = sql(
    f"""
    SELECT
        {NOTE_ID_COL} AS note_id,
        {NOTE_TEXT_COL} AS note_text
    FROM
        {DB_NAME}.{TABLE_NAME}
    WHERE
        {NOTE_TEXT_COL} IS NOT NULL
    LIMIT 100000
    """
)

In [11]:
notes = spark_notes.toPandas()

Let us keep 1,000 documents as a small subset of notes, and 10,000 as a medium subset:

In [15]:
small_notes_subset = notes[:1000]
medium_notes_subset = notes[:10000]

In [13]:
%%time
note_nlp = simple_pipe(
    small_notes_subset,
    nlp,
    additional_spans=["dates"],
    extensions=["parsed_date"],
)

100%|██████████| 1000/1000 [00:49<00:00, 20.36it/s]

CPU times: user 48.9 s, sys: 196 ms, total: 49.1 s
Wall time: 49.2 s

In [14]:
note_nlp.sample(5)

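|      | note_id     | lexical_variant | label    | span_type | start | end   | parsed_date |
|------|-------------|-----------------|----------|-----------|-------|-------|-------------|
| 7701 | 699317043   | 1/02/2017       | absolute | dates     | 220   | 229   | 2017-02-01  |
| 6229 | 701135505   | Patient         | patient  | ents      | 812   | 819   | NaT         |
| 3174 | 18060531806 | 03/06/2021      | absolute | dates     | 1248  | 1258  | 2021-06-03  |
| 6062 | 19207919239 | 11/08/2021      | absolute | dates     | 14257 | 14267 | 2021-08-11  |
| 7792 | 19747574107 | 23.02.21        | absolute | dates     | 3623  | 3631  | 2021-02-23  |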


### <a name="parallel_pipe"></a>2. Pipeline on a larger Pandas DataFrame

Here we parallelise the processing across several worker processes to speed things up:

In [18]:
%%time
note_nlp = parallel_pipe(
    medium_notes_subset,
    nlp,
    additional_spans=["dates"],
    extensions=["parsed_date"],
)

[Parallel(n_jobs=-2)]: Using backend MultiprocessingBackend with 63 concurrent workers.
[Parallel(n_jobs=-2)]: Done   8 out of 100 | elapsed:   13.2s remaining:  2.5min
[Parallel(n_jobs=-2)]: Done  19 out of 100 | elapsed:   14.7s remaining:  1.0min
[Parallel(n_jobs=-2)]: Done  30 out of 100 | elapsed:   15.7s remaining:   36.7s
[Parallel(n_jobs=-2)]: Done  41 out of 100 | elapsed:   16.8s remaining:   24.2s
[Parallel(n_jobs=-2)]: Done  52 out of 100 | elapsed:   18.3s remaining:   16.9s
[Parallel(n_jobs=-2)]: Done  63 out of 100 | elapsed:   21.2s remaining:   12.5s
[Parallel(n_jobs=-2)]: Done  74 out of 100 | elapsed:   24.2s remaining:    8.5s
[Parallel(n_jobs=-2)]: Done  85 out of 100 | elapsed:   24.8s remaining:    4.4s
[Parallel(n_jobs=-2)]: Done  96 out of 100 | elapsed:   26.0s remaining:    1.1s
[Parallel(n_jobs=-2)]: Done 100 out of 100 | elapsed:   26.6s finished


CPU times: user 645 ms, sys: 1.9 s, total: 2.54 s
Wall time: 28.7 s


In [19]:
note_nlp.sample(10)

|       | note_id     | lexical_variant | label     | span_type | start | end  | parsed_date |
|-------|-------------|-----------------|-----------|-----------|-------|------|-------------|
| 84970 | 18060754715 | patiente        | patient   | ents      | 1955  | 1963 | NaT         |
| 84855 | 19916176337 | 16/09/2021      | absolute  | dates     | 1559  | 1569 | 2021-09-16  |
| 61236 | 8421417398  | le mois         | relative  | dates     | 351   | 358  | NaT         |
| 9662  | 7884672495  | 18/02/2019      | absolute  | dates     | 4131  | 4141 | 2019-02-18  |
| 4749  | 697712250   | 26/12/2008      | absolute  | dates     | 688   | 698  | 2008-12-26  |
| 86068 | 16923880617 | 2018            | year_only | dates     | 1687  | 1691 | 2018-01-01  |
| 30925 | 9275339027  | 05/01/1960      | absolute  | dates     | 1994  | 2004 | 1960-01-05  |
| 49620 | 7067037758  | 01/2016         | no_day    | dates     | 615   | 622  | 2016-01-01  |
| 86535 | 18633073703 | 07/07/2021      | absolute  | dates     | 2053  | 2063 | 2021-07-07  |
| 60954 | 18029401622 | patient         | patient   | ents      | 3804  | 3811 | NaT         |
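
The joblib log above reports 100 tasks, which suggests that the notes are split into chunks that are processed independently and then concatenated. Below is a minimal sketch of that split-apply-combine idea using only the standard library. It is not how `parallel_pipe` is implemented: `chunk`, `count_matches` and `process_in_chunks` are hypothetical names, `count_matches` is a toy stand-in for running the pipeline on one chunk, and threads are used only so that the snippet runs anywhere (CPU-bound spaCy work needs worker processes, as in the log above, to actually get faster).

```python
from concurrent.futures import ThreadPoolExecutor


def chunk(items, n_chunks):
    """Split a list into at most n_chunks contiguous pieces of near-equal size."""
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        stop = start + size + (1 if i < extra else 0)
        if stop > start:  # skip empty chunks when there are few items
            chunks.append(items[start:stop])
        start = stop
    return chunks


def count_matches(texts):
    """Toy stand-in for the pipeline: count occurrences of 'patient' per text."""
    return [(text, text.lower().count("patient")) for text in texts]


def process_in_chunks(texts, n_chunks=4, max_workers=2):
    """Process each chunk in a worker, then concatenate the partial results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # `map` yields results in submission order, so row order is preserved
        parts = list(pool.map(count_matches, chunk(texts, n_chunks)))
    return [row for part in parts for row in part]


texts = ["Patient admis", "Le patient et la patiente", "RAS"] * 3
rows = process_in_chunks(texts)
print(rows[:3])
```

The fixed cost of splitting the data and starting workers is the overhead that makes the parallel approach unattractive on very small inputs.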


### <a name="spark_pipe"></a>3. Pipeline distributed on a Spark DataFrame

Spark needs to know in advance the type of each extension you want to save.  
Thus, if you need additional extensions to be saved, you will need to provide a dictionary via the `extensions` argument, with the extension names as keys and their types as values.  
Accepted types are the ones present in `pyspark.sql.types`.

A helper, `pyspark_type_finder`, is available to get the correct type for most Python objects:

In [20]:
pyspark_datetime_type = pyspark_type_finder(datetime.datetime(2020, 1, 1))

Infered type is TimestampType
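
To give an idea of what such a lookup involves, here is a minimal sketch that maps a Python value to the name of a class in `pyspark.sql.types`. It is an assumption-laden illustration and not the implementation of `pyspark_type_finder`: `spark_type_name` is a hypothetical helper, it covers only a handful of scalar types, and it returns names as strings so that the snippet runs without PySpark installed.

```python
import datetime

# Checked in order: bool is a subclass of int, and datetime is a subclass
# of date, so the more specific type must come first.
_PYTHON_TO_SPARK = [
    (bool, "BooleanType"),
    (int, "LongType"),
    (float, "DoubleType"),
    (str, "StringType"),
    (datetime.datetime, "TimestampType"),
    (datetime.date, "DateType"),
]


def spark_type_name(value):
    """Return the name of a pyspark.sql.types class suited to `value`."""
    for python_type, name in _PYTHON_TO_SPARK:
        if isinstance(value, python_type):
            return name
    raise TypeError(f"No Spark type known for {type(value).__name__}")


name = spark_type_name(datetime.datetime(2020, 1, 1))
print(name)
```

With PySpark available, the corresponding type instance could then be obtained with `getattr(pyspark.sql.types, name)()`.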


In [21]:
%%time
note_nlp = spark_pipe(
    spark_notes,
    nlp,
    additional_spans=["dates"],
    extensions={"parsed_date": pyspark_datetime_type},
)
# `show` returns None, so call it separately to keep the DataFrame in `note_nlp`
note_nlp.show(5)

+-----------+---------------+--------+---------+-----+----+-------------------+
|    note_id|lexical_variant|   label|span_type|start| end|        parsed_date|
+-----------+---------------+--------+---------+-----+----+-------------------+
|16063715029|     21/12/1988|absolute|    dates| 1324|1334|1988-12-21 00:00:00|
|16063715029|     21/01/2021|absolute|    dates| 1383|1393|2021-01-21 00:00:00|
|16063715029|     21/01/2021|absolute|    dates| 1853|1863|2021-01-21 00:00:00|
|16063715029|     21/12/1988|absolute|    dates| 1925|1935|1988-12-21 00:00:00|
|16453909745|     16/10/1989|absolute|    dates| 1517|1527|1989-10-16 00:00:00|
+-----------+---------------+--------+---------+-----+----+-------------------+
only showing top 5 rows

CPU times: user 188 ms, sys: 23.8 ms, total: 212 ms
Wall time: 6min 10s


### <a name="wrapper"></a>4. A wrapper for simpler usage

The `edsnlp.processing.pipe` function wraps the three functions presented above into a single one.  
It adds a `how` argument, which can be `'simple'`, `'parallel'` or `'spark'`.

In [None]:
### Small Pandas DataFrame

note_nlp = pipe(
    note=spark_notes.limit(1000).toPandas(),
    nlp=nlp,
    how="simple",
    additional_spans=["dates"],
    extensions=["parsed_date"],
)

### Larger Pandas DataFrame

note_nlp = pipe(
    note=spark_notes.limit(10000).toPandas(),
    nlp=nlp,
    how="parallel",
    additional_spans=["dates"],
    extensions=["parsed_date"],
)

### Spark DataFrame

note_nlp = pipe(
    note=spark_notes,
    nlp=nlp,
    how="spark",
    additional_spans=["dates"],
    extensions={"parsed_date": pyspark_datetime_type},
)
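
The idea behind such a wrapper can be sketched as a simple dispatch on the `how` argument. The snippet below is a hypothetical illustration rather than the actual code of `edsnlp.processing.pipe`: `dispatch`, `BACKENDS` and the three `run_*` functions are made-up names, and the backends are toy stand-ins that only tag their output.

```python
def run_simple(note, nlp, **kwargs):
    """Stand-in for the sequential backend."""
    return [("simple", nlp(text)) for text in note]


def run_parallel(note, nlp, **kwargs):
    """Stand-in for the multiprocessing backend."""
    return [("parallel", nlp(text)) for text in note]


def run_spark(note, nlp, **kwargs):
    """Stand-in for the Spark backend."""
    return [("spark", nlp(text)) for text in note]


BACKENDS = {"simple": run_simple, "parallel": run_parallel, "spark": run_spark}


def dispatch(note, nlp, how="simple", **kwargs):
    """Forward the call to the backend selected by `how`."""
    if how not in BACKENDS:
        raise ValueError(f"Unknown how={how!r}; expected one of {sorted(BACKENDS)}")
    return BACKENDS[how](note, nlp, **kwargs)


# `str.upper` plays the role of the pipeline in this toy example
result = dispatch(["un patient"], nlp=str.upper, how="parallel")
print(result)
```

Keeping the remaining arguments identical across backends is what lets a single call site switch strategy by changing one string, although the Spark variant still expects `extensions` as a dictionary of types, as shown above.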

## <a name="timing"></a>Computational time comparison

Let us compare the `simple` and `parallel` pipes on Pandas DataFrames:

In [22]:
def process(note):
    """
    Compare runtime between the two methods
    """
    n = len(note)
    t0 = time.time()

    note_nlp = pipe(
        note=note,
        nlp=nlp,
        how="simple",
        additional_spans=["dates"],
        extensions=["parsed_date"],
        progress_bar=False,
    )

    t_simple = round(time.time() - t0)
    note_nlp = pipe(
        note=note,
        nlp=nlp,
        how="parallel",
        additional_spans=["dates"],
        extensions=["parsed_date"],
        progress_bar=False,
    )

    t_parallel = round(time.time() - t0 - t_simple)
    ratio = t_simple / t_parallel

    speed_simple = round(60 * n / t_simple)
    speed_parallel = round(60 * n / t_parallel)

    t_simple = str(timedelta(seconds=t_simple))
    t_parallel = str(timedelta(seconds=t_parallel))

    print(
        f"""
For {n} documents:
    Simple pipe took {t_simple} --> Mean of {speed_simple} docs/minute.
    Parallel pipe took {t_parallel} --> Mean of {speed_parallel} docs/minute.
    Parallel pipe is {round(ratio,2)} times faster than simple pipe"""
    )

In [23]:
list_notes = [
    notes[:100],
    notes[:1000],
    notes[:10000],
]

In [24]:
for notes_subset in list_notes:
    process(notes_subset)

For 100 documents:
    Simple pipe took 0:00:05 --> Mean of 1200 docs/minute.
    Parallel pipe took 0:00:06 --> Mean of 1000 docs/minute.
    Parallel pipe is 0.83 times faster than simple pipe

For 1000 documents:
    Simple pipe took 0:00:47 --> Mean of 1277 docs/minute.
    Parallel pipe took 0:00:08 --> Mean of 7500 docs/minute.
    Parallel pipe is 5.88 times faster than simple pipe

For 10000 documents:
    Simple pipe took 0:07:16 --> Mean of 1376 docs/minute.
    Parallel pipe took 0:00:21 --> Mean of 28571 docs/minute.
    Parallel pipe is 20.76 times faster than simple pipe
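
As a sanity check, the throughput and speed-up figures printed above can be recomputed from the reported wall-clock times alone. The short sketch below uses the same formulas as `process` (documents per minute, and the ratio of the two durations); the helper names `throughput` and `speedup` are introduced here purely for illustration.

```python
def throughput(n_docs, seconds):
    """Documents processed per minute, rounded to the nearest integer."""
    return round(60 * n_docs / seconds)


def speedup(t_simple, t_parallel):
    """How many times faster the parallel run is (below 1 means slower)."""
    return round(t_simple / t_parallel, 2)


# (simple, parallel) durations in seconds, read from the output above
timings = {
    100: (5, 6),
    1000: (47, 8),
    10000: (7 * 60 + 16, 21),
}

summary = {
    n: (throughput(n, t_s), throughput(n, t_p), speedup(t_s, t_p))
    for n, (t_s, t_p) in timings.items()
}

for n, (simple, parallel, ratio) in summary.items():
    print(f"{n} docs: {simple} vs {parallel} docs/minute, ratio {ratio}")
```

A ratio below 1, as for 100 documents, means that the parallel run was the slower of the two.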


We can see that while the parallel method carries some overhead on a hundred or so documents, it becomes much faster as the number of inputs grows.  
It can process the full 100,000 documents fairly quickly:

In [None]:
%%timeit
_ = pipe(
    note=notes,
    nlp=nlp,
    how="parallel",
    additional_spans=["dates"],
    extensions=["parsed_date"],
    progress_bar=False,
)

