Skip to content

Commit

Permalink
fix decode error; retry create kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Nov 9, 2023
1 parent 763ef04 commit 7d43ccf
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 22 deletions.
2 changes: 1 addition & 1 deletion interactive_engine/executor/assembly/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
1 change: 1 addition & 0 deletions interactive_engine/executor/check_format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ CURRENT=$(pwd)
for dir in "${directories[@]}"; do
cd "${CURRENT}/${dir}"
cargo +nightly fmt -- --check
#cargo +nightly fmt
done
2 changes: 1 addition & 1 deletion interactive_engine/executor/common/dyn_type/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
2 changes: 1 addition & 1 deletion interactive_engine/executor/engine/pegasus/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
2 changes: 1 addition & 1 deletion interactive_engine/executor/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
16 changes: 4 additions & 12 deletions interactive_engine/executor/store/groot/src/db/graph/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,15 @@ impl Decoder {
fn decode_var_len_property_at<'a>(
&self, reader: &UnsafeBytesReader<'a>, idx: usize,
) -> Option<ValueRef<'a>> {
let info = &self.src.props[idx];
let offset = self.src.offsets[idx];
let end_off = bytes_to_len(reader.read_bytes(offset, 3));
let mut start_off = 0;
let end_off = bytes_to_len(reader.read_bytes(self.src.offsets[idx], 3));
let mut start_off = 0; // idx == self.src.fixed_len_prop_count
if idx > self.src.fixed_len_prop_count {
let offset = self.src.offsets[idx - 1];
start_off = bytes_to_len(reader.read_bytes(offset, 3));
start_off = bytes_to_len(reader.read_bytes(self.src.offsets[idx - 1], 3));
}
let len = end_off - start_off;
if len > bytes_to_len(reader.read_bytes(*self.src.offsets.last().unwrap(), 3)) {
let msg = format!("fatal error! This codec cannot decode the bytes");
let err = gen_graph_err!(GraphErrorCode::DecodeError, msg);
error!("{:?}", err);
return None;
}
let start_off = start_off + self.src.var_len_prop_start_offset;
let bytes = reader.read_bytes(start_off, len);
let info = &self.src.props[idx];
let ret = ValueRef::new(info.r#type, bytes);
Some(ret)
}
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/store/rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
group_imports = "StdExternalCrate"
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,29 @@ private AdminClient getAdmin() {
if (this.adminClient == null) {
synchronized (this) {
if (this.adminClient == null) {
Map<String, Object> adminConfig = new HashMap<>();
adminConfig.put("bootstrap.servers", this.servers);
this.adminClient = AdminClient.create(adminConfig);
try {
this.adminClient = createAdminWithRetry();
} catch (InterruptedException e) {
logger.error("Create Kafka Client interrupted");
}
}
}
}
return this.adminClient;
}

private AdminClient createAdminWithRetry() throws InterruptedException {
Map<String, Object> adminConfig = new HashMap<>();
adminConfig.put("bootstrap.servers", this.servers);

for (int i = 0; i < 10; ++i) {
try {
return AdminClient.create(adminConfig);
} catch (Exception e) {
logger.warn("Error creating Kafka AdminClient", e);
Thread.sleep(5000);
}
}
throw new RuntimeException("Create Kafka Client failed");
}
}

0 comments on commit 7d43ccf

Please sign in to comment.