In [None]:
import pandas as pd
from typing import Dict, List, Union, Optional

def check_duplicate_events(df: pd.DataFrame) -> Dict:
    """Проверка дубликатов событий с одинаковыми ts и event"""
    results = {'duplicates': []}
    duplicate_cols = ['ts', 'event']

    if 'counter_id' in df.columns:
        duplicate_cols.append('counter_id')

    duplicates = df[df.duplicated(subset=duplicate_cols, keep=False)]
    if not duplicates.empty:
        results['duplicates'] = duplicates.sort_values(duplicate_cols).to_dict('records')
    return results

print(check_duplicate_events(df))

In [None]:
def check_session_start(df: pd.DataFrame) -> Dict:
    """Проверка, что сессии начинаются с 1"""
    results = {'start_errors': []}

    if 'randPAS_session_id' not in df.columns:
        raise KeyError("Отсутствует столбец randPAS_session_id")

    first_events = df.groupby('randPAS_session_id').first()

    start_errors = first_events[
        (first_events['page_view_order_number'] != 1) |
        (first_events['event_order_number'] != 1)
    ]

    for session_id, row in start_errors.iterrows():
        results['start_errors'].append({
            'session_id': session_id,
            'first_page_view': int(row['page_view_order_number']),
            'first_event': int(row['event_order_number'])
        })

    return results

print(ckeck_session_start(df))


NameError: name 'pd' is not defined

In [None]:
def check_order_relation(df: pd.DataFrame) -> Dict:
    """Проверка соотношения page_view и event order numbers"""
    results = {'relation_errors': []}

    relation_errors = df[df['event_order_number'] < df['page_view_order_number']]

    if not relation_errors.empty:
        for session_id, group in relation_errors.groupby('randPAS_session_id'):
            results['relation_errors'].append({
                'session_id': session_id,
                'count': len(group),
                'examples': group[['ts', 'page_view_order_number', 'event_order_number']].head(3).to_dict('records')
            })

    return results


In [None]:
def check_numbering_sequence(df: pd.DataFrame) -> Dict:
    """Проверка пропусков в нумерации событий"""
    results = {'missing_numbers': []}

    grouped = df.groupby('randPAS_session_id')

    for session_id, group in grouped:
        session_data = group.sort_values('ts')

        page_diff = session_data['page_view_order_number'].diff().dropna()
        if any(page_diff != 1):
            results['missing_numbers'].append({
                'session_id': session_id,
                'type': 'page_view',
                'positions': session_data[page_diff != 1][['ts', 'page_view_order_number']].to_dict('records')
            })


        event_diff = session_data['event_order_number'].diff().dropna()
        if any(event_diff != 1):
            results['missing_numbers'].append({
                'session_id': session_id,
                'type': 'event',
                'positions': session_data[event_diff != 1][['ts', 'event_order_number']].to_dict('records')
            })

    return results

In [1]:
!gdown --id 1GvWVG9iS3sQkYnaERcd_G2hCbpa2xLyC

Downloading...
From (original): https://drive.google.com/uc?id=1GvWVG9iS3sQkYnaERcd_G2hCbpa2xLyC
From (redirected): https://drive.google.com/uc?id=1GvWVG9iS3sQkYnaERcd_G2hCbpa2xLyC&confirm=t&uuid=de365904-b2d4-4c3f-bdc0-4169556e1216
To: /content/data_2024-10-09_part2.parquet
100% 540M/540M [00:09<00:00, 55.5MB/s]


In [4]:
import pandas as pd

df = pd.read_parquet('data_2024-10-09_part2.parquet')

In [3]:
import pandas as pd
from datetime import timedelta

