In [1]:
import json
import os
from collections import defaultdict

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive import interactive_runner

## Data Processing Pipeline

### 1. Initial Processing of the Data

A `DoFn` is used to structure the data: `form`, `properties.lemma`, and `properties.xpos` are extracted from each `node` and accumulated in lists.

Steps:
1. If there is no negation in the sentence, add a list with `"T"` tags for each token in the sentence.

2. If there is one negation cue in the sentence, accumulate the unprocessed `negation` values in a list to process later.

3. If there are two or more negation cues in the sentence, accumulate the unprocessed `negation` values in separate lists according to the `negation.id` value. Return one copy of the sentence per `negation.id` value, with the corresponding negation values.

In [2]:
class ExtractJSON(beam.DoFn):
    """Parse one JSON-encoded sentence and flatten its ``nodes`` into lists.

    The ``nodes`` entry of the parsed sentence is replaced by three parallel
    per-token lists, ``form``, ``lemma`` and ``xpos``, plus a ``negation``
    field whose content depends on the sentence's ``negations`` count.
    """

    def process(self, e):
        """Process a sentence.

        Parameters
        ----------
        e : str
            One line of input holding a JSON object with at least the keys
            ``negations`` and ``nodes``. Each node must provide ``form``,
            ``properties.lemma`` and ``properties.xpos``.

        Yields
        ------
        dict
            * ``negations == 0``: a single dict whose ``negation`` field is a
              list of ``"T"`` tags, one per token.
            * otherwise: one independent dict per distinct negation ``id``,
              whose ``negation`` field is the list of raw annotation dicts
              carrying that id (in token order).
        """
        sentence = json.loads(e)

        # A missing or empty ``nodes`` entry is treated as a sentence with no
        # tokens, so the list fields are always present in the output.
        nodes = sentence.pop("nodes", None) or []

        sentence["form"] = [node["form"] for node in nodes]
        sentence["lemma"] = [node["properties"]["lemma"] for node in nodes]
        sentence["xpos"] = [node["properties"]["xpos"] for node in nodes]

        if sentence["negations"] == 0:
            sentence["negation"] = ["T"] * len(nodes)
            yield sentence
            return

        # Group the raw annotations by negation id.
        # NOTE(review): assumes every node of a negated sentence carries a
        # "negation" list with one entry per negation id, so that each group
        # stays aligned with the token lists -- confirm against the data.
        by_id = defaultdict(list)
        for node in nodes:
            for annotation in node.get("negation"):
                by_id[annotation["id"]].append(annotation)

        # Emit a separate dict per negation id. Re-yielding one dict and
        # overwriting its "negation" key between yields would make all
        # outputs alias the same object, and Beam requires that a value is
        # not modified after it has been output. The copy is shallow: the
        # form/lemma/xpos lists are shared and must be treated as read-only.
        for annotations in by_id.values():
            yield dict(sentence, negation=annotations)

### 2. Processing the Negation Tokens

1. If there is no negation in the sentence, from the previous step we know the negation field is ready

2. If there is a negation, label each token as follows:
    - `cue` is in the dict?
        - Yes:
            - the `cue` text differs from the token's `form` and `scope` is also in the dict?
                - Yes --> label `"A"`
                - No --> label `"C"`
        - No:
            - `scope` is in the dict?
                - Yes --> label `"F"`
                - No --> label `"T"`

