# Grammar blah blah

In [32]:
import numpy as np
import random
import re
import os
import warnings
from typing import List, Tuple, Dict
from gensim.models import Word2Vec

In [33]:
# Disable annoying warnings from gensim
warnings.filterwarnings("ignore")

## Define nontrivial polish language grammar

In [4]:
NONTERMINALS = {
    'S': [
        ('VERB_PHRASE',),
    ],
    'VERB_PHRASE': [
        ('VERB_IMPS',),
        ('ADV_PHRASE', 'VERB_IMPS'),
        ('VERB_IMPS', 'ACC_PHRASE_SG_M1'),
        ('VERB_IMPS', 'ACC_PHRASE_PL_F'),
        ('VERB_IMPS', 'ACC_PHRASE_PL_N1'),
        ('ADV_PHRASE', 'VERB_IMPS', 'ACC_PHRASE_SG_M1'),
        ('ADV_PHRASE', 'VERB_IMPS', 'ACC_PHRASE_PL_F'),
        ('ADV_PHRASE', 'VERB_IMPS', 'ACC_PHRASE_PL_N2'),
        
    ],
    'ADV_PHRASE': [
        ('ADV',),
        ('ADV', 'ADV',),
    ],
    'ACC_PHRASE_SG_M1': [
        ('SUBST_SG_ACC_M1',),
        ('ADJ_PHRASE_SG_ACC_M1', 'SUBST_SG_ACC_M1',),
    ],
    'ACC_PHRASE_PL_F': [
        ('SUBST_PL_ACC_F',),
        ('ADJ_PHRASE_PL_ACC_F_N2', 'SUBST_PL_ACC_F'),
    ],
    'ACC_PHRASE_PL_N2': [
        ('SUBST_PL_ACC_N2',),
        ('ADJ_PHRASE_PL_ACC_F_N2', 'SUBST_PL_ACC_N2'),
    ],
    'ADJ_PHRASE_SG_ACC_M1': [
        ('ADJ_SG_ACC_M1',),
        ('ADJ_SG_ACC_M1', 'ADJ_SG_ACC_M1'),
    ],
    'ADJ_PHRASE_PL_ACC_F_N2': [
        ('ADJ_PL_ACC_F_N2',),
        ('ADJ_PL_ACC_F_N2', 'ADJ_PL_ACC_F_N2'),
    ],
    
    # Productions with terminals
    'VERB_IMPS': [
        ('verb:imps',),
    ],
    'SUBST_SG_ACC_M1': [
        ('subst:sg:acc:m1',),
    ],
    'SUBST_PL_ACC_F': [
        ('subst:pl:acc:f',),
    ],
    'SUBST_PL_ACC_N2': [
        ('subst:pl:acc:n2',),
    ],
    'ADJ_SG_ACC_M1': [
        ('adj:sg:acc:m1',),
    ],
    'ADJ_PL_ACC_F_N2': [
        ('adj:pl:acc:m2.m3.f.n1.n2.p2.p3',),
    ],
    'ADV': [
        ('adv:',),
    ]
}  

In [5]:
TERMINALS = (
    'verb:imps',
    'subst:sg:acc:m1',
    'subst:pl:acc:f',
    'subst:pl:acc:n2',
    'adj:sg:acc:m1',
    'adj:pl:acc:m2.m3.f.n1.n2.p2.p3',
    'adv:',
)

In [6]:
class NoNonterminal(Exception):
    pass

In [65]:
class TooLongSent(Exception):
    pass

In [7]:
class Symbol:
    def __init__(self, symbol: str):
        self.symbol = symbol

In [8]:
class Terminal(Symbol):
    pass

In [9]:
class Nonterminal(Symbol):
    def __init__(self, symbol: str, productions: List[Symbol]):
        super().__init__(symbol)
        self.productions = productions
        
    def production(self) -> Tuple[Symbol]:

        def create_new_symbol(symbol) -> Symbol:
           if symbol in NONTERMINALS:
               return Nonterminal(symbol, NONTERMINALS[symbol])
           else:
               return Terminal(symbol)
        
        # Draw the production
        rand_prod_ind = np.random.choice(len(self.productions))
        rand_prod = self.productions[rand_prod_ind]
            
        return list(map(create_new_symbol, rand_prod))

