# Домашнє завдання до модуля “Просунуті структури для оптимізації та пошуку” (теми 3 та 4)

**Завдання 1. Застосування алгоритму максимального потоку для логістики товарів**



1) Ініціалізація: реалізація Едмондса–Карпа

In [6]:
# -*- coding: utf-8 -*-
from collections import deque, defaultdict

class MaxFlow:
    def __init__(self):
        self.cap = defaultdict(dict)

    def add_edge(self, u, v, c):
        # Додаємо/підсумовуємо місткість; зворотне ребро з нульовою місткістю
        self.cap[u][v] = self.cap[u].get(v, 0) + c
        self.cap[v].setdefault(u, 0)

    def edmonds_karp(self, s, t):
        # Ініціалізуємо потоки нулями
        flow = defaultdict(dict)
        for u in self.cap:
            for v in self.cap[u]:
                flow[u][v] = 0

        maxflow = 0
        while True:
            # BFS по залишковій мережі
            parent = {s: None}
            q = deque([s])
            while q and t not in parent:
                u = q.popleft()
                for v in self.cap[u]:
                    residual = self.cap[u][v] - flow[u][v]
                    if residual > 0 and v not in parent:
                        parent[v] = u
                        q.append(v)
                        if v == t:
                            break
            if t not in parent:
                break  # немає доповнюючого шляху

            # Знаходимо “пляшкове горлечко”
            v = t
            bottleneck = float('inf')
            while v != s:
                u = parent[v]
                bottleneck = min(bottleneck, self.cap[u][v] - flow[u][v])
                v = u

            # Підсилюємо потік уздовж шляху
            v = t
            while v != s:
                u = parent[v]
                flow[u][v] += bottleneck
                flow[v][u] -= bottleneck
                v = u

            maxflow += bottleneck

        return maxflow, flow

2) Побудова графа мережі

In [7]:
# Вузли
T1, T2 = "Термінал 1", "Термінал 2"
W1, W2, W3, W4 = "Склад 1", "Склад 2", "Склад 3", "Склад 4"
shops = [f"Магазин {i}" for i in range(1, 15)]

S, T = "S", "T"  # супер-джерело та супер-стік

g = MaxFlow()

# Задамо місткості з S до терміналів рівними сумі їх вихідних ребер
t1_total = 25 + 20 + 15
t2_total = 15 + 30 + 10
g.add_edge(S, T1, t1_total)
g.add_edge(S, T2, t2_total)

# Термінали → Склади
g.add_edge(T1, W1, 25)
g.add_edge(T1, W2, 20)
g.add_edge(T1, W3, 15)

g.add_edge(T2, W3, 15)
g.add_edge(T2, W4, 30)
g.add_edge(T2, W2, 10)

# Склади → Магазини
g.add_edge(W1, "Магазин 1", 15)
g.add_edge(W1, "Магазин 2", 10)
g.add_edge(W1, "Магазин 3", 20)

g.add_edge(W2, "Магазин 4", 15)
g.add_edge(W2, "Магазин 5", 10)
g.add_edge(W2, "Магазин 6", 25)

g.add_edge(W3, "Магазин 7", 20)
g.add_edge(W3, "Магазин 8", 15)
g.add_edge(W3, "Магазин 9", 10)

g.add_edge(W4, "Магазин 10", 20)
g.add_edge(W4, "Магазин 11", 10)
g.add_edge(W4, "Магазин 12", 15)
g.add_edge(W4, "Магазин 13", 5)
g.add_edge(W4, "Магазин 14", 10)

# Магазини → супер-стік (без обмеження попиту)
BIG = 10**9
for sh in shops:
    g.add_edge(sh, T, BIG)

3) Запуск Едмондса–Карпа і вивід максимального потоку

In [8]:
maxflow, flow = g.edmonds_karp(S, T)
print("Максимальний сумарний потік:", maxflow)

Максимальний сумарний потік: 115


4) Декомпозиція потоку у шляхи S→Термінал→Склад→Магазин→T і таблиця “Термінал→Магазин”

In [9]:
from collections import defaultdict
import pandas as pd

# Побудуємо граф з ребер, де прокачаний позитивний потік
pos = defaultdict(dict)
for u in flow:
    for v, f in flow[u].items():
        if f > 0:
            pos[u][v] = f

# Жадібно витягнемо прості шляхи та підсумуємо “внесок” кожного термінала в кожен магазин
terminal_shop = defaultdict(int)

