In [1]:
import os
import pickle

import pygtrie

# The script-relative lookup below is disabled; paths are resolved against
# the current working directory instead.
# script_path = os.path.abspath(__file__)

# Directory that holds the pickled dictionary file.
ROOT_DIR = '.'
# ROOT_DIR = os.path.dirname(script_path)

# File the trie is pickled to and loaded from.
dict_path = ROOT_DIR + '/dict.pkl'

def saveDict() -> None:
	"""Pickle the module-level ``trie`` to ``dict_path``, overwriting the file."""
	with open(dict_path, 'wb') as f:
		pickle.dump(trie, f)

# No dictionary file yet: create an empty character trie and write it out so
# the load below always finds a file.
if not os.path.exists(dict_path):
	trie = pygtrie.CharTrie()
	saveDict()

# Load the trie from the file using pickle
# NOTE(review): pickle.load can execute arbitrary code from the file; only
# load a dict.pkl that was produced by saveDict.
with open(dict_path, 'rb') as f:
	trie = pickle.load(f)

def add_word(word:str) -> None:
	"""Store ``word`` in the trie (value ``True``) and persist the trie via ``saveDict``."""
	trie[word] = True
	saveDict()
	
def check_word(word:str) -> bool:
	"""Return the result of ``word in trie`` for the module-level trie."""
	return word in trie

def del_word(word:str) -> None:
	"""Remove ``word`` from the trie and persist the change.

	A word that is not stored in the trie is ignored. Only the "missing key"
	error is swallowed; any other failure propagates to the caller. The trie
	is written back with ``saveDict`` after a successful deletion so that the
	on-disk dictionary stays in sync, mirroring ``add_word``.
	"""
	try:
		del trie[word]
	except KeyError:
		# Unknown word: nothing was removed, so there is nothing to save.
		return
	saveDict()

In [2]:
from itertools import chain
from math import log2
import regex as re
import sys
from statistics import mode 


# Get the path of the script
from dataclasses import dataclass, field
from typing import Callable, Generator, List, Optional, Tuple

# from .UDict import add_word, check_word 
import Levenshtein
from language_tool_python import LanguageTool
from icecream import ic

# Initialize the LanguageTool tool for 'en-US', configured with the
# 'cacheSize' and 'pipelineCaching' options.
lang_tool = LanguageTool('en-US', config={ 'cacheSize': 1000, 'pipelineCaching': True })

# The script-relative lookup is disabled; '.' (the current working directory)
# is used instead.
script_path = '.'
# script_path = os.path.abspath(__file__)

# ROOT_DIR = os.path.dirname(script_path)
# Base directory; parseRules looks for rule files under ROOT_DIR + '/rules'.
ROOT_DIR = '.'

def parseRules(name, ROOT_DIR=ROOT_DIR+'/rules') -> Generator[Tuple[str, str], None, None]:
   """Lazily read ``<ROOT_DIR>/<name>.tsv`` and yield its first two columns.

   Each line is stripped and split on tabs. Lines that end up with fewer
   than two columns (including blank lines) are skipped; any columns after
   the second are ignored.
   """
   with open(ROOT_DIR + f'/{name}.tsv', 'r') as rules_file:
      for raw_line in rules_file:
         columns = raw_line.strip().split('\t')
         # A blank line splits into a single empty column and is dropped here too.
         if len(columns) > 1:
            yield (columns[0], columns[1])
def compile_first(x:Tuple[str,str])->Tuple[re.Pattern[str],str]:
   """Compile the pattern half of a ``(pattern, replacement)`` rule.

   Args:
      x: ``(pattern_source, replacement)`` pair.

   Returns:
      ``(compiled_pattern, replacement)``.

   Raises:
      ValueError: if the rule cannot be compiled. The offending rule is
         included in the message and the underlying error is chained as
         ``__cause__``.
   """
   try:
      return (re.compile(x[0]),x[1])
   except Exception as err:
      # `Exception` rather than a bare except, so KeyboardInterrupt/SystemExit
      # are not converted into ValueError.
      raise ValueError(f'uncompilable rule {x}') from err
# Rule tables, compiled once when the cell runs. Every entry is a
# (compiled pattern, replacement) pair produced by compile_first.
WORD_CORRECTION_RULES = [
   compile_first(rule)
   for rule in chain(parseRules('anti.variant'), parseRules('anti.misspelling'))
]
KEYBOARD_CORRECTION_RULES = [compile_first(rule) for rule in parseRules('anti.keyboard')]
FAT_CORRECTION_RULES = [compile_first(rule) for rule in parseRules('fat.keyboard')]
WORD_RULES = [
   compile_first(rule)
   for rule in chain(parseRules('variant'), parseRules('grammatical'), parseRules('misspelling'))
]
KEYBOARD_RULES = [compile_first(rule) for rule in parseRules('keyboard')]

