Skip to content

[Transaction]Offload Transaction Data #12482

@liangyepianzhou

Description

@liangyepianzhou

Motivation

Pulsar stores topics in bookkeeper ledgers. Bookkeeper ledger are normally replicated to three nodes. In order to save costs, when the topics need to be stored for a long time, pulsar may offload topics to an object store. In this case, we default that all the entries in topics is necessary.

But when we implement transaction in pulsar, some auxiliary Mark and aborted messages will be filtered out in bookkeeper:

  • Transaction Mark: Identify the messages in a transaction how to deal
    • Commit mark: It means the messages in this transaction can be exposed to consumers.
    • Abort mark: It means the messages in this transaction have been abandoned.
  • Aborted messages: the messages in a aborted transaction.

So what we need to do is filter out the unnecessary entries in the ledger that will to be offloaded.

The remainder of the document covers how we will do these:

  • Filter out transaction Mark and aborted messages
  • Only offload the ledger before Stable Position

Background

TransactionBuffer

Stable Position

Let's first understand the implementation of transactionBuffer. There is a more important concept in the implementation of transactionBuffer: Stable Position. In order to explain this concept clearly, I introduced another variable ReadPosition in the above figure, which means the position we read. If when we read position 3, we know that transaction 1 is commit, so the message of transaction 1 can be read, but we don’t know that the message status of transaction 2 is commit or abort, so we can’t read the message of transaction 2 yet. Which means that we can only read the data of position 0 in the above figure. We use a variable Stable Position to record this position. When Read Position<3, we don't know the status of the above-mentioned transactions, so these messages cannot be exposed to consumers. At this time, Stable Position = -1. Similarly, when Read Position = 3, Stable Position = 0; when Read Position <6, Stable Position = 0; when Read Position =6 Stable Position = 4; when Read Position =7 Stable Position = 7;

Documentation

There is a detailed description of TransactionBuffer in this document

SNIP 4: Transaction buffer data clear and dispatch transaction message

SnapShot

TransactionBuffer will persist its own data according to certain conditions.

  • No need to take snapshot before using transaction
  • Regular take snapshot
  • Take snapshot when the transaction messages exceeds a certain value
  • When a producer build by a client which turn on transaction, TopicTransactionBuffer will take a snapshot first.And the State of TransactionBuffer will change to Ready from NoSnapShot

Offload

When we create a ManagerLedger and initialize it, the initializeBookKeeper method will be used. In this method, a task (updateLastLedgerCreatedTimeAndScheduleRolloverTask) will be executed to periodically offload the ledger.The offload method of the LedgerOffloader interface is called at the end of the call chain.

Now LedgerOffload mainly has two implementations:

  • FileSystemManagedLedgerOffloader
  • BlobStoreManagedLedgerOffloader

They receive a ReadHandle while can read the entries of a ledger store in bookkeeper, and read the all entries , then offload them to an object store.

ReadHandle

When we read data from tiered storage, we will call the readsync method of ReadHandle, which has three implementation classes.

ReadHandle, a class to read entry information in ledger.When it reads the entries of ledger, it traverses and reads one by one. If the entry read now is not the expected entry, it will find the entry with the index before the expected entry.This will cause an endless loop problem. The solution to it is discussed below.

Todo

  • Design a filter class
    • There needs to be a way to judge whether the transactionBuffer is NoSnapshot or Initializing.
    • Need to have a method to judge whether the current entry needs offload
    • Need to determine whether Position of the current ledger is less than or equal to MaxReadPosition
    • add a class OffloadFilterDisable.
  • When a timing task triggers offload or the client calls offload, filter out the ledger that cannot be offloaded currently in the list of ledgers
  • When processing syncRead in ReadHandle, some of the entries may be filtered out, which may cause logic problems in the previous code.We need to optimize these code.
  • If the message is read from offload, It does not need to be subject to the read limit of MaxReadPosition.

Discussion (2021-8-26)

  1. Append transaction buffer snapshot when receiving the first transaction message.
  2. Check whether the ledger could be offloaded based upon the transaction buffer snapshot position.
  3. Record transaction buffer snapshot in timing even if there is no transaction messages.

Discussion (2021-11-19)

  1. Only offload messageId, the data is null. When dispatcher filters messages, ack this message.

    • advantage:
      1. No need to modify the offload read logic
      2. Ack message is simple when dispatcher filter message
      3. Don't need to change Cursor readPosition.
      4. Entry is still in order
    • shortcoming:
      1. Leaving Broker to read messages, special processing is required
  2. Don't offload transaction message

    • advantage:
      1. Compatible with other reading, no special processing is required.
    • shortcoming:
      1. Need to change offload read logic, It destroys the order of messages in a ledger.
      2. Need to deal with empty entry ack and readPosition update.

Discussion (2021-12-1)

Because of column storage, so we can't offload transaction message.
There are two options below:
1. offload message metadata record the position of the previous marker hole.
2. broker handle message hole.

Broker handle optimized solution:
now offloader read entry only two way:
1. Read one position
2. Read continuous messages in a ledger
Therefore, the offloader reader knows what the marker holes of the message are, when we read an empty marker message, we can return an empty entry whose data is null but the metadata has a messageId. When broker receive this null data message, we can ack this messageId and change the cursor readPosition to correct position.

Discussion (2021-12-3)
Prerequisites:

  1. offload all of the transaction data. We don’t want to change the data to be different from the bookkeeper
    How to do?
    Row storage: offload marker.
    Column storage: The marker and data are split into two tables.

Metadata

Metadata

Assignees

No one assigned

    Labels

    lifecycle/staletype/enhancementThe enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions