In [1]:
import os
import re
import csv
import pandas as pd
import numpy as np

import time
import requests
from bs4 import BeautifulSoup

#from langdetect import detect
#from deep_translator import GoogleTranslator

In [2]:
BASE_URL = "https://www.consiglio.vda.it/app/oggettidelconsiglio/dettaglio?pk_documento={}&versione=R"

OUTPUT_FOLDER = "./downloads"
if not os.path.exists(OUTPUT_FOLDER):
    os.makedirs(OUTPUT_FOLDER)

#funzione per scraperare e salvare i resoconti dal sito del consiglio Valle
def scrape_and_save(doc_id):
    url = BASE_URL.format(doc_id)

    response = requests.get(url)
    if response.status_code != 200:
        print(f"Documento {doc_id} non trovato (HTTP {response.status_code}).")
        return None

    soup = BeautifulSoup(response.text, "html.parser")
    page_text = soup.get_text(separator="\n", strip=True)

    output_file = os.path.join(OUTPUT_FOLDER, f"{doc_id}.txt")
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(page_text)
    
    print(f"Documento {doc_id} salvato in: {output_file}")
    return output_file


def main(start_id, end_id):
    csv_data = []
    print(f"Inizio lo scraping per i documenti dal {start_id} al {end_id}...")

    for doc_id in range(start_id, end_id + 1):
        print(f"Processo il documento {doc_id}...")
        file_path = scrape_and_save(doc_id)

        if file_path:
            csv_data.append({"ID_file": doc_id, "path_src": file_path})

        # Pausa per evitare di sovraccaricare il server (ad esempio 2-5 secondi)
        time.sleep(2 + (3 * doc_id % 5))  # Variamo la pausa per evitare un pattern troppo prevedibile

    # Salvataggio dei risultati nel file CSV
    csv_file = "csv_paths.csv"
    print(f"Salvataggio dei risultati nel file CSV: {csv_file}")
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["ID_file", "path_src"])
        writer.writeheader()
        writer.writerows(csv_data)

    print("Completato! Tutti i documenti salvati nella cartella:", OUTPUT_FOLDER)

if __name__ == "__main__":
    main(start_id=41367, end_id=42421)

Inizio lo scraping per i documenti dal 41367 al 42421...
Processo il documento 41367...
Documento 41367 salvato in: ./downloads/41367.txt
Processo il documento 41368...
Documento 41368 salvato in: ./downloads/41368.txt
Processo il documento 41369...
Documento 41369 salvato in: ./downloads/41369.txt
Processo il documento 41370...
Documento 41370 salvato in: ./downloads/41370.txt
Processo il documento 41371...
Documento 41371 salvato in: ./downloads/41371.txt
Processo il documento 41372...
Documento 41372 salvato in: ./downloads/41372.txt
Processo il documento 41373...
Documento 41373 salvato in: ./downloads/41373.txt
Processo il documento 41374...
Documento 41374 salvato in: ./downloads/41374.txt
Processo il documento 41375...
Documento 41375 salvato in: ./downloads/41375.txt
Processo il documento 41376...
Documento 41376 salvato in: ./downloads/41376.txt
Processo il documento 41377...
Documento 41377 salvato in: ./downloads/41377.txt
Processo il documento 41378...
Documento 41378 salva

In [3]:
def generate_csv_from_folder(folder_path, csv_file="csv_paths.csv"):
    csv_data = []
    
    # Verifica che la cartella esista
    if not os.path.exists(folder_path):
        print(f"La cartella {folder_path} non esiste.")
        return
    
    # Scansiona la cartella per trovare i file
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        
        if os.path.isfile(file_path):  # Assicuriamoci che sia un file e non una cartella
            file_id, _ = os.path.splitext(file_name)  # Rimuove l'estensione
            csv_data.append({"ID_file": file_id, "path_src": file_path})
    
    # Salvataggio dei dati in un file CSV
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["ID_file", "path_src"])
        writer.writeheader()
        writer.writerows(csv_data)
    
    print(f"File CSV creato con successo: {csv_file}")

# Esempio di utilizzo
folder_path = "./downloads"  # Modifica con il percorso della tua cartella
generate_csv_from_folder(folder_path)

File CSV creato con successo: csv_paths.csv


