In [1]:
import pandas as pd
import pyarrow.parquet as pq
import numpy as np
from datetime import datetime, timezone
import time

def load_parquet_pandas(path, batch_size=50):
    start = time.time()
    parquet_file = pq.ParquetFile(path)
    df_chunk = next(parquet_file.iter_batches(batch_size=batch_size)).to_pandas()
    print(f"Время чтения: {time.time() - start:.2f} секунд")
    return df_chunk

In [6]:
file = load_parquet_pandas("data/2024_yandex_metrika_hits.parquet")
file.columns = file.columns.str.replace('ym:pv:', '', regex=False)
file.to_csv("data/tmp_hits.csv")

Время чтения: 0.13 секунд


In [5]:
file = load_parquet_pandas("data/2024_yandex_metrika_visits.parquet")
file.columns = file.columns.str.replace('ym:s:', '', regex=False)
file.to_csv("data/tmp_visit.csv")

Время чтения: 0.35 секунд


In [2]:
def normalize_watchid(watch_id):
    """Приводит watchID к единому формату - строковому представлению целого числа"""
    if isinstance(watch_id, (int, np.integer)):
        return str(watch_id)
    elif isinstance(watch_id, float):
        # Преобразуем float через int для точности
        return str(int(watch_id))
    elif isinstance(watch_id, str):
        # Обрабатываем экспоненциальную запись
        if 'e+' in watch_id.lower() or 'e-' in watch_id.lower():
            try:
                # Преобразуем научную нотацию в целое число
                return str(int(float(watch_id)))
            except (ValueError, OverflowError):
                return watch_id
        else:
            # Убираем лишние символы для обычных чисел
            return watch_id.strip().replace("'", "").replace('"', '')
    else:
        return str(watch_id)

In [3]:
def read_matching_hits_normalized(hits_path, visits_watchids, batch_size=10000):
    """Читает hits с нормализацией watchID"""
    parquet_file = pq.ParquetFile(hits_path)
    all_matching_hits = []
    
    # Нормализуем watchID из visits
    visits_watchids_normalized = [normalize_watchid(wid) for wid in visits_watchids]
    visits_watchids_set = visits_watchids_normalized
    
    print(f"Поиск {len(visits_watchids_set)} нормализованных watchID")
    
    for i, batch in enumerate(parquet_file.iter_batches(batch_size=batch_size)):
        df_chunk = batch.to_pandas()
        
        # Нормализуем watchID в hits
        df_chunk['ym:pv:watchID'] = df_chunk['ym:pv:watchID'].apply(normalize_watchid)
        
        # Фильтруем по нормализованным watchID
        matching_chunk = df_chunk[df_chunk['ym:pv:watchID'].isin(visits_watchids_set)]
        
        if len(matching_chunk) > 0:
            # Сохраняем оригинальную колонку для совместимости
            all_matching_hits.append(matching_chunk)
            print(f"Чанк {i+1}: найдено {len(matching_chunk)} совпадений")
            
            # Примеры найденных совпадений для проверки
            sample_matches = matching_chunk['ym:pv:watchID'].head(3).tolist()
            print(f"  Примеры найденных watchID: {sample_matches}")
        else:
            print(f"Чанк {i+1}: ничего не найдено!")
        if i == 10:
            break
    
    if all_matching_hits:
        result = pd.concat(all_matching_hits, ignore_index=True)
        print(f"✅ Всего найдено совпадений: {len(result)}")
        return result
    else:
        print("❌ Совпадений не найдено")
        return pd.DataFrame()

In [4]:
def explode_and_join(visits, hits):
    
    # Приводим типы и чистим данные
    visits['watchID'] = visits['watchID'].apply(normalize_watchid)
    hits['watchID'] = hits['watchID'].apply(normalize_watchid)
    visits['clientID'] = visits['clientID'].astype(str).str.strip()
    hits['clientID'] = hits['clientID'].astype(str).str.strip()
    
    # Диагностика после explode
    print(f"Размер visits после explode: {len(visits)}")
    print(f"Уникальных watchID в visits после explode: {visits['watchID'].nunique()}")
    
    # Проверяем пересечение watchID
    visits_watchids = set(visits['watchID'].unique())
    hits_watchids = set(hits['watchID'].unique())
    common_watchids = visits_watchids.intersection(hits_watchids)
    
    print(f"Общих watchID: {len(common_watchids)}")
    print(f"WatchID только в visits: {len(visits_watchids - hits_watchids)}")
    print(f"WatchID только в hits: {len(hits_watchids - visits_watchids)}")
    
    # Merge
    joined = visits.merge(
        hits, 
        how='left', 
        on=['watchID', 'clientID'], 
        suffixes=('_visit', '_hit')
    )
    
    # Диагностика после merge
    print(f"Размер после merge: {len(joined)}")
    print(f"Строк с URL (не NaN): {joined['URL'].notna().sum()}")
    print(f"Процент заполнения URL: {joined['URL'].notna().mean() * 100:.2f}%")
    
    # Проверяем примеры данных
    print("\nПримеры watchID из visits (первые 5):")
    print(visits['watchID'].head().tolist())
    print("Примеры watchID из hits (первые 5):")
    print(hits['watchID'].head().tolist())
    
    # Если URL все еще NaN, проверяем конкретные случаи
    if joined['URL'].notna().sum() == 0:
        print("\n⚠️ ВНИМАНИЕ: Все URL равны NaN!")
        print("Проверяем конкретные watchID:")
        sample_watchids = visits['watchID'].head(3).tolist()
        for wid in sample_watchids:
            in_hits = hits[hits['watchID'] == wid]
            print(f"watchID '{wid}': найдено в hits - {len(in_hits)} записей")
    
    # Обработка дат и сортировка
    joined['dateTime_visit'] = pd.to_datetime(joined['dateTime_visit'])
    joined['dateTime_hit'] = pd.to_datetime(joined['dateTime_hit'])
    joined = joined.sort_values(['visitID', 'dateTime_hit'])
    
    return joined

