# Задание №1

Есть Pandas DataFrame со столбцами `[“customer_id”, “product_id”, “timestamp”]`, который содержит данные по просмотрам товаров на сайте. Есть проблема – просмотры одного `customer_id` не разбиты на сессии (появления на сайте). Мы хотим разместить сессии так, чтобы сессией считались все смежные просмотры, между которыми не более 3 минут.

Написать методом который создаст в `Pandas DataFrame` столбец `session_id` и проставит в нем уникальный `int id` для каждой сессии.

У каждого пользователя может быть по несколько сессий. Исходный `DataFrame` может быть большим – до 100 млн строк.

# Решение

основные импорты

In [None]:
import pandas as pd
import numpy as np
import datetime
import random
from tqdm import tqdm
import threading
import multiprocessing
from itertools import chain
import glob
import os
from joblib import Parallel, delayed

генерируем массивы уникальных uuid для покупателей и товаров

In [2]:
import uuid

customers = []
max_unique_customers = 100
for _ in range(max_unique_customers):
    customers.append(str(uuid.uuid4()))
    
products = []
max_unique_products = 1000
for _ in range(max_unique_products):
    products.append(uuid.uuid4().hex)

создаем Pandas DataFrame со столбцами [“customer_id”, “product_id”, “timestamp”], который содержит данные по просмотрам товаров на сайте

In [3]:
# однопроцессное решение

# matrix = []
# max_size = 100000000

# dt = datetime.datetime(2022, 9, 1)
# step = datetime.timedelta(seconds=5)

# for _ in tqdm(range(max_size)):
#     matrix.append([random.choice(customers), random.choice(products), dt])
#     dt += step

# многопроцессное решение

def save_part(matrix, id , size):
    columns = ["customer_id", "product_id", "timestamp"]

    df = pd.DataFrame(data=matrix, columns=columns)
    print("Save", "gen_logs"+str(id)+"-"+str(size)+".csv")
    df.to_csv("gen_logs"+str(id)+"-"+str(size)+".csv", index=False, header=True)
    
def gen_matrix_part(id, size):
    sec_step = 5
    dt = datetime.datetime(2022, 9, 1) + datetime.timedelta(seconds=sec_step*(id*size))
    step = datetime.timedelta(seconds=sec_step)
    temp_matrix = []
    for i in range(size):
        temp_matrix.append([random.choice(customers), random.choice(products), dt])
        dt += step
    print("Ready job id", id)
    # можно сохранить 10 отдельных файлов (каждый по 10М строк) для условия 100М строк
#     save_part(temp_matrix, id, size)
#     return id
#     или накапливать в оперативной памяти
    return temp_matrix

procs = 10
size = 100000

# https://superfastpython.com/multiprocessing-pool-map-multiple-arguments/
with multiprocessing.Pool() as pool:
    async_results = [pool.apply_async(gen_matrix_part, args=(i, size,)) for i in range(procs)]
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
    

Ready job id 0
Ready job idReady job idReady job id  32 1


Ready job id 4
Ready job id 5
Ready job id 6
Ready job id 7
Ready job id 8
Ready job id 9


In [4]:
# для случая накопления в оперативной памяти
matrix = [ar.get() for ar in async_results]
# https://stackoverflow.com/a/952952
matrix = list(chain.from_iterable(matrix))

In [5]:
columns = ["customer_id", "product_id", "timestamp"]

df = pd.DataFrame(data=matrix, columns=columns)
df

Unnamed: 0,customer_id,product_id,timestamp
0,16527631-541f-4927-933e-1576ae08874c,644a5f577e014b3498b742a4c85d7a37,2022-09-01 00:00:00
1,53a52951-ccc8-460a-9e52-cc9e45376734,8b22d3c11444473c92e4458cd5f7a4f8,2022-09-01 00:00:05
2,0b1b03cc-ccfd-49d4-9422-26c0f660c6ad,6fdf9cd786af47b788cc64855b7846d3,2022-09-01 00:00:10
3,69876966-097a-46f7-9cbb-ec8d39ee764a,2693a19b0ff34ffcae2b9c444550aa56,2022-09-01 00:00:15
4,ba4bb959-d525-4739-8bff-9f05abc5e5b9,d8857f706c574877958da09c511871fb,2022-09-01 00:00:20
...,...,...,...
999995,ba4bb959-d525-4739-8bff-9f05abc5e5b9,cfcd7f75505344c6bdc124d001a831eb,2022-10-28 20:52:55
999996,d689a148-7ea6-4dc0-a704-89b32a7b0fd7,9d4265aac9824a7fa36856053756babd,2022-10-28 20:53:00
999997,1bf89d68-d9d9-4d2a-b841-218cc549dd20,dda0bbb1744147a4b8b4cb19c27f799c,2022-10-28 20:53:05
999998,fa1f8b6f-c25d-4f82-9cd8-049bce781bf6,9cf06f112b264727872bec91140224c6,2022-10-28 20:53:10


In [None]:
# для случая чтения 10 файлов 
# all_files = glob.glob(os.path.join(".", "*.csv"))
# df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
# df

проверяем дубликаты для покупателей и продуктов

In [None]:
dups_customer_id = df.pivot_table(columns=['customer_id'], aggfunc='size')
dups_customer_id

In [None]:
dups_product_id = df.pivot_table(columns=['product_id'], aggfunc='size')
dups_product_id

пишем функцию добавления сессий

In [None]:
def add_session(group):
    group['deltaTsec'] = group["timestamp"].diff().dt.seconds.fillna(0).astype(int)
#     print(group)
    
    sessions = []
    session_time = 0
    session_limit = 180
    i = 0
    for delta in group['deltaTsec']:
        session_time += delta
        if i == 0 or session_time > session_limit:
            session_time = 0
            sessions.append(str(uuid.uuid4()))
        elif session_time <= session_limit:
            sessions.append(sessions[i - 1])
        
#         print("session_time",session_time)
        i += 1
        
    group['session_id'] = sessions
    group = group.drop('deltaTsec', axis=1)
#     print(group)
    return group

# df_groupby = df.groupby("customer_id", as_index=False).apply(add_session)
# df_groupby

In [None]:
# или параллельная обработка
def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

df_groupby = applyParallel(df.groupby("customer_id", as_index=False), add_session)

In [None]:
dups_session_id = df_groupby.pivot_table(columns=['session_id'], aggfunc='size')
dups_session_id

проверяем, перебирая сессии и проверяя, что разница между первым и последним `timestamp` для уникальной сессии составляет не более 3 минут

In [None]:
def test(session_group):
    print("=====\nДля сессии", session_group.name, 
          "\nпользователя", session_group['customer_id'].iloc[0], 
          "\nразница составляет", session_group['timestamp'].iloc[-1], "-", session_group['timestamp'].iloc[0], "=", session_group['timestamp'].iloc[-1] - session_group['timestamp'].iloc[0], 
          " - ", session_group['timestamp'].iloc[-1] - session_group['timestamp'].iloc[0] <= datetime.timedelta(seconds=180))
    print(session_group.iloc[0])
    print(session_group.iloc[-1])

df_groupby.groupby("session_id", as_index=False).apply(test)

In [None]:
df.loc[df['customer_id'] == "6b8b38d2-8e56-4a56-a806-f87e4789d0e8"]

In [None]:
df[:20]