In [None]:
from pathlib import Path
from requests.compat import urljoin, urlencode
from rich.progress import track
from typing import List, Dict
from uau_api import UauAPI
from uau_api.settings import Settings
from uau_api.utils import save_base64_to_file, write_jsonl
import aiofiles
import aiohttp
import asyncio
import dateutil
import json
import pandas as pd
import random
import re
import requests
import os
from getpass import getpass

# Initialize the client.
# Settings is instantiated once and reused instead of being rebuilt for every attribute.
settings = Settings()
uau = UauAPI(settings.API_URL, settings.API_KEY)
wd  = Path(settings.WORKDIR)

# Credentials must not live in the notebook: read them from the environment and
# fall back to an interactive prompt (the password prompt does not echo).
uau_username = os.environ.get('UAU_USERNAME') or input('UAU username: ')
uau_password = os.environ.get('UAU_PASSWORD') or getpass('UAU password: ')
uau.authenticate(uau_username, uau_password)

# Endpoint used by the request cells below.
url = urljoin(settings.API_URL, 'Obras/ObterObrasAtivas')
async def fetch_content():
    """POST to the module-level ``url`` using the client's session headers.

    Returns:
        The raw response body (bytes) when the status is 200, otherwise the
        string ``"Error: <status>"``.
    """
    async with aiohttp.ClientSession() as http:
        async with http.request(method='POST', url=url, headers=uau.session.headers) as reply:
            # Guard clause: anything but 200 is reported as a text message.
            if reply.status != 200:
                return f"Error: {reply.status}"
            return await reply.read()
# Top-level `await` only works in an async-aware kernel such as IPython/Jupyter.
# `content` is either the response bytes or an "Error: <status>" string.
content = await fetch_content()
print(content)


In [None]:

# Synchronous version of the same request: POST to the endpoint with the
# client's session headers; the Response object is the cell output.
# NOTE(review): no `timeout` is passed, so this call can block indefinitely.
url = urljoin(Settings().API_URL, 'Obras/ObterObrasAtivas')
requests.post(url, headers=uau.session.headers)

In [10]:
def get_file(empresa: int, obra: str, processo: str) -> List[Dict]:
    """Fetch the attachments of one process through the UAU client.

    Args:
        empresa: Company identifier, forwarded unchanged to the API.
        obra: Work identifier, used as the suffix of the attachment key.
        processo: Process number, interpolated into the attachment key.

    Returns:
        Whatever ``uau.Anexo.retornar_arquivos_em_lista_bytes`` returns for
        the key ``"PROCESSO <processo>-<obra>"``.
    """
    attachment_key = f"PROCESSO {processo}-{obra}"
    return uau.Anexo.retornar_arquivos_em_lista_bytes(empresa, attachment_key)


def load_jsonl(jsonl: str | Path):
    file_content = []
    with open(jsonl) as fc:
        json_file = [json.loads(line) for line in fc.readlines()]
        file_content.extend(json_file)
    return file_content


