In [1]:
# Command to make `src` module available in notebook
# Details: https://stackoverflow.com/a/35273613

import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)


### Поиск аномалий

В этом наборе экспериментов мы будем пытаться находит аномалии в последовательности событий

In [2]:
from collections import defaultdict
from datetime import datetime, timedelta, date
from enum import Enum

from pydantic import BaseModel
import numpy as np
import pandas as pd
import seaborn

from src.cert import (
    CERTDatasetVersion,
    CERTDataType,
    CERTDataType,
    load_cert_dataset_users_dataframe,
    load_dataframe,
)


KeyboardInterrupt: 

In [None]:

class BaseEvent(BaseModel):

    event_id: str
    user_id: str
    timestamp: datetime

    @property
    def weekday(self) -> int:
        return self.timestamp.weekday()

    @property
    def is_work_time(self) -> bool:
        return 8 < self.timestamp.hour < 18



In [None]:
class LogonEventType(str, Enum):
    login = 'logon'
    logout = 'logoff'

class LogonEvent(BaseEvent):

    type: LogonEventType




In [None]:
users = load_cert_dataset_users_dataframe(version=CERTDatasetVersion.cert_3_2)
logon_activities = load_dataframe(
    data_type=CERTDataType.logon,
)


In [None]:
# Нужно построить для каждого человека окно, в котором он оббычно находится на работе
# Кажется, что это время между тем, как он обычно заходит и выходит
# Вопрос: что значит "обычно"?


In [None]:
class UserLogonRecord(BaseModel):
    avg_first_login: float | None = None
    avg_last_logout: float | None = None

    last_logout: datetime | None = None
    last_login: datetime | None = None

    login_count: int = 0
    logout_count: int = 0

    def add_event(self, type: LogonEventType, timestamp: datetime):
        if type == LogonEventType.login:
            self.add_login(timestamp)
        elif type == LogonEventType.logout:
            self.add_logout(timestamp)
        else:
            raise TypeError(f'Unknown event type {type}')

    def add_login(self, timestamp: datetime):
        if self.login_count == 0:
            self.avg_first_login = timestamp.hour * 60 + timestamp.minute
            self.login_count = 1
            self.last_login = timestamp
            return

        assert self.avg_first_login is not None and self.last_login is not None

        if self.last_login.date() == timestamp.date():
            return

        mins = timestamp.hour * 60 + timestamp.minute
        self.avg_first_login = (
            (self.avg_first_login * self.login_count + mins)
            / (self.login_count + 1)
        )
        self.last_login = timestamp
        self.login_count += 1

    def add_logout(self, timestamp: datetime):
        if self.logout_count == 0:
            if self.last_logout is None or timestamp.date() == self.last_logout.date():
                self.last_logout = timestamp
            else:
                self.avg_last_logout = self.last_logout.hour * 60 + self.last_logout.minute
                self.last_logout = timestamp
                self.login_count = 1
            return

        assert self.last_logout is not None and self.avg_last_logout is not None

        if timestamp.date() == self.last_logout.date():
            self.last_logout = timestamp
            return

        mins = self.last_logout.hour * 60 + self.last_logout.minute
        self.avg_last_logout = (
            (self.avg_last_logout * self.logout_count + mins)
            / (self.logout_count + 1)
        )
        self.last_logout = timestamp
        self.logout_count += 1


class LogonWorkdayIndicator:
    """
    Горит, если человек зашел во время, сильно отличающееся от оббычного времени входа

    Хранит для каждого человека отображение user_id -> (avg_login, avg_logoff)

    В первом поле храним первое время входа за день
    Во втором последнее. Индикатор горит, если первое время входа сильно меньше обычного первого,
    или если произошел вход/выход во время, сильно отличающееся от обычного
    """

    def __init__(self, delta: int = 15, train_days: int = 30):
        self.delta = delta
        self.train_days = train_days
        self.user_to_avg_logon_map: dict[str, UserLogonRecord] = {}
        self.first_day: date | None = None

    def process(self, event: BaseEvent) -> bool:
        if not isinstance(event, LogonEvent):
            return False

        if self.first_day is None:
            self.first_day = event.timestamp.date()

        if event.user_id not in self.user_to_avg_logon_map:
            self.user_to_avg_logon_map[event.user_id] = UserLogonRecord()

        record = self.user_to_avg_logon_map[event.user_id]

        if (
            (event.timestamp.date() - self.first_day).days < self.train_days
            or record.avg_first_login is None
            or record.avg_last_logout is None
        ):
            self.user_to_avg_logon_map[event.user_id].add_event(event.type, event.timestamp)
            return False

        mins = event.timestamp.hour * 60 + event.timestamp.minute
        if (
            event.type == LogonEventType.login and mins < record.avg_first_login - self.delta
            or event.type == LogonEventType.logout and mins > record.avg_last_logout + self.delta
        ):
            return True

        self.user_to_avg_logon_map[event.user_id].add_event(event.type, event.timestamp)
        return False


In [None]:
def get_event_factory(data: pd.DataFrame, user_id: str | None = None):
    if user_id:
        data = data.query(f'user == "{user_id}"')

    for _, row in data.iterrows():
        yield LogonEvent(
            event_id=row['id'],
            user_id=row['user'],
            timestamp=datetime.strptime(row['date'], '%m/%d/%Y %H:%M:%S'),
            type=row['activity'].lower(),
        )


In [None]:
indicator = LogonWorkdayIndicator(delta=120, train_days=90)
events = get_event_factory(logon_activities, 'AFM0640')

indicator_on_map = defaultdict(list)

