In [None]:
import sys
import mle_tools
import pyspark
import pyarrow
import pandas as pd
import subprocess
import re
import ast
import os
import uuid
import json
import pyspark.sql.functions as F
from mle_tools.data_minings import connections, formater
from pyspark.sql import Window as W
from joblib import delayed, Parallel
from pandas import Timestamp
from tqdm import tqdm
from datetime import datetime
from sklearn.model_selection import train_test_split

# Подготовка Spark-сессии и окружения pyarrow

In [None]:
# Spark session for this job. The Arrow option enables Arrow-based
# Spark <-> pandas conversion.
spark_config = {
    'spark.app.name': "data_collecting",
    'spark.executor.memory': '32g',
    'spark.driver.memory': '32g',
    'spark.dynamicAllocation.maxExecutors': '32',
    'spark.sql.execution.arrow.pyspark.enabled': "true",
}
spark = connections.get_spark(spark_config)

# `fs` is handed to every formater.spark_df_to_pandas call in this notebook;
# presumably a pyarrow filesystem handle — TODO confirm what it points at.
fs = connections.prepare_env_for_pyarrow()

# Разметка #1: сценарии из файла SCENARIO_PATH

In [None]:
# Markup #1: read the scenario JSON at $SCENARIO_PATH and expose its 'info'
# entry as both a pandas and a Spark DataFrame.
# NOTE(review): 'info' is assumed to be a list of records (or dict of columns)
# containing at least `text` and `markup` — confirm against the file.
with open(os.getenv('SCENARIO_PATH'), encoding="utf-8") as scenario_file:
    table = json.load(scenario_file)

scenario = pd.DataFrame(table['info'])
scenario_sdf = spark.createDataFrame(scenario)

# Дополнение разметки #1 данными из логов и загрузка разметки #2

In [None]:
# Log messages from 2025-02-01 onwards, keeping only the most recent row
# (highest `time`) for every message `id`.
# NOTE(review): assumes `date_part` is a 'yyyymmdd' string so the lexicographic
# comparison is chronological — confirm the table schema.
latest_first = W.partitionBy('id').orderBy(F.desc('time'))

logs = (
    spark.table(os.getenv('LOGS'))
    .filter(F.col('date_part') >= '20250201')
    .withColumn('rk', F.row_number().over(latest_first))
    .where(F.col('rk') == 1)
    .drop('rk')
    .select('id', 'time', 's_id', 'channel', 'text', 'client_id')
)

In [None]:
# Attach log metadata (message id, session id) to markup #1 by matching on the
# message text, then append markup #2 loaded from the LABELS_PATH table.
# Join on the column *name*: with the original `scenario_sdf.text == logs.text`
# expression both `text` columns survive the join and `.select('text')` fails
# as an ambiguous reference.
# NOTE(review): if `scenario` also carries an `id`/`s_id` column, those selects
# would be ambiguous too — confirm the scenario schema.
selected = logs.join(scenario_sdf, on='text').select('text', 'id', 's_id', 'markup')

old_data = formater.spark_df_to_pandas(selected, fs).rename(columns={'s_id': 'session_id'})
new_data = formater.spark_df_to_pandas(spark.sql(f"SELECT * FROM {os.getenv('LABELS_PATH')}"), fs)[['text', 'id', 'session_id', 'markup']]

# Both frames carry their own RangeIndex; reset so data_df has unique labels.
data_df = pd.concat((old_data, new_data), ignore_index=True)
data = spark.createDataFrame(data_df)

                                                                                

# Сбор данных: сессии и история сообщений размеченных клиентов

In [None]:
# Every (latest-version) log message belonging to a labelled session.
data_selected = data.select('session_id').distinct()
sessions_sdf = logs.join(
    data_selected,
    on=logs.s_id == data_selected.session_id,
)
sessions_pdf = formater.spark_df_to_pandas(sessions_sdf, fs)

                                                                                

In [None]:
# Resolve each labelled session to a client id. A row's client_id is trusted
# only if it is a string, is not a bot identity, and contains no '_'; sessions
# without any trusted client id fall back to their own session id.
# The two resolution passes in this notebook named the bot differently
# ('Bot' here, 'AlfaBot' in the later pass); both names are excluded so the
# passes agree. NOTE(review): confirm the bot's actual client_id value(s).
s2id = {
    sid: cid
    for sid, cid in zip(sessions_pdf['s_id'], sessions_pdf['client_id'])
    if isinstance(cid, str)              # skips None and pandas NaN alike
       and cid not in {'Bot', 'AlfaBot'}
       and '_' not in cid
}

sessions_pdf['client_id'] = sessions_pdf['s_id'].map(lambda sid: s2id.get(sid, sid))

In [None]:
# Distinct resolved client ids as a single-column Spark frame, and every log
# row of those clients. Only `client_id` is shipped to Spark: carrying the
# other sessions_pdf columns (s_id, id, time, ...) into an expression join made
# them appear twice in `clients`, so `clients.select('s_id')` was ambiguous.
client_ids = spark.createDataFrame(sessions_pdf[['client_id']].drop_duplicates())
clients = logs.join(client_ids, on='client_id')

