In [1]:
from datetime import datetime,date
from functools import wraps
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse, unquote

import csv
import hashlib
import logging
import os
import re
import time

import requests
from selenium.common.exceptions import NoSuchElementException, NoSuchWindowException, ElementNotInteractableException, StaleElementReferenceException, TimeoutException
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait

In [2]:
# Configure the root logger and lower its threshold to INFO so that the
# progress messages emitted with logging.info() below are shown.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

In [3]:
# Scraper configuration.
comune = 'brindisi'  # municipality name; not referenced by the other visible cells
download_path = './ap'  # directory handed to WebDriverManager / SectionsAlbo as the download target
url_comune = 'https://servizi.comune.brindisi.it/openweb/albo/albo_pretorio.php'  # listing page that is scraped

In [4]:
def retry(max_attempts=3, delay=10, exceptions=(Exception,)):
    """Build a decorator that retries a function when it raises.

    Args:
        max_attempts: Maximum number of times the wrapped function is called.
        delay: Seconds to sleep between two consecutive attempts.
        exceptions: Exception type(s) that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The decorated function returns the first successful
        result of the wrapped function.

    Raises:
        RuntimeError: When every attempt failed. The last caught exception is
            attached as ``__cause__`` so the real reason is not lost.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_error = e
                    # Lazy %-style arguments: the exception needs its own
                    # placeholder, otherwise the logging call itself fails.
                    logging.info("Attempt %d failed: %s", attempt, e)
                    # Sleeping after the final attempt would only delay the error.
                    if attempt < max_attempts:
                        time.sleep(delay)
            raise RuntimeError(
                f"Function {func.__name__} failed after {max_attempts} attempts"
            ) from last_error
        return wrapper
    return decorator

# Driver manager

In [5]:
class WebDriverManager:
    """Owns a single Selenium Chrome driver and re-creates it when it is gone.

    Args:
        DIR_DOWNLOAD: Directory Chrome should save downloads to.
        chromedriver_path: Path of the chromedriver executable handed to
            ``Service``. Defaults to the Windows-style path used originally.
        page_load_wait: Seconds to sleep after opening the start URL so the
            main page has time to load.
    """

    def __init__(self, DIR_DOWNLOAD, chromedriver_path=r".\chromedriver.exe", page_load_wait=10):
        self.driver = None
        # Keep the directory that was passed in instead of relying on a
        # notebook-level global being defined.
        self.download_dir = DIR_DOWNLOAD
        self.chromedriver_path = chromedriver_path
        self.page_load_wait = page_load_wait

    def start_driver(self, url):
        """Start Chrome and open ``url`` unless a live driver already exists.

        Args:
            url: Page to load right after the browser starts.

        Returns:
            The (possibly newly created) driver instance.
        """
        if not self.is_driver_open():
            opts = Options()
            prefs = {
                # NOTE(review): Chrome is commonly reported to ignore relative
                # download directories, hence the absolute path -- confirm.
                "download.default_directory": os.path.abspath(self.download_dir),
                "directory_upgrade": True,
                "profile.default_content_settings.popups": 0,
            }
            opts.add_experimental_option("prefs", prefs)
            # Extra flags (e.g. '--headless') can be added with opts.add_argument().
            self.driver = Chrome(service=Service(self.chromedriver_path), options=opts)

            self.driver.get(url)
            time.sleep(self.page_load_wait)  # Waiting for main page to load
        return self.driver

    def is_driver_open(self):
        """Return True when a driver exists and still answers a simple query."""
        if not self.driver:
            return False
        try:
            # Any property access that talks to the browser works as a probe.
            self.driver.current_url
            return True
        except NoSuchWindowException:
            return False
        except Exception as e:
            logging.info(f'Driver seems to be closed: {e}')
            return False

    def get_driver(self, link):
        """Return a live driver, starting a new one on ``link`` if necessary.

        An already open driver is returned as-is; it is NOT navigated to
        ``link``.
        """
        if not self.is_driver_open():
            logging.warning('Driver is not responding, reopening')
            self.driver = self.start_driver(link)
        return self.driver

    def close_driver(self):
        """Quit the browser (if any) and forget the driver instance."""
        if self.driver:
            try:
                self.driver.quit()
            finally:
                # Drop the reference even when quit() fails so the next
                # get_driver() call starts from a clean state.
                self.driver = None

In [6]:
# Single driver manager instance; it is also passed to SectionsAlbo below.
driver_manager = WebDriverManager(download_path)

In [7]:
# Starts Chrome on the listing page if no live driver exists yet.
# SectionsAlbo fetches its own reference through driver_manager.get_driver().
driver = driver_manager.get_driver(url_comune)



# Class oggetti Ricerca

In [8]:
class SectionsAlbo:
    """Collects the atti listed on the Albo Pretorio page and downloads their files.

    Constructing an instance immediately walks every listing page (see
    ``loop_all_pages``) and stores the result in ``dict_links_oggetti``, a
    mapping ``link -> {n_atto, start_date, end_date}``. ``get_dict_objects``
    then visits each link and downloads the attached PDF files.

    Args:
        url_comune: URL of the listing page.
        driver_manager: Object whose ``get_driver(url)`` returns the Selenium driver.
        dir_download: Directory the downloaded files are written to; created
            if missing.
        last_date_update: Only atti published strictly after this date are
            collected. ``None`` collects everything. Both ``datetime`` and
            ``date`` values are accepted.
    """

    # Seconds to wait for an HTTP response before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, url_comune: str, driver_manager: "WebDriverManager", dir_download: str, last_date_update: datetime):
        self.url_comune = url_comune
        self.driver = driver_manager.get_driver(url_comune)
        self.dir_download = self.check_dir_download(dir_download)
        self.last_date_update = last_date_update
        self.dict_links_oggetti = self.loop_all_pages()

    @staticmethod
    def check_dir_download(dir_download):
        """Create ``dir_download`` when it does not exist and return it unchanged."""
        os.makedirs(dir_download, exist_ok=True)
        return dir_download

    # ------------------------------------------------------------------
    # Pure parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_n_atto(text):
        """Return the number following ``<year>/`` in ``text``, or None.

        Any four-digit year is accepted rather than a single hard-coded one.
        """
        match = re.search(r"\b\d{4}/(\d+)", text or "")
        return match.group(1) if match else None

    @staticmethod
    def _parse_date(text):
        """Return the first ``dd/mm/yyyy`` date found in ``text``, or None."""
        match = re.search(r"\d{1,2}/\d{1,2}/\d{4}", text or "")
        if not match:
            return None
        try:
            return datetime.strptime(match.group(0), '%d/%m/%Y')
        except ValueError:
            # Looks like a date but is not a valid calendar day.
            return None

    @staticmethod
    def _as_date(value):
        """Reduce a ``datetime`` to its ``date``; leave plain dates untouched."""
        return value.date() if isinstance(value, datetime) else value

    def _is_new_atto(self, start_date) -> bool:
        """Tell whether an atto with ``start_date`` must be collected.

        Without a reference date, or when the publication date could not be
        read, the atto is kept: downloading twice is preferred over silently
        missing a document.
        """
        if self.last_date_update is None or start_date is None:
            return True
        # Compare calendar days only; mixing datetime and date raises TypeError.
        return self._as_date(self.last_date_update) < self._as_date(start_date)

    # ------------------------------------------------------------------
    # Row scraping
    # ------------------------------------------------------------------
    @staticmethod
    def get_n_atto(ogg_e: "WebElement") -> str:
        """Read the progressive number of the atto from a listing row (None on failure)."""
        try:
            pubblica_n = ogg_e.find_element(By.XPATH, ".//div[label/text()='Numero progressivo:']").text
        except Exception as e:
            logging.warning(f'SectionsAlbo: Could not get n_atto: {e}')
            return None

        n_atto = SectionsAlbo._parse_n_atto(pubblica_n)
        if n_atto is None:
            logging.warning(f'SectionsAlbo: Could not get n_atto: no "<year>/<number>" in {pubblica_n!r}')
        return n_atto

    @staticmethod
    def _get_labelled_date(ogg_e, label, what):
        """Read the date shown in the row's div whose label text equals ``label``.

        ``what`` is only used in the warning message. Returns None on failure.
        """
        try:
            raw_text = ogg_e.find_element(By.XPATH, f".//div[label/text()='{label}']").text
        except Exception as e:
            logging.warning(f'SectionsAlbo: Could not get {what}: {e}')
            return None

        parsed = SectionsAlbo._parse_date(raw_text)
        if parsed is None:
            logging.warning(f'SectionsAlbo: Could not get {what}: no dd/mm/yyyy date in {raw_text!r}')
        return parsed

    @staticmethod
    def get_start_date(ogg_e: "WebElement") -> datetime:
        """Return the 'Data affissione:' date of a listing row (None on failure)."""
        return SectionsAlbo._get_labelled_date(ogg_e, 'Data affissione:', 'start date')

    @staticmethod
    def get_end_date(ogg_e: "WebElement") -> datetime:
        """Return the 'Fine Pubblicazione:' date of a listing row (None on failure)."""
        return SectionsAlbo._get_labelled_date(ogg_e, 'Fine Pubblicazione:', 'end date')

    @staticmethod
    def get_oggetto_link(ogg_e: "WebElement") -> str:
        """Return the href of the first anchor inside a listing row (None on failure)."""
        try:
            a_element = ogg_e.find_element(By.TAG_NAME, 'a')
            return a_element.get_attribute('href')
        except Exception as e:
            logging.warning(f'SectionsAlbo: Could not get link from oggetto {e}')
            return None

    @staticmethod
    def rename_file(file_name: str, link_ogg: str) -> str:
        """Prefix ``file_name`` with the first 8 hex digits of SHA-256(``link_ogg``)."""
        id = hashlib.sha256(link_ogg.encode()).hexdigest()[:8]

        return f'{id}_{file_name}'

    def get_main_info_oggetto(self, ogg_e: "WebElement") -> dict:
        """Return ``{link: {n_atto, start_date, end_date}}`` for one listing row."""
        n_atto = self.get_n_atto(ogg_e=ogg_e)
        start_date = self.get_start_date(ogg_e=ogg_e)
        end_date = self.get_end_date(ogg_e=ogg_e)
        link = self.get_oggetto_link(ogg_e=ogg_e)

        return {link: dict(n_atto=n_atto,
                           start_date=start_date,
                           end_date=end_date)}

    def get_atti_from_page(self) -> dict:
        """Collect the atti of the currently displayed page.

        Returns:
            ``{link: info}`` for every row that has a link and passes the
            ``last_date_update`` filter. An empty dict when the table cannot
            be read.
        """
        all_oggetti_page = {}
        try:
            # Waiting for the tabella element
            body_e = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, 'tabella_albo'))
            )

            # One row per atto
            rows_e = body_e.find_elements(By.CLASS_NAME, 'paginated_element')

            for r_e in rows_e:
                # Each field is scraped once; the helpers never raise.
                link, info = next(iter(self.get_main_info_oggetto(ogg_e=r_e).items()))
                if link is None:
                    # Without a link the atto cannot be opened later on.
                    logging.warning('SectionsAlbo: Skipping a row without a link')
                    continue
                if self._is_new_atto(info['start_date']):
                    all_oggetti_page[link] = info
                # else: atto already in DB, skipping it
        except Exception as e:
            logging.warning(f'SectionsAlbo: Could not get the atti from this page: {e}')

        return all_oggetti_page

    def loop_all_pages(self) -> dict:
        """Walk the listing page by page and merge the atti of every page.

        Paging stops when the 'Successiva' (next) link does not become
        clickable within 10 seconds, or on any other error.
        """
        all_oggetti_dict = {}
        self.driver.get(self.url_comune)

        page_number = 1
        while True:
            try:
                logging.info(f'Getting atti from page: {page_number}')
                all_oggetti_dict.update(self.get_atti_from_page())

                # Finding next page button
                next_button = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, "//a[@class='button' and @title='Successiva']"))
                )

                # Use JavaScript to click
                self.driver.execute_script("arguments[0].click();", next_button)
                page_number += 1

                # Waiting for the new content to load
                time.sleep(2)

            except TimeoutException:
                # No clickable "next" link: treated as the end of the listing.
                logging.info(f'SectionsAlbo: No next page after page {page_number}')
                break
            except Exception as e:
                # Anything else is unexpected; report it instead of hiding it.
                logging.warning(f'SectionsAlbo: Stopped paging at page {page_number}: {e}')
                break

        return all_oggetti_dict

    # ------------------------------------------------------------------
    # Downloading
    # ------------------------------------------------------------------
    @staticmethod
    def get_filename_from_cd(cd):
        """Extract the ``filename=`` value from a Content-Disposition header (or None)."""
        if not cd:
            return None
        fname = None
        if 'filename=' in cd:
            fname = cd.split('filename=')[1]
            if '"' in fname:
                fname = fname.split('"')[1]
            else:
                fname = fname.split(';')[0].strip()
        return fname

    def general_download(self, link_file: str) -> Optional[Tuple[str, str, Path]]:
        """Download ``link_file`` into ``self.dir_download``.

        Returns:
            ``(file_name, link_file, file_path)`` on success, None on failure
            (the failure is logged).
        """
        try:
            response = requests.get(link_file, timeout=self.REQUEST_TIMEOUT)
            # Do not store an HTTP error page as if it were the document.
            response.raise_for_status()

            cd = response.headers.get('Content-Disposition')
            b_name = self.get_filename_from_cd(cd)
            if b_name:
                # The header is server-controlled: keep only the last path
                # component so the file cannot land outside the download dir.
                safe_name = b_name.replace('\\', '/').rsplit('/', 1)[-1]
                file_name = self.rename_file(safe_name, link_file)
            else:
                file_name = link_file.split('&')[0].split('/')[-1]

            # Construct the full path of the file
            file_path = Path(self.dir_download, file_name)

            with open(file_path, 'wb') as file:
                file.write(response.content)

            return file_name, link_file, file_path
        except Exception as e:
            logging.info(f'SectionsAlbo: Could not download file from {link_file}: {e}')
            return None

    def download_single_file_external_page(self, link_file_external: str) -> Optional[Tuple[str, str, Path]]:
        """Open an intermediate page and download the PDF behind its primary button.

        Returns:
            ``(file_name, link_file, file_path)`` when the button points to a
            PDF that could be downloaded, otherwise None.
        """
        try:
            self.driver.get(link_file_external)
            # Compound class names are CSS selectors, not a single class name.
            scarica_button = self.driver.find_element(By.CSS_SELECTOR, '.btn.btn-primary')
            link_file = scarica_button.get_attribute('href')
            if not link_file:
                logging.info(f'SectionsAlbo: No download link on {link_file_external}')
                return None

            # Send a HEAD request to check the content type
            response = requests.head(link_file, allow_redirects=True, timeout=self.REQUEST_TIMEOUT)

            if 'application/pdf' in response.headers.get('Content-Type', ''):
                return self.general_download(link_file=link_file)
        except Exception as e:
            logging.info(f'SectionsAlbo: Could not get file from external link: {e}')
        return None

    def download_single_file_atto_page(self, link_ogg: str) -> Optional[Tuple[str, str, Path]]:
        """Download the file behind one link found on an atto page.

        A HEAD request decides how the link is handled: a PDF is downloaded
        directly; any other content type is treated as an intermediate page
        that is opened in the browser to look for the actual file.

        Args:
            link_ogg (str): Link found on the atto page.

        Returns:
            ``(file_name, link_file, file_path)`` on success, None when the
            response has no Content-Type or nothing could be downloaded.
        """
        try:
            # Send a HEAD request to check the content type
            response = requests.head(link_ogg, allow_redirects=True, timeout=self.REQUEST_TIMEOUT)

            content_type = response.headers.get('Content-Type')
            if content_type is None:
                return None
            # If the link has the file, download it
            if 'application/pdf' in content_type:
                return self.general_download(link_file=link_ogg)
            # Else maybe it's an external link, try to download it
            return self.download_single_file_external_page(link_file_external=link_ogg)
        except Exception as e:
            logging.warning(f'SectionsAlbo: Could not download files from {link_ogg}: {e}')
            return None

    def get_dict_objects(self):
        """Visit every collected atto and download the files linked from it.

        Returns:
            ``{file_name: {n_atto, start_date, end_date, file_name, link_file,
            internal_path}}`` for every file that was downloaded.
        """
        dict_ogg = {}
        total_oggetti = len(self.dict_links_oggetti)
        logging.info(f'Found {total_oggetti} oggetti to process')

        if not self.dict_links_oggetti:
            logging.info('SectionsAlbo: All files up-to-date!')
            return dict_ogg

        for index, (link, dict_info) in enumerate(self.dict_links_oggetti.items()):
            # Log progress every 30 items
            if index % 30 == 0 and index > 0:
                logging.info(f'Processing oggetto {index} of {total_oggetti}')

            try:
                self.driver.get(link)
                body_e = self.driver.find_element(By.CSS_SELECTOR, '.card-body.pe-0.ps-4.ps-md-5')
                # Read the hrefs up front: downloading may navigate the
                # browser away, which would invalidate the elements.
                list_link = [l_e.get_attribute('href') for l_e in body_e.find_elements(By.TAG_NAME, 'a')]
            except Exception as e:
                logging.warning(f'SectionsAlbo: Could not download files: {e}')
                continue

            for file_link in list_link:
                if not file_link:
                    continue
                # One link that yields nothing must not abort the others.
                result = self.download_single_file_atto_page(file_link)
                if not result:
                    continue
                f_name, f_link, f_path = result
                if f_name and f_link and f_path:
                    dict_ogg[f_name] = dict(**dict_info,
                                            file_name=f_name,
                                            link_file=f_link,
                                            internal_path=f_path)

        logging.info(f'Completed processing all {total_oggetti} oggetti')
        return dict_ogg

In [9]:
# Building the object already crawls every listing page (done in __init__).
# last_date_update=None disables the date filter, so all atti are collected.
sections_manager = SectionsAlbo(url_comune=url_comune,
                                driver_manager=driver_manager,
                                dir_download=download_path,
                                last_date_update=None)

INFO:root:Getting atti from page: 1
INFO:root:Getting atti from page: 2
INFO:root:Getting atti from page: 3
INFO:root:Getting atti from page: 4


In [None]:
# Show the collected mapping: atto link -> {n_atto, start_date, end_date}.
sections_manager.dict_links_oggetti

In [None]:
# Visit every collected atto, download its files and show the resulting dict.
sections_manager.get_dict_objects()