#  Импорт и Клик

In [1]:
import os
import random
import re
from collections import defaultdict
import json
import csv

import getpass
import clickhouse_driver
from clickhouse_driver import Client

import pytz
MSK = pytz.timezone('Europe/Moscow')
from datetime import datetime, timedelta

import pandas as pd
from tqdm import tqdm
import numpy as np


# -----------------------------------------------
random_seed = 1234
random.seed(random_seed)
os.environ['PYTHONHASHSEED'] = str(random_seed)
np.random.seed(random_seed)

In [2]:
tmp_pass = getpass.getpass(prompt=f'"analytic" password: ')
client = Client(
    host='##.#.##.#', 
    port=9440,
    user='analytic',
    password=tmp_pass,
    secure=True,
    verify = False,
    ca_certs='../../clickhouse-prom-certs/ca.pem', 
)
print(client.execute('select 1'))

"analytic" password:  ········


/etc/timezone is deprecated on Debian, and no longer reliable. Ignoring.


[(1,)]


# Параметры

In [29]:
result_filename = 'data/result.parquet'
fraud_filename = 'data/fraud.json'
columns_filename = 'data/columns.json'


prom = 'prom2'
org_id = '####' # ####

date_from_notfraud = '2025-01-09 00:00:00'
date_to_notfraud = '2025-03-01 00:00:00'
#2025-01-01 - 2025-03-08     ####
#2025-01-09 - 2025-03-23  #####

partition = 100 # по сколько фродовых сессий упоминать в запросе (чтобы не достигать лимита символов)
possibility_to_gather = 0.001 # какую долю от количества сессий в запросе брать в сет
max_file_size = 1000000000 # максимальный размер одного файла в байтах
max_number_rows = 50000 # максимальное количество сессий записаных во временный список-накопитель
str_columns = ['users', 'mouse_by_page', 'scripts_data', 'invalid_pids', 'fingerprint_data_2', 'mouse_movement_data', 
               'fingerprint_fields', 'ip_data', 'keyboard_data', 'tcp_params', 'iframes_data', 'invalid_canonical_ids', 
               'forms', 'global_ids_2', 'ip_tags', 'pid_tags', 'url_tags', 'script_tags', 'dynamic_tag_counters']

# при препроцессинге в папке будут доп файлы с припиской итерации генерации
dataset_notfraud_file_name = 'data/notfraud_rawdata_fromclick' 
dataset_fraud_file_name = 'data/fraud_rawdata_fromclick'

# Загрузка фрода

In [9]:
with open(fraud_filename, 'r', encoding='utf-8') as file:
    fraud_data = json.load(file)

# Функции

## Параметризация запроса

In [10]:
with open(columns_filename, 'r', encoding='utf-8') as file:
    columns = json.load(file)

def gen_request(q_where_part: str, prom):
    """
    return q_result, q_template, q_from_part, q_where_part
    """
    
    q_template = f"""
    SELECT
        {','.join(columns)}
    """
    
    q_from_part = f' FROM {prom}.session '   
    q_result = q_template + q_from_part + q_where_part
    
    return q_result, q_template, q_from_part, q_where_part

columns = [x[0] for x in client.execute(gen_request(' LIMIT 0', prom)[0], with_column_types=True)[1]]

## Выгрузка

In [21]:
def file_worker(sup, columns, str_columns, temp_file_name, oper, output_files):
    sup_df = pd.DataFrame(data=sup, columns=columns)
                
    for str_col in str_columns:
        if str_col in sup_df.columns:
            sup_df[str_col] = sup_df[str_col].astype(str)

    current_filename = temp_file_name+'_'+str(oper)+'.parquet'
    new_filename = temp_file_name+'_'+str(oper+1)+'.parquet'

    if os.path.exists(current_filename):
        if os.path.getsize(current_filename) >= max_file_size:
            sup_df.to_parquet(new_filename, engine='pyarrow')
            output_files.append(new_filename)
            rec = f'\nОтсечка. Создание нового файла: {new_filename} | {datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")}'
            print(rec)
            write_to_file(rec)
        else:
            temp_data = pd.read_parquet(current_filename)
            temp_data = pd.concat([temp_data, sup_df], ignore_index=True)
            temp_data.to_parquet(current_filename)
            rec = f'Отправка в {current_filename}. Итерация № {oper} | {datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")}'
            print(rec)
            write_to_file(rec)
            
            output_files.append(current_filename)
            del temp_data
    else:
        sup_df.to_parquet(current_filename)
        output_files.append(current_filename)
        rec = f'Отправка в {current_filename}. Итерация № {oper} | {datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")}'
        print(rec)
        write_to_file(rec)
    del sup, sup_df


