Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ option (BUILD_LIBHDFSPP
"Include LIBHDFSPP library in the build process"
OFF)

option (BUILD_SPARSEHASH
"Include sparsehash library in the build process"
OFF)

option(BUILD_CPP_TESTS
"Build the googletest unit tests"
ON)
Expand Down
7 changes: 6 additions & 1 deletion c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ set(SOURCE_FILES
Compression.cc
ConvertColumnReader.cc
CpuInfoUtil.cc
Dictionary.cc
Exceptions.cc
Geospatial.cc
Int128.cc
Expand Down Expand Up @@ -212,8 +213,8 @@ target_link_libraries (orc
$<BUILD_INTERFACE:orc::snappy>
$<BUILD_INTERFACE:orc::lz4>
$<BUILD_INTERFACE:orc::zstd>
$<BUILD_INTERFACE:orc::sparsehash>
$<BUILD_INTERFACE:${LIBHDFSPP_LIBRARIES}>
$<BUILD_INTERFACE:${SPARSEHASH_LIBRARIES}>
)

target_include_directories (orc
Expand All @@ -232,6 +233,10 @@ if (BUILD_LIBHDFSPP)
target_compile_definitions(orc PUBLIC -DBUILD_LIBHDFSPP)
endif (BUILD_LIBHDFSPP)

if (BUILD_SPARSEHASH)
target_compile_definitions(orc PUBLIC -DBUILD_SPARSEHASH)
endif (BUILD_SPARSEHASH)

if (BUILD_CPP_ENABLE_METRICS)
message(STATUS "Enable the metrics collection")
target_compile_definitions(orc PUBLIC ENABLE_METRICS=1)
Expand Down
111 changes: 8 additions & 103 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
#include <memory>
#include "ByteRLE.hh"
#include "ColumnWriter.hh"
#include "Dictionary.hh"
#include "RLE.hh"
#include "Statistics.hh"
#include "Timezone.hh"
#include "Utils.hh"

#include <sparsehash/dense_hash_map>

namespace orc {
StreamsFactory::~StreamsFactory() {
// PASS
Expand Down Expand Up @@ -927,104 +926,6 @@ namespace orc {
ColumnWriter::finishStreams();
dataStream_->finishStream();
}

/**
* Implementation of increasing sorted string dictionary
*/
class SortedStringDictionary {
public:
struct DictEntry {
DictEntry(const char* str, size_t len) : data(std::make_unique<std::string>(str, len)) {}

std::unique_ptr<std::string> data;
};

SortedStringDictionary() : totalLength_(0) {
/// Need to set empty key otherwise dense_hash_map will not work correctly
keyToIndex_.set_empty_key(std::string_view{});
}

// insert a new string into dictionary, return its insertion order
size_t insert(const char* str, size_t len);

// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;

// get dict entries in insertion order
const std::vector<DictEntry>& getEntriesInInsertionOrder() const;

// return count of entries
size_t size() const;

// return total length of strings in the dictioanry
uint64_t length() const;

void clear();

private:
// store dictionary entries in insertion order
mutable std::vector<DictEntry> flatDict_;

// map from string to its insertion order index
google::dense_hash_map<std::string_view, size_t> keyToIndex_;
uint64_t totalLength_;

// use friend class here to avoid being bothered by const function calls
friend class StringColumnWriter;
friend class CharColumnWriter;
friend class VarCharColumnWriter;
// store indexes of insertion order in the dictionary for not-null rows
std::vector<int64_t> idxInDictBuffer_;
};

// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
size_t index = flatDict_.size();

auto it = keyToIndex_.find(std::string_view{str, len});
if (it != keyToIndex_.end()) {
return it->second;
} else {
flatDict_.emplace_back(str, len);
totalLength_ += len;

const auto& lastEntry = flatDict_.back();
keyToIndex_.emplace(std::string_view{lastEntry.data->data(), lastEntry.data->size()}, index);
return index;
}
}

// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
for (const auto& entry : flatDict_) {
dataStream->write(entry.data->data(), entry.data->size());
lengthEncoder->write(static_cast<int64_t>(entry.data->size()));
}
}

// get dict entries in insertion order
const std::vector<SortedStringDictionary::DictEntry>&
SortedStringDictionary::getEntriesInInsertionOrder() const {
return flatDict_;
}

// return count of entries
size_t SortedStringDictionary::size() const {
return flatDict_.size();
}

// return total length of strings in the dictioanry
uint64_t SortedStringDictionary::length() const {
return totalLength_;
}

void SortedStringDictionary::clear() {
totalLength_ = 0;
keyToIndex_.clear();
flatDict_.clear();
}

class StringColumnWriter : public ColumnWriter {
public:
StringColumnWriter(const Type& type, const StreamsFactory& factory,
Expand Down Expand Up @@ -1324,6 +1225,9 @@ namespace orc {
// flush dictionary data & length streams
dictionary.flush(dictStream.get(), dictLengthEncoder.get());

// convert index from insertion order to dictionary order
dictionary.reorder(dictionary.idxInDictBuffer_);

// write data sequences
int64_t* data = dictionary.idxInDictBuffer_.data();
if (enableIndex) {
Expand Down Expand Up @@ -1367,14 +1271,15 @@ namespace orc {
}

// get dictionary entries in insertion order
const auto& entries = dictionary.getEntriesInInsertionOrder();
std::vector<const SortedStringDictionary::DictEntry*> entries;
dictionary.getEntriesInInsertionOrder(entries);

// store each length of the data into a vector
for (uint64_t i = 0; i != dictionary.idxInDictBuffer_.size(); ++i) {
// write one row data in direct encoding
const auto& dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer_[i])];
directDataStream->write(dictEntry.data->data(), dictEntry.data->size());
directLengthEncoder->write(static_cast<int64_t>(dictEntry.data->size()));
directDataStream->write(dictEntry->data->data(), dictEntry->data->size());
directLengthEncoder->write(static_cast<int64_t>(dictEntry->data->size()));
}

deleteDictStreams();
Expand Down
99 changes: 99 additions & 0 deletions c++/src/Dictionary.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "Dictionary.hh"

namespace orc {

// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
size_t index = flatDict_.size();

auto it = keyToIndex_.find(std::string_view{str, len});
if (it != keyToIndex_.end()) {
return it->second;
} else {
flatDict_.emplace_back(str, len, index);
totalLength_ += len;

const auto& lastEntry = flatDict_.back().entry;
keyToIndex_.emplace(std::string_view{lastEntry.data->data(), lastEntry.data->size()}, index);
return index;
}
}

// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());

for (const auto& entryWithIndex : flatDict_) {
dataStream->write(entryWithIndex.entry.data->data(), entryWithIndex.entry.data->size());
lengthEncoder->write(static_cast<int64_t>(entryWithIndex.entry.data->size()));
}
}

/**
* Reorder input index buffer from insertion order to dictionary order
*
* We require this function because string values are buffered by indexes
* in their insertion order. Until the entire dictionary is complete can
* we get their sorted indexes in the dictionary in that ORC specification
* demands dictionary should be ordered. Therefore this function transforms
* the indexes from insertion order to dictionary value order for final
* output.
*/
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value order
std::vector<size_t> mapping(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
mapping[flatDict_[i].index] = i;
}

// do the transformation
for (size_t i = 0; i != idxBuffer.size(); ++i) {
idxBuffer[i] = static_cast<int64_t>(mapping[static_cast<size_t>(idxBuffer[i])]);
}
}

// get dict entries in insertion order
void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry*>& entries) const {
/// flatDict_ is sorted in insertion order before [[SortedStringDictionary::flush]] is invoked.
entries.resize(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
entries[i] = &(flatDict_[i].entry);
}
}

// return count of entries
size_t SortedStringDictionary::size() const {
return flatDict_.size();
}

// return total length of strings in the dictioanry
uint64_t SortedStringDictionary::length() const {
return totalLength_;
}

void SortedStringDictionary::clear() {
totalLength_ = 0;
keyToIndex_.clear();
flatDict_.clear();
}
} // namespace orc
104 changes: 104 additions & 0 deletions c++/src/Dictionary.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstddef>
#include <memory>
#include <string>

