
[Cosmos] Full Fidelity Change Feed changes for pull model #30161

Merged (8 commits) on Jul 29, 2022

Conversation

@simorenoh (Member) commented Jul 27, 2022

This PR contains the changes for using the pull-model full fidelity change feed, including tests and Spark changes.

Ported from the previously opened PR: #29799

Reference .Net SDK PR - Azure/azure-cosmos-dotnet-v3#3197

@azure-sdk (Collaborator)

API change check

APIView has identified API-level changes in this PR and created the following API reviews.

azure-cosmos

private def getAttributeNode(objectNode: ObjectNode, attributeName: String): JsonNode = {
  objectNode.get(attributeName) match {
    case jsonNode: JsonNode => jsonNode
    case _ => null
  }
}

Don't use null in Scala - it is considered a no-go; Option[T] is used instead.

I don't actually think you need this helper method. objectNode.get("name") is all this one is doing (when using Option instead of null)

But objectNode.get just returns a JsonNode - could be anything from Object to Array - that is why before we had separate helper methods also validating the expected return type.

You can switch to using objectNode.get directly instead, but you need to add validation that the returned JsonNode is of the expected type.
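The Option-based pattern the comment describes can be sketched like this (a minimal, self-contained illustration, not the SDK code; JsonLike/ObjNode/TextNode are stand-ins for Jackson's JsonNode/ObjectNode/TextNode):

```scala
sealed trait JsonLike
final case class TextNode(value: String) extends JsonLike
final case class ObjNode(fields: Map[String, JsonLike]) extends JsonLike

// Return Option instead of null, and validate the node type before extracting
// the value: a missing key and a wrong-typed node both map to None.
def getAttributeAsString(node: ObjNode, name: String): Option[String] =
  node.fields.get(name) match {
    case Some(TextNode(v)) => Some(v) // expected type: yield the value
    case _                 => None    // missing or wrong type: no null
  }
```

A caller can then write getAttributeAsString(doc, "lsn").getOrElse(defaultLsn) instead of chaining null checks.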


got it, will do.

def getChangeFeedLsn(objectNode: ObjectNode): String = {
  getAttributeNode(objectNode, MetadataJsonBodyAttributeName) match {
    case metadataNode: JsonNode =>
      metadataNode.get(MetadataLsnAttributeName) match {

I think this is identical to getAttributeAsString?

@kushagraThapar (Member) commented Jul 28, 2022

Actually, the only difference between them is that getAttributeAsString returns objectNode.toString() whereas this returns objectNode.asText(). I think these are different, because toString() is called on an object node, whereas asText() is called on a value node.
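The difference can be seen directly on Jackson nodes (an illustrative sketch assuming jackson-databind on the classpath, not the SDK code):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()
val obj = mapper.readTree("""{"lsn":"42"}""")

// toString() serializes any node back to JSON text:
obj.toString            // {"lsn":"42"}
// asText() only yields a value on value nodes; on an ObjectNode it is empty:
obj.asText              // ""
// On the value node itself, asText() is the raw string, toString() the JSON form:
obj.get("lsn").asText   // 42
obj.get("lsn").toString // "42"
```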

@@ -113,6 +108,13 @@ private[cosmos] class CosmosRowConverter(
new GenericRowWithSchema(values.toArray, schema)
}

def fromObjectNodeToRowV1(schema: StructType,

I assume V1 is for Change Feed V1 - please make that explicit in the name, because in two weeks no one will remember what V1 and V2 stand for otherwise.

Something like fromObjectNodeToChangeFeedRowV1.


ack, will do.

var currentNode = getAttributeNode(objectNode, CurrentAttributeName)
if (currentNode == null || currentNode.isEmpty) {
  currentNode = getAttributeNode(objectNode, PreviousRawJsonBodyAttributeName)
}

Factor out finding the "right" payload node into a separate function to avoid code duplication.
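One way the factored-out helper could look (a hedged sketch; the helper name and the inlined attribute-name values are assumptions for illustration, and it returns Option rather than null per the earlier comment):

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.node.ObjectNode

// Assumed stand-in values for the attribute-name constants used in the PR:
val CurrentAttributeName = "current"
val PreviousRawJsonBodyAttributeName = "previous"

// Try the "current" payload first; fall back to "previous" when current is
// missing or empty (e.g. for deletes).
def getPayloadNode(objectNode: ObjectNode): Option[JsonNode] =
  Option(objectNode.get(CurrentAttributeName)).filterNot(_.isEmpty)
    .orElse(Option(objectNode.get(PreviousRawJsonBodyAttributeName)))
```

Both row-conversion paths can then share this single lookup instead of duplicating the fallback logic.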


ack, will do.

@@ -735,6 +796,34 @@ private[cosmos] class CosmosRowConverter(
Option(objectNode.get(name)).map(convertToSparkDataType(dataType, _, schemaConversionMode)).orNull
}

private def convertStructToSparkDataTypeV1(schema: StructType,

Same comment regarding the V1 vs. V2 prefix as above.


ack, will do.

println("Input : ", inputArrayBuffer.mkString(","))
println("Output : ", outputArray.mkString(","))
if (inputArrayBuffer.length != outputArray.length) {
  return false
}

Long story short: never use return in Scala. This is one of the things in Scala that confused the heck out of me initially. return has different semantics depending on where the function is called from; if you google "return" and "scala" you'll find the details. Just make sure the last line produces the value.

To simplify here, I would rename the method to validateArraysUnordered and replace the checks where you return false with an assert (the xxx shouldEqual yyy used in other places).
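Following that suggestion, the check could be rewritten without return (a sketch with plain assert standing in for the test framework's shouldEqual):

```scala
// Order-insensitive comparison via element counts; the asserts replace the
// early `return false`, so no `return` keyword is needed.
def validateArraysUnordered[T](input: Seq[T], output: Seq[T]): Unit = {
  assert(input.length == output.length,
    s"length mismatch: ${input.length} vs ${output.length}")
  val counts = (s: Seq[T]) => s.groupBy(identity).map { case (k, v) => (k, v.size) }
  assert(counts(input) == counts(output), "element mismatch")
}
```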


ack, will do.

@@ -42,7 +42,11 @@ public interface Lease {
*/
String getTimestamp();

ChangeFeedState getContinuationState(
ChangeFeedState getIncrementalContinuationState(

Please rename these to getContinuationStateWireFormatV0 vs. getContinuationStateWireFormatV1 or similar?


This one actually has nothing to do with the change feed wire format. One is for getting the continuation state for incremental mode, and one for full fidelity mode.

I guess once I make the lease changes as well, then there will be a good separation of these continuation states based on different lease structure. I might need to create a new lease altogether, will update it then if that's okay with you.

@@ -19,8 +16,9 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeMa
import org.apache.spark.sql.catalyst.util.ArrayData

import java.io.IOException
import java.time.{Instant, LocalDate, OffsetDateTime, ZoneOffset}
import java.sql.{Date, Timestamp}

Please add corresponding test coverage in CosmosRowConverterITest and CosmosRowConverterSpec - these were created by Matias and are one of the best sets of tests we have in all of the Spark connector. Having extensive coverage of the RowConverter functionality there has proven very useful.


ack, will do.

@FabianMeiswinkel (Member) left a comment

Overall looks good - the missing tests in CosmosRowConverterITest/CosmosRowConverterSpec and the two "Scala coding violations" summarize my requested changes.

@FabianMeiswinkel (Member) left a comment

LGTM now - thanks!


private val changeFeedRequestOptions = {
private val changeFeedRequestOptions = {

nit: remove the extra space

objectNode.get(MetadataJsonBodyAttributeName) match {
  case metadataNode: JsonNode =>
    metadataNode.get(TimeToLiveExpiredPropertyName) match {
      case valueNode: JsonNode =>

nit: indent

objectNode.get(MetadataJsonBodyAttributeName) match {
  case metadataNode: JsonNode =>
    metadataNode.get(OperationTypeAttributeName) match {
      case valueNode: JsonNode =>

nit: indent

// For multi-master, crts will be the latest resolution timestamp of any conflicts
private def parseTimestamp(objectNode: ObjectNode): Long = {
  objectNode.get(MetadataJsonBodyAttributeName) match {
    case metadataNode: JsonNode =>

What if objectNode does not have the MetadataJsonBodyAttributeName attribute?


It will always have the metadata JSON body attribute name.

private[spark] val defaultFullFidelityChangeFeedSchemaForInferenceDisabled = StructType(Seq(
  StructField(RawJsonBodyAttributeName, StringType, nullable=true),
  StructField(IdAttributeName, StringType, nullable=false),
  StructField(TimestampAttributeName, LongType, nullable=false),
  StructField(ETagAttributeName, StringType, nullable=false),
  StructField(LsnAttributeName, LongType, nullable=false),
  StructField(MetadataJsonBodyAttributeName, StringType, nullable=false),
  StructField(PreviousRawJsonBodyAttributeName, StringType, nullable=true),
  StructField(OperationTypeAttributeName, StringType, nullable=false),
  StructField(CrtsAttributeName, LongType, nullable=false),
  StructField(PreviousImageLsnAttributeName, LongType, nullable=true)
))

It is not nullable.

// If there is no continuation token, we start from now (which is the default).
// On the REST level, change feed uses IfNoneMatch/ETag instead of continuation.
request.getHeaders().put(HttpConstants.HttpHeaders.IF_NONE_MATCH,
    HttpConstants.HeaderValues.IF_NONE_MATCH_ALL);

Why do we not need to populate this here for incremental mode?


For incremental mode, this header is not mandatory; if it is not present, the default is to start from the beginning. For full fidelity, this header is mandatory.
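The rule described here can be summarized in a small sketch (the helper name and the literal "*" value standing in for IF_NONE_MATCH_ALL are assumptions, not the SDK code):

```scala
// Full fidelity must always send If-None-Match; incremental sends it only
// when starting from "now". An absent header means "start from the beginning".
def ifNoneMatchHeader(fullFidelity: Boolean, startFromNow: Boolean): Option[String] =
  if (fullFidelity || startFromNow) Some("*") else None
```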

@xinlian12 (Member) left a comment

LGTM, thanks

@kushagraThapar (Member)

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

5 participants