In [None]:
#**Задача 1: Система управления библиотекой**

from collections import namedtuple
from collections import defaultdict
from typing import NamedTuple

Book = namedtuple('Book', ['title', 'author', 'isbn', 'year'])
books = []
Reader = namedtuple('Reader', ['name', 'reader_id', 'phone'])
readers = []

# создаю словарь с жанрами и добавляю туда книги
genre_book = defaultdict(list)

genre_book[genre].append(Book(title, author, isbn, year))


from collections import deque

# cоздаю словарь из id как ключа и Reader как значения для более удобного хранения и доступа
readers_by_id = {r.reader_id: r for r in readers}

# делаю очередь
book_queue = deque()
_ = [book_queue.append(reader_id)]

# считаю
from collections import Counter

all_authors = [book.author for book in books]
author_count = Counter(all_authors)

# история

book_history = OrderedDict()

def add_to_history(reader_id, book):
    if reader_id in book_history:
        book_history[reader_id].append(book)
    else:
        book_history[reader_id] = [book]

# подготовка к сериализации: namedtuple нужно преобразовать в словари
def serialize_namedtuple(obj):
    if isinstance(obj, (Book, Reader)):
        return obj._asdict()
    if isinstance(obj, list):
        return [serialize_namedtuple(i) for i in obj]
    return obj

data = {
    'books': serialize_namedtuple(books),
    'readers': serialize_namedtuple(readers),
    'genre_book': {genre: serialize_namedtuple(book_list) for genre, book_list in genre_book.items()},
    'book_queue': list(book_queue),
    'author_count': dict(author_count),
    'book_history': {reader_id: serialize_namedtuple(book_list) for reader_id, book_list in book_history.items()},
}

# JSON сериализация
with open('library_data.json', 'w', encoding='utf-8') as f_json:
    json.dump(data, f_json, ensure_ascii=False, indent=4)

# Pickle сериализация (можно сериализовать оригинальные объекты)
with open('library_data.pkl', 'wb') as f_pickle:
    pickle.dump({
        'books': books,
        'readers': readers,
        'genre_book': genre_book,
        'book_queue': book_queue,
        'author_count': author_count,
        'book_history': book_history,
    }, f_pickle)


In [None]:
#**Задача 2: Анализатор файловой системы**

import os
import sys
import json
from collections import namedtuple, Counter, defaultdict, deque

#  создаем namedtuple для хранения информации о файле
FileInfo = namedtuple('FileInfo', ['name', 'size', 'extension', 'modified_time'])

#  классификатор по размеру
def classify_size(size_bytes):
    if size_bytes < 1 * 1024 * 1024:
        return 'small'
    elif 1 * 1024 * 1024 <= size_bytes <= 100 * 1024 * 1024:
        return 'medium'
    else:
        return 'large'

#  основная функция анализатора
def analyze_directory(root_path):
    extension_counter = Counter()
    size_groups = defaultdict(list)
    recent_files = deque(maxlen=10)
    all_files_info = []

    for dirpath, dirnames, filenames in os.walk(root_path):
        for filename in filenames:
            full_path = os.path.join(dirpath, filename)
            try:
                # информация о файле
                size = os.path.getsize(full_path)
                extension = os.path.splitext(filename)[1].lower()
                modified_timestamp = os.path.getmtime(full_path)
                modified_time = os.path.getmtime(full_path)
                
                file_info = FileInfo(
                    name=full_path,
                    size=size,
                    extension=extension,
                    modified_time=modified_time
                )

                # собираем данные
                all_files_info.append(file_info)
                extension_counter[extension] += 1
                size_category = classify_size(size)
                size_groups[size_category].append(file_info)
                recent_files.append(file_info)

            except Exception as e:
                print(f"Ошибка при обработке файла {full_path}: {e}")

    # 7. статистика по памяти
    memory_usage = {
        'all_files_info': sys.getsizeof(all_files_info),
        'extension_counter': sys.getsizeof(extension_counter),
        'size_groups': sys.getsizeof(size_groups),
        'recent_files': sys.getsizeof(recent_files)
    }

    # 8. сохраняем в JSON
    result = {
        'extensions': dict(extension_counter),
        'size_groups': {
            key: [file._asdict() for file in files] for key, files in size_groups.items()
        },
        'recent_files': [file._asdict() for file in recent_files],
        'memory_usage': memory_usage
    }

    with open('filesystem_analysis.json', 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=4)

    print("Анализ завершен. Результаты сохранены в 'filesystem_analysis.json'.")

