In [1]:
import time
import pandas as pd
import cx_Oracle

def set_comment(txt):
    print(time.strftime("%H:%M:%S")+': '+txt)

set_comment('Начало...')

try:
    conn_balu_rez.ping()
    set_comment('Соединение с *** проверено...')
except:
    dsn_tns = cx_Oracle.makedsn(host='***', port=***, service_name='***')
    conn_balu_rez = cx_Oracle.connect(user='***', password='***', dsn=dsn_tns, encoding='UTF-8')
    set_comment('Соединение с *** установлено...')

sql_file = open('client_list.sql','r')
sql_client_list = sql_file.read()
sql_file.close()
set_comment('Прочитан SQL из файла списка клиентов (client_list.sql)...')

#Выгрузка списка клиентов; percentage - целое число, процент от полной выборки
def get_client_list(percentage = 100):
    if conn_balu_rez is None:
        set_comment('Нет соединения с БД')
        return None
    else:
        time_start = time.time()

        set_comment('Начало чтения данных')  
        #входной параметр - приблизительная доля клиентов для обработки
        if percentage < 100:
            sql_to_execute = sql_client_list+'\n and mod(cl.id,100) < '+str(percentage)
        else:
            sql_to_execute = sql_client_list

        df_client_list = pd.read_sql(sql=sql_to_execute, con=conn_balu_rez)  

        set_comment('Время выполнения, сек - '+str(round(time.time() - time_start,3)))
        set_comment('Для анализа отобрано клиентов, шт - '+str(df_client_list.CL_ID.count()))
        return df_client_list
    
sql_file = open('client_activity.sql','r')
sql_client_activity = sql_file.read()
sql_file.close()
set_comment('Прочитан SQL из файла активности клиента (client_activity.sql)...')

#Получение списка счетов клиента и активности по ним по его ИД
def get_client_activity(cl_id):
    if conn_balu_rez is None:
        set_comment('Нет соединения с БД')
        return None
    else:
        v_ret = pd.read_sql(sql=sql_client_activity, con=conn_balu_rez, params = {'cl_id':cl_id})
        return v_ret

def dich_ne_rabotaet_v_jupyter(threads_number, cl_id_list):
    from multiprocessing import Pool
    workers = Pool(threads_number)
    results = workers.map(get_client_activity, cl_id_list)

    workers.close()
    workers.join()
    return results

07:48:37: Начало...
07:48:38: Соединение с *** установлено...
07:48:38: Прочитан SQL из файла списка клиентов (client_list.sql)...
07:48:38: Прочитан SQL из файла активности клиента (client_activity.sql)...


In [2]:
all_clients = get_client_list()

07:48:44: Начало чтения данных
07:51:46: Время выполнения, сек - 181.272
07:51:46: Для анализа отобрано клиентов, шт - 65926


In [3]:
import workers as w
from multiprocessing import Process, Queue
from ipywidgets import IntProgress
from IPython.display import display

#Не забыть выпилить перезагрузку модуля:
#from importlib import reload
#reload(w)

