Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rapid): Load/Unload data from primary engine to secondary engine #8

Closed
ShannonBase opened this issue Sep 16, 2023 · 1 comment
Closed
Assignees
Labels
feature it will be implemented as a new feature

Comments

@ShannonBase
Copy link
Contributor

ShannonBase commented Sep 16, 2023

1: Overview

The first step of processing AP workloads is load the basic full data into rapid engine, then rapid will start propagation operation automatically. When a table loaded from innodb to rapid engine, some meta informations will be also loaded into catalog table, such as performance_schema.rpd_column, performance_schema.rpd_column_id, etc. A backgroud thread will be launched when system start, then start to monitor the redo log, when a new DML operation done, this background thread starts to parse the incoming redo log, and apply the changes into IMCS.

When the load statement was executed, it would peform the load operation. Overall, just like insert into xxx select xxx statement, the system firstly do a table scan via index or full table scan.

1: It scans the target table, usually it is an innodb table. And, here, there is a problem must be clarified at first. That is which data will be visible to operation, and which is not. Therefore, here, we define that only the committed data will be visible to scan operation. In other words, that means we will use auto commited transaction to do table scan. the transaciton will be read committed isolation level.

The new data inserted when we do table scaning, all these the latest data will not be seen by the operation, because this would not happen. An exclusive mdl lock is used to protected the new rows are inserted into table when the loading operation is running.

2: Except the core functions, there must be some system parameters to monitors the load operations, for example, how many data have been loaded? and how many remains, and so on. some parallel related parameters also will be introduced into, such as POD( parallel of degree), etc. Therefore, some system parameters will be introduced.

2: Implementation

By now, the innodb has only a few parallel table scan abitities, such as based on index counting operations, and check table.

Innodb index is orginzied as B+tree, and each node in B+tree has a fixed size (generallly 16K, or denotated by UNIV_PAGE_SIZE in source code). Innodb reads the data from disk and loaded into buffer pool, and will spill out the old pages and load the latest page into buffer pool because the size of buffer pool is NOT un-limited. usually the buffer pool is configurated to 70%-80% of physical memory size.
image

Data on disk is managed in three-level: (1)segment; (2)cluster; (3)page. InnoDB does in-place updates, so when accessing a specific page, latch protection is required.

2.1: Full Table Scan
As the innodb does. Here, we would not discuss anymore. We mainly foucs on parall index scan.

2.2 Parallel index scan
The basic idea of parall index scan is divided index B+tree into several workloads, and each workload is scanned by using ONE thread.

The index table scan will follow the following steps:

  1. From the root node;
  2. Turn to left or right according to the comparision result. (If node value less than key, then the cursor go to left, otherwise, to right)
  3. Go through all the B+tree until the leaf nodes.
  4. Check all the data in that leaf node, and if found return, or otherwise return NOT_FOUND;

If we do FULL table scan, it will check all the node in that B+tree.

In ShannonBase, the semantic load operation is defined as following. ShannonBase does full table scan, and find out all the records in target table, and transfer them from row-based tuple into column-based tuple, then be loaded into rapid engine.

Now, we can using parallel scanning B+tree via dividing scanning operations simultaneously. For example, it perform scanning operations in parallely according to POD and height of the index.

ShannonBase uses Multi-version Concurrency Control (MVCC) to determine which row is visible or not.

2.2.1 InnoDB baisc parallel scan
Before we start to support more parallel operation in innodb, we, firstly, start to re-examine. All the worklogs of MySQL is listed as below.

image

This research result is searched by parallel keyword. From the result we can read that parallel operation mainly includes: (1) parallel query execution; (2) innodb parallel read of index. And there are several main worklog we should notice, (1)WL#12978 | InnoDB:Fix imbalance during parallel scan; (2) WL#11720 InnoDB: Parallel read of index; (3) WL#1092 Parallel load data infile and bulk insert; (4)WL#12978 InnoDB:Fix imbalance during parallel scan

For more information, ref to: https://dev.mysql.com/worklog/?sc=&sd=&k=parallel&s=

PS: to find out what changeed in WL#11720, we can use git log --grep WL#11720 to figure out what changes in this commit. this commit id: dbfc59ffaf8096a2dc5b76766fedb45ff2fb8cbf