def extract_one_path(pos, s, t):
    parent = {s: None}
    stack = [s]
    found = False
    while stack:
        u = stack.pop()
        if u == t:
            found = True
            break
        for v, f in pos[u].items():
            if f > 0 and v not in parent:
                parent[v] = u
                stack.append(v)
    if not found:
        return None, 0
    # відновлюємо шлях
    path = [t]
    v = t
    while v != s:
        v = parent[v]
        path.append(v)
    path.reverse()
    # пляшкове горлечко
    b = float('inf')
    for i in range(len(path)-1):
        u, v = path[i], path[i+1]
        b = min(b, pos[u][v])
    # зменшуємо позитивні потоки на шляху
    for i in range(len(path)-1):
        u, v = path[i], path[i+1]
        pos[u][v] -= b
        if pos[u][v] == 0:
            del pos[u][v]
    return path, b

while True:
    path, amt = extract_one_path(pos, S, T)
    if not path:
        break
    # очікувана форма: S → Термінал → Склад → Магазин → T
    if len(path) >= 5:
        term = path[1]
        shop = path[-2]
        terminal_shop[(term, shop)] += amt

rows = [{"Термінал": t, "Магазин": s, "Фактичний Потік (одиниць)": int(v)}
        for (t, s), v in sorted(terminal_shop.items())]
df = pd.DataFrame(rows)
df

Unnamed: 0,Термінал,Магазин,Фактичний Потік (одиниць)
0,Термінал 1,Магазин 1,15
1,Термінал 1,Магазин 2,10
2,Термінал 1,Магазин 4,15
3,Термінал 1,Магазин 5,5
4,Термінал 1,Магазин 7,15
5,Термінал 2,Магазин 10,20
6,Термінал 2,Магазин 11,10
7,Термінал 2,Магазин 5,5
8,Термінал 2,Магазин 6,5
9,Термінал 2,Магазин 7,5


5) Агрегації для звіту + збереження CSV

In [10]:
per_terminal = df.groupby("Термінал")["Фактичний Потік (одиниць)"].sum().reset_index(name="Разом по терміналу")
per_shop = df.groupby("Магазин")["Фактичний Потік (одиниць)"].sum().reset_index(name="Разом по магазину")

# Збереження у файли
df.to_csv("maxflow_terminal_to_shop.csv", index=False)
per_terminal.to_csv("maxflow_totals_by_terminal.csv", index=False)
per_shop.to_csv("maxflow_totals_by_shop.csv", index=False)

maxflow, per_terminal, per_shop.sort_values("Разом по магазину")

(115,
      Термінал  Разом по терміналу
 0  Термінал 1                  60
 1  Термінал 2                  55,
       Магазин  Разом по магазину
 6   Магазин 6                  5
 3   Магазин 2                 10
 5   Магазин 5                 10
 2  Магазин 11                 10
 8   Магазин 8                 10
 4   Магазин 4                 15
 0   Магазин 1                 15
 1  Магазин 10                 20
 7   Магазин 7                 20)

**Завдання 2. Розширення функціоналу префіксного дерева**

1) Клас Homework + запасна (fallback) базова Trie

In [14]:
# -*- coding: utf-8 -*-
# 1) Якщо у вас вже є файл trie.py з класом Trie — він буде використаний автоматично.
try:
    from trie import Trie
except Exception:
    # Мінімальна сумісна реалізація Trie для локальних тестів (fallback).
    class _Node:
        __slots__ = ("children", "value", "is_end")
        def __init__(self):
            self.children = {}
            self.value = None
            self.is_end = False

    class Trie:
        def __init__(self):
            self.root = _Node()

        def put(self, key: str, value=None):
            if not isinstance(key, str):
                raise TypeError("Key must be a string")
            node = self.root
            for ch in key:
                node = node.children.setdefault(ch, _Node())
            node.is_end = True
            node.value = value

        def get(self, key: str):
            if not isinstance(key, str):
                raise TypeError("Key must be a string")
            node = self.root
            for ch in key:
                if ch not in node.children:
                    return None
                node = node.children[ch]
            return node.value if node.is_end else None

        def contains(self, key: str) -> bool:
            if not isinstance(key, str):
                raise TypeError("Key must be a string")
            node = self.root
            for ch in key:
                if ch not in node.children:
                    return False
                node = node.children[ch]
            return node.is_end


