-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[FLINK-27366] Record schema on filesystem path #101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
2e1c59d to
c7c9095
Compare
| /** Json serializer for jackson. */ | ||
| public interface JsonSerializer<T> { | ||
|
|
||
| void serializer(T t, JsonGenerator generator) throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serialize
|
|
||
| void serializer(T t, JsonGenerator generator) throws IOException; | ||
|
|
||
| T deserializer(JsonNode node); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deserialize
| return fieldIds.stream().max(Integer::compareTo).orElse(-1); | ||
| } | ||
|
|
||
| private static void collectFieldIds(Set<Integer> fieldIds, DataType type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataType type, Set<Integer> fieldIds. Input arguments should be in front of output arguments.
| return listVersionedFiles(schemaDirectory(), SCHEMA_PREFIX) | ||
| .reduce(Math::max) | ||
| .map(this::schema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add hint files just like SnapshotFinder? Maybe extract common classes for both snapshot and schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's necessary for schema because there won't be too many versions of schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schema changes are a very low frequency thing (Compared to snapshot generation)
| return schema; | ||
| } else { | ||
| // retry | ||
| FileUtils.deleteOrWarn(temp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this method fails with exception temp file will not be cleaned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but we dont have other solutions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have. Wrap the code with try... finally....
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
|
||
| /** Test for {@link SchemaManager}. */ | ||
| public class SchemaManagerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lacks concurrent tests and cleanup tests for commitNewVersion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add after each check for cleanup tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add one testConcurrentCommit test.
| FileUtils.writeFileUtf8(temp, schema.toString()); | ||
|
|
||
| Boolean success = lock.runWithLock(() -> temp.getFileSystem().rename(temp, finalFile)); | ||
| if (success) { | ||
| return schema; | ||
| } else { | ||
| // retry | ||
| FileUtils.deleteOrWarn(temp); | ||
| boolean success = false; | ||
| try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try should also cover file write. File write may be partial
We can store the schema on the path of the table store, which includes type, options, etc.
This schema should be in a format that supports evolution, which means that the fields contain id information.