In [5]:
def filter_visits_by_hits(visits, hits_df):
    """
    Удаляет из visits строки с watchID, которых нет в hits
    """
    hits = hits_df.copy()
    
    # Получаем множество существующих watchID в hits
    existing_watchids = set(hits['watchID'].unique())
    print(f"Всего уникальных watchID в hits: {len(existing_watchids)}")
    
    # Фильтруем visits - оставляем только те watchID, которые есть в hits
    initial_count = len(visits)
    visits_filtered = visits[visits['watchID'].isin(existing_watchids)]
    filtered_count = len(visits_filtered)
    
    print(f"Отфильтровано visits: {initial_count} -> {filtered_count} строк")
    print(f"Удалено {initial_count - filtered_count} строк ({((initial_count - filtered_count)/initial_count)*100:.1f}%)")
    
    return visits_filtered.reset_index(drop=True)

In [6]:
visits_norm = load_parquet_pandas("data/2022_yandex_metrika_visits.parquet", 99999)
visits_norm.columns = visits_norm.columns.str.replace('ym:s:', '', regex=False)
visits = visits_norm.copy()

Время чтения: 1.69 секунд


In [7]:
# Парсим watchIDs
if isinstance(visits['watchIDs'].iloc[0], str):
    try:
        visits['watchIDs'] = visits['watchIDs'].apply(json.loads)
    except:
        try:
            visits['watchIDs'] = visits['watchIDs'].apply(ast.literal_eval)
        except:
            visits['watchIDs'] = visits['watchIDs'].str.strip("[]").str.replace("'", "").str.split(",")
# Explode
visits = visits.explode('watchIDs').rename(columns={'watchIDs': 'watchID'}).reset_index(drop=True)

# Приводим типы и чистим данные
visits['watchID'] = visits['watchID'].astype(str).str.strip()
visits['clientID'] = visits['clientID'].astype(str).str.strip()

In [8]:
s = set(visits['watchID'])

In [9]:
hits = read_matching_hits_normalized("data/2022_yandex_metrika_hits.parquet", s, 99999)

Поиск 1427173 нормализованных watchID
Чанк 1: найдено 199 совпадений
  Примеры найденных watchID: ['96238819080929424', '96242998996893776', '108488914934169648']
Чанк 2: найдено 165 совпадений
  Примеры найденных watchID: ['902011393660420352', '107543512205230176', '107547479019356288']
Чанк 3: найдено 156 совпадений
  Примеры найденных watchID: ['1309776455500300544', '1309780395948769536', '1310216960660472064']
Чанк 4: найдено 150 совпадений
  Примеры найденных watchID: ['2002510594628649216', '265341404135817408', '35904263166885952']
Чанк 5: найдено 172 совпадений
  Примеры найденных watchID: ['485886493083828288', '1389468031430689024', '1869345526973989120']
Чанк 6: найдено 45 совпадений
  Примеры найденных watchID: ['1801848585805889792', '709677475015688448', '709732946260001024']
Чанк 7: ничего не найдено!
Чанк 8: ничего не найдено!
Чанк 9: ничего не найдено!
Чанк 10: ничего не найдено!
Чанк 11: ничего не найдено!
✅ Всего найдено совпадений: 887


In [10]:
hits.columns = hits.columns.str.replace('ym:pv:', '', regex=False)

In [11]:
hits.to_hdf('data/hits.h5', key='df', mode='w')

In [11]:
visits_filtered = filter_visits_by_hits(visits, hits)

Всего уникальных watchID в hits: 887
Отфильтровано visits: 1427173 -> 887 строк
Удалено 1426286 строк (99.9%)


