In [1]:
# -*- coding: UTF-8 -*-

from __future__ import print_function

from pattern.en import conjugate
import traceback

In [2]:
class Verb(object):
    """
    Parsed view of one verb token together with its auxiliaries and subject.

    This class is for those verbs which have the following dependency labels:

    VERB ROOT
    WHNP > WDT --relcl--> NN
    VERB --advcl--> NN
    VERB --advcl--> IN
    VERB --ccomp--> NN
    """
    AUXILIARY_MODALS = [
        'can', 'could', 'may', 'might', 'must', 'shall', 'should', 'will',
        'would'
    ]

    def __init__(self, tok=None, clause=None, number_person='pl'):
        """
        Store the token/clause and parse immediately when both are given.

        If the verb comes from a fragment without a subject,
        then default conjugation is past plural.
        """
        self.is_parsed = False
        self.break_recursion = False
        self.tok = tok
        self.clause = clause
        self.aux = []
        self.nsubj = None
        # Child token with lemma 'not', recorded for conjoined verbs only.
        self.not_token = None
        self.do = False
        self.been = False
        self.number_person = number_person
        self.is_past = False
        self.is_future = False
        self.have_pres = False
        self.have_past = False
        self.is_modal = False
        self.is_participle = False
        if self.tok and self.clause:
            self.parse()

    def __str__(self):
        """
        Return a multi-line, human-readable dump of the parse state.
        """
        # Tolerate an unset token so that an unparsed Verb can be printed.
        tok = self.tok
        s = """
        is parsed: %s
        verb: %s
        verb lemma: %s
        verb dependency label: %s
        verb constituent tag: %s""" % (
            self.is_parsed, getattr(tok, 'lower_', None),
            getattr(tok, 'lemma_', None), getattr(tok, 'dep_', None),
            getattr(tok, 'tag_', None))
        for i, a in enumerate(self.aux):
            s += """
        aux %d: %s
        aux %d lemma: %s
        aux %d dependency label: %s""" % (
                i, a.lower_, i, a.lemma_, i, a.dep_)
        s += """
        subject: %s
        number_person: %s
        has 'do': %s
        has 'been': %s
        is past tense: %s
        is future tense: %s
        is participle: %s
        has 'have' in present tense: %s
        has 'have' in past tense: %s
        has modal: %s
""" % (
            self.nsubj, self.number_person, self.do, self.been,
            self.is_past, self.is_future, self.is_participle,
            self.have_pres, self.have_past, self.is_modal)
        return s

    def parse(self):
        """
        Populate auxiliaries, subject, number/person and tense flags.

        A verb joined by a conjunction to an earlier verb (as reported by
        `clause.prev_verb`) inherits that verb's subject and, when it has no
        auxiliaries of its own, its auxiliaries and tense flags as well.

        Raises `Exception` when `tok`/`clause` are unset or when parsing
        re-enters this verb through a chain of conjunctions.
        """
        if not self.tok or not self.clause:
            raise Exception('Verb.parse() requires the `tok` and `clause` '
                            'variables to be set.')
        if self.break_recursion:
            raise Exception(
                'There is a circular dependency among conjunctive verbs.')
        # Reset variables.
        self.aux = []
        self.not_token = None
        self.do = False
        self.been = False
        self.have_pres = False
        self.have_past = False
        self.is_modal = False
        self.is_past = False
        self.is_future = False
        # Check for previous verb joined by conjunction. The guard is always
        # cleared, even on error, so a failed lookup does not leave this verb
        # permanently reporting a circular dependency.
        self.break_recursion = True
        try:
            prev_verb = self.clause.prev_verb(self)
        finally:
            self.break_recursion = False
        if prev_verb:
            self.nsubj = prev_verb.nsubj
            self.number_person = prev_verb.number_person
            for child in self.tok.children:
                if child.lemma_ == 'not':
                    self.not_token = child
                elif child.dep_ == 'aux' or child.dep_ == 'auxpass':
                    self.aux.append(child)
            if not self.aux:
                if prev_verb.do:
                    # Special exception: He did not walk but talked.
                    if self.tok.tag_ == 'VBD':
                        self.do = False
                        self.aux = []
                    else:
                        self.do = True
                        self.aux = prev_verb.aux
                else:
                    self.do = False
                    self.aux = prev_verb.aux
                self.been = prev_verb.been
                self.have_pres = prev_verb.have_pres
                self.have_past = prev_verb.have_past
                self.is_modal = prev_verb.is_modal
                self.is_past = prev_verb.is_past
                self.is_future = prev_verb.is_future
            else:
                # The verb carries its own auxiliaries: derive the flags from
                # them before deciding the tense, which reads `do`/`is_future`.
                self._set_flags_from_aux()
                self.is_past = self._is_past()
            # Calculate negation status (not implemented yet; only
            # `not_token` is recorded above).
            # Algo: calculate negation status of previous verb
            #       then calculate neg of current verb by
            #       searching for not/n't/but
            #
            # * "not"/"n't" have dep_ == 'neg'
            # * "but" has dep_ == 'cc' and is child of
            #   the first verb of conjunction
            #
            # Ex:
            #   did not talk or walk
            #   did not talk but walked and balked
            #   did talk but didn't walk or balk
            #   didn't talk but did walk and balk
        else:
            # Iterates children in order of appearance.
            for child in self.tok.children:
                if child.dep_ == 'nsubj' or child.dep_ == 'nsubjpass':
                    self.nsubj = child
                elif child.dep_ == 'aux' or child.dep_ == 'auxpass':
                    self.aux.append(child)
            self._set_flags_from_aux()
            self.number_person = self._number_person(self.nsubj)
            self.is_past = self._is_past()
        self.is_participle = self._is_participle(self.tok)
        self.is_parsed = True

    def _set_flags_from_aux(self):
        """
        Derive the do/been/future/have/modal flags from `self.aux`.
        """
        for a in self.aux:
            if a.lemma_ == 'do':
                self.do = True
            elif a.lower_ == 'been':
                self.been = True
            elif a.lower_ == 'will':
                self.is_future = True
            if (a.lower_ == 'have' or a.lower_ == 'has'):
                self.have_pres = True
            if a.lower_ == 'had':
                self.have_past = True
        # Always a real bool; never the (empty) auxiliary list itself.
        self.is_modal = bool(self.aux) and self._is_aux_modal(self.aux[0])

    def transform_to_present_str(self, tok):
        """
        Takes in a token. If the token is this verb or one of its auxiliaries,
        then it converts it to present tense and returns the new text
        (including trailing whitespace). Returns None for any other token.

        Assumes that `self` is a past tense verb.

        Rules:
        * past > present
        * had + ppart > has/have + ppart
        * had + been + ppart > has/have + been + ppart
        * was + ppart > is + ppart
        * did > does, had > has/have, was > is
        * would have + ppart > would + inf
        * would have + been + ppart> would be + ppart
        """
        if self.aux and tok == self.aux[0]:
            # First auxiliary carries the tense.
            if tok.lemma_ == 'do':
                return conjugate('do', self.number_person) + tok.whitespace_
            elif self.is_modal:
                return tok.text_with_ws
            elif self.is_participle and tok.lower_ == 'has':
                return conjugate('have', self.number_person) + tok.whitespace_
            else:
                return conjugate(tok.lemma_, self.number_person) \
                        + tok.whitespace_
        elif tok in self.aux:
            # Later auxiliaries: only touched after a non-future modal.
            if self.is_modal and not self.is_future and tok.lower_ == 'have':
                return ''
            elif self.is_modal and not self.is_future and tok.lower_ == 'been':
                return 'be' + tok.whitespace_
            else:
                return tok.text_with_ws
        elif tok == self.tok:
            if not self.aux:
                return conjugate(tok.lemma_, self.number_person) \
                        + tok.whitespace_
            elif self.is_modal and not self.been and not self.is_future:
                return tok.lemma_ + tok.whitespace_
            else:
                return tok.text_with_ws
        return None

    @classmethod
    def _is_aux_modal(cls, tok):
        """
        Return True when the token's lowercase text is a listed modal.
        """
        return tok.lower_ in cls.AUXILIARY_MODALS

    @classmethod
    def _contains_one_of(cls, haystack, needles):
        """
        Return True when any item of `haystack` equals any of `needles`.
        """
        for item in haystack:
            for needle in needles:
                if item == needle:
                    return True
        return False

    @classmethod
    def _number_person(cls, token):
        """
        Return the number/person code ('pl', '1sg', '2sg' or '3sg') of a
        subject token.

        Expects a token with POS == 'NOUN'.
        """
        # Default is plural.
        if not token:
            return 'pl'
        # A coordinated subject ("X and Y") is treated as plural.
        for tok in token.children:
            if tok.lower_ == 'and':
                return 'pl'
        if (token.tag_ == 'NNPS' or token.tag_ == 'NNS' or
                token.lower_ == 'we' or token.lower_ == 'they'):
            return 'pl'
        elif token.lower_ == 'i':
            return '1sg'
        elif token.lower_ == 'you':
            return '2sg'
        else:
            return '3sg'

    def _is_past(self):
        """
        Decide whether this verb phrase is past tense from its flags,
        first auxiliary and main token.
        """
        if self.is_future:
            return False
        if self.do:
            return self._is_past_tok(self.aux[0], self.number_person)
        else:
            if self.aux and self._is_past_tok(self.aux[0], self.number_person):
                return True
            elif self._is_past_tok(self.tok, self.number_person):
                return True
        return False

    @classmethod
    def _is_past_tok(cls, tok, number_person=None):
        """
        Is `tok` a past tense form?

        Tokens tagged VBN/VBD are past. Otherwise the token text is compared
        with the past conjugation of its lemma for `number_person`, or with
        the 1sg/2sg/3sg past forms when `number_person` is not given.
        """
        PAST_CONJ = {
            '1sg': '1sgp',
            '2sg': '2sgp',
            '3sg': '3sgp',
            'pl': 'ppl',
            'part': 'ppart',
        }
        if tok.tag_ == 'VBN' or tok.tag_ == 'VBD':
            return True
        if number_person:
            return conjugate(tok.lemma_, PAST_CONJ[number_person]) == tok.text
        return (conjugate(tok.lemma_, '1sgp') == tok.text or
                conjugate(tok.lemma_, '2sgp') == tok.text or
                conjugate(tok.lemma_, '3sgp') == tok.text)

    @classmethod
    def _is_participle(cls, tok):
        """
        Expects a token. True when it is tagged VBG or VBN.
        """
        return tok.tag_ == 'VBG' or tok.tag_ == 'VBN'


