# TF-IDF
Алгоритм вычисления TF-IDF для корпуса из восьми документов

In [130]:
from collections import defaultdict, namedtuple, deque
import math
from typing import List, Tuple, Iterator, NamedTuple
from itertools import groupby
import string

In [131]:
def preprocess_text(text: str) -> str:
    """Return *text* with ASCII punctuation deleted and letters lower-cased.

    Only the characters listed in ``string.punctuation`` are removed, so
    non-ASCII marks such as the typographic apostrophe are left in place.
    Punctuation is deleted rather than replaced by a space, which means a
    hyphenated word is fused into a single token.
    """
    # A translation table with an empty mapping and a "delete" set.
    drop_punctuation = str.maketrans('', '', string.punctuation)
    return text.translate(drop_punctuation).lower()

def flatten(nested_iterable):
    """Lazily yield the elements of each inner iterable, one level deep.

    Inner iterables are consumed in the order they appear in
    *nested_iterable*; nothing is read until the generator is iterated.
    """
    for inner in nested_iterable:
        yield from inner

def groupbykey(iterable):
    """Group ``(key, value)`` pairs by key (the shuffle step of MapReduce).

    Args:
        iterable: any iterable of 2-item pairs; keys must be hashable.

    Returns:
        A dict items view of ``(key, [values...])`` pairs. Keys appear in the
        order they were first seen and each value list keeps arrival order.
    """
    grouped = {}
    for key, value in iterable:
        # Append in place: rebuilding the list with `+` on every pair would
        # copy all values collected so far and make grouping quadratic.
        grouped.setdefault(key, []).append(value)
    return grouped.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    """Run one map -> shuffle -> reduce pass over in-memory records.

    Args:
        RECORDREADER: iterable of argument tuples; each one is unpacked
            into a ``MAP`` call.
        MAP: callable returning an iterable of ``(key, value)`` pairs.
        REDUCE: callable taking ``(key, values)`` and returning an iterable
            of output records.

    Returns:
        A generator over all records produced by ``REDUCE``. The map and
        shuffle phases run when this function is called; the reduce phase
        runs as the returned generator is consumed.
    """
    # Map phase: one MAP call per input record.
    mapped = [MAP(*record) for record in RECORDREADER]
    # Shuffle phase: gather every emitted value under its key.
    shuffled = groupbykey(flatten(mapped))
    # Reduce phase: one REDUCE call per key.
    return flatten(REDUCE(*group) for group in shuffled)

In [132]:
# Job 1: compute TF (term frequency)
def RECORDREADER_JOB1():
    """Yield ``(document id, list of tokens)`` for each entry of ``documents``.

    Reads the module-level ``documents`` mapping; the text of every document
    is split on whitespace.
    """
    for doc_id, text in documents.items():
        tokens = text.split()
        yield (doc_id, tokens)  # (document id, list of words)

def MAP_JOB1(docname: str, words: list):
    """Emit the term frequency of every distinct word of one document.

    Args:
        docname: identifier of the document.
        words: the document's tokens.

    Yields:
        ``((word, docname), tf)`` where ``tf`` is the word's number of
        occurrences divided by ``len(words)``. Words are emitted in order of
        first appearance; an empty ``words`` list yields nothing.
    """
    occurrences = defaultdict(int)
    for token in words:
        occurrences[token] += 1

    doc_length = len(words)
    yield from (
        ((token, docname), hits / doc_length)  # ((word, document id), TF)
        for token, hits in occurrences.items()
    )

def REDUCE_JOB1(term_doc_pair, tf_values):
    """Collapse all TF values emitted for one ``(word, document id)`` key.

    Yields a single ``((word, document id), TF)`` record whose TF is the sum
    of ``tf_values``.
    """
    yield (term_doc_pair, sum(tf_values))  # ((word, document id), TF)

# Job 2: compute DF (document frequency)
def RECORDREADER_JOB2():
    """Re-key the Job 1 results by word.

    Reads the module-level ``job1_output`` (records shaped
    ``((word, document id), TF)``) and yields ``(word, (document id, TF))``.
    """
    for (term, doc_id), tf in job1_output:
        yield (term, (doc_id, tf))

