Skip to content

Latest commit

 

History

History
276 lines (245 loc) · 10.7 KB

part5.md

File metadata and controls

276 lines (245 loc) · 10.7 KB

Part5

这章目的是插入数据达到最大数切分叶子节点,解决上一章节我们数据库仅能存储 13 个元素的尴尬。

首先我们先删除之前超过叶子节点最大元素时的报错:

void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {
  void* node = get_page(cursor->table->pager, cursor->page_num);

  uint32_t num_cells = *leaf_node_num_cells(node);
  if (num_cells >= LEAF_NODE_MAX_CELLS) {
-    printf("Need to implement splitting a leaf node.\n");
-    exit(EXIT_FAILURE);
+    leaf_node_split_and_insert(cursor, key, value);
+    return;
  }

ExecuteResult execute_insert(Statement* statement, Table* table) {
   void* node = get_page(table->pager, table->root_page_num);
   uint32_t num_cells = (*leaf_node_num_cells(node));
-  if (num_cells >= LEAF_NODE_MAX_CELLS) {
-    return EXECUTE_TABLE_FULL;
-  }

接着,叶子节点被切分后,需要一个上层的内部节点关联:

internal_node的顺序如下行:
[common_header:6][num_keys:4][right_child_pointer:4][left_chid_pointer0:4][left_child0:4][left_child_pointer1:4][left_child1:4]...

Our internal node format

像上面这样摆放页面数据,因为子节点 pointer / key 占据如此小,可以容纳 510 keys 和 511 个子节点 pointer(+1 right_child)。这表明我们不需要拿着一个 key 去遍历所有子节点内容!

# internal node layers max # leaf nodes Size of all leaf nodes
0 511^0 = 1 4 KB
1 511^1 = 512 ~2 MB
2 511^2 = 261,121 ~1 GB
3 511^3 = 133,432,831 ~550 GB

实际上,我们不可能满足一个子节点把 4kb 全部填满,肯定有浪费的空间(wasted space). 但是我们仅仅通过从磁盘加载 4 个页面,就把 500GB 的数据从内存中读取查找出来。这就是为什么 B-Tree 对于数据库是那么重要的数据结构了。

接下来我们增加一些内部节点的数据:

// 内部节点Header Layout
+const uint32_t INTERNAL_NODE_NUM_KEYS_SIZE = sizeof(uint32_t);
+const uint32_t INTERNAL_NODE_NUM_KEYS_OFFSET = COMMON_NODE_HEADER_SIZE;
+const uint32_t INTERNAL_NODE_RIGHT_CHILD_SIZE = sizeof(uint32_t);
+const uint32_t INTERNAL_NODE_RIGHT_CHILD_OFFSET =
    INTERNAL_NODE_NUM_KEYS_OFFSET + INTERNAL_NODE_NUM_KEYS_SIZE;
+const uint32_t INTERNAL_NODE_HEADER_SIZE = COMMON_NODE_HEADER_SIZE +
                                           INTERNAL_NODE_NUM_KEYS_SIZE +
                                           INTERNAL_NODE_RIGHT_CHILD_SIZE;

// 内部节点Body Layout
+const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
+const uint32_t INTERNAL_NODE_KEY_SIZE = sizeof(uint32_t);
+const uint32_t INTERNAL_NODE_CELL_SIZE =
    INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE;

// 下面这些是内部节点便利函数
+uint32_t* internal_node_num_keys(void* node) {
+ return node + INTERNAL_NODE_NUM_KEYS_OFFSET;
+}
+uint32_t* internal_node_right_child(void* node) {
+ return node + INTERNAL_NODE_RIGHT_CHILD_OFFSET;
+}
+uint32_t* internal_node_cell(void* node, uint32_t cell_num) {
+ return node + INTERNAL_NODE_HEADER_SIZE + cell_num * INTERNAL_NODE_CELL_SIZE;
+}
/*
  在内部节点node中根据子页面索引获得内存坐标
  不能超过该内存节点最大元素个数
  小于child_num 都是在left_childs中存储
  等于时是right_child
  */
+uint32_t* internal_node_child(void* node, uint32_t child_num) {
+ uint32_t num_keys = *internal_node_num_keys(node);
+ if (child_num > num_keys) {
+   printf("Tried to access child_num %d > num_keys %d\n", child_num, num_keys);
+   exit(EXIT_FAILURE);
+ } else if (child_num == num_keys) {
+   return internal_node_right_child(node);
+ } else {
+   return internal_node_cell(node, child_num);
+ }
+}
+uint32_t* internal_node_key(void* node, uint32_t key_num) {
+ return internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
+}
+void initialize_internal_node(void* node) {
+ set_node_type(node, NODE_INTERNAL);
+ set_node_root(node, false);
+ *internal_node_num_keys(node) = 0;
+}

// 通用节点遍历函数
+uint32_t get_node_max_key(void* node) {
+ switch (get_node_type(node)) {
+ case NODE_INTERNAL:
+   return *internal_node_key(node, *internal_node_num_keys(node) - 1);
+ case NODE_LEAF:
+   return *leaf_node_key(node, *leaf_node_num_cells(node) - 1);
+ }
+}
+bool is_node_root(void* node) {
+ uint8_t value = *(uint8_t*)(node + IS_ROOT_OFFSET);
+ return (bool)value;
+}
+void set_node_root(void* node, bool is_root) {
+ uint8_t value = is_root;
+ *(uint8_t*)(node + IS_ROOT_OFFSET) = value;
+}
// 补充增加一个获取未使用过的新页面坐标: 每次新更新的pager->num_pages 不就是那个页面坐标么!
+uint32_t get_unused_page_num(Pager* pager) { return pager->num_pages; }

叶子节点遍历函数 initialize 时, set_node_root 为 false, 并且Table*(*db_open)(const char* filename)发现是首次打开时,root_node 设置 set_node_root 为 true:

// 附加子节点切分LEFT / RIGHT 数量; 其实都是数量7
+const uint32_t LEAF_NODE_RIGHT_SPLIT_COUNT = (LEAF_NODE_MAX_CELLS + 1) / 2;
+const uint32_t LEAF_NODE_LEFT_SPLIT_COUNT = (LEAF_NODE_MAX_CELLS + 1) - LEAF_NODE_RIGHT_SPLIT_COUNT;

void initialize_leaf_node(void* node) {
  set_node_type(node, NODE_LEAF);
+ set_node_root(node, false);
  *leaf_node_num_cells(node) = 0

Table* db_open(const char* filename) {
  ...
    void* root_node = get_page(pager, 0);
    initialize_leaf_node(root_node);
+   set_node_root(root_node, true);
  }
  return table;

准备工作好,接下来实现切分函数leaf_node_split_and_insert

+void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
+ void* old_node = get_page(cursor->table->pager, cursor->page_num);
  // 首先先把新的页面做出来,并且当成是right child
+ uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
+ void* new_node = get_page(cursor->table->pager, new_page_num);
+ initialize_leaf_node(new_node);

  // 从大到小将值分到new_node和old_node去,new_node(right child)放大的值
  // 注意: i 是int32_t 类型,不是 uint32_t 哦
+ for (int32_t i = LEAF_NODE_MAX_CELLS; i>= 0; i--) {
+   void* destination_node;
+   if (i >= LEAF_NODE_LEFT_SPLIT_COUNT) {
+     destination_node = new_node;
+   } else {
+     destination_node = old_node;
+   }
    // 计算将要放到指定内存坐标
+   uint32_t index_within_node = i % LEAF_NODE_LEFT_SPLIT_COUNT;
+   void* destination = leaf_node_cell(destination_node, index_within_node);
    // 如果i为新加入的数据,则为其写入
    // 注意: 这里提前修正了原作中的bug,该bug 在chapter12章才修复
+   if (i == cursor->cell_num) {
      // 原来要放的数据被当前数据占用
      // 放的目标是还是上面用于计算得到的 index_within_node
      // 同步记录key
+     serialize_row(leaf_node_value(destination_node, index_within_node), value);
+     *leaf_node_key(destination_node, index_within_node) = key;
+   } else if (i > cursor->cell_num) {
+     memcpy(destination, leaf_node_cell(old_node, i - 1), LEAF_NODE_CELL_SIZE);
+   } else {
      // 这里leaf_node_cell(old_node, i) 不 `-1` 是因为上面的已经算入了`(i == cursor->cell_num)`
+     memcpy(destination, leaf_node_cell(old_node, i), LEAF_NODE_CELL_SIZE);
+   }
+ }
  // 更新两个节点的num_cells信息
+ *leaf_node_num_cells(old_node) = LEAF_NODE_LEFT_SPLIT_COUNT;
+ *leaf_node_num_cells(new_node) = LEAF_NODE_RIGHT_SPLIT_COUNT;
  // 如果是root节点 - 从root 节点分出left child 和root节点
+ if (is_node_root(old_node)) {
+   return create_new_root(cursor->table, new_page_num);
+ } else {
+   printf("Need to implement updating parent after split\n");
+   exit(EXIT_FAILURE);
+ }
+}
+void create_new_root(Table* table, uint32_t right_child_page_num) {
+ void* root = get_page(table->pager, table->root_page_num);
+ void* right_child = get_page(table->pager, right_child_page_num);
+ // 新建left_child
+ uint32_t left_child_page_num = get_unused_page_num(table->pager);
+ void* left_child = get_page(table->pager, left_child_page_num);
+ // 将root数据拷贝至left_child
+ memcpy(left_child, root, PAGE_SIZE);
+ set_node_root(left_child, false);
+ // root数据内容将重写为internal格式,并且填充内容
+ initialize_internal_node(root);
+ set_node_root(root, true);
+ *internal_node_num_keys(root) = 1;
+ *internal_node_child(root, 0) = left_child_page_num;
+ uint32_t left_child_max_key = get_node_max_key(left_child);
+ *internal_node_key(root, 0) = left_child_max_key;
+ *internal_node_right_child(root) = right_child_page_num;
+}

以上实现了本章的目标: 切分叶子节点(14 + 1 = x + (15-x)), 先切出来 right_child, 然后再切出来 left_child, 然后设置 root 节点后,写入各自数据。

补充打印数据,方便查看:

-void print_leaf_node(void* node) {
-  uint32_t num_cells = *leaf_node_num_cells(node);
-  printf("leaf (size %d)\n", num_cells);
-  for (uint32_t i = 0; i < num_cells; i++) {
-    uint32_t key = *leaf_node_key(node, i);
-    printf("  - %d : %d\n", i, key);
-  }
-}
+void indent(uint32_t level) {
+  for (uint32_t i = 0; i < level; i++) {
+    printf("  ");
+  }
+}
+
+void print_tree(Pager* pager, uint32_t page_num, uint32_t indentation_level) {
+  void* node = get_page(pager, page_num);
+  uint32_t num_keys, child;
+
+  switch (get_node_type(node)) {
+    case NODE_INTERNAL:
+      num_keys = *internal_node_num_keys(node);
+      indent(indentation_level);
+      printf("- internal (size %d)\n", num_keys);
+      for (uint32_t i = 0; i < num_keys; i++) {
+        child = *internal_node_child(node, i);
+        print_tree(pager, child, indentation_level + 1);
+
+        indent(indentation_level + 1);
+        printf("- key %d\n", *internal_node_key(node, i));
+      }
+      child = *internal_node_right_child(node);
+      print_tree(pager, child, indentation_level + 1);
+      break;
+    case NODE_LEAF:
+      num_keys = *leaf_node_num_cells(node);
+      indent(indentation_level);
+      printf("- leaf (size %d)\n", num_keys);
+      for (uint32_t i = 0; i < num_keys; i++) {
+        indent(indentation_level + 1);
+        printf("- %d\n", *leaf_node_key(node, i));
+      }
+      break;
+  }
+}

// 更新调用函数
   } else if (strcmp(input_buffer->buffer, ".btree") == 0) {
     printf("Tree:\n");
-    print_leaf_node(get_page(table->pager, 0));
+    print_tree(table->pager, 0, 0);
     return META_COMMAND_SUCCESS;

测试

$chmod +x part5.sh && ./part5.sh

下一章

Part6 - 内部节点查找