In [3]:
class FileWriter:
	"""Write-through text sink backed by a file.

	Every ``write`` is flushed immediately. The object exposes ``write`` and
	``flush`` so it can stand in for ``sys.stdout``. The file is opened as
	UTF-8 so non-ASCII output (the notebook prints emoji) does not depend on
	the platform's default encoding. Call ``close`` when done, or use the
	instance as a context manager.
	"""

	def __init__(self, file_path):
		# Opened in text mode, truncating any existing file.
		self.file = open(file_path, 'w', encoding='utf-8')

	def write(self, msg):
		"""Write ``msg`` and flush it to disk right away."""
		self.file.write(msg)
		self.flush()

	def flush(self):
		"""Flush the underlying file."""
		self.file.flush()

	def close(self):
		"""Close the underlying file; safe to call more than once."""
		if not self.file.closed:
			self.file.close()

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_value, traceback):
		self.close()

In [4]:
import os, sys
# Put the parent of the current working directory at the front of sys.path
# (once), presumably so that the `util` module imported next can be found.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '../'))
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)
from util import StringSpans

In [17]:
def count_uppercase_letters(s: str) -> int:
   """Count the characters of ``s`` that are not ASCII lowercase letters.

   Despite the name, this is not limited to uppercase letters: digits,
   spaces, punctuation and any non-ASCII character are counted as well,
   because the test is simply "outside the range 'a'..'z'".
   """
   return sum(1 for ch in s if not ('a' <= ch <= 'z'))
def normal_word(word:str)->bool:
   """Return True when LanguageTool's ``correct`` leaves ``word`` unchanged, or when ``check_word`` accepts it.

   ``check_word`` is only consulted when LanguageTool would change the word.
   """
   return lang_tool.correct(word) == word or check_word(word)
def string_mutation_distance(str1: str, str2: str) -> int:
   """Returns the number of mutations required to transform str1 into str2.

   This is the value of ``Levenshtein.distance(str1, str2)``.
   """
   return Levenshtein.distance(str1, str2)
def show_diff(a: str, b: str):
   """Print each position at which the word lists of ``a`` and ``b`` differ.

   Words are taken from ``StringSpans(...).get_words()`` and compared
   pairwise by index; extra words in the longer list are ignored.
   """
   words_a = StringSpans(a).get_words()
   words_b = StringSpans(b).get_words()
   for i, (word_a, word_b) in enumerate(zip(words_a, words_b)):
      if word_a != word_b:
         print(f'i:{i} a:"{word_a}" b:"{word_b}"')
def diff(a: str, b: str) -> List[Tuple[str,str]]:
   """Return the ``(word_from_a, word_from_b)`` pairs that differ.

   Words come from ``StringSpans(...).get_words()`` and are compared
   pairwise by index; extra words in the longer list are ignored.
   """
   words_a = StringSpans(a).get_words()
   words_b = StringSpans(b).get_words()
   mismatches = []
   for word_a, word_b in zip(words_a, words_b):
      if word_a != word_b:
         mismatches.append((word_a, word_b))
   return mismatches
def apply_match(text:str, match_result: Tuple[Tuple[int,int],str,re.Pattern], verbose: bool = False) -> str:
   """Apply one rule to a slice of ``text`` and return the rewritten text.

   Args:
      text: the string to rewrite.
      match_result: ``((start, end), replacement, pattern)``; the pattern's
         ``sub`` is run on ``text[start:end]`` only.
      verbose: print the text before and after the replacement.

   Returns:
      ``text`` with the slice replaced by the substitution result; the parts
      outside the slice are untouched.
   """
   (begin, end), replacement, pattern = match_result
   if verbose:
      print(f"Before replace: {text}")
   rewritten = ''.join((text[:begin], pattern.sub(replacement, text[begin:end]), text[end:]))
   if verbose:
      print(f"After replace: {rewritten}")
   return rewritten
def keyboard_rules_scan(text: str)->List[Tuple[Tuple[int, int], str, re.Pattern]]:
   """Collect every (overlapping) match of every ``KEYBOARD_RULES`` pattern in ``text``.

   Returns a list of ``(span, replacement, pattern)`` tuples, grouped by rule
   in ``KEYBOARD_RULES`` order and, within a rule, in ``finditer`` order.
   """
   return [
      (found.span(), repl, regex)
      for regex, repl in KEYBOARD_RULES
      for found in regex.finditer(text,overlapped=True)
   ]
def word_rules_scan(text: str)->List[Tuple[Tuple[int, int], str, re.Pattern]]:
   """Try every ``WORD_RULES`` pattern at the start of ``text``.

   Uses ``pattern.match``, so a rule only hits when it matches from index 0.
   Returns one ``(span, replacement, pattern)`` tuple per matching rule, in
   ``WORD_RULES`` order.
   """
   hits = []
   for pattern, replacement in WORD_RULES:
      found = pattern.match(text)
      if found is None:
         continue
      hits.append((found.span(), replacement, pattern))
   return hits