# точка входа
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Использование: python analyzer.py <путь_к_директории>")
        sys.exit(1)

    directory_path = sys.argv[1]
    if not os.path.isdir(directory_path):
        print("Указанный путь не является директорией.")
        sys.exit(1)

    analyze_directory(directory_path)


In [None]:
#**Задача 3: Система конфигурации приложения**

import os
import json
import pickle
from collections import namedtuple, ChainMap, defaultdict, OrderedDict

# создание namedtuple `Config`
Config = namedtuple("Config", ["key", "value", "section", "default_value"])

# словари конфигураций
default_config = {
    "host": "localhost",
    "port": 8080,
    "debug": False,
    "log_level": "INFO"
}

user_config = {
    "debug": True,
    "log_level": "DEBUG"
}

# test (добавим переменные окружения в os.environ)
os.environ["host"] = "prod.server.com"
os.environ["port"] = "9090"

# преобразуем переменные окружения в словарь (преобразуя значения к нужным типам)
env_config = {
    "host": os.environ.get("host"),
    "port": int(os.environ.get("port", 8080))
}

# объединение конфигураций с приоритетом
combined_config = ChainMap(env_config, user_config, default_config)

# группировка настроек по секциям
section_mapping = {
    "host": "network",
    "port": "network",
    "debug": "app",
    "log_level": "app"
}

grouped_config = defaultdict(dict)
ordered_config = OrderedDict()

for key in combined_config:
    section = section_mapping.get(key, "other")
    value = combined_config[key]
    default_value = default_config.get(key)
    
    config_entry = Config(
        key=key,
        value=value,
        section=section,
        default_value=default_value
    )

    grouped_config[section][key] = config_entry
    ordered_config[key] = config_entry

# сериализация в JSON и pickle
# подготовим словарь для сериализации (нельзя сериализовать namedtuple напрямую в JSON)
serializable_config = {
    key: {
        "value": config.value,
        "section": config.section,
        "default_value": config.default_value
    }
    for key, config in ordered_config.items()
}

# JSON
with open("config.json", "w") as json_file:
    json.dump(serializable_config, json_file, indent=4)

# Pickle
with open("config.pkl", "wb") as pickle_file:
    pickle.dump(ordered_config, pickle_file)

# вывод grouped_config (по секциям)
for section, configs in grouped_config.items():
    print(f"[{section.upper()}]")
    for key, config in configs.items():
        print(f"{key} = {config.value} (default: {config.default_value})")
    print()


In [3]:
#**Задача 4: Мониторинг системы**

import os
import sys
import time
import pickle
from collections import namedtuple, deque, Counter, defaultdict
from datetime import datetime

# создаем namedtuple
SystemInfo = namedtuple('SystemInfo', ['cpu_count', 'memory_usage', 'process_id', 'user_name'])

# хранилище последних 20 измерений
history = deque(maxlen=20)

# счетчик использования функций
function_usage = Counter()

# группировка измерений по времени
grouped_by_time = defaultdict(list)

# функция мониторинга
def collect_system_info():
    function_usage['collect_system_info'] += 1

    cpu_count = os.cpu_count()
    memory_usage = sys.getallocatedblocks()
    process_id = os.getpid()
    
    try:
        user_name = os.getlogin()
    except OSError:
        user_name = os.environ.get('USERNAME') or os.environ.get('USER') or 'unknown'

    info = SystemInfo(cpu_count, memory_usage, process_id, user_name)
    return info

