Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: correct initial timestamp determination in transactions #20267

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/adapter/src/coord/id_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Coordinator {
session: &Session,
id_bundle: &CollectionIdBundle,
) -> Vec<String> {
id_bundle
let mut names: Vec<_> = id_bundle
.iter()
// This could filter out an entry that has been replaced in another transaction.
.filter_map(|id| self.catalog().try_get_entry(&id))
Expand All @@ -93,6 +93,8 @@ impl Coordinator {
.resolve_full_name(item.name(), Some(session.conn_id()))
.to_string()
})
.collect()
.collect();
names.sort();
names
}
}
180 changes: 91 additions & 89 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ impl Coordinator {
});
return;
}
Ok((Some(TransactionOps::Peeks(timestamp_context)), _))
Ok((Some(TransactionOps::Peeks(determination)), _))
if ctx.session().vars().transaction_isolation()
== &IsolationLevel::StrictSerializable =>
{
Expand All @@ -1670,7 +1670,7 @@ impl Coordinator {
response,
action,
},
timestamp_context,
timestamp_context: determination.timestamp_context,
})
.expect("sending to strict_serializable_reads_tx cannot fail");
return;
Expand Down Expand Up @@ -2117,67 +2117,109 @@ impl Coordinator {
when: &QueryWhen,
cluster_id: ClusterId,
timeline_context: TimelineContext,
id_bundle: &CollectionIdBundle,
source_bundle: &CollectionIdBundle,
source_ids: &BTreeSet<GlobalId>,
Comment on lines +2120 to 2121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an existing issue, but I'm finding it very difficult to tell the difference between source_ids and source_bundle. source_ids seems to be IDs directly referenced in the query, while source_bundle seems to be the result of a call to sufficient_collections on an index_oracle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not at all important for this PR, I just wanted to call it out.

real_time_recency_ts: Option<Timestamp>,
) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
let determination = self.determine_timestamp(
session,
id_bundle,
when,
cluster_id,
timeline_context.clone(),
real_time_recency_ts,
)?;

// We only track the peeks in the session if the query doesn't use AS
// OF or we're inside an explicit transaction. The latter case is
// necessary to support PG's `BEGIN` semantics, whose behavior can
// depend on whether or not reads have occurred in the txn.
if when.is_transactional() {
session.add_transaction_ops(TransactionOps::Peeks(
determination.timestamp_context.clone(),
))?;
} else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
// If the query uses AS OF, then ignore the timestamp.
session.add_transaction_ops(TransactionOps::Peeks(TimestampContext::NoTimestamp))?;
}

let in_immediate_multi_stmt_txn = session.transaction().is_in_multi_statement_transaction()
&& when == &QueryWhen::Immediately;
let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
let timedomain_bundle;

// Fetch or generate a timestamp for this query, along with the read holds we would
// acquire if we end up needing to set them.
let (determination, potential_read_holds) =
match session.get_transaction_timestamp_determination() {
// Use the transaction's timestamp if it exists and this isn't an AS OF query.
Some(
determination @ TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(_, _),
..
},
) if in_immediate_multi_stmt_txn => (determination, None),
_ => {
let determine_bundle = if in_immediate_multi_stmt_txn {
// In a transaction, determine a timestamp that will be valid for anything in
// any schema referenced by the first query.
timedomain_bundle = self.timedomain_for(
source_ids,
&timeline_context,
session.conn_id(),
cluster_id,
)?;
&timedomain_bundle
} else {
// If not in a transaction, use the source.
source_bundle
};
let determination = self.determine_timestamp(
session,
determine_bundle,
when,
cluster_id,
timeline_context,
real_time_recency_ts,
)?;
// We only need read holds if the read depends on a timestamp. We don't set the
// read holds here because it makes the code a bit more clear to handle the two
// cases for "is this the first statement in a transaction?" in an if/else block
// below.
let read_holds = determination
.timestamp_context
.timestamp()
.map(|timestamp| (timestamp.clone(), determine_bundle));
(determination, read_holds)
}
};