def rules_scan(text: str)-> List[Tuple[Tuple[int, int], str, re.Pattern]]:
   """Return all word-rule and keyboard-rule matches for ``text``, sorted.

   Matches are ordered by span and then by replacement string. The compiled
   pattern is deliberately left out of the sort key: compiled patterns do
   not support ordering, so a plain tuple sort raises ``TypeError`` as soon
   as two different rules share both span and replacement. The sort is
   stable, so such ties keep their scan order (word rules first).
   """
   result = word_rules_scan(text) + keyboard_rules_scan(text)
   result.sort(key=lambda slot: (slot[0], slot[1]))
   return result
def expand_span_to_word(words:List[Tuple[int,int]],span:Tuple[int,int])->Tuple[Tuple[int,int],Tuple[int,int],int]:
   """Map a match span onto one of the word spans in ``words``.

   First pass: the first word that fully contains ``span`` wins and the
   second element is ``span`` made relative to that word's start.
   Second pass (no word contains the span): the first word satisfying one of
   the two partial conditions below wins.

   Returns:
      ``(word_span, second_span, word_index)``.

   Raises:
      ValueError: when no word satisfies any of the conditions.

   NOTE(review): in the "span starts before the word" case the second
   element is ``(word_start, span_end - word_start)``, i.e. its first
   component is an absolute position while the other branches return
   word-relative positions. Neither partial condition checks that the span
   actually overlaps the word. Both look unintended - confirm before
   relying on them.
   """
   span_start, span_end = span

   # Pass 1: a word that contains the whole span.
   for index, (word_start, word_end) in enumerate(words):
      if word_start <= span_start and span_end <= word_end:
         return (word_start, word_end), (span_start - word_start, span_end - word_start), index

   # Pass 2: partial conditions, checked word by word in order.
   for index, (word_start, word_end) in enumerate(words):
      starts_before_word = word_start > span_start
      if starts_before_word and span_end <= word_end:
         return (word_start, word_end), (word_start, span_end - word_start), index
      if not starts_before_word and span_end > word_end:
         return (word_start, word_end), (span_start - word_start, word_end - word_start), index

   raise ValueError(f'sth is wrong {words} {span}')
def valid_matches(text:str, slots:List[Tuple[Tuple[int, int], str, re.Pattern]], verbose=False):
   """Filter ``slots`` down to the rule matches that survive a round trip.

   For every slot the rule is applied to the single word selected by
   ``expand_span_to_word``. The mutation is kept only when all of the
   following hold: the original word is "normal", the mutated word is not,
   first and last letters are unchanged (case-insensitively), and the most
   common entry of ``corrections(mutated_word)`` equals the original word.

   A slot is then dropped when its mutated text is empty (mutation
   rejected), repeats the mutated text of an earlier slot, or does not
   ``normalize`` back to ``text``.

   Args:
      text: the original text.
      slots: ``(span, replacement, pattern)`` candidates, e.g. from ``rules_scan``.
      verbose: print diagnostics along the way.

   Returns:
      The surviving slots, in their original order.
   """
   texas = StringSpans(text)
   # One entry per slot; '' means "no usable mutation for this slot".
   mutations: List[str] = list(map(lambda x: '', slots))

   # Apply the match to the text and get the resulting strings
   for match_index, match_result in enumerate(slots):
      span, repl, regex = match_result
      ex_span,relative_span,ex_span_index = expand_span_to_word(texas.words,span)
      # presumably the text of the word at ex_span - verify against StringSpans.get
      old_word = texas.get(ex_span)
      new_word = apply_match(old_word,(relative_span,repl,regex),verbose)
      new_word_corrections = corrections(new_word,verbose=verbose)
      if len(new_word_corrections) > 0:
         new_word_corrections_mode = mode(new_word_corrections)
      else:
         if verbose:
            print(f'new_word: {new_word} can\'t be fixed')
         new_word_corrections_mode = ''
      # The conditions short-circuit in this order; new_word[0] is only
      # evaluated once both normal_word checks have passed.
      if normal_word(old_word) \
            and not normal_word(new_word) \
            and new_word[0].lower() == old_word[0].lower() \
            and new_word[-1].lower() == old_word[-1].lower() \
            and new_word_corrections_mode == old_word:
         # presumably the full text with that one word swapped - verify against StringSpans.replace_word
         mutations[match_index] = texas.replace_word(ex_span_index,new_word)
      else:
         if verbose:
            print(f'rule undetectable or modify looks! new word "{new_word}" != "{old_word}" original and will be corrected to {new_word_corrections_mode} from {new_word_corrections}')



   # Check for ambiguous and invalid matches
   # (rejected above, duplicate of an earlier mutation, or not normalized back to `text`)
   ambiguous_invalid_matches = [i for i, new_string in enumerate(mutations)
         if not new_string or new_string in mutations[:i] or normalize(new_string,verbose=verbose) != text]

   # Create a list of valid matches
   valid_slots = [elem for i, elem in enumerate(slots) if i not in ambiguous_invalid_matches]
   
   # Print the list of matches and their mutations if verbose output is enabled
   if verbose:
      print('\n'+('%'*20)+'valid slots!'+('%'*20))
      for v in list(zip(slots, mutations)):
         print(v)

   return valid_slots