# основной цикл мониторинга
def monitor_system(interval=2, duration=20):
    for _ in range(duration):
        info = collect_system_info()
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # добавляем в историю
        history.append(info)

        # группируем по времени
        grouped_by_time[timestamp].append(info)

        print(f"[{timestamp}] {info}")
        time.sleep(interval)

# сохраняем историю в pickle
def save_history(filename='monitoring_history.pkl'):
    function_usage['save_history'] += 1

    with open(filename, 'wb') as f:
        pickle.dump({
            'history': list(history),
            'grouped_by_time': dict(grouped_by_time),
            'function_usage': dict(function_usage)
        }, f)
    print(f"История мониторинга сохранена в {filename}")


In [6]:
#**Задача 5: Система логирования**

from collections import namedtuple, deque, defaultdict, Counter, OrderedDict, ChainMap
import os

LogEntry = namedtuple('LogEntry', ['timestamp', 'level', 'message', 'module', 'function'])

# test
log_entry = LogEntry(
    timestamp=os.path.getmtime('.'),
    level='INFO',
    message='System started',
    module='main',
    function='start'
)

history_logs = deque(maxlen=100)
history_logs.appendleft(log_entry)  

grouped_logs = defaultdict(list)
grouped_logs[log_entry.level].append(log_entry)
print(grouped_logs['INFO'])

logs_count = Counter({level: len(logs) for level, logs in grouped_logs.items()})
print(logs_count)

time_logs = OrderedDict()
time_logs[log_entry.timestamp] = log_entry
time_logs.popitem(last=False)

all_logs = ChainMap(grouped_logs, logs_count, time_logs)

# сериализация
import json
import pickle

# функция для того чтобы можно было логи сохранить в json'е
def log_entry_to_dict(log_entry):
    return {
        'timestamp': log_entry.timestamp,
        'level': log_entry.level,
        'message': log_entry.message,
        'module': log_entry.module,
        'function': log_entry.function
    }

# сохраняем history_logs в JSON
with open('logs.json', 'w', encoding='utf-8') as f_json:
    json.dump([log_entry_to_dict(log) for log in history_logs], f_json, ensure_ascii=False, indent=4)

# сохраняем history_logs в pickle
with open('logs.pkl', 'wb') as f_pickle:
    pickle.dump(history_logs, f_pickle)


[LogEntry(timestamp=1758466828.7362828, level='INFO', message='System started', module='main', function='start')]
Counter({'INFO': 1})


In [None]:
#**Задача 6: Кэш-система**

from collections import namedtuple, deque, defaultdict, Counter, OrderedDict, ChainMap
import os
import sys
import pickle

CacheEntry = namedtuple('CacheEntry', ['key', 'value', 'timestamp', 'access_count'])

# test
cache_test = CacheEntry(
    key='key',
    value='value',
    timestamp=0,
    access_count=0
)

cache_LRU = OrderedDict()
access_history = deque()
access_counter = Counter()
access_stats = defaultdict(int)
MAX_CACHE_SIZE = 5

def set(key, value):
    global cache_LRU, access_history, access_counter, access_stats

    # получить временную метку файла, если он существует
    try:
        timestamp = os.path.getmtime(key)
    except FileNotFoundError:
        timestamp = 0 

    # если ключ есть, то обновляем значение и увеличиваем счетчик обращений
    if key in cache_LRU:
        old_entry = cache_LRU[key]
        new_entry = CacheEntry(
            key=key,
            value=value,
            timestamp=timestamp,
            access_count=old_entry.access_count + 1
        )
        # двигаем в конец
        cache_LRU.move_to_end(key)
        cache_LRU[key] = new_entry
    else:
        # удаление если много ключей
        if len(cache_LRU) >= MAX_CACHE_SIZE:
            cache_LRU.popitem(last=False)

        new_entry = CacheEntry(
            key=key,
            value=value,
            timestamp=timestamp,
            access_count=1
        )
        cache_LRU[key] = new_entry

    # статистика
    access_history.append(key)
    access_counter[key] += 1
    access_stats[key] += 1