In [4]:
#funzione per pulire i file txt dalle parti inutili presenti nella pagina web del consiglio Valle
def first_clean(target_folder, keyword, mid_strings, end_keyword, csv_paths):
    df = pd.read_csv(csv_paths)

    if not os.path.exists(target_folder):
        os.makedirs(target_folder)

    df["path_clean"] = ""

    for idx, row in df.iterrows():
        source_path = row["path_src"]
        if source_path.endswith(".txt"):
            filename = os.path.basename(source_path)
            target_path = os.path.join(target_folder, filename)

            df.at[idx, "path_clean"] = target_path

            with open(source_path, "r", encoding="utf-8") as file:
                content = file.read()

            if keyword in content:
                content = keyword + content.split(keyword, 1)[1] 

            for mid_string in mid_strings:
                content = re.sub(re.escape(mid_string), "", content) 

            if end_keyword in content:
                content = content.split(end_keyword, 1)[0]  

            with open(target_path, "w", encoding="utf-8") as file:
                file.write(content)

    df.to_csv(csv_paths, index=False)

#funzione per estrarre informazioni dai file txt: oggetto, legislatura, data e classificazione
def extract_and_format_date(text):
    
    date_pattern = re.search(r"(\d{1,2})\s+([a-zA-Z]+)\s+(\d{4})", text)
    
    if date_pattern:
        day, month_name, year = date_pattern.groups()
        
        months = {
            "gennaio": "01", "febbraio": "02", "marzo": "03", "aprile": "04", "maggio": "05", "giugno": "06",
            "luglio": "07", "agosto": "08", "settembre": "09", "ottobre": "10", "novembre": "11", "dicembre": "12"
        }
        
        month = months.get(month_name.lower())
        
        if month:
            return f"{int(day):02d}/{month}/{year}"
    
    return None

def first_info(pattern1, pattern2, csv_paths):
    df_paths = pd.read_csv(csv_paths)

    #df_paths['language'] = ""
    df_paths['object'] = ""
    df_paths['legislature'] = ""
    df_paths['date'] = ""
    df_paths['month'] = ""
    df_paths['class'] = ""

    for idx, row in df_paths.iterrows():
        source_path = row["path_clean"]
        if source_path.endswith(".txt"):
            filename = os.path.basename(source_path)
            filenum = int(os.path.splitext(filename)[0])

            with open(source_path, "r", encoding="utf-8") as file:
                content = file.read()

            match1 = re.search(pattern1, content)

            if match1:
                stringa = match1.group(1).strip()
                ogg, leg = stringa.split("/", 1)

                df_paths.loc[df_paths['ID_file'] == filenum, 'object'] = ogg
                df_paths.loc[df_paths['ID_file'] == filenum, 'legislature'] = leg

            date = extract_and_format_date(content)
            df_paths.loc[df_paths['ID_file'] == filenum, 'date'] = date
            if date:
                df_paths.loc[df_paths['ID_file'] == filenum, 'month'] = date.split("/")[1]
            
            match2 = re.search(pattern2, content, re.DOTALL)
            
            if match2:
                classe = match2.group(1).strip()
                classe = classe.replace("\n", ", ").strip()
                df_paths.loc[df_paths['ID_file'] == filenum, 'class'] = classe

            #try:
            #    lang = detect(content)
            #    if lang == 'fr':
            #        df_paths.loc[df_paths['ID_file'] == filenum, 'language'] = 'fr'
            #    elif lang == 'it':
            #        df_paths.loc[df_paths['ID_file'] == filenum, 'language'] = 'it'
            #    else:
            #        df_paths.loc[df_paths['ID_file'] == filenum, 'language'] = 'other'
            #except:
            #    df_paths.loc[df_paths['ID_file'] == filenum, 'language'] = 'error'

    df_paths.to_csv(csv_paths, index=False)

def second_info(csv_paths):
    df = pd.read_csv(csv_paths)
    
    if "obj_pos" not in df.columns:
        df["obj_pos"] = 0
    
    df["object"] = pd.to_numeric(df["object"], errors="coerce")  # Converti in numero per ordinamento
    
    df.sort_values(by=["date", "object"], ascending=[True, True], inplace=True)
    
    df["obj_pos"] = df.groupby("date").cumcount() + 1  # Assegna numeri progressivi
    
    df.to_csv(csv_paths, index=False)

In [5]:
target_folder = "./clean"

csv_paths = "./csv_paths.csv"
csv_details = "./csv_details.csv"
csv_cons = "./csv_cons.csv"
csv_chunks = "./csv_chunks.csv"

In [6]:
keyword = "Classificazione"
mid_strings = ["Precedente", "Successivo", "Resoconto integrale del dibattito dell'aula. I documenti allegati sono reperibili nel link \"iter atto\"."]
end_keyword = "Informativa cookies"

first_clean(target_folder, keyword, mid_strings, end_keyword, csv_paths)

pattern1 = r'(?:OGGETTO N\.|OBJET N°)(.*?)\s*-'
pattern2 = r'Classificazione\s*(.*?)\s*Oggetto'

first_info(pattern1, pattern2, csv_paths)
second_info(csv_paths)