/** The core idea is to find the left and right paths down the B+Tree.These
paths correspond to the scan start and scan end search. Follow the links
at the appropriate btree level from the left to right and split the scan
on each of these sub-tree root nodes.

If the user has set the maximum number of threads to use at say 4 threads
and there are 5 sub-trees at the selected level then we will split the 5th
sub-tree dynamically when it is ready for scan.

We want to allow multiple parallel range scans on different indexes at the
same time. To achieve this split out the scan  context (Scan_ctx) from the
execution context (Ctx). The Scan_ctx has the index  and transaction
information and the Ctx keeps track of the cursor for a specific thread
during the scan.

To start a scan we need to instantiate a Parallel_reader. A parallel reader
can contain several Scan_ctx instances and a Scan_ctx can contain several
Ctx instances. Its' the Ctx instances that are eventually executed.

This design allows for a single Parallel_reader to scan multiple indexes
at once.  Each index range scan has to be added via its add_scan() method.
This functionality is required to handle parallel partition scans because
partitions are separate indexes. This can be used to scan completely
different indexes and tables by one instance of a Parallel_reader.

To solve the imbalance problem we dynamically split the sub-trees as and
when required. e.g., If you have 5 sub-trees to scan and 4 threads then
it will tag the 5th sub-tree as "to_be_split" during phase I (add_scan()),
the first thread that finishes scanning the first set of 4 partitions will
then dynamically split the 5th sub-tree and add the newly created sub-trees
to the execution context (Ctx) run queue in the Parallel_reader. As the
other threads complete their sub-tree scans they will pick up more execution
contexts (Ctx) from the Parallel_reader run queue and start scanning the
sub-partitions as normal.

Note: The Ctx instances are in a virtual list. Each Ctx instance has a
range to scan. The start point of this range instance is the end point
of the Ctx instance scanning values less than its start point. A Ctx
will scan from [Start, End) rows. We use std::shared_ptr to manage the
reference counting, this allows us to dispose of the Ctx instances
without worrying about dangling pointers.

2.2.2 InnoDB basic parallel scan implementation
The change for parallel scan mainly in these files. (1) sql/handler.h; (2) innobase/handler/ha_innodb.h; (3) innobase/handler/handle0alter.cc; (4) include/row0pread.h. And the system params mainly defined in srv/srv0srv.cc.

Limitation: Can NOT parallel scan on secondary index .

class Parallel_reader {
};

  /** Specifies the range from where to start the scan and where to end it. */
  struct Scan_range {
};

  /** Thread related context information. */
  struct Thread_ctx {
};

/** Parallel reader context. */
class Parallel_reader::Scan_ctx {
};


class handler {
...
/**
    Initializes a parallel scan. It creates a parallel_scan_ctx that has to
    be used across all parallel_scan methods. Also, gets the number of
    threads that would be spawned for parallel scan.
*/
  virtual int parallel_scan_init(void *&scan_ctx [[maybe_unused]],
                                 size_t *num_threads [[maybe_unused]],
                                 bool use_reserved_threads [[maybe_unused]]) {
    return 0;
  }

/**
    Run the parallel read of data.
*/
  virtual int parallel_scan(void *scan_ctx [[maybe_unused]],
                            void **thread_ctxs [[maybe_unused]],
                            Load_init_cbk init_fn [[maybe_unused]],
                            Load_cbk load_fn [[maybe_unused]],
                            Load_end_cbk end_fn [[maybe_unused]]) {
    return 0;
  }

/**
    End of the parallel scan.
*/
  virtual void parallel_scan_end(void *scan_ctx [[maybe_unused]]) { return; }
...

/** Start row of the scan range. */
struct Key_reader_row {
};

/** Parallel read implementation. */
template <typename T, typename R>
class Reader {
}; 

2.3: Data Foramt

  • Option 1:
    Each column is organized as a file, when it flushes to disk. The format of columns in memory is also called as IMCU(In-memory column unit). An IMCU consisted by CUs(Column Unit), A CU has two parts: (1) Header, Meta Information; (2) Data; Data also can be divided into a bunch of chunks.

image

image

All chunks are linked. The address of the first chunk can be found from Cu's header, and also contains the address of the next chunk.

A chunk's consist with header and data. Header contains the meta information of this chunk. the data part is where the real data located.

class Chunk {
  public:
    class Chunk_header {
     private:
        byte* m_data;
        Chunk* m_next_chunk;
    };
 private:
   std::mutex  m_header_mutex;
   Chunk_header* m_header; 
};

class Cu {
public:
  class Cu_header {
     ....
     Compression_algorithm* m_comp_algo;
     Local_dictionary* m_dictionary;
     Chunk::Chunk_header* m_start_header;
  };

private:
  std::mutex m_header_mutex; 
  Cu_header* m_header;
  Chunk* m_chunks;
};

Gets the first Cu from IMCS. In an IMCS instance header, it has a header, which has a pointer to the address of IMCU.

When a new data in, it stores it in order. Insert sort can be used to make it ordered. It uses binary search to find the data.
But it data is in compressed format, at this situation, we need a new algorithm to find the data in compresssed data.

Now, we go to deeper. Giving out the more specific details of the data. Here, we notice that every data we write into CU should a tansaction id attached to it to mark which transaction it belongs.

