Required Packages

In [1]:
# numpy==1.22.3
# pandas==1.4.2
# python-dateutil==2.8.2
# pytz==2022.1
# pywin32==303
# pyxlsb==1.0.9
# six==1.16.0
# xlwings==0.27.5

In [2]:
#import required modules
import os
import re
import asyncio
import functools
import pandas as pd

from cluster.cluster import Cluster
from concurrent.futures import ThreadPoolExecutor
from dataframes.dataframe_optimized import DataFrameOptimized
from utils import constants as const, index as utils

Functions

In [4]:
async def get_bases(sources: dict[str, dict], files: dict[str, str], cached_data: bool = False) -> 'dict[str, DataFrameOptimized | list[DataFrameOptimized]]':
    """Load every configured base, either from the raw files or from the CSV cache.

    With ``cached_data=False`` each source is preprocessed in a worker thread via
    ``Cluster.preprocess_base`` and every resulting table is written as CSV to
    ``<ROOT_DIR>/files/temp`` (``<key>.csv`` for a single result, ``<key>_<idx>.csv``
    for each element of a list/tuple result).

    With ``cached_data=True`` the CSVs found in that folder are read back and the
    ``<key>_<idx>`` entries are regrouped into one list per source key, mirroring
    what the non-cached run wrote.

    Args:
        sources (dict[str, dict]): properties of each base, keyed by base name.
            An optional "converters" entry is used when reading the cached CSVs.
        files (dict[str, str]): path of each base, keyed by base name. Several
            paths for the same base are joined with "|".
        cached_data (bool, optional): read the CSV cache instead of the raw
            files. Defaults to False.

    Returns:
        dict: base name -> loaded base, or list of loaded bases when the source
        is made of several files.
    """
    temp_dir = os.path.join(const.ROOT_DIR, "files/temp")

    if cached_data is True:
        # only for test with csv value - delete
        bases = {}
        for file in os.listdir(temp_dir):
            # The last source whose key is contained in the file name provides the converters.
            source = {}
            for key, candidate in sources.items():
                if key in file:
                    source = candidate

            converters = (
                Cluster.process_converters(source["converters"], source["converters"].keys())
                if "converters" in source
                else None
            )
            bases[file.split('.')[0]] = DataFrameOptimized.get_table_csv(
                os.path.join(temp_dir, file),
                encoding="utf-8",
                converters=converters)

        return _regroup_indexed_bases(bases, list(sources))

    # Inside a coroutine the running loop is the one that must schedule the executor jobs.
    loop = asyncio.get_running_loop()
    keys = list(sources)

    with ThreadPoolExecutor() as executor:
        futures = []
        for key in keys:
            path = files[key].split("|")  # list[base, ...] - key is the name of the base
            if len(path) == 1:
                path = path[0]
            futures.append(loop.run_in_executor(
                executor,
                functools.partial(Cluster.preprocess_base, path=path, properties=sources[key])))

        results = await asyncio.gather(*futures)

    # Make sure the cache folder exists so the first run does not fail on to_csv.
    os.makedirs(temp_dir, exist_ok=True)
    for key, base in zip(keys, results):
        if isinstance(base, (list, tuple)):
            for idx, part in enumerate(base):
                part.table.to_csv(os.path.join(temp_dir, f"{key}_{idx}.csv"), encoding="utf-8", index=None)
        else:
            base.table.to_csv(os.path.join(temp_dir, f"{key}.csv"), encoding="utf-8", index=None)

    return dict(zip(keys, results))


def _regroup_indexed_bases(bases: dict, keys: list) -> dict:
    """Collapse the '<key>_<idx>' entries of ``bases`` into one '<key>' list ordered by idx."""
    for key in keys:
        pattern = re.compile(rf"^{re.escape(key)}_(\d+)$")
        indexed = []
        for name in list(bases):
            match = pattern.match(name)
            if match:
                indexed.append((int(match.group(1)), name))
        if indexed:
            # Sort numerically so that _10 comes after _9.
            bases[key] = [bases.pop(name) for _, name in sorted(indexed)]
    return bases

