Writing data to Iceberg with the Java API from multiple threads causes data loss #8610

Closed

lengkristy opened this issue Sep 21, 2023 · 2 comments

lengkristy commented Sep 21, 2023

The Java code is essentially this:

```java
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.types.Types;

Configuration configuration = new Configuration();
// this is a local file catalog
HadoopCatalog hadoopCatalog = new HadoopCatalog(configuration, icebergWareHousePath);
TableIdentifier name = TableIdentifier.of("logging", "logs");
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get())));
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
Table table = hadoopCatalog.createTable(name, schema, spec);

GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());

int partitionId = 1, taskId = 1;
OutputFileFactory outputFileFactory =
    OutputFileFactory.builderFor(table, partitionId, taskId).format(FileFormat.PARQUET).build();
final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());

// partitionedFanoutWriter automatically partitions each record and keeps one open writer per partition
PartitionedFanoutWriter<Record> partitionedFanoutWriter =
    new PartitionedFanoutWriter<Record>(table.spec(), FileFormat.PARQUET, appenderFactory,
        outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
      @Override
      protected PartitionKey partition(Record record) {
        partitionKey.partition(record);
        return partitionKey;
      }
    };

Random random = new Random();
List<String> levels = Arrays.asList("info", "debug", "error", "warn");
GenericRecord genericRecord = GenericRecord.create(table.schema());

// assume we write 1000 records
for (int i = 0; i < 1000; i++) {
    GenericRecord record = genericRecord.copy();
    record.setField("level", levels.get(random.nextInt(levels.size())));
    // record.setField("event_time", System.currentTimeMillis());
    record.setField("event_time", OffsetDateTime.now());
    record.setField("message", "Iceberg is a great table format");
    record.setField("call_stack", Arrays.asList("NullPointerException"));
    partitionedFanoutWriter.write(record);
}

// the writer must be closed before its data files can be collected
partitionedFanoutWriter.close();

AppendFiles appendFiles = table.newAppend();

// add the completed data files to the pending append
Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);

// apply() only stages the new snapshot; commit() publishes it to the table
Snapshot newSnapshot = appendFiles.apply();
appendFiles.commit();
```
Data loss may occur when writing to Iceberg with high concurrency.
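
To be concrete, the failing pattern is several threads each running that write-and-commit sequence against the same table. A minimal sketch of it (hypothetical; `createWriter` is a stand-in that rebuilds the fanout writer above, and `generateRecords` is a stand-in record source):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;

ExecutorService pool = Executors.newFixedThreadPool(8);
List<Future<?>> futures = new ArrayList<>();
for (int t = 0; t < 8; t++) {
    futures.add(pool.submit(() -> {
        // each thread builds its own writer and commits its own append
        TaskWriter<Record> writer = createWriter(table);  // hypothetical helper, setup as above
        for (Record record : generateRecords(1000)) {     // hypothetical record source
            writer.write(record);
        }
        writer.close();
        AppendFiles append = table.newAppend();
        Arrays.stream(writer.dataFiles()).forEach(append::appendFile);
        // with a HadoopCatalog on a fs without atomic rename, these concurrent
        // commits can silently clobber each other's metadata update
        append.commit();
        return null;
    }));
}
for (Future<?> f : futures) {
    f.get(); // propagate any write/commit failure
}
pool.shutdown();
```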

@paulpaul1076

Are you using the local file system? Your comment says "this is a local file catalog". The docs say here: https://iceberg.apache.org/docs/latest/java-api-quickstart/#using-a-hadoop-catalog that concurrent writes to a local fs with a Hadoop catalog are not safe. For them to be safe, your fs has to support atomic rename.
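
If all of your writers live in one JVM, one mitigation (a sketch only; it does not make the HadoopCatalog safe across processes) is to let worker threads only produce data files and funnel everything into a single commit from one thread:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;

// worker threads only write data files; no thread commits on its own
List<DataFile> completedFiles = Collections.synchronizedList(new ArrayList<>());
ExecutorService pool = Executors.newFixedThreadPool(8);
List<Future<?>> futures = new ArrayList<>();
for (int t = 0; t < 8; t++) {
    futures.add(pool.submit(() -> {
        TaskWriter<Record> writer = createWriter(table);  // hypothetical helper as before
        for (Record record : generateRecords(1000)) {     // hypothetical record source
            writer.write(record);
        }
        writer.close();
        completedFiles.addAll(Arrays.asList(writer.dataFiles()));
        return null;
    }));
}
for (Future<?> f : futures) {
    f.get();
}
pool.shutdown();

// a single commit from the coordinating thread, so the catalog never sees
// concurrent metadata updates from this process
AppendFiles append = table.newAppend();
completedFiles.forEach(append::appendFile);
append.commit();
```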


rdblue commented Oct 5, 2023

Yes, I agree. The safety of the HadoopCatalog depends on the file system, which is one reason why we don't recommend using that catalog.
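
For anyone hitting this, a minimal sketch of switching to a catalog whose commits don't depend on filesystem rename, assuming the `iceberg-jdbc` module and a JDBC driver (SQLite here, purely as an example) are on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Table;
import org.apache.iceberg.jdbc.JdbcCatalog;

// a JdbcCatalog keeps the pointer to the current table metadata in a database,
// so concurrent commits are serialized by the database rather than by rename
Map<String, String> props = new HashMap<>();
props.put(CatalogProperties.URI, "jdbc:sqlite:/tmp/iceberg_catalog.db"); // hypothetical connection string
props.put(CatalogProperties.WAREHOUSE_LOCATION, icebergWareHousePath);

JdbcCatalog catalog = new JdbcCatalog();
catalog.setConf(configuration);               // same Hadoop Configuration as in the report
catalog.initialize("logging_catalog", props); // hypothetical catalog name

// table creation and the write path stay exactly the same
Table table = catalog.createTable(name, schema, spec);
```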

rdblue closed this as completed Oct 5, 2023