In [13]:
visits_filtered.to_hdf('data/visits_f.h5', key='df', mode='w')

In [12]:
jo = explode_and_join(visits_filtered, hits)

Размер visits после explode: 887
Уникальных watchID в visits после explode: 887
Общих watchID: 887
WatchID только в visits: 0
WatchID только в hits: 0
Размер после merge: 887
Строк с URL (не NaN): 887
Процент заполнения URL: 100.00%

Примеры watchID из visits (первые 5):
['176159369259647136', '123300207586967600', '123304128317489456', '15800317056647376', '6463277528252545']
Примеры watchID из hits (первые 5):
['96238819080929424', '96242998996893776', '108488914934169648', '108491276043681840', '108496140732203072']


In [15]:
jo.to_hdf('data/joins_f.h5', key='df', mode='w')

In [14]:
hits.head()

Unnamed: 0,watchID,pageViewID,counterID,clientID,counterUserIDHash,date,dateTime,title,pageCharset,goalsID,...,parsedParamsKey6,parsedParamsKey7,parsedParamsKey8,parsedParamsKey9,parsedParamsKey10,httpError,networkType,shareService,shareURL,shareTitle
0,96238819080929424,469802881,45231030,1643003921635805428,12722090328218690967,2022-01-24,2022-01-24 08:58:41,Дни открытых дверей меганаправлений 2022,utf-8,[],...,[],[],[],[],[],0,cellular,,,
1,96242998996893776,469802881,45231030,1643003921635805428,12722090328218690967,2022-01-24,2022-01-24 08:58:57,,utf-8,[],...,[],[],[],[],[],0,cellular,,,
2,108488914934169648,559478545,45231030,1643049916361960344,8135432277710369579,2022-01-24,2022-01-24 21:57:32,Приёмная комиссия МАИ,utf-8,[],...,[],[],[],[],[],0,,,,
3,108491276043681840,372936429,45231030,1643049916361960344,8135432277710369579,2022-01-24,2022-01-24 21:57:41,Направления,utf-8,[],...,[],[],[],[],[],0,,,,
4,108496140732203072,233184934,45231030,1643049916361960344,8135432277710369579,2022-01-24,2022-01-24 21:57:59,Материаловедение и технология новых материалов,utf-8,[],...,[],[],[],[],[],0,,,,


In [13]:
def page_bounce_rates(visits_df, key_pages):
    # visits_df: строки с одной сессией, колонки: startURL, pageViews, visitId, dateTime
    df = visits_df.copy()
    df['is_bounce'] = df['pageViews'] == 1
    # кто вошёл на ключевую страницу (startURL == key_page) — или выставим условие "visited page anywhere in session"
    rows = []
    for page in key_pages:
        entered = df[df['startURL'].str.contains(page, na=False)]
        bounce_rate = entered['is_bounce'].mean() if len(entered)>0 else np.nan
        rows.append({'page': page, 'sessions': len(entered), 'bounce_rate': bounce_rate})
    return pd.DataFrame(rows)



In [None]:
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 50)
pd.set_option('display.width', 100000)

# Теперь можно смотреть больше данных
print(visits_filtered)

                 visitID  counterID              watchID        date             dateTime          dateTimeUTC  isNewUser                                           startURL                                             endURL  pageViews  visitDuration  bounce                 ipAddress regionCountry regionCity  regionCountryID  regionCityID             clientID     counterUserIDHash networkType goalsID goalsSerialNumber goalsDateTime goalsPrice goalsOrder  ... browserEngineVersion4 cookieEnabled javascriptEnabled screenFormat screenColors screenOrientation screenOrientationName  screenWidth screenHeight  physicalScreenWidth  physicalScreenHeight  windowClientWidth windowClientHeight parsedParamsKey1 parsedParamsKey2 parsedParamsKey3 parsedParamsKey4 parsedParamsKey5 parsedParamsKey6 parsedParamsKey7 parsedParamsKey8 parsedParamsKey9 parsedParamsKey10 lastsignRecommendationSystem lastsignMessenger
0     176155276604080158   45231030   176159369259647136  2022-01-27  2022-01-27 21:39:39  20

In [20]:
visits['startURL'].head()

0    https://priem.mai.ru/rating/?referer=https:%2F...
1    https://priem.mai.ru/rating/?referer=https:%2F...
2    https://priem.mai.ru/rating/?referer=https:%2F...
3    https://priem.mai.ru/rating/?referer=https:%2F...
4    https://priem.mai.ru/rating/?referer=https:%2F...
Name: startURL, dtype: object

In [15]:
page_bounce_rates(visits, ['https://priem.mai.ru'])

Unnamed: 0,page,sessions,bounce_rate
0,https://priem.mai.ru,1417477,0.162434


In [22]:
jo['URL']

