# Задание 1 — Подсчёт количества уникальных IPv6-адресов в большом файле

Входные данные: текстовый файл до **10⁹ строк**, каждая строка — валидный IPv6 (полная или сокращённая форма, произвольный регистр, ведущие нули, возможен `::`), нужно посчитать число **уникальных** IPv6-адресов, если память ограничена примерно **1 ГБ** и разрешены временные файлы и оптимизации, но **запрещены сторонние библиотеки** (только стандартная библиотека Python).

Ключевая идея: использовать **внешнюю сортировку (external merge sort)** по каноническому бинарному представлению IPv6 и считать уникальные при слиянии.


## 1. Через `set()` не подходит

Если бы файл был маленьким, можно было бы:
1) нормализовать каждый IPv6;
2) складывать в `set`;
3) взять `len(set)`.

Но при $10^9$ строк это не помещается в память. Даже если хранить адреса как 16 байт (128 бит),
это **16 ГБ** только на байты, а в Python накладные расходы на объект `bytes` и элемент `set` намного больше.

Нужно решение, работающее **потоково** и выносящее тяжёлые структуры на диск.


In [1]:
import os
import socket
import heapq
import tempfile
import shutil
import time
import random
import ipaddress
from typing import List, Tuple, Optional, BinaryIO

RECORD_SIZE = 16  # IPv6 занимает ровно 16 байт в бинарном виде (128 бит)


## 2. Канонизация IPv6: строка → 16 байт

По условию сравнение адресов должно выполняться в **канонической форме**:
`8 групп по 4 hex-цифры, lower-case, с двоеточиями`.

На практике даже удобнее хранить не строку, а **пакетный (packed) вид** IPv6 — ровно **16 байт**.
Он однозначен, не зависит от регистра и сокращений (`::`), и сравнивается быстрее.

Стандартная библиотека даёт функцию `socket.inet_pton(AF_INET6, s)`, которая парсит валидный IPv6
(включая сокращённую форму) и возвращает 16 байт.

Дополнительно реализовывается функция, которая превращает 16 байт обратно в каноническую строку
в формате из условия — это удобно для проверки и отладки.


In [2]:
def ipv6_to_packed(addr: str) -> bytes:
    """IPv6-строка (любой валидный формат) -> 16 байт (packed)."""
    return socket.inet_pton(socket.AF_INET6, addr)

def packed_to_canonical_str(packed: bytes) -> str:
    """16 байт -> каноническая строка: 8 групп по 4 hex, lower-case."""
    h = packed.hex()  # 32 hex-символа в нижнем регистре
    return ':'.join(h[i:i+4] for i in range(0, 32, 4))

# Тестовая проверка на одном адресе
test_addr = "2001:db0:0:123a::30"
p = ipv6_to_packed(test_addr)
packed_to_canonical_str(p)


'2001:0db0:0000:123a:0000:0000:0000:0030'

## 3. Проверка на примере из условия

Проверка, что две записи одного адреса дают одинаковый `packed` и одинаковую каноническую строку.


In [3]:
example_lines = [
    "2001:0DB0:0000:123A:0000:0000:0000:0030",
    "2001:db0:0:123a::30",
    "CD10:9A90:F9BB:E5B6:F714:86E7:F1BB:BDFC",
    "DF96:A23D:8BA9:BAA0:A807:FB50:F9CD:B266",
    "9D64:9DB4:B0FE:B3C2:F09F:8DE1:EC59:987D",
]

packed = [ipv6_to_packed(s) for s in example_lines]
canon = [packed_to_canonical_str(x) for x in packed]

list(zip(example_lines, canon, [x == packed[0] for x in packed]))


[('2001:0DB0:0000:123A:0000:0000:0000:0030',
  '2001:0db0:0000:123a:0000:0000:0000:0030',
  True),
 ('2001:db0:0:123a::30', '2001:0db0:0000:123a:0000:0000:0000:0030', True),
 ('CD10:9A90:F9BB:E5B6:F714:86E7:F1BB:BDFC',
  'cd10:9a90:f9bb:e5b6:f714:86e7:f1bb:bdfc',
  False),
 ('DF96:A23D:8BA9:BAA0:A807:FB50:F9CD:B266',
  'df96:a23d:8ba9:baa0:a807:fb50:f9cd:b266',
  False),
 ('9D64:9DB4:B0FE:B3C2:F09F:8DE1:EC59:987D',
  '9d64:9db4:b0fe:b3c2:f09f:8de1:ec59:987d',
  False)]