def MAP_JOB2(term, doc_tf_pair):
    """Tag a ``(document id, TF)`` pair with a count of 1 for the DF sum.

    Yields one ``(word, (document id, TF, 1))`` record; the trailing 1 is
    what the reducer adds up to obtain the document frequency.
    """
    document, frequency = doc_tf_pair
    tagged = (document, frequency, 1)
    yield (term, tagged)  # (word, (document id, TF, 1 towards DF))

def REDUCE_JOB2(term, values):
    """Compute the document frequency of one word and attach it to each TF.

    Args:
        term: the word being reduced.
        values: iterable of ``(document id, TF, count)`` triples.

    Yields:
        ``((word, document id), (TF, DF))`` for every input triple, in input
        order, where DF is the sum of all counts.
    """
    # Materialise once: the triples are needed both for the DF total and
    # for the per-document output.
    postings = [(doc_id, tf, count) for doc_id, tf, count in values]
    df = sum(count for _, _, count in postings)
    for doc_id, tf, _ in postings:
        yield ((term, doc_id), (tf, df))  # ((word, document id), (TF, DF))

# Job 3: compute TF-IDF
def RECORDREADER_JOB3():
    """Re-key the Job 2 results by word.

    Reads the module-level ``job2_output`` (records shaped
    ``((word, document id), (TF, DF))``) and yields
    ``(word, (document id, TF, DF))``.
    """
    for (term, doc_id), (tf, df) in job2_output:
        yield (term, (doc_id, tf, df))  # (word, (document id, TF, DF))

def MAP_JOB3(term, doc_tf_df_pair, total_docs):
    """Turn a ``(document id, TF, DF)`` triple into a TF-IDF score.

    IDF is the natural logarithm of ``total_docs / DF``; a DF of zero is
    treated as IDF 0 so no division by zero occurs.

    Yields:
        One ``((word, document id), TF * IDF)`` record.
    """
    doc_id, tf, df = doc_tf_df_pair
    if df != 0:
        idf = math.log(total_docs / df)
    else:
        idf = 0
    yield ((term, doc_id), tf * idf)  # ((word, document id), TF-IDF)

def REDUCE_JOB3(term_doc_pair, values):
    """Pass TF-IDF scores through unchanged.

    Returns a list with one ``((word, document id), TF-IDF)`` record per
    element of ``values``.
    """
    scored = []
    for tf_idf in values:
        scored.append((term_doc_pair, tf_idf))
    return scored

In [133]:
documents = {
    "doc1": preprocess_text("Streaming data is the data from sensors as well as other real-time surveillance systems. Distributed stream processing systems are the software that manages such data. Such frameworks have to deliver outcomes on the go instantly. They are susceptible to delay and malfunction or system failures. The system must be tolerant of faults and always accessible. Many variables, such as improved network arrival rates, node failures, and so on, disrupt the system's reliability. Some operators need to be relocated online from one physical resource to another to manage or reimburse a slow or failing node. In this study, we propose a co-location based systematic migration heuristic for live operator migration between physical resources using a migration map revised with costs for each migration. The suggested method evaluates continuous operator performance patterns and makes online scheduling decisions based on the same. The decisions include migrating operators during a node failure or straggling."),
    "doc2": preprocess_text("Distributed stream processing engines are designed with a focus on scalability to process big data volumes in a continuous manner. We present the Theodolite method for benchmarking the scalability of distributed stream processing engines. Core of this method is the definition of use cases that microservices implementing stream processing have to fulfill. For each use case, our method identifies relevant workload dimensions that might affect the scalability of a use case. We propose to design one benchmark per use case and relevant workload dimension. We present a general benchmarking framework, which can be applied to execute the individual benchmarks for a given use case and workload dimension. Our framework executes an implementation of the use case's dataflow architecture for different workloads of the given dimension and various numbers of processing instances. This way, it identifies how resources demand evolves with increasing workloads. Within the scope of this paper, we present 4 identified use cases, derived from processing Industrial Internet of Things data, and 7 corresponding workload dimensions. We provide implementations of 4 benchmarks with Kafka Streams and Apache Flink as well as an implementation of our benchmarking framework to execute scalability benchmarks in cloud environments. We use both for evaluating the Theodolite method and for benchmarking Kafka Streams' and Flink's scalability for different deployment options."),
    "doc3": preprocess_text("Batch and stream processing are separately and efficiently applied in many applications. However, some newer data-driven applications such as the Internet of Things and cloud computing call for hybrid processing approaches in order to handle the speed and accuracy required for processing such complex data. In this paper, we propose a Hybrid Distributed Batch-Stream (HDBS) architecture for anomaly detection in real-time data. The hybrid architecture, while benefiting from the accuracy provided by batch processing, also enjoys the speed and real-time features of stream processing. In the proposed architecture, our focus is on the algorithmic aspects of hybrid processing including the interaction models between batch and stream processing units, the characteristics of batch and stream machine learning algorithms and the principles of merging the results of different processing units. The driving idea of such combination is that the results of batch and stream processing units are complementary with each other, as one of them constructs accurate models based on previous data, and the other one is capable of processing new stream data in real-time. Furthermore, we propose a generalized version of the HDBS with respect to its algorithms and communication policy levels. In the generalized HDBS architecture, we address the various aspects of the interaction between the batch and stream processing units, and the merging operations to produce the final results. the evaluations of the proposed architecture using various criteria (accuracy, space complexity, and time complexity) demonstrate that the accuracy of the proposed method is higher than the accuracy of the batch processing methods, its time complexity is also similar to one of the stream processing methods and much less than the batch processing methods, which makes our proposed architecture an efficient and practical solution for real-time anomaly detection."),
    "doc4": preprocess_text("There have been increasing demands for real time processing of the ever-growing data. In order to meet this requirement and ensure the reliable processing of streaming data, a variety of distributed stream processing architectures and platforms have been developed, which handles the fundamental task of allocating processing tasks to the currently available physical resources and routing streaming data between these resources. However, many stream processing systems lack an intelligent scheduling mechanism, in which their default schedulers allocate tasks without taking resource demands and availability, or the transfer latency between resources into consideration. Besides, stream processing has a strict request for latency. Thus it is important to give latency guarantee for distributed stream processing. In this paper, we propose two new algorithms for stream processing with latency guarantee, both the algorithms consider transfer latency and resource demand in task allocation. Both algorithms can guarantee latency constraints. Algorithm AHA reduces more than 21.3% and 58.9% resources compared with the greedy and the round-robin algorithms, and algorithm PHA further improves the resource utilization to 32.1% and 73.2%."),
    "doc5": preprocess_text("In the era of Big Data, typical architecture of distributed real-time stream processing systems is the combination of Flume, Kafka, and Storm. As a kind of distributed message system, Kafka has the characteristics of horizontal scalability and high throughput, which is manly deployed in many areas in order to address the problem of speed mismatch between message producers and consumers. When using Kafka, we need to quickly receive data sent by producers. In addition, we need to send data to consumers quickly. Therefore, the performance of Kafka is of critical importance to the performance of the whole stream processing system. In this paper, we propose the improved design of real-time stream processing systems, and focus on improving the Kafka’s data loading process. We use Kafka cat to transfer data from the source to Kafka topic directly, which can reduce the network transmission. We also utilize the memory file system to accelerate the process of data loading, which can address the bottleneck and performance problems caused by disk I/O. Extensive experiments are conducted to evaluate the performance, which show the superiority of our improved design."),
    "doc6": preprocess_text("In this paper, nearly 40 commonly used deep neural network(DNN) models are selected, and their cross-platform and cross-inference frameworks are deeply analysed. The main metrics of accuracy, the total number of model parameters, the computational complexity, the accuracy density, the inference time, the memory consumption and other related parameters are used to measure their performance. The heterogeneous computing experiment is implemented on both the Google Colab cloud computing platform and the Jetson Nano embedded edge computing platform. The obtained performance is compared with that of two previous computing platforms: a workstation equipped with an NVIDIA Titan X Pascal and an embedded system based on an NVIDIA Jetson TX1 board. In addition, on the Jetson Nano embedded edge computing platform, different inference frameworks are investigated to evaluate the inference efficiency of the DNN models. Regression models are established to characterize the variation in the computing performance of different DNN classification algorithms so that the inference results of unknown models can be estimated. ANOVA methods are proposed to quantify the differences between models. The experimental results have important guiding significance for the better selection, deployment and application of DNN models in practice."),
    "doc7": preprocess_text("Unmanned Aerial Vehicles (UAVs), which can operate autonomously in dynamic and complex environments, are becoming increasingly common. Deep learning techniques for motion control have recently taken a major qualitative step since vision-based inference tasks can be executed directly on edge. The goal is to fully integrate the machine learning (ML) element into small UAVs. However, given the limited payload capacity and energy available on small UAVs, integrating computing resources sufficient to host ML and vehicle control functions is still challenging. This paper presents a modular and generic system that can control the UAV by evaluating vision-based ML tasks directly inside the resource-constrained UAV. Two different vision-based navigation configurations were tested and demonstrated. The first configuration implements an autonomous landing site detection system, tested with two models based on LeNet-5 and MobileNetV2, respectively. This allows the UAV to change its planned path accordingly and approach the target to land. Moreover, a model for people detection based on a custom MobileNetV2 network was evaluated in the second configuration. Finally, the execution time and power consumption were measured and compared with a cloud computing approach. The results show the ability of the developed system to dynamically react to the environment to provide the necessary maneuver after detecting the target exploiting only the constrained computational resources of the UAV controller. Furthermore, we demonstrated that moving to the edge, instead of using cloud computing inference, decreases the energy requirement of the system without reducing the quality of service."),
    "doc8": preprocess_text("With the continuous development of Internet of Things (IoT) and the overwhelming explosion of Big Data, edge computing serves as an efficient computing mode for time stringent data processing, which can bypass the constraints of network bandwidth and delay, and has been one of the foundation of interconnected applications. Although edge computing has gradually become one of bridges between cloud computing centers and mobile terminals, the literature still lacks a thorough review on the recent advances in edge computing platforms. In this paper, we firstly introduce the definition of edge computing and advantages of edge computing platform. And then, we summarize the key technologies of constructing an edge computing platform, and propose a general framework for edge computing platform. The role of distributed storage management systems in building edge computing platform is elaborated in detail. Furthermore, we give some applications to illustrate how to use third-party edge computing platforms to build specific applications. Finally, we briefly outline current open issues of edge computing platform based on our literature survey."),
}