def valid_rules_scan(text:str,verbose=False):
   """Scan ``text`` for rule matches and keep only those ``valid_matches`` accepts.

   With ``verbose`` the proposed and the accepted slots are printed.
   """
   candidates = rules_scan(text)
   if verbose:
      print('proposed_slots: ',candidates)
   accepted = valid_matches(text,candidates,verbose=verbose)
   if verbose:
      print('valid_slots: ')
      for slot in accepted:
         print(slot)
   return accepted
def chunker(text:str,span_size = 6) -> List[Tuple[int,int]]:
   """Split ``text`` into character ranges of ``span_size`` words each.

   With fewer than ``span_size`` words the whole text is a single chunk
   ``(0, len(text))``. Otherwise the first chunk starts at 0, each later
   chunk starts at the first character of its first word, each chunk ends at
   the end of its last word, and the final chunk is stretched to the end of
   the last word of the text (absorbing any leftover words).
   """
   words = StringSpans(text).words
   if len(words) < span_size:
      return [(0,len(text))]

   # Index of the last word of every full chunk.
   last_word_indices = list(range(span_size-1,len(words),span_size))
   # First chunk starts at 0; each later one starts at the word right after
   # the previous chunk's last word.
   chunk_starts = [0] + [words[i+1][0] for i in last_word_indices[:-1]]
   chunks = [(start, words[i][1]) for start, i in zip(chunk_starts, last_word_indices)]

   # last chunk ends with the last word
   chunks[-1] = (chunks[-1][0], words[-1][1])
   return chunks
def word_we_misspelled(word:str,spelling:str,verbose=False):
   """Decide whether ``word`` should be replaced by the speller's ``spelling``.

   Preconditions (all required): edit distance between the two is exactly 1,
   first and last letters agree case-insensitively, and
   ``count_uppercase_letters(word)`` is exactly 2 and smaller than the word
   length. When they hold, the result is True as soon as one
   ``FAT_CORRECTION_RULES`` substitution of ``word`` differs from
   ``spelling``; it is False when every rule reproduces ``spelling`` (or
   there are no rules).

   NOTE(review): returning True on a rule that does NOT produce ``spelling``
   (while the verbose message prints ``==``) looks inverted - confirm the
   intended condition.
   """
   uls = count_uppercase_letters(word)
   plausible = (
      string_mutation_distance(spelling,word) == 1
      and spelling[0].lower() == word[0].lower()
      and spelling[-1].lower() == word[-1].lower()
      and uls == 2
      and uls < len(word)
   )
   if not plausible:
      return False # speller is wrong since input is ai generated and the only source for bad spelling is us and it's probably a name of sth

   for regex,repl in FAT_CORRECTION_RULES:
      candidate = regex.sub(repl,word)
      if candidate == spelling:
         continue
      if verbose:
         print(f"FAT_CORRECTION_RULES ({regex}) ({repl}): {candidate} == {spelling}")
      return True
   return False
def spell_word(word:str,verbose=False) -> str:
   """Return the speller's fix for ``word`` when we trust it, else ``word`` itself.

   A "normal" word is returned unchanged. Otherwise the first replacement of
   LanguageTool's first match is taken as the candidate spelling and is
   returned only if ``word_we_misspelled`` accepts it.

   LanguageTool may report no match at all, or a first match that carries no
   replacement candidates; in both cases there is nothing to suggest and
   ``word`` is returned as-is instead of raising ``IndexError``.
   """
   if normal_word(word):
      return word
   matches = lang_tool.check(word)
   if not matches or not matches[0].replacements:
      return word
   spelling = matches[0].replacements[0]
   if spelling is None:
      return word
   return spelling if word_we_misspelled(word,spelling,verbose) else word
def correction_rules_subset(text:str,verbose=False):
   """Return the LanguageTool matches for ``text`` whose category is TYPOS, SPELLING, GRAMMAR or TYPOGRAPHY.

   ``verbose`` is accepted but not used by this function.
   """
   return [rule for rule in lang_tool.check(text) if rule.category in ['TYPOS','SPELLING','GRAMMAR','TYPOGRAPHY']]