def proc_clients_in_thread(clients, thread_number):
    #Инициация переменных:
    time_start = time.time()
    
    #Очередь сообщений от потоков:
    que_mess = Queue()

    #Очередь необработанных ИД клиентов
    que_for_proc  = Queue()

    #Очередь результатов обработки 
    que_done = Queue()

    #Формирование очереди к обработке
    for i in clients:
        que_for_proc.put(i)
        
    #Инициация прогрессбаров
    que_for_proc_load = IntProgress(min=0, max=que_for_proc.qsize(), value=que_for_proc.qsize())
    que_for_proc_load.bar_style = 'info'
    que_for_proc_load.description = 'FP '+str(que_for_proc.qsize())
    display(que_for_proc_load)
    
    que_mess_load = IntProgress(min=0, max=0, value=0)
    que_mess_load.bar_style = 'info'
    que_mess_load.description = 'M 0'
    display(que_mess_load)

    que_done_load = IntProgress(min=0, max=0, value=0)
    que_done_load.bar_style = 'info'
    que_done_load.description = 'D 0'
    display(que_done_load)    
    
    set_comment('Принято к обработке клиентов, шт - '+str(clients.count()))
    
    is_alive = True
    result = None
    procs = []
    df_list = []
    
    #Пока, есть живые процессы или невычитанные очереди
    while is_alive or not que_mess.empty() or not que_done.empty():
        if len(procs) < thread_number:    
            proc = Process(target=w.do_request, args=(que_mess,que_for_proc,que_done,sql_client_activity))
            procs.append(proc)
            proc.start()
            
            #Конфетка с привкусом миндаля
            que_for_proc.put('PLEASE_DIE')
   
        #Тыкаем потоки палочкой, если не огрызаются - мертвы
        is_alive = False
        for proc in procs:
            if proc.is_alive():
                is_alive = True
    
        #Проверка очереди сообщений   
        if not que_mess.empty():
            print(que_mess.get())

        #Проверка очереди результатов
        if not que_done.empty():
            df_list.append(que_done.get())

        que_for_proc_load.value = que_for_proc.qsize()
        que_for_proc_load.description = 'FP '+str(que_for_proc.qsize())
        
        if que_mess.qsize() > que_mess_load.max:
            que_mess_load.max = que_mess.qsize() 
        que_mess_load.value = que_mess.qsize()
        que_mess_load.description = 'M '+str(que_mess.qsize())
        
        if que_done.qsize() > que_done_load.max:
            que_done_load.max = que_done.qsize() 
        que_done_load.value = que_done.qsize()
        que_done_load.description = 'D '+str(que_done.qsize())

    que_mess.close()
    que_for_proc.close()
    que_done.close()
    
    set_comment('Сборка результата')
    result = pd.concat(df_list,ignore_index = True)
    
    set_comment('Время обработки, сек - '+str(round(time.time()-time_start,3)))
    
    return result
set_comment('Подготовка к обработке произведена')

09:46:25: Подготовка к обработке произведена


In [4]:
res = proc_clients_in_thread(all_clients.CL_ID,15)

IntProgress(value=65926, bar_style='info', description='FP 65926', max=65926)

IntProgress(value=0, bar_style='info', description='M 0', max=0)

IntProgress(value=0, bar_style='info', description='D 0', max=0)

09:46:33: Принято к обработке клиентов, шт - 65926
3996: 09:46:46: Подключился к ***...
1912: 09:46:53: Подключился к ***...
12224: 09:47:01: Подключился к ***...
13212: 09:47:09: Подключился к ***...
11004: 09:47:16: Подключился к ***...
11528: 09:47:25: Подключился к ***...
6616: 09:47:33: Подключился к ***...
13268: 09:47:41: Подключился к ***...
12228: 09:47:50: Подключился к ***...
4332: 09:47:57: Подключился к ***...
2740: 09:48:05: Подключился к ***...
12796: 09:48:13: Подключился к ***...
12600: 09:48:20: Подключился к ***...
10620: 09:48:28: Подключился к ***...
10268: 09:48:36: Подключился к ***...
10620: 10:26:05: Соединение с *** закрыто...
10268: 10:26:05: Соединение с *** закрыто...
12224: 10:26:05: Соединение с *** закрыто...
1912: 10:26:05: Соединение с *** закрыто...
12600: 10:26:05: Соединение с *** закрыто...
11528: 10:26:05: Соединение с *** закрыто...
12228: 10:26:05: Соединение с *** закрыто...
13212: 10:26:05: Соединение с *** закрыто...
12796: 10:26:08: Соединен

In [5]:
res.CL_ID.memory_usage()

828344

In [18]:
3700/60

61.666666666666664

In [None]:
for i in res.ARRAY_NAZN_SUM_DT:
    if i != '[]':
        print(i)

In [None]:
#Для работы с выгружаемыми JSON
j = json.loads(data.OKVEDS_50[0])
#j - список словарей Python
for i in j:
    print(i['okv'])

#Экспериментики
n = 0
cl_activity = None
for v_id in all_clients.CL_ID:
    if cl_activity is None:
        cl_activity = get_client_activity(v_id)
    else:
        cl_activity = cl_activity.append(get_client_activity(v_id))
set_comment('Все данные получены')