def detect_location_changes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Анализирует смену местоположения (geo_city_id и ip) для каждого пользователя.
    Возвращает DataFrame с информацией о сменах местоположения и временных интервалах.
    """
    df_sorted = df.sort_values(['randPAS_user_passport_id', 'ts'])
    grouped = df_sorted.groupby('randPAS_user_passport_id')

    results = []

    for user_id, group in grouped:
        user_data = group[['ts', 'ip', 'geo_city_id']].drop_duplicates()

        changes = user_data[
            (user_data['geo_city_id'].shift() != user_data['geo_city_id']) |
            (user_data['ip'].shift() != user_data['ip'])
        ].copy()
        if len(changes) > 1:
            changes['time_diff'] = changes['ts'].diff().dt.total_seconds()
            city_changes = list(zip(
                changes['geo_city_id'].astype(str),
                changes['time_diff'].astype(str)
            ))

            results.append({
                'user_id': user_id,
                'city_changes': " → ".join([f"{city} ({time}s)" for city, time in city_changes]),
                'change_count': len(changes) - 1,
                'first_change': changes['ts'].iloc[1],
                'last_change': changes['ts'].iloc[-1],
                'unique_cities': changes['geo_city_id'].nunique(),
                'unique_ips': changes['ip'].nunique()
            })

    return pd.DataFrame(results)

if __name__ == "__main__":

    test_data = {
        'ts': pd.to_datetime(['2023-01-01 10:00', '2023-01-01 11:00', '2023-01-01 12:00',
                             '2023-01-02 09:00', '2023-01-02 10:00']),
        'randPAS_user_passport_id': ['user1', 'user1', 'user1', 'user2', 'user2'],
        'ip': ['192.168.1.1', '192.168.1.2', '192.168.1.2', '10.0.0.1', '10.0.0.2'],
        'geo_city_id': [1, 2, 5, 3, 4]
    }
    df = pd.DataFrame(test_data)

    result_df = detect_location_changes(df)

    print("Анализ смены местоположения пользователей:")
    print(result_df[['user_id', 'city_changes', 'change_count']])

    if not result_df.empty:
        print("\nСтатистика по изменениям:")
        print(f"Всего пользователей с изменениями: {len(result_df)}")
        print(f"Среднее количество изменений на пользователя: {result_df['change_count'].mean():.2f}")
        print(f"Максимальное количество городов у одного пользователя: {result_df['unique_cities'].max()}")

Анализ смены местоположения пользователей:
  user_id                          city_changes  change_count
0   user1  1 (nans) → 2 (3600.0s) → 5 (3600.0s)             2
1   user2                3 (nans) → 4 (3600.0s)             1

Статистика по изменениям:
Всего пользователей с изменениями: 2
Среднее количество изменений на пользователя: 1.50
Максимальное количество городов у одного пользователя: 3


In [5]:
import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt

def analyze_city_activity(df, min_events=10, z_threshold=3, rolling_window='1H'):
    """
    Анализирует активность по городам и выявляет аномальные всплески

    Параметры:
        df: DataFrame с данными
        min_events: минимальное количество событий для анализа города
        z_threshold: порог для детектирования аномалий (в стандартных отклонениях)
        rolling_window: размер окна для скользящей статистики

    Возвращает:
        DataFrame с результатами анализа
    """

    if not pd.api.types.is_datetime64_any_dtype(df['ts']):
        df['ts'] = pd.to_datetime(df['ts'])
    city_activity = df.groupby(['geo_city_id', pd.Grouper(key='ts', freq=rolling_window)])\
                     .size()\
                     .reset_index(name='event_count')
    #print(city_activity)
    city_stats = city_activity.groupby('geo_city_id')['event_count'].agg(['count', 'mean', 'std'])
    valid_cities = city_stats[city_stats['count'] > 5].index
    city_activity = city_activity[city_activity['geo_city_id'].isin(valid_cities)]

    city_activity['z_score'] = city_activity.groupby('geo_city_id')['event_count']\
        .transform(lambda x: (x - x.mean()) / x.std())

    city_activity['is_anomaly'] = city_activity['z_score'] > z_threshold


    total_activity = df.groupby(pd.Grouper(key='ts', freq=rolling_window))\
                      .size()\
                      .reset_index(name='total_events')

    city_activity = city_activity.merge(total_activity, on='ts')
    city_activity['activity_ratio'] = city_activity['event_count'] / city_activity['total_events']

    city_activity['prev_ratio'] = city_activity.groupby('geo_city_id')['activity_ratio'].shift(1)
    city_activity['ratio_change'] = (city_activity['activity_ratio'] - city_activity['prev_ratio']) / city_activity['prev_ratio']

    anomalies = city_activity[city_activity['is_anomaly']].sort_values('z_score', ascending=False)

    return anomalies, city_activity



anomalies, city_activity = analyze_city_activity(df)

print("Топ аномалий активности:")
print(anomalies[['ts', 'geo_city_id', 'event_count', 'z_score', 'activity_ratio', 'ratio_change']].head())



  city_activity = df.groupby(['geo_city_id', pd.Grouper(key='ts', freq=rolling_window)])\
  total_activity = df.groupby(pd.Grouper(key='ts', freq=rolling_window))\


Топ аномалий активности:
                      ts  geo_city_id  event_count   z_score  activity_ratio  \
865  2024-10-09 07:00:00     468866.0          106  3.214264        0.000190   
1069 2024-10-09 03:00:00     472278.0           47  3.187923        0.000068   
4646 2024-10-09 07:00:00     564719.0          117  3.146020        0.000210   
7422 2024-10-09 15:00:00    1498693.0           51  3.051541        0.000085   
8682 2024-10-09 03:00:00    2051523.0           80  3.010474        0.000116   

      ratio_change  
865       4.103835  
1069      2.249572  
4646      4.257913  
7422      2.543364  
8682      1.304661  


In [10]:
import pandas as pd
from datetime import timedelta

def detect_suspicious_ips(df: pd.DataFrame,
                         max_users_per_ip: int = 10) -> pd.DataFrame:
    """
    Обнаруживает IP-адреса с аномально большим количеством пользователей

    Параметры:
        df: DataFrame с данными
        max_users_per_ip: максимальное допустимое количество пользователей с одного IP

    Возвращает:
        DataFrame с подозрительными IP и статистикой
    """
    ip_stats = (
        df.groupby('ip')
        .agg(
            unique_users=('randPAS_user_passport_id', 'nunique'),
            total_actions=('randPAS_user_passport_id', 'count'),
            first_seen=('ts', 'min'),
            last_seen=('ts', 'max')
        )
        .reset_index()
    )

    suspicious_ips = ip_stats[ip_stats['unique_users'] > max_users_per_ip]
    suspicious_ips['activity_period'] = suspicious_ips['last_seen'] - suspicious_ips['first_seen']

    return suspicious_ips.sort_values('unique_users', ascending=False)


def detect_user_activity_spikes(df: pd.DataFrame,
                              time_window_sec: int = 60,
                              max_actions: int = 30) -> pd.DataFrame:
    """
    Обнаруживает пользователей с аномально высокой активностью

    Параметры:
        df: DataFrame с данными
        time_window_sec: временное окно в секундах для анализа
        max_actions: максимальное допустимое количество действий за окно

    Возвращает:
        DataFrame с подозрительными пользователями и статистикой
    """

    df_sorted = df.sort_values(['randPAS_user_passport_id', 'ts'])
    df_sorted['time_diff'] = (
        df_sorted.groupby('randPAS_user_passport_id')['ts']
        .diff()
        .dt.total_seconds()
    )
    rapid_actions = df_sorted[df_sorted['time_diff'] < time_window_sec] \
    .groupby('randPAS_user_passport_id') \
    .agg(
        rapid_actions_count=('time_diff', 'count'),
        min_time_diff=('time_diff', 'min'),
        avg_time_diff=('time_diff', 'mean'),
        ip_list=('ip', lambda x: x.unique().tolist())
    ) \
    .reset_index()
    suspicious_users = rapid_actions[rapid_actions['rapid_actions_count'] > max_actions]
    suspicious_users['ip_count'] = suspicious_users['ip_list'].apply(len)

    return suspicious_users.sort_values('rapid_actions_count', ascending=False)



print("Анализ подозрительных IP-адресов:")
suspicious_ips = detect_suspicious_ips(df, max_users_per_ip=10)
print(suspicious_ips)

print("\nАнализ подозрительной активности пользователей:")
suspicious_users = detect_user_activity_spikes(df, time_window_sec=10, max_actions=5)
print(suspicious_users)

Анализ подозрительных IP-адресов:
Empty DataFrame
Columns: [ip, unique_users, total_actions, first_seen, last_seen, activity_period]
Index: []

Анализ подозрительной активности пользователей:


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  suspicious_users['ip_count'] = suspicious_users['ip_list'].apply(len)


            randPAS_user_passport_id  rapid_actions_count  min_time_diff  \
0                                                 4735933            0.0   
1211  YV(Ov!phAxb1Bv\S1A5V-JT[._*6xu                  224            0.0   
1999  |`xUr;5_+]^'"3_=a4';d'2(}0K}<v                  188            0.0   
786   EI50u:&BOZ&Y"6d]o2mDT4@zP~uOT_                  161            0.0   
1929  y^10#^1HaH.lpLD+!/[pZ#7M3ok!)y                  138            0.0   
...                              ...                  ...            ...   
1267  \.\;&Ey6NunDd+kWVe+'D@WjnI8Yt7                    6            0.0   
683   @_!Mtm[|~SdDVk@W*sp*Q&W#%T"+@T                    6            0.0   
974   N=R@!PexpH;lqV'.(hRQDL@)=,\W;t                    6            0.0   
2002  |kxZIAr<det}yQz{`~45u@gUYsV5GX                    6            0.0   
4      1Y.<;?P\_3}g9FDK#:6!O0;&>c3ni                    6            0.0   

      avg_time_diff                                            ip_list  \
0          0.

In [15]:
def detect_anomalous_time_windows(df, time_col, user_col, threshold=1.5, window_size='1H'):
    df = df.copy()

    df[time_col] = pd.to_datetime(df[time_col])
    df['time_window'] = df[time_col].dt.floor(window_size)

    df = df.sort_values(by=[user_col, time_col])
    df['time_diff'] = df.groupby(user_col)[time_col].diff().dt.total_seconds()

    df['time_category'] = pd.cut(df['time_diff'],
                                 bins=[0, 30, 300, 1800, float('inf')],
                                 labels=['short', 'medium', 'long', 'very_long'])

    time_window_stats = df.groupby('time_window')['time_category'].value_counts(normalize=True).unstack().fillna(0)

    time_window_stats['short_ratio_change'] = time_window_stats['short'].pct_change().abs().fillna(0)

    anomalous_windows = time_window_stats[time_window_stats['short_ratio_change'] > threshold]

    return anomalous_windows[['short', 'medium', 'long']]

# смотрим доли действий, разница между которыми до 30 секунд, до 5 минут и до 30 минут. если резко меняется, то выводим.
anomalies = detect_anomalous_time_windows(df, time_col='ts', user_col='randPAS_user_passport_id')

print("Аномальные временные окна с резкими изменениями долей:")
print(anomalies)


  df['time_window'] = df[time_col].dt.floor(window_size)


Аномальные временные окна с резкими изменениями долей:
Empty DataFrame
Columns: [short, medium, long]
Index: []


In [16]:
import pandas as pd

def detect_anomalous_device_shares(df, time_col, device_col, threshold=1.5, window_size='30T'):
    df = df.copy()
    df[time_col] = pd.to_datetime(df[time_col])
    df['time_window'] = df[time_col].dt.floor(window_size)

    device_shares = df.groupby(['time_window', device_col]).size().unstack().fillna(0)
    device_shares = device_shares.div(device_shares.sum(axis=1), axis=0)
    device_shares_change = device_shares.pct_change().abs().fillna(0)

    anomalous_windows = device_shares_change[device_shares_change.max(axis=1) > threshold]
    return anomalous_windows

#аномалии во временных окнах по распределению типов устройств
anomalous_device_windows = detect_anomalous_device_shares(df, time_col='ts', device_col='ua_device_type')
print("Аномальные временные окна с резкими изменениями в долях устройств:")
print(anomalous_device_windows)


  df['time_window'] = df[time_col].dt.floor(window_size)


Аномальные временные окна с резкими изменениями в долях устройств:
Empty DataFrame
Columns: [, mobile, other, pc, tablet]
Index: []


In [22]:
def detect_anomalous_page_views(df, time_col, user_col, page_col, threshold=3, window_size='30T'):
    df = df.copy()
    df[time_col] = pd.to_datetime(df[time_col])
    df['time_window'] = df[time_col].dt.floor(window_size)
    page_views = df.groupby(['time_window', page_col])[user_col].nunique().unstack().fillna(0)
    page_views_change = page_views.pct_change().abs()
    page_views_change[page_views.shift(1) == 0] = np.nan
    anomalies = page_views_change[page_views_change > threshold].stack().reset_index()
    anomalies.columns = ['time_window', 'url', 'growth']

    return anomalies.dropna()

anomalous_page_windows = detect_anomalous_page_views(
    df,
    time_col='ts',
    user_col='randPAS_user_passport_id',
    page_col='url'
)

if not anomalous_page_windows.empty:
    print(" Найдены аномальные всплески посещаемости страниц:")
    for _, row in anomalous_page_windows.iterrows():
        print(f" {row['time_window']} |  {row['url']} |  Рост в {row['growth']:.2f} раз")
else:
    print("Аномалий не найдено")


  df['time_window'] = df[time_col].dt.floor(window_size)


⚠️ Найдены аномальные всплески посещаемости страниц:
 2024-10-09 03:30:00 |  https://b4fd7a71f468fc04.ru/820e801a50697fea |  Рост в 4.00 раз
 2024-10-09 04:00:00 |  https://57580f5dcfabe04b.ru/278ee9908f785f17 |  Рост в 3.50 раз
 2024-10-09 05:00:00 |  https://b4fd7a71f468fc04.ru/d9d6ec6e8c2b2202 |  Рост в 8.00 раз
 2024-10-09 06:00:00 |  https://b4fd7a71f468fc04.ru/025de1d395eddcd8 |  Рост в 4.00 раз
 2024-10-09 06:00:00 |  https://b4fd7a71f468fc04.ru/7575915bc7d42308 |  Рост в 8.00 раз
 2024-10-09 06:30:00 |  https://57580f5dcfabe04b.ru/4e5f3d1910434c05 |  Рост в 4.00 раз
 2024-10-09 07:00:00 |  https://b4fd7a71f468fc04.ru/820e801a50697fea |  Рост в 4.00 раз
 2024-10-09 07:30:00 |  https://57580f5dcfabe04b.ru/4e5f3d1910434c05 |  Рост в 5.00 раз
 2024-10-09 15:00:00 |   |  Рост в 3.45 раз
 2024-10-09 15:00:00 |  https://57580f5dcfabe04b.ru/4e5f3d1910434c05 |  Рост в 7.00 раз
 2024-10-09 15:00:00 |  https://57580f5dcfabe04b.ru/8616e9e20334b88d |  Рост в 5.00 раз
 2024-10-09 15:00:00 | 

In [23]:
from sklearn.ensemble import IsolationForest
import pandas as pd

def detect_anomalous_users(df, user_col, time_col, page_col):
    user_page_times = df.groupby([user_col, page_col])[time_col].sum().reset_index()

    user_avg_times = user_page_times.groupby(user_col)[time_col].mean().reset_index()
    user_avg_times.columns = [user_col, 'avg_time_spent']
    model = IsolationForest(contamination=0.05, random_state=42)  # contamination - доля аномальных
    user_avg_times['is_anomalous'] = model.fit_predict(user_avg_times[['avg_time_spent']])
    anomalous_users = user_avg_times[user_avg_times['is_anomalous'] == -1]

    return anomalous_users


anomalous_users = detect_anomalous_users(df, user_col='randPAS_user_passport_id', time_col='secs', page_col='url')

if not anomalous_users.empty:
    print("Найдены аномальные пользователи с нетипичным поведением:")
    print(anomalous_users)
else:
    print("Аномальных пользователей не найдено")


⚠️ Найдены аномальные пользователи с нетипичным поведением:
            randPAS_user_passport_id  avg_time_spent  is_anomalous
0                                        1170.081622            -1
87    #Pw.V$:Qk$hsyt $vfr${@{p =uh)F      616.000000            -1
95    #iLr!G9pN+fv:,%Y88S.jnfKM3p[sd     3460.000000            -1
99    #y|g}JfyBDmh0xX^]Ds[n3kz4![=A6      956.000000            -1
144   %O`*{wc%K} "E0n3p\Ig,/UMc{=j;H     1270.666667            -1
...                              ...             ...           ...
2275  {h Uw{ZN05~AX@i<Yrgf^{m]xYWr_H     4809.000000            -1
2304  |d])87=_dT`s:FhkD4d`Mo{V/'{s.z      612.250000            -1
2314  }*i6!(F(!xQ~DS+%k?$ksv*gX=Kh/j     1440.000000            -1
2320  }Ct%V=I'q/vE^FVU2yp|9PIe&6P1+u     1001.500000            -1
2325  }TIBwWp64kv")-'8E3WSl rWz"Xs^      1280.000000            -1

[117 rows x 3 columns]
