## **Testando Com Multi-threads**

In [14]:
import pyodbc
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_informix_connection():
    """Open a new pyodbc connection to the Informix database.

    Every setting is read from an environment variable when it is set and
    otherwise falls back to the value that used to be hard-coded, so
    existing runs behave exactly as before:

    ``INFORMIX_DSN``, ``INFORMIX_USER``, ``INFORMIX_PASSWORD``,
    ``INFORMIX_DATABASE``, ``INFORMIX_SERVER``.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller
        is responsible for closing it.

    Raises:
        Whatever ``pyodbc.connect`` raises is propagated unchanged.
    """
    import os  # local import keeps this function self-contained

    # NOTE(review): the fallback credentials below are committed in plain
    # text. Supply them through the environment instead, then remove the
    # literals from the source and rotate the password.
    dsn = os.environ.get('INFORMIX_DSN', 'informix_Producao_BCARGA')
    user = os.environ.get('INFORMIX_USER', 'classprod')
    password = os.environ.get('INFORMIX_PASSWORD', 'yY5eQ4vT')
    database = os.environ.get('INFORMIX_DATABASE', 'central')
    server = os.environ.get('INFORMIX_SERVER', 'bd1hom')

    connection_string = f'DSN={dsn};UID={user};PWD={password};DATABASE={database};SERVER={server}'
    return pyodbc.connect(connection_string)

def query_informix(date_range):
    """Run the item-level query for one emission window and return a DataFrame.

    The statement joins ``usudba.tbfis_nfeconsumo`` with its items and the
    issuer tables, restricted to four fixed ``nrdocumento`` values and to
    rows whose ``dhemissao`` lies between the two bounds of *date_range*.

    Args:
        date_range: Two-item sequence ``(start, end)``; both values are
            interpolated as quoted literals into the ``BETWEEN`` clause.

    Returns:
        A ``pandas.DataFrame`` with the selected columns, or ``None`` when
        the query fails (the error is printed).

    Raises:
        Errors raised while opening the connection are propagated.
    """
    # NOTE(review): the bounds are formatted straight into the SQL text.
    # Only pass trusted values; switch to bound parameters before exposing
    # this to external input.
    query = f"""
    SELECT CAST(nfce.sqnfeconsumo AS VARCHAR(255)) AS sqnfeconsumo, 
           hin.nrdocumento AS cnpj, 
           itemnfce.dsproduto, 
           itemnfce.vlproduto,
           itemnfce.qtcomprod, 
           itemnfce.vlbasecalcicms, 
           itemnfce.sgunidmedcom 
    FROM usudba.tbfis_nfeconsumo nfce
    JOIN usudba.tbfis_itemnfec itemnfce ON nfce.sqnfeconsumo = itemnfce.sqnfeconsumo
    JOIN usudba.tbcad_instituicao inst ON nfce.sqemitente = inst.sqhumanoinst
    JOIN usudba.tbfis_hinfeletron hin ON inst.sqhumanoinst = hin.sqhinfeletron
    WHERE hin.nrdocumento IN ('75315333007464', '75315333008940', '75315333011496', '75315333014673')
      AND nfce.dhemissao BETWEEN '{date_range[0]}' AND '{date_range[1]}'
    """

    connection = get_informix_connection()

    try:
        return pd.read_sql(query, connection)
    except (pyodbc.Error, pd.errors.DatabaseError) as e:
        # pandas re-raises driver failures that happen while executing the
        # statement as pandas.errors.DatabaseError, so both types are
        # caught. The statement is deliberately not executed a second time
        # here: it would fail the same way and raise from inside this
        # handler.
        print("Error: ", e)
        return None
    finally:
        # Always release the connection, on success and on failure.
        connection.close()

