In [44]:
import zipfile
import csv
import re
from collections import defaultdict
from dateutil import parser
from datetime import timezone
from datetime import timedelta


In [136]:
# Достаем файлы из zip архива
def extract(path):
    with zipfile.ZipFile(path, "r") as zip_ref:
        zip_ref.extractall()


In [138]:
#Функция фильтрации, убираем строки с большим количеством пропусков 
def filter_sort_users(path):
    users = []

    with open(path, "r", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=",", quotechar='"', skipinitialspace=True)
        next(reader, None)  # Пропускаем заголовки

        for line_num, fields in enumerate(reader, start=2):
            if len(fields) < 13:
                log.warning(f"Строка {line_num} пропущена — слишком мало полей: {fields}")
                continue
            try:
                cleaned_key4 = re.sub(r':/everyday/.*$', '', fields[9]) #удаляем ненужные ссылки из parsedParamsKey4
                users.append({
                    "clientID": int(fields[0]),
                    "ipAddress": fields[10].replace('"', '').strip(),
                    "parsedParamsKey3": fields[8],
                    "parsedParamsKey4": cleaned_key4,
                    "dateTime": fields[12].strip(),
                    "deviceCategory": fields[2],
                })
            except Exception as e:
                log.error(f"Ошибка в строке {line_num}: {e} → {fields}")
                continue

    return users

In [58]:
#функция сохранения обработанной таблицы
def save_to_csv(filename, result):
    data = filter_sort_users(filename)
    #функция удаления лишних столбцов
    def clean(value: str) -> str:
        return value.replace('"', '').strip() if isinstance(value, str) else value

    with open(result, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["clientID", "ipAddress", "parsedParamsKey3",
                        "parsedParamsKey4", "dateTime", "deviceCategory"],
            quoting=csv.QUOTE_ALL,
            quotechar='"',
            escapechar='\\'
        )
        writer.writeheader()
        for row in data:
            cleaned_row = {
                "clientID": row["clientID"],
                "ipAddress": clean(row["ipAddress"]),
                "parsedParamsKey3": clean(row["parsedParamsKey3"]),
                "parsedParamsKey4": clean(row["parsedParamsKey4"]),
                "dateTime": clean(row["dateTime"]),
                "deviceCategory": clean(row["deviceCategory"]),
            }
            writer.writerow(cleaned_row)


save_to_csv("travel.csv", "final_res")

файл сохранён


In [151]:
#функция подсчета конверсий
def load_result(filename, res_filename):
    res = []
    step1 = [
        'Product::Анкета Новая форма - Мобильный телефон::View',
        'Product::Анкета Новая форма - Электронная почта::View',
        'Product::Анкета Новая форма - ФИО::View', 'Product::Анкета Новая форма::View',
'Product::Анкета Новая форма - Гражданство РФ::View', 'Product::Оплата смартфоном::Click',
        'Product::SEO-блок::View', 'Product::Полезно знать::Click', 'Product::Тарифы::Click',
        'Product::SEO-блок - ссылки::Click', 'Product::Заказать карту::Click', 'Product::Кэшбэк::Click',
        'Product::Всё о�кэшбэке::Click', 'Product::Заказать карту::Click', 'Product::Карты::Menu_Click',
        'Product::Анкета Новая форма - ФИО в три поля::View', 'Product::Анкета Новая форма - Пол::View',
        'Product::test_event_dl::test_view', 'Product::Кредиты::Click', 'Product::Кредиты::View',
        'Product::Alfa Only::Menu_Click', 'Main Page::Футер::Click', 'Product::Расчётный счёт::Click',
        'Product::Приём платежей в торговых точках::Click', 'Product::Хэдер::Menu_Click',
        'Product::Меню сегментов::Menu_Click'
    ]

    step2 = ["Product::Анкета Новая форма - Гражданство РФ::Choice",
             'Product::Анкета Новая форма - ФИО::Enter',
             'Product::Анкета Новая форма - Электронная почта::Enter',
             'Product::Анкета Новая форма - Дата рождения::Enter',
             'Product::Анкета Новая форма - Пол::Choice',
             'Product::Анкета Новая форма - Мобильный телефон::Enter',
             'Product::Анкета Новая форма - Отчество::Enter',
             'Product::Анкета Новая форма - По паспорту без отчества::Choice',
             'Product::Анкета Новая форма - Фамилия::Enter',
             'Product::Анкета Новая форма - Имя::Enter',
             ]
    step3 = ['Product::Анкета - Шаг 1::Send', 'Product::Pop-up с СМС кодом::View',
             'Product::Pop-up с СМС кодом - Изменить номер телефона::View',
             'Product::Pop-up с СМС кодом::Enter', 'Product::Кажется мы уже знакомы::View',
             'Product::Pop-up с СМС кодом ::Enter',
             'Product::Серия и номер паспорта::Enter', 'Product::Серия и номер паспорта::Click',
             'Product::Pop-up с СМС кодом - Ввести код больше нельзя::View',
             'Product::Pop-up с СМС кодом - Ввести код больше нельзя::Click', 'Product::Pop-up с СМС кодом::View'
             ]

    step4 = ['Product::Заказ доставки::View', 'Product::Адрес доставки::Enter',
             'Product::Куда доставить::View',
             'Product::Выберите офис банка::View',
             'Product::Дата и время::View', 'Product::Дата и время::Click',
             'Product::Куда доставить::Choice',
             'Product::Комментарий в доставке по адресу::Enter',
             'Product::Заказ доставки - подтверждение СМС::View',
             'Product::Заказ доставки - подтверждение СМС::Enter',
             'Product::Заказ доставки - подтверждение СМС ::Enter',
             ]

    step5 = ['Product::Мобильное меню_вход в АЛБО::Click',
             'Product::Мобильное меню_вход в АО::Click', ]

    users = filter_sort_users(filename)
    client1, client2, client3, client4, client5 = set(), set(), set(), set(), set()

    for user in users:
        if user["parsedParamsKey3"] in step1: client1.add(user["clientID"])
        if user["parsedParamsKey3"] in step2: client2.add(user["clientID"])
        if user["parsedParamsKey3"] in step3: client3.add(user["clientID"])
        if user["parsedParamsKey3"] in step4: client4.add(user["clientID"])
        if user["parsedParamsKey3"] in step5: client5.add(user["clientID"])

    x1, x2, x3, x4, x5 = len(client1), len(client2), len(client3), len(client4), len(client5)

    def safe_div(a, b): return a / b if b else 0.0
    res.append({
        "visitors": x1, "step2": x2, "step3": x3, "step4": x4, "step5": x5,
        "conv_visit_to_step2": safe_div(x2, x1),
        "conv_step2_to_step3": safe_div(x3, x2),
        "conv_step3_to_step4": safe_div(x4, x3),
        "conv_step4_to_step5": safe_div(x5, x4),
        "conv_step1_to_step4": safe_div(x4, x1)
    })

    with open(res_filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "visitors", "step2", "step3", "step4", "step5",
            "conv_visit_to_step2", "conv_step2_to_step3",
            "conv_step3_to_step4", "conv_step4_to_step5",
            "conv_step1_to_step4"
        ])
        writer.writeheader()
        for row in res:
            writer.writerow(row)

    

def main(zip_path, extracted_filename, result_filename, conv_filename):
    extract(zip_path)
    filter_sort_users(extracted_filename)
    save_to_csv(extracted_filename, result_filename)
    load_result(extracted_filename, conv_filename)
    
                    

In [None]:
main("youth-card.csv.zip", "travel.csv", "final_result.csv", "final_conv.csv")