def get(key):
    global cache_LRU, access_history, access_counter, access_stats

    # если ключа нет в кэше — возвращаем None
    if key not in cache_LRU:
        return None

    # обновляем порядок
    cache_LRU.move_to_end(key)

    # увеличиваем счетчик обращений
    old_entry = cache_LRU[key]
    new_entry = CacheEntry(
        key=key,
        value=old_entry.value,
        timestamp=old_entry.timestamp,
        access_count=old_entry.access_count + 1
    )
    cache_LRU[key] = new_entry

    # статистика
    access_history.append(key)
    access_counter[key] += 1
    access_stats[key] += 1

    return new_entry.value 

def delete_cache(key):
    global cache_LRU, access_counter, access_stats

    if key in cache_LRU:
        del cache_LRU[key]
        if key in access_counter:
            del access_counter[key]
        if key in access_stats:
            del access_stats[key]

# удаление кэша и всей статистики
def clear():
    global cache_LRU, access_history, access_counter, access_stats

    cache_LRU.clear()
    access_history.clear()
    access_counter.clear()
    access_stats.clear()

def size():
    total_size = sys.getsizeof(cache_LRU)
    for entry in cache_LRU.values():
        total_size += sys.getsizeof(entry)
        total_size += sys.getsizeof(entry.key)
        total_size += sys.getsizeof(entry.value)
    return total_size

def save(filepath):
    global cache_LRU, access_history, access_counter, access_stats

    with open(filepath, 'wb') as f:
        pickle.dump({
            'cache_LRU': cache_LRU,
            'access_history': access_history,
            'access_counter': access_counter,
            'access_stats': access_stats,
            'MAX_CACHE_SIZE': MAX_CACHE_SIZE
        }, f)

def load(filepath):
    global cache_LRU, access_history, access_counter, access_stats, MAX_CACHE_SIZE

    with open(filepath, 'rb') as f:
        data = pickle.load(f)
        cache_LRU = data.get('cache_LRU', OrderedDict())
        access_history = data.get('access_history', deque())
        access_counter = data.get('access_counter', Counter())
        access_stats = data.get('access_stats', defaultdict(int))
        MAX_CACHE_SIZE = data.get('MAX_CACHE_SIZE', 5)


In [None]:
#**Задача 7: Анализатор текста**

import os
import sys
import json
from collections import namedtuple, Counter, defaultdict, deque, OrderedDict

# 1. создаем namedtuple
WordInfo = namedtuple('WordInfo', ['word', 'frequency', 'length', 'first_occurrence'])

def analyze_text(file_path):
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Файл '{file_path}' не найден.")

    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    words = [word.strip(".,!?;:()[]{}\"'").lower() for word in text.split()]
    
    # 2. подсчет частоты слов
    word_counter = Counter(words)

    # 3. группируем слова по длине
    length_groups = defaultdict(list)
    for word in set(words):
        length_groups[len(word)].append(word)

    # 4. последние 50 уникальных слов
    last_50_unique = deque(maxlen=50)
    seen = set()
    for word in reversed(words):
        if word not in seen:
            last_50_unique.appendleft(word)
            seen.add(word)

    # 5. слова в порядке первого появления
    first_occurrence_order = OrderedDict()
    for idx, word in enumerate(words):
        if word not in first_occurrence_order:
            first_occurrence_order[word] = idx

    # 6. формируем список WordInfo
    word_info_list = []
    for word, freq in word_counter.items():
        word_info = WordInfo(
            word=word,
            frequency=freq,
            length=len(word),
            first_occurrence=first_occurrence_order[word]
        )
        word_info_list.append(word_info)

    # 7. анализ памяти
    memory_usage = {
        'word_counter': sys.getsizeof(word_counter),
        'length_groups': sys.getsizeof(length_groups),
        'last_50_unique': sys.getsizeof(last_50_unique),
        'first_occurrence_order': sys.getsizeof(first_occurrence_order),
        'word_info_list': sys.getsizeof(word_info_list),
        'total_words': len(words)
    }

    # 8. сохраняем результаты в JSON
    result = {
        'words_info': [wi._asdict() for wi in word_info_list],
        'length_groups': {str(k): v for k, v in length_groups.items()},
        'last_50_unique': list(last_50_unique),
        'memory_usage': memory_usage
    }

    output_path = os.path.splitext(file_path)[0] + '_analysis.json'
    with open(output_path, 'w', encoding='utf-8') as out_file:
        json.dump(result, out_file, ensure_ascii=False, indent=4)

    print(f"Анализ завершен. Результаты сохранены в '{output_path}'.")


