In [1]:
import re
import os
import time
import json
import PyPDF2
import pypandoc
import pandas as pd
from tqdm import tqdm
import lxml.etree as ET


from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.text_splitter import RecursiveCharacterTextSplitter

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

## Utility functions

In [10]:
isLinux = True
default_linux_path = os.getcwd().replace("/Data", "/Documents/Downloaded") if "/Data" in os.getcwd() else os.getcwd() + "/Documents/Downloaded"
default_windows_path = os.getcwd().replace("\\Data", "\\Documents\\Downloaded") if "\\Data" in os.getcwd() else os.getcwd() + "\\Documents\\Downloaded"
default_path = default_linux_path if isLinux else default_windows_path

DEFAULT_SAVE_DIR = default_path.replace("/Downloaded", "/Generated") if isLinux else default_path.replace("\\Downloaded", "\\Generated")
CHROME_DRIVERS_PATH = "/home/antonelli/chromedriver-linux64/chromedriver" if isLinux else "C:\\Users\\giaco\\Downloads\\chromedriver-win64\\chromedriver.exe"

CODICE_PENALE_PDF = default_path + ("/Codice penale well formatted edited.pdf" if isLinux else "\\Codice penale well formatted edited.pdf")
CODICE_PENALE_CSV = DEFAULT_SAVE_DIR + ("/Codice penale well formatted edited.csv" if isLinux else "\\Codice penale well formatted edited.csv")

CPP_CSV = DEFAULT_SAVE_DIR + ("/Codice procedura penale.csv" if isLinux else "\\Codice procedura penale.csv")

REF_MERG = DEFAULT_SAVE_DIR + ('/references_merged.csv' if isLinux else '\\references_merged.csv')
INV_LAWS_JSON = DEFAULT_SAVE_DIR + ('/invalid_laws.json' if isLinux else '\\invalid_laws.json')
LAWS_CSV = DEFAULT_SAVE_DIR + ('/laws.csv' if isLinux else '\\laws.csv')


# Utility functions and constants
def write_to_file(filename, content):
    with open(filename, 'w') as f:
        f.write(content)

def read_from_file(filename):
    with open(filename, 'r') as f:
        return f.read()

# Different kind of text extraction from each type of file
def extract_text_from_pdf(pdf_path):
    text = ""
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            text += page.extract_text()
    return text    

def extract_text_from_xml(xml_path):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    return ET.tostring(root, encoding='unicode', method='text')

def extract_text_from_rtf(rtf_path):
    return pypandoc.convert_file(rtf_path, 'plain', format='rtf')


def split_text(text, pattern):
    parts = re.split(pattern, text, flags=re.MULTILINE)
    
    parts = [part for part in parts if part]
    
    if not re.match(pattern, parts[0]):
        parts = parts[1:]
    
    return parts

def split_text(text, max_chunk_size=7000, chunk_overlap=100):
    text_splitter = RecursiveCharacterTextSplitter(separators=[
        "\n\n",
        "\n",
        ".",
    ],
    chunk_size=max_chunk_size,
    chunk_overlap=chunk_overlap)
    
    return text_splitter.split_text(text)

def clearCommaContent(content):
    if "<" not in content:
        return content
    
    # Check for comma class tags
    match =  re.search(r'<span class="art_text_in_comma">(.*?)</span>', comma_content, re.DOTALL)
    comma_content = match.group(1) if match else comma_content
    comma_content = re.sub(r"<div class=\"ins-akn\" eid=\"ins_\d+\">\(\(", "", comma_content, flags=re.DOTALL)
    comma_content = re.sub(r"\)\)</div>", "", comma_content, flags=re.DOTALL)
    comma_content = re.sub(r"\n", "", comma_content, flags=re.DOTALL)
    comma_content = re.sub(r"<br>", "", comma_content, flags=re.DOTALL)
    
    # Check for a tags
    aPattern = re.compile(r'<a.*?>(.*?)</a>', re.DOTALL)
    matches = aPattern.findall(content)
    if matches:
        for match in matches:
            content = re.sub(r'<a.*?>.*?</a>', match, content, count=1)

    # Check for span tags
    sPattern = re.compile(r'<span.*?>(.*?)</span>', re.DOTALL)
    matches = sPattern.findall(content)
    if matches:
        for match in matches:
            content = re.sub(r'<span.*?>.*?</span>', match, content, count=1)

    # Check for list div tags
    dlPattern = re.compile(r'<div class="pointedList-rest-akn">(.*?)</div>', re.DOTALL)
    matches = dlPattern.findall(content)
    if matches:
        for match in matches:
            content = re.sub(r'<div class="pointedList-rest-akn">.*?</div>', re.escape(match), content, count=1)
    
    # Delete remaining tags
    content = re.sub(r'<.+?>', "", content)
    
    return content

