Support binary file formats for indexing tasks #8812

Closed
jihoonson opened this issue Nov 2, 2019 · 2 comments

jihoonson commented Nov 2, 2019

Motivation

Native indexing tasks currently support only text file formats because FirehoseFactory is tightly coupled with InputRowParser (#5584). Other issues around FirehoseFactory and InputRowParser are:

  • The sampler and indexing tasks use the same Firehose interface even though their use cases are quite different.
  • InputRowParser doesn't have to be exposed in the user spec.
  • Parser is only for text file formats, but all ParseSpecs have makeParser().
  • Parallel indexing tasks will need to be able to read a portion of a file for finer-grained parallelism in the future. To support this, the interface abstracting file formats should be splittable.
  • Kafka/Kinesis tasks can use only implementations of ByteBufferInputRowParser. This forces us to have duplicate implementations for batch and realtime tasks (AvroHadoopInputRowParser vs AvroStreamInputRowParser).

Since it's not easy to modify or improve the existing interfaces without a huge change, we need new interfaces that can be used instead of FirehoseFactory, Firehose, InputRowParser, and ParseSpec. The new interfaces should support the following storage types and file formats.

  • Storage types
    • HDFS
    • cloud (s3, gcp, etc)
    • local
    • http
    • sql
    • byte (inline, kafka/kinesis tasks)
    • Druid (reingestion)
  • File formats
    • csv
    • tsv
    • json
    • regex
    • influx
    • javascript
    • avro
    • orc
    • parquet
    • protobuf
    • thrift

Proposed changes

The proposed new interfaces are:

InputSource

InputSource abstracts the storage where input data comes from for batch ingestion. This will replace FiniteFirehoseFactory.

public interface InputSource
{
  /**
   * Returns true if this inputSource can be processed in parallel using ParallelIndexSupervisorTask.
   */
  boolean isSplittable();

  /**
   * Returns true if this inputSource supports different {@link InputFormat}s.
   */
  boolean needsFormat();

  InputSourceReader reader(
      InputRowSchema inputRowSchema,
      @Nullable InputFormat inputFormat,
      @Nullable File temporaryDirectory
  );
}

InputRowSchema is the schema of the InputRows to be created.

public class InputRowSchema
{
  private final TimestampSpec timestampSpec;
  private final DimensionsSpec dimensionsSpec;
  private final List<String> metricsNames;
}
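
For illustration, here is a rough sketch of how a task might build the schema and obtain a reader, assuming inputSource, inputFormat, and temporaryDirectory are already at hand. The InputRowSchema constructor arguments follow the fields above; the wiring itself is hypothetical, not part of the proposal.

// Hypothetical wiring: build the schema from the ingestion spec and hand it
// to the InputSource along with the chosen InputFormat.
InputRowSchema schema = new InputRowSchema(
    new TimestampSpec("timestamp", "iso", null),
    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("channel", "page")), null, null),
    Collections.singletonList("count")
);
InputSourceReader reader = inputSource.reader(schema, inputFormat, temporaryDirectory);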

SplittableInputSource is a splittable InputSource that can be processed in parallel.

public interface SplittableInputSource<T> extends InputSource
{
  @JsonIgnore
  @Override
  default boolean isSplittable()
  {
    return true;
  }

  Stream<InputSplit<T>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;

  int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;

  SplittableInputSource<T> withSplit(InputSplit<T> split);
}

Check HttpInputSource as an example.
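
For a rough illustration, a splittable source over local files might create one split per file. This sketch is hypothetical: it assumes InputSplit simply wraps its value and exposes it via get(), and it omits the reader implementation.

public class LocalInputSource implements SplittableInputSource<File>
{
  private final List<File> files;

  public LocalInputSource(List<File> files)
  {
    this.files = files;
  }

  @Override
  public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
  {
    // One split per file; a smarter implementation could merge small files
    // or split large ones based on the splitHintSpec.
    return files.stream().map(InputSplit::new);
  }

  @Override
  public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
  {
    return files.size();
  }

  @Override
  public SplittableInputSource<File> withSplit(InputSplit<File> split)
  {
    // A sub-task only sees the single file assigned to it.
    return new LocalInputSource(Collections.singletonList(split.get()));
  }

  @Override
  public boolean needsFormat()
  {
    // Local files can be in any format, so an InputFormat must be given.
    return true;
  }

  @Override
  public InputSourceReader reader(
      InputRowSchema inputRowSchema,
      @Nullable InputFormat inputFormat,
      @Nullable File temporaryDirectory
  )
  {
    throw new UnsupportedOperationException("omitted in this sketch");
  }
}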

InputSourceReader and InputSourceSampler

You can create InputSourceReader and InputSourceSampler from InputSource. InputSourceReader is for reading inputs and creating segments, while InputSourceSampler is for sampling inputs.

public interface InputSourceReader
{
  CloseableIterator<InputRow> read() throws IOException;

  CloseableIterator<InputRowPlusRaw> sample() throws IOException;
}

This reader and sampler are the interfaces that users will use directly. SplitIteratingReader is an example of InputSourceReader.
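
For example, given an InputSourceReader, a task might consume rows as below. This is a sketch; it assumes CloseableIterator extends both Iterator and Closeable, so it works with try-with-resources.

try (CloseableIterator<InputRow> iterator = reader.read()) {
  while (iterator.hasNext()) {
    InputRow row = iterator.next();
    // hand the row over to the segment-building code
  }
}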

ObjectSource and InputFormat

InputSourceReader and InputSourceSampler will internally use ObjectSource and InputFormat. ObjectSource knows how to read bytes from the given object.

public interface ObjectSource<T>
{
  int FETCH_BUFFER_SIZE = 4 * 1024;
  int MAX_FETCH_RETRY = 3;

  interface CleanableFile
  {
    File file();

    void cleanup();
  }

  CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException;

  T getObject();

  InputStream open() throws IOException;

  Predicate<Throwable> getRetryCondition();
}

You can directly open an InputStream on the ObjectSource, or fetch() the remote object onto local disk and open a FileInputStream on it. This may be useful to avoid expensive random access on remote storage (e.g., an ORC file in S3) or holding connections open for too long (as in SqlFirehoseFactory). Check HttpSource as an example ObjectSource.
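
As a minimal sketch, an ObjectSource over a local file could look like the following. This is hypothetical: a local file needs no fetching or retries, so fetch() just wraps the file and cleanup() is a no-op.

public class LocalFileSource implements ObjectSource<File>
{
  private final File file;

  public LocalFileSource(File file)
  {
    this.file = file;
  }

  @Override
  public File getObject()
  {
    return file;
  }

  @Override
  public InputStream open() throws IOException
  {
    return new FileInputStream(file);
  }

  @Override
  public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer)
  {
    // The file is already on local disk, so there is nothing to download
    // and nothing to clean up.
    return new CleanableFile()
    {
      @Override
      public File file()
      {
        return file;
      }

      @Override
      public void cleanup()
      {
        // no-op
      }
    };
  }

  @Override
  public Predicate<Throwable> getRetryCondition()
  {
    // Local reads are not retried.
    return t -> false;
  }
}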

InputFormat knows how to parse bytes.

public interface InputFormat
{
  boolean isSplittable();

  ObjectReader createReader(InputRowSchema inputRowSchema);
}

ObjectReader actually reads and parses data and returns an iterator of InputRows.

public interface ObjectReader
{
  CloseableIterator<InputRow> read(ObjectSource<?> source, File temporaryDirectory) throws IOException;

  CloseableIterator<InputRowPlusRaw> sample(ObjectSource<?> source, File temporaryDirectory) throws IOException;
}

For example, OrcInputFormat creates OrcReader. Note that this implementation is not optimized at all, but it shows how the interfaces could be implemented.
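
To make the flow concrete, here is a much-simplified sketch of a delimited-text format and its reader, using Druid's existing MapBasedInputRow and CloseableIterators utilities. Everything else is hypothetical: it assumes getter methods on InputRowSchema, takes column names as a constructor argument instead of reading a header, parses eagerly instead of streaming, and skips error handling.

public class CsvInputFormat implements InputFormat
{
  private final List<String> columns; // column names in file order

  public CsvInputFormat(List<String> columns)
  {
    this.columns = columns;
  }

  @Override
  public boolean isSplittable()
  {
    return false; // a single CSV stream is read sequentially in this sketch
  }

  @Override
  public ObjectReader createReader(InputRowSchema inputRowSchema)
  {
    return new CsvReader(inputRowSchema, columns);
  }
}

class CsvReader implements ObjectReader
{
  private final InputRowSchema inputRowSchema;
  private final List<String> columns;

  CsvReader(InputRowSchema inputRowSchema, List<String> columns)
  {
    this.inputRowSchema = inputRowSchema;
    this.columns = columns;
  }

  @Override
  public CloseableIterator<InputRow> read(ObjectSource<?> source, File temporaryDirectory) throws IOException
  {
    // Eagerly parse every line; a real reader would iterate lazily over the stream.
    final List<InputRow> rows = new ArrayList<>();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(source.open(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        final String[] values = line.split(",");
        final Map<String, Object> event = new HashMap<>();
        for (int i = 0; i < columns.size() && i < values.length; i++) {
          event.put(columns.get(i), values[i]);
        }
        rows.add(
            new MapBasedInputRow(
                inputRowSchema.getTimestampSpec().extractTimestamp(event),
                inputRowSchema.getDimensionsSpec().getDimensionNames(),
                event
            )
        );
      }
    }
    return CloseableIterators.withEmptyBaggage(rows.iterator());
  }

  @Override
  public CloseableIterator<InputRowPlusRaw> sample(ObjectSource<?> source, File temporaryDirectory)
  {
    throw new UnsupportedOperationException("omitted in this sketch");
  }
}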

Deprecated ParseSpec

The existing ParseSpec will be split into TimestampSpec, DimensionsSpec, and InputFormat. TimestampSpec and DimensionsSpec will be at the top level of DataSchema. InputFormat will be in ioConfig.

An example spec is:

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "diffUrl",
          "flags",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user"
        ]
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "rollup": true,
        "intervals": ["2018/2019"]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "http",
        "uris": [
          "https://path/to/wikipedia.json.gz"
        ]
      },
      "inputFormat": {
        "type": "json"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumConcurrentSubTasks": 10
    }
  }
}

Prefetch and cache

The FirehoseFactory implementations extending PrefetchableTextFilesFirehoseFactory currently support prefetching and caching, which I don't find very useful. I ran a couple of tests in a cluster and on my laptop. The total time taken to download 2 files (200 MB each) from s3 into local storage was 4 sec and 20 sec, whereas the total ingestion time was 20 min and 30 min, respectively. The more important factor is probably the indexing or segment merge speed. Prefetching and caching will not be supported by the new interfaces until they become a real bottleneck.

FirehoseFactory

FirehoseFactory will remain for RealtimeIndexTask and AppenderatorDriverRealtimeIndexTask.

Rationale

One possible alternative would be modifying the existing interfaces. I think adding new ones is better because they are quite different from the existing ones.

Operational impact

A couple of interfaces will be deprecated but kept for a couple of future releases. This means the old spec will still be respected.

  • ParseSpec will be deprecated and split into TimestampSpec, DimensionsSpec, and InputFormat.
  • DataSchema will have TimestampSpec and DimensionsSpec from the deprecated ParseSpec.
  • IOConfig will have InputFormat and InputSource.
  • FirehoseFactory will be deprecated for all batch tasks in favor of InputSource. This will not be applied to the compaction task since it doesn't have firehoseFactory in its spec.
  • InputRowParser will be deprecated.

Test plan (optional)

Unit tests will be added to test backward compatibility and the implementations of new interfaces.

Future work (optional)

A new method can be added to ObjectSource for a more optimized data scan.

int read(ByteBuffer buffer, int offset, int length) throws IOException;
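
For instance, a columnar reader could use it to fetch only the tail of a remote object, such as an ORC or Parquet footer, without downloading the whole file. A hypothetical usage, assuming offset addresses a position within the object:

// Hypothetical: read only the last footerLength bytes of the object.
ByteBuffer footer = ByteBuffer.allocate(footerLength);
int bytesRead = source.read(footer, (int) (objectLength - footerLength), footerLength);
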
jihoonson commented

This is a pretty big refactoring. I will add new interfaces first and then implement individual file formats and storage types.

jihoonson commented

Closing this proposal since it was implemented in 0.17.0.