class Clause(object):
    """
    One clause of a constituency-parsed span and the verbs found in it.

    Nested clauses (constituents labelled S or SBAR below the first one
    encountered) are handled by separate `Clause` instances.
    """
    VERB_DEPS = ['ROOT', 'conj', 'relcl', 'advcl', 'ccomp']

    def __init__(self, span):
        self.span = span
        self.verbs = []

    def prev_verb(self, verb):
        """
        Return the already-collected verb that `verb` is conjoined to, or
        None. Only verbs whose token has dependency 'conj' are considered;
        the match is made against the first ancestor with a verb dependency.
        The returned verb is parsed on demand.
        """
        if verb.tok.dep_ != 'conj':
            return None
        found = None
        for ancestor in verb.tok.ancestors:
            if ancestor.dep_ not in self.VERB_DEPS:
                continue
            for candidate in self.verbs:
                if candidate.tok == ancestor:
                    found = candidate
                    if not found.is_parsed:
                        found.parse()
            # Only the nearest verb-like ancestor is inspected.
            break
        return found

    def parse_verbs(self):
        """
        Rebuild `self.verbs` by walking the constituency tree depth-first.
        """
        self.verbs = []
        entered_clause = False
        pending = [self.span]
        while pending:
            node = pending.pop()
            if 'S' in node._.labels or 'SBAR' in node._.labels:
                # Descend into the first clause only; skip nested ones.
                if entered_clause:
                    continue
                entered_clause = True
            elif len(node) == 1 and node[0].dep_ in self.VERB_DEPS:
                self.verbs.append(Verb(node[0], self))
            pending.extend(reversed(list(node._.children)))

    def transform_present(self):
        """
        Return the clause text with its past tense verbs put in the present.
        """
        self.parse_verbs()
        past_verbs = [verb for verb in self.verbs if verb.is_past]
        entered_clause = False
        pieces = []
        # Depth-first search to transform past to present.
        pending = [self.span]
        while pending:
            node = pending.pop()
            if len(node) == 1:
                replacement = None
                if node[0].pos_ == 'VERB':
                    for verb in past_verbs:
                        replacement = verb.transform_to_present_str(node[0])
                        if replacement is not None:
                            break
                if replacement is None:
                    replacement = node.text_with_ws
                pieces.append(replacement)
                continue
            is_clause = 'S' in node._.labels or 'SBAR' in node._.labels
            if is_clause and entered_clause:
                # Nested clause: transform it on its own and restore the
                # trailing space that the nested call strips.
                ws = ' ' if node.text_with_ws[-1] == ' ' else ''
                pieces.append(Clause(node).transform_present() + ws)
                continue
            if is_clause:
                entered_clause = True
            pending.extend(reversed(list(node._.children)))
        new_text = ''.join(pieces).replace(' .', '.').replace(' ,', ',')
        if new_text.endswith(' '):
            new_text = new_text[:-1]
        return new_text


