Skip to content

Commit

Permalink
Fix Loader::distributeToShards for dictionary-encoded shard key
Browse files Browse the repository at this point in the history
The shard key can be a dictionary-encoded string as well. Fixes #50.
  • Loading branch information
asuhan committed Jul 22, 2017
1 parent 3753c8a commit 7ff7ea5
Showing 1 changed file with 59 additions and 56 deletions.
115 changes: 59 additions & 56 deletions Import/Importer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -798,65 +798,68 @@ void Loader::distributeToShards(std::vector<OneShardBuffers>& all_shard_import_b
}
CHECK(shard_col_desc);
CHECK_LE(static_cast<size_t>(table_desc->shardedColumnId), import_buffers.size());
const auto& shard_column_input_buffer = import_buffers[table_desc->shardedColumnId - 1];
if (shard_col_desc->columnType.is_integer()) {
for (size_t i = 0; i < row_count; ++i) {
const auto val = int_value_at(*shard_column_input_buffer, i);
const auto shard = val % shard_count;
auto& shard_output_buffers = all_shard_import_buffers[shard];
for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
const auto& input_buffer = import_buffers[col_idx];
const auto& col_ti = input_buffer->getTypeInfo();
const auto type = col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
switch (type) {
case kBOOLEAN:
shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, i));
break;
case kSMALLINT:
shard_output_buffers[col_idx]->addSmallint(int_value_at(*input_buffer, i));
break;
case kINT:
shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, i));
break;
case kBIGINT:
shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, i));
break;
case kFLOAT:
shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, i));
break;
case kDOUBLE:
shard_output_buffers[col_idx]->addDouble(double_value_at(*input_buffer, i));
break;
case kTEXT:
case kVARCHAR:
case kCHAR: {
CHECK_LT(i, input_buffer->getStringBuffer()->size());
shard_output_buffers[col_idx]->addString((*input_buffer->getStringBuffer())[i]);
break;
}
case kTIME:
case kTIMESTAMP:
case kDATE:
shard_output_buffers[col_idx]->addTime(int_value_at(*input_buffer, i));
break;
case kARRAY:
if (IS_STRING(col_ti.get_subtype())) {
CHECK(input_buffer->getStringArrayBuffer());
CHECK_LT(i, input_buffer->getStringArrayBuffer()->size());
const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[i];
shard_output_buffers[col_idx]->addStringArray(input_arr);
} else {
shard_output_buffers[col_idx]->addArray((*input_buffer->getArrayBuffer())[i]);
}
break;
default:
CHECK(false);
auto& shard_column_input_buffer = import_buffers[table_desc->shardedColumnId - 1];
const auto& shard_col_ti = shard_col_desc->columnType;
CHECK(shard_col_ti.is_integer() || (shard_col_ti.is_string() && shard_col_ti.get_compression() == kENCODING_DICT));
if (shard_col_ti.is_string()) {
const auto payloads_ptr = shard_column_input_buffer->getStringBuffer();
CHECK(payloads_ptr);
shard_column_input_buffer->addDictEncodedString(*payloads_ptr);
}
for (size_t i = 0; i < row_count; ++i) {
const auto val = int_value_at(*shard_column_input_buffer, i);
const auto shard = val % shard_count;
auto& shard_output_buffers = all_shard_import_buffers[shard];
for (size_t col_idx = 0; col_idx < import_buffers.size(); ++col_idx) {
const auto& input_buffer = import_buffers[col_idx];
const auto& col_ti = input_buffer->getTypeInfo();
const auto type = col_ti.is_decimal() ? decimal_to_int_type(col_ti) : col_ti.get_type();
switch (type) {
case kBOOLEAN:
shard_output_buffers[col_idx]->addBoolean(int_value_at(*input_buffer, i));
break;
case kSMALLINT:
shard_output_buffers[col_idx]->addSmallint(int_value_at(*input_buffer, i));
break;
case kINT:
shard_output_buffers[col_idx]->addInt(int_value_at(*input_buffer, i));
break;
case kBIGINT:
shard_output_buffers[col_idx]->addBigint(int_value_at(*input_buffer, i));
break;
case kFLOAT:
shard_output_buffers[col_idx]->addFloat(float_value_at(*input_buffer, i));
break;
case kDOUBLE:
shard_output_buffers[col_idx]->addDouble(double_value_at(*input_buffer, i));
break;
case kTEXT:
case kVARCHAR:
case kCHAR: {
CHECK_LT(i, input_buffer->getStringBuffer()->size());
shard_output_buffers[col_idx]->addString((*input_buffer->getStringBuffer())[i]);
break;
}
case kTIME:
case kTIMESTAMP:
case kDATE:
shard_output_buffers[col_idx]->addTime(int_value_at(*input_buffer, i));
break;
case kARRAY:
if (IS_STRING(col_ti.get_subtype())) {
CHECK(input_buffer->getStringArrayBuffer());
CHECK_LT(i, input_buffer->getStringArrayBuffer()->size());
const auto& input_arr = (*(input_buffer->getStringArrayBuffer()))[i];
shard_output_buffers[col_idx]->addStringArray(input_arr);
} else {
shard_output_buffers[col_idx]->addArray((*input_buffer->getArrayBuffer())[i]);
}
break;
default:
CHECK(false);
}
++all_shard_row_counts[shard];
}
} else {
CHECK(false);
++all_shard_row_counts[shard];
}
}

Expand Down

0 comments on commit 7ff7ea5

Please sign in to comment.