/home/antonelli/Thesis/Documents/Downloaded


## Extraction of Codice Penale from website of Procura Generale Trento (**OUTDATED**)

In [13]:
def find_articles(text):
    # Remove all sections starting with "LIBRO"
    text = re.sub(r'LIBRO.*?(?=Articolo n\.|$)', '', text, flags=re.DOTALL)

    pattern = r'(Articolo n\..*?)(\n.*?)(?=\nArticolo n\.|$)'
    matches = re.findall(pattern, text, re.DOTALL)
    return matches

text = extract_text_from_pdf(CODICE_PENALE_PDF)

matches = find_articles(text)

data = []
for article in matches:
    law_number = article[0].split("Articolo n.")[1] # Get everything after "Articolo n."
    law_text = article[1].strip()
    if '.' in law_text:
        law_title, law_text = map(str.strip, law_text.split('.', 1))
    else:
        law_title = ''
    data.append({'Source': "c.p.p.",'Law number': law_number, 'Law title': law_title, 'Law text': law_text})

df_cp = pd.DataFrame(data)
df_cp.to_csv(CODICE_PENALE_CSV, index=False)

df_cp.head()

Unnamed: 0,Source,Law number,Law title,Law text
0,c.p.p.,1,Reati e pene: disposizione espressa di legge,1. Nessuno può essere punito per un fatto che ...
1,c.p.p.,2,Successione di leggi penali,"1. Nessuno può essere punito per un fatto che,..."
2,c.p.p.,3,Obbligatorietà della legge penale,1. La legge penale italiana obbliga tutti colo...
3,c.p.p.,4,Cittadino italiano,Territorio dello Stato.\n1. Agli effetti della...
4,c.p.p.,5,Ignoranza della legge penale,1. Nessuno può invocare a propria scusa l'igno...


## Extraction of Codice Penale from Normattiva

In [15]:
class NormattivaCpScraper:
    def __init__(self, driver_path, headless=True):
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        self.service = Service(driver_path)
        self.driver = webdriver.Chrome(service=self.service, options=chrome_options)
    
    # Get the originario version of the law
    def fill_field(self, field_id, value):
        input_field = self.driver.find_element(By.ID, field_id)
        input_field.clear()
        input_field.send_keys(value)
    
    # Get the text of a specific article
    def get_cpp_articles(self):
        articles_list = []
        
        # Ensure is multivigente version
        time.sleep(1)
        
        # Get articles
        albero = self.driver.find_element(By.ID, "albero")
        articles = albero.find_elements(By.CLASS_NAME, "numero_articolo")
        print("Article len: ", len(articles))
                
        for i, article in enumerate(articles[1:]):            
            print(i)
            if article.text.strip() != "" and article.text[0].isdigit():                
                try:
                    article.click()                    
                    time.sleep(1)
                    
                    article_title = self.driver.find_element(By.CLASS_NAME, "article-num-akn").text.strip()
                    commas = self.driver.find_elements(By.CLASS_NAME, "art-comma-div-akn")                    
                except:
                    continue
                
                print("Comma len: ", len(commas))
                if len(commas) == 0:
                    continue
                
                firstTime = True                
                for comma in commas:
                    if firstTime:
                        time.sleep(1)
                        firstTime = False
                    
                    # Check if the content contains any multivigenza change
                    try:
                        comma_content = comma.get_attribute('outerHTML')
                    except:
                        continue
                    #print(comma_content)
                    
                    # Skip if the content contains "<span class="art_text_in_comma">((</span>" or "<span class="art_text_in_comma">))</span>"
                    if "<span class=\"art_text_in_comma\">((</span>" in comma_content or "<span class=\"art_text_in_comma\">))</span>" in comma_content:
                        continue
                    
                    try:
                        comma_number = comma.find_element(By.CLASS_NAME, "comma-num-akn").text.strip()
                    except:
                        continue
                    comma_content_element = comma.text#find_element(By.CLASS_NAME, "art_text_in_comma")

                    # Clear the output
                    comma_content = clearCommaContent(comma_content_element)
                    
                    print(article_title, comma_number, comma_content)
                    
                    articles_list.append({ "Source": "c.p.",
                                            "Article": article_title,
                                            "Comma number": comma_number,
                                            "Comma content": comma_content.strip()}) # Numeration not working in case of -bis... extract
            else:
                print("Out ", article.text)

        return articles_list

    # Navigate to a specific page
    def navigate_to_page(self, url):
        self.driver.get(url)
        
    # Close the driver
    def close(self):
        self.driver.quit()

