In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [6]:
plt.rcParams["figure.figsize"] = [12,8] # Размер графиков

# Добавляем данные:
## Важно:
Данные предварительно должны быть сгрупированы следующим образом:
1 колонка - Название операции.
2 колонка - Количество операций.
3 колонка - Дата.
4 колонка - Час.

In [None]:
data = pd.read_csv("data.csv", sep=",")

## Переименовываем колонки для дальнейшей работы:

In [None]:
data.columns.values[0] = "name"
data.columns.values[1] = "count"
data.columns.values[2] = "date"
data.columns.values[3] = "hour"
data.head()

## Группируем данные:

In [None]:
data_by_date = data[["date", "count", "hour"]].groupby(["date", "hour"]).sum()
data_by_date = data_by_date.reset_index()
data_by_date.head()

## Определение типичной нагрузки:

### Оцениваем текущее распределение нагрузки по датам:

In [None]:
def generate_line_plot(df: pd.DataFrame) -> None:
    """Функция для построения линейного графика."""
    sns.lineplot(data=df, x="hour", y="count", hue="date")
    sns.set_style("darkgrid") # Добавляем сетку
    plt.xlabel("Часы")
    plt.ylabel("Количество")
    plt.xticks(range(24)) # Добавляем деления для оси x
    plt.legend(
        loc="upper center",
        bbox_to_anchor=(0.5, -0.1),
        ncol=6 # Количество столбцов в легенде
    )
    plt.tight_layout() # Автоотступы в легенде

In [None]:
generate_line_plot(data_by_date)

## Фильтруем данные по часам и количеству:
Ниже описана функция для фильтрации данных. Ее можно использовать для дополнительной фильтрации.

In [None]:
def data_filter_by_line(df: pd.DataFrame, hour_from: int, hour_to: int, count_from: int, count_to: int, dates_by_filter: list[str] = None) -> pd.DataFrame:
    """
    Фунция для фильтрации данных ждя построения линейного графика.

    :param df: Исходный датафрейм.
    :param hour_from: начало периода получения данных (часы).
    :param hour_to: Конец периода получения данных (часы).
    :param count_from: Миниум диапазона количества.
    :param count_to: Максимум диапазона количества.
    :param dates_by_filter: Список дат для отфильтровывания.

    :return: Датафрейм с отфильтрованными значениями.
    """
    data_by_filter = df.loc[(df["hour"] >= hour_from) & (df["hour"] <= hour_to) & (df["count"] >= count_from) & (df["count"] <= count_to)]
    if dates_by_filter:
        data_by_filter = data_by_filter.loc[~data_by_filter["date"].isin(dates_by_filter)]
    dates_in_filtered = data_by_filter["date"].unique() # Достаем уникальные значения даты.
    filtered_data = df[df["date"].isin(dates_in_filtered)] # Отфильтровываем необходимые данные.
    return filtered_data


## Строим график по получившимся данным:

### Введите диапазон часов и диапазон количества:

In [None]:
hour_from = 13
hour_to = 13
count_from = 100000
count_to = 300000
filtered_data = data_filter_by_line(data_by_date, hour_from, hour_to, count_from, count_to)
generate_line_plot(filtered_data)

## Отфильтровываем выбросы:
Каждый раз при запуске перезаписываюся данные переменной "filtered_data". Можно использовать несколько раз, меняя входные значения часов и количества. Чтобы начать фильтрацию сначала, выполните предыдущую ячейку.

In [None]:
hour_from = 13
hour_to = 13
count_from = 150000
count_to = 250000
dates_by_filter = [] # В список можно добавить даты для фильтрации отдельных дней.
filtered_data = data_filter_by_line(filtered_data, hour_from, hour_to, count_from, count_to, dates_by_filter)
generate_line_plot(filtered_data)

# Определение пик-часа:

## Распределение средней нагрузки по часам:

In [None]:
by_filter = filtered_data["date"].unique() # Выбираем подходящие данные с прошлого шага
result_data = data.loc[data["date"].isin(by_filter)] # Получаем данные с которыми дальше работаем
grouped = result_data.groupby(["name", "hour"])["count"].mean().reset_index() # Средняя нагрузка по часам.
grouped["count"] = grouped["count"].round().astype("int32") # Округляем
grouped.head()