def _closest_word_span(word_spans, offset):
   """Return the ``(start, end)`` span nearest to ``offset``, or None when there are no words."""
   def gap(span):
      start, end = span
      # 0 when the offset lies within [start, end], else the distance to the nearer edge.
      return max(start - offset, offset - end, 0)
   # min() keeps the first of several equally close words.
   return min(word_spans, key=gap, default=None)

def normalize(text:str,verbose=False,learn=False):
   """Undo at most one typo per chunk of ``text`` and return the result.

   The text is split with ``chunker`` and checked with
   ``correction_rules_subset``. Every reported offset is mapped to the
   nearest word, and ``corrections`` votes on a fix for that word. Per chunk:

   * more than one reported problem: the chunk is left untouched
     (with ``learn`` every affected word is added to the dictionary);
   * exactly one problem without any vote: left untouched
     (with ``learn`` the word is added to the dictionary);
   * exactly one problem with votes: that word is replaced by the most
     common vote;
   * no problem: nothing to do.

   Only the affected word's own span is rewritten. A substring
   ``str.replace`` over the whole text would also rewrite every other
   occurrence of the same letters (e.g. fixing "teh" would corrupt
   "tehran").

   NOTE(review): the recorded run of this notebook reports offset 26 for a
   word at Python indices 21-25 in a text containing emoji, so the offsets
   appear not to be Python string indices (possibly UTF-16 units). They are
   used here exactly as reported, as before; the nearest-word lookup is what
   absorbs the shift. Confirm and convert if chunk assignment must be exact.

   Args:
      text: text to normalize.
      verbose: print a trace of the decisions.
      learn: add unfixable words to the user dictionary via ``add_word``.

   Returns:
      The text with the accepted corrections applied.
   """
   chunks: List[Tuple[int,int]] = chunker(text) # size = chunks
   offsets: List[int]= [x.offset for x in correction_rules_subset(text,verbose=verbose)] # size = offsets
   text_sss = StringSpans(text)
   word_spans = [(s, e) for s, e in text_sss.words]

   # Nearest word (span and text) for every reported offset; None when the
   # text contains no words at all.
   affected_spans = [_closest_word_span(word_spans, o) for o in offsets]
   affected_words = [None if span is None else text[span[0]:span[1]] for span in affected_spans]

   if verbose:
      print(f'text={text}')
      print(f'chunks={chunks}')
      print(f'offsets={offsets}')
      print(f'text_sss.words={text_sss.words}')
      print(f'text_sss.get_words()={text_sss.get_words()}')
      print(f'affected_words={affected_words}')

   # Group the problems by the chunk their offset falls into. spans_chunks
   # runs parallel to offsets_chunks and remembers where each word sits.
   offsets_chunks = []
   spans_chunks = []
   for chunk_start, chunk_end in chunks:
      chunk_offsets = []
      chunk_spans = []
      for i, o in enumerate(offsets):
         if verbose:
            print(f'iter={i}, o={o}')
         if chunk_start <= o < chunk_end and affected_spans[i] is not None:
            affected_word = affected_words[i]
            affected_word_corrections = corrections(affected_word)
            chunk_offsets.append((o, affected_word, affected_word_corrections))
            chunk_spans.append(affected_spans[i])
            if verbose:
               print(f"Added ({o}, {affected_word}, {affected_word_corrections}) to chunk_offsets")
      offsets_chunks.append(chunk_offsets)
      spans_chunks.append(chunk_spans)
      if verbose:
         print(f"Added {chunk_offsets} to offsets_chunks")
   if verbose:
      print(f'chunks_offsets={offsets_chunks}')

   # Decide per chunk; collect span -> replacement for the fixable ones.
   replacements = {}
   for offsets_chunk, spans_chunk in zip(offsets_chunks, spans_chunks):
      if len(offsets_chunk) > 1:
         if verbose:
            print(f'len({offsets_chunk})={len(offsets_chunk)} > 1')
         if learn:
            for _, w, _ in offsets_chunk:
               add_word(w)
      elif len(offsets_chunk) == 1:
         _, typo, votes = offsets_chunk[0]
         if len(votes) == 0:
            if verbose:
               print(f'no suggestions for {typo} added to dict')
            if learn:
               add_word(typo)
         else:
            suggestion = mode(votes)
            if verbose:
               print(f'typo={typo}\nsuggestion={suggestion}')
               print(f'votes={votes}')
            # First decision for a given word wins.
            replacements.setdefault(spans_chunk[0], suggestion)

   # Apply right-to-left so earlier spans keep their positions.
   normalized = text
   for (start, end), suggestion in sorted(replacements.items(), key=lambda item: item[0], reverse=True):
      normalized = normalized[:start] + suggestion + normalized[end:]
   return normalized
