(feature/capitalization) Add support for capitalization transformation
PonteIneptique committed Sep 17, 2020
1 parent c347c0d commit b400307
Showing 11 changed files with 1,757 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ memory.csv
memory*.csv
new.yaml
tests/tests_output
tests/test_config/generated.xml

# Byte-compiled / optimized / DLL files
__pycache__/
37 changes: 37 additions & 0 deletions DOCUMENTATION.md
@@ -361,3 +361,40 @@ It will produces the following output

The glue token is not applied to the token; the lemma value is transferred to the previous row and the POS is lost.
`@glue_char` is used to concatenate columns such as `lemma` here.

### Capitalization

This post-processing function capitalizes (*i.e.* makes the first letter of a word upper-case) the first word of
chunks (*i.e.* sentences), always or at random, as well as random words inside chunks. It can also create an
uppercase mask, in which every upper-case letter is replaced by its lower-case equivalent followed by the
[Neutral Chess Queen Unicode character](https://www.compart.com/fr/unicode/U+1FA01).

The configuration model is the following:

```xml
<config>
<!--...-->
<postprocessing>
<capitalize column-token="token" caps-to-utf8-marker="true">
<first-word when="never">
<sentence-marker name="empty_line"/>
</first-word>
<first-letters when="ratio" ratio="0.5"/>
</capitalize>
</postprocessing>
<!--...-->
</config>
```

1. <kbd>column-token</kbd> specifies the name of the column containing the raw form of the tokens.
2. (Optional) <kbd>column-lemma</kbd> does the same thing for the lemma.
3. <kbd>caps-to-utf8-marker</kbd> activates the masking of upper-case letters (see the sketch below).
4. <kbd>first-word</kbd> is activated when <kbd>when</kbd> is set to one of `always`, `random` or `ratio`.
    1. <kbd>when="ratio"</kbd> requires a second attribute, <kbd>ratio</kbd>, whose value must be a float between 0.0 and 1.0 (essentially a percentage).
    2. <kbd>when="random"</kbd> is a shortcut for the latter with `ratio="0.5"`.
    3. To identify sentences, you need to set up a <kbd>sentence-marker</kbd>:
        1. It can be <kbd>name="empty_line"</kbd>, in which case chunks are separated by empty lines (the default output).
        2. It can be <kbd>name="regexp"</kbd>, in which case it takes a `@matchPattern` attribute (a regular expression) and the name of the column it must match in `@source`, *e.g.* `<sentence-marker name="regexp" matchPattern="[\.!?]" source="lemma"/>`.
5. <kbd>first-letters</kbd> accepts the same <kbd>when</kbd>/<kbd>ratio</kbd> attributes as <kbd>first-word</kbd>; it applies the same capitalization to random words inside chunks.
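
To make the masking concrete, here is a minimal standalone sketch of the substitution applied when
<kbd>caps-to-utf8-marker</kbd> is active (it mirrors the `RE_Upper` pattern and `Marker` constant added to
`protogenie/postprocessing.py` in this commit; the helper name `mask` is ours):

```python
import regex as re

RE_UPPER = re.compile(r"(\p{Lu})")  # any upper-case letter, Unicode-aware
MARKER = "🨁"  # NEUTRAL CHESS QUEEN, U+1FA01

def mask(token: str) -> str:
    """Lowercase each upper-case letter and append the marker after it."""
    return RE_UPPER.sub(lambda m: m.group().lower() + MARKER, token)

print(mask("Maria"))  # m🨁aria
print(mask("SPQR"))   # s🨁p🨁q🨁r🨁
```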

4 changes: 2 additions & 2 deletions protogenie/configs.py
@@ -5,13 +5,13 @@

from .splitters import RegExpSplitter, LineSplitter, TokenWindowSplitter, FileSplitter, _SplitterPrototype
from .reader import Reader
from .postprocessing import Disambiguation, ReplacementSet, Skip, PostProcessing, Clitic
from .postprocessing import Disambiguation, ReplacementSet, Skip, PostProcessing, Clitic, Capitalize
from .toolbox import RomanNumeral
import datetime
from dataclasses import dataclass
Splitter = Type[_SplitterPrototype]

PostProcessingClasses = [Disambiguation, ReplacementSet, Skip, RomanNumeral, Clitic]
PostProcessingClasses = [Disambiguation, ReplacementSet, Skip, RomanNumeral, Clitic, Capitalize]


@dataclass
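The registration above is what ties the new `<capitalize>` node to the parser. A plausible sketch of the dispatch
(only `PostProcessingClasses`, `match_config_node` and `from_xml` come from the code in this commit; the loop itself
is our illustration):

```python
from xml.etree.ElementTree import fromstring

# Hypothetical dispatch: pick the first registered post-processing class
# whose NodeName matches the XML tag, then build an instance from the node.
node = fromstring('<capitalize column-token="token" caps-to-utf8-marker="true"/>')
for cls in PostProcessingClasses:
    if cls.match_config_node(node):
        processor = cls.from_xml(node)
        break
```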
234 changes: 217 additions & 17 deletions protogenie/postprocessing.py
@@ -1,12 +1,18 @@
if False:
from .configs import CorpusConfiguration
import tempfile
import regex as re
from xml.etree.ElementTree import Element
import csv
from typing import List, ClassVar, Tuple, Dict
import math
import random
from abc import ABC, abstractmethod
from collections import namedtuple
from xml.etree.ElementTree import Element
import csv
from typing import List, ClassVar, Tuple, Dict, Optional, TYPE_CHECKING, Union

import regex as re

if TYPE_CHECKING:
from .configs import CorpusConfiguration
from .sentence_matchers import SentenceMatcherProto, SentenceRegexpMatcher
Numeric = Union[int, float]


class PostProcessing(ABC):
@@ -26,6 +32,74 @@ def match_config_node(cls, node: Element) -> bool:
"""
return node.tag == cls.NodeName

def _modify_line(self, header: List[str], values: Optional[List[str]],
file_path: str, config: "CorpusConfiguration"):
raise NotImplementedError

def _stop_chunk(self, line: Optional[Dict[str, str]]) -> bool:
raise NotImplementedError

def _chunk_modify_routine(self, file_path: str, config: "CorpusConfiguration"):
raise NotImplementedError

def _scan_chunks(self,
file_path: str, config: "CorpusConfiguration",
sentence_matcher: Optional[SentenceMatcherProto]) -> Tuple[int, int]:
""" Analyzes the FILE for the number of chunks
"""
chunks = 0
tokens = 0
with open(file_path) as file:
for nb_line, line in enumerate(file):
vals = line.strip().split(config.column_marker)

if nb_line == 0:
header = vals
continue

if len(header) == len(vals):
tokens += 1
if sentence_matcher and sentence_matcher.match(header, vals):
chunks += 1
elif sentence_matcher:
chunks += sentence_matcher.match(header, None)

return chunks, tokens

def _single_line_modify_routine(self, file_path: str, config: "CorpusConfiguration"):
header: List[str] = []
temp = tempfile.TemporaryFile(mode="w+")  # temporary buffer for the rewritten file

try:
with open(file_path) as file:
for nb_line, line in enumerate(file):

if not line.strip():
temp.write(line)
self._modify_line(header, None, file_path, config)
continue

vals = line.strip().split(config.column_marker)

if nb_line == 0:
header = vals
temp.write(line)
continue

modified = self._modify_line(header, vals, file_path=file_path, config=config)
temp.write(
config.column_marker.join(
[modified[head] for head in header]
) + "\n"
)

with open(file_path, "w") as f:
temp.seek(0)
f.write(temp.read())
finally:
temp.close()  # release the temporary buffer


class ApplyTo:
def __init__(self, source: str, target: List[str]):
@@ -62,7 +136,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
header: List[str] = []
for nb_line, line in enumerate(csv_reader): # The file should already have been open
if nb_line == 0:
temp.write(config.column_marker.join(line+[self.disambiguation_key])+"\n")
temp.write(config.column_marker.join(line + [self.disambiguation_key]) + "\n")
header = line
continue
elif not line:
@@ -79,7 +153,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
lines[self.lemma_key] = self.match_pattern.sub("", lines[self.lemma_key])
else:
lines[self.disambiguation_key] = self.default_value
temp.write(config.column_marker.join(list(lines.values()))+"\n")
temp.write(config.column_marker.join(list(lines.values())) + "\n")
with open(file_path, "w") as f:
temp.seek(0)
f.write(temp.read())
@@ -120,7 +194,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
header: List[str] = []
for nb_line, line in enumerate(csv_reader): # The file should already have been open
if nb_line == 0:
temp.write(config.column_marker.join(line)+"\n")
temp.write(config.column_marker.join(line) + "\n")
header = line
continue
elif not line:
@@ -140,7 +214,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
else: # Otherwise, we just set the target value using this value
lines[target] = self.replacement_pattern

temp.write(config.column_marker.join(list(lines.values()))+"\n")
temp.write(config.column_marker.join(list(lines.values())) + "\n")
with open(file_path, "w") as f:
temp.seek(0)
f.write(temp.read())
@@ -162,7 +236,7 @@ class Skip(PostProcessing):
NodeName = "skip"

def __init__(
self, match_pattern: str, source: str
self, match_pattern: str, source: str
):
super(Skip, self).__init__()
self.match_pattern: re.Regex = re.compile(match_pattern)
@@ -177,7 +251,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
header: List[str] = []
for nb_line, line in enumerate(csv_reader): # The file should already have been open
if nb_line == 0:
temp.write(config.column_marker.join(line)+"\n")
temp.write(config.column_marker.join(line) + "\n")
header = line
continue
elif not line:
@@ -190,7 +264,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
if self.match_pattern.search(lines[self.source]):
continue

temp.write(config.column_marker.join(list(lines.values()))+"\n")
temp.write(config.column_marker.join(list(lines.values())) + "\n")

with open(file_path, "w") as f:
temp.seek(0)
@@ -214,7 +288,7 @@ class Clitic(PostProcessing):
Transfer = namedtuple("Transfer", ["col", "glue"])

def __init__(
self, match_pattern: str, source: str, glue: str, transfers: List[Tuple[str, bool]]
self, match_pattern: str, source: str, glue: str, transfers: List[Tuple[str, bool]]
):
super(Clitic, self).__init__()
self.match_pattern: re.Regex = re.compile(match_pattern)
@@ -239,7 +313,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
modifications: List[Tuple[int, Dict[str, Tuple[str, str]]]] = []
for nb_line, line in enumerate(csv_reader): # The file should already have been open
if nb_line == 0:
temp.write(config.column_marker.join(line)+"\n")
temp.write(config.column_marker.join(line) + "\n")
header = line
continue
elif not line:
@@ -253,7 +327,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
temp.write("\n".join([
config.column_marker.join(list(l.values()))
for l in sequence
])+"\n")
]) + "\n")
sequence = []
modifications = []
continue
@@ -264,7 +338,7 @@ def apply(self, file_path: str, config: "CorpusConfiguration"):
if self.match_pattern.match(lines[self.source]):
modifications.append(
(
len(sequence) - 1 -len(modifications),
len(sequence) - 1 - len(modifications),
{key: (keep, lines[key]) for (key, keep) in self.transfers}
)
)
@@ -293,3 +367,129 @@ def from_xml(cls, node: Element) -> "Clitic":
for tr in node.findall("transfer")
]
)


class Capitalize(PostProcessing):
""" Applies capitalization strategies to content
"""
NodeName = "capitalize"
Marker: str = "🨁" # NEUTRAL CHESS QUEEN
RE_Upper: re.Regex = re.compile(r"(\p{Lu})")

def __init__(self, first_word: Numeric, first_letters: Numeric,
column_token: str,
column_lemma: Optional[str] = None,
apply_unicode_marker: bool = False,
sentence_matcher: Optional[SentenceMatcherProto] = None):

self.first_word: Numeric = first_word
self.first_letters: Numeric = first_letters

self.column_token: str = column_token
self.column_lemma: Optional[str] = column_lemma
self.apply_unicode_marker: bool = apply_unicode_marker
self.sentence_matcher: Optional[SentenceMatcherProto] = sentence_matcher
self.first_word_state: bool = True # Variable representing the current status
# (True = next word is a first word)

self._files_chunks: Dict[str, List[bool]] = {}
self._files_tokens: Dict[str, List[bool]] = {}

@staticmethod
def parse_when(value: str, ratio: Optional[str]) -> Numeric:
if value == "always":
return 1
elif value == "never":
return 0
elif value == "random":
return 0.5
elif ratio:
try:
if 1.0 > float(ratio) > .0:
return float(ratio)
except ValueError:
raise ValueError("Your ratio value is probably wrong. It must be a float strictly between 0.0 and 1.0 (Found: {})".format(ratio))
raise ValueError("Invalid parameters for a ratio or an application")

@classmethod
def parse_node_including_when(cls, node: Element, name: str) -> Tuple[Numeric, Optional[Element]]:
target = node.findall("./{name}".format(name=name))
if target:
return cls.parse_when(target[0].attrib["when"], target[0].attrib.get("ratio")), target[0]
return 0, None

def _modify_line(self, header: List[str], values: Optional[List[str]],
file_path: str, config: "CorpusConfiguration") -> Dict[str, str]:
if self.first_word and self.sentence_matcher.match(header, values):
self.first_word_state = True
return dict(zip(header, values))

if not values or len(header) != len(values):
return {}

line = dict(zip(header, values))

# Sentence starts
if self.first_word > .0 and self.first_word_state and self._files_chunks[file_path].pop():
line[self.column_token] = line[self.column_token].capitalize()
# Need to pop tokens as well
if self.first_letters:
self._files_tokens[file_path].pop()
elif self.first_letters > .0 and self._files_tokens[file_path].pop():
line[self.column_token] = line[self.column_token].capitalize()

if self.apply_unicode_marker:  # mask only when caps-to-utf8-marker is active
line[self.column_token] = self.RE_Upper.sub(self._replace_caps, line[self.column_token])

self.first_word_state = False
return line

def _replace_caps(self, value):
return value.group().lower() + self.Marker

@staticmethod
def _transform_to_bool_list(count: int, ratio: Numeric) -> List[bool]:
if ratio == 1.0:
return [True] * count
elif ratio == .0:
return [False] * count
else:
positives = min(round(count * ratio), count)
negatives = count - positives
out = [True] * positives + [False] * negatives
random.shuffle(out)
return out

def apply(self, file_path: str, config: "CorpusConfiguration"):
# We scan the files
chunks, tokens = self._scan_chunks(file_path, config, sentence_matcher=self.sentence_matcher)

# We store the dispatch of booleans
if self.first_word > .0:
self._files_chunks[file_path] = self._transform_to_bool_list(chunks, self.first_word)
if self.first_letters > .0:
self._files_tokens[file_path] = self._transform_to_bool_list(tokens, self.first_letters)

self._single_line_modify_routine(file_path=file_path, config=config)

@classmethod
def from_xml(cls, node: Element) -> "Capitalize":
first_word, first_word_elem = cls.parse_node_including_when(node, "first-word")
first_letters, _ = cls.parse_node_including_when(node, "first-letters")
sentence_marker = None
if first_word != .0:
try:
sentence_marker = SentenceMatcherProto.from_xml(
first_word_elem.findall("./sentence-marker")[0]
)
except IndexError:
raise ValueError("<first-word> requires a <sentence-marker> child to identify chunks.")

return cls(
first_word=first_word,
first_letters=first_letters,
sentence_matcher=sentence_marker,
apply_unicode_marker=node.attrib.get("caps-to-utf8-marker", "true").lower() == "true",
column_token=node.attrib["column-token"],
column_lemma=node.attrib.get("column-lemma")
)
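
As a closing illustration, a minimal sketch of how the new class can be exercised on its own (the XML mirrors the
DOCUMENTATION.md example above; `corpus.tsv` and `config` are placeholders, with `config` standing for a
`CorpusConfiguration` whose `column_marker` matches the file):

```python
from xml.etree.ElementTree import fromstring
from protogenie.postprocessing import Capitalize

node = fromstring(
    '<capitalize column-token="token" caps-to-utf8-marker="true">'
    '<first-word when="always"><sentence-marker name="empty_line"/></first-word>'
    '<first-letters when="never"/>'
    '</capitalize>'
)
processor = Capitalize.from_xml(node)
# config: a CorpusConfiguration matching corpus.tsv (placeholder)
processor.apply("corpus.tsv", config)  # rewrites corpus.tsv in place
```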