## Стоим графики:
Графики показывают как распределялась средняя нагрузка по часам в выбранных ранее данных по каждому пользовательскому сценарию.

In [None]:
g = sns.FacetGrid(data=grouped, col="name", col_wrap=2, height=4, aspect=1.5, sharey=False, sharex=False)
g.map(sns.barplot, "hour", "count", order=range(24))
g.set_axis_labels("Часы", "Количество")
g.set_titles("{col_name}")
plt.tight_layout()
plt.show()

## Определение часа максимальной нагрузки для каждого UC:

In [None]:
max_hours = grouped.loc[grouped.groupby("name")["count"].idxmax()].reset_index(drop=True)
max_hours.head()

### Группируем по часам и смотрим у какого количества операций пик выпал на это время:

In [None]:
x = max_hours[["hour", "count"]].groupby("hour").count().reset_index()
x.plot.bar(x="hour", y="count")
plt.xlabel("Часы")
plt.ylabel("Количество операций")
plt.show()

### Ищем пиковый час сразу по всем операциям:

In [None]:
max_hours_all = grouped.groupby("hour")["count"].sum().reset_index()
max_hours_all.plotbar(x="hour", y="count")
plt.xlabel("Часы")
plt.ylabel("Количество операций")
plt.show()

In [None]:
maximum = max_hours_all["count"].max() # Пролучаем максимальную частоту операций
maximum_hour = max_hours_all[max_hours_all["count"] == maximum]["hour"].min() # Час максимальной нагрузки.
hours = max_hours_all.loc[ # Берем пик-час и до двух ближайших, если они подходят (больше 90% от пик-часа)
    (max_hours_all["count"] > maximum * 0.9) &
    (
        (max_hours_all["hour"] == maximum_hour) |
        (max_hours_all["hour"] == maximum_hour - 1) |
        (max_hours_all["hour"] == maximum_hour + 1) 
    )
]["hour"]
result_grouped = grouped.loc[grouped["hour"].isin(hours)].reset_index(drop=True) # Прогоняем через фильтр
result_grouped = result_grouped.groupby("name")["count"].max().reset_index() # Берем максимум из каждого часа
result_grouped.head()

### Смотрим, каких операций нет в итоговой выборке:

In [None]:
missing = grouped.loc[~grouped["name"].isin(result_grouped["name"])]
missing["name"].unique()

# Формируем список операций:
Сортируем их по убыванию количества. Добавляем колонку с накопленным итогом, чтобы отслеживать точность профиля нагрузки.

In [None]:
all_operations = result_grouped.sort_values(by="count", ascending=False).reset_index(drop=True)
all_operations["percent"] = (all_operations["count"].cumsum() / all_operations["count"].sum()) * 100
all_operations["percent"] = all_operations["percent"].round(2)
all_operations

## Необходимые операции для соответствия профилю:
На данном этапе оставляем операции, которые подходят под требования точности профиля. Также здесь можно добавить критичные и тяжелые операции, которые необходимо включить в профиль. Кроме того, здесь можно указать минимальное количество операций для попадания в профиль нагрузки.

In [None]:
accuracy = 98 # Минимальный процент точности профиля.
critical_operations = [] # Список критичных операций.
heavy_operations = [] # Список тяжелых операций.
min_count = 0 # Минимальное количество операций в час для попадания в профиль.

profile = all_operations.loc[all_operations["percent"].round() <= accuracy | (all_operations["name"].isin(critical_operations)) | (all_operations["name"].isin(heavy_operations))]
profile = profile[["name", "count"]].reset_index(drop=True)
profile = profile[profile["count"] >= min_count]
profile

# Итоговый профиль для подачи нагрузки:

In [None]:
profile["percent"] = (profile["count"] / profile["count"].cum()) * 100
profile["percent"] = profile["percent"].round(2)
profile

## Диаграмма количества операций:

In [None]:
profile.plot.pie(y="count", startangle=90, labeldistance=None, radius=1.2)
plt.axis("off")
plt.legend(
    labels=profile["name"],
    loc="lower center",
    bbox_to_anchor=(0.5, -0.15),
    ncol=2 # Количество столбцов в легенде
)
plt.tight_layout()
plt.show()