def click_extractor(gen_q, 
                    temp_file_name,
                    possibility_to_gather = 0.05,
                    max_file_size = 1000000000, 
                    max_number_rows = 5000,
                    needed_users = None):
    """
    gen_q = gen_request
    possibility_to_gather - вероятность с которой соберется запись из клика - это рандомизация для больших запросов
    temp_file_name - текстовое название файла в который будут скидываться постепенно данные.
        От этого файла также будут производные с добавлением номера итерации.
    max_file_size - размер файла, после которого будет создаваться новый файл (в байтах)
    max_number_rows - количество записей, записанных в датафрейм, после которого будет отправка в файл и создание нового фрейма
    """
    
    oper = 1
    
    sup = []
    output_files = []
    
    rec = f'(click_extractor) Начало извлечения данных из ClickHouse | {datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")}'
    print(rec)
    write_to_file(rec)
    
    request, request_template, _, where = gen_q
    
    if not needed_users:
        needed_users = [x[0] for x in client.execute(f"SELECT distinct(users.user_id) FROM {prom}.session ARRAY JOIN users.user_id {where}")]

    rec = f'(click_extractor) Количество найденных пользователей: {len(needed_users)}'
    print(rec)
    write_to_file(rec)
    
    users = random.sample(needed_users, int(len(needed_users) * possibility_to_gather)) 
    rec = f'(click_extractor) Количество отобранных пользователей: {len(users)}'
    print(rec)
    write_to_file(rec)
    
    if len(users) > partition:
        for i in tqdm(range(0, len(users), partition)):
            
            init_where = 'WHERE ' if where == '' else ' AND '
            add_q = f""" {init_where} length(arrayIntersect(users.user_id,{"['"+"','".join(users[i:i+partition])+"']"}))>0"""
            
            sup.extend(client.execute(request+add_q))
            if len(sup) >= max_number_rows:
                file_worker(sup, columns, str_columns, temp_file_name, oper, output_files)
                sup = []
                oper+=1

            rec = f'(click_extractor) Завершено {min(i + partition, len(users))} из {len(users)}'
            print(rec)
            write_to_file(rec)
            
        if len(sup)>0:      # остаточные данные  
            file_worker(sup, columns, str_columns, temp_file_name, oper, output_files)
            
    else:
        init_where = 'WHERE ' if where == '' else ' AND '
        add_q = f"""{init_where} length(arrayIntersect(users.user_id,{"['"+"','".join(users)+"']"}))>0"""

        df = client.query_dataframe(request+add_q)
        for i in str_columns:
            if i in df.columns:
                df[i]=df[i].astype(str)
            
        filename = temp_file_name+'_'+str(oper)+'.parquet'
        df.to_parquet(filename)
        output_files.append(filename)
        
        del df
    
    rec = f'(click_extractor) Завершение: {datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f")}'
    print(rec)
    write_to_file(rec)
    
    rec = f'(click_extractor) Файлы: {output_files}'
    print(rec)
    write_to_file(rec)
        
    return set(output_files)


def get_raw_dataset(gen_request, 
                    download_notfraud,
                    date_from_notfraud,
                    date_to_notfraud,
                    download_fraud,
                    possibility_to_gather,
                    fraud_users):
    
    """
    gen_request - макет запроса в ClickHouse в виде инстанции функции
    download_notfraud - bool - загружать нефрод или нет
    date_from_notfraud - дата начала загрузки нефрода (форма: '%Y-%m-%d %H:%M:%S')
    date_to_notfraud - дата начала загрузки нефрода (форма: '%Y-%m-%d %H:%M:%S')
    download_fraud - bool - загружать фрод или нет
    """

    global prom, fraud_filename 

    date_from_notfraud_loc = int((MSK.localize(datetime.strptime(date_from_notfraud,'%Y-%m-%d %H:%M:%S'))).timestamp())
    date_to_notfraud_loc = int((MSK.localize(datetime.strptime(date_to_notfraud,'%Y-%m-%d %H:%M:%S'))).timestamp()) 
    
    if not download_notfraud and not download_fraud:
        rec = 'Указаны download_notfraud и download_fraud как False - ошибка.'
        print(rec)
        write_to_file(rec)
        return None
    
    oper = 1
    files = []
    
    rec = f'(get_dataset) Время начала работы функции | {datetime.now().strftime("%Y.%m.%d.%H.%M")}'
    print(rec)
    write_to_file(rec)
    
    if download_fraud and fraud_users:
        rec = f'(get_dataset) Загрузка фродовых сессий | {datetime.now().strftime("%Y.%m.%d.%H.%M")}'
        print(rec)
        write_to_file(rec)
        
        fraud_files = click_extractor(gen_request(f"WHERE org_id = '{org_id}'", prom=prom), 
                                      dataset_fraud_file_name,
                                      possibility_to_gather = 1,
                                      max_file_size = max_file_size, 
                                      max_number_rows = max_number_rows,
                                      needed_users = fraud_users)
        files.extend(fraud_files)
                    
    if download_notfraud:
        rec = f'(get_dataset) Загрузка нефродовых сессий | {datetime.now().strftime("%Y.%m.%d.%H.%M")}'
        print(rec)
        write_to_file(rec)
        q_where_part = f"""WHERE 
                            begin_at >= {date_from_notfraud_loc} AND 
                            begin_at < {date_to_notfraud_loc} AND 
                            org_id = '{org_id}'"""

        
        notfraud_files = click_extractor(gen_request(q_where_part, prom=prom), 
                                         dataset_notfraud_file_name,
                                         possibility_to_gather = possibility_to_gather,
                                         max_file_size = max_file_size, 
                                         max_number_rows = max_number_rows)
        files.extend(notfraud_files)
        
    res_df = pd.DataFrame()
    
    for i in files:
        temp_data = pd.read_parquet(i)
        res_df = pd.concat([res_df, temp_data], ignore_index=True)
    
    return res_df, files


def write_to_file(record, name: str = 'log.csv'):
    with open(name, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, delimiter=';', dialect='excel')
        writer.writerow([datetime.now()] + [record])

# Исполнение выгрузки

In [25]:
df_from_click, files = get_raw_dataset(gen_request, 
                                       download_notfraud=False,
                                       date_from_notfraud = date_from_notfraud,
                                       date_to_notfraud = date_to_notfraud,
                                       download_fraud = True,
                                       possibility_to_gather = possibility_to_gather,
                                       fraud_users = fraud_users)

(get_dataset) Время начала работы функции | 2025.03.27.14.30
(get_dataset) Загрузка фродовых сессий | 2025.03.27.14.30
(click_extractor) Начало извлечения данных из ClickHouse | 27-03-2025 14:30:59.625823
(click_extractor) Количество найденных пользователей: 1
(click_extractor) Количество отобранных пользователей: 1


KeyboardInterrupt: 