scraper = NormattivaCpScraper(CHROME_DRIVERS_PATH, headless=True)
scraper.navigate_to_page("https://www.normattiva.it/uri-res/N2Ls?urn:nir:stato:regio.decreto:1930-10-19;1398")
articles = scraper.get_cpp_articles()

df_cpp = pd.DataFrame(articles)
df_cpp.to_csv(CODICE_PENALE_CSV, index=False)

Article len:  2297
0
Comma len:  0
1
2
Out  art. 1
3
Out  art. 2
4
Out  
5
Out  
6
Out  art. 3
7
Out  art. 3 bis
8
Out  art. 4
9
Out  art. 5
10
Out  
11
Out  art. 6
12
Out  art. 7
13
Out  
14
Out  art. 8
15
Out  
16
Out  
17
Out  
18
Out  
19
Out  art. 9
20
Out  
21
Out  
22
Out  
23
Out  
24
Out  art. 10
25
Out  
26
Out  
27
Out  
28
Out  
29
Out  art. 11
30
Out  art. 12
31
Out  art. 13
32
Out  art. 14
33
Out  art. 15
34
Out  art. 16
35
Out  art. 17
36
Out  
37
Out  
38
Out  art. 18
39
Out  art. 19
40
Out  
41
Out  
42
Out  
43
Out  art. 20
44
Out  art. 20 bis
45
Out  art. 21
46
Out  
47
Out  art. 22
48
Out  
49
Out  
50
Out  art. 23
51
Out  art. 24
52
Out  
53
Out  
54
Out  
55
Out  
56
Out  
57
Out  art. 25
58
Out  art. 26
59
Out  
60
Out  
61
Out  
62
Out  
63
Out  
64
Out  art. 27
65
Out  art. 28
66
Out  
67
Out  
68
Out  art. 29
69
Out  art. 30
70
Out  art. 31
71
Out  art. 32
72
Out  
73
Out  
74
Out  art. 32 bis
75
Out  
76
Out  art. 32 ter
77
Out  
78
Out  art. 32 quater
79
Out

## Extraction of C.P.P. from Normattiva

In [12]:
class NormattivaCppScraper:
    def __init__(self, driver_path, headless=True):
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        self.service = Service(driver_path)
        self.driver = webdriver.Chrome(service=self.service, options=chrome_options)
    
    # Get the originario version of the law
    def fill_field(self, field_id, value):
        input_field = self.driver.find_element(By.ID, field_id)
        input_field.clear()
        input_field.send_keys(value)
    
    # Get the text of a specific article
    def get_cpp_articles(self):
        articles_list = []
        
        # Ensure is multivigente version
        time.sleep(1)
        
        # Get articles
        albero = self.driver.find_element(By.ID, "albero")
        articles = albero.find_elements(By.CLASS_NAME, "numero_articolo")
        print("Article len: ", len(articles))
                
        for i, article in enumerate(articles[1:]):            
            print(i)
            if article.text.strip() != "" and article.text[0].isdigit():                
                try:
                    article.click()                    
                    time.sleep(1)
                    
                    article_title = self.driver.find_element(By.CLASS_NAME, "article-num-akn").text.strip()
                    commas = self.driver.find_elements(By.CLASS_NAME, "art-comma-div-akn")                    
                except:
                    continue
                
                print("Comma len: ", len(commas))
                if len(commas) == 0:
                    continue
                
                firstTime = True                
                for comma in commas:
                    if firstTime:
                        time.sleep(1)
                        firstTime = False
                    
                    # Check if the content contains any multivigenza change
                    try:
                        comma_content = comma.get_attribute('outerHTML')
                    except:
                        continue
                    #print(comma_content)
                    
                    # Skip if the content contains "<span class="art_text_in_comma">((</span>" or "<span class="art_text_in_comma">))</span>"
                    if "<span class=\"art_text_in_comma\">((</span>" in comma_content or "<span class=\"art_text_in_comma\">))</span>" in comma_content:
                        continue
                    
                    try:
                        comma_number = comma.find_element(By.CLASS_NAME, "comma-num-akn").text.strip()
                    except:
                        continue
                    comma_content_element = comma.text#find_element(By.CLASS_NAME, "art_text_in_comma")

                    # Clear the output
                    comma_content = clearCommaContent(comma_content_element)
                    
                    print(article_title, comma_number, comma_content)
                    
                    articles_list.append({ "Source": "c.p.p.",
                                            "Article": article_title,
                                            "Comma number": comma_number,
                                            "Comma content": comma_content.strip()}) # Numeration not working in case of -bis... extract
            else:
                print("Out ", article.text)

        return articles_list

    # Navigate to a specific page
    def navigate_to_page(self, url):
        self.driver.get(url)
        
    # Close the driver
    def close(self):
        self.driver.quit()