# Job 1: term frequency of every (word, document) pair
input_data_1 = RECORDREADER_JOB1()
job1_output = list(MapReduce(input_data_1, MAP_JOB1, REDUCE_JOB1))

# Job 2: document frequency of every word, attached to each TF record
input_data_2 = RECORDREADER_JOB2()
job2_output = list(MapReduce(input_data_2, MAP_JOB2, REDUCE_JOB2))

# Job 3: TF-IDF
total_docs = len(documents)
input_data_3 = RECORDREADER_JOB3()


def _map_job3_for_corpus(term, doc_tf_df_pair):
    # MapReduce calls MAP with two arguments, so the corpus size is bound here.
    return MAP_JOB3(term, doc_tf_df_pair, total_docs)


job3_output = list(MapReduce(input_data_3, _map_job3_for_corpus, REDUCE_JOB3))

# Regroup the scores per document for printing.
results_by_doc = defaultdict(list)
for (word, docname), tfidf in job3_output:
    results_by_doc[docname].append((word, tfidf))

# Print every document's words in alphabetical order with their scores.
for docname, words in results_by_doc.items():
    print(f"Document: {docname}")
    for word, tfidf in sorted(words):
        print(f"   {word}: {tfidf:.6f}")
    print()


Document: doc1
   a: 0.000000
   accessible: 0.013863
   always: 0.013863
   and: 0.000000
   another: 0.013863
   are: 0.003836
   arrival: 0.013863
   as: 0.009400
   based: 0.006267
   be: 0.009242
   between: 0.001918
   colocation: 0.013863
   continuous: 0.006539
   costs: 0.013863
   data: 0.005754
   decisions: 0.027726
   delay: 0.009242
   deliver: 0.013863
   disrupt: 0.013863
   distributed: 0.001918
   during: 0.013863
   each: 0.006539
   evaluates: 0.013863
   failing: 0.013863
   failure: 0.013863
   failures: 0.027726
   faults: 0.013863
   for: 0.001780
   frameworks: 0.009242
   from: 0.009242
   go: 0.013863
   have: 0.003133
   heuristic: 0.013863
   improved: 0.009242
   in: 0.000000
   include: 0.013863
   instantly: 0.013863
   is: 0.000000
   live: 0.013863
   makes: 0.009242
   malfunction: 0.013863
   manage: 0.013863
   manages: 0.013863
   many: 0.004621
   map: 0.013863
   method: 0.006539
   migrating: 0.013863
   migration: 0.055452
   must: 0.013863
   