Видно, что первые две строки действительно приводятся к одному и тому же бинарному (и каноническому) адресу.


## 4. Наивное решение (только для маленьких файлов)

Наивный подсчёт уникальных для небольшого набора данных:
- читаем строки;
- преобразуем в 16 байт;
- кладём в `set`.

Это пригодится как **контрольная проверка** на тестах.


In [14]:
def count_unique_naive(input_path: str) -> int:
    s = set()
    with open(input_path, "r", encoding="ascii", errors="strict", newline="") as f:
        for line in f:
            addr = line.strip()
            if not addr:
                continue
            s.add(ipv6_to_packed(addr))
    return len(s)

# тест по примеру из условия
input_path = "test_input.txt"
count_unique_naive(input_path)


4

### Промежуточный вывод
На маленьком примере наивный подход работает. Но для 10⁹ строк он не годится из-за памяти.


## 5. Основное решение: внешняя сортировка + подсчёт уникальных при слиянии

Идея внешней сортировки:
1) читаем вход потоком и накапливаем в памяти **чанк** из K адресов;
2) сортируем чанк и записываем в временный файл ("run") в бинарном виде (каждый адрес — 16 байт);
3) получаем много отсортированных runs на диске;
4) дальше делаем **k-way merge** (слияние отсортированных файлов) и
   - либо сливаем в более крупные runs (если их слишком много),
   - либо сразу считаем уникальные (сравнивая с предыдущим значением).

Важное наблюдение: если данные отсортированы, то одинаковые значения идут подряд.
Значит, уникальные считаются одной переменной `prev` и счётчиком.


### 5.1. Запись одного отсортированного run

Будем хранить run как бинарный файл, где подряд идут записи по 16 байт.
Это компактно и быстро.


In [15]:
def flush_run(buf: List[bytes], run_idx: int) -> str:
    """Отсортировать буфер 16-байтных IPv6 и записать на диск как run-файл."""
    buf.sort()
    path = os.path.join(f"run_{run_idx:06d}.bin")
    with open(path, "wb", buffering=1024 * 1024) as f:
        for rec in buf:
            f.write(rec)  # ровно 16 байт
    return path


### 5.2. Генерация начальных отрезков из входного файла

Читаем входной текстовый файл и нарезаем его на чанки по `chunk_records` записей.
Каждый чанк сортируется и пишется на диск.

Память регулируется параметром `chunk_records`.


In [16]:
def generate_initial_runs(input_path: str, chunk_records: int) -> List[str]:
    runs: List[str] = []
    buf: List[bytes] = []
    run_idx = 0

    with open(input_path, "r", encoding="ascii", errors="strict", newline="") as fin:
        for line in fin:
            s = line.strip()
            if not s:
                continue
            buf.append(ipv6_to_packed(s))
            if len(buf) >= chunk_records:
                runs.append(flush_run(buf, run_idx))
                run_idx += 1
                buf.clear()

    if buf:
        runs.append(flush_run(buf, run_idx))
        buf.clear()

    return runs


### 5.3. k-way merge: слияние нескольких runs в один run

Слияние делается через мин-кучу (heap):
- из каждого файла читаем первый 16-байтный адрес;
- кладём в кучу пары `(значение, индекс_файла)`;
- вынимаем минимум, пишем в выход, дочитываем следующий из того же файла.

Так мы получаем один отсортированный файл из нескольких отсортированных.


In [17]:
def _open_runs(run_paths: List[str]) -> List[BinaryIO]:
    files: List[BinaryIO] = []
    for p in run_paths:
        files.append(open(p, "rb", buffering=1024 * 1024))
    return files

def merge_runs_to_file(run_paths: List[str], out_path: str) -> None:
    files = _open_runs(run_paths)
    try:
        heap: List[Tuple[bytes, int]] = []
        for i, f in enumerate(files):
            rec = f.read(RECORD_SIZE)
            if rec:
                heap.append((rec, i))
        heapq.heapify(heap)

        with open(out_path, "wb", buffering=1024 * 1024) as out:
            while heap:
                rec, i = heapq.heappop(heap)
                out.write(rec)
                nxt = files[i].read(RECORD_SIZE)
                if nxt:
                    heapq.heappush(heap, (nxt, i))
    finally:
        for f in files:
            try:
                f.close()
            except Exception:
                pass


### 5.4. Если runs слишком много: многоступенчатое слияние (reduce)