scraper = NormattivaCppScraper(CHROME_DRIVERS_PATH, headless=True)
scraper.navigate_to_page("https://www.normattiva.it/uri-res/N2Ls?urn:nir:stato:decreto.del.presidente.della.repubblica:1988-09-22;447")
articles = scraper.get_cpp_articles()

df_cpp = pd.DataFrame(articles)
df_cpp.to_csv(CPP_CSV, index=False)

Article len:  2289
0
Comma len:  1
Art. 1 1. 1. La giurisdizione penale è esercitata dai giudici previsti dalle leggi di ordinamento giudiziario secondo le norme di questo codice.
1
Comma len:  2
Art. 2 1. 1. Il giudice penale risolve ogni questione da cui dipende la decisione, salvo che sia diversamente stabilito.
Art. 2 2. 2. La decisione del giudice penale che risolve incidentalmente una questione civile, amministrativa o penale non ha efficacia vincolante in nessun altro processo.
2
Comma len:  4
Art. 3 1. 1. Quando la decisione dipende dalla risoluzione di una controversia sullo stato di famiglia o di cittadinanza, il giudice, se la questione è seria e se l'azione a norma delle leggi civili è già in corso, può sospendere il processo fino al passaggio in giudicato della sentenza che definisce la questione.
Art. 3 2. 2. La sospensione è disposta con ordinanza soggetta a ricorso per cassazione. La corte decide in camera di consiglio.
Art. 3 3. 3. La sospensione del processo non imped

## Extraction of Dlgs from Normattiva

In [6]:
class NormattivaDlgsScraper:
    def __init__(self, driver_path, headless=True):
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        self.service = Service(driver_path)
        self.driver = webdriver.Chrome(service=self.service, options=chrome_options)
    
    # Get the originario version of the law
    def fill_field(self, field_id, value):
        input_field = self.driver.find_element(By.ID, field_id)
        input_field.clear()
        input_field.send_keys(value)
    
    # Get the text of a specific article
    def get_article_text(self, numeroProvvedimento, anno, article_num=[]):
        articles_list = []
            
        self.fill_field("numeroProvvedimento", numeroProvvedimento)
        self.fill_field("annoProvvedimento", anno)

        self.driver.find_element(By.CSS_SELECTOR, "[type*='submit']").click()
        self.driver.find_elements(By.CSS_SELECTOR, "[title*='Dettaglio atto']")[0].click()
        
        time.sleep(2)
        # Ensure is multivigente version
        multivigente_button = self.driver.find_element(By.XPATH, '//a[contains(@href, "multivigenza")]')
        multivigente_button.click()
        
        # Get articles
        albero = self.driver.find_element(By.ID, "albero")
        articles = albero.find_elements(By.CLASS_NAME, "numero_articolo")
        print("Article len: ", len(articles))
                
        for article in articles:            
            if article.text.strip() != "" and article.text[0].isdigit() and "orig" not in article.text and "Allegato" not in article.text and "agg" not in article.text:
                print("In ", article.text)
                try:
                    article.click()
                except:
                    continue
                
                time.sleep(2)
                
                article_title = self.driver.find_element(By.CLASS_NAME, "article-num-akn").text.strip()
                commas = self.driver.find_elements(By.CLASS_NAME, "art-comma-div-akn")
                print("Comma len: ", len(commas))
                if len(commas) == 0:
                    continue
                
                firstTime = True                
                for i, comma in enumerate(commas):
                    if firstTime:
                        time.sleep(1)
                        firstTime = False
                    print(numeroProvvedimento, anno, article_title, i)
                    
                    # Check if the content contains any multivigenza change
                    comma_content = comma.get_attribute('outerHTML')
                    #print(comma_content)
                    
                    # Skip if the content contains "<span class="art_text_in_comma">((</span>" or "<span class="art_text_in_comma">))</span>"
                    if "<span class=\"art_text_in_comma\">((</span>" in comma_content or "<span class=\"art_text_in_comma\">))</span>" in comma_content:
                        continue
                    
                    try:
                        comma_number = comma.find_element(By.CLASS_NAME, "comma-num-akn").text.strip()
                    except:
                        continue
                    comma_content_element = comma.text#find_element(By.CLASS_NAME, "art_text_in_comma")

                    #print(comma_number, comma_content)
                    
                    # Clear the output
                    match =  re.search(r'<span class="art_text_in_comma">(.*?)</span>', comma_content, re.DOTALL)
                    comma_content = match.group(1) if match else comma_content
                    comma_content = re.sub(r"<div class=\"ins-akn\" eid=\"ins_\d+\">\(\(", "", comma_content, flags=re.DOTALL)
                    comma_content = re.sub(r"\)\)</div>", "", comma_content, flags=re.DOTALL)
                    comma_content = re.sub(r"\n", "", comma_content, flags=re.DOTALL)
                    comma_content = re.sub(r"<br>", "", comma_content, flags=re.DOTALL)
                    
                    articles_list.append({ "Dlgs":f"{numeroProvvedimento}/{anno}".strip(),
                                            "Article": article_title,
                                            "Comma number": comma_number,
                                            "Comma content": comma_content.strip()}) # Numeration not working in case of -bis... extract
            else:
                print("Out ", article.text)

        return articles_list

    # Navigate to a specific page
    def navigate_to_page(self, url):
        self.driver.get(url)
        
    # Close the driver
    def close(self):
        self.driver.quit()