def corrections (typo,verbose=False):
   """Collect candidate fixes ("votes") for ``typo`` and keep the ones that are normal words.

   Votes are gathered in this order: the speller's suggestion (only when it
   is one edit away and a normal word), one rewrite per overlapping
   ``FAT_CORRECTION_RULES`` match, one whole-word substitution per
   ``WORD_CORRECTION_RULES`` pattern that matches at the start, and one
   rewrite per overlapping ``KEYBOARD_CORRECTION_RULES`` match. Duplicates
   are kept, so callers can take the most common vote.
   """
   def overlapped_rewrites(rules):
      # One rewritten string per match; each rewrite touches only the matched span.
      return [
         apply_match(typo, (found.span(), repl, regex))
         for regex, repl in rules
         for found in regex.finditer(typo,overlapped=True)
      ]

   suggestion = spell_word(typo)
   votes = []
   if string_mutation_distance(suggestion,typo) == 1 and normal_word(suggestion):
      votes.append(suggestion)

   votes.extend(overlapped_rewrites(FAT_CORRECTION_RULES))
   votes.extend(
      regex.sub(repl,typo)
      for regex, repl in WORD_CORRECTION_RULES
      if regex.match(typo) is not None
   )
   votes.extend(overlapped_rewrites(KEYBOARD_CORRECTION_RULES))

   if verbose:
      print(f'unfiltered votes {votes}')
   votes = [vote for vote in votes if normal_word(vote)]
   if verbose:
      print(f'filtered votes {votes}')
   return votes