class Sentence(object):
    """
    Wraps a sentence span and rewrites it in the present tense.
    """

    def __init__(self, span):
        self.span = span

    def transform_present(self):
        """
        Return the sentence in the present tense with its first character
        upper-cased.

        Raises `Exception` when no span is set. Any error raised while
        transforming is printed (sentence, parse string if available, and
        traceback) and the original sentence text is returned instead.
        """
        if not self.span:
            raise Exception(
                'Sentence.transform_present() requires `span` to be set.')
        try:
            converted = Clause(self.span).transform_present()
            if not converted:
                return converted
            return converted[0].upper() + converted[1:]
        except Exception as e:
            header = (
                'There was an error parsing the following sentence',
                '',
                self.span.text,
                '',
                'with parse:',
                '',
            )
            for line in header:
                print(line)
            if hasattr(self.span._, 'parse_string'):
                print(self.span._.parse_string)
            print()
            print(str(e) + '\n' + traceback.format_exc())
            return self.span.text


def transform_present(nlp, text, echo=False):
    """
    Parse `text` with `nlp` and return it with every sentence rewritten in
    the present tense, the sentences joined by single spaces. When `echo` is
    true each transformed sentence is also printed as it is produced.

    Sentences that conjoin verbs of different tenses without auxiliary verbs
    are not supported, e.g.:

    * "We will go tomorrow and go today."

    Without understanding adverbs such as "tomorrow" and "today" these are
    ambiguous. In

    "We will go and eat."

    should "eat" be present or future tense? Future is assumed, which avoids
    having to analyze the semantics of adverbial phrases.
    """
    transformed = []
    for sent in nlp(text).sents:
        present = Sentence(sent).transform_present()
        transformed.append(present)
        if echo:
            print(present)
    return ' '.join(transformed)