In [None]:
#**Задача 8: Система управления задачами**

import json
import pickle
import time
from collections import namedtuple, defaultdict, deque, OrderedDict, Counter
import os

# 1. namedtuple для задач
Task = namedtuple('Task', ['id', 'title', 'description', 'priority', 'status', 'created_date'])

# глобальные переменные для хранения данных
tasks = []
next_id = 1
# 2. группировка по статусу
tasks_by_status = defaultdict(list)
# 3. очередь высокого приоритета
high_priority_queue = deque(maxlen=10)  
# 4. счетчик приоритетов
priority_counter = Counter()
# 5. история по порядку создания             
task_history = OrderedDict()             

STATUS_LIST = {'todo', 'in_progress', 'done'}
PRIORITY_LEVELS = {'low', 'medium', 'high'}
DATA_DIR = 'task_data'

def add_task(title, description, priority='medium', status='todo'):
    """добавление новой задачи"""
    global next_id, tasks
    
    task = Task(
        id=next_id,
        title=title,
        description=description,
        priority=priority,
        status=status,
        created_date=time.time()
    )
    
    tasks.append(task)
    _update_collections(task)
    next_id += 1
    save_tasks()
    return task

def _update_collections(task):
    """обновление всех коллекций"""
    tasks_by_status[task.status].append(task)
    if task.priority == 'high':
        high_priority_queue.appendleft(task)
    priority_counter[task.priority] += 1
    task_history[task.created_date] = task

def complete_task(task_id):
    """отметка задачи как выполненной"""
    global tasks
    
    for i, task in enumerate(tasks):
        if task.id == task_id:
            updated_task = task._replace(status='done')
            tasks[i] = updated_task
            _rebuild_collections()
            save_tasks()
            return updated_task
    return None

def _rebuild_collections():
    """перестроение коллекций"""
    global tasks_by_status, high_priority_queue, priority_counter, task_history
    
    tasks_by_status.clear()
    priority_counter.clear()
    task_history.clear()
    high_priority_queue.clear()
    
    for task in tasks:
        _update_collections(task)

def get_tasks_by_status(status):
    """получение задач по статусу"""
    return tasks_by_status.get(status, [])

def get_priority_queue():
    """получение очереди высокоприоритетных задач"""
    return list(high_priority_queue)

def get_tasks_by_priority(priority):
    """получение задач по приоритету"""
    return [task for task in tasks if task.priority == priority]

# 7. работа с файлами через os.path
def _get_file_path(filename):
    return os.path.join(DATA_DIR, filename)

def save_tasks():
    """сохранение в JSON и Pickle"""
    os.makedirs(DATA_DIR, exist_ok=True)
    tasks_data = [task._asdict() for task in tasks]
    
    # JSON
    with open(_get_file_path('tasks.json'), 'w', encoding='utf-8') as f:
        json.dump(tasks_data, f, ensure_ascii=False, indent=2)
    
    # Pickle
    with open(_get_file_path('tasks.pickle'), 'wb') as f:
        pickle.dump(tasks_data, f)

def load_tasks():
    """загрузка из файлов"""
    global tasks, next_id
    
    json_file = _get_file_path('tasks.json')
    
    if os.path.exists(json_file):
        with open(json_file, 'r', encoding='utf-8') as f:
            tasks_data = json.load(f)
        
        tasks = [Task(**data) for data in tasks_data]
        next_id = max([t.id for t in tasks], default=0) + 1
        _rebuild_collections()