In [4]:
class ProcessNegations(beam.DoFn):
    """Process negations to 4 labels.

    Negations are labeled as one of ["C", "A", "F", "T"]. The raw ``cue``
    and ``scope`` strings of each token are kept alongside the labels as
    two extra per-token lists.
    """

    def process(self, e):
        """Process labels.

        Parameters
        ----------
        e : dict
            A sentence with the keys ``negations``, ``form`` and
            ``negation``. When ``negations`` is 0 the ``negation`` list
            already holds the final labels; otherwise it holds one raw
            annotation dict per token.

        Yields
        ------
        dict
            A new dict (the input is left untouched) with per-token lists
            ``cue``, ``scope`` and ``negation``.
        """
        # Beam does not allow a DoFn to modify its input element, so every
        # change is applied to a shallow copy.
        sentence = dict(e)

        if sentence["negations"] == 0:
            # Labels are ready from the previous step; only add the empty
            # cue/scope placeholders.
            n_tokens = len(sentence["negation"])
            sentence["cue"] = [""] * n_tokens
            sentence["scope"] = [""] * n_tokens
            yield sentence
            return

        annotations = sentence.pop("negation")
        cues, scopes, labels = [], [], []

        for form, annotation in zip(sentence["form"], annotations):
            cues.append(annotation.get("cue", ""))
            scopes.append(annotation.get("scope", ""))

            if "cue" in annotation:
                # "A": the cue text is not the whole token and the token is
                # also inside the scope; any other cue token is "C".
                is_a = annotation["cue"] != form and "scope" in annotation
                labels.append("A" if is_a else "C")
            elif "scope" in annotation:
                labels.append("F")
            else:
                labels.append("T")

        # Always set the three fields (even for a sentence without tokens)
        # so downstream steps can rely on a fixed schema.
        sentence["cue"] = cues
        sentence["scope"] = scopes
        sentence["negation"] = labels
        yield sentence

In [28]:
class ProcessToTFRecord(beam.DoFn):
    """Serialize a processed sentence as a ``tf.train.SequenceExample``.

    NOTE(review): relies on a module-level ``tf`` name that is not imported
    in the visible cells -- confirm TensorFlow is imported as ``tf`` before
    this DoFn is used.
    """

    def tf_ex(self, e):
        """Build the ``SequenceExample`` for one sentence.

        The context holds ``id`` and ``source`` (UTF-8 bytes) and
        ``negations`` (int64). The feature lists ``form``, ``lemma``,
        ``xpos``, ``cue``, ``scope`` and ``negation`` each hold one UTF-8
        bytes feature per token; the six input lists are zipped together,
        so all of them are cut to the length of the shortest one.
        """

        def bytes_feature(text):
            # One-element bytes feature holding the UTF-8 encoding of `text`.
            return tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[text.encode("utf-8")]))

        context = tf.train.Features(feature={
            "id": bytes_feature(e["id"]),
            "source": bytes_feature(e["source"]),
            "negations": tf.train.Feature(
                int64_list=tf.train.Int64List(value=[e["negations"]])),
        })

        keys = ("form", "lemma", "xpos", "cue", "scope", "negation")
        features_by_key = {key: [] for key in keys}

        # Walk the sentence token by token, across all six columns at once.
        for token_values in zip(*[e[key] for key in keys]):
            for key, text in zip(keys, token_values):
                features_by_key[key].append(bytes_feature(text))

        feature_lists = tf.train.FeatureLists(feature_list={
            key: tf.train.FeatureList(feature=features)
            for key, features in features_by_key.items()
        })

        return tf.train.SequenceExample(context=context,
                                        feature_lists=feature_lists)

    def process(self, e):
        """Yield the serialized ``SequenceExample`` bytes for sentence `e`."""
        yield self.tf_ex(e).SerializeToString()

True

### 3. "Putting It All Together":

In [5]:
def process_data(src, dst):
    """Process data and save to file as JSON lines.

    Each input line is parsed, labeled and written back as one JSON
    object per line.

    Parameters
    ----------
    src : str
        The source file to process
    dst : str
        The destination file to output
        processed data. The part before the final extension is used as
        the output file prefix and the extension (if any) as the output
        file suffix.
    """
    # splitext only splits off the final extension and keeps its leading
    # dot, so paths with several dots (e.g. "./Data/dev.epe") or with no
    # extension at all no longer fail on tuple unpacking.
    pref, suff = os.path.splitext(dst)

    p = beam.Pipeline(interactive_runner.InteractiveRunner())
    (
        p
        | "Read Lines" >> beam.io.ReadFromText(src)
        | "Extract Data" >> beam.ParDo(ExtractJSON())
        | "Process Labels" >> beam.ParDo(ProcessNegations())
        | "Format JSON" >> beam.Map(json.dumps)
        | "Write File" >> beam.io.WriteToText(pref, file_name_suffix=suff)
    )

    # Block until the pipeline is done so the output exists on return.
    p.run().wait_until_finish()