# Breadth-First Search
Алгоритм параллельного поиска в ширину  
Найти кратчайший путь от вершины 0 до вершины 19

In [134]:
def flatten(nested_iterable):
    """Generate all items of the inner iterables as one flat stream.

    Only one nesting level is removed, and evaluation is lazy: each inner
    iterable is read when the generator reaches it.
    """
    for group in nested_iterable:
        yield from group

def groupbykey(iterable):
    """Collect the values of ``(key, value)`` pairs under their key.

    Args:
        iterable: any iterable of 2-item pairs; keys must be hashable.

    Returns:
        A dict items view of ``(key, [values...])`` pairs, keys in
        first-seen order and values in arrival order.
    """
    buckets = {}
    for key, value in iterable:
        # In-place append keeps grouping linear; concatenating a fresh list
        # per pair would re-copy every value already stored for the key.
        buckets.setdefault(key, []).append(value)
    return buckets.items()

def MapReduce(INPUTREADER, MAP, REDUCE):
    """Run one map -> shuffle -> reduce pass.

    Args:
        INPUTREADER: zero-argument callable returning an iterable of
            argument tuples; each tuple is unpacked into a ``MAP`` call.
        MAP: callable returning an iterable of ``(key, value)`` pairs.
        REDUCE: callable taking ``(key, values)`` and returning an iterable
            of output records.

    Returns:
        A generator over all records produced by ``REDUCE``. The reader is
        fully consumed and grouped during this call; the reduce phase runs
        as the returned generator is iterated.
    """
    emitted = flatten(MAP(*record) for record in INPUTREADER())
    grouped = groupbykey(emitted)
    return flatten(REDUCE(*group) for group in grouped)


class Node(NamedTuple):
    """Tuple record describing one graph vertex for the BFS MapReduce job."""
    # Vertex identifier.
    id: int
    # Identifiers of the vertices this vertex has edges to.
    adj_list: list[int]
    # Distance assigned to the vertex so far; GRAPHCREATER initialises it
    # to 100, which the driver loop treats as "not reached yet".
    distance: int

# Input graph as adjacency lists: vertex id -> ids of its neighbours.
# NOTE(review): vertex 8 lists 7 as a neighbour but vertex 7 does not list 8,
# so the graph is not fully symmetric - confirm whether that is intended.
graph_lst = {
    0: [1, 3],
    1: [0, 2, 4],
    2: [1, 3, 5, 6],
    3: [0, 2, 8],
    4: [1, 10, 11],
    5: [2, 8, 10],
    6: [2, 7],
    7: [6, 12],
    8: [3, 5, 7, 9],
    9: [8, 10],
    10: [4, 5, 9, 11, 16],
    11: [4, 10, 13],
    12: [7, 14],
    13: [11, 14],
    14: [12, 13, 15],
    15: [14, 16],
    16: [10, 15, 17],
    17: [16, 18],
    18: [17, 19],
    19: [18]
}

# Build the graph structure
def GRAPHCREATER():
    """Return a list of ``(vertex id, Node)`` pairs built from ``graph_lst``.

    Pairs follow the iteration order of ``graph_lst`` and every node starts
    with distance 100.
    """
    nodes = []
    for vertex, neighbours in graph_lst.items():
        nodes.append((vertex, Node(id=vertex, adj_list=neighbours, distance=100)))
    return nodes

