In [None]:
import pandas as pd
import copy
import numpy as np

In [None]:
# Load the Lombardy dataframe from its CSV file (machine-specific absolute path)
percorso_csv = '/Users/dilettaferri/Desktop/UNIPI/SNA - Project/Project/df_lombardia.csv'
df_lombardia = pd.read_csv(percorso_csv)

In [None]:
# Display the loaded dataframe for a quick visual check
df_lombardia

In [None]:
# Count how many rows fall under each value of the 'Sezione' column
# (i.e. the different types of entities)
df_lombardia['Sezione'].value_counts()

In [None]:
# Save the list of names in lista_denominazioni
lista_denominazioni = df_lombardia['Denominazione'].tolist()

# Show only a short preview: echoing the whole list would flood the cell output
lista_denominazioni[:10]

<h3>Scraping - get info from the RUNTS page</h3>

In [None]:
# Import libraries
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import time

<h5>Starting the driver and opening the page</h5>

In [None]:
# Path to the local chromedriver executable (machine-specific absolute path)
service = Service('/Users/dilettaferri/Desktop/UNIPI/SNA - Project/Project/chromedriver-mac-arm64/chromedriver')  

# Initialization of the Chrome driver using the Service class
driver = webdriver.Chrome(service=service)

In [None]:
# Open the RUNTS entity search page in the browser controlled by `driver`
driver.get("https://servizi.lavoro.gov.it/runts/it-it/Ricerca-enti")

<h5>Definition of the functions</h5>

In [None]:
# Decline cookies
def rifiuta_cookies():
    """Click the 'RIFIUTA' link of the cookie banner, if it is on the page.

    The scraping loop calls this after every navigation "if necessary", so a
    missing banner must be a no-op. `find_element` would raise
    NoSuchElementException in that case and make the caller discard the whole
    record; `find_elements` returns an empty list instead.
    """
    decline_links = driver.find_elements(By.LINK_TEXT, 'RIFIUTA')
    if decline_links:
        decline_links[0].click()

In [None]:
# Input and search the name
def input_name(nome):
    """Type `nome` into the denomination search field and submit it with Return."""
    campo_ricerca = driver.find_element(By.ID, "dnn_ctr446_View_txtDenominazione")  # locate the name search field
    campo_ricerca.clear()  # remove any text left from a previous search
    campo_ricerca.send_keys(nome)  # type the name
    campo_ricerca.send_keys(Keys.RETURN)  # submit the search

In [None]:
# Enters the "Dettagli" section with the button in the first line of the table
def press_dettagli(timeout=5):
    """Click the "Dettagli" button of the first row of the results table.

    The button is awaited with an explicit wait, so that a missing or
    non-clickable button really surfaces as a TimeoutException (a plain
    `find_element` raises NoSuchElementException and never times out).

    Parameters
    ----------
    timeout : int or float, optional
        Maximum number of seconds to wait for the button to become clickable.

    Raises
    ------
    TimeoutException
        If the button is not found or not clickable within `timeout` seconds.
    """
    try:
        button_dettagli = WebDriverWait(driver, timeout).until(
            EC.element_to_be_clickable((By.ID, "dnn_ctr446_View_gvEnti_btnDettaglio_0"))
        )
    except TimeoutException:
        # Re-raise with the notebook's own message so the caller's log stays readable
        raise TimeoutException("Il pulsante 'dettagli' non è stato trovato o non è cliccabile.")

    driver.execute_script("arguments[0].scrollIntoView();", button_dettagli)  # Make sure the "Dettagli" button is visible
    button_dettagli.click()  # Click the button

In [None]:
# Get and save the geographic info of the legal headquarters

def _read_text_by_id(element_id):
    """Scroll the element with the given id into view and return its text."""
    element = driver.find_element(By.ID, element_id)
    driver.execute_script("arguments[0].scrollIntoView();", element)  # Make sure it's visible
    return element.text


def get_geo_info(stato_list, provincia_list, comune_list, indirizzo_list):
    """Read state, province, municipality and address from the details page.

    All four values are read first and appended only at the end, so a failure
    while reading one of them never leaves the lists with different lengths.

    Parameters
    ----------
    stato_list, provincia_list, comune_list, indirizzo_list : list
        Accumulators; exactly one value is appended to each of them.

    Returns
    -------
    tuple of list
        The same four lists, in the same order, after the append.

    Raises
    ------
    NoSuchElementException
        If the state, province or municipality element is missing. A missing
        address element is not an error: None is stored instead.
    """
    stato = _read_text_by_id("dnn_ctr448_View_spnStatoSL")
    provincia = _read_text_by_id("dnn_ctr448_View_spnProvinciaSL")
    comune = _read_text_by_id("dnn_ctr448_View_spnComuneSL")

    # Deal with the case of no address (only the "element missing" case is
    # tolerated, so that e.g. a KeyboardInterrupt is not swallowed)
    try:
        indirizzo = _read_text_by_id("dnn_ctr448_View_spnIndirizzoSL")
    except NoSuchElementException:
        indirizzo = None

    stato_list.append(stato)
    provincia_list.append(provincia)
    comune_list.append(comune)
    indirizzo_list.append(indirizzo)

    return stato_list, provincia_list, comune_list, indirizzo_list

<h5>Splitting the denominations and the dataset into 1000-record chunks and running the functions</h5>

Processing the dataset in chunks, rather than all at once, makes it easier to recover from errors.

In [None]:
# Cut the list of denominations into consecutive chunks of at most 1000 names each
split_lists = [
    lista_denominazioni[inizio:inizio + 1000]
    for inizio in range(0, len(lista_denominazioni), 1000)
]