class Homework(Trie):
    """
    Розширений Trie:
      - count_words_with_suffix(pattern) -> int
      - has_prefix(prefix) -> bool

    Властивості:
      * Вхідні параметри — лише str (інакше TypeError).
      * Враховується регістр.
      * Ефективність:
          - підтримується набір слів _words (унікальність),
          - префіксний індекс _prefix_counts: prefix -> count  (O(1) для has_prefix),
          - реверсний trie (_rev_root) для швидкого підрахунку суфіксів за O(|pattern|).
    """

    class _RevNode:
        __slots__ = ("children", "subtree_count", "end_count")
        def __init__(self):
            self.children = {}
            self.subtree_count = 0
            self.end_count = 0

    def __init__(self):
        super().__init__()
        self._words = set()
        self._prefix_counts = {}     # prefix -> count
        self._rev_root = Homework._RevNode()

    # === допоміжні методи для індексації ===
    def _add_prefixes(self, word: str):
        for i in range(1, len(word) + 1):
            p = word[:i]
            self._prefix_counts[p] = self._prefix_counts.get(p, 0) + 1

    def _insert_reversed(self, word: str):
        node = self._rev_root
        node.subtree_count += 1
        for ch in reversed(word):
            node = node.children.setdefault(ch, Homework._RevNode())
            node.subtree_count += 1
        node.end_count += 1

    # Перевизначаємо put, щоб підтримувати індекси
    def put(self, key: str, value=None):
        if not isinstance(key, str):
            raise TypeError("Key must be a string")
        is_new = key not in self._words
        super().put(key, value)
        if is_new:
            self._words.add(key)
            self._add_prefixes(key)
            self._insert_reversed(key)

    # --- Вимога 1: кількість слів, що закінчуються на pattern ---
    def count_words_with_suffix(self, pattern) -> int:
        if not isinstance(pattern, str):
            raise TypeError("pattern must be a string")
        if pattern == "":
            return len(self._words)  # порожній суфікс: всі слова

        node = self._rev_root
        for ch in reversed(pattern):
            node = node.children.get(ch)
            if node is None:
                return 0
        return node.subtree_count

    # --- Вимога 2: чи існує хоч одне слово з prefix ---
    def has_prefix(self, prefix) -> bool:
        if not isinstance(prefix, str):
            raise TypeError("prefix must be a string")
        if prefix == "":
            return len(self._words) > 0
        return self._prefix_counts.get(prefix, 0) > 0


2) Перевірки з умови

In [15]:
trie = Homework()
words = ["apple", "application", "banana", "cat"]
for i, word in enumerate(words):
    trie.put(word, i)

# Перевірка кількості слів, що закінчуються на заданий суфікс
assert trie.count_words_with_suffix("e") == 1      # apple
assert trie.count_words_with_suffix("ion") == 1    # application
assert trie.count_words_with_suffix("a") == 1      # banana
assert trie.count_words_with_suffix("at") == 1     # cat

# Перевірка наявності префікса
assert trie.has_prefix("app") is True   # apple, application
assert trie.has_prefix("bat") is False
assert trie.has_prefix("ban") is True   # banana
assert trie.has_prefix("ca")  is True   # cat

print("Базові перевірки з умови пройдені.")

Базові перевірки з умови пройдені.


3) Розширені тести (регістри, крайові випадки, помилки типів)

In [16]:
# Додаємо слово з великої літери — перевіряємо чутливість до регістру
trie.put("Apple", 100)

# Префікси (регістр-чутливо)
assert trie.has_prefix("App") is True     # Apple
assert trie.has_prefix("app") is True     # apple, application
assert trie.has_prefix("BAT") is False

# Суфікси (регістр-чутливо)
assert trie.count_words_with_suffix("le") == 2  # apple + Apple
assert trie.count_words_with_suffix("Le") == 0
assert trie.count_words_with_suffix("")  == len(trie._words)  # порожній суфікс => всі слова

# Порожній префікс: є хоча б одне слово?
assert trie.has_prefix("") is True

# TypeError для некоректних типів
for bad in (None, 123, ["a"], {"x":1}):
    try:
        trie.has_prefix(bad)
        raise AssertionError("has_prefix мав кинути TypeError")
    except TypeError:
        pass
    try:
        trie.count_words_with_suffix(bad)
        raise AssertionError("count_words_with_suffix мав кинути TypeError")
    except TypeError:
        pass

print("Розширені перевірки (регістр/крайові/помилки) пройдені.")

Розширені перевірки (регістр/крайові/помилки) пройдені.


4) Швидкий стрес-тест на великому наборі

In [17]:
# Створимо багато слів та переконаймося, що методи працюють швидко
import random, string, time

big = Homework()
N = 20000
random.seed(42)

def rand_word(min_len=5, max_len=12):
    L = random.randint(min_len, max_len)
    return "".join(random.choice(string.ascii_lowercase) for _ in range(L))

words_big = [rand_word() for _ in range(N)]
for i, w in enumerate(words_big):
    big.put(w, i)

# Декілька випадкових перевірок
prefix = words_big[0][:3]
suffix = words_big[1][-3:]

t0 = time.time(); _ = big.has_prefix(prefix); t1 = time.time()
t2 = time.time(); _ = big.count_words_with_suffix(suffix); t3 = time.time()

print(f"has_prefix('{prefix}') за {1e3*(t1-t0):.2f} мс")
print(f"count_words_with_suffix('{suffix}') за {1e3*(t3-t2):.2f} мс")

has_prefix('axi') за 0.07 мс
count_words_with_suffix('csn') за 0.06 мс