def save_articles(articles, filename):
    df = pd.DataFrame(articles)
    df.to_csv(filename, index=False)

scraper = NormattivaDlgsScraper(CHROME_DRIVERS_PATH, headless=True)

# Check if the laws have already been scraped
scraped_laws_set = set()
if os.path.exists(LAWS_CSV):
    df = pd.read_csv(LAWS_CSV)
    first_column = df.iloc[:, 0]
    scraped_laws = df.to_dict('records')
    
    # put the already scraped laws in a set to not scrape them again
    scraped_laws_set = set(first_column)
else:
    scraped_laws = []

invalid_laws_set = set()
if os.path.exists(INV_LAWS_JSON):
    invalid_laws = json.loads(INV_LAWS_JSON)
    invalid_laws_set = set(invalid_laws)
else:
    invalid_laws = []

scraped_laws_set.update(invalid_laws_set)

df = pd.read_csv(REF_MERG)
articles = []

for index, row in tqdm(df.iterrows(), total=df.shape[0]): # 445
    # Check if it's a D. lgs.
    if '/' not in row['Source'] or row['Source'].strip() in scraped_laws_set:
        continue
    scraped_laws_set.add(row['Source'])
    print(f"Scraping |{row['Source']}|")
    num, year = row['Source'].split("/")
            
    scraper.navigate_to_page("https://www.normattiva.it/ricerca/avanzata")
    res = scraper.get_article_text(num, year)
    if res == []:
        invalid_laws.append(f"{num}/{year}")
    else:
        articles.append(res)
    
    if articles and len(articles) % 3 == 0:
        save_articles([item for sublist in articles for item in sublist] + scraped_laws, LAWS_CSV)
        with open(INV_LAWS_JSON, 'w') as f:
            json.dumps(scraped_laws)

save_articles([item for sublist in articles for item in sublist] + scraped_laws, LAWS_CSV)
with open(INV_LAWS_JSON, 'w') as f:
    json.dumps(scraped_laws)

SessionNotCreatedException: Message: session not created: This version of ChromeDriver only supports Chrome version 127
Current browser version is 122.0.6261.128 with binary path /opt/google/chrome/chrome
Stacktrace:
#0 0x56299187b6aa <unknown>
#1 0x56299154c600 <unknown>
#2 0x56299158b13c <unknown>
#3 0x56299158a0b2 <unknown>
#4 0x562991584f24 <unknown>
#5 0x5629915802f8 <unknown>
#6 0x5629915cace8 <unknown>
#7 0x5629915be643 <unknown>
#8 0x56299158ed31 <unknown>
#9 0x56299158f79e <unknown>
#10 0x56299184323b <unknown>
#11 0x5629918471d2 <unknown>
#12 0x5629918305f5 <unknown>
#13 0x562991847d62 <unknown>
#14 0x56299181523f <unknown>
#15 0x56299186ae48 <unknown>
#16 0x56299186b020 <unknown>
#17 0x56299187a47c <unknown>
#18 0x7f9513ac9609 start_thread


## Post all Dlgs extraction -> Merge previous df with the laws.csv file

In [None]:
df_laws = pd.read_csv(LAWS_CSV)
final_df = pd.concat([df_laws, df_cp, df_cpp], ignore_index=True)