In [10]:
class Generator:
    
    def expand_terminal(self, symbols: List[Symbol]) -> List:        

        # Extract nonterminals
        nonterminals = [symbol for symbol in symbols
                        if isinstance(symbol, Nonterminal)]
            
        if not nonterminals:
            raise NoNonterminal
            
        # Expand random nonterminal
        expand_ind = np.random.choice(len(nonterminals))
        nonterminal = nonterminals[expand_ind]
        new_symbols = nonterminal.production()
            
        # Swap nonterminal with new symbols
        nonterminals_processed = 0
        for ind in range(len(symbols)):
            if isinstance(symbols[ind], Nonterminal):
                nonterminals_processed += 1
                    
                if nonterminals_processed-1 == expand_ind:
                        
                    # Delete the old nonterminal
                    symbols.pop(ind)
                                
                    # Insert new ones
                    symbols = symbols[:ind] + new_symbols + symbols[ind:]

        return symbols
    
    def symbols_to_strings(self, symbols: List[Symbol]) -> List:
        return [symbol.symbol for symbol in symbols]
        
    def gen_terminals(self, start_symbol: Symbol) -> List:
        symbols = [start_symbol]
        
        # Expand until there is any nonterminal in the symbols
        while True:
            try:
                symbols = self.expand_terminal(symbols)
            except NoNonterminal:
                return self.symbols_to_strings(symbols)
            

## Generate some sentence schemas and group them by the number of tokens

In [11]:
gen = Generator()

In [12]:
def create_schemas(n_iter: int = 1000, schemas: Dict = {}) -> Dict:
    for i in range(n_iter):
        
        start_symbol = Nonterminal('S', NONTERMINALS['S'])
        
        schema = tuple(gen.gen_terminals(start_symbol))
        schema_len = len(schema)
        
        # Update schemas
        if schema_len in schemas:
            schemas[schema_len].add(schema)
        else:
            schemas[schema_len] = {schema}
            
    # Map sets to tuples to enable drawing
    schemas = {key: tuple(val) for key, val in schemas.items()}
            
    return schemas

In [13]:
schemas = create_schemas()

# Show some schemas
dict(list(schemas.items())[:2])

{3: (('adv:', 'verb:imps', 'subst:pl:acc:f'),
  ('verb:imps', 'adj:sg:acc:m1', 'subst:sg:acc:m1'),
  ('adv:', 'verb:imps', 'subst:sg:acc:m1'),
  ('verb:imps', 'adj:pl:acc:m2.m3.f.n1.n2.p2.p3', 'subst:pl:acc:f'),
  ('adv:', 'adv:', 'verb:imps'),
  ('adv:', 'verb:imps', 'subst:pl:acc:n2')),
 4: (('adv:', 'adv:', 'verb:imps', 'subst:pl:acc:f'),
  ('verb:imps',
   'adj:pl:acc:m2.m3.f.n1.n2.p2.p3',
   'adj:pl:acc:m2.m3.f.n1.n2.p2.p3',
   'subst:pl:acc:f'),
  ('adv:', 'verb:imps', 'adj:pl:acc:m2.m3.f.n1.n2.p2.p3', 'subst:pl:acc:f'),
  ('adv:', 'verb:imps', 'adj:sg:acc:m1', 'subst:sg:acc:m1'),
  ('adv:', 'adv:', 'verb:imps', 'subst:sg:acc:m1'),
  ('verb:imps', 'adj:sg:acc:m1', 'adj:sg:acc:m1', 'subst:sg:acc:m1'),
  ('adv:', 'verb:imps', 'adj:pl:acc:m2.m3.f.n1.n2.p2.p3', 'subst:pl:acc:n2'),
  ('adv:', 'adv:', 'verb:imps', 'subst:pl:acc:n2'))}

## Extract the grammar categories found in the grammar schemas

In [14]:
class PolimorfGen:
    POLIMORF_PATH = './data/polimorfologik-2.1.txt'
    
    def __init__(self):
        self.grammar_cats = dict((terminal, [])
                                  for terminal in TERMINALS)
    
    def __iter__(self):
        with open(self.POLIMORF_PATH) as f:
            yield from f
            
    def find_terminal_occ(self, line: str):
        """
        Search for each pattern (terminal)
        in the line of the polimorfologik file
        """
        
        base, token, grammar_cats = line.split(';')
        
        for terminal in self.grammar_cats:
            pattern = re.compile(terminal)
        
            if pattern.search(grammar_cats):
                self.grammar_cats[terminal].append((base, token))


