# 00. Инженерия данных и первичная подготовка

## 1. Инициализация окружения и конфигурация

Инициализация путей на основе настроей `config.py`. Если активирован `Сolab`

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import sys
import re

In [None]:
# === Конфигурация ===
class Config:
  """
  Класс для хранения всех настроек проекта
  Если разработка происходит в Colab, изменить IS_COLAB = True
  """
  IS_COLAB = True
  DATA_DIR_NAME = "data"
  FILE_PATTERN = "w{}_25.xlsx"
  WEEKS_RANGE = range(1, 21)

  # Путь к датасет-файлам проекта
  DRIVE_PATH = "/content/drive/MyDrive/Colab Notebooks/Timesheet Analysis/"

In [None]:
if Config.IS_COLAB:
  try:
    from google.colab import drive
    drive.mount("/content/drive")
    BASE_DIR = Path(Config.DRIVE_PATH) / Config.DATA_DIR_NAME
  except ImportError:
    print("Ошибка подключения к Google Drive")
else:
  BASE_DIR = Path.cwd() / Config.DATA_DIR_NAME

## 2. Загрузка и консолидация данных

Объединяем еженедельные файлы в единый `df`. Имена файлов и количество недель генерируется на основе конфигурационного класса

In [None]:
def load_timesheet_data(base_dir, weeks_range, pattern):
  """
  Загружает и консолидирует xlsx-файлы за указанные недели
  """
  all_chunks = []
  missing_files =[]

  for week in weeks_range:
    file_path = base_dir / pattern.format(week)

    if not file_path.exists():
      missing_files.append(file_path.name)
      continue

    try:
      df_temp = pd.read_excel(file_path)
      df_temp['source_file'] = file_path.name
      all_chunks.append(df_temp)
    except Exception as e:
      print(f"Ошибка загрузки файла {file_path.name}: {e}")

  if not all_chunks:
    print("Данные не найдены. Необходимо проверить структуру папок и файлы.")
    return pd.DataFrame()

  if missing_files:
    print(f"Не удалось загрузить файлов: {len(missing_files)}")

  df = pd.concat(all_chunks, ignore_index=True)
  print(f"Успешно загруженные файлы: {len(all_chunks)}")
  return df

df = load_timesheet_data(BASE_DIR, Config.WEEKS_RANGE, Config.FILE_PATTERN)

## 3. Очистка и нормализация схемы данных

1. Переименовываем столбцы в snake_case
2. Удаляем технические столбцы

In [None]:
COLUMN_MAPPING = {
    'Unnamed: 0': 'week_label',
    'Unnamed: 1': 'date',
    'С': 'start_time',
    'По': 'end_time',
    'Часы': 'duration_hours',
    'Описание': 'description',
    'Тип Активности': 'activity_type'
}

In [None]:
def preprocess_df(df, mapping):
  if df.empty:
    return df

  # Ренэйминг соглсно мееппингу тех объектов, которые есть в df
  existing_mapping = {k: v for k, v in mapping.items() if k in df.columns}
  df = df.rename(columns=existing_mapping)

  # Оставляем только нужные колонки
  target_columns = list(existing_mapping.values())
  if 'source_file' in df.columns:
    target_columns.append('source_file')

  df = df[target_columns].copy()

  return df

df = preprocess_df(df, COLUMN_MAPPING)

## 4. Базовый препроцессинг и приведение типов

### 4.1 Преобразование типов
* Преобразование `date`, `start_time` и `end_time` в формат `datetime`
* Преобразование типа `object` в `stringDtype`

In [None]:
def cast_types_schema(df):
  if df.empty:
    return df

  df = df.copy()

  df["date"] = pd.to_datetime(df["date"], format="%d %b %y", errors="coerce")

  for col in ["start_time", "end_time"]:
    df[col] = pd.to_datetime(df[col], format="%H:%M", errors="coerce").dt.time

  for col in ["start_time", "end_time"]:
    df[col] = df.apply(
        lambda x: pd.Timestamp.combine(x["date"].date(), x[col])
        if pd.notnull(x["date"]) and pd.notnull(x[col])
        else pd.NaT,
        axis=1
    )

  for col in ["week_label", "description", "activity_type"]:
    df[col] = df[col].astype("string")

  return df

df = cast_types_schema(df)


### 4.2 Нормализация текстовых описаний