  • 1: Fixed length Data Type:
  • 2: Variable Length Data Type:

Let's disscuss these data type above in detail.

1: Fix Length Data Type:

image

A record is divided into 3 parts: (1): infos, to indicate its information, such as, deleted or not, is null or not, total length, etc.
for example: the 7th bit: delete flag; the 6th bit: null flag, others is: total length of this data.

2: Var Length Data Type:

For the var length data type, a field to indicate the lengt of the data is needed. Here, there is an issue we should pay more attention to is size cost of storage. If a record has too many extra info to describe but the size of data itselft does not contain too much data. It will make ratio of the data effectiveness is too small.

And, as for, the string data type is another talk. It has some attributes to describe it. (1) length; (2) charset information. In order to saving the space, dictionary copression is incorprated.

There's also three parts in: (1) infos(2~4 bytes); (2) trx id (8 bytes); (3) data(variety); If data len is less than 65535, infos will use 2bytes. otherwise 4bytes used,
image

(payload ratio of this way seems not to effieciency)

  • Option 2:
    Uinsg some system columns to store the meta info. And this will make imcs more possible for SIMD and parallel processing.
    Apart the info bit, trx id, rowid and smu_ptr as independent columns.
    image

Now format 1 has been implemented, and format 2 will be implemented in next version

2.4: Local Dictionary
ref to: data compression/dictionary compression

2.5: Related Code
Loading data operation, the sematic of load of secondary engine, is to load the data from primary engine to secondary engine.

when the load statement, alter statement, is executed. mysql_execute_command will be executed and in this function. lex->m_sql_cmd is Sql_cmd_secondary_load_unload. Therefore, Sql_cmd_secondary_load_unload::execute executed.

At last, the execution will be in ha_rapid::load_table, and in this function, it starts to scan the target table to read the corresponding data.

class ha_rapid : public handler {
 public:
  ha_rapid(handlerton *hton, TABLE_SHARE *table_share);

 private:
  int create(const char *, TABLE *, HA_CREATE_INFO *, dd::Table *) override {
    return HA_ERR_WRONG_COMMAND;
  }

  int open(const char *name, int mode, unsigned int test_if_locked,
           const dd::Table *table_def) override;

  int close() override { return 0; }

  int rnd_init(bool) override { return 0; }

  int rnd_next(unsigned char *) override { return HA_ERR_END_OF_FILE; }

  int rnd_pos(unsigned char *, unsigned char *) override {
    return HA_ERR_WRONG_COMMAND;
  }

  int info(unsigned int) override;

  ha_rows records_in_range(unsigned int index, key_range *min_key,
                           key_range *max_key) override;

  void position(const unsigned char *) override {}

  unsigned long index_flags(unsigned int, unsigned int, bool) const override;

  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             thr_lock_type lock_type) override;

  Table_flags table_flags() const override;

  const char *table_type() const override { return "MOCK"; }

  int load_table(const TABLE &table) override;

  int unload_table(const char *db_name, const char *table_name,
                   bool error_if_not_loaded) override;

  THR_LOCK_DATA m_lock;
};

2.6: Meta Data Info

when a table loaded into rapid, all the information about loaded tables will be added to catalog table, performance_schema.rpd_xxxx. Taking performance_schema.rpd_column_id as an instance, it contains the following part: (1) Id; (2)Table ID; (3) Column Name.

3: Unload Data
Unload operation will release the loaded data from rapid. it's a opposite operation fo loading operation. After the data clearn up, the meta inforation in performance_schema.rpd_xxx also be cleared.

Here, we dont discuss this part in detail anymore.

Related Issue: #20

@ShannonBase ShannonBase changed the title feat(rapid): Load data from primary engine to secondary engine feat(rapid): Load/Unload data from primary engine to secondary engine Sep 16, 2023
@ShannonBase ShannonBase self-assigned this Oct 11, 2023
@ShannonBase ShannonBase added the feature it will be implemented as a new feature label Oct 16, 2023
@ShannonBase
Copy link
Contributor Author

Done, close it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature it will be implemented as a new feature
Projects
None yet
Development

No branches or pull requests

1 participant