def get_predeterminated_files(_path: str) -> dict:
    """Locate the expected base files directly under ``_path``.

    Files at the top level of ``_path`` are matched by name (case-insensitive):
    "socio", "coord", "universo direc" and "universo indirec". Sub-folders of
    ``_path`` whose name contains "indirecta" or "directa" provide the consulta
    bases: every entry inside them is collected and joined with "|".

    Only the top level of ``_path`` is inspected. Descending further would
    build paths from ``_path`` for files that live deeper in the tree, and
    would let a file inside a consulta folder overwrite a top-level match.

    Args:
        _path (str): folder that contains the bases.

    Returns:
        dict: base name -> path ("" when not found). Consulta bases hold the
        "|"-joined list of the paths found inside their folder.
    """
    found = {
        "base_socios": "",
        "base_coordenadas": "",
        "base_universo_directa": "",
        "base_universo_indirecta": "",
        "base_consulta_directa": "",
        "base_consulta_indirecta": ""
    }

    # os.walk yields the top directory first (and nothing at all when _path is missing).
    top_level = next(os.walk(_path), None)
    if top_level is None:
        return found
    _, dirnames, filenames = top_level

    for file in filenames:
        if re.search("socio", file, re.IGNORECASE):
            found["base_socios"] = os.path.join(_path, file)
        elif re.search("coord", file, re.IGNORECASE):
            found["base_coordenadas"] = os.path.join(_path, file)
        elif re.search(r"universo\s+direc", file, re.IGNORECASE):
            found["base_universo_directa"] = os.path.join(_path, file)
        elif re.search(r"universo\s+indirec", file, re.IGNORECASE):
            found["base_universo_indirecta"] = os.path.join(_path, file)

    for dirname in dirnames:
        root_dir = os.path.join(_path, dirname)
        files = [os.path.normpath(os.path.join(root_dir, entry)) for entry in os.listdir(root_dir)]
        # "indirecta" contains "directa", so it has to be tested first.
        if re.search("indirecta", dirname, re.IGNORECASE):
            found["base_consulta_indirecta"] = "|".join(files)
        elif re.search("directa", dirname, re.IGNORECASE):
            found["base_consulta_directa"] = "|".join(files)

    return found

In [5]:
# Load the project configuration from config.yml under const.ROOT_DIR.
config = utils.get_config(os.path.join(const.ROOT_DIR, "config.yml"))

Main process

In [9]:
# Only needed to run the async process inside a notebook: the cells below call
# loop.run_until_complete, which nest_asyncio allows while a loop is already running.
# !pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()



You should consider upgrading via the 'C:\Users\Dell\.virtualenvs\ClusteringKg-7VG3ZJPS\Scripts\python.exe -m pip install --upgrade pip' command.


In [10]:
# Load the configuration, locate the base files under files/Bases and read the source definitions.
config = utils.get_config(os.path.join(const.ROOT_DIR, "config.yml"))
files_found = get_predeterminated_files(os.path.join(const.ROOT_DIR, "files/Bases"))
sources = config["sources"]

# Event loop used to drive the coroutines below from this cell.
loop = asyncio.get_event_loop()

# Load the bases (dict keyed by base name).
bases = loop.run_until_complete(get_bases(sources, files_found, cached_data=False))  # cached_data=True reloads the CSVs written to files/temp by a previous cached_data=False run

# Merge all bases following config["order_base"]; note that `bases` is rebound to the result of merge_all.
final_base = Cluster()
bases = loop.run_until_complete(final_base.merge_all(bases, config["order_base"]))

# Save the merged table as CSV (path is relative to the current working directory).
final_base.table.to_csv("base_final.csv", index=False, encoding="utf-8", float_format='%.10f')

  self._parser.feed(data)
  0%|          | 0/4 [00:00<?, ?it/s]

procesando la base de socios...


 25%|██▌       | 1/4 [00:00<00:01,  2.92it/s]

procesando la base de coordenadas...


 50%|█████     | 2/4 [00:04<00:05,  2.60s/it]

procesando la base de universos...


 75%|███████▌  | 3/4 [00:07<00:02,  2.82s/it]

procesando las bases de consulta...


  uniques = Index(uniques)
100%|██████████| 4/4 [00:58<00:00, 14.55s/it]