# Prepare the graph for the search (the search starts from vertex 0)
def initialize_graph():
    """Build the graph and set the distance of its first entry to 0.

    The first entry of the list returned by ``GRAPHCREATER`` is taken as the
    start vertex; with the ``graph_lst`` defined in this file that entry is
    vertex 0.
    """
    graph = GRAPHCREATER()
    _, start = graph[0]
    # Same vertex and adjacency list, distance 0 marks the search origin.
    graph[0] = (start.id, start._replace(distance=0))
    return graph

# MAP step (visit the neighbours and propose a distance for each)
def MAP(n: int, N: Node):
    """Emit a candidate distance for every neighbour of node ``N``.

    Args:
        n: vertex id of the record (not used by the body).
        N: the node whose adjacency list is expanded.

    Yields:
        ``(neighbour id, N.distance + 1)`` for each entry of ``N.adj_list``.
    """
    next_distance = N.distance + 1
    for neighbour in N.adj_list:
        yield (neighbour, next_distance)

# REDUCE step (keep the smallest distance proposed for each vertex)
def REDUCE(m: int, distances: Iterator[int]):
    """Yield ``(m, smallest value in distances)`` as a single record.

    ``min`` raises ``ValueError`` if ``distances`` is empty.
    """
    yield (m, min(distances))

# Input reader for MapReduce
def READER():
    """Yield ``(vertex id, Node)`` for every entry of the current ``output``.

    Reads the module-level ``output`` list of ``(vertex id, distance)``
    pairs and the module-level ``graph_struct``. The stored node is looked
    up by list position, so position ``i`` of ``graph_struct`` must hold
    vertex ``i``. The yielded node carries the distance from ``output``, not
    the one stored in ``graph_struct``.
    """
    for vertex, dist in output:
        stored = graph_struct[vertex][1]
        yield (vertex, Node(stored.id, stored.adj_list, dist))

# Vertex whose distance from vertex 0 is being searched for.
TARGET = 19
# Distance GRAPHCREATER assigns to every vertex that has not been reached yet.
UNVISITED = 100

graph_struct = initialize_graph()

# Current search frontier as (vertex id, distance) pairs; READER consumes it.
output = [(0, 0)]

# A shortest path has fewer edges than there are vertices, so N rounds suffice.
for iteration in range(1, len(graph_lst) + 1):
    print(f"Итерация {iteration}:")

    # READER reads the previous frontier while MapReduce runs; only then is
    # `output` rebound to the candidate distances of this round.
    output = list(MapReduce(READER, MAP, REDUCE))

    # Settle every vertex that receives a distance for the first time.
    updated_nodes = []
    for i, d in output:
        M = graph_struct[i][1]
        if M.distance == UNVISITED:
            graph_struct[i] = (i, Node(M.id, M.adj_list, d))
            updated_nodes.append(i)

    if not updated_nodes:
        # Nothing new was reached, so further rounds cannot reach anything
        # either: stop instead of repeating empty iterations.
        print("Нет обновлений на этой итерации")
        break

    print(f"Обновленные вершины: {updated_nodes}")

    # Only newly settled vertices can lead to undiscovered ones; keeping the
    # already settled vertices in the frontier would re-expand them each round.
    newly_reached = set(updated_nodes)
    output = [(i, d) for i, d in output if i in newly_reached]

    if TARGET in newly_reached:
        print(f"Вершина {TARGET} достигнута!")
        break

for i, M in graph_struct:
    if M.id == TARGET:
        if M.distance == UNVISITED:
            # The sentinel is not a real distance; report unreachability.
            print(f"Вершина {TARGET} недостижима из вершины 0")
        else:
            print(f"Кратчайший путь от вершины 0 до вершины {TARGET}: {M.distance}")

Итерация 1:
Обновленные вершины: [1, 3]
Итерация 2:
Обновленные вершины: [2, 4, 8]
Итерация 3:
Обновленные вершины: [5, 6, 10, 11, 7, 9]
Итерация 4:
Обновленные вершины: [16, 13, 12]
Итерация 5:
Обновленные вершины: [15, 17, 14]
Итерация 6:
Обновленные вершины: [18]
Итерация 7:
Обновленные вершины: [19]
Вершина 19 достигнута!
Кратчайший путь от вершины 0 до вершины 19: 7