In [6]:
@dataclass
class Typo:
   """Class for Typo Engine.

   Holds a text that ``normalize`` leaves unchanged and the rule "slots"
   that ``valid_rules_scan`` finds in it. The text is split into chunks
   (``chunker``); ``spaces`` counts the slots per chunk. ``encode`` applies
   one chosen slot per chunk, ``decode`` normalizes a mutated text and
   recovers which slot was applied in each chunk.
   """

   text: str = field(repr=False)
   _length: int = field(init=False, repr=False)
   # Cache for the `slots` property; None until first computed.
   _slots: Optional[List[Tuple[Tuple[int, int], str, re.Pattern]]] = field(init=True, repr=False,default=None)
   # Cache for the `spaces` property; None until first computed.
   _spaces: Optional[List[int]] = field(init=True, repr=False,default=None)
   verbose: bool = field(init=True,repr=False,default=False)

   def __post_init__(self):
      """Reject any text that ``normalize`` would change.

      Raises:
         ValueError: if ``normalize(text) != text``.
      """
      if self.text != normalize(self.text,self.verbose):
         raise ValueError("Text isn't spelled correctly")

   @staticmethod
   def isAcceptable(text:str,verbose:bool=False):
      """Return True when ``normalize`` leaves ``text`` unchanged."""
      return text == normalize(text,verbose)

   @staticmethod
   def FixText(text:str,verbose=False):
      """Return ``normalize(text)``."""
      return normalize(text,verbose)

   def apply(self, space: int, offset: int, text: str) -> str:
      """Apply slot number ``offset`` of chunk ``space`` to ``text``.

      ``offset`` 0 means "no mutation" and returns ``text`` unchanged.
      Otherwise the slot is looked up at position
      ``sum(spaces[:space]) + offset - 1`` of ``slots``.
      """
      if self.verbose:
         print(f"apply: space={space}, offset={offset}, text={text}")
      if offset == 0:
         return text
      match_tuple = self.slots[sum(self.spaces[0:space]) + offset - 1]
      applied = apply_match(text, match_tuple,self.verbose)
      if self.verbose:
         print(f"applied: {applied}")
      return applied

   @property
   def slots(self):
      """Valid rule matches for ``text``; computed on first access, then cached."""
      if self._slots is None:
         self._slots = valid_rules_scan(self.text,self.verbose)
      return self._slots

   @property
   def length(self) -> int:
      """Number of slots."""
      return len(self.slots)

   @length.setter
   def length(self, length: int):
      # Derived value: assignments are accepted and ignored.
      pass

   @property
   def spaces(self) -> List[int]:
      """Number of slots in each chunk of ``text``.

      A slot is counted for the first chunk that contains its whole span;
      a slot that fits no chunk is not counted. The result is computed on
      first access and cached in ``_spaces`` (``apply`` and ``encode`` read
      this property many times).
      """
      if self._spaces is not None:
         return self._spaces

      sentence_ranges = chunker(self.text)

      # One counter per chunk
      buckets: List[int] = [0 for _ in sentence_ranges]

      # Put every slot span into the first chunk that fully contains it
      for (start, end), _, _ in self.slots:
         for j, (sent_start, sent_end) in enumerate(sentence_ranges):
            if sent_start <= start < sent_end and sent_start < end <= sent_end:
               buckets[j] += 1
               break

      self._spaces = buckets
      return buckets

   @spaces.setter
   def spaces(self, value):
      # Derived value: assignments are accepted and ignored.
      pass

   @property
   def bits(self):
      """``int(log2(n + 1))`` for every chunk's slot count ``n``."""
      return [int(log2(count + 1)) for count in self.spaces]

   @bits.setter
   def bits(self, bits: int):
      # Derived value: assignments are accepted and ignored.
      pass

   def encode(self, values:List[int]):
      """Return ``text`` with ``values[i]`` applied to chunk ``i``.

      Raises:
         ValueError: "Can't encode" when there are more values than chunks;
            "Won't fit" when a value is not smaller than its chunk's slot
            count (chunks with zero slots are exempt from this check).
      """
      spaces = self.spaces
      if len(values) > len(spaces):
         raise ValueError("Can't encode")
      for i in range(len(values)):
         # spaces[i] = 0 means that the chunk has a birth defect
         # a typo not by us making the chunk unusable and in that case
         # values[i] = 0 and the fact that it's an un-fixable typo will
         # tell the decoder to learn it
         if values[i] >= spaces[i] and spaces[i] != 0:
            raise ValueError("Won't fit")
      result = self.text
      # Last chunk first, so spans of earlier chunks are not shifted.
      for i in range(len(values) - 1, -1, -1):
         result = self.apply(i, values[i], result)
      return result

   @staticmethod
   def decode(text:str,verbose=False,test_self=None) -> Tuple[str,List[int]]:
      """Recover ``(original_text, values)`` from a mutated ``text``.

      ``test_self`` is a test hook: when given, the normalized text must
      equal ``test_self.text`` (asserted) and its slots are used for the
      value search.
      """
      original = normalize(text,verbose)
      if test_self is not None:
         if original != test_self.text:
            print(f'original=\n{original}')
            print(f'test_self.text=\n{test_self.text}')
         assert original == test_self.text
      t = Typo(original)

      return original, t._decode(text,test_self)

   def _decode(self, text:str,test=None) -> List[int]:
      """Find, chunk by chunk, the value whose encoding brings the word diff to ``text`` down by one.

      A chunk for which no value reduces the diff gets value 0.
      """
      a_self = test if test is not None else self
      spaces = a_self.spaces
      cnt = len(diff(text,a_self.text))
      if a_self.verbose:
         print(f'cnt={cnt}')
         print(f'diff(text,a_self.text)={diff(text,a_self.text)}')
      values = [0 for s in spaces]
      for index, space in enumerate(spaces):
         isZero = True
         for i in range(space):
            values[index] = i
            dif = diff(text, a_self.encode(values))
            if len(dif) == cnt - 1:
               if a_self.verbose:
                  print(f'values={values}')
                  print(f'dif={dif}')
               cnt -= 1
               isZero = False
               break
         if isZero:
            values[index] = 0
            if a_self.verbose:
               print(f'chunk is empty values={values}')
      return  values

   def encode_encoder(self, bytes_str: str) -> Tuple[List[int], str]:
      """Cut a string of '0'/'1' characters into one value per chunk.

      For a chunk with ``b = bits[i]``: if at least ``b + 1`` bits remain
      and their binary value is below the chunk's slot count, those
      ``b + 1`` bits are consumed; otherwise, if ``b > 0`` and at least
      ``b`` bits remain, ``b`` bits are consumed; otherwise the value is 0
      and nothing is consumed.

      Returns:
         ``(values, remaining_bits)``.

      Raises:
         ValueError: if ``bytes_str`` contains anything but '0' and '1'.
      """
      if not set(bytes_str) <= set('01'):
         raise ValueError(f"bytes_str isn't a bytes string : '{bytes_str}'")
      values = self.bits
      spaces = self.spaces
      bit_values = []
      remaining_bits = bytes_str
      for i, val in enumerate(values):
         # The guard must read the bits as binary, like the value it guards
         # (a decimal read would treat '010' as ten instead of two).
         if len(remaining_bits) >= val + 1 and int(remaining_bits[:val+1], 2) < spaces[i]:
            bit_value = int(remaining_bits[:val+1], 2)
            bit_values.append(bit_value)
            remaining_bits = remaining_bits[val+1:]
         elif len(remaining_bits) >= val and val > 0:
            bit_value = int(remaining_bits[:val], 2)
            bit_values.append(bit_value)
            remaining_bits = remaining_bits[val:]
         else:
            bit_values.append(0)
      return bit_values, remaining_bits

   def learn(self,text:str)->None:
      """Run ``normalize`` on ``text`` in learning mode (unfixable words are added to the dictionary)."""
      normalize(text,learn=True,verbose=self.verbose)

In [7]:
# Build a Typo for a short sentence containing emoji and show its per-chunk slot counts.
t= Typo('Hi How 🤷 are you 👧')
t.spaces