// If we are in a single statement transaction, there is no need to
// acquire read holds to prevent compaction as they will be released
// immediately following the completion of the transaction.
//
// If we're in a multi-statement transaction and the query does not use `AS OF`,
// acquire read holds on any sources in the current time-domain if they have not
// already been acquired. If the query does use `AS OF`, it is not necessary to
// acquire read holds.
if in_immediate_multi_stmt_txn {
self.check_txn_timedomain_conflicts(
session,
&determination.timestamp_context,
id_bundle,
)?;

// If we've already acquired read holds for the txn, we can skip doing so again
if !self.txn_reads.contains_key(session.conn_id()) {
if let Some(timestamp) = determination.timestamp_context.timestamp() {
let timedomain_id_bundle = self.timedomain_for(
source_ids,
&timeline_context,
session.conn_id(),
cluster_id,
)?;

let read_holds =
self.acquire_read_holds(timestamp.clone(), &timedomain_id_bundle);
// Either set the valid read ids for this transaction (if it's the first statement in a
// transaction) or verify that the ids referenced in this query are in the timedomain.
if let Some(txn_reads) = self.txn_reads.get(session.conn_id()) {
// Find referenced ids not in the read hold. A reference could be caused by a
// user specifying an object in a different schema than the first query. An
// index could be caused by a CREATE INDEX after the transaction started.
let allowed_id_bundle = txn_reads.id_bundle();
let outside = source_bundle.difference(&allowed_id_bundle);
// Queries without a timestamp and timeline can belong to any existing timedomain.
if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
let valid_names =
self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
return Err(AdapterError::RelationOutsideTimeDomain {
relations: invalid_names,
names: valid_names,
});
}
} else {
if let Some((timestamp, bundle)) = potential_read_holds {
let read_holds = self.acquire_read_holds(timestamp, bundle);
self.txn_reads.insert(session.conn_id().clone(), read_holds);
}
}
}

// TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
// arbitrary and we don't recall why we did it (possibly an error!). Change this to always
// set the transaction ops. Decide and document what our policy should be on AS OF queries.
// Maybe they shouldn't be allowed in transactions at all because it's hard to explain
// what's going on there. This should probably get a small design document.

// We only track the peeks in the session if the query doesn't use AS
// OF or we're inside an explicit transaction. The latter case is
// necessary to support PG's `BEGIN` semantics, whose behavior can
// depend on whether or not reads have occurred in the txn.
let mut transaction_determination = determination.clone();
if when.is_transactional() {
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
} else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
// If the query uses AS OF, then ignore the timestamp.
transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
};

Ok(determination)
}

Expand Down Expand Up @@ -2245,46 +2287,6 @@ impl Coordinator {
})
}

/// Checks the incoming collections against the read holds already recorded for the
/// session's transaction and reports any that fall outside of them.
///
/// Returns `Ok(())` when the transaction has no recorded reads, when
/// `timestamp_context` is not a `TimelineTimestamp`, or when every id in
/// `incoming_id_bundle` is covered by the transaction's id bundle. Otherwise returns
/// `AdapterError::RelationOutsideTimeDomain` carrying the sorted names of the offending
/// relations and of the relations the transaction is allowed to read.
fn check_txn_timedomain_conflicts(
    &self,
    session: &Session,
    timestamp_context: &TimestampContext<Timestamp>,
    incoming_id_bundle: &CollectionIdBundle,
) -> Result<(), AdapterError> {
    // No `txn_reads` entry means this must be the first query in the transaction, so
    // there is no existing timedomain to validate against.
    let txn_reads = match self.txn_reads.get(session.conn_id()) {
        Some(reads) => reads,
        None => return Ok(()),
    };

    // Queries without a timestamp and timeline can belong to any existing timedomain.
    if !matches!(timestamp_context, TimestampContext::TimelineTimestamp(_, _)) {
        return Ok(());
    }

    // Everything referenced by this query (relations and indexes) has to be part of
    // the transaction's current reads. A stray relation could come from the user naming
    // an object in a different schema than the first query; a stray index could come
    // from a CREATE INDEX issued after the transaction started.
    let permitted = txn_reads.id_bundle();
    let strays = incoming_id_bundle.difference(&permitted);
    if strays.is_empty() {
        return Ok(());
    }

    let mut permitted_names = self.resolve_collection_id_bundle_names(session, &permitted);
    let mut stray_names = self.resolve_collection_id_bundle_names(session, &strays);
    // Sort so error messages are deterministic.
    permitted_names.sort();
    stray_names.sort();

    Err(AdapterError::RelationOutsideTimeDomain {
        relations: stray_names,
        names: permitted_names,
    })
}

/// Checks to see if the session needs a real time recency timestamp and if so returns
/// a future that will return the timestamp.
fn recent_timestamp(
Expand Down