In [7]:
import pandas as pd
import re

def define_names(csv_cons, leg):
    df_cons = pd.read_csv(csv_cons)
    df_cons = df_cons.reset_index()  # Aggiunge l'indice come colonna ID_cons
    df_cons.rename(columns={'index': 'ID_cons'}, inplace=True)
    names = []
    
    print(f"Filtrando per legislatura: {leg}")
    filtered_cons = df_cons[df_cons['legislature'].str.contains(leg, case=False, na=False)]
    
    surname_counts = filtered_cons['surname'].value_counts()

    for idx, row in filtered_cons.iterrows():
        surname = row["surname"]
        name = row["name"]
        
        if surname_counts[surname] > 1:
            names.append(f"{surname} {name[0]}.")
        else:
            names.append(surname)
    
    print(f"Numero di nomi trovati per la legislatura {leg}: {len(names)}")
    return filtered_cons, names

def isolate_chunk(csv_paths, csv_cons):
    df_paths = pd.read_csv(csv_paths)
    chunk_list = []

    for idx, row in df_paths.iterrows():
        leg = row["legislature"]
        date = row["date"]
        month = row["month"]
        #language = row["language"]
        obj = row["obj_pos"]
        
        print(f"Processando file {row['path_clean']} per la legislatura {leg}...")
        
        try:
            df_cons, list_cons = define_names(csv_cons, leg)

            with open(row["path_clean"], 'r') as f:
                text = f.read()

                all_matches = []
                for name in list_cons:
                    pattern = r"(?i)" + re.escape(name) + r"\s?\([^)]*\)\s?-"
                    matches = list(re.finditer(pattern, text))
                    all_matches.extend(matches)
                
                president_pattern = r"(?i)Presidente\s?-"
                president_matches = list(re.finditer(president_pattern, text))
                all_matches.extend(president_matches)
                
                all_matches.sort(key=lambda match: match.start())
                
                president_data = None
                if all_matches:
                    first_match = all_matches[0]
                    if not re.match(president_pattern, first_match.group(0), re.IGNORECASE):
                        surname = first_match.group(0)
                        words = surname.split()
                        surname = " ".join(words[:2])
                        if words[1].endswith('.') or words[1].startswith('('):
                            surname = words[0]
                        person_info = df_cons[df_cons["surname"] == surname].iloc[0]
                        president_data = {
                            "ID_cons": person_info["ID_cons"],
                            "year_birth": person_info["year_birth"],
                            "gender": person_info["gender"],
                            "group": 'Presidente',
                            "seniority": len(person_info["legislature"].split(','))
                        }

                chunk_idx = 1
                for i, match in enumerate(all_matches):
                    start_pos = match.end() 
                    end_pos = all_matches[i + 1].start() if i + 1 < len(all_matches) else len(text)

                    chunk = text[start_pos:end_pos].replace("\n", " ").strip()

                    if chunk:
                        if re.match(president_pattern, match.group(0), re.IGNORECASE):
                            if president_data:
                                ID_cons = president_data["ID_cons"]
                                year_birth = president_data["year_birth"]
                                gender = president_data["gender"]
                                group = president_data["group"]
                                seniority = president_data["seniority"]
                            else:
                                ID_cons = "N/A"
                                year_birth = "N/A"
                                gender = "N/A"
                                group = "N/A"
                                seniority = "N/A"
                        else:
                            surname = match.group(0)
                            party = re.search(r'\((.*?)\)', surname)
                            group = party.group(1) if party else "N/A"
                            words = surname.split()
                            surname = " ".join(words[:2])
                            if words[1].endswith('.') or words[1].startswith('('):
                                surname = words[0]
                            try:
                                person_info = df_cons[df_cons["surname"] == surname].iloc[0]
                                ID_cons = person_info["ID_cons"]
                                year_birth = person_info["year_birth"]
                                gender = person_info["gender"]
                                seniority = len(person_info["legislature"].split(','))
                            except IndexError:
                                ID_cons = "N/A"
                                year_birth = "N/A"
                                gender = "N/A"
                                group = "N/A"
                                seniority = "N/A"

                        chunk_list.append({
                            "ID_file": row["ID_file"],
                            "leg": leg,
                            "date": date,
                            "month": month,
                            "class": row["class"],
                            "obj_pos": obj,
                            #"language": language,
                            "ID_cons": ID_cons,
                            "year_birth": year_birth,
                            "gender": gender,
                            "group": group,
                            "seniority": seniority,
                            "position": chunk_idx,
                            "length": len(chunk),
                            "chunk": chunk
                        })
                        
                        chunk_idx += 1
            print('File ok')

        except Exception as e:
            print(f"Errore nel file {row['path_clean']}: {e}. Saltando questa entry.")
            continue

    df_chunks = pd.DataFrame(chunk_list)
    print(f"Numero totale di chunk processati: {len(chunk_list)}")
    return df_chunks