def transform_present_span(sent):
    """
    Return the present tense rendering of an already-parsed sentence span.
    """
    sentence = Sentence(sent)
    return sentence.transform_present()

In [3]:
# -*- coding: UTF-8 -*-

from __future__ import absolute_import

import benepar
from benepar.spacy_plugin import BeneparComponent
from collections import namedtuple
from past2present import Sentence
import re
import spacy
import string

In [11]:
class BeneparExtension(object):
    """
    Represents a custom extension of a spacy Span with data from the
    `benepar` plugin. Allows access of properties using dot syntax.

    `children` and `labels` default to a fresh empty list per instance.
    """
    def __init__(self, children=None, labels=None, parse_string=''):
        # `None` sentinels avoid sharing one mutable default list between
        # every instance created without explicit arguments.
        self.children = [] if children is None else children
        self.labels = [] if labels is None else labels
        self.parse_string = parse_string

# BeneparExts = namedtuple('Extensions', ['children', 'labels', 'parse_string'])


class SpacyToken(object):
    """
    A pickleable stand-in for a `spacy.tokens.token.Token`. It copies the
    token's plain attributes and keeps a reference to the
    `SpacySentenceSpan` object for the sentence in which it appears, through
    which related tokens are looked up.
    """
    # Attributes copied verbatim from the source token, in this order.
    _COPIED_ATTRS = (
        'dep_', 'i', 'lemma_', 'lower_', 'pos_', 'tag_',
        'text', 'text_with_ws', 'whitespace_',
    )

    def __init__(self, sent, tok):
        self.sent = sent
        for attr in self._COPIED_ATTRS:
            setattr(self, attr, getattr(tok, attr))
        # Related tokens are stored as token indices (`tok.i`), not objects.
        self.__children = [child.i for child in tok.children]
        self.__ancestors = [anc.i for anc in tok.ancestors]

    def _resolve(self, indices):
        """Yield the sentence's token for each stored token index."""
        for i in indices:
            yield self.sent[i - self.sent.start]

    @property
    def children(self):
        return self._resolve(self.__children)

    @property
    def ancestors(self):
        return self._resolve(self.__ancestors)