In [11]:
def process_data_to_list(src):
    """Process data and return the labeled sentences.

    Runs the read / extract / label steps without writing a file and
    hands back whatever the pipeline result's ``get`` returns for the
    labeled collection.

    Parameters
    ----------
    src : str
        The source file to process
    """
    pipeline = beam.Pipeline(interactive_runner.InteractiveRunner())

    lines = pipeline | "Read Lines" >> beam.io.ReadFromText(src)
    extracted = lines | "Extract Data" >> beam.ParDo(ExtractJSON())
    labeled = extracted | "Process Labels" >> beam.ParDo(ProcessNegations())

    return pipeline.run().get(labeled)


In [19]:
def process_data_to_TF(src, dst):
    """Process data and save to file in TFRecord format.

    Each input line is parsed, labeled, converted to a serialized
    ``SequenceExample`` by ``ProcessToTFRecord`` and written as one
    TFRecord entry.

    Parameters
    ----------
    src : str
        The source file to process
    dst : str
        The destination file to output
        processed data. The part before the final extension is used as
        the output file prefix and the extension (if any) as the output
        file suffix.
    """
    # splitext only splits off the final extension and keeps its leading
    # dot, so paths with several dots or no extension do not fail.
    pref, suff = os.path.splitext(dst)

    p = beam.Pipeline(interactive_runner.InteractiveRunner())
    (
        p
        | "Read Lines" >> beam.io.ReadFromText(src)
        | "Extract Data" >> beam.ParDo(ExtractJSON())
        | "Process Labels" >> beam.ParDo(ProcessNegations())
        # WriteToTFRecord's default coder expects bytes, so the sentence
        # dicts have to be serialized before they reach the sink.
        | "To TFRecord" >> beam.ParDo(ProcessToTFRecord())
        | "Write File" >> beam.io.WriteToTFRecord(pref, file_name_suffix=suff)
    )

    # Block until the pipeline is done so the output exists on return.
    p.run().wait_until_finish()



## Processing the Files

1. The dev data file:

In [6]:
# Dev split: read negation/cdd.epe, write the processed sentences under Data/.
process_data(
    src="negation/cdd.epe",
    dst="Data/dev.epe",
)

2. The training data file:

In [7]:
# Training split: read negation/cdt.epe, write the processed sentences under Data/.
process_data(
    src="negation/cdt.epe",
    dst="Data/train.epe",
)

3. The evaluation data file:

In [8]:
# Evaluation split: read negation/cde.epe, write the processed sentences under Data/.
process_data(
    src="negation/cde.epe",
    dst="Data/eval.epe",
)

## Output Format:

The example below shows the new format of the sentences:

```javascript
{
    "id": "0", 
    "source": "wisteria01",
    "negations": 0,
    "form": ["1.", "The", "Singular", "Experience", "of", "Mr.", "John", "Scott", "Eccles"],
    "lemma": ["1.", "The", "Singular", "Experience", "of", "Mr.", "John", "Scott", "Eccles"],
    "xpos": ["CD", "NNP", "NNP", "NN", "IN", "NNP", "NNP", "NNP", "NNP"],
    "negation": ["T", "T", "T", "T", "T", "T", "T", "T", "T"],
    "cue": ["", "", "", "", "", "", "", "", ""],
    "scope": ["", "", "", "", "", "", "", "", ""]
}
```
