Skip to content

Commit

Permalink
Move metadata to single json field
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Apr 25, 2024
1 parent 23e5cdb commit 109c024
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 139 deletions.
14 changes: 10 additions & 4 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>

<!--Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<!-- Junit -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand All @@ -70,9 +80,5 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
104 changes: 0 additions & 104 deletions api/src/main/java/org/apache/xtable/model/TableSyncMetadata.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.metadata;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Value;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.xtable.model.exception.ParseException;

/**
* Metadata representing the state of a table sync process. This metadata is stored in the target
* table's properties and is used to track the status of previous sync operation.
*/
@Value
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TableSyncMetadata {
private static final int CURRENT_VERSION = 0;
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);

/** Property name for the XTABLE metadata in the table metadata/properties */
public static final String XTABLE_METADATA = "XTABLE_METADATA";

Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;

public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION);
}

public String toJson() {
try {
return MAPPER.writeValueAsString(this);
} catch (IOException e) {
throw new ParseException("Failed to serialize TableSyncMetadata", e);
}
}

public static Optional<TableSyncMetadata> fromJson(String metadata) {
if (metadata == null || metadata.isEmpty()) {
return Optional.empty();
} else {
try {
TableSyncMetadata parsedMetadata = MAPPER.readValue(metadata, TableSyncMetadata.class);
if (parsedMetadata.getLastInstantSynced() == null) {
throw new ParseException("LastInstantSynced is required in TableSyncMetadata");
}
if (parsedMetadata.getVersion() > CURRENT_VERSION) {
throw new ParseException(
"Unable handle metadata version: "
+ parsedMetadata.getVersion()
+ " max supported version: "
+ CURRENT_VERSION);
}
return Optional.of(parsedMetadata);
} catch (IOException e) {
throw new ParseException("Failed to deserialize TableSyncMetadata", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.metadata;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.xtable.model.exception.ParseException;

class TestTableSyncMetadata {

@ParameterizedTest
@MethodSource("provideMetadataAndJson")
void jsonRoundTrip(TableSyncMetadata metadata, String expectedJson) {
assertEquals(expectedJson, metadata.toJson());
assertEquals(metadata, TableSyncMetadata.fromJson(expectedJson).get());
}

private static Stream<Arguments> provideMetadataAndJson() {
return Stream.of(
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
Arrays.asList(
Instant.parse("2020-08-21T11:15:30.00Z"),
Instant.parse("2024-01-21T12:15:30.00Z"))),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList()),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
}

@Test
void failToParseJsonFromNewerVersion() {
assertThrows(
ParseException.class,
() ->
TableSyncMetadata.fromJson(
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":1}"));
}

@Test
void failToParseJsonWithMissingLastSyncedInstant() {
assertThrows(
ParseException.class,
() ->
TableSyncMetadata.fromJson(
"{\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0}"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.spi.extractor.ConversionSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
Expand Down Expand Up @@ -202,9 +202,12 @@ public void completeSync() {

@Override
public Optional<TableSyncMetadata> getTableMetadata() {
return TableSyncMetadata.fromMap(
JavaConverters.mapAsJavaMapConverter(deltaLog.snapshot().metadata().configuration())
.asJava());
return TableSyncMetadata.fromJson(
deltaLog
.snapshot()
.metadata()
.configuration()
.getOrElse(TableSyncMetadata.XTABLE_METADATA, () -> null));
}

@Override
Expand Down Expand Up @@ -265,9 +268,10 @@ private void commitTransaction() {
}

private Map<String, String> getConfigurationsForDeltaSync() {
Map<String, String> configMap = new HashMap<>(metadata.asMap());
Map<String, String> configMap = new HashMap<>();
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
// Sets retention for the Delta Log, does not impact underlying files in the table
configMap.put(
DeltaConfigs.LOG_RETENTION().key(), String.format("interval %d hours", retentionInHours));
Expand Down
Loading

0 comments on commit 109c024

Please sign in to comment.