class SpacySpan(object):
    """
    A pickleable version of a `spacy.tokens.span.Span` object. It requires
    the Berkeley Neural Parser `benepar` plugin, because it follows the
    constituency parse tree to transform each `spacy.tokens.span.Span` into
    a `SpacySpan`.

    Tokens are not stored here; indexing is delegated to `sent`, the object
    representing the whole sentence.
    """
    def __init__(self, sent, span=None):
        """
        Copy `span` (and, recursively, its constituency children). With
        `span=None` an empty span attached to `sent` is created.
        """
        if span is None:
            self.sent = sent
            self.__len = 0
            self.text = ''
            self.text_with_ws = ''
            self.start = 0
            self.end = 0
            self._ = BeneparExtension(children=[], labels=[], parse_string='')
        else:
            chs = [SpacySpan(sent, child) for child in span._.children]
            ls = list(span._.labels)
            ps = span._.parse_string
            self.__len = len(span)
            self.sent = sent
            self.text = span.text
            self.text_with_ws = span.text_with_ws
            self.start = span.start
            self.end = span.end
            self._ = BeneparExtension(children=chs, labels=ls, parse_string=ps)

    def _relative_index(self, idx):
        """
        Convert an index relative to this span (negative values count from
        the end) into an index relative to the start of the sentence.
        """
        if idx < 0:
            idx += len(self)
        idx += self.start - self.sent.start
        return idx

    def __getitem__(self, key):
        """
        Return the token at an integer index, or the sentence's result for a
        slice of this span. Raises `ValueError` for any other key type.
        """
        if isinstance(key, slice):
            # Normalise open-ended and negative bounds against this span,
            # then shift them into sentence-relative positions.
            start, stop, _ = slice(key.start, key.stop).indices(len(self))
            offset = self.start - self.sent.start
            return self.sent[start + offset:stop + offset:key.step]
        elif isinstance(key, int):
            return self.sent[self._relative_index(key)]
        else:
            raise ValueError('Index must be an integer or slice.')

    def __len__(self):
        return self.end - self.start


class SpacySentenceSpan(SpacySpan):
    """
    A pickleable version of a `spacy.tokens.span.Span` object that represents
    an entire sentence.

    A special type of span is required for this, because it needs to
    hold the tokens that other spans refer to.
    """
    def __init__(self, span):
        # Tokens are built first; they only store a reference to `self`.
        self.__tokens = [SpacyToken(self, tok) for tok in span]
        super(SpacySentenceSpan, self).__init__(self, span)

    def __getitem__(self, key):
        """
        Return the token at an integer index, or a new `SpacySpan` carrying
        the text of the sliced tokens.

        Raises `ValueError` for stepped slices and for unsupported key types.
        """
        if isinstance(key, slice):
            if key.step not in (None, 1):
                raise ValueError(
                        'Stepped slices are not supported in Span objects.')
            span = SpacySpan(self)

            # Indices are sentence-relative, i.e. positions in the token list.
            start, stop, _ = slice(key.start, key.stop).indices(
                len(self.__tokens))
            selected = self.__tokens[start:stop]
            if not selected:
                return span
            last = selected[-1]
            span.text = ''.join(
                tok.text_with_ws for tok in selected[:-1]) + last.text
            span.text_with_ws = span.text + last.whitespace_
            # Same coordinate system as the `start`/`end` copied from spaCy,
            # so indexing into the returned span resolves correctly.
            span.start = self.start + start
            span.end = self.start + stop
            return span
        elif isinstance(key, int):
            return self.__tokens[self._relative_index(key)]
        else:
            raise ValueError('Index must be an integer or slice.')