for event in events:
    val = indicator.process(event)
    if event.type == 'logon':
        print(
            event.timestamp,
            indicator.user_to_avg_logon_map['AFM0640'].avg_first_login)
    if indicator.process(event):
        indicator_on_map[event.user_id].append(event.event_id)



2010-01-04 08:39:00 519
2010-01-04 13:16:25 519
2010-01-05 08:45:00 522.0
2010-01-05 13:44:12 522.0
2010-01-06 08:45:00 523.5
2010-01-07 08:45:00 524.25
2010-01-07 12:40:52 524.25
2010-01-08 08:34:00 519.125
2010-01-08 12:25:31 519.125
2010-01-11 08:30:00 514.5625
2010-01-11 11:21:54 514.5625
2010-01-11 20:00:57 514.5625
2010-01-12 05:37:32 425.78125
2010-01-12 08:31:00 425.78125
2010-01-12 11:34:22 425.78125
2010-01-13 08:41:00 473.390625
2010-01-13 12:15:27 473.390625
2010-01-14 08:45:00 499.1953125
2010-01-14 09:08:15 499.1953125
2010-01-15 03:12:57 345.59765625
2010-01-15 08:45:00 345.59765625
2010-01-15 13:27:20 345.59765625
2010-01-18 08:45:00 435.298828125
2010-01-18 12:08:12 435.298828125
2010-01-19 08:30:00 472.6494140625
2010-01-19 14:17:35 472.6494140625
2010-01-20 08:41:00 496.82470703125
2010-01-20 13:19:27 496.82470703125
2010-01-21 08:33:00 504.912353515625
2010-01-21 13:33:12 504.912353515625
2010-01-22 08:45:00 514.9561767578125
2010-01-22 12:40:40 514.9561767578125
20

In [40]:
len(indicator_on_map)


189

In [41]:
logon_workday_indicator_users = set(indicator_on_map.keys())


In [79]:
device_df = load_dataframe(
    data_type=CERTDataType.device,
)
device_df.head()


Unnamed: 0,id,date,user,pc,activity
0,{R8N4-R8YE70QN-3287FCGL},01/01/2010 06:46:40,RPM0600,PC-9164,Connect
1,{S2M7-P6FL85EG-7721KZFG},01/01/2010 07:33:10,RPM0600,PC-9164,Disconnect
2,{T6E3-Z4ZI24WS-2427MCYM},01/01/2010 07:59:59,RPM0600,PC-9164,Connect
3,{U7O7-D2ZJ25RP-9120NPLP},01/01/2010 08:02:14,WXW0044,PC-9422,Connect
4,{F4W5-B9ID71RL-3407HBRZ},01/01/2010 08:05:12,CSD0242,PC-8696,Connect


In [28]:
class DeviceUsageEvent(BaseEvent):
    pass


class DeviceUsageRecord(BaseModel):

    last_events: list[datetime] = []
    max_val: int = 0

    @property
    def cur_val(self) -> int:
        return len(self.last_events)

    def add_event(
        self,
        timestamp: datetime,
        limit: int,
        is_train: bool,
    ):
        self.last_events.append(timestamp)
        self.last_events = [
            x for x in self.last_events
            if x > timestamp - timedelta(minutes=limit)
        ]

        if is_train and len(self.last_events) > self.max_val:
            self.max_val = len(self.last_events)


class DeviceUsageMaxIndicator:
    """
    Индикатор будет показывать статистические выбросы по данным использования внешних устройств
    """

    def __init__(self, limit: int = 60, train_days: int = 30):
        self.limit = limit
        self.train_days = train_days
        self.usage: dict[str, DeviceUsageRecord] = {}
        self.first_day = None

    def process(self, event: BaseEvent) -> bool:

        if self.first_day is None:
            self.first_day = event.timestamp.date()

        if not isinstance(event, DeviceUsageEvent):
            return False

        if event.user_id not in self.usage:
            self.usage[event.user_id] = DeviceUsageRecord()


        is_train = (event.timestamp.date() - self.first_day).days < self.train_days

        self.usage[event.user_id].add_event(
            timestamp=event.timestamp,
            limit=self.limit,
            is_train=is_train,
        )

        return (
            not is_train
            and self.usage[event.user_id].cur_val > self.usage[event.user_id].max_val
        )


In [29]:
def get_event_factory(data: pd.DataFrame, user_id: str | None = None):
    if user_id:
        data = data.query(f'user == "{user_id}"')

    for _, row in data.iterrows():
        yield DeviceUsageEvent(
            event_id=row['id'],
            user_id=row['user'],
            timestamp=datetime.strptime(row['date'], '%m/%d/%Y %H:%M:%S'),
        )


In [30]:
device_df = load_dataframe(
    data_type=CERTDataType.file,
)

indicator = DeviceUsageMaxIndicator(limit=120, train_days=90)
events = get_event_factory(device_df)

indicator_on_map = defaultdict(list)

for event in events:
    if indicator.process(event):
        indicator_on_map[event.user_id].append((event.event_id, event.timestamp))


In [31]:
len(indicator_on_map)

file_indicator_users = set(indicator_on_map.keys())


In [33]:

len(logon_workday_indicator_users & file_indicator_users)


72

In [34]:
device_df = load_dataframe(
    data_type=CERTDataType.device,
)

indicator = DeviceUsageMaxIndicator(limit=120, train_days=90)
events = get_event_factory(device_df)

indicator_on_map = defaultdict(list)

for event in events:
    if indicator.process(event):
        indicator_on_map[event.user_id].append((event.event_id, event.timestamp))

device_indicator_users = set(indicator_on_map.keys())


In [42]:
len(logon_workday_indicator_users & file_indicator_users & device_indicator_users)


46