Skip to content

Commit

Permalink
Change TableDataWriter to write data using UncompressedSegments
Browse files Browse the repository at this point in the history
  • Loading branch information
Mytherin committed Oct 30, 2019
1 parent d1fae7e commit cd87feb
Show file tree
Hide file tree
Showing 23 changed files with 270 additions and 276 deletions.
4 changes: 2 additions & 2 deletions src/common/file_buffer.cpp
Expand Up @@ -7,8 +7,8 @@
using namespace duckdb;
using namespace std;

FileBuffer::FileBuffer(uint64_t bufsiz) :
Buffer(BufferType::FILE_BUFFER) {
FileBuffer::FileBuffer(FileBufferType type, uint64_t bufsiz) :
type(type) {
assert(bufsiz % FILE_BUFFER_BLOCK_SIZE == 0);
assert(bufsiz >= FILE_BUFFER_BLOCK_SIZE);
// we add (FILE_BUFFER_BLOCK_SIZE - 1) to ensure that we can align the buffer to FILE_BUFFER_BLOCK_SIZE
Expand Down
29 changes: 0 additions & 29 deletions src/include/common/buffer.hpp

This file was deleted.

16 changes: 11 additions & 5 deletions src/include/common/file_buffer.hpp
Expand Up @@ -8,13 +8,18 @@

#pragma once

#include "common/buffer.hpp"
#include "common/constants.hpp"

namespace duckdb {
struct FileHandle;

//! Discriminates the concrete kind of a FileBuffer so holders of a base
//! pointer can safely downcast (BLOCK -> Block, MANAGED_BUFFER -> ManagedBuffer).
enum class FileBufferType : uint8_t { BLOCK = 1, MANAGED_BUFFER = 2 };

//! The FileBuffer represents a buffer that can be read or written to a Direct IO FileHandle.
class FileBuffer : public Buffer {
class FileBuffer {
constexpr static int FILE_BUFFER_BLOCK_SIZE = 4096;
constexpr static int FILE_BUFFER_HEADER_SIZE = sizeof(uint64_t);

Expand All @@ -23,14 +28,16 @@ class FileBuffer : public Buffer {
//! FILE_BUFFER_BLOCK_SIZE. The content in this buffer can be written to FileHandles that have been opened with
//! DIRECT_IO on all operating systems, however, the entire buffer must be written to the file. Note that the
//! returned size is 8 bytes less than the allocation size to account for the checksum.
FileBuffer(uint64_t bufsiz);
FileBuffer(FileBufferType type, uint64_t bufsiz);
virtual ~FileBuffer();

//! The type of the buffer
FileBufferType type;
//! The buffer that users can write to
data_ptr_t buffer;
//! The size of the portion that users can write to, this is equivalent to internal_size - FILE_BUFFER_HEADER_SIZE
uint64_t size;

public:
//! Read into the FileBuffer from the specified location. Automatically verifies the checksum, and throws an
//! exception if the checksum does not match correctly.
void Read(FileHandle &handle, uint64_t location);
Expand All @@ -39,7 +46,6 @@ class FileBuffer : public Buffer {
void Write(FileHandle &handle, uint64_t location);

void Clear();

private:
//! The pointer to the internal buffer that will be read or written, including the buffer header
data_ptr_t internal_buffer;
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/block_manager.hpp
Expand Up @@ -28,7 +28,11 @@ class BlockManager {
//! Read the content of the block from disk
virtual void Read(Block &block) = 0;
//! Writes the block to disk
virtual void Write(Block &block) = 0;
virtual void Write(FileBuffer &block, block_id_t block_id) = 0;
//! Convenience overload: writes a Block to disk at the position given by its
//! own block id, delegating to the virtual FileBuffer-based Write above.
void Write(Block &block) {
Write(block, block.id);
}
//! Write the header; should be the final step of a checkpoint
virtual void WriteHeader(DatabaseHeader header) = 0;
};
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/buffer/buffer_list.hpp
Expand Up @@ -8,16 +8,16 @@

#pragma once

#include "common/buffer.hpp"
#include "common/file_buffer.hpp"

namespace duckdb {

struct BufferEntry {
BufferEntry(unique_ptr<Buffer> buffer) :
BufferEntry(unique_ptr<FileBuffer> buffer) :
buffer(move(buffer)), ref_count(1), prev(nullptr) { }

//! The actual buffer
unique_ptr<Buffer> buffer;
unique_ptr<FileBuffer> buffer;
//! The amount of references to this entry
index_t ref_count;
//! Next node
Expand Down
10 changes: 3 additions & 7 deletions src/include/storage/buffer/managed_buffer.hpp
@@ -1,31 +1,27 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/bufer/managed_buffer.hpp
// storage/buffer/managed_buffer.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "common/common.hpp"
#include "common/buffer.hpp"
#include "common/file_buffer.hpp"
#include "storage/storage_info.hpp"

namespace duckdb {
class BufferManager;

//! Managed buffer is an arbitrarily-sized buffer that is at least of size >= BLOCK_SIZE
class ManagedBuffer : public Buffer {
class ManagedBuffer : public FileBuffer {
public:
ManagedBuffer(BufferManager &manager, index_t size, bool can_destroy, block_id_t id);

//! The buffer manager this buffer belongs to
BufferManager &manager;
//! A pointer to the data held by the buffer
unique_ptr<data_t[]> data;
//! The size of the buffer in bytes
index_t size;
//! Whether or not the managed buffer can be freely destroyed when unpinned.
//! - If can_destroy is true, the buffer can be destroyed when unpinned and hence be unrecoverable. After being destroyed, Pin() will return false.
//! - If can_destroy is false, the buffer will instead be written to a temporary file on disk when unloaded from memory, and read back into memory when Pin() is called.
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/buffer_manager.hpp
Expand Up @@ -19,7 +19,7 @@

namespace duckdb {

//! The buffer manager is a
//! The buffer manager is in charge of handling memory management for the database. It hands out memory buffers that can be used by the database internally.
class BufferManager {
friend class BufferHandle;
public:
Expand Down
38 changes: 10 additions & 28 deletions src/include/storage/checkpoint/table_data_writer.hpp
Expand Up @@ -12,48 +12,30 @@
#include "common/unordered_map.hpp"

namespace duckdb {

struct StringDictionary {
unordered_map<string, index_t> offsets;
index_t size = 0;
};
class ManagedBufferHandle;
class UncompressedSegment;
struct SegmentStatistics;

//! The table data writer is responsible for writing the data of a table to the block manager
class TableDataWriter {
public:
static constexpr index_t BLOCK_HEADER_NUMERIC = 0;
static constexpr index_t BLOCK_HEADER_STRING = sizeof(int32_t);
//! Marker indicating that a string is a big string (bigger than block size), and located in a special big string
//! area
static constexpr char BIG_STRING_MARKER[2] = {'\201', '\0'};
static constexpr index_t BIG_STRING_MARKER_SIZE = 2 * sizeof(char) + sizeof(block_id_t);

public:
TableDataWriter(CheckpointManager &manager, TableCatalogEntry &table);
~TableDataWriter();

void WriteTableData(Transaction &transaction);
private:
void AppendData(index_t col_idx, Vector &data);

void WriteColumnData(DataChunk &chunk, index_t column_index);
void WriteString(index_t index, const char *val);
void FlushBlock(index_t col);
void CreateSegment(index_t col_idx);
void FlushSegment(index_t col_idx);

void WriteDataPointers();

private:
//! Flush the block of the column if it cannot fit write_size more bytes
void FlushIfFull(index_t col, index_t write_size);
//! Writes the dictionary to the block buffer
void FlushDictionary(index_t col);

CheckpointManager &manager;
TableCatalogEntry &table;

vector<unique_ptr<Block>> blocks;
vector<index_t> offsets;
vector<index_t> tuple_counts;
vector<index_t> row_numbers;
vector<index_t> indexes;
vector<StringDictionary> dictionaries;
vector<unique_ptr<UncompressedSegment>> segments;
vector<unique_ptr<SegmentStatistics>> stats;

vector<vector<DataPointer>> data_pointers;
};
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/in_memory_block_manager.hpp
Expand Up @@ -28,7 +28,7 @@ class InMemoryBlockManager : public BlockManager {
void Read(Block &block) override {
// an in-memory database has no backing file, so block IO is never valid here
throw Exception("Cannot perform IO in in-memory database!");
}
void Write(Block &block) override {
void Write(FileBuffer &block, block_id_t block_id) override {
throw Exception("Cannot perform IO in in-memory database!");
}
void WriteHeader(DatabaseHeader header) override {
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/numeric_segment.hpp
Expand Up @@ -23,7 +23,7 @@ class NumericSegment : public UncompressedSegment {
void FetchRow(FetchState &state, Transaction &transaction, row_t row_id, Vector &result) override;

//! Append a part of a vector to the uncompressed segment with the given append state, updating the provided stats in the process. Returns the amount of tuples appended. If this is less than `count`, the uncompressed segment is full.
index_t Append(SegmentStatistics &stats, TransientAppendState &state, Vector &data, index_t offset, index_t count) override;
index_t Append(SegmentStatistics &stats, Vector &data, index_t offset, index_t count) override;

//! Rollback a previous update
void RollbackUpdate(UpdateInfo *info) override;
Expand Down
7 changes: 0 additions & 7 deletions src/include/storage/single_file_block.hpp

This file was deleted.

2 changes: 1 addition & 1 deletion src/include/storage/single_file_block_manager.hpp
Expand Up @@ -33,7 +33,7 @@ class SingleFileBlockManager : public BlockManager {
//! Read the content of the block from disk
void Read(Block &block) override;
//! Write the given block to disk
void Write(Block &block) override;
void Write(FileBuffer &block, block_id_t block_id) override;
//! Write the header to disk, this is the final step of the checkpointing process
void WriteHeader(DatabaseHeader header) override;

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/string_segment.hpp
Expand Up @@ -54,7 +54,7 @@ class StringSegment : public UncompressedSegment {
void FetchRow(FetchState &state, Transaction &transaction, row_t row_id, Vector &result) override;

//! Append a part of a vector to the uncompressed segment with the given append state, updating the provided stats in the process. Returns the amount of tuples appended. If this is less than `count`, the uncompressed segment is full.
index_t Append(SegmentStatistics &stats, TransientAppendState &state, Vector &data, index_t offset, index_t count) override;
index_t Append(SegmentStatistics &stats, Vector &data, index_t offset, index_t count) override;

//! Rollback a previous update
void RollbackUpdate(UpdateInfo *info) override;
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/uncompressed_segment.hpp
Expand Up @@ -54,7 +54,7 @@ class UncompressedSegment {
virtual void FetchRow(FetchState &state, Transaction &transaction, row_t row_id, Vector &result) = 0;

//! Append a part of a vector to the uncompressed segment with the given append state, updating the provided stats in the process. Returns the amount of tuples appended. If this is less than `count`, the uncompressed segment is full.
virtual index_t Append(SegmentStatistics &stats, TransientAppendState &state, Vector &data, index_t offset, index_t count) = 0;
virtual index_t Append(SegmentStatistics &stats, Vector &data, index_t offset, index_t count) = 0;

//! Update a set of row identifiers to the specified set of updated values
void Update(DataTable &table, SegmentStatistics &stats, Transaction &transaction, Vector &update, row_t *ids, row_t offset);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/block.cpp
Expand Up @@ -3,5 +3,5 @@
using namespace duckdb;
using namespace std;

Block::Block(block_id_t id) : FileBuffer(BLOCK_SIZE), id(id) {
//! A Block is a FileBuffer of exactly BLOCK_SIZE bytes tagged with the on-disk block id it represents.
Block::Block(block_id_t id) : FileBuffer(FileBufferType::BLOCK, BLOCK_SIZE), id(id) {
}
4 changes: 1 addition & 3 deletions src/storage/buffer/managed_buffer.cpp
Expand Up @@ -5,9 +5,7 @@ using namespace duckdb;
using namespace std;

ManagedBuffer::ManagedBuffer(BufferManager &manager, index_t size, bool can_destroy, block_id_t id) :
Buffer(BufferType::MANAGED_BUFFER), manager(manager), size(size), can_destroy(can_destroy), id(id) {
FileBuffer(FileBufferType::MANAGED_BUFFER, size), manager(manager), can_destroy(can_destroy), id(id) {
assert(id >= MAXIMUM_BLOCK);
assert(size >= BLOCK_SIZE);

data = unique_ptr<data_t[]>(new data_t[size]);
}
14 changes: 7 additions & 7 deletions src/storage/buffer_manager.cpp
Expand Up @@ -21,7 +21,7 @@ BufferManager::~BufferManager() {
}

unique_ptr<BlockHandle> BufferManager::Pin(block_id_t block_id) {
// this method should only be used to pin blocks
// this method should only be used to pin blocks that exist in the file
assert(block_id < MAXIMUM_BLOCK);

// first obtain a lock on the set of blocks
Expand Down Expand Up @@ -56,7 +56,7 @@ unique_ptr<BlockHandle> BufferManager::Pin(block_id_t block_id) {
used_list.Append(move(buffer_entry));
} else {
auto buffer = entry->second->buffer.get();
assert(buffer->type == BufferType::FILE_BUFFER);
assert(buffer->type == FileBufferType::BLOCK);
result_block = (Block*) buffer;
// add one to the reference count
AddReference(entry->second);
Expand Down Expand Up @@ -100,14 +100,14 @@ unique_ptr<Block> BufferManager::EvictBlock() {
assert(entry->ref_count == 0);
// erase this identifier from the set of blocks
auto buffer = entry->buffer.get();
if (buffer->type == BufferType::FILE_BUFFER) {
if (buffer->type == FileBufferType::BLOCK) {
// block buffer: remove the block and reuse it
auto block = (Block*) buffer;
blocks.erase(block->id);
// free up the memory
current_memory -= BLOCK_SIZE;
// finally return the block obtained from the current entry
return unique_ptr_cast<Buffer, Block>(move(entry->buffer));
return unique_ptr_cast<FileBuffer, Block>(move(entry->buffer));
} else {
// managed buffer: cannot return a block here
auto managed = (ManagedBuffer*) buffer;
Expand Down Expand Up @@ -159,7 +159,7 @@ unique_ptr<ManagedBufferHandle> BufferManager::PinBuffer(block_id_t buffer_id, b
auto buffer = entry->second->buffer.get();
AddReference(entry->second);
// now return it
assert(buffer->type == BufferType::MANAGED_BUFFER);
assert(buffer->type == FileBufferType::MANAGED_BUFFER);
auto managed = (ManagedBuffer*) buffer;
assert(managed->id == buffer_id);
return make_unique<ManagedBufferHandle>(*this, managed, buffer_id);
Expand Down Expand Up @@ -188,7 +188,7 @@ void BufferManager::WriteTemporaryBuffer(ManagedBuffer &buffer) {
// create the file and write the size followed by the buffer contents
auto handle = fs.OpenFile(path, FileFlags::WRITE | FileFlags::CREATE);
handle->Write(&buffer.size, sizeof(index_t), 0);
handle->Write(buffer.data.get(), buffer.size, sizeof(index_t));
buffer.Write(*handle, sizeof(index_t));
}

unique_ptr<ManagedBufferHandle> BufferManager::ReadTemporaryBuffer(block_id_t id) {
Expand All @@ -206,7 +206,7 @@ unique_ptr<ManagedBufferHandle> BufferManager::ReadTemporaryBuffer(block_id_t id
}
// now allocate a buffer of this size and read the data into that buffer
auto buffer = make_unique<ManagedBuffer>(*this, alloc_size, false, id);
handle->Read(buffer->data.get(), alloc_size, sizeof(index_t));
buffer->Read(*handle, sizeof(index_t));

auto managed_buffer = buffer.get();
current_memory += alloc_size;
Expand Down

0 comments on commit cd87feb

Please sign in to comment.