#ifdef BUILD_SPARSEHASH
#include <sparsehash/dense_hash_map>
#else
#include <unordered_map>
#endif

#include "RLE.hh"

namespace orc {
/**
* Implementation of increasing sorted string dictionary
*/
class SortedStringDictionary {
public:
struct DictEntry {
DictEntry(const char* str, size_t len) : data(std::make_unique<std::string>(str, len)) {}

std::unique_ptr<std::string> data;
};

struct DictEntryWithIndex {
DictEntryWithIndex(const char* str, size_t len, size_t index)
: entry(str, len), index(index) {}

DictEntry entry;
size_t index;
};

SortedStringDictionary() : totalLength_(0) {
#ifdef BUILD_SPARSEHASH
/// Need to set empty key otherwise dense_hash_map will not work correctly
keyToIndex_.set_empty_key(std::string_view{});
#endif
}

// insert a new string into dictionary, return its insertion order
size_t insert(const char* str, size_t len);

// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;

// reorder input index buffer from insertion order to dictionary order
void reorder(std::vector<int64_t>& idxBuffer) const;

// get dict entries in insertion order
void getEntriesInInsertionOrder(std::vector<const DictEntry*>&) const;

// return count of entries
size_t size() const;

// return total length of strings in the dictioanry
uint64_t length() const;

void clear();

private:
struct LessThan {
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
return *l.entry.data < *r.entry.data; // use std::string's operator<
}
};
// store dictionary entries in insertion order
mutable std::vector<DictEntryWithIndex> flatDict_;

#ifdef BUILD_SPARSEHASH
// map from string to its insertion order index
google::dense_hash_map<std::string_view, size_t> keyToIndex_;
#else
std::unordered_map<std::string_view, size_t> keyToIndex_;
#endif

uint64_t totalLength_;

// use friend class here to avoid being bothered by const function calls
friend class StringColumnWriter;
friend class CharColumnWriter;
friend class VarCharColumnWriter;
// store indexes of insertion order in the dictionary for not-null rows
std::vector<int64_t> idxInDictBuffer_;
};

} // namespace orc
Loading
Loading