FileIO writeDynamic with AvroIO.sink not writing all data #20332

Closed
damccorm opened this issue Jun 4, 2022 · 4 comments

damccorm (Contributor) commented Jun 4, 2022

FileIO writeDynamic with AvroIO.sink is not writing all data in the following pipeline. The amount of data written varies between runs, but records are consistently dropped. This is with a very small test dataset: 6 records, which should produce 3 output directories.


Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write out into Avro in each separate directory
records.apply("Write avro file per dataset", FileIO.<String, KV<String, AvroRecord>>writeDynamic()
    .by(KV::getKey)
    .via(Contextful.fn(KV::getValue),
         Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
    .to(options.getTargetPath())
    .withDestinationCoder(StringUtf8Coder.of())
    .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
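
For reproduction, a minimal sketch of what the mapping fn might look like (StringToDatasetIDAvroRecordFcn is not shown in the original report; AvroRecord, its builder methods, and the assumed "datasetId,payload" CSV layout are all hypothetical):

// Hypothetical sketch only: keys each record by its dataset ID.
// AvroRecord is assumed to be an Avro-generated class with a builder.
static class StringToDatasetIDAvroRecordFcn extends DoFn<String, KV<String, AvroRecord>> {
  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<KV<String, AvroRecord>> out) {
    String[] parts = line.split(",", 2);
    AvroRecord record = AvroRecord.newBuilder()
        .setDatasetId(parts[0])
        .setPayload(parts[1])
        .build();
    out.output(KV.of(parts[0], record));
  }
}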

If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function) then the correct number of records is written to the separate directories. This works consistently.

e.g.


// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write out into CSV in each separate directory
records.apply("Write CSV file per dataset", FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(options.getTargetPath())
    .withDestinationCoder(StringUtf8Coder.of())
    .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
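
(StringToDatasetIDKVFcn would be the analogous String-to-String variant of the fn sketched above.) For completeness, the imports both snippets rely on are roughly the following; note that AvroIO lived at org.apache.beam.sdk.io.AvroIO in Beam releases of this era, while newer releases move it to org.apache.beam.sdk.extensions.avro.io.AvroIO:

import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;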

cc [~timrobertson100]

Imported from Jira BEAM-10100. Original Jira may contain additional context.
Reported by: djtfmartin.

kennknowles (Member) commented

@johnjcasey this is reminiscent of something discussed today

johnjcasey (Contributor) commented

@Abacn you were attempting to repro this, did you have any success?

Abacn (Contributor) commented Sep 14, 2022

@johnjcasey I was not able to reproduce the issue with flink running locally.
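
For anyone retrying the repro, a minimal sketch of running the pipeline on an embedded local Flink cluster (the beam-runners-flink dependency and its exact version are assumptions; the direct runner is the default when no runner is given):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Requires a beam-runners-flink-* artifact matching the Beam version on the classpath.
PipelineOptions options = PipelineOptionsFactory.fromArgs("--runner=FlinkRunner")
    .withValidation()
    .create();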

johnjcasey (Contributor) commented

I'm going to close this as not reproducible.