def fetch_data_parallel(date_ranges, max_workers=40):
    """Run ``query_informix`` for every date range on a thread pool.

    Args:
        date_ranges: Iterable of ``(start, end)`` pairs; one query is
            submitted per pair.
        max_workers: Upper bound on concurrent worker threads (and so on
            simultaneously open database connections). Defaults to 40.

    Returns:
        A single ``pandas.DataFrame`` with the per-range results
        concatenated in the order of *date_ranges* (index reset), or an
        empty DataFrame when no query produced a result.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything first so the queries overlap, keeping each
        # future paired with the range it belongs to.
        submitted = [
            (date_range, executor.submit(query_informix, date_range))
            for date_range in date_ranges
        ]
        # Collect in submission order so the row order of the final frame
        # does not depend on which thread happens to finish first.
        for date_range, future in submitted:
            try:
                result = future.result()
            except Exception as exc:
                # Best effort: report which window failed and keep going.
                print(f"Generated an exception: {exc} (date_range={date_range})")
                continue
            if result is not None:
                results.append(result)

    if results:
        return pd.concat(results, ignore_index=True)
    return pd.DataFrame()

if __name__ == "__main__":
    # Emission windows to query; each tuple is (start, end) and is handed
    # to its own worker by fetch_data_parallel.
    date_ranges = [
        ("2024-04-10 00:00:00", "2024-04-11 23:59:59"),
        ("2024-04-12 00:00:00", "2024-04-13 23:59:59"),
        ("2024-04-14 00:00:00", "2024-04-15 23:59:59"),
        ("2024-04-16 00:00:00", "2024-04-17 23:59:59"),
        ("2024-04-18 00:00:00", "2024-04-19 01:59:59"),
    ]

    result_df = fetch_data_parallel(date_ranges=date_ranges)

    # Nothing is printed when no rows came back.
    if not result_df.empty:
        print(result_df)




       sqnfeconsumo            cnpj                                dsproduto  \
0        2766522142  75315333007464  REFRIG COCA COLA LT 350ML SIX PACK LMPM   
1        2766522142  75315333007464                   KETCHUP TAMBAU BB 830G   
2        2766522142  75315333007464                    V COENTRO MATEUS A UN   
3        2766524029  75315333007464  PAPEL HIG LEBLON SOFT BLC FD L12P11 20M   
4        2766524029  75315333007464              TAPIOCA CAICO PENEIRADA 1KG   
...             ...             ...                                      ...   
258919   2762100334  75315333014673                          Uvoffee - Fatia   
258920   2762100334  75315333014673                          Uvoffee - Fatia   
258921   2762100334  75315333014673                         Monoffee - Fatia   
258922   2762100334  75315333014673                         Monoffee - Fatia   
258923   2762100334  75315333014673                         Monoffee - Fatia   

        vlproduto  qtcomprod  vlbasecal

## **Atualização Consulta Sefaz**


In [None]:
import pyodbc
import pandas as pd
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_informix_connection():
    """Open a new pyodbc connection to the Informix database.

    Every setting is read from an environment variable when it is set and
    otherwise falls back to the value that used to be hard-coded, so
    existing runs behave exactly as before:

    ``INFORMIX_DSN``, ``INFORMIX_USER``, ``INFORMIX_PASSWORD``,
    ``INFORMIX_DATABASE``, ``INFORMIX_SERVER``.

    Returns:
        The connection object returned by ``pyodbc.connect``. The caller
        is responsible for closing it.

    Raises:
        Whatever ``pyodbc.connect`` raises is propagated unchanged.
    """
    import os  # local import keeps this function self-contained

    # NOTE(review): the fallback credentials below are committed in plain
    # text. Supply them through the environment instead, then remove the
    # literals from the source and rotate the password.
    dsn = os.environ.get('INFORMIX_DSN', 'informix_Producao_BCARGA')
    user = os.environ.get('INFORMIX_USER', 'classprod')
    password = os.environ.get('INFORMIX_PASSWORD', 'yY5eQ4vT')
    database = os.environ.get('INFORMIX_DATABASE', 'central')
    server = os.environ.get('INFORMIX_SERVER', 'bd1hom')

    connection_string = f'DSN={dsn};UID={user};PWD={password};DATABASE={database};SERVER={server}'
    return pyodbc.connect(connection_string)

def query_informix(date_range, cnpj):
    """Run the item-level query for one document number and one emission window.

    The statement joins ``tbfis_nfeconsumo`` with its items and the issuer
    document tables, keeping rows where ``dochi.stregistro = 1``,
    ``doc.sqtpdochuminst = 2``, ``doc.nrdocumento`` equals *cnpj* and
    ``dhemissao`` lies between the two bounds of *date_range*.

    Args:
        date_range: Two-item sequence ``(start, end)``; both values are
            interpolated as quoted literals into the ``BETWEEN`` clause.
        cnpj: Document number, interpolated as a quoted literal.

    Returns:
        A ``pandas.DataFrame`` with the selected columns, or ``None`` when
        the query fails (the error is printed).

    Raises:
        Errors raised while opening the connection are propagated.
    """
    # NOTE(review): the arguments are formatted straight into the SQL text.
    # Only pass trusted, already-sanitised values; switch to bound
    # parameters before exposing this to external input.
    query = f"""
    SELECT CAST(nfce.sqnfeconsumo AS VARCHAR(255)) AS sqnfeconsumo, 
           doc.nrdocumento AS cnpj, 
           itemnfce.dsproduto, 
           itemnfce.vlproduto,
           itemnfce.qtcomprod, 
           itemnfce.vlbasecalcicms, 
           itemnfce.sgunidmedcom 
    FROM tbfis_nfeconsumo nfce
    JOIN tbfis_itemnfec itemnfce ON nfce.sqnfeconsumo = itemnfce.sqnfeconsumo
    JOIN tbcad_instituicao inst ON nfce.sqemitente = inst.sqhumanoinst
    JOIN tbcad_dochuminst dochi ON inst.sqhumanoinst = dochi.sqhumanoinst
    JOIN tbcad_documento doc ON dochi.sqdocumento = doc.sqdocumento
    WHERE dochi.stregistro = 1
      AND doc.sqtpdochuminst = 2  -- cnpj
      AND doc.nrdocumento = '{cnpj}'
      AND nfce.dhemissao BETWEEN '{date_range[0]}' AND '{date_range[1]}'
    """

    connection = get_informix_connection()

    try:
        return pd.read_sql(query, connection)
    except (pyodbc.Error, pd.errors.DatabaseError) as e:
        # pandas re-raises driver failures that happen while executing the
        # statement as pandas.errors.DatabaseError, so both types are
        # caught. The statement is deliberately not executed a second time
        # here: it would fail the same way and raise from inside this
        # handler.
        print("Error: ", e)
        return None
    finally:
        # Always release the connection, on success and on failure.
        connection.close()

def fetch_data_parallel(date_ranges, cnpjs, max_workers=50):
    """Run ``query_informix`` for every (date range, CNPJ) pair on a thread pool.

    Args:
        date_ranges: Sequence of ``(start, end)`` pairs. Every range is
            queried, not only the first one.
        cnpjs: Iterable of document numbers to query in each range.
        max_workers: Upper bound on concurrent worker threads (and so on
            simultaneously open database connections). Defaults to 50.

    Returns:
        A single ``pandas.DataFrame`` with all per-query results
        concatenated in submission order (ranges outermost, index reset),
        or an empty DataFrame when there is nothing to query or no query
        produced a result.
    """
    # Materialise once so an iterator argument can be walked for each range.
    cnpj_list = list(cnpjs)

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything first so the queries overlap, keeping each
        # future paired with the inputs it belongs to.
        submitted = []
        for date_range in date_ranges:
            for cnpj in cnpj_list:
                future = executor.submit(query_informix, date_range, cnpj)
                submitted.append((date_range, cnpj, future))

        # Collect in submission order so the row order of the final frame
        # does not depend on which thread happens to finish first.
        for date_range, cnpj, future in submitted:
            try:
                result = future.result()
            except Exception as exc:
                # Best effort: report which query failed and keep going.
                print(f"Generated an exception: {exc} (cnpj={cnpj}, date_range={date_range})")
                continue
            if result is not None:
                results.append(result)

    if results:
        return pd.concat(results, ignore_index=True)
    return pd.DataFrame()

if __name__ == "__main__":
    # Emission window(s) to query for every CNPJ in the input file.
    date_ranges = [
        ('2024-01-10 00:00:00', '2024-01-10 23:59:59')
    ]

    # Read the CNPJ column as text. With an inferred numeric dtype a single
    # blank cell turns the column into floats, whose string form ends in
    # ".0"; stripping non-digits below would then append a spurious "0" to
    # every number.
    df_contribuintes = pd.read_csv(
        r'C:\Users\jamil\OneDrive\Área de Trabalho\Infomix-Sefaz\carga\estabelecimentos_nao_contemplados_307.csv',
        dtype={'CNPJ': str},
    )
    # Skip missing values so they are not queried as an all-zero CNPJ.
    cnpj_contribuintes = df_contribuintes['CNPJ'].dropna().astype(str).tolist()
    # Keep digits only and left-pad to the 14-character width used in the query.
    cnpj_contribuintes = [re.sub(r'\D', '', cnpj).zfill(14) for cnpj in cnpj_contribuintes]

    result_df = fetch_data_parallel(date_ranges, cnpj_contribuintes)

    # Only print and persist when at least one row was returned.
    if not result_df.empty:
        print(result_df)
        result_df.to_parquet('result_65.parquet', engine='pyarrow', index=False)