In [5]:
def test_parse(nlp):
    """
    Check that a `SpacySentenceSpan` mirrors the spaCy/benepar parse of a
    sample sentence: same tree shape, lengths, texts and, for single-token
    spans, the same lemma, dependency label and trailing whitespace.
    """
    original = next(nlp(u'The cat sat on the mat by a hat.').sents)
    mirrored = SpacySentenceSpan(original)
    assert all(isinstance(child, SpacySpan) for child in mirrored._.children)
    # Walk both trees in lockstep, depth-first.
    pending = [(original, mirrored)]
    while pending:
        span, pspan = pending.pop()
        print('<', span.text)
        print('>', pspan.text)
        assert isinstance(pspan, SpacySpan)
        assert len(span) == len(pspan)
        assert span.text_with_ws == pspan.text_with_ws
        assert span.text == pspan.text
        if len(span) == 1:
            assert isinstance(pspan[0], SpacyToken)
            tok, ptok = span[0], pspan[0]
            assert tok.lemma_ == ptok.lemma_
            assert tok.dep_ == ptok.dep_
            assert tok.whitespace_ == ptok.whitespace_
        pairs = list(zip(span._.children, pspan._.children))
        pending.extend(reversed(pairs))

In [6]:
# Download the `benepar_en` parser model.
import benepar
benepar.download('benepar_en')

[nltk_data] Downloading package benepar_en to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Package benepar_en is already up-to-date!


True

In [8]:
# Build the interactive pipeline: the small English spaCy model with the
# benepar component appended.
nlp = spacy.load('en_core_web_sm')
nlp.add_pipe(BeneparComponent('benepar_en'))

In [12]:
# Demo: wrap the first parsed sentence in a pickleable span and print its
# present tense rendering.
doc = nlp(u'The cat sat on the mat by a hat.')
sent = next(doc.sents)
psent = SpacySentenceSpan(sent)
print(transform_present_span(psent))

The cat sits on the mat by a hat.


In [18]:
# Run the span-mirroring check against the interactive pipeline.
test_parse(nlp)

< The cat sat on the mat by a hat.
> The cat sat on the mat by a hat.
< The cat
> The cat
< The
> The
< cat
> cat
< sat on the mat by a hat
> sat on the mat by a hat
< sat
> sat
< on the mat
> on the mat
< on
> on
< the mat
> the mat
< the
> the
< mat
> mat
< by a hat
> by a hat
< by
> by
< a hat
> a hat
< a
> a
< hat
> hat
< .
> .


In [14]:
import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
from apache_beam import coders
from apache_beam.io import filebasedsource
from apache_beam.io import WriteToText
from apache_beam.io.iobase import Read
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms import PTransform

In [15]:
class ParseWithSpacy(beam.DoFn):
    """
    Beam `DoFn` that parses a document string with spaCy and benepar and
    emits one `SpacySentenceSpan` per sentence.
    """
    DEFAULT_BATCH_SIZE = 16
    DEFAULT_N_THREADS = 2

    def process(self, doc_string, *args, **kwargs):
        """
        Yield a `SpacySentenceSpan` for every sentence of `doc_string`.

        `batch_size` and `n_threads` may be passed as keyword arguments;
        otherwise the class defaults are used.
        """
        batch_size = kwargs.get('batch_size', self.DEFAULT_BATCH_SIZE)
        n_threads = kwargs.get('n_threads', self.DEFAULT_N_THREADS)
        parsed_docs = self.nlp.pipe([doc_string],
                                    batch_size=batch_size,
                                    n_threads=n_threads)
        # NOTE(review): the 'benepar' pipe is applied again to each document
        # coming out of `nlp.pipe` - confirm this second pass is required.
        constituency_parser = self.nlp.get_pipe('benepar')
        for parsed in parsed_docs:
            doc = constituency_parser(parsed)
            for sent in doc.sents:
                yield SpacySentenceSpan(sent)

    def start_bundle(self):
        """Load the spaCy model unless this instance already has one."""
        if getattr(self, 'nlp', None):
            logging.debug('Spacy model already initialized.')
            return
        self.nlp = ParseWithSpacy.init_spacy_english_model()

    @staticmethod
    def init_spacy_english_model():
        """Return the large English spaCy model with benepar appended."""
        model = spacy.load('en_core_web_lg')
        model.add_pipe(BeneparComponent('benepar_en'))
        return model


