From 50023c27bddda416e6235acf644859b1c6f89174 Mon Sep 17 00:00:00 2001
From: Sounak Chakraborty
Date: Mon, 2 Oct 2017 07:28:11 +0530
Subject: [PATCH] Lucene Index Implementation.

---
 .../core/datamap/DataMapStoreManager.java        |  10 +-
 .../core/datamap/dev/DataMapWriter.java          |   8 +-
 examples/spark2/pom.xml                          |  27 ++++
 .../examples/LuceneBlockIndexDetails.java        |  52 +++++++
 .../carbondata/examples/LuceneDataMap.java       | 100 ++++++++++++
 .../examples/LuceneDataMapFactory.java           | 123 +++++++++++++++
 .../examples/LuceneDataMapWriter.java            | 145 ++++++++++++++++++
 .../examples/LuceneDataMapWriterSuite.scala      |  71 +++++++++
 .../datamap/DataMapWriterSuite.scala             |   7 +-
 .../datamap/DataMapWriterListener.java           |   9 +-
 .../store/writer/AbstractFactDataWriter.java     |  10 +-
 11 files changed, 545 insertions(+), 17 deletions(-)
 create mode 100644 examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneBlockIndexDetails.java
 create mode 100644 examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMap.java
 create mode 100644 examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapFactory.java
 create mode 100644 examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriter.java
 create mode 100644 examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriterSuite.scala

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 2b5d5cd370b..2030682cf78 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -103,7 +103,7 @@ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       tableDataMaps = new ArrayList<>();
     }
     TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps);
-    if (dataMap != null) {
+    if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) {
       throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
     }
@@ -128,11 +128,11 @@ public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
     return dataMap;
   }
 
-  private TableDataMap getTableDataMap(String dataMapName,
-      List<TableDataMap> tableDataMaps) {
+  private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableDataMaps) {
     TableDataMap dataMap = null;
-    for (TableDataMap tableDataMap: tableDataMaps) {
-      if (tableDataMap.getDataMapName().equals(dataMapName)) {
+    for (TableDataMap tableDataMap : tableDataMaps) {
+      if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName()
+          .equals(""))) {
         dataMap = tableDataMap;
         break;
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 28163d78614..a2581a289bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -16,7 +16,11 @@
  */
 package org.apache.carbondata.core.datamap.dev;
 
+import java.util.List;
+
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+
 
 /**
  * Data Map writer
@@ -27,12 +31,13 @@ public interface DataMapWriter {
    * Start of new block notification.
    * @param blockId file name of the carbondata file
+   * @param filePath full path of the carbondata file
    */
-  void onBlockStart(String blockId);
+  void onBlockStart(String blockId, String filePath);
 
   /**
    * End of block notification
    */
-  void onBlockEnd(String blockId);
+  void onBlockEnd(String blockId, List<BlockIndexInfo> blockIndexInfoList);
 
   /**
    * Start of new blocklet notification.
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 94af8ec5900..0fd06484099 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -65,6 +65,33 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>7.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+      <version>7.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queryparser</artifactId>
+      <version>7.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queries</artifactId>
+      <version>7.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-highlighter</artifactId>
+      <version>7.0.0</version>
+    </dependency>
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneBlockIndexDetails.java b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneBlockIndexDetails.java
new file mode 100644
index 00000000000..3898ce14100
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneBlockIndexDetails.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples;
+
+import java.io.Serializable;
+
+public class LuceneBlockIndexDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914911491724L;
+
+  /**
+   * Raw bytes of the indexed column value for one blocklet
+   */
+  private byte[] columnVal;
+
+  /**
+   * Id of the blocklet
+   */
+  private String blockletId;
+
+  public String getBlockletId() {
+    return blockletId;
+  }
+
+  public void setBlockletId(String blockletId) {
+    this.blockletId = blockletId;
+  }
+
+  public byte[] getColumnVal() {
+    return columnVal;
+  }
+
+  public void setColumnVal(byte[] columnVal) {
+    this.columnVal = columnVal;
+  }
+}
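Of the five Lucene 7.0.0 artifacts added above, only lucene-core and lucene-analyzers-common are exercised by this patch (the writer uses IndexWriter, FSDirectory and WhitespaceAnalyzer; the search classes all live in lucene-core). A minimal smoke test for the dependencies, indexing and looking up a single document the same way the datamap does, might look like the sketch below; the class name is made up, and RAMDirectory is used only to keep the snippet self-contained:

    import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.TermQuery;
    import org.apache.lucene.store.RAMDirectory;

    public class LuceneSmokeTest {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new WhitespaceAnalyzer()));
        Document doc = new Document();
        // Store the value so it can be read back from the hit, mirroring the datamap's fields.
        doc.add(new StringField("Content", "b", Field.Store.YES));
        writer.addDocument(doc);
        writer.close();
        IndexSearcher searcher = new IndexSearcher(DirectoryReader.open(dir));
        long hits = searcher.search(new TermQuery(new Term("Content", "b")), 10).totalHits;
        System.out.println("hits = " + hits);  // expected: 1
      }
    }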
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMap.java b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMap.java
new file mode 100644
index 00000000000..1a6788395ab
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMap.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * DataMap implementation backed by a Lucene index.
+ */
+public class LuceneDataMap implements DataMap {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMap.class.getName());
+
+  public static final String NAME = "clustered.lucene";
+
+  private String filePath;
+
+  @Override public void init(String filePath) throws MemoryException, IOException {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Blocklet pruning logic for the Lucene DataMap.
+   *
+   * @param filterExps filter expression (currently unused; the search term is hardcoded)
+   * @return list of blocklets that may contain matching rows
+   */
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExps) {
+    Set<Blocklet> blocklets = new HashSet<>();
+    Map<String, String> blockletPath = new HashMap<>();
+    try {
+      FSDirectory index = FSDirectory.open(Paths.get(filePath));
+      IndexReader indexReader = DirectoryReader.open(index);
+      IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+
+      // Hardcoded term for this example; a real implementation would derive it from filterExps.
+      Term t = new Term("Content", "b");
+      Query query = new TermQuery(t);
+
+      TopDocs doc = indexSearcher.search(query, 10);
+      LOGGER.debug("Total Hits are :: " + doc.totalHits);
+
+      ScoreDoc[] hits = doc.scoreDocs;
+      for (int i = 0; i < hits.length; i++) {
+        Document document = indexSearcher.doc(hits[i].doc);
+        LOGGER.debug("The values are " + document.get("BlockletPath") + " "
+            + document.get("Content") + " " + document.get("BlockletId"));
+        // Deduplicate: at most one Blocklet per indexed path.
+        if (blockletPath.get(document.get("BlockletPath")) == null) {
+          blocklets.add(new Blocklet(document.get("BlockletPath"), document.get("BlockletId")));
+          blockletPath.put(document.get("BlockletPath"), document.get("BlockletId"));
+        }
+      }
+    } catch (IOException ex) {
+      LOGGER.info("Error while searching the lucene index: " + ex.getMessage());
+    }
+    return new ArrayList<>(blocklets);
+  }
+
+  @Override
+  public void clear() {
+  }
+}
\ No newline at end of file
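Note that prune always searches for the literal term "b", which only matches the test data in the suite below; filterExps is ignored. A hedged sketch of a safer default, assuming one would rather scan every indexed blocklet than silently miss results when no predicate can be mapped (MatchAllDocsQuery comes from org.apache.lucene.search; mappedValue is a hypothetical placeholder for a value extracted from the filter):

    // Hypothetical fallback: if no usable predicate can be derived from the
    // filter resolver, match every indexed document instead of a fixed term.
    Query query;
    String mappedValue = null;  // would come from filterExps in a real mapping
    if (mappedValue == null) {
      query = new MatchAllDocsQuery();
    } else {
      query = new TermQuery(new Term("Content", mappedValue));
    }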
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapFactory.java b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapFactory.java
new file mode 100644
index 00000000000..0159569aeeb
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.events.ChangeEvent;
+import org.apache.carbondata.core.indexstore.schema.FilterType;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+
+/**
+ * Lucene DataMap Factory
+ */
+public class LuceneDataMapFactory implements DataMapFactory {
+
+  private AbsoluteTableIdentifier identifier;
+
+  @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
+    this.identifier = identifier;
+  }
+
+  /**
+   * createWriter returns the LuceneDataMapWriter.
+   *
+   * @param segmentId
+   * @return
+   */
+  @Override public DataMapWriter createWriter(String segmentId) {
+    return new LuceneDataMapWriter();
+  }
+
+  /**
+   * getDataMaps initializes the Lucene DataMap for the given segment and returns it.
+   *
+   * @param segmentId
+   * @return
+   * @throws IOException
+   */
+  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
+    List<DataMap> dataMapList = new ArrayList<>();
+    // Form a dataMap of type LuceneDataMap.
+    LuceneDataMap dataMap = new LuceneDataMap();
+    try {
+      String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator;
+      CarbonFile[] carbonFile = getLuceneIndexFiles(path);
+      dataMap.init(carbonFile[0].getPath());
+    } catch (MemoryException ex) {
+      throw new IOException("Error while initializing the lucene datamap", ex);
+    }
+    dataMapList.add(dataMap);
+    return dataMapList;
+  }
+
+  private CarbonFile[] getLuceneIndexFiles(String filePath) {
+    String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
+    return carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".luceneIndex");
+      }
+    });
+  }
+
+  /**
+   * @param segmentId
+   * @return
+   */
+  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
+    return null;
+  }
+
+  /**
+   * Clear the DataMap of one segment.
+   *
+   * @param segmentId
+   */
+  @Override public void clear(String segmentId) {
+  }
+
+  /**
+   * Clear all DataMaps.
+   */
+  @Override public void clear() {
+  }
+
+  @Override public DataMap getDataMap(DataMapDistributable distributable) {
+    return null;
+  }
+
+  @Override public void fireEvent(ChangeEvent event) {
+  }
+
+  @Override public DataMapMeta getMeta() {
+    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
+  }
+}
\ No newline at end of file
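One sharp edge in getDataMaps above: it dereferences carbonFile[0] without checking that any .luceneIndex file exists for the segment. A defensive variant, reusing the patch's own names (illustrative only; whether an index-less segment can actually occur here is an assumption):

    // Guard against a segment that has no lucene index yet (assumption: this can
    // happen if the datamap is registered after data was already loaded).
    CarbonFile[] indexFiles = getLuceneIndexFiles(path);
    if (indexFiles == null || indexFiles.length == 0) {
      return dataMapList;  // nothing to prune with; return the empty list
    }
    dataMap.init(indexFiles[0].getPath());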
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriter.java b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriter.java
new file mode 100644
index 00000000000..8802b871e5c
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+
+import static org.apache.lucene.index.IndexOptions.DOCS_AND_FREQS;
+
+public class LuceneDataMapWriter implements DataMapWriter {
+
+  /**
+   * The events for the lucene index are handled as follows:
+   * a) On block start    - initialize the index writer and the directory.
+   * b) On block end      - commit and close the index writer.
+   * c) On blocklet start - do nothing.
+   * d) On blocklet end   - do nothing.
+   * e) On page add       - create one document per row and add it to the writer.
+   */
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  private IndexWriter indexWriter;
+
+  private String luceneIndexFilePath;
+
+  private Analyzer analyzer;
+
+  private String directoryPath;
+
+  private String filePath;
+
+  private IndexWriterConfig config;
+
+  @Override public void onBlockStart(String blockId, String filePath) {
+    try {
+      this.filePath = filePath;
+      directoryPath = filePath.substring(0, filePath.lastIndexOf(File.separator) + 1);
+      luceneIndexFilePath = constructIndexFullPath(blockId, directoryPath);
+      analyzer = new WhitespaceAnalyzer();
+      FSDirectory index = FSDirectory.open(Paths.get(luceneIndexFilePath));
+      config = new IndexWriterConfig(analyzer);
+      indexWriter = new IndexWriter(index, config);
+    } catch (IOException ex) {
+      LOGGER.audit("Error while creating the index");
+    }
+  }
+
+  @Override public void onBlockEnd(String blockId, List<BlockIndexInfo> blockIndexInfoList) {
+    try {
+      indexWriter.commit();
+      indexWriter.close();
+    } catch (IOException ex) {
+      LOGGER.audit("Error while writing the index");
+    }
+  }
+
+  @Override public void onBlockletStart(int blockletId) {
+  }
+
+  @Override public void onBlockletEnd(int blockletId) {
+  }
+
+  @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
+    constructLuceneIndex(blockletId, pages);
+  }
+
+  /**
+   * Construct the Lucene index, one document per row of the page.
+   *
+   * @param blockletId
+   * @param pages
+   */
+  public void constructLuceneIndex(int blockletId, ColumnPage[] pages) {
+    FieldType textFieldType = new FieldType();
+    textFieldType.setStored(true);
+    textFieldType.setTokenized(false);
+    textFieldType.setIndexOptions(DOCS_AND_FREQS);
+    // Skip the 2-byte length prefix of the encoded column value.
+    byte[] value = new byte[pages[0].getBytes(0).length - 2];
+    Charset charset = Charset.forName("UTF-8");
+
+    for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
+      // A fresh document per row, so fields do not accumulate across rows.
+      Document doc = new Document();
+      System.arraycopy(pages[0].getBytes(rowIndex), 2, value, 0, value.length);
+      doc.add(new Field("BlockletId", Integer.toString(blockletId), textFieldType));
+      doc.add(new Field("BlockletPath", filePath, textFieldType));
+      doc.add(new Field("Content", new String(value, charset), textFieldType));
+      try {
+        indexWriter.addDocument(doc);
+      } catch (IOException ex) {
+        LOGGER.audit("Error while adding values to the index");
+      }
+    }
+  }
+
+  /**
+   * Construct the index file path from the blockId.
+   *
+   * @param blockId
+   * @param directoryPath
+   * @return
+   */
+  private String constructIndexFullPath(String blockId, String directoryPath) {
+    String sub1 = blockId.substring(blockId.indexOf("_") - 1, blockId.indexOf("."));
+    return directoryPath + sub1 + ".luceneIndex";
+  }
+}
\ No newline at end of file
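To see what constructIndexFullPath produces, here is a small standalone example of the same derivation; the blockId value is hypothetical, and real CarbonData file names may differ:

    public class IndexPathDemo {
      public static void main(String[] args) {
        // Hypothetical blockId, chosen only to illustrate the substring logic.
        String blockId = "part-0-0_batchno0-0-1507000000000.carbondata";
        String directoryPath = "/tmp/store/Fact/Part0/Segment_0/";
        // Same derivation as constructIndexFullPath above.
        String sub1 = blockId.substring(blockId.indexOf("_") - 1, blockId.indexOf("."));
        System.out.println(directoryPath + sub1 + ".luceneIndex");
        // prints /tmp/store/Fact/Part0/Segment_0/0_batchno0-0-1507000000000.luceneIndex
      }
    }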
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriterSuite.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriterSuite.scala
new file mode 100644
index 00000000000..49bf706b594
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapWriterSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * Lucene DataMap test case.
+ */
+class LuceneDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbonlucene")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("Test Lucene Datamap") {
+    // register datamap writer
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation, "default", "carbonlucene"),
+      classOf[LuceneDataMapFactory].getName,
+      LuceneDataMap.NAME)
+
+    val df = buildTestData(33)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbonlucene")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Query the table.
+    sql("select c2 from carbonlucene").show(20, false)
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 0b5141e4467..acb84c4ac93 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,11 +20,9 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
@@ -32,6 +30,7 @@ import org.apache.carbondata.core.events.ChangeEvent
 import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo
 import org.apache.carbondata.core.util.CarbonProperties
 
 class C2DataMapFactory() extends DataMapFactory {
@@ -173,7 +172,7 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet end: $blockletId"
     }
 
-    override def onBlockEnd(blockId: String): Unit = {
+    override def onBlockEnd(blockId: String, blockIndexInfoList: util.List[BlockIndexInfo]): Unit = {
       callbackSeq :+= s"block end $blockId"
     }
 
@@ -181,7 +180,7 @@ object DataMapWriterSuite {
       callbackSeq :+= s"blocklet start $blockletId"
     }
 
-    override def onBlockStart(blockId: String): Unit = {
+    override def onBlockStart(blockId: String, filePath: String): Unit = {
      callbackSeq :+= s"block start $blockId"
    }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 4b0113c4566..0f5cdbaa359 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -32,6 +32,7 @@
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
@@ -82,18 +83,18 @@ private void register(DataMapFactory factory, String segmentId) {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId) {
+  public void onBlockStart(String blockId, String filePath) {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId);
+        writer.onBlockStart(blockId, filePath);
       }
     }
   }
 
-  public void onBlockEnd(String blockId) {
+  public void onBlockEnd(String blockId, List<BlockIndexInfo> blockIndexInfoList) {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockEnd(blockId);
+        writer.onBlockEnd(blockId, blockIndexInfoList);
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 972e4143b2b..0959cbfe97a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -269,13 +269,19 @@ protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded)
 
   private void notifyDataMapBlockStart() {
     if (listener != null) {
-      listener.onBlockStart(carbonDataFileName);
+      listener.onBlockStart(carbonDataFileName, constructFactFileFullPath());
     }
   }
 
+  private String constructFactFileFullPath() {
+    String factFilePath =
+        this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
+    return factFilePath;
+  }
+
   private void notifyDataMapBlockEnd() {
     if (listener != null) {
-      listener.onBlockEnd(carbonDataFileName);
+      listener.onBlockEnd(carbonDataFileName, blockIndexInfoList);
     }
     blockletId = 0;
   }
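For implementers tracking this interface change, a minimal no-op DataMapWriter compiled against the new signatures would now look like the sketch below (only the two block callbacks changed relative to the old interface; the class name is made up):

    import java.util.List;

    import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    import org.apache.carbondata.core.datastore.page.ColumnPage;
    import org.apache.carbondata.core.metadata.index.BlockIndexInfo;

    // Minimal no-op writer showing the post-patch callback signatures.
    public class NoOpDataMapWriter implements DataMapWriter {
      @Override public void onBlockStart(String blockId, String filePath) { }
      @Override public void onBlockEnd(String blockId, List<BlockIndexInfo> blockIndexInfoList) { }
      @Override public void onBlockletStart(int blockletId) { }
      @Override public void onBlockletEnd(int blockletId) { }
      @Override public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) { }
    }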