def remove_duplicate_dicts(list_of_dicts):
    """Return the distinct dictionaries of ``list_of_dicts`` in first-seen order.

    Two dictionaries are duplicates when they hold the same key/value pairs,
    regardless of key order.

    Args:
        list_of_dicts: Iterable of dictionaries whose values are hashable.

    Returns:
        list[dict]: Shallow copies of the first occurrence of each distinct
        dictionary, in the order they were encountered.

    Raises:
        TypeError: If any value is unhashable (e.g. a list or a dict).
    """
    seen = set()
    unique = []
    for entry in list_of_dicts:
        # frozenset makes the fingerprint independent of key insertion order.
        fingerprint = frozenset(entry.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(dict(entry))
    return unique


def url_encode(file: str) -> str:
    """Build the SharePoint "AllItems" link for a file under ``NotasUAU/``.

    Args:
        file: Path of the file relative to the ``NotasUAU/`` library folder.

    Returns:
        The view URL with ``id``, ``viewid`` and ``parent`` as an encoded
        query string.
    """
    library_root = "/sites/DataScience/Documentos Compartilhados/NotasUAU/"
    query_string = urlencode(
        {
            "id": f"{library_root}{file}",
            "viewid": "5b892423-10db-4b4f-a8f9-f0bdeafd317e",
            "parent": library_root,
        }
    )
    view_page = "https://hybrazil.sharepoint.com/sites/DataScience/Documentos%20Compartilhados/Forms/AllItems.aspx"
    return "?".join((view_page, query_string))

In [11]:
# Load the download log (one JSON object per line) into a DataFrame.
df = pd.DataFrame(load_jsonl(wd / 'NotasUAU/downloaded.jsonl'))

In [22]:
# Distinct values of the CNPJCPFFornecedor column.
cpf_cnpj = df['CNPJCPFFornecedor'].unique()

In [40]:
def check_string_in_dict_values(data: dict, search_string: str) -> bool:
    """
    Checks if any value in a dictionary contains a given partial string.

    String values are tested for a substring match; other containers
    (lists, tuples, sets, ...) are tested for membership. Values that do not
    support the ``in`` operator (numbers, ``None``, ...) are skipped instead
    of raising.

    Args:
        data (dict): The dictionary to search within.
        search_string (str): The partial string to search for.

    Returns:
        bool: True if the partial string is found in any value, False otherwise.
    """
    for value in data.values():
        try:
            if search_string in value:
                return True
        except TypeError:
            # Non-container value (int, float, None, ...): nothing to search in.
            continue
    return False

In [None]:
from IPython.display import display, HTML

# One row per (invoice, process): explode the list of processes and replace
# each process dict by its 'Numero'; non-dict cells are kept as they are.
htm = (
    df[['Empresa', 'Obra', 'Processos', 'url']]
    .explode('Processos')
    .map(lambda x: x.get('Numero') if isinstance(x, dict) else x)
)
# Running counter of files within the same company / work / process.
htm['item_index'] = htm.groupby(['Empresa', 'Obra', 'Processos']).cumcount()
# Inner subscripts use double quotes: reusing the outer quote character inside
# an f-string is a SyntaxError before Python 3.12.
htm['item_url'] = htm.apply(
    lambda x: f'<a href="{x["url"]}">Arquivo-{x["item_index"] + 1}</a>',
    axis=1,
)
# escape=False keeps the anchor tags as real links in the rendered table.
display(HTML(
    htm.query('Empresa==322').drop(columns='url').to_html(escape=False)
))


In [None]:
# Ad-hoc lookup for company 273, work '420A', process 14.
# NOTE(review): `processo` is passed as an int although get_file annotates it
# as str; it is only interpolated into an f-string.
get_file(273, '420A', 14)

In [None]:

# Collect the records of every JSONL file in NotasUAU/json_files.
# The paths are sorted because glob order is filesystem-dependent and later
# cells select rows by position.
jsonl_paths = sorted((wd / 'NotasUAU/json_files').glob('*.jsonl'))
nfe_entrada_jsonl = [
    record
    for jsonl_path in jsonl_paths
    for record in load_jsonl(jsonl_path)
]

# Flatten nested objects into dotted column names, then drop every column
# whose name starts with "NotaFiscal".
nfe_entrada = pd.json_normalize(nfe_entrada_jsonl)
nfe_entrada = nfe_entrada.filter(regex='^(?!NotaFiscal).')


In [None]:
# Load NotasUAU/collected.jsonl both as a list of records and as a DataFrame.
collected = load_jsonl(wd / 'NotasUAU/collected.jsonl')
collected_df = pd.DataFrame(collected)

In [None]:
# For every invoice, build the distinct (Empresa, Obra, Processo) targets.
# The list check wraps the whole comprehension: a non-list cell (e.g. NaN)
# must yield no targets instead of failing while being iterated.
nfe_entrada["to_download"] = nfe_entrada["Processos"].apply(
    lambda x: remove_duplicate_dicts(
        [
            dict(Empresa=p["Empresa"], Obra=p["Obra"], Processo=p["Numero"])
            for p in x
        ]
        if isinstance(x, list)
        else []
    )
)

In [None]:
# Download log as a DataFrame: one row per line of downloaded.jsonl.
downloaded = pd.DataFrame(load_jsonl(wd / 'NotasUAU/downloaded.jsonl'))

In [None]:
if not downloaded.empty:
    # Columns that together identify one invoice entry in both frames.
    cols = ['Empresa', 'Obra', 'Numero', 'CodigoFornecedor', 'NumeroNotaFiscal', ]
    indexes_df1 = nfe_entrada.set_index(cols).index
    indexes_df2 = downloaded.set_index(cols).index

    # Anti-join: keep only the entries whose key is absent from the download log.
    # The mask is computed once; .copy() gives later cells an independent frame
    # to assign into (avoids SettingWithCopyWarning).
    already_downloaded = indexes_df1.isin(indexes_df2)
    nfe_entrada = nfe_entrada[~already_downloaded].copy()

In [None]:
def _parse_date_cell(value):
    """Parse a date string; leave values that are not strings untouched.

    Blank strings become ``pd.NaT``. Non-string values (already parsed dates,
    NaN, None) are returned as they are, so this cell can be re-run safely.
    """
    if isinstance(value, str):
        return dateutil.parser.parse(value) if value.strip() else pd.NaT
    return value


# Every column whose name contains "data" (case-insensitive) is treated as a date.
columns = nfe_entrada.filter(regex=re.compile('data', re.IGNORECASE)).columns
nfe_entrada[columns] = nfe_entrada[columns].map(_parse_date_cell)
# True for the rows whose ChaveNotaFiscalEletronica is empty or missing.
idx = nfe_entrada['ChaveNotaFiscalEletronica'].replace('', pd.NA).isna()

In [None]:
import base64

async def async_download_file(session: aiohttp.ClientSession, method: str, url: str, *args, **kwargs) -> bytes | None:
    """Request ``url`` and return the response body.

    Args:
        session: Open aiohttp client session used to send the request.
        method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
        url: Address to request.
        *args: Extra positional arguments forwarded to ``session.request``.
        **kwargs: Extra keyword arguments forwarded to ``session.request``.

    Returns:
        The raw body when the status is 200; ``None`` for any other status or
        when the request fails with a client/network error or a timeout.
    """
    try:
        # method/url go first positionally so *args can no longer collide with
        # them (extra positionals after keywords always raised TypeError).
        async with session.request(method, url, *args, **kwargs) as response:
            if response.status != 200:
                return None
            return await response.read()
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # Best effort for transport problems only; programming errors propagate.
        return None

async def async_save_to_file(file_content: bytes, file_type: str, destination: Path|str, mode: str) -> None:
    
    if mode not in ('a', 'w', 'wb'):
        return
    
    if isinstance(destination, str):
        destination = Path(destination)
        
    if content is None:
        return

    destination.parent.mkdir(parents=True, exist_ok=True)
    if file_type == 'image':
        try:
            file_bytes = base64.b64decode(file_content)
            async with aiofiles.open(destination, mode=mode) as file:
                await file.write(file_bytes)
        except Exception as e:
            e
    if file_type == 'json':
        async with aiofiles.open(destination, mode=mode) as file:
                await file.write(file_bytes)


async def process(
    session: aiohttp.ClientSession,
    url: str,
    destination: Path | str,
    method: str = 'GET',
    file_type: str = 'json',
    mode: str = 'wb',
) -> None:
    """Download one URL, store the result and pause briefly.

    Args:
        session: Open aiohttp client session.
        url: Address to download.
        destination: File the downloaded content is written to.
        method: HTTP method passed to ``async_download_file``.
            NOTE(review): defaults to 'GET' - confirm against the API.
        file_type: Storage strategy passed to ``async_save_to_file``
            (``'json'`` writes the payload as it is).
        mode: File mode passed to ``async_save_to_file``.
    """
    destination = Path(destination)

    # Both helpers need their full argument lists: method for the download,
    # file_type and mode for the save.
    content = await async_download_file(session, method, url)
    await async_save_to_file(content, file_type, destination, mode)
    # Random pause between requests.
    await asyncio.sleep(random.uniform(0.1, 0.3))

    
async def main(
    data: list[dict],
    column: str,
    destination: str,
    base_url: str | None = None,
    chunk_size: int = 10,
):
    """Download the link found in ``column`` of every row, in bounded batches.

    Args:
        data: Rows to process; each must contain ``column``.
        column: Key holding the (possibly relative) link of the row.
        destination: Path handed to ``process`` for every row.
            NOTE(review): all rows share this one path - confirm whether a
            per-row file name should be derived.
        base_url: Base the row links are joined to. Defaults to the
            module-level ``url`` at call time.
        chunk_size: Maximum number of downloads awaited together.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    # Resolve the base once so a later rebinding of the global cannot change
    # it mid-run.
    base = url if base_url is None else base_url
    destination = Path(destination)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for row in data:
            link = urljoin(base, row[column])
            tasks.append(asyncio.create_task(process(session, link, destination)))

            # Process in chunks to avoid overwhelming the server
            if len(tasks) >= chunk_size:
                await asyncio.gather(*tasks)
                tasks = []

        # Flush the last, partially filled chunk.
        if tasks:
            await asyncio.gather(*tasks)

In [None]:
# Download the attachments of the selected invoice rows (one row per target
# process), store them under NotasUAU/<year>/<month>/<day>/<empresa>-<obra>/
# and append one line per saved file to the download log.
notas_root = wd / 'NotasUAU'
for _, row in nfe_entrada[~idx].iloc[-1:].explode("to_download").dropna(subset='to_download').iterrows():
    empresa = row["Empresa"]
    obra = row["Obra"]
    target = row["to_download"]
    processo = target.get("Processo")
    emissao = row["DataEmissao"]
    data_formatada = emissao.strftime("%Y-%m-%d")
    cpf_cnpj_fornecedor = row["CNPJCPFFornecedor"]
    numero_nota_fiscal = row["NumeroNotaFiscal"]
    dir_name = f"{emissao.year}/{emissao.month:02d}/{emissao.day:02d}/{empresa}-{obra}"
    destination = notas_root / dir_name
    response = get_file(target["Empresa"], target["Obra"], target["Processo"])
    if not isinstance(response, list):
        continue
    print(target.values())
    if not response:
        continue

    # Format the date columns once per row: doing it per attachment would call
    # .strftime on already formatted strings from the second file on.
    serialisable = row.copy()
    serialisable[columns] = serialisable[columns].map(lambda x: x.strftime('%Y-%m-%d'))
    record = serialisable.to_dict()

    name_prefix = (
        f"{empresa}-"
        f"{obra}-"
        f"{processo}-"
        f"{data_formatada}-"
        f"{cpf_cnpj_fornecedor}-"
        f"{numero_nota_fiscal}-"
    )
    used_names = set()
    for item in response:
        extensao_arquivo = Path(item["NomeArquivo"]).suffix.lower()
        file_name = f"{name_prefix}{extensao_arquivo}"
        # Attachments with the same extension would overwrite each other:
        # number the 2nd, 3rd, ... one (the first keeps the original name).
        duplicate = 1
        while file_name in used_names:
            duplicate += 1
            file_name = f"{name_prefix}{duplicate}{extensao_arquivo}"
        used_names.add(file_name)

        file_location = destination / file_name
        save_base64_to_file(
            base64_string=item["ConteudoArquivo"],
            output_filename=file_location
        )
        # The link is kept in its own name so the global `url` (the API
        # endpoint) is not clobbered; as_posix() keeps '/' on every OS.
        file_url = url_encode(file_location.relative_to(notas_root).as_posix())
        line = record | dict(url=file_url)

        write_jsonl(line, wd / 'NotasUAU/downloaded.jsonl', mode='a')
            

In [None]:
# Show the last `response` bound by the download loop.
# NOTE(review): raises NameError on a fresh kernel if that loop never iterated.
response