Skip to content

Commit

Permalink
fix : insert lineprotocol error
Browse files Browse the repository at this point in the history
  • Loading branch information
Subsegment authored and roseboy-liu committed Jul 26, 2023
1 parent 9a42b66 commit 8a8b09a
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 17 deletions.
30 changes: 20 additions & 10 deletions coordinator/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,17 +395,27 @@ impl PointWriter {
span_recorder.span_ctx(),
)
.await;
if let Err(err @ CoordinatorError::FailoverNode { .. }) = result {
debug!(
"write data to remote {}({}) failed; write to hinted handoff!",
node_id, vnode_id
);

span_recorder.error(err.to_string());

return self
.write_to_handoff(vnode_id, node_id, tenant, precision, data)
.await;
if let Err(ref err) = result {
let meta_retry = MetaError::Retry {
msg: "default".to_string(),
};
let tskv_memory = tskv::Error::MemoryExhausted;
if matches!(*err, CoordinatorError::FailoverNode { .. })
|| err.error_code().to_string() == meta_retry.error_code().to_string()
|| err.error_code().to_string() == tskv_memory.error_code().to_string()
{
debug!(
"write data to remote {}({}) failed; write to hinted handoff!",
node_id, vnode_id
);

span_recorder.error(err.to_string());

return self
.write_to_handoff(vnode_id, node_id, tenant, precision, data)
.await;
}
}

debug!(
Expand Down
7 changes: 5 additions & 2 deletions meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,12 @@ pub enum MetaError {
#[error_code(code = 24)]
RequestLimit { kind: RequestLimiterKind },

#[snafu(display("An error occurred while processing the data. Please try again"))]
#[snafu(display(
"An error occurred while processing the data: {}. Please try again",
msg
))]
#[error_code(code = 25)]
Retry,
Retry { msg: String },

#[snafu(display("{}", msg))]
ObjectLimit { msg: String },
Expand Down
20 changes: 19 additions & 1 deletion meta/src/model/meta_tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use store::command;
use trace::info;

use crate::error::{MetaError, MetaResult};
use crate::store::command::EntryLog;
use crate::store::command::{EntryLog, ReadCommand};
use crate::store::key_path;
use crate::{client, store};

Expand Down Expand Up @@ -468,6 +468,24 @@ impl TenantMeta {
Ok(None)
}

pub async fn get_tskv_table_schema_by_meta(
&self,
db: &str,
table: &str,
) -> MetaResult<Option<Arc<TskvTableSchema>>> {
let req = ReadCommand::TableSchema(
self.cluster.clone(),
self.tenant.name().to_string(),
db.to_string(),
table.to_string(),
);
let rsp = self.client.read::<Option<TableSchema>>(&req).await?;
if let Some(TableSchema::TsKvTableSchema(val)) = rsp {
return Ok(Some(val));
}
Ok(None)
}

pub fn get_external_table_schema(
&self,
db: &str,
Expand Down
2 changes: 2 additions & 0 deletions meta/src/store/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ pub enum ReadCommand {
Tenant(String, String),
// cluster
Tenants(String),
// cluster, tenant, db, table
TableSchema(String, String, String, String),
}

pub const ENTRY_LOG_TYPE_SET: i32 = 1;
Expand Down
4 changes: 4 additions & 0 deletions meta/src/store/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ impl StateMachine {
response_encode(self.get_struct::<Tenant>(&path))
}
ReadCommand::Tenants(cluster) => response_encode(self.process_read_tenants(cluster)),
ReadCommand::TableSchema(cluster, tenant_name, db_name, table_name) => {
let path = KeyPath::tenant_schema_name(cluster, tenant_name, db_name, table_name);
response_encode(self.get_struct::<TableSchema>(&path))
}
}
}

Expand Down
30 changes: 26 additions & 4 deletions tskv/src/schema/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,19 @@ impl DBschemas {
let schema_get = self
.client
.get_tskv_table_schema(&schema.db, &schema.name)
.map_err(|_| MetaError::Retry)?
.ok_or(MetaError::Retry)?;
.map_err(|e| MetaError::Retry { msg: e.to_string() })?;
let schema_get = match schema_get {
None => self
.client
.get_tskv_table_schema_by_meta(&schema.db, &schema.name)
.await
.map_err(|e| MetaError::Retry { msg: e.to_string() })?
.ok_or(MetaError::Retry {
msg: "Table not found after TableAlreadyExists".to_string(),
})?,
Some(schema) => schema,
};

if schema.tenant == schema_get.tenant
&& schema.db == schema_get.db
&& schema.columns() == schema_get.columns()
Expand All @@ -223,8 +234,19 @@ impl DBschemas {
let schema_get = self
.client
.get_tskv_table_schema(&schema.db, &schema.name)
.map_err(|_| MetaError::Retry)?
.ok_or(MetaError::Retry)?;
.map_err(|e| MetaError::Retry { msg: e.to_string() })?;
let schema_get = match schema_get {
None => self
.client
.get_tskv_table_schema_by_meta(&schema.db, &schema.name)
.await
.map_err(|e| MetaError::Retry { msg: e.to_string() })?
.ok_or(MetaError::Retry {
msg: "Table not found when retry update table schema"
.to_string(),
})?,
Some(schema) => schema,
};
let mut schema = schema.as_ref().clone();
schema.schema_id = schema_get.schema_id + 1;
let schema = Arc::new(schema);
Expand Down

0 comments on commit 8a8b09a

Please sign in to comment.