df = isolate_chunk(csv_paths, csv_cons)

print("Salvataggio del file csv_chunks.csv...")
df.to_csv("csv_chunks.csv", index=False)
print("File salvato con successo!")

Processando file ./clean/45461.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/45462.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/45463.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/45464.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/45465.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/43971.txt per la legislatura XVI...
Filtrando per legislatura: XVI
Numero di nomi trovati per la legislatura XVI: 38
File ok
Processando file ./clean/41709.txt per la legislatura XV...
Filtrando per legislatura: XV
Numero di 

In [8]:
df_chunks = pd.read_csv(csv_chunks)

df_outcomes = df_chunks.copy()
df_outcomes = df_outcomes.drop(columns=['ID_cons','year_birth','gender','group','position','length','chunk'])
df_outcomes = df_outcomes.drop_duplicates(subset=['ID_file'])

intervention_counts = df_chunks.groupby('ID_file').size().reset_index(name='# interventions')
df_outcomes = df_outcomes.merge(intervention_counts, on='ID_file', how='left')

df_chunks_filtered = df_chunks[df_chunks['group'] != 'Presidente']
intervention_counts_filtered = df_chunks_filtered.groupby('ID_file').size().reset_index(name='# interventions (w/o president)')
df_outcomes = df_outcomes.merge(intervention_counts_filtered, on='ID_file', how='left')

length_sum = df_chunks.groupby('ID_file')['length'].sum().reset_index(name='total_length')
df_outcomes = df_outcomes.merge(length_sum, on='ID_file', how='left')

length_sum_filtered = df_chunks_filtered.groupby('ID_file')['length'].sum().reset_index(name='total_length (w/o president)')
df_outcomes = df_outcomes.merge(length_sum_filtered, on='ID_file', how='left')

def extract_number(text, pattern):
    match = re.search(pattern, text)
    if match:
        return int(match.group(1))  
    return None

def get_count(id_file, pattern):
    chunks = df_chunks[(df_chunks['ID_file'] == id_file) & (df_chunks['group'] == 'Presidente')]
    
    for chunk in chunks['chunk']:
        number = extract_number(chunk, pattern)
        if number is not None:
            return number
    
    return None

presenti = r"Presenti: (\d+)"
votanti = r"Votanti: (\d+)"
favorevoli = r"Favorevoli: (\d+)"
contrari = r"Contrari: (\d+)"
astenuti = r"Astenuti: (\d+)"

df_outcomes['presenti'] = df_outcomes['ID_file'].apply(lambda id_file: get_count(id_file, presenti))
df_outcomes['votanti'] = df_outcomes['ID_file'].apply(lambda id_file: get_count(id_file, votanti))
df_outcomes['favorevoli'] = df_outcomes['ID_file'].apply(lambda id_file: get_count(id_file, favorevoli))
df_outcomes['contrari'] = df_outcomes['ID_file'].apply(lambda id_file: get_count(id_file, contrari))
df_outcomes['astenuti'] = df_outcomes['ID_file'].apply(lambda id_file: get_count(id_file, astenuti))

def determine_outcome(row):
    favorevoli = row['favorevoli'] if pd.notna(row['favorevoli']) else 0
    contrari = row['contrari'] if pd.notna(row['contrari']) else 0
    astenuti = row['astenuti'] if pd.notna(row['astenuti']) else 0

    if pd.isna(row['favorevoli']) and pd.isna(row['contrari']) and pd.isna(row['astenuti']):
        return np.nan

    if favorevoli > contrari and favorevoli > astenuti:
        return 'approvato'
    else:
        return 'non approvato'

df_outcomes['outcome'] = df_outcomes.apply(determine_outcome, axis=1)

df_outcomes.to_csv("csv_outcomes.csv", index=False)

In [9]:
df = pd.read_csv("csv_chunks.csv")

df_filtered = df[df['group'] != 'Presidente']

df_filtered.to_csv("csv_chunks_filtered.csv", index=False)

In [10]:
valid_outcomes = df_outcomes[df_outcomes['outcome'].notna()]['ID_file']

df = pd.read_csv("csv_chunks.csv")
df_filtered = df[df['ID_file'].isin(valid_outcomes)]

df_filtered.to_csv("csv_chunks_outcomes.csv", index=False)

df = pd.read_csv("csv_chunks_outcomes.csv")

df_filtered = df[df['group'] != 'Presidente']

df_filtered.to_csv("csv_chunks_outcomes_filtered.csv", index=False)