In [15]:
polimorf = PolimorfGen()

# Extract the categories
for line in polimorf:
    polimorf.find_terminal_occ(line)

## Generate some sentences of length n without using the embeddings 

In [72]:
class SentGen:
    def rand_schema(self, n: int) -> Tuple:

        # Draw the sentence schema
        try:
            schemas_n_len = schemas[n]
            return random.choice(schemas_n_len)
        except KeyError:
            raise TooLongSent

    def core_sent_gen(self, n: int) -> Tuple:
        try:
            schema = self.rand_schema(n)
            
            # Draw the tokens
            tokens_with_bases = [random.choice(polimorf.grammar_cats[category])
                                 for category in schema]

            bases, tokens = list(zip(*tokens_with_bases))

            return ' '.join(bases), ' '.join(tokens)
        except TooLongSent:
            print('Sentence too long')
    

In [73]:
sent_gen = SentGen()

In [74]:
sent_gen.core_sent_gen(3)

('wydłużyć niesłupkowy Arecki', 'wydłużono niesłupkowego Areckiego')

In [75]:
sent_gen.core_sent_gen(5)

('muszyńsko zużytkować pampasowy nieidealistyczny Tyrawa',
 'muszyńsko zużytkowano pampasowego nieidealistycznego Tyrawę')

In [76]:
sent_gen.core_sent_gen(6)

('niekonstancińsko maszopsko grudzić niejasnopłowy niekabulski mantyctwo',
 'niekonstancińsko maszopsko grudzono niejasnopłowe niekabulskie mantyctwa')

In [77]:
sent_gen.core_sent_gen(7)

Sentence too long


## Prepare the Word2Vec struct

In [21]:
class CorpusGen:
    CORPUS_PATH = './data/task3_train_segmented.txt'
    
    def __init__(self, n_sent):
        self.n_sent = n_sent
    
    def __iter__(self):
        with open(self.CORPUS_PATH) as f:
            for line, _ in zip(f, range(self.n_sent)):
                yield line.split()

In [25]:
if not os.path.isfile('./data/word2vec.model'):
    # Perform the embeddings only during the first session 
    
    sentences = CorpusGen(10_000_000)
    model = Word2Vec(sentences, min_count=1)
    model.save('./data/word2vec.model')
else:
    # The model exists
    
    # Gensim fails in case of loading the model for the second time
    try:
        model
    except NameError:
        model = Word2Vec.load('./data/word2vec.model')

In [28]:
len(model.wv.vocab)

2640650

## Generate thematical sentences from the grammar

In [29]:
TOPICS = (
    ('malina', 'koszyk', 'zazdrość', 'morderstwo'),
    ('programowanie', 'błąd', 'zmienna', 'deklaracja'),
    ('lotniskowiec', 'łódź', 'podwodny',
     'tonąć', 'atak', 'torpeda', 'ocean'),
)

In [158]:
class TopicSentGen(SentGen):
    def __init__(self, model):
        self.model = model
        
    def choose_best_token(self, tokens: List, topic: Tuple) -> str:
        
        # Draw the topic token
        topic_token = random.choice(topic)
        
        # Filter the tokens being in both model and polimorf
        common_tokens = [token for (base, token) in tokens
                         if token in model.wv.vocab]
        
        similarity, best_token = max([(model.wv.similarity(topic_token, token),
                                       token) for token in common_tokens])
        return best_token
        
    def topic_sent_gen(self, n: int, topic: Tuple, n_to_choose: int = None) -> str:
        try:
            schema = self.rand_schema(n)
            
            categories = [polimorf.grammar_cats[category]
                          for category in schema]
            
            topic_sent = [self.choose_best_token(category, topic)
                          for category in categories]
            
            return topic_sent
            
        except TooLongSent:
            print('Sentence too long')

In [159]:
topic_sent_gen = TopicSentGen(model)

In [165]:
topic_sent_gen.topic_sent_gen(5, TOPICS[2])

['odprzodowo', 'oblatano', 'alianckie', 'żaglowe', 'chiny']