Support for building materialized views using Lucene formats #13188

Open · bharath-techie opened this issue Mar 18, 2024 · 13 comments

@bharath-techie

Description

We are exploring the use case of building materialized views for certain fields and dimensions using a Star Tree index while indexing the data. The views are built from the fields (dimensions and metrics) configured during index creation. This is inspired by http://hanj.cs.illinois.edu/pdf/vldb03_starcube.pdf and Apache Pinot's Star Tree index. A Star Tree index helps enforce an upper bound on aggregation queries, ensuring predictable latency and resource usage; it is also storage-space efficient and configurable.
OpenSearch RFC : opensearch-project/OpenSearch#12498

Creating this issue to discuss approaches to support Star Tree in Lucene and also to get feedback on any other approaches/recommendations from the community.

Quick overview on Star Tree index creation flow

The Star Tree DocValues fields and the Star Tree index are created during the flush / merge flows of indexing.

Flush / merge flow

  1. Create the initial set of Star Tree documents based on the configured dimensions and metrics.
  2. Sort the Star Tree documents on the dimension fields and aggregate the metric fields (a small sketch follows the figure below).
  3. Create the Star Tree index.
  4. Create Star Tree DocValues fields for each of the Star Tree dimensions and metrics.

[Image: star-lucene (Star Tree index creation during flush / merge)]
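
To make steps 1 and 2 above concrete, here is a minimal sketch (plain Java, not the actual OpenSearch/Lucene implementation; the StarTreeDocument record and the sum-only aggregation are simplifying assumptions) of sorting Star Tree documents by their dimension values and aggregating the metrics of equal dimension tuples:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

record StarTreeDocument(long[] dimensions, double[] metrics) {}

final class StarTreeFlushSketch {
  /** Sorts documents by dimension values and sums the metrics of equal dimension tuples. */
  static List<StarTreeDocument> sortAndAggregate(List<StarTreeDocument> docs) {
    // Step 2a: sort by dimension values, left to right.
    docs.sort((a, b) -> Arrays.compare(a.dimensions(), b.dimensions()));

    // Step 2b: aggregate metrics for runs of equal dimension tuples.
    List<StarTreeDocument> result = new ArrayList<>();
    for (StarTreeDocument doc : docs) {
      StarTreeDocument last = result.isEmpty() ? null : result.get(result.size() - 1);
      if (last != null && Arrays.equals(last.dimensions(), doc.dimensions())) {
        for (int i = 0; i < last.metrics().length; i++) {
          last.metrics()[i] += doc.metrics()[i]; // aggregate (sum) the metric
        }
      } else {
        result.add(new StarTreeDocument(doc.dimensions().clone(), doc.metrics().clone()));
      }
    }
    return result;
  }
}

A real implementation would also map text dimensions to ordinals and support aggregation functions other than sum, but the shape of the flush-time work is the same.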

Challenges

The main challenge is that the 'StarTree' index is a multi-field index, unlike the other per-field formats in Lucene / OpenSearch. This makes it infeasible to use the PerField extension points defined in Lucene today. We explored encoding dimensions and metrics into 'BinaryDocValues', but the types of dimensions and metrics differ (dimensions can be numeric, text, or a combination), so we could not find a way to extend it.

Create Star Tree index

Approach 1 - Create a new format to build materialized views

We can create a new dedicated file format (similar to the points format or postings format) for materialized views which accepts a list of dimensions and metrics; the default implementation could be the Star Tree index.

Pros

  • This can become the standard format for supporting materialized views in Lucene, which developers can use or extend to create custom solutions.
  • With this format, users can create materialized views directly at index time, without also storing the original values in DocValues indices.

Cons

  • This will be a maintenance overhead.

Approach 2 - Extend DocValues format

Indexing - Extend DocValues to support materialized views

We can extend the DocValues format to support a new field type, 'AGGREGATED', which holds the list of dimensions and metrics configured by the user during index creation.

AggregatedField {
    List<String>       dimensionFields
    List<MetricConfig> metricFields
}

MetricConfig {
    FieldName      fieldName
    MetricFunction function
}

enum MetricFunction {
    SUM,
    AVG,
    COUNT,
    ...
}

During flush / merge, the values of the dimensions and metrics are read from the associated 'DocValues' fields using a DocValuesProducer, and the Star Tree indices are created following the steps above.

Search flow

We can extend 'LeafReader' and 'DocValuesProducer' with a new method, 'getAggregatedDocValues', to retrieve the Star Tree index at query time. It returns the root of the Star Tree along with the dimension and metric DocValues fields.
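
For illustration, the return type of such an accessor could be shaped roughly as follows (all names here are assumptions from this proposal, not existing Lucene API):

import java.io.IOException;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;

/** Hypothetical return type of the proposed getAggregatedDocValues(). */
interface AggregatedDocValues {
  /** Root of the Star Tree for this segment (node type left abstract in this sketch). */
  Object getStarTreeRoot() throws IOException;

  /** DocValues for a numeric dimension or metric field backing the tree. */
  NumericDocValues getNumericValues(String field) throws IOException;

  /** DocValues for a text dimension field. */
  SortedSetDocValues getSortedSetValues(String field) throws IOException;
}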

Pros

  • If the above extensions are in place, any custom codec implementation of 'DocValues' can provide a custom implementation for materialized views that creates the relevant indices during flush/merge.
  • Less maintenance overhead.

Cons

  • Tight coupling between DocValuesFormat and materialized views.
  • The original documents are needed to create the derived aggregated Star Tree documents

Open questions

Any suggestions on how to pack the 'dimension' and 'metric' values into the 'AggregatedField' during indexing as part of the 'addDocument' flow? Also, should we explore this at all, or should we simply create the derived 'AggregatedField' during flush/merge?

Create Star Tree DocValues fields

The Star Tree index is backed by Star Tree DocValues fields, so for reading/writing we can reuse the existing 'DocValuesFormat'. Each field is stored as a 'Numeric' DocValues field, or as a 'SortedSet' DocValues field in the case of text fields.

To accommodate this, we propose extending 'DocValuesFormat' so that the codec name and file extension can be passed in, allowing the Star Tree DocValues fields to be created with custom extensions:

@Override
  public DocValuesConsumer fieldsConsumer(
      SegmentWriteState state, String dataCodec, String dataExtension,
      String metaCodec, String metaExtension) throws IOException {
    return new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension);
  }

  @Override
  public DocValuesProducer fieldsProducer(
      SegmentReadState state, String dataCodec, String dataExtension,
      String metaCodec, String metaExtension) throws IOException {
    return new Lucene90DocValuesProducer(state, dataCodec, dataExtension, metaCodec, metaExtension);
  }
@jpountz
Contributor

jpountz commented Mar 19, 2024

Figuring out the right API for this idea sounds challenging, but I like the idea.

@msfroh
Contributor

msfroh commented Apr 2, 2024

I wonder if we could think of this more broadly as a caching problem.

Basically, you could evaluate some "question" (aggregations, statistics, etc.) for all segments and save the index-level question and the per-SegmentCommitInfo answers to disk. When a new segment gets flushed (or produced by a merge), we could eagerly evaluate the question against the new segment. (We would probably also need to reevaluate for segments that have new deletes, and maybe updated doc values.)

@bharath-techie
Author

bharath-techie commented Apr 22, 2024

There are several advantages to keeping the new index as part of the same Lucene segment. It reduces maintenance overhead and enables Near Real-Time (NRT) use cases. Specifically, for the star tree index, incrementally building the star tree as segments get flushed and merged takes significantly less time, as the sort and aggregation operations can be optimized.

Considering these advantages, I'm further exploring the idea of a new format to support multi-field indices, which can also be extended to create other types of composite indices.

DataCubesFormat vs CompositeValuesFormat

Since Lucene is also used for OLAP use cases, we can create a 'DataCubesFormat' specifically designed to create multi-field indices on a set of dimensions and metrics. [Preferred]

Alternatively, if we want a more generic format for creating indices based on any set of fields, we could go with 'CompositeValuesFormat'.

While the underlying implementation for both formats would be similar (creating indices on a set of Lucene fields), 'DataCubesFormat' is more descriptive and tailored to the OLAP use case.

Implementation

For clarity, we will focus on 'DataCubesFormat' in the rest of this section.

Broadly, we have two ways to implement the format.

DataCubesConfig via IndexWriterConfig / SegmentInfo [ Preferred ]

  • During flush/merge, we create the multi-field indices based on the existing fields (DocValues). We can supply a list of 'DataCubeField' configurations as part of 'DataCubesConfig' to 'IndexWriterConfig' and save it as part of the SegmentInfo.
  • This is quite similar to how 'IndexSort' is implemented in Lucene.
  • The 'DataCubesFormat' can be used during flush / merge for consuming the existing fields' writers to create 'DataCubesIndices' based on the config.

Pros

  • Reuses existing writer implementations for individual fields (dimensions and metrics) to create the 'DataCube' indices.
  • Consistent with existing features like 'IndexSort'.

Cons

  • Users cannot create 'DataCube' indices without the associated 'DocValues' fields.

Add/update doc flow with a new DataCubeField

Users can pass the set of dimensions and metrics as part of a new 'DataCubeField' during the 'ProcessDocument' flow.

Pros

  • Allows users to create 'DataCube' indices without the associated 'DocValues' fields.

Cons

  • Potentially complicates the indexing process and introduces additional overhead.
  • May not be necessary, as the existing 'DocValues' writer implementations should cover most use cases for numeric/text dimensions and numeric metrics.
  • Difficult to fall back to existing fields if the cardinality of those fields is too high, as the 'DataCubeField' would be a separate entity. With the 'IndexWriterConfig / SegmentInfo' approach, we can apply a guardrail and skip creating 'DataCubes' for high-cardinality fields, while still leveraging the existing field data.

Overall, the preferred approach of using 'IndexWriterConfig' and 'SegmentInfo' seems more suitable for implementing the 'DataCubesFormat'.

@bharath-techie
Author

DataCubesFormat

public abstract class DataCubesFormat implements NamedSPILoader.NamedSPI {

    /**
     * Returns producer to read the data cubes from the index.
     */
    public abstract DataCubesProducer<?> fieldsProducer(SegmentReadState state) throws IOException;

    /**
     * Returns a DocValuesConsumer to write a 'DataCubes' index based on doc values.
     */
    public abstract DataCubesDocValuesConsumer fieldsConsumer(
            SegmentWriteState state, DataCubesConfig dataCubesConfig) throws IOException;
}

DataCubesConfig

public class DataCubesConfig {
    public List<DataCubeField> getFields() {
        // Implementation
    }
    // Additional configuration options as needed
}

public abstract class DataCubeField {
    public abstract List<Dimension> getDimensions();

    public abstract List<Metric> getMetrics();

    public static class DataCubeFieldProvider {
        public DataCubeField readDataCubeField(DataInput in) {
            // Implementation
        }

        public void writeDataCubeField(DataCubeField dataCubeField, DataOutput out) {
            // Implementation
        }
    }
}

public class Dimension {
    public String fieldName;
    // Additional dimension-specific options
}

public class Metric {
    public String fieldName;
    public AggregationFunction aggregationFunction;
    // Additional metric-specific options
}
  • The DataCubesConfig class contains a list of DataCubeField objects and additional configuration options as needed.
  • A DataCubeField represents a data cube field and contains lists of Dimension and Metric fields, plus additional custom values.
  • DataCubeFieldProvider provides methods to read and write DataCubeField objects from/to a DataInput/DataOutput (a serialization sketch follows this list).
  • The DataCubesConfig is supplied via the IndexWriterConfig and saved as part of the SegmentInfo. Custom data for custom formats can be supplied via the DataCubeFieldProvider. [Similar to sort fields]
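
A possible serialization sketch (the on-disk layout here is an assumption for illustration; only Lucene's existing DataOutput/DataInput string and vint primitives are used):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;

final class DataCubeFieldSerializationSketch {
  /** Writes the data cube field name, then the dimension names, then the metric field names. */
  static void write(DataOutput out, String name, List<String> dims, List<String> metrics)
      throws IOException {
    out.writeString(name);
    out.writeVInt(dims.size());
    for (String dim : dims) {
      out.writeString(dim);
    }
    out.writeVInt(metrics.size());
    for (String metric : metrics) {
      out.writeString(metric);
    }
  }

  /** Reads the names back in the same order; a real provider would rebuild a DataCubeField. */
  static List<String> readNames(DataInput in) throws IOException {
    List<String> names = new ArrayList<>();
    names.add(in.readString()); // data cube field name
    int numDims = in.readVInt();
    for (int i = 0; i < numDims; i++) {
      names.add(in.readString());
    }
    int numMetrics = in.readVInt();
    for (int i = 0; i < numMetrics; i++) {
      names.add(in.readString());
    }
    return names;
  }
}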

DataCubesDocValuesConsumer

The DataCubesDocValuesConsumer consumes the DocValues writers to read the DocValues data and creates the new indices based on the DataCubesConfig.

DataCubesProducer

The DataCubesProducer/Reader is used to read the 'DataCubes' index from the segment.
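
For illustration, a minimal sketch of the producer side (the names follow this proposal and are assumptions, not existing Lucene API; DataCubeValues is stubbed here as a placeholder):

import java.io.Closeable;
import java.io.IOException;

/** Placeholder for the proposal's DataCubeValues wrapper. */
interface DataCubeValues<T> {
  T getDataCubeValues();
}

/** Hypothetical per-segment reader for 'DataCubes' indices. */
abstract class DataCubesProducerSketch<T> implements Closeable {
  /** Returns the data cube values for the given DataCubeField, or null if the segment has none. */
  public abstract DataCubeValues<T> getDataCubeValues(String field) throws IOException;

  /** Verifies checksums of the underlying data cube files, mirroring other producers. */
  public abstract void checkIntegrity() throws IOException;
}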

Example

In this example, I've implemented a 'StarTree' index by extending 'DataCubesFormat'.

    Directory dir = newDirectory();
    IndexWriterConfig iwc = newIndexWriterConfig().setCodec(Codec.forName("Lucene95"));

    // Supply the dimension and metric fields to form the data cube config
    Set<String> dims = Set.of("clientip", "targetip");
    Set<String> metrics = Set.of("status");

    StarTreeDataCubeField field = new StarTreeDataCubeField("datacube-1", dims, metrics);
    StarTreeDataCubeField[] fieldArr = new StarTreeDataCubeField[] {field};
    StarTreeCubeConfig starConfig = new StarTreeCubeConfig(fieldArr);
    iwc.setDataCube(starConfig);
    IndexWriter writer = new IndexWriter(dir, iwc);

    int numDoc = 10;
    add(writer);
    writer.commit();
    writer.close();

    IndexReader reader = DirectoryReader.open(dir);

    // Read the DataCubeValues from the underlying index
    for (LeafReaderContext leafReaderContext : reader.leaves()) {
      DataCubeValues<?> dataCubeValues =
          leafReaderContext.reader().getDataCubeValues(DATA_CUBE_FIELD);
      // todo : implementation
    }

    reader.close();
    dir.close();

@msfroh
Contributor

msfroh commented Apr 23, 2024

Wow! Adding data cube (OLAP) capabilities to Lucene could be really powerful.

Adding it as a new format does sound like the right idea to me.

I would like to better understand how DataCubeField would interact with, say, NumericDocValues or SortedSetDocValues. In theory, I would like to compute dimensional metrics based on a numeric field without explicitly needing to "add it twice".

I'm thinking, e.g. of adding an IntField to a bunch of documents, and it could either be used as a dimensional field (where the dimension could be derived by mapping the value to a range bucket) or it could contribute a metric (where e.g. I would like to know the min/max/average value of the field subject to dimensional constraints). Similarly, I would like to be able to add a KeywordField and use its value as a dimension.

Could something like that be achieved by adding a "datacube dimensionality" attribute to a field, similar to how KeywordField is both an indexed string field and a SortedSetDocValuesField?
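
For illustration, such a field could look roughly like this (the "datacube.role" attribute key is purely hypothetical and nothing in Lucene interprets it today; whether and how the attribute would propagate and be consumed is exactly the open question):

import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.util.BytesRef;

/** KeywordField-like field that is both indexed and a SortedSet doc values dimension. */
final class DimensionKeywordField extends Field {
  private static final FieldType TYPE = new FieldType();

  static {
    TYPE.setIndexOptions(IndexOptions.DOCS);
    TYPE.setTokenized(false);
    TYPE.setOmitNorms(true);
    TYPE.setDocValuesType(DocValuesType.SORTED_SET);
    TYPE.putAttribute("datacube.role", "dimension"); // assumed key; nothing reads it today
    TYPE.freeze();
  }

  DimensionKeywordField(String name, String value) {
    super(name, new BytesRef(value), TYPE);
  }
}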

@bharath-techie
Author

bharath-techie commented Apr 24, 2024

Thanks for the comments @msfroh.

That's a good idea if we want to supply dimension and metric values to a DataCubesWriter as part of the addDocument flow and consume them similarly to other formats.

But there are some challenges:

  1. Adding an attribute to the field (taking IntField as an example): the same IntField can be part of both a dimension and a metric (in fact, multiple metrics) within a DataCubeField, and the same IntField can be part of multiple DataCubeFields.
  2. Even if we solve the above and supply values via the DataCubesWriter for each DataCubeField, there will be duplicate values depending on the configuration.

So, to avoid this duplication of values, how about we derive the values of each DataCubeField from the original values in the DocValuesWriter during flush?

Flush

IntField values will already be part of the DocValuesWriter, so we can supply a DataCubesConsumer and keep track of the resulting values.

  1. During flush, in a new method writeDataCubes, we supply the dataCubeDocValuesConsumer to docValuesWriter.flush:

  // For all doc values fields
  if (perField.docValuesWriter != null) {
    if (dataCubeDocValuesConsumer == null) {
      // lazy init
      DataCubesFormat fmt = state.segmentInfo.getCodec().dataCubesFormat();
      dataCubeDocValuesConsumer = fmt.fieldsConsumer(state, dataCubesConfig);
    }
    perField.docValuesWriter.flush(state, sortMap, dataCubeDocValuesConsumer);
  }

  // This creates the data cube indices
  dataCubeDocValuesConsumer.flush(dataCubesConfig);

DocValuesWriter.flush calls the respective addNumericField / addSortedSetField methods on the supplied consumer.

  2. Then, in the DataCubesDocValuesConsumer, we keep track of the fields and the associated doc values. In flush, we can make use of the DocValues for each DataCubeField:
public class DataCubeDocValuesConsumer extends DocValuesConsumer {

  Map<String, NumericDocValues> numericDocValuesMap = new ConcurrentHashMap<>();
  Map<String, SortedSetDocValues> sortedSetDocValuesMap = new ConcurrentHashMap<>();

  @Override
  public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer)
      throws IOException {
    sortedSetDocValuesMap.put(field.name, valuesProducer.getSortedSet(field));
  }

  @Override
  public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer)
      throws IOException {
   numericDocValuesMap.put(field.name, valuesProducer.getNumeric(field));
  }
}

 public void flush(DataCubesConfig dataCubesConfig) throws IOException {
   for (DataCubeField field : dataCubesConfig.getFields()) {
     for (String dim : field.getDims()) {
       // Get doc values from the map (we can get a clone / singleton)
       // Custom implementation over the doc values iterator
     }
     for (String metric : field.getMetrics()) {
       // Get doc values from the map
       // Custom implementation over the doc values iterator
     }
   }
 }


Merge

During merge, we will most likely not need DocValues; instead, the merge will operate on the DataCubeIndices and their associated structures.

POC code

@jpountz
Contributor

jpountz commented Apr 28, 2024

It's not clear to me how we'd take advantage of this information at search time. What changes would we make to e.g. Collector to allow it to take advantage of these new data structures?

@bharath-techie
Author

bharath-techie commented Apr 29, 2024

Hi @jpountz ,
Good question. If we take StarTreeDataCube as an example implementation of the above format:

We will traverse the StarTree and the StarTree DocValues (dimension DocValues, if needed) during the query to get the resulting docIdSet.

StarTreeQuery

@Override
  public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
      throws IOException {
    return new ConstantScoreWeight(this, boost) {
      @Override
      public Scorer scorer(LeafReaderContext context) throws IOException {
        StarTreeAggregatedValues starTreeValues = null;
        DocIdSetIterator result = null;
        DataCubeValues<?> dataCubeVals = context.reader().getDataCubeValues(field);
        Object dataCubeValues = dataCubeVals.getDataCubeValues();
        if (dataCubeValues != null) {
          starTreeValues = (StarTreeAggregatedValues) dataCubeValues;
          StarTreeFilter filter = new StarTreeFilter(starTreeValues, filterPredicateMap, groupByCols);
          // Traverse star tree and get the resultant DocIdSetIterator of star tree + star tree doc values
          result = filter.getStarTreeResult();
        }
        return new ConstantScoreScorer(this, score(), scoreMode, result);
      }

      @Override
      public boolean isCacheable(LeafReaderContext ctx) {
        return false;
      }
    };
  }

And coming to the collector changes: the collector needs to traverse the StarTreeDocValues to get the values for the collected docIds.

For example, the collect override for a sum aggregation would look like the following, where we traverse the associated metric DocValues.

SumCollector

      @Override
      public void collect(int doc)
          throws IOException {
        docsBuilder.grow(1).add(doc);
        DataCubeValues<?> dataCubeVals = context.reader().getDataCubeValues(field);
        Object dataCubeValues = dataCubeVals.getDataCubeValues();
        if(dataCubeValues != null) {
          StarTreeAggregatedValues starAggs = (StarTreeAggregatedValues) dataCubeValues;

          NumericDocValues dv = starAggs.metricValues.get("status_sum");
          dv.advanceExact(doc);

          sum += dv.longValue();
        }
        totalHits++;
      }

Example:

private void queryWithFilter(IndexWriter w)
      throws IOException {

    final IndexReader reader = DirectoryReader.open(w);
    final IndexSearcher searcher = newSearcher(reader, false);
    Set<String> groupByCols = new HashSet<>();
    //groupByCols.add("day");
    Map<String, List<Predicate<Integer>>> predicateMap = new HashMap<>();
    List<Predicate<Integer>> predicates = new ArrayList<>();
    predicates.add(day -> day > 2 && day < 5);
    predicates.add(day -> day == 30);
    predicateMap.put("day", predicates);
    predicates = new ArrayList<>();
    predicates.add(status -> status == 200);
    predicateMap.put("status", predicates);
    final Query starTreeQuery = new StarTreeQuery(predicateMap, groupByCols);

    searcher.search(starTreeQuery, getStarTreeSumCollector());
  }

Code reference - this contains the star tree implementation, but it is older code that I have not yet integrated with the DataCubes format. It follows Approach 2 (extend the DocValues format), but the query/collect logic should remain similar.

@msokolov
Contributor

This reminded me of an older issue: #11463 that seems to have foundered. Maybe there is something to be learned from that, not sure.

@bharath-techie
Author

Thanks for the input @msokolov. I do see the similarities, but the linked issue seems to be tied to rollups done as part of merge, aided by index sorting on the dimensions. Index sorting is quite expensive.

The difference here is that all the computation is deferred to the format and its custom logic, and the query-time gains could be higher since we use efficient cubing structures.

For the star tree implementation, the algorithm sorts the dimensions and then aggregates during flush; successive merges only need to sort and aggregate the already compacted, sorted data cube structures (a rough merge sketch is below).
With respect to performance, if the dimensions are of relatively low cardinality, the impact on index-append throughput is minimal (< 2%), and the difference is mainly due to write threads helping during flush. (Reference numbers are in the OpenSearch RFC.)
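
For illustration only (plain Java with a simplified document shape, like the flush sketch earlier in this thread), merging already-sorted, pre-aggregated star tree documents from several segments reduces to a k-way merge that re-aggregates equal dimension tuples:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class StarTreeMergeSketch {
  record Doc(long[] dims, double[] metrics) {}

  /** Each input iterator yields docs already sorted by dims; output is sorted and re-aggregated. */
  static List<Doc> merge(List<Iterator<Doc>> segments) {
    Comparator<long[]> byDims = Arrays::compare;
    record Head(Doc doc, Iterator<Doc> source) {}
    PriorityQueue<Head> pq =
        new PriorityQueue<>(Comparator.comparing((Head h) -> h.doc().dims(), byDims));
    for (Iterator<Doc> it : segments) {
      if (it.hasNext()) {
        pq.add(new Head(it.next(), it));
      }
    }
    List<Doc> merged = new ArrayList<>();
    while (!pq.isEmpty()) {
      Head head = pq.poll();
      if (head.source().hasNext()) {
        pq.add(new Head(head.source().next(), head.source()));
      }
      Doc doc = head.doc();
      Doc last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && Arrays.equals(last.dims(), doc.dims())) {
        for (int i = 0; i < last.metrics().length; i++) {
          last.metrics()[i] += doc.metrics()[i]; // re-aggregate equal dimension tuples
        }
      } else {
        merged.add(new Doc(doc.dims().clone(), doc.metrics().clone()));
      }
    }
    return merged;
  }
}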

There are some cons here as well:

  • If we need to account for deletes, we will probably need a solution similar to the deleted-docs iterator proposed here and apply decrements during merge/query (the cost is still proportional to the number of deleted docs). Maybe we can add this incrementally?
  • We will need guardrails against high-cardinality dimensions: they slow down indexing because the sort and aggregation become expensive, the data cubes become less effective overall, and the storage-size advantages shrink.

Let me know your thoughts.

@msokolov
Contributor

My main concern was to ensure this exciting effort didn't get blocked by the need to do major changes to existing indexing workloads. It sounds like the plan here is less intrusive and confined to the new format, for which +1

@jpountz
Contributor

jpountz commented Apr 30, 2024

At first sight I don't like the fact that this seems to plug in a whole new way of doing things. Either you don't use a star tree index and you do things the usual way with filters and collectors, or you want to use a star tree index and then you need to craft queries in a very specific way if you want to be able to take advantage of the optimization for aggregations. Since this optimization is about aggregating data, I'd like this to mostly require changes on the collector side from the end user's perspective.

It would be somewhat less efficient, but an alternative I'm contemplating would consist of the following (a rough sketch follows the list):

  • Queries can optionally match ranges of doc IDs at once.
  • A new LeafCollector#collectRange(int docIdStart, int docIdEnd) API for these queries to use.
  • Doc values formats don't know about dimensions, but create pre-aggregates for blocks of N (e.g. 128) doc IDs.
  • A sum aggregate could be implemented by asking doc values for the sum of all blocks that are contained within the range of doc IDs passed to LeafCollector#collectRange.
  • Users are responsible for configuring an index sort that is likely to allow queries to match ranges of doc IDs at once, typically by sorting on these dimensions in some order.
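
A rough sketch of how that could look from the collector side (collectRange does not exist in Lucene today, and the per-block sums are a hypothetical pre-aggregate that a doc values format would expose):

import java.io.IOException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;

/** Sketch of a sum collector that uses per-block pre-aggregates for whole blocks. */
abstract class RangeSumCollectorSketch implements LeafCollector {
  static final int BLOCK_SIZE = 128; // example pre-aggregation block size
  private final long[] blockSums;    // assumed: sum of the metric for each block of BLOCK_SIZE docs
  long sum;

  RangeSumCollectorSketch(long[] blockSums) {
    this.blockSums = blockSums;
  }

  /** Hypothetical bulk API: collect all docs in [docIdStart, docIdEnd). */
  public void collectRange(int docIdStart, int docIdEnd) throws IOException {
    int doc = docIdStart;
    while (doc < docIdEnd && doc % BLOCK_SIZE != 0) {
      collect(doc++); // leading partial block: per-doc
    }
    while (doc + BLOCK_SIZE <= docIdEnd) {
      sum += blockSums[doc / BLOCK_SIZE]; // whole block: use the pre-aggregated sum
      doc += BLOCK_SIZE;
    }
    while (doc < docIdEnd) {
      collect(doc++); // trailing partial block: per-doc
    }
  }

  @Override
  public void setScorer(Scorable scorer) {}
}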

@bharath-techie
Author

bharath-techie commented Apr 30, 2024

Thanks for the inputs @jpountz . Let me spend some more time on this.

But this is something we thought about as well, and one idea was to do query abstraction / planning. Let me know your thoughts:

Can the concern with queries be solved by abstracting it behind a DataCubeOrOriginalQuery, similar to IndexOrDocValuesQuery? (A rough sketch is included below.)

In fact, the input can remain the same as the original query; we can check whether it can be answered via the DataCubesIndex.
For terms / multi-terms / range / boolean queries, we can parse the associated fields and check whether any DataCubeField contains the same fields as dimensions; if so, we can pick the optimal DataCubeField based on cost and form the associated query in the background.

Or we can also think of query rewriting when a particular query can be answered using DataCubes.
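
A very rough sketch of that wrapper (assumed API; the per-leaf cost comparison that IndexOrDocValuesQuery performs is omitted, and the "is a data cube available" signal is passed in explicitly here):

import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

/** Hypothetical wrapper in the spirit of IndexOrDocValuesQuery. */
final class DataCubeOrOriginalQuery extends Query {
  private final Query originalQuery;
  private final Query dataCubeQuery;
  private final boolean dataCubeAvailable; // e.g. derived from the index's DataCubesConfig

  DataCubeOrOriginalQuery(Query originalQuery, Query dataCubeQuery, boolean dataCubeAvailable) {
    this.originalQuery = originalQuery;
    this.dataCubeQuery = dataCubeQuery;
    this.dataCubeAvailable = dataCubeAvailable;
  }

  @Override
  public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
      throws IOException {
    // A real implementation would decide per segment and per cost, like IndexOrDocValuesQuery;
    // here we simply take the data cube path whenever it is available.
    Query delegate = dataCubeAvailable ? dataCubeQuery : originalQuery;
    return searcher.createWeight(searcher.rewrite(delegate), scoreMode, boost);
  }

  @Override
  public void visit(QueryVisitor visitor) {
    originalQuery.visit(visitor);
  }

  @Override
  public String toString(String field) {
    return "DataCubeOrOriginalQuery(" + originalQuery.toString(field) + ")";
  }

  @Override
  public boolean equals(Object other) {
    return sameClassAs(other)
        && originalQuery.equals(((DataCubeOrOriginalQuery) other).originalQuery)
        && dataCubeQuery.equals(((DataCubeOrOriginalQuery) other).dataCubeQuery);
  }

  @Override
  public int hashCode() {
    return 31 * classHash() + originalQuery.hashCode() + dataCubeQuery.hashCode();
  }
}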

But there is still an issue with collection: the user needs to pass two collector implementations, and the right one must be picked based on which query path is taken. We can see how to better implement this if the above idea works.