В реальности нельзя открыть одновременно тысячи файлов.
Поэтому мы уменьшаем число runs, сливая их партиями по `fan_in`.

После каждого слияния удаляем старые файлы


In [18]:
def reduce_runs(run_paths: List[str], fan_in: int) -> List[str]:
    level = 0
    runs = run_paths[:]

    while len(runs) > fan_in:
        new_runs: List[str] = []
        for batch_start in range(0, len(runs), fan_in):
            batch = runs[batch_start:batch_start + fan_in]
            merged_path = os.path.join(f"merge_{level:03d}_{batch_start // fan_in:06d}.bin")
            merge_runs_to_file(batch, merged_path)
            new_runs.append(merged_path)

            # Удаляем старые run-файлы
            for p in batch:
                try:
                    os.remove(p)
                except FileNotFoundError:
                    pass

        runs = new_runs
        level += 1

    return runs


### 5.5. Подсчёт уникальных при k-way merge

Когда у нас осталось "не слишком много" runs, можно открыть их одновременно и
сделать финальное k-way слияние, **не записывая результат на диск**, а сразу считая уникальные.

Для этого поддерживаем `prev` — предыдущий адрес, и если текущий `rec != prev`, увеличиваем счётчик.


In [19]:
def count_unique_across_runs(run_paths: List[str]) -> int:
    if not run_paths:
        return 0

    files = _open_runs(run_paths)
    try:
        heap: List[Tuple[bytes, int]] = []
        for i, f in enumerate(files):
            rec = f.read(RECORD_SIZE)
            if rec:
                heap.append((rec, i))
        heapq.heapify(heap)

        prev: Optional[bytes] = None
        uniq = 0

        while heap:
            rec, i = heapq.heappop(heap)
            if prev != rec:
                uniq += 1
                prev = rec
            nxt = files[i].read(RECORD_SIZE)
            if nxt:
                heapq.heappush(heap, (nxt, i))

        return uniq
    finally:
        for f in files:
            try:
                f.close()
            except Exception:
                pass


### 5.6. Итоговая функция решения задачи

Объединяем всё в одну функцию:
1) создаём временную директорию;
2) генерируем initial runs;
3) уменьшаем их число многоступенчатым слиянием;
4) считаем уникальные финальным k-way merge;
5) пишем ответ в выходной файл.


In [20]:
def count_unique_ipv6_external(input_path: str,
                               output_path: str,
                               chunk_records: int = 1_000_000,
                               fan_in: int = 128) -> int:

    runs = generate_initial_runs(input_path, chunk_records)
    runs = reduce_runs(runs, fan_in)
    ans = count_unique_across_runs(runs)

    with open(output_path, "w", encoding="utf-8", newline="\n") as fout:
        fout.write(str(ans) + "\n")

    return ans



## 6. Проверка решения на примере из условия


In [None]:
output_path = "test_output.txt"
ans = count_unique_ipv6_external(input_path, output_path, chunk_records=2, fan_in=8)
print("Ответ:", ans)


Ответ: 4


На тесте из условия 4, как и требуется.


## 8. Эксперимент на сгенерированных данных

Данные генерировались с помощью команды

`python .\data\task-1\generate_data.py input.txt 12596 100000000`




In [22]:
in_path = os.path.join("input.txt")
out_path = os.path.join("output.txt")

total_lines = 100000000      
unique_count = 12596   # число уникальных IPv6 внутри файла

print("Считаем уникальные внешней сортировкой...")
t2 = time.time()
ans = count_unique_ipv6_external(in_path, out_path, chunk_records=100_000, fan_in=64)
t3 = time.time()
print(f"Ответ: {ans}")
print(f"Время подсчёта: {t3 - t2:.2f} сек")


Считаем уникальные внешней сортировкой...
Ответ: 12596
Время подсчёта: 2.94 сек


### Результаты лежат в папке experiment_cases

## 9. Итоговые выводы

1) **Канонизация** IPv6 выполняется через `socket.inet_pton(AF_INET6, s)` и даёт однозначные 16 байт.
2) Для огромных файлов (до 10⁹ строк) нельзя хранить все адреса в памяти → нужен дисковый алгоритм.
3) **Внешняя сортировка** решает задачу при ограниченной RAM:
   - нарезаем вход на чанки, сортируем и сохраняем как runs;
   - выполняем многоступенчатое k-way слияние;
   - считаем уникальные при финальном merge за один проход.
4) Память управляется параметром `chunk_records`, число одновременно открытых файлов — `fan_in`.