In [None]:
def normalize_text_fields(df):
  if df.empty:
    return df

  df = df.copy()

  for col in ["description", "activity_type"]:
    df[col] = df[col].str.strip().str.lower()

  return df

df = normalize_text_fields(df)


## 5. Feature Engineering

### 5.1 Предварительная подготовка и классификация типов активности

* `deep_work_types` — типы, относящиеся к глубокой работе
* `personal_training_types ` — обучение и самообразование
* `other types ` — другие типы активностей

In [None]:
all_activity_types = set(df['activity_type'].unique())
deep_work_types = {
    'моделирование', 'проектный менеджмент', 'тестирование и доработки',
    'бизнес-анализ', 'настройка интеграции',
}
personal_training_types = {'изучение материалов, тренингов, лайфхаков'}
excluded_types = deep_work_types | personal_training_types
other_types = all_activity_types.difference(excluded_types)

print(f"""
  deep_work_types: {deep_work_types}
  personal_training_types: {personal_training_types}
  other_types: {other_types}
""")

### 5.2 Создадим аналитические метрики паттернов эффективности, оценки когнективной нагрузки

1. Временные признаки
* `hour_start` - час начала сессии;
 > _Зачем:_ Понять, когда пользователь фактически начинает рабочий день и в какие часы наблюдается пик активности.
* `day_name` - название дня недели;
> _Зачем:_ Сравнение «тяжелого понедельника» с «расслабленной пятницей».
* `is_weekend` - бинарный флаг выходного дня;
> _Зачем:_ Выявление скрытых переработок и риска выгорания.
* `is_overtime` - бинарный флаг позних задач.
> _Зачем:_ Выявление скрытых переработок и риска выгорания.

2. Поведенческая классификация
* Признак `work_nature`, который содержит классификацию типов активности;
> _Зачем:_ Оценка баланса между созданием ценности и операционной текучкой.
* `is_flow_session` - бинарный флаг, когда сессия относится к deep work и длится не менее 1 часа.
> _Зачем:_ Понять, удается ли пользователю входить в «состояние потока».

3. Анализ последовательности и фрагментации
* `is_context_switch` - флаг переключения контекста внутри одного дня;
> _Зачем:_ Расчет Индекса фрагментации. Частые переключения между «Моделированием» и «Почтой» убивают продуктивность.
* `gap_minutes` - время в минутах между началом текущей задачи и окончанием предыдущей.
> _Зачем:_ Оценка паузы между задачами.



In [None]:
def build_feature_pipeline(df):
  if df.empty:
    return df

  df = df.copy()

  df = df.sort_values(by=['date', 'start_time']).reset_index(drop=True)

  # === 1. Временные признаки ===
  df["hour_start"] = df["start_time"].dt.hour
  df["day_name"] = df["date"].dt.day_name()
  df["is_weekend"] = df["date"].dt.dayofweek.isin([5, 6]).astype(int)
  df["is_overtime"] = (df["end_time"].dt.hour >= 19).astype(int)

  # === 2. Поведенческая классификация
  df["work_nature"] = np.select(
      condlist=[
          df['activity_type'].isin(deep_work_types),
          df['activity_type'].isin(personal_training_types)
      ],
      choicelist=[
          "deep_work",
          "personal_training"
      ],
      default="other_work"
  )
  df["is_flow_session"] = (
      (df["work_nature"] == "deep_work") & df["duration_hours"] >=1.0
  ).astype(int)

  # === 3. Анализ последовательности и фрагментации

  # Сдвиг для сравнения с предыдущей задачей
  df["prev_activity"] = df["activity_type"].shift(1)
  df["prev_end_time"] = df["end_time"].shift(1)
  df["prev_date"] = df["date"].shift(1)

  # 3.1 Подсчет переключения
  df["is_context_switch"] = (
      (df["date"] == df["prev_date"]) &
      (df["activity_type"] != df["prev_activity"])
  ).astype(int)

  # 3.2 Сколько времени проходит между задачами
  df["gap_minutes"] = (
      (df["start_time"] - df["prev_end_time"]).dt.total_seconds() / 60
  ).fillna(0).where(df["date"].eq(df["prev_date"]))

  cols_to_drop = ["prev_activity", "prev_end_time", "prev_date"]
  df = df.drop(columns=cols_to_drop)

  return df

df = build_feature_pipeline(df)


---

In [None]:
df.info()