203    https://priem.mai.ru/master/programs/item/inde...
205    https://priem.mai.ru/master/programs/item/inde...
204    https://priem.mai.ru/upload/iblock/779/24.04.0...
662                                https://priem.mai.ru/
663                                https://priem.mai.ru/
                             ...                        
80                                 https://priem.mai.ru/
435        https://priem.mai.ru/bachelor/tests/#internal
478    https://priem.mai.ru/orders/testing/magistracy...
479                       https://priem.mai.ru/calendar/
480                     https://lk.mai.ru/accounts/login
Name: URL, Length: 887, dtype: object

In [16]:
def detect_wandering(joined_df, funnel_urls=None, pageview_threshold=8):
    sessions = joined_df.groupby('visitID').agg({
        'URL':'nunique',
        'watchID':'count',
        'dateTime_visit':'min'
    }).rename(columns={'watchID':'hits','URL':'unique_pages'})
    sessions['is_wandering'] = (sessions['hits'] >= pageview_threshold)
    if funnel_urls:
        funnel_hits = joined_df[joined_df['URL'].str.contains('|'.join(funnel_urls), na=False)][['visitID']].drop_duplicates()
        sessions = sessions.merge(funnel_hits.assign(funnel_reached=1), on='visitID', how='left').fillna({'funnel_reached':0})
        sessions['is_wandering'] = sessions['is_wandering'] & (sessions['funnel_reached']==0)
    return sessions[sessions['is_wandering']].sort_values('hits', ascending=False)


In [None]:
ses = jo.groupby('visitID')

<pandas.core.groupby.generic.DataFrameGroupBy object at 0x0000021117DC3230>

In [None]:
s = visits_filtered['endURL']

0           https://priem.mai.ru/news/item.php?id=162829
1           https://priem.mai.ru/news/item.php?id=162114
2           https://priem.mai.ru/news/item.php?id=162114
3          https://priem.mai.ru/results/orders/index.php
4      https://priem.mai.ru/news/item.php?WEB_FORM_ID...
                             ...                        
882         https://priem.mai.ru/news/item.php?id=163975
883         https://priem.mai.ru/news/item.php?id=163975
884              https://priem.mai.ru/bachelor/programs/
885                                https://priem.mai.ru/
886         https://priem.mai.ru/news/item.php?id=162114
Name: endURL, Length: 887, dtype: object

In [17]:
det = detect_wandering(jo)
det

Unnamed: 0_level_0,unique_pages,hits,dateTime_visit,is_wandering
visitID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
17542544384065580,24,36,2022-01-20 21:35:18,True
10876499585663100,11,13,2022-01-20 14:31:30,True
30346975922880544,6,12,2022-01-21 11:09:24,True
5121065280339992,9,11,2022-01-20 08:25:35,True
43865656861917295,8,10,2022-01-22 01:28:54,True
7215843364569300,5,10,2022-01-20 10:38:46,True
243947450148585564,8,9,2022-01-30 21:29:45,True
970233572537663729,9,9,2022-03-03 23:05:47,True
10176228187701268,6,8,2022-01-20 13:46:59,True
33100761892913230,8,8,2022-01-21 14:04:29,True


In [72]:
def count_backtracks(joined_df):
    # grouped by session, sorted by hit time
    def backtracks_for_session(df):
        urls = list(df['URL'].fillna(''))
        bt = 0
        for i in range(2, len(urls)):
            if urls[i] == urls[i-2]:
                bt += 1
        return bt
    bts = joined_df.groupby('visitID').apply(lambda df: backtracks_for_session(df.sort_values('dateTime_hit')))
    return bts.rename('backtracks').reset_index()


In [74]:
count_backtracks(jo).head()

  bts = joined_df.groupby('visitID').apply(lambda df: backtracks_for_session(df.sort_values('dateTime_hit')))


Unnamed: 0,visitID,backtracks
0,265462931380240424,0
1,292251483796144399,2
2,373069999639363618,0
3,1255768063759351818,3
4,1436542266787758293,21


In [75]:
def compute_funnel_conversion(joined_df, funnel_steps):
    # funnel_steps — list of URL regexes in order
    import re
    def first_step_index(urls):
        for i,pattern in enumerate(funnel_steps):
            for u in urls:
                if re.search(pattern, u):
                    return i
        return None

    sessions = joined_df.groupby('visitID').agg({'URL': lambda s: list(s)})
    sessions['first_reached'] = sessions['URL'].apply(first_step_index)
    total = len(sessions)
    per_step = []
    for i in range(len(funnel_steps)):
        reached = (sessions['first_reached'] == i).sum()
        converted_from_step = (sessions['first_reached'] >= i).sum()  # users who reached this or later
        per_step.append({'step': i, 'reached': reached, 'cumulative_reached': converted_from_step, 'percent': converted_from_step/total})
    return pd.DataFrame(per_step)
