In [None]:
### prepare
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor

import magic
import pandas as pd
import pdf2image
import pytesseract
import requests
from PIL import Image

class DataIngestion():
    """Download report files, convert them to PDF and OCR them to text.

    The public steps each return ``self`` so they can be chained:
    ``load_docs`` -> ``download_reports`` -> ``convert_reports`` ->
    ``read_reports``. All per-document state lives in ``self.docs_data``,
    a DataFrame created by ``load_docs`` and extended by the later steps.
    """

    # Files of this size (in units of 10**6 bytes) or larger are not converted.
    _MAX_CONVERSION_SIZE_MB = 90
    # MIME types for which no conversion is attempted.
    _UNCONVERTIBLE_TYPES = ("inode/x-empty", "application/octet-stream")
    # Seconds `requests` waits for the server before giving up; without a
    # timeout a stalled connection would block a worker thread forever.
    _DOWNLOAD_TIMEOUT = 60

    def __init__(self,
                 docs_file="../../data/unglobalcompact.csv",
                 raw_folder="../../data/raw_files/") -> None:
        """Store the path of the input CSV and the folder for raw downloads."""
        self.docs_file = docs_file
        self.raw_folder = raw_folder

    def _sanitize_name(self, name):
        """Return ``name`` lowercased with only ASCII letters kept.

        Every character other than ``a-z``/``A-Z`` and space is removed, then
        each run of spaces is replaced by a single underscore.
        """
        name = re.sub(r"[^a-zA-Z ]", "", name).lower()
        return re.sub(r"[ ]+", "_", name)

    def _get_file_atts(self, path):
        """Split a path or URL into ``dir``, ``name`` and ``ext``.

        Everything from the first ``?`` on is dropped *before* splitting, so
        a dot inside a query string cannot be mistaken for the extension
        separator.
        """
        clean_path = path.split("?", 1)[0]
        name, extension = os.path.splitext(os.path.basename(clean_path))
        return {"dir": os.path.dirname(clean_path),
                "name": name,
                "ext": extension}

    def _get_destination(self, row, dir=None):
        """Build the local file path for one row of ``docs_data``.

        The file name is ``<row index>_<sanitized company name><ext>`` where
        ``ext`` comes from the row's ``communication_on_progress_file`` value.
        ``dir`` (optional) is the folder the name is joined onto.
        """
        ext = self._get_file_atts(row["communication_on_progress_file"])["ext"]
        company = self._sanitize_name(str(row["name"]))
        file_name = "_".join([str(row.name), company]) + ext
        # os.path.join works whether or not the folder ends with a separator.
        return os.path.join(dir or "", file_name)

    def load_docs(self):
        """Read the CSV and prepare ``self.docs_data``.

        Column names are normalised with ``_sanitize_name``; only rows with a
        non-null ``communication_on_progress_file`` are kept, restricted to
        the columns ``name``, ``type``, ``country``, ``sector`` and
        ``communication_on_progress_file``. A ``file_destination`` column is
        added with the local path each file will be downloaded to.
        """
        docs = pd.read_csv(self.docs_file)
        docs = docs.rename(
            columns={col: self._sanitize_name(col) for col in docs.columns})
        has_url = docs["communication_on_progress_file"].notnull()
        # .copy() so the column added below is not written into a slice.
        docs = docs.loc[
            has_url,
            ["name", "type", "country", "sector",
             "communication_on_progress_file"]].copy()
        # A plain loop (rather than DataFrame.apply) also works when no rows
        # are left after filtering.
        docs["file_destination"] = [
            self._get_destination(row, dir=self.raw_folder)
            for _, row in docs.iterrows()
        ]
        self.docs_data = docs
        return self

    def _download_file(self, url, file_path):
        """Download ``url`` to ``file_path`` unless that file already exists.

        Returns ``file_path`` on success and ``None`` when the download is
        skipped or fails. Data is written to ``file_path + ".part"`` first and
        renamed once complete, so an interrupted download never leaves a
        truncated file that a later run would treat as already downloaded.
        """
        if os.path.exists(file_path):
            print(f"File on '{file_path}' already exists. Skipping download.")
            return None
        partial_path = file_path + ".part"
        try:
            parent = os.path.dirname(file_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            with requests.get(url, stream=True,
                              timeout=self._DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
            os.replace(partial_path, file_path)
        except (requests.RequestException, OSError) as e:
            # Best-effort cleanup of the incomplete file.
            try:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            except OSError:
                pass
            print(f"Error downloading '{file_path}': {e}.")
            return None
        print(f"File on '{file_path}' downloaded successfully.")
        return file_path

    def download_reports(self):
        """Download every report listed in ``docs_data`` using 5 threads."""
        url_list = self.docs_data["communication_on_progress_file"].values
        path_list = self.docs_data["file_destination"].values
        # Leaving the `with` block waits for all downloads to finish; the
        # per-file outcome is reported by _download_file itself.
        with ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(self._download_file, url_list, path_list)
        return self

    def _check_path(self, path):
        """Return ``path`` if it exists on disk, otherwise ``None``."""
        return path if os.path.exists(path) else None

    def _get_conversion_path(self, file_destination, dir_name=None):
        """Return the ``.pdf`` path corresponding to ``file_destination``.

        The result lives in ``dir_name`` or, when that is ``None``, next to
        the source file.
        """
        if dir_name is None:
            dir_name = os.path.dirname(file_destination)
        name, _ = os.path.splitext(os.path.basename(file_destination))
        return os.path.join(dir_name, name + ".pdf")

    def _get_metadata(self):
        """Add file type, size and conversion flags to ``docs_data``.

        Columns written:

        * ``file_type`` - MIME type reported by ``magic`` (``None`` when the
          file is not on disk, e.g. after a failed download).
        * ``file_size`` - size in units of 10**6 bytes (NaN when missing).
        * ``converted_file_destination`` - the file itself for PDFs, else
          ``None`` until ``_convert_row`` fills it in.
        * ``conversion`` - True for existing non-PDF files that are not of an
          unconvertible type and are below the size limit.
        """
        paths = list(self.docs_data["file_destination"])
        on_disk = [os.path.isfile(path) for path in paths]
        self.docs_data["file_type"] = [
            magic.from_file(path, mime=True) if present else None
            for path, present in zip(paths, on_disk)
        ]
        self.docs_data["file_size"] = pd.Series(
            [os.path.getsize(path) / 10**6 if present else float("nan")
             for path, present in zip(paths, on_disk)],
            index=self.docs_data.index, dtype="float64")

        self.docs_data["converted_file_destination"] = None
        pdf_rows = self.docs_data["file_type"] == "application/pdf"
        self.docs_data.loc[pdf_rows, "converted_file_destination"] = \
            self.docs_data.loc[pdf_rows, "file_destination"]

        self.docs_data["conversion"] = (
            self.docs_data["converted_file_destination"].isnull()
            & self.docs_data["file_type"].notnull()
            & ~self.docs_data["file_type"].isin(self._UNCONVERTIBLE_TYPES)
            & (self.docs_data["file_size"] < self._MAX_CONVERSION_SIZE_MB)
        )
        return self

    def _convert_row(self, ind):
        """Convert the file of row ``ind`` to PDF with headless LibreOffice.

        On success (or when the PDF already exists) the row's
        ``converted_file_destination`` is set to the PDF path; on failure it
        is left unchanged and the error is printed. Always returns ``self``.
        """
        file_destination = self.docs_data.loc[ind, "file_destination"]
        converted_file_destination = self._get_conversion_path(file_destination)
        # NOTE: make sure this works
        if os.path.exists(converted_file_destination):
            print(f"File on '{converted_file_destination}' already exists. Skipping conversion.")
            self.docs_data.loc[ind, "converted_file_destination"] = converted_file_destination
            return self

        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        cmd = ["libreoffice", "--headless", "--convert-to", "pdf",
               file_destination,
               "--outdir", os.path.dirname(converted_file_destination) or "."]
        try:
            result = subprocess.run(cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    universal_newlines=True, errors="replace")
        except (OSError, subprocess.SubprocessError) as e:
            print(f"Error converting '{file_destination}': {e}.")
            return self

        failed = (result.returncode != 0
                  or "error" in result.stdout.lower()
                  or not os.path.exists(converted_file_destination))
        if failed:
            details = " ".join((result.stdout + result.stderr).split())
            if not details:
                details = "no PDF was produced"
            print(f"Error converting '{file_destination}': {details}.")
            return self

        self.docs_data.loc[ind, "converted_file_destination"] = converted_file_destination
        print(f"File on '{converted_file_destination}' converted successfully.")
        return self

    def convert_reports(self):
        """Collect file metadata, then convert every row flagged for conversion."""
        self._get_metadata()
        for ind in self.docs_data.index[self.docs_data["conversion"]]:
            self._convert_row(ind)
        return self

    # NOTE: use ghostscript, parallelize (it is working, but it is slow)

    def _pdf2txt(self, file_path):
        """Render each page of the PDF to an image and return the OCR text."""
        pages = pdf2image.convert_from_path(file_path, dpi=200, thread_count=4)
        return "".join(pytesseract.image_to_string(page) for page in pages)

    def _get_txt_path(self, file_path,
                      dir_name="../../data/txt_files/"):
        """Return the ``.txt`` path in ``dir_name`` matching ``file_path``."""
        name, _ = os.path.splitext(os.path.basename(file_path))
        return os.path.join(dir_name, name + ".txt")

    def _txt2file(self, text, file_path):
        """Write ``text`` to ``file_path`` as UTF-8, creating its folder."""
        parent = os.path.dirname(file_path)
        if parent:  # a bare file name has no folder to create
            os.makedirs(parent, exist_ok=True)
        # Explicit UTF-8 so OCR output never fails on the locale's encoding.
        with open(file_path, "w", encoding="utf-8") as text_file:
            text_file.write(text)
        return None

    def read_reports(self, overwrite=False):
        """OCR every available PDF into a text file.

        Adds a ``txt_file_destination`` column holding the text file path, or
        ``None`` for rows without a PDF or whose reading failed. Existing text
        files are reused unless ``overwrite`` is true.
        """
        self.docs_data["txt_file_destination"] = None
        for ind, pdf_path in self.docs_data["converted_file_destination"].items():
            # Rows that were never converted (or failed to) carry no path.
            if pd.isna(pdf_path):
                print(f"No PDF available for row '{ind}'. Skipping reading.")
                continue
            txt_path = self._get_txt_path(pdf_path)
            if os.path.exists(txt_path) and not overwrite:
                print(f"File on '{txt_path}' already exists. Skipping reading.")
                self.docs_data.loc[ind, "txt_file_destination"] = txt_path
                continue
            print(f"Reading '{pdf_path}'.")
            try:
                txt = self._pdf2txt(pdf_path)
                self._txt2file(txt, txt_path)
                self.docs_data.loc[ind, "txt_file_destination"] = self._check_path(txt_path)
            except Exception as e:
                # Deliberately broad: one unreadable document must not stop
                # the rest of the batch.
                print(f"Error reading '{pdf_path}': {e}.")
        return self

In [None]:
# Run the whole pipeline with the default paths: load the document list,
# download the files, convert non-PDF files to PDF, then OCR them to text.
Ingest = (
    DataIngestion()
    .load_docs()
    .download_reports()
    .convert_reports()
    .read_reports()
)