In [1]:
import regex as re
from functional import seq
from functional.pipeline import Sequence
from fn import _
from collections import namedtuple
from regex.regex import Match, Pattern
from typing import List, Dict, Tuple, Optional
from termcolor import colored
import os

In [2]:
act_dir = os.environ["HOME"]+"/tmp/nlp/ustawy"
act_names = seq(os.listdir(act_dir))

acts = act_names.map(lambda fn : open("{}/{}".format(act_dir,fn)).read())

In [3]:
def match_with_index(pattern ,text:str)-> List[Tuple[int,int,str]]:
    return [(m.start(), m.end(),m.captures()[0]) for m in re.finditer(pattern, text)]


def print_highlighted(
    text:str,
    matches_groups:List[List[Tuple[int,int]]],
    colors:List[Tuple[str,Optional[str]]]
):
    if len(matches_groups) != len(colors):
        raise "There should be the same number of matches groups and colors"
    
    MatchedEntry=namedtuple('MatchedEntry','beg end color on_color')
    
    # Seq[MatchedEntry]
    mg = seq(matches_groups)\
        .zip(colors)\
        .flat_map(lambda gc: seq(gc[0])\
            .map(lambda x : MatchedEntry(beg=x[0],end=x[1],color=gc[1][0], on_color = gc[1][1]))
                 )\
        .order_by(_.beg)
        
    # Will duplicate some matches but that's not the point
    beg = 0
    for matched in mg:
        print(text[beg:matched.beg],end="") 
        print(colored(text[matched.beg:matched.end],color= matched.color, on_color= matched.on_color),end="")
        beg = matched.end
    print(text[beg:])

In [4]:
dzu_pat = re.compile("Dz\.?\s?U.?")
year_pat = re.compile("(?P<year>((19\d)|(200)|(201))\d{1})")
nr_pat = re.compile("Nr\s+(?P<nr>\d+)")
pos_pat = re.compile("poz\.\s+(?P<poz>\d+((-|,)\d+)*)")

#Entry extractors
footer_ext_pat = re.compile(r"\[\d+\]\)?\s+Zmiany(\w|\s|\n)*?w\s+Dz\.*\s*U\.?(?P<foot_entry>(.|\s|\n)*?)(?=(\[|\Z))")
art_ext_pat=re.compile("r(U|u)staw\w*\s+(z\s+dnia\s+)+\d+\s+\w+\s+(?P<year>\d{4})(\s|\n)*r\.?\s*(–|\-)*\s*(?P<title>(\w|\s|\")*)\((?P<art_entry>(.|\n)*?)\)")

# 1. External references

In [5]:
#Show marking of interesting things
print_highlighted(
    acts.head(),
    [
        match_with_index(dzu_pat,acts.head()),
        match_with_index(year_pat,acts.head()),
        match_with_index(nr_pat,acts.head()),
        match_with_index(pos_pat,acts.head()),
    ],
    [
        ("blue","on_grey"),
        ("green","on_grey"),
        ("red","on_grey"),
        ("yellow","on_grey"),
    
    ]
)