class _GutenbergSource(filebasedsource.FileBasedSource):
    """
    File-based Beam source that reads a text file whole, cleans it with
    `clean_text` and emits it in chunks of roughly `chunk_char_length`
    characters.
    """
    def __init__(self,
                 file_pattern,
                 chunk_char_length=4000,
                 min_sentence_char_length=5):
        """
        Raises `ValueError` when `chunk_char_length` is not positive, since
        `chunks` could then never advance past a line break.
        """
        if chunk_char_length <= 0:
            raise ValueError('chunk_char_length must be a positive integer.')
        super(_GutenbergSource, self).__init__(file_pattern, splittable=False)
        self.chunk_char_length = chunk_char_length
        # NOTE(review): stored but not used anywhere in this class.
        self.min_sentence_char_length = min_sentence_char_length
        self.coder = coders.StrUtf8Coder()

    def read_records(self, file_name, range_tracker):
        """
        Yield cleaned text chunks read from `file_name`, starting at the
        range tracker's start position.
        """
        start_offset = range_tracker.start_position()
        with self.open_file(file_name) as file_to_read:
            file_to_read.seek(start_offset)
            read_buffer = file_to_read.read()
            # Keep only characters listed in `string.printable`.
            data = ''.join(
                c for c in read_buffer if c in string.printable or c == '\n')
            data = self.coder.decode(data)
            data = _GutenbergSource.clean_text(data)
            for text in self.chunks(data):
                yield text

    @staticmethod
    def clean_text(text):
        """
        Return `text` with line endings and quotes normalised and with
        speaker prompts, all-caps title lines, divider lines, bracketed
        notes and underscores removed. Single line breaks become spaces.

        The patterns are ordinary unicode literals with escaped backslashes
        (not `ur''`, which Python 3 rejects), so the module parses on both
        Python 2 and Python 3.
        """
        # Replace carriage return and tab characters.
        text = text.replace(u'\r\n', u'\n').replace(u'\r', u'\n')
        text = text.replace(u'\t', u' ')
        # Standardize quotes/apostrophes.
        text = re.sub(u"[ ]*`", u"'", text)
        text = re.sub(u"[’‘]", u"'", text)
        # Remove play character prompts like 'WALTER: ...'
        text = re.sub(u"^[A-Z0-9‘’'., ]+(?:\\([^)]*\\))?[:.][ ]*",
                      u'',
                      text,
                      flags=re.M)
        # Remove titles which are lines with all-caps.
        # NOTE(review): `,-?` inside the class is a character range (',' to
        # '?'), kept as-is to preserve the existing matches.
        text = re.sub(u'^[A-Z0-9.,-?$#!;:\'’"_ ]+$', u'', text, flags=re.M)
        # Replace antiquated punctuation, then remove divider lines like
        # '* * * * *'.
        text = re.sub(u'&c', u'etc', text)
        text = re.sub(u':--', u':', text)
        text = re.sub(u'^[ ]*[-*=_].*[-*=_][ ]*$', u'', text, flags=re.M)
        # Remove leading spaces (no re.M: only at the very start of the text).
        text = re.sub(u'^[ ]+', u'', text)
        # Remove all line breaks except empty lines.
        text = re.sub(u'\n(?=[^\n])', u' ', text, flags=re.M)
        # Remove footnotes and references.
        # (they can be nested one deep: [Note [Note in note]])
        text = re.sub(u'^\\[[^ ]+\\] .*$', u'', text, flags=re.M)
        bracketed = u'\\[[^][]+\\]'
        # Two passes: the first strips inner notes, the second the outer.
        text = re.sub(bracketed, u'', text)
        text = re.sub(bracketed, u'', text)
        # Remove underscores
        text = text.replace(u'_', u'')
        return text

    def chunks(self, text):
        """
        Yield consecutive pieces of `text`, each extended past
        `chunk_char_length` characters up to the next line break.
        """
        N = len(text)
        start = 0
        end = 0
        while start < N:
            end += self.chunk_char_length
            # Extend the chunk so that it ends at a line break.
            while end < N and text[end] != u'\n':
                end += 1
            # Turn newlines into spaces and collapse runs of spaces.
            piece = re.sub(u'\n', u' ', text[start:end])
            yield re.sub(u' [ ]+', u' ', piece)
            start = end