def display_info():
    """краткая информация о задачах"""
    print(f"\nвсего задач: {len(tasks)}")
    print("по статусу:", {k: len(v) for k, v in tasks_by_status.items()})
    print("по приоритету:", dict(priority_counter))
    print("в очереди высокого приоритета:", len(high_priority_queue))

# 6. ChainMap для объединения различных списков задач
def get_task_chainmap():
    """создание ChainMap для объединения списков задач"""
    from collections import ChainMap
    return ChainMap(
        {'all_tasks': tasks},
        {'by_status': tasks_by_status},
        {'by_priority': {p: [t for t in tasks if t.priority == p] for p in PRIORITY_LEVELS}},
        {'high_priority_queue': list(high_priority_queue)}
    )

In [None]:
#**Задача 9: Система мониторинга производительности**

import sys
import os
import time
import json
import pickle
from collections import namedtuple, deque, defaultdict, Counter, OrderedDict
from datetime import datetime

# NamedTuple для метрик
PerformanceMetric = namedtuple("PerformanceMetric", ["function_name", "execution_time", "memory_usage", "timestamp"])

# очередь для последних 100 метрик
recent_metrics = deque(maxlen=100)

# группировка по функциям
function_metrics = defaultdict(list)

# счетчик вызовов функций
function_calls = Counter()

# хронологическое хранилище
chronological_metrics = OrderedDict()

# запись метрики
def record_metric(function_name, start_time, result):
    end_time = time.time()
    execution_time = end_time - start_time
    memory_usage = sys.getsizeof(result)
    timestamp = datetime.now().isoformat()

    metric = PerformanceMetric(function_name, execution_time, memory_usage, timestamp)

    recent_metrics.append(metric)
    function_metrics[function_name].append(metric)
    function_calls[function_name] += 1
    chronological_metrics[timestamp] = metric

# получение статистики по функции
def get_function_stats(function_name):
    metrics = function_metrics.get(function_name, [])
    if not metrics:
        return f"No metrics recorded for function '{function_name}'"

    total_exec_time = sum(m.execution_time for m in metrics)
    total_memory = sum(m.memory_usage for m in metrics)
    count = len(metrics)

    return {
        "function_name": function_name,
        "calls": function_calls[function_name],
        "avg_execution_time": total_exec_time / count,
        "avg_memory_usage": total_memory / count
    }

# получение использования памяти
def get_memory_usage():
    return sum(sys.getsizeof(m) for m in recent_metrics)

# экспорт метрик
def export_metrics(directory="metrics_output"):
    os.makedirs(directory, exist_ok=True)

    # JSON
    json_path = os.path.join(directory, "metrics.json")
    with open(json_path, "w") as f:
        json.dump([m._asdict() for m in recent_metrics], f, indent=4)

    # Pickle
    pickle_path = os.path.join(directory, "metrics.pkl")
    with open(pickle_path, "wb") as f:
        pickle.dump(list(recent_metrics), f)

    print(f"Metrics exported to {directory}/ (JSON and Pickle formats)")


In [None]:
#**Задача 10: Комплексная система управления данными**



#Создайте комплексную систему управления данными, объединяющую все изученные концепции:

#1. Создайте несколько namedtuple для различных типов данных (User, Product, Order, etc.)
#2. Используйте `defaultdict` для создания индексов по различным полям
#3. Используйте `deque` для реализации очередей обработки данных
#4. Используйте `Counter` для аналитики и статистики
#5. Используйте `OrderedDict` для хранения данных в определенном порядке
#6. Используйте `ChainMap` для объединения различных источников данных
#7. Используйте `os` и `sys` для работы с файловой системой и мониторинга
#8. Реализуйте CRUD операции (Create, Read, Update, Delete)
#9. Добавьте функции экспорта/импорта данных в различных форматах
#10. Сериализуйте все данные в JSON, pickle и другие форматы
#11. Добавьте типизацию для всех функций и классов
#12. Реализуйте систему логирования для отслеживания операций