# DE II Lab 1 - A practical Introduction to Apache Pulsar

## Summary

This lab covers the practical side of the data streaming concepts discussed in the course. We explore one popular data streaming framework, Apache Pulsar. The lab consists of four tasks, all of which are compulsory:
* __Task 1__ shows how to set up Apache Pulsar on a single Linux virtual machine.
* __Task 2__ demonstrates a messaging system built with Apache Pulsar. First, a single consumer and a single producer run on the same machine. The task then shows how a messaging system can be distributed across different machines.
* __Task 3__ is a theoretical task that requires an understanding of data streaming framework concepts, specifically those of Apache Pulsar.
* __Task 4__ is a challenge task in which students implement a small use case of Apache Pulsar.

## Supplementary Material

* Information on Apache Pulsar: [Set up a standalone Pulsar locally](https://pulsar.apache.org/docs/en/standalone/)
* Relevant literature on data streaming framework: [A Survey of Distributed Data Stream Processing Frameworks](https://ieeexplore.ieee.org/document/8864052/)
* Information on Terminal Multiplexer: [Getting started with Tmux](https://linuxize.com/post/getting-started-with-tmux/)

## Preparation

* Log into the [SNIC cloud](https://east-1.cloud.snic.se/).
* Log in to the VMs:
```shell
# Producer
# IP address: 192.168.2.136
ssh -i /Users/lmf/PycharmProjects/MSc_DS/Important/key_DE.pem ubuntu@130.238.29.22
# Consumer
# IP address: 192.168.2.236
ssh -i /Users/lmf/PycharmProjects/MSc_DS/Important/key_DE.pem ubuntu@130.238.28.194
```
 * If a floating IP is reused for a new VM, remove its stale entry from the known hosts file:
 ```shell
 vim ~/.ssh/known_hosts
 ```
* The following software should be installed for this assignment:
 * Ubuntu 20.04
 * Java 11.0.10
 ```shell
 # Update the repositories
 sudo apt update
 # Install OpenJDK:
 sudo apt install openjdk-11-jdk
 # Verify the version of the JDK:
 java -version
 ```
 * Docker 20.10.5

## Task 1: Setting up Apache Pulsar

### A. Installing Apache Pulsar locally

1. Switch to the root user and update the package lists:
```shell
sudo -s
apt update; apt -y upgrade;
```

2. Download the binary release of Apache Pulsar by running the following command in your terminal:
```shell
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
```

3. Extract the downloaded tarball and change into the new directory:
```shell
tar xvfz apache-pulsar-2.7.0-bin.tar.gz
cd apache-pulsar-2.7.0
```

### B. Start Apache Pulsar standalone in a single machine

1. To watch the Apache Pulsar log in one terminal while running other components on the same machine, we run our commands in a terminal multiplexer (tmux):
```shell
tmux new -s pulsar
```

 * You can detach from the tmux session and return to your normal shell by typing `Ctrl+b d`. The program running in the session keeps running after you detach.

 * Reattach to the tmux session with the following command:
 ```shell
 tmux attach-session -t <session_name>
 ```
 Example:
 ```shell
 tmux attach-session -t pulsar
 tmux attach-session -t consumer
 tmux attach-session -t producer
 ```

2. From the `apache-pulsar-2.7.0` directory, start a local cluster by running the `pulsar` command in standalone mode:
```shell
bin/pulsar standalone
```
The service runs in the foreground of your terminal. If you need to run other commands, open a new terminal window.
 * You can also run the service as a background process:
 ```shell
 bin/pulsar-daemon start standalone
 ```

3. When Pulsar starts successfully, the terminal shows output like the following:
```shell
2021-03-15 14:46:29,192 - INFO - [main:WebSocketService@95] - Configuration Store cache started
2021-03-15 14:46:29,192 - INFO - [main:AuthenticationService@61] - Authentication is disabled
2021-03-15 14:46:29,192 - INFO - [main:WebSocketService@108] - Pulsar WebSocket Service started
```
This log indicates that Apache Pulsar was set up successfully.

* Stop Pulsar standalone
 * Press `Ctrl+C` to stop a local standalone Pulsar that runs in the foreground.

 * If the service was started as a background process with `pulsar-daemon start standalone`, stop it with the following command:
 ```shell
 bin/pulsar-daemon stop standalone
 ``` 

### C. Setting up Apache Pulsar in docker

1. One advantage of Docker containers is that a software service can be set up quickly. To run Apache Pulsar in Docker, first stop Apache Pulsar (if it is running) by pressing `Ctrl+C`, then run the following command:
```shell
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.7.0 bin/pulsar standalone
```

## Task 2: Producing and Consuming messages

### A. Using the Python client API

* Useful Material
 * [Apache Pulsar - Pulsar Python client](https://pulsar.apache.org/docs/en/client-libraries-python/)
 * [Python Documentation - Module `pulsar`](http://pulsar.apache.org/api/python/)
 * [Tutorial - Getting Started with Apache Pulsar using Python](https://medium.com/@ankitsahay/getting-started-with-apache-pulsar-using-python-90a9fc1d6ff7)

1. First, install the Pulsar client library for Python:
```shell
sudo apt install python3-pip
pip3 install pulsar-client==2.7.1
```

2. We run both a program that consumes messages (the consumer) and a program that produces messages (the producer) on a single machine. We therefore use the terminal multiplexer so that each program runs in its own session. Create a session for the consumer with the following command:
```shell
tmux new -s consumer
```

3. Create a consumer with the following code snippet:
```python
import pulsar
# Create a Pulsar client by supplying the broker's IP address and port
client = pulsar.Client('pulsar://localhost:6650')
# Subscribe to a topic under a named subscription
consumer = client.subscribe('DEtopic', subscription_name='DE-sub')
# Block until a message arrives from the producer
msg = consumer.receive()
try:
    print("Received message : '%s'" % msg.data())
    # Acknowledge successful processing of the message
    consumer.acknowledge(msg)
except Exception:
    # Processing failed: ask the broker to redeliver the message
    consumer.negative_acknowledge(msg)
# Close the client and release its resources
client.close()
```
Save the code above as `consumer.py`:
```shell
vim consumer.py
```
and run it using the following commands:
```shell
tmux attach-session -t consumer
python3 consumer.py
```

4. Create a session for the producer with the following command:
```shell
tmux new -s producer
```

5. Create a producer with the following code snippet:
```python
import pulsar
# Create a pulsar client by supplying ip address and port
client = pulsar.Client('pulsar://localhost:6650')
# Create a producer on the topic that consumer can subscribe to
producer = client.create_producer('DEtopic')
# Send a message to consumer
producer.send(('Welcome to Data Engineering Course!').encode('utf-8'))
# Destroy pulsar client
client.close()
```
Save the code above as `producer.py`:
```shell
vim producer.py
```
and run it using the following command:
```shell
python3 producer.py
```

* In the session where `consumer.py` is running, you should see a message like the following:
```shell
Received message : 'b'Welcome to Data Engineering Course!''
```
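
The `b'...'` in the output appears because `msg.data()` returns `bytes`, and `%s` formatting prints the bytes literal. A minimal sketch of the difference, using a plain `bytes` value as a stand-in for a real message (no broker is involved here):

```python
# Stand-in for msg.data(): the producer sent UTF-8 encoded bytes.
payload = 'Welcome to Data Engineering Course!'.encode('utf-8')

# Formatting the raw bytes reproduces the output shown above.
raw_line = "Received message : '%s'" % payload

# Decoding first yields plain text without the b'...' wrapper.
decoded_line = "Received message : '%s'" % payload.decode('utf-8')

print(raw_line)
print(decoded_line)
```

If the cleaner form is preferred, `msg.data().decode('utf-8')` could be used in `consumer.py` in the same way.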
<img src = "DE II T2 A.png">

### B. Running Apache Pulsar, consumer and producer on multiple machines

In this exercise, we run the consumer and producer from the example above on two separate virtual machines. For example, Apache Pulsar and the producer run on virtual machine A, whereas the consumer runs on virtual machine B. This task requires setting up two virtual machines on the SNIC cloud. To check that the distributed Apache Pulsar setup works, i.e., that messages are produced and consumed properly, inspect the output of the consumer on virtual machine B.

__Note:__
So that the consumer and producer can reach Apache Pulsar, specify the IP address of the machine where Apache Pulsar is running when you create the `client` object.

In the file `consumer.py`, change
```python
client = pulsar.Client("pulsar://localhost:6650")
```
to
```python
client = pulsar.Client("pulsar://<IP address where Apache Pulsar is running>:6650")
```

 * Example:
 ```python
 client = pulsar.Client("pulsar://192.168.2.136:6650")
 ```
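
To avoid editing the address in several files, the service URL could be built in one place. The sketch below is only an illustration: `build_service_url` and the `PULSAR_HOST` environment variable are hypothetical names, not part of the Pulsar client library.

```python
import os


def build_service_url(host, port=6650):
    """Return a Pulsar service URL of the form pulsar://<host>:<port>."""
    return "pulsar://%s:%d" % (host, port)


# PULSAR_HOST is a hypothetical environment variable; fall back to localhost.
host = os.environ.get("PULSAR_HOST", "localhost")
service_url = build_service_url(host)
print(service_url)
```

The resulting string can then be passed to `pulsar.Client(...)` in both `consumer.py` and `producer.py`.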

## Task 3: Conceptual Questions

### Q1. What features does Apache Pulsar support that previous distributed data stream frameworks (e.g., Kafka) do not?

__Answer__
* [Apache Pulsar 2.7.1](https://pulsar.apache.org/en/)
* [Apache Kafka vs Apache Pulsar](https://digitalis.io/blog/kafka/apache-kafka-vs-apache-pulsar/)
* [Comparing Apache Kafka and Apache Pulsar](https://blog.softwaremill.com/comparing-apache-kafka-and-apache-pulsar-3bd44e00f304)

1. Pulsar is highly scalable. One major advantage of Pulsar over Kafka is the number of topics it can support. According to the linked comparison, a Kafka cluster is limited to about 4,000 partitions per broker and 200,000 partitions across the entire cluster. Pulsar does not have this limitation: it can scale to millions of topics because data is stored not in the brokers themselves but externally in BookKeeper nodes.

2. Pulsar provides built-in geo-replication. This protects against entire data center failures and improves responsiveness for clients located in different parts of the world. With Kafka you have to rely on additional tooling.

3. Pulsar is flexible in message consumption. It offers four subscription modes: exclusive, shared, failover, and key_shared.

4. Pulsar offers tiered storage as part of the open source distribution, which suits long-term storage: old and less frequently used data can be moved to slower, more cost-effective storage. In Kafka, tiered storage appeared only recently and, according to the linked comparison, is available only as a paid option from Confluent Platform 6.0.0 onwards.

### Q2. What is the issue with using a batch processing approach on data at scale? How can modern data stream processing systems such as Apache Pulsar overcome the issue of batch processing?
(Read literature for this question: [A Survey of Distributed Data Stream Processing Frameworks](https://ieeexplore.ieee.org/document/8864052/))

__Answer__

Batch processing systems suffer from latency because input data must be collected into batches before it can be processed.

Modern Data Stream Processing Systems (DSPS) try to combine batch and stream processing capabilities into a single or multiple parallel data processing pipelines. In this way, batch analytics pipelines can process large volumes of historical data to train machine learning models, which can then be deployed on real-time incoming data streams for scoring using a separate data analytics pipeline.

A DSPS overcomes the latency problem by processing live, raw data as soon as it arrives, providing useful insights into the data before it is saved to long-term storage, while meeting the challenges of incremental processing, scalability, and fault tolerance.
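
The latency argument can be illustrated with a small simulation. This is a simplified sketch under stated assumptions: one event arrives per second, processing time is ignored, and a batch is processed only when its last event has arrived. The function names are illustrative.

```python
def batch_latencies(arrival_times, batch_size):
    """Waiting time per event when processing starts only after a batch fills."""
    latencies = []
    for start in range(0, len(arrival_times), batch_size):
        batch = arrival_times[start:start + batch_size]
        batch_ready = batch[-1]  # the batch is processed when its last event arrives
        latencies.extend(batch_ready - t for t in batch)
    return latencies


def stream_latencies(arrival_times):
    """Waiting time per event when each event is processed on arrival."""
    return [0 for _ in arrival_times]


arrivals = list(range(10))  # assumed workload: one event per second
batched = batch_latencies(arrivals, batch_size=5)
streamed = stream_latencies(arrivals)
print(batched)
print(max(batched), max(streamed))
```

In this toy model the first event of each batch waits four seconds before any work starts, while the streaming variant never waits; larger batches make the gap grow.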

### Q3. What is the underlying messaging pattern used by Apache Pulsar? What is the advantage of such a messaging pattern?

__Answer__
* [Apache Pulsar 2.7.1 - Concepts and Architecture: Messaging](https://pulsar.apache.org/docs/en/concepts-messaging/)
* [Benefits of Pub/Sub Messaging](https://aws.amazon.com/cn/pub-sub-messaging/benefits/)

Pulsar is built on the publish-subscribe pattern (often abbreviated to pub-sub). In this pattern, producers publish messages to topics. Consumers subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete. Once a subscription has been created, Pulsar retains all messages, even if the consumer is disconnected. Retained messages are discarded only when a consumer acknowledges that they have been processed successfully.

Pub-sub messaging provides instant event notifications for distributed applications. The pub-sub model enables event-driven architectures and asynchronous parallel processing, while improving performance, reliability, and scalability.
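
The retain-until-acknowledged behaviour can be sketched with a toy in-memory model. The `Subscription` class below is purely illustrative and is not the Pulsar client API; it only mimics the rule that a message leaves the backlog when it is acknowledged.

```python
from collections import deque


class Subscription:
    """Toy model of a subscription backlog; not the Pulsar client API."""

    def __init__(self):
        self.backlog = deque()  # messages retained until acknowledged

    def publish(self, message):
        self.backlog.append(message)

    def receive(self):
        # Look at the oldest unacknowledged message without removing it.
        return self.backlog[0] if self.backlog else None

    def acknowledge(self):
        # Only an acknowledgement removes the message from the backlog.
        self.backlog.popleft()


sub = Subscription()
sub.publish("m1")
sub.publish("m2")

first = sub.receive()        # consumer gets "m1" but disconnects before acking
redelivered = sub.receive()  # after reconnecting, "m1" is delivered again
sub.acknowledge()            # now "m1" is discarded
after_ack = sub.receive()
print(first, redelivered, after_ack)
```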

### Q.4 What are the different subscription modes? When is each mode used?

__Answer__\
[Apache Pulsar 2.7.1 - Concepts and Architecture: Messaging/Subscriptions](https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions)

A subscription is a named configuration rule that determines how messages are delivered to consumers. Four subscription modes are available in Pulsar: exclusive, shared, failover, and key_shared.

In Pulsar, you can use different subscriptions flexibly:
* To achieve traditional "fan-out pub-sub messaging" among consumers, specify a unique subscription name for each consumer. This is the exclusive subscription mode.
* To achieve "message queuing" among consumers, share the same subscription name among multiple consumers (shared, failover, or key_shared mode).
* To achieve both effects simultaneously, combine the exclusive subscription mode with other subscription modes.
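
The difference between fan-out and queuing can be sketched with a toy delivery model. This is an in-memory illustration, not the Pulsar API: `deliver` is a hypothetical function, and the round-robin split only approximates the shared mode (failover and key_shared distribute messages differently).

```python
from itertools import cycle


def deliver(messages, subscriptions):
    """Toy delivery model (not the Pulsar API).

    subscriptions maps a subscription name to its list of consumer names.
    Every subscription sees every message; within one subscription the
    messages are spread round-robin over its consumers.
    """
    received = {c: [] for consumers in subscriptions.values() for c in consumers}
    for consumers in subscriptions.values():
        turn = cycle(consumers)
        for message in messages:
            received[next(turn)].append(message)
    return received


messages = ["m1", "m2", "m3", "m4"]

# Fan-out: one subscription per consumer, so each consumer gets everything.
fan_out = deliver(messages, {"sub-a": ["c1"], "sub-b": ["c2"]})

# Queuing: two consumers share one subscription and split the messages.
queue = deliver(messages, {"sub-shared": ["c1", "c2"]})
print(fan_out)
print(queue)
```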

### Q.5 What is the role of ZooKeeper? Is it possible to ensure reliability in streaming frameworks such as Pulsar or Kafka without ZooKeeper?

__Answer__
* [Apache Pulsar 2.7.1 - Administration/ZooKeeper and BookKeeper administration](https://pulsar.apache.org/docs/en/administration-zk-bk/)
* [How does Kafka depend on Zookeeper?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoesKafkadependonZookeeper?)
* [stackoverflow - Is Zookeeper a must for Kafka?](https://stackoverflow.com/questions/23751708/is-zookeeper-a-must-for-kafka)

In Pulsar, ZooKeeper is responsible for a wide variety of configuration-related and coordination-related tasks.

It is hard to ensure reliability in streaming frameworks such as Pulsar or Kafka without ZooKeeper. For Kafka, although the ZooKeeper dependency is about to be removed from the clients, the brokers continue to depend heavily on ZooKeeper for server failure detection, data partitioning, and in-sync data replication. Each Pulsar instance relies on two separate ZooKeeper quorums. Local ZooKeeper operates at the cluster level and provides cluster-specific configuration management and coordination; each Pulsar cluster needs a dedicated ZooKeeper cluster. The configuration store operates at the instance level and provides configuration management for the entire system (and thus across clusters). The configuration store quorum can be provided by an independent cluster of machines or by the same machines that local ZooKeeper uses.

### Q.6 List the different components of Pulsar.

__Answer__\
[Apache Pulsar 2.7.1 - Architecture Overview](https://pulsar.apache.org/docs/en/concepts-architecture-overview/)

* Broker(s)
 * HTTP server
 * Dispatcher
* BookKeeper instances (bookies): persistent store for messages
* ZooKeeper: metadata store
* Service discovery

### Q.7 What is a broker in Pulsar?

__Answer__\
[Apache Pulsar 2.7.1 - Architecture Overview](https://pulsar.apache.org/docs/en/concepts-architecture-overview/)

In a Pulsar cluster, one or more brokers handle and load-balance incoming messages from producers, dispatch messages to consumers, communicate with the Pulsar configuration store to handle various coordination tasks, store messages in BookKeeper instances (aka bookies), rely on a cluster-specific ZooKeeper cluster for certain tasks, and more.

### Q.8 What is the role of a tenant in Pulsar?

__Answer__\
[Apache Pulsar - Concepts and Architecture/Multi Tenancy](https://pulsar.apache.org/docs/en/concepts-multi-tenancy/)

Pulsar was created from the ground up as a multi-tenant system. To support multi-tenancy, Pulsar has a concept of tenants. Tenants can be spread across clusters and can each have their own authentication and authorization scheme applied to them. They are also the administrative unit at which storage quotas, message TTL, and isolation policies can be managed. The tenant is the most basic unit of categorization for topics (more fundamental than the namespace and topic name).
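
The tenant's place in the hierarchy is visible in the full topic name, which has the form `persistent://<tenant>/<namespace>/<topic>`. As a small illustration, the hypothetical helper `parse_topic` below splits such a name into its parts, using the topic name created in Task 4 of this lab:

```python
def parse_topic(topic):
    """Split a full topic name into type, tenant, namespace, and topic."""
    scheme, _, rest = topic.partition("://")
    tenant, namespace, name = rest.split("/", 2)
    return {"type": scheme, "tenant": tenant, "namespace": namespace, "topic": name}


parts = parse_topic("persistent://DEII-tenant/DEII-namespace/DEII-L1T4")
print(parts)
```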

### Q.10 What will happen if there is no log compaction in Apache Pulsar?

__Answer__\
[Apache Pulsar - Concepts and Architecture/Topic Compaction](https://pulsar.apache.org/docs/en/concepts-topic-compaction/)

For some use cases, consumers don't need a complete "image" of the topic log. They may only need a few values to construct a more "shallow" image of the log, perhaps even just the most recent value. Log compaction is highly beneficial in this case because it saves consumers from rewinding through messages that have been obscured by later ones. Without log compaction, consumers must "rewind" through the entire log of messages, which can be very time-intensive.
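
The idea of keeping only the latest value per key can be sketched in a few lines. This is a toy model of the concept, not how Pulsar implements compaction internally; the keys and values are made-up sample data.

```python
def compact(log):
    """Keep only the most recent value for each key (toy model)."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later entries overwrite earlier ones
    return latest


# Made-up sample log of (key, value) messages.
log = [("AAPL", 100), ("GOOG", 200), ("AAPL", 101), ("AAPL", 102), ("GOOG", 199)]
compacted = compact(log)
print(len(log), len(compacted))
print(compacted)
```

A consumer that only needs the current value per key reads two entries from the compacted view instead of rewinding through all five messages.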

## Task 4: Splitting and merging operations with Apache Pulsar

https://github.com/merlimat/sandbox/tree/efe78991b173db8e3518d0e7f21f3ecfe8e403eb

In this task, we demonstrate how Apache Pulsar can be used for splitting and merging operations. For this exercise, clone the repository with the following command:
```shell
git clone https://github.com/JesperStromblad/DE2LAB.git
```

The repository contains a `conversion.py` file, which applies a "conversion" operation to each word in a string. Assume there is a requirement that the operation be applied to each word rather than to the entire string. Based on this premise, complete the following sub-tasks:

### 1. Identify an issue with the current implementation in terms of handling big data, i.e., when the number of words is in the order of millions.
> __of/in the order of sth__\
(_BrE_)(_NAmE_ also __on the order of__)(_formal_) about sth; approximately sth\
She earns something __in the order of__ £80 000 a year.

__Answer__

One issue with the current implementation when handling big data is that its stream processing capacity cannot be scaled according to the load.

### 2. Redesign the current implementation using Apache Pulsar to demonstrate splitting and merging of data, i.e., how applying the same operation to each word (splitting) and assembling the resulting string (merging) can be handled by consumers and producers. Provide a diagram of your architecture that shows:
 1. How data splitting and merging are handled by the consumer and producer.
 2. Labels for the broker, consumer, and producer.

### 3. Provide an implementation (preferably in Python) of your architecture with Apache Pulsar. You can set up your architecture on a single virtual machine by creating a separate session for each consumer and producer.
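
Before wiring anything to a broker, the split/merge data flow can be tried out in memory. The sketch below is not the lab solution and does not use Pulsar: `queue.Queue` objects stand in for topics, all function names are illustrative, and upper-casing is only an assumed stand-in for the "conversion" operation. Each word carries its index so the merge step can restore the original order.

```python
from queue import Queue


def convert(word):
    # Stand-in for the "conversion" operation; upper-casing is an assumption.
    return word.upper()


def split_producer(text, topic):
    """Split the string and publish one (index, word) message per word."""
    words = text.split()
    for index, word in enumerate(words):
        topic.put((index, word))
    return len(words)


def worker(in_topic, out_topic):
    """Consume words, apply the operation, and publish the results."""
    while not in_topic.empty():
        index, word = in_topic.get()
        out_topic.put((index, convert(word)))


def merge_consumer(out_topic, count):
    """Collect all converted words and restore the original order."""
    results = [out_topic.get() for _ in range(count)]
    return " ".join(word for _, word in sorted(results))


words_topic, results_topic = Queue(), Queue()  # in-memory stand-ins for topics
count = split_producer("welcome to data engineering course", words_topic)
worker(words_topic, results_topic)
merged = merge_consumer(results_topic, count)
print(merged)
```

In a Pulsar-based version, several workers could consume from the same subscription so that the per-word work is spread across processes, which is why the index travels with each word.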

### Shell

* Log in to the VM
```shell
ssh -i /Users/lmf/PycharmProjects/MSc_DS/Important/key_DE.pem ubuntu@130.238.29.22
```

* Switch to root user
```shell
sudo -s
```

* Navigate to Apache Pulsar directory
```shell
cd apache-pulsar-2.7.0
```

* Create tenant
```shell
bin/pulsar-admin tenants create DEII-tenant
```

* Create namespace
```shell
bin/pulsar-admin namespaces create DEII-tenant/DEII-namespace
```

* Create partitioned topic
```shell
bin/pulsar-admin topics create-partitioned-topic \
persistent://DEII-tenant/DEII-namespace/DEII-L1T4 --partitions 5
```
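
A partitioned topic is backed by one internal topic per partition, named `<topic>-partition-<n>`. The sketch below lists those names for the topic created above and illustrates key-based routing. It is only an illustration: the function names are hypothetical, and CRC32 is an assumed stand-in for whatever hash function the Pulsar client actually uses.

```python
import zlib


def partition_names(topic, partitions):
    """Names of the internal topics behind a partitioned topic."""
    return ["%s-partition-%d" % (topic, i) for i in range(partitions)]


def route_by_key(key, partitions):
    # Illustrative only: CRC32 stands in for the client's real hash function.
    return zlib.crc32(key.encode("utf-8")) % partitions


topic = "persistent://DEII-tenant/DEII-namespace/DEII-L1T4"
names = partition_names(topic, 5)
print(names[0], names[-1])
# The same key always maps to the same partition.
print(route_by_key("word-42", 5) == route_by_key("word-42", 5))
```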

* List topic
```shell
bin/pulsar-admin topics list DEII-tenant/DEII-namespace
```

* Test
```shell
sudo vim negative_acknowledge.py
python3 negative_acknowledge.py
```

* Example
```shell
sudo vim example_producer.py
sudo vim example_consumer.py
tmux new -s example_consumer
python3 example_consumer.py
python3 example_producer.py
tmux attach-session -t example_consumer
```

* Edit the Python files
```shell
sudo vim capitalization_producer.py
sudo vim capitalization_consumer.py
```

* Run consumer
```shell
tmux new -s capitalization_consumer
tmux attach-session -t capitalization_consumer
python3 capitalization_consumer.py
```

* Run producer
```shell
python3 capitalization_producer.py
```

### capitalization_consumer.py