class ReadGutenbergText(PTransform):
    """
    `PTransform` that reads the files matching `file_pattern` through a
    `_GutenbergSource`.
    """
    def __init__(self, file_pattern=None, **kwargs):
        super(ReadGutenbergText, self).__init__(**kwargs)
        self._source = _GutenbergSource(file_pattern)

    def expand(self, pvalue):
        """Attach a `Read` of the source to the input's pipeline."""
        pipeline = pvalue.pipeline
        return pipeline | Read(self._source)

In [23]:
# Print the cleaned version of a local sample file. `io.open` decodes while
# reading (dropping a UTF-8 BOM if present) and behaves the same on Python 2
# and Python 3, unlike `str.decode`, which only exists on Python 2.
import io

with io.open('text-small.txt', encoding='utf-8-sig') as f:
    u = f.read()
print(_GutenbergSource.clean_text(u))

A certain king had a beautiful garden, and in the garden stood a tree which bore golden apples. These apples were always counted, and about the time when they began to grow ripe it was found that every night one of them was gone. The king became very angry at this, and ordered the gardener to keep watch all night under the tree. The gardener set his eldest son to watch; but about twelve o'clock he fell asleep, and in the morning another of the apples was missing. Then the second son was ordered to watch; and at midnight he too fell asleep, and in the morning another apple was gone. Then the third son offered to keep watch; but the gardener at first would not let him, for fear some harm should come to him: however, at last he consented, and the young man laid himself under the tree to watch. As the clock struck twelve he heard a rustling noise in the air, and a bird came flying that was of pure gold; and as it was snapping at one of the apples with its beak, the gardener's son jumped up

In [None]:
# Placeholder project id and bucket used to build the default paths and
# pipeline flags in `run`.
PROJECT = 'my-project'
BUCKET = 'gs://my-bucket'

def run(argv=None):
    """
    Main entry point: read Gutenberg text, parse it into sentences, rewrite
    each sentence in the present tense and write before/after pairs.

    `argv` is parsed for `--input` and `--output`; every other argument is
    forwarded to Beam as a pipeline option.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default=BUCKET + '/gutenberg/small/*.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default=BUCKET + '/past2present/sentences',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # These flags are appended after any caller-supplied pipeline arguments.
    pipeline_args.extend([
        '--runner=DirectRunner',
        '--project=' + PROJECT,
        '--staging_location=' + BUCKET + '/past2present/staging',
        '--temp_location=' + BUCKET + '/tmp',
        '--job_name=past2present',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    # From https://github.com/apache/beam/blob/master/sdks/python/apache_beam
    #             /examples/complete/top_wikipedia_sessions.py :
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options.view_as(SetupOptions).save_main_session = True

    def transform(sent):
        """Pair a sentence's original text with its present tense form."""
        return (sent.text, Sentence(sent).transform_present())

    def format_result(before_after):
        """Render a (before, after) pair as two marked lines."""
        (before, after) = before_after
        return u'< %s\n> %s' % (before, after)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        chunks = p | ReadGutenbergText(known_args.input)

        # `ParseWithSpacy.process` already yields one sentence span per
        # element, so no flattening step follows it: a FlatMap over a
        # sentence span would iterate the span itself and emit its tokens.
        output = (
                chunks
                | 'Parse' >> beam.ParDo(ParseWithSpacy())
                | 'Transform' >> beam.Map(transform)
                | 'Format' >> beam.Map(format_result))

        output | WriteToText(known_args.output)

In [None]:
# Show INFO-level log messages, then launch the pipeline with the defaults.
logging.getLogger().setLevel(logging.INFO)
run()