[40m[34mDz.U.[0m z [40m[32m2000[0m r. [40m[31mNr 50[0m, [40m[33mpoz. 581[0m
                                                                              
                                                                              
                                                                              
                                                                              
                                    USTAWA
                            z dnia 26 maja [40m[32m2000[0m r.
                                       
  o zmianie ustawy o niektórych formach popierania budownictwa mieszkaniowego
          oraz o zmianie ustawy o pracowniczych ogrodach działkowych
                                       
                                       
                                    Art. 1.
W ustawie z dnia 26 października [40m[32m1995[0m r. o niektórych formach popierania
budownictwa mieszkaniowego ([40m[34mDz.U.[0m [40m[31mNr 133[0m, [40m[33mpoz. 6

In [6]:
ExtRef= namedtuple("ExtRef", "year nr poz title")

def extract_group_name(match:Match) -> str:
    return list(match.groupdict().keys())[0]

#Assumption: entry without year  takes year and title from outside
def art_entry_extractor(title:str,default_year:str,entry:str)-> List[ExtRef]:
    matches = seq([year_pat,nr_pat,pos_pat])\
    .flat_map(lambda pat: list(pat.finditer(entry)))\
    .order_by(lambda x: x.start())
    
    year_found = False
    year = default_year
    nr = None
    res = []    
    for match in matches:
        name = extract_group_name(match)
        if name == "year":
            year_found = True
            year = match.groupdict()[name]
            if type(year) != str:
                raise "Oh no! Year is supposed to be str."
        elif name =="nr":
            nr = match.groupdict()[name]
            if type(nr) != str:
                raise "Oh no! Nr is supposed to be str."
        elif name =="poz":
            poz = match.groupdict()[name]
            if type(poz) != str:
                raise "Oh no! Poz is supposed to be str."
            res.append(ExtRef(year=year,nr=nr,poz=poz, title=None if year_found else title))
        else:
            raise "Disaster here; Match name not found ion art_entry_extractor"
    return res


def art_match_extractor(match:Match)-> List[ExtRef]:
    d = match.groupdict()
    title = re.sub("[\s\n]+"," ",d["title"]).strip()
    return art_entry_extractor(title,d["year"],d["art_entry"])

    
def foot_match_extractor(match:Match)-> List[ExtRef]:
    return art_entry_extractor(None,None, match.groupdict()["foot_entry"])

In [7]:
def ext_ref_extractor(text:str) -> Sequence: #Sequence[ExtRef]:
    art_matches = seq(list(art_ext_pat.finditer(text,timeout=5))).flat_map(art_match_extractor)
    footer_matches = seq(list(footer_ext_pat.finditer(text,timeout=5))).flat_map(foot_match_extractor)
    return  art_matches + footer_matches



CountedExtRef= namedtuple("CountedExtRef", "count year nr poz title")
def ext_ref_counter(refs:Sequence)-> CountedExtRef:#Sequence[ExtRef] -> Sequence[CountedExternalRef]
    def counted_from_ext_ref(count:int,ref:ExtRef) -> CountedExtRef:
        return CountedExtRef(count=count,year=ref.year,nr=ref.nr,poz=ref.poz,title=ref.title)
    
    def count_and_select_best(t:Tuple[str,Sequence])-> CountedExtRef:
        if len(t[1]) == 0:
            raise "Aggregated tuple shouldn't be empty"
        with_title = seq(t[1]).find(lambda ref:ref.title != None)
        return counted_from_ext_ref(len(t[1]), t[1][0] if with_title is None else with_title)
        
    def not_empty(ref):
        return ref.year != None and ref.poz != None
    return refs\
        .filter(lambda ref : ref.year != None and ref.poz != None)\
        .group_by(lambda ref : ref.year+ "-"+ref.poz).map(count_and_select_best)

def ext_ref_counter_aggregate(refs:Sequence)-> CountedExtRef:#Sequence[CountedExtRef] -> Sequence[CountedExternalRef]
    
    def count_and_select_best(t:Tuple[str,Sequence])-> CountedExtRef:
        if len(t[1]) == 0:
            raise "Aggregated tuple shouldn't be empty"
            
        t1seq = seq(t[1])
        count_sum = t1seq.map(lambda ref:ref.count).sum()
        with_title = t1seq.find(lambda ref:ref.title != None)
        res = t1seq.head() if with_title is None else with_title
        return CountedExtRef(count = count_sum,year = res.year, nr = res.nr, poz= res.poz,title= res.title)
        
    return refs\
        .group_by(lambda ref : ref.year+ "-"+ref.poz).map(count_and_select_best)

def display_seq(sequence:Sequence,rows:int)-> None:
    sequence._repr_html_= lambda :sequence.tabulate(rows,tablefmt='html')
    display(sequence)
    sequence._repr_html_= lambda :sequence.tabulate(10,tablefmt='html')

def ext_refs(act:str) -> Sequence: #Sequence[CountedExtRef]
    return ext_ref_counter(ext_ref_extractor(act))\
    .order_by(lambda x : x.poz)\
    .order_by(lambda x : int(x.year))\
    .order_by(lambda x: 1/x.count)
    
#Possible improvement - extract title from existing articles, not just from text before referencing
def global_refs(acts:Sequence)-> Sequence : #Sequence[str] -> Sequence[CountedExtRef]
    return ext_ref_counter_aggregate(acts.map(ext_ref_extractor).flat_map(ext_ref_counter))\
        .order_by(lambda x : x.poz)\
        .order_by(lambda x : int(x.year))\
        .order_by(lambda x: 1/x.count)
        
        
display_seq(
    global_refs(acts),
    30
)



count,year,nr,poz,title
157,2006,104.0,708,
141,2002,74.0,676,
90,1998,106.0,668,
90,2002,153.0,1271,
83,2002,25.0,253,
78,2004,96.0,959,
76,2002,200.0,1679,
63,2005,183.0,1538,
63,2006,157.0,1119,
62,2002,113.0,984,


# 2. Internal references

In [8]:
single_regex = r"(?P<single_ust>\d+\p{L}{0,2}\b)"
i_regex = r"(?P<i_ust_1>\d+\p{L}{0,3})(\s+(i|oraz)\s+)(?P<i_ust_2>\d+\p{L}{0,3})"
dash_regex= r"(?P<dash_ust_1>\d+\p{L}{0,3})(\s*(\-|do|–)\s*(?P<dash_ust_2>\d+\p{L}{0,3}))"
#TODO - match sklejone : art1 5)
ustaw_regex = "ust\.*[\s\n]("+ i_regex +"|"+ dash_regex+ "|"+ single_regex+")"
art_ust_regex = r"art\.*[\s\n]*(?P<art>\d+\p{L}{0,3})[\s\n]+(w[\s\n]+)?"+ustaw_regex
art_ust_ust_regex = r"({}|{})".format(art_ust_regex,ustaw_regex)
art_ust_pat= re.compile(art_ust_ust_regex,re.I)

In [9]:
#Show marking of interesting things\
act = open("{}/{}".format(act_dir,"2004_1001.txt")).read()
art_ust_matches = match_with_index(art_ust_pat,act)

print_highlighted(
    act,
    [
        art_ust_matches,
    ],
    [
        ("blue","on_grey"),
    ]
)




Tekst ustawy przyjęty przez Senat bez poprawek

 
 
 
USTAWA
z dnia 26 września 2014 r.
 
o zmianie ustawy o podatku dochodowym od osób
fizycznych 
oraz niektórych innych ustaw[1])
 
 
Art. 1. 
W ustawie z dnia 26 lipca 1991 r. o podatku
dochodowym od osób fizycznych (Dz. U. z 2012 r. poz. 361, z późn. zm.[2])) wprowadza się następujące zmiany:
1)   w [40m[34mart. 35 ust. 10[0m otrzymuje brzmienie:
„10. Płatnicy
stypendiów, o których mowa w [40m[34mart. 21 ust. 1[0m pkt 40b, są obowiązani w terminie
do końca lutego roku następującego po roku podatkowym, z zastrzeżeniem [40m[34mart.
45ba ust. 4[0m, sporządzić informację o wysokości wypłaconego stypendium, według
ustalonego wzoru, i przesłać ją podatnikowi oraz urzędowi skarbowemu, którym
kieruje naczelnik urzędu skarbowego właściwy według miejsca zamieszkania
podatnika, z zastrzeżeniem art. 37.”;
2)   w [40m[34mart. 37 ust. 3[0m otrzymuje brzmienie:
„3. Roczne obliczenie
podatku, o którym mowa w [40m[34must. 1[0m, płat

In [10]:
InterRef = namedtuple("InterRef", "art ust")
def art_ust_extractor(match:Match) -> List[InterRef]:
    gd = match.groupdict()
    #small sanity checks
    def extract_usts():
        not_nones = seq(gd.items()).filter(lambda x: x[1] is not None)
        keys = not_nones.map(lambda x: x[0])

        if keys.count(lambda x:x.startswith("dash")) != 2\
        and keys.count(lambda x:x.startswith("i_")) != 2\
        and keys.count(lambda x:x == "single_ust") != 1:
            raise Exception("Hey, match dict not ok :{} {}".format(gd,keys))
        
        if gd["dash_ust_1"] is not None:
            du1 = gd["dash_ust_1"]
            du2 = gd["dash_ust_2"]
            if du1.isdigit() and du2.isdigit():
                du1= int(du1); du2= int(du2)
                #If we have ust 1-3 we just include 1,2,3, but not possibly 1, 2, 2a, 2b, 3
                return seq(range(min(du1,du2),max(du1,du2)+1)).map(str).to_list()
            else:
                return [du1,du2]
        elif gd["i_ust_1"] is not None:
            return [gd["i_ust_1"],gd["i_ust_2"]]
        elif gd["single_ust"] is not None:
            return [gd["single_ust"]]
        else:
            raise "No ust was matched"
    art = gd["art"]
    return seq(extract_usts()).map(lambda ust : InterRef(art = art, ust = ust)).to_list()
  
CountedInterRef = namedtuple("CountedInterRef", "count art ust")
def inter_ref_counter(ref:Sequence)-> Sequence: # Seq[InterRef, CountedInterRef]
    return ref.group_by(lambda x: x.ust if x.art is None else x.art + x.ust)\
    .map(lambda refs:CountedInterRef(count = len(refs[1]),art = refs[1][0].art, ust = refs[1][0].ust))
    
def internal_refs(act:str)-> Sequence: #Sequence [CountedInterRef] 
    inter_ref_counter(
        seq(list(art_ust_pat.finditer(act))).flat_map(art_ust_extractor)
    ).order_by(lambda x: 1/x.count)

In [11]:
def internal_refs(as.drop(30).head())

SyntaxError: invalid syntax (<ipython-input-11-a70a980cf6c0>, line 1)

# 3 Ustawa count

In [None]:
ustawa_pat = re.compile(r"\b(ustawa|ustawy|ustawie|ustawę|ustawą|ustawo|ustaw|ustawom|ustawami|ustawach)\b",flags= re.I)

In [None]:
acts.map(lambda act: len(list(ustawa_pat.finditer(act)))).sum() # add \b on beg and end?  25092 - pohl

In [None]:
list(re.finditer(re.compile("\p{L}\w"), "12 12 12"))