In [None]:
# Number of chunks produced by the split
len(split_lists)

In [None]:
# Fresh, independent accumulators for the scraped values;
# re-run this cell every time a new group of batches is started
stato_list, provincia_list, comune_list, indirizzo_list = [], [], [], []

In [None]:
# This list allows to process more than one batch of 1000 records at once
lista_di_indici = [14,15,16,17] #These indices are of the last 4 batches

for j in lista_di_indici:
    for index, nome in enumerate(split_lists[j]):  # Use enumerate to keep track of the indices
        # Remember how long every result list is before this name is processed:
        # whatever happens below, each list must end up exactly one entry longer,
        # otherwise the results no longer line up with the names.
        lunghezze_iniziali = [
            len(lista) for lista in (stato_list, provincia_list, comune_list, indirizzo_list)
        ]

        try:
            input_name(nome)  # Insert name in the search bar
            rifiuta_cookies()  # Decline cookies if necessary

            try:
                # Try to press the "Dettagli" button
                press_dettagli()
                time.sleep(2)
                rifiuta_cookies()  # Decline cookies again, if they appear

                # Get geographic info
                stato_list, provincia_list, comune_list, indirizzo_list = get_geo_info(
                    stato_list, provincia_list, comune_list, indirizzo_list
                )
            except TimeoutException:
                # If "Dettagli" button isn't found before the time runs out;
                # the missing values are filled with None in the `finally` below
                print(f"Il pulsante 'dettagli' non è stato trovato per il nome: {nome}")

            # Go back in order to deal with the next name
            driver.back()
            time.sleep(2)

        except Exception as e:
            error_message = str(e)
            # The search bar is gone: we are on the wrong page, reload the search page.
            # Match on the exception type and the element id rather than on the
            # full driver message, whose exact wording depends on the driver version.
            if isinstance(e, NoSuchElementException) and "dnn_ctr446_View_txtDenominazione" in error_message:
                print(f"Elemento non trovato per il nome {nome}. Ricarico la pagina iniziale.")
                driver.get("https://servizi.lavoro.gov.it/runts/it-it/Ricerca-enti")
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, "dnn_ctr446_View_txtDenominazione"))
                )
                print("Pagina iniziale ricaricata.")
            else:
                print(f"Errore con il nome {nome}: {e}")

        finally:
            # Guarantee exactly one new entry per list for this name:
            # drop anything beyond the first new entry and pad with None when
            # nothing was stored (also when the run is interrupted mid-record).
            for lista, lunghezza in zip(
                (stato_list, provincia_list, comune_list, indirizzo_list), lunghezze_iniziali
            ):
                del lista[lunghezza + 1:]
                if len(lista) == lunghezza:
                    lista.append(None)

        # Print a message every 100 records (to keep track of process),
        # counting failed records too
        if (index + 1) % 100 == 0:
            print(f"Processate {index + 1} righe finora.")

In [None]:
# Divide the dataframe in other 1000 records dataframes, to correspond with the lists

chunk_size = 1000
for i, start in enumerate(range(0, len(df_lombardia), chunk_size)):
    # .copy() makes every chunk an independent dataframe: new columns can then be
    # added to it without pandas warning about writing to a slice of df_lombardia
    globals()[f"df_{i+1}"] = df_lombardia.iloc[start:start+chunk_size].copy()

# They are called from df_1 to df_18
# split_lists[0] corresponds with df_1 and so on

Each time these cells are run, the batch numbers used below (15–18) must be replaced with those of the batches that have just been processed.

In [None]:
# Cut the accumulated results back into one list per processed batch:
# positions 0-999 -> batch 15, 1000-1999 -> batch 16, 2000-2999 -> batch 17, the rest -> batch 18

lista15_stato, lista16_stato, lista17_stato, lista18_stato = (
    stato_list[:1000],
    stato_list[1000:2000],
    stato_list[2000:3000],
    stato_list[3000:],
)

lista15_provincia, lista16_provincia, lista17_provincia, lista18_provincia = (
    provincia_list[:1000],
    provincia_list[1000:2000],
    provincia_list[2000:3000],
    provincia_list[3000:],
)

lista15_comune, lista16_comune, lista17_comune, lista18_comune = (
    comune_list[:1000],
    comune_list[1000:2000],
    comune_list[2000:3000],
    comune_list[3000:],
)

lista15_indirizzo, lista16_indirizzo, lista17_indirizzo, lista18_indirizzo = (
    indirizzo_list[:1000],
    indirizzo_list[1000:2000],
    indirizzo_list[2000:3000],
    indirizzo_list[3000:],
)

In [None]:
# Inspect the chunk before the geographic columns are added
df_18 

In [None]:
# Attach the scraped geographic values to the chunk as new columns
# (to be repeated for every 1000-record chunk and its matching lists)
for colonna, valori in (
    ('stato', lista18_stato),
    ('provincia', lista18_provincia),
    ('comune', lista18_comune),
    ('indirizzo', lista18_indirizzo),
):
    df_18[colonna] = valori

In [None]:
# Display the chunk again to check that the four new columns were added
df_18

In [None]:
# Count the missing values (None/NaN) in every column of the chunk;
# for the four scraped columns this is the number of records stored as None
conteggio_none = df_18.isna().sum()
conteggio_none


In [None]:
# Save the enriched chunk locally, so that the scraped address columns are kept

# Specify the path and the file name - change them for every chunk
file_path='/Users/dilettaferri/Desktop/UNIPI/SNA - Project/Project/dataframe con indirizzi/df_18.csv'
df_18.to_csv(file_path, index=False)  # index=False: the row index is not written to the file