[8]

In [8]:
def testTypoInstance(t,verbose=False,testName='test'):
    # patch
   ORIGINAL_STDOUT = sys.stdout
   sys.stdout = FileWriter(testName+'.txt')

   if isinstance (t,str):
     t = Typo(t,verbose=verbose)
   spaces = t.spaces

   print(f"t.spaces = {spaces}")
   print(f"t.bits = {t.bits}")
   print(f"max={max(spaces)}")
   print(f"len={len(spaces)}")
   
   g = (list(map(lambda x: i % x ,spaces)) for i in range(max(spaces)))
   for v in g:
     print(f'{v}')
     encoded = t.encode(v)
     print(f"after encoding {v} {encoded}")
     org, x = Typo.decode(encoded,test_self=t)
     print(f'original text candidate "{org}"')
     if not x == v:
       print(f'\nt.decode(t.encode(v)):{x}')
       print(f't.text:{t.text}')
     if org == t.text and x == v:
       print(('>'*100)+" passed!")
     else:
       print(f'org == t.text and x == v\n{org == t.text} and {x == v}\n{org} == {t.text} and {x} == {v}')
       assert org == t.text and x == v
   # restore
   sys.stdout = ORIGINAL_STDOUT
   return t

In [18]:
# Sample with emoji and one typo ('yiou'). Slicing at 26 - the offset that the
# verbose normalize run below reports for the typo - shows that this index
# lands on the emoji after 'yiou' in Python string indexing, not on the word.
bugtext = 'Hi ✌️, How 🚴😡👆🏽🚍 are yiou 🔥?'
bugtext[26:]

'🔥?'

In [19]:
# Verbose trace of normalizing the emoji sample defined above.
normalize(bugtext,verbose=True)

text=Hi ✌️, How 🚴😡👆🏽🚍 are yiou 🔥?
chunks=[(0, 28)]
offsets=[26]
text_sss.words=[(0, 2), (7, 10), (17, 20), (21, 25)]
text_sss.get_words()=['Hi', 'How', 'are', 'yiou']
affected_words=['yiou']
iter=0, o=26
Added (26, yiou, ['you']) to chunk_offsets
Added [(26, 'yiou', ['you'])] to offsets_chunks
chunks_offsets=[[(26, 'yiou', ['you'])]]
typo=yiou
suggestion=you
votes=['you']


'Hi ✌️, How 🚴😡👆🏽🚍 are you 🔥?'

In [9]:
# Sample inputs for testTypoInstance, from two short greetings up to a full paragraph.
testTexts = [
   '''Hey, How are you? Did you see the last John Cena movie?'''
,'''Hi, How are you?'''
,'However, you may as well just use a function statement instead; the only advantage that a lambda offers is that you can put a function definition for a simple expression inside a larger expression.'
, '''However, you may as well just use a function statement instead; the only advantage that a lambda offers is that you can put a function definition for a simple expression inside a larger expression. But the above lambda is not part of a larger expression, it is only ever part of an assignment statement, binding it to a name. That's exactly what a statement would achieve.'''
, '''I’ve toyed with the idea of using GPT-3’s API to add much more intelligent capabilities to RC, but I can’t deny that I’m drawn to the idea of running this kind of language model locally and in an environment I control. I’d like to someday increase the speed of RC’s speech synthesis and add a speech-to-text translation model in order to achieve real-time communication between humans and the chatbot. I anticipate that with this handful of improvements, RC will be considered a fully-fledged member of our server. Often, we feel that it already is.'''
]
# The full test run takes ~2 hours; all tests passed when last run.
# TestTypos = [testTypoInstance(text,verbose=True,testName=f'test{i}') for i, text in enumerate(testTexts)]
# LAST_TESTED_TYPO = testTypoInstance('Hi How 🤷 are you 👧',verbose=True,testName=f'after emojier') 

In [10]:
# x = '''I’ve toyed with the idea of using GPT-3’s API to add much more intelligent capabilities to RC, but I can’t deny that I’m drawn to the idea of running this kind of language model locally and in an environment I control. I’d like to someday increase the speed of RC’s speech synthesis and add a speech-to-text translation model in order to achieve real-time communication between humans and the chatbot. I anticipate that with this handful of improvements, RC will be considered a fully-fledged member of our server. Often, we feel that it already is.'''
# testTexts[0] == normalize('''I’ve toyed with the idea of using GPT-3’s API to add much more intelligent capabilities to RC, but I casn’t deny that I’m drawn to the idea of running this kind of language model locally and in an environment I control. I’d like to someday increase the speed of RC’s speech synthesis and add a speech-to-text translation model in order to achieve real-time communication between humans and the chatbot. I anticipate that with this handful of improvements, RC will be considered a fully-fledged member of our server. Often, we feel that it already is.''',verbose=True)