In [None]:
# All sessions of the clients found above, then every log message from those
# sessions, pulled into pandas.
session_ids = clients.select(F.col('s_id').alias('session_id')).distinct()

client_sessions_sdf = logs.join(session_ids, logs.s_id == session_ids.session_id)
client_sessions_df = formater.spark_df_to_pandas(client_sessions_sdf, fs)

                                                                                

In [None]:
# Resolve every client session to a client id, with the same rules as the
# first resolution pass: trust a client_id only if it is a string, is not a bot
# identity, and contains no '_'; otherwise fall back to the session id.
# The two passes in this notebook named the bot differently ('Bot' in the
# first, 'AlfaBot' here); both are excluded so the passes agree.
# NOTE(review): confirm the bot's actual client_id value(s).
s2id = {
    sid: cid
    for sid, cid in zip(client_sessions_df['s_id'], client_sessions_df['client_id'])
    if isinstance(cid, str)              # skips None and pandas NaN alike
       and cid not in {'Bot', 'AlfaBot'}
       and '_' not in cid
}

client_sessions_df['client_id'] = client_sessions_df['s_id'].map(lambda sid: s2id.get(sid, sid))

In [None]:
formatter = "%Y-%m-%d %H:%M:%S.%f"


def parse_log_time(value):
    """Parse one raw log ``time`` value into a ``datetime``.

    Accepts strings in ``formatter`` (with fractional seconds), the same layout
    without a fractional part (e.g. ``'2025-02-01 10:00:00'``, which ``%f``
    rejects), and values that are already datetimes (returned unchanged).

    Raises:
        ValueError: if a string matches neither layout.
    """
    if isinstance(value, datetime):
        return value
    try:
        return datetime.strptime(value, formatter)
    except ValueError:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")


# Drop rows without a timestamp, parse, then sort on the *parsed* values so the
# order is truly chronological. groupby keeps row order within each group, so
# the per-client lists below come out time-ordered; the stable sort keeps the
# original relative order of messages with identical timestamps.
# NOTE: this cell rebinds client_sessions_df, so it is not re-runnable on its own.
client_sessions_df = client_sessions_df[client_sessions_df['time'].notna()].copy()
client_sessions_df['time'] = client_sessions_df['time'].apply(parse_log_time)
client_sessions_df = client_sessions_df.sort_values('time', kind='stable')
client_sessions_df = client_sessions_df.groupby('client_id').agg(list)

In [None]:
# Split the per-client table into at most n_blocks CSV chunks (row order kept)
# for parallel processing downstream. Ceil division guarantees every row lands
# in some chunk: floor division dropped the trailing len % n_blocks rows, and
# dropped everything when there were fewer rows than blocks.
# NOTE(review): the processing cell reads chunks from UPLOAD_PATH, not
# LOCAL_PATH — presumably they are copied over in between; stale chunk files
# from an earlier run with more chunks would also be picked up there.
n_blocks = 1000
local_dir = os.getenv('LOCAL_PATH')  # bound first: nested same-quote f-strings need Python 3.12+
n_rows = len(client_sessions_df)
block_size = max(1, -(-n_rows // n_blocks))  # ceil(n_rows / n_blocks), at least 1
for i, start in enumerate(range(0, n_rows, block_size)):
    client_sessions_df.iloc[start:start + block_size].to_csv(f'{local_dir}/chunk{i}.csv', index=False)

In [None]:
def trim_substrings(texts, substring='version 0:'):
    """Keep only the text enclosed by exactly two occurrences of a marker.

    For every element of ``texts``:
      * falsy values (``None``, ``''``) become ``''``;
      * if ``substring`` occurs exactly twice, the part between the end of the
        first occurrence and the start of the second is kept;
      * otherwise the text is returned unchanged.

    Args:
        texts: iterable of message texts (may contain ``None``).
        substring: literal marker to search for (escaped, not a regex).

    Returns:
        A new list with one entry per input element.
    """
    marker = re.compile(re.escape(substring))
    trimmed = []
    for text in texts:
        if not text:
            trimmed.append('')
            continue
        hits = [m.span() for m in marker.finditer(text)]
        if len(hits) != 2:
            trimmed.append(text)
        else:
            (_, first_end), (second_start, _) = hits
            trimmed.append(text[first_end:second_start])
    return trimmed

def parse_timestamps(ts_str):
    """Extract every ``Timestamp('<value>')`` literal from a repr string.

    Intended for list columns written to CSV via their Python repr, e.g.
    ``"[Timestamp('2025-02-01 10:00:00'), ...]"``.

    Args:
        ts_str: string containing zero or more ``Timestamp('...')`` tokens.

    Returns:
        List of :class:`pandas.Timestamp`, in order of appearance.
    """
    return [
        Timestamp(match.group(1))
        for match in re.finditer(r"Timestamp\('([^']+)'\)", ts_str)
    ]

def format_context(texts, channels):
    """Render a window of messages as one speaker-tagged string.

    Messages whose channel is ``'AM'`` are tagged ``[BOT]``; all others are
    tagged ``[CLIENT]``. A tag is emitted only when the speaker changes, and
    consecutive messages from the same speaker are joined by a single space.
    Markdown links ``[label](url)`` are reduced to ``label``. Empty or falsy
    texts are skipped entirely, and they do not produce a tag either.

    Args:
        texts: message texts, aligned element-wise with ``channels``.
        channels: channel code per message.

    Returns:
        The formatted context, e.g. ``'[BOT] hello [CLIENT] hi there'``, or
        ``''`` when there is no non-empty text (including empty input).
    """
    def _tag(channel):
        return '[BOT]' if channel == 'AM' else '[CLIENT]'

    parts = []
    prev = object()  # sentinel: the first kept message always gets a tag
    for text, ch in zip(texts, channels):
        if not text:
            continue
        if ch != prev:
            parts.append(_tag(ch))
            prev = ch
        parts.append(re.sub(r"\[(.*?)\]\(.*?\)", r"\1", text))
    return ' '.join(parts)

def process_file(path):
    """Build a dialogue-context string for every message in one chunk CSV.

    Each CSV row holds one client's messages as list reprs in the ``id``,
    ``time``, ``channel`` and ``text`` columns, expected to be in chronological
    order. For message ``i`` the context window is messages ``start..i``. The
    window start is advanced until the window spans less than 7 days and holds
    at most 11 messages (the current one plus up to 10 earlier ones).

    Args:
        path: path to a chunk CSV.

    Returns:
        DataFrame with columns ``id`` (message id) and ``cntxt`` (single-line
        context string). The columns are present even when the chunk yields no
        records, so downstream ``concat``/``merge`` on ``id`` keep working.
    """
    df = pd.read_csv(
        path,
        converters={
            # List columns were written as Python reprs. literal_eval only
            # evaluates literals, so it is safe for these self-produced files.
            'id':         ast.literal_eval,
            'time':       parse_timestamps,
            'channel':    ast.literal_eval,
            'text':       ast.literal_eval
        }
    )

    df['text'] = df['text'].apply(trim_substrings)

    records = []

    for ids, times, channels, texts in zip(df['id'], df['time'], df['channel'], df['text']):
        start = 0
        for i, (msg_id, _, _) in enumerate(zip(ids, times, channels)):
            # Slide the window start: drop messages 7+ days older than the
            # current one and cap the window at 11 messages.
            while (times[i] - times[start]).days >= 7 or i - start > 10:
                start += 1

            ctx = format_context(texts[start:i + 1], channels[start:i + 1])
            if ctx:
                records.append({
                    'id':    msg_id,
                    # Flatten to one line so the context survives CSV round-trips.
                    'cntxt': ctx.replace('\n', ' ').replace('\r', ' ')
                })

    return pd.DataFrame(records, columns=['id', 'cntxt'])

# Build contexts for all chunk CSVs in parallel, one joblib task per file.
directory = os.getenv('UPLOAD_PATH')
# os.listdir order is arbitrary; sort so result_df row order is reproducible.
csv_files = sorted(
    os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.csv')
)

# tqdm wraps the task generator, so the bar tracks task dispatch, not completion.
dfs = Parallel(n_jobs=-1)(
    delayed(process_file)(file) for file in tqdm(csv_files)
)

result_df = pd.concat(dfs, ignore_index=True)

100%|██████████| 1000/1000 [13:06<00:00,  1.27it/s]


In [None]:
# Attach generated contexts to every labelled message. `dfs` is a list of
# per-file frames; the concatenated frame is `result_df`. how='right' keeps all
# labelled rows, so messages with no generated context end up with NaN `cntxt`.
# NOTE(review): `id` comes from literal_eval on one side and Spark on the
# other — confirm the dtypes match, or the merge silently finds no overlap.
markuped = pd.merge(result_df, data_df, on='id', how='right')[['cntxt', 'markup']]

In [None]:
# Hold out 15% as the test set, stratified by label (markuped only has
# `cntxt` and `markup`; there is no `intent` column). Seeded for reproducibility.
train_val, test = train_test_split(markuped, test_size=0.15, stratify=markuped['markup'], random_state=42)

In [None]:
# Carve 10% of the remaining data into a validation set, stratified by label
# (the label column is `markup`), with the same seed for reproducibility.
train, val = train_test_split(train_val, test_size=0.1, stratify=train_val['markup'], random_state=42)

In [None]:
# Persist the three splits to the paths given by their environment variables.
# index=False is not passed, so each file's first column is the DataFrame index.
for split_df, env_var in (
    (train, 'LOCAL_TRAIN_PATH'),
    (val, 'LOCAL_VAL_PATH'),
    (test, 'LOCAL_TEST_PATH'),
):
    split_df.to_csv(os.getenv(env_var))