Skip to content

Commit

Permalink
[CARBONDATA-2474] Support Modular Plan for Materialized View DataMap
Browse files Browse the repository at this point in the history
Currently carbon supports the preaggregate datamap, which only supports pre-aggregation on a single table. To improve it, we can add join capability by implementing Materialized Views.

In a carbon Materialized View, the Modular Plan is the basic abstraction for a materialized view query plan. In this PR, the modular plan module is added; it defines the plan tree structure and the conversion from a Spark Logical Plan.

This closes #2301
  • Loading branch information
jackylk authored and ravipesala committed May 12, 2018
1 parent 443b717 commit ffddba7
Show file tree
Hide file tree
Showing 41 changed files with 10,214 additions and 11 deletions.
Expand Up @@ -64,7 +64,7 @@ public final class DataMapStoreManager {
/**
* Contains the datamap catalog for each datamap provider.
*/
private Map<String, DataMapCatalog> dataMapCatalogs = new ConcurrentHashMap<>();
private Map<String, DataMapCatalog> dataMapCatalogs = null;

private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -166,7 +166,8 @@ public void dropDataMapSchema(String dataMapName) throws IOException {
* @param dataMapSchema
*/
public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
DataMapSchema dataMapSchema) {
DataMapSchema dataMapSchema) throws IOException {
intializeDataMapCatalogs(dataMapProvider);
String name = dataMapSchema.getProviderName();
DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
if (dataMapCatalog == null) {
Expand All @@ -185,6 +186,9 @@ public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
* @param dataMapSchema
*/
public synchronized void unRegisterDataMapCatalog(DataMapSchema dataMapSchema) {
if (dataMapCatalogs == null) {
return;
}
String name = dataMapSchema.getProviderName();
DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
if (dataMapCatalog != null) {
Expand All @@ -197,10 +201,36 @@ public synchronized void unRegisterDataMapCatalog(DataMapSchema dataMapSchema) {
* @param providerName
* @return
*/
public DataMapCatalog getDataMapCatalog(String providerName) {
public DataMapCatalog getDataMapCatalog(DataMapProvider dataMapProvider, String providerName)
throws IOException {
intializeDataMapCatalogs(dataMapProvider);
return dataMapCatalogs.get(providerName);
}

/**
 * Lazily initialize the datamap catalog map by reading all datamap schemas
 * from the store and registering each schema with the catalog of its provider.
 * No-op once the map has been created.
 *
 * NOTE(review): the method name keeps the existing "intialize" (sic) spelling
 * because it is referenced elsewhere in this class; renaming needs a
 * coordinated change across all callers.
 *
 * @param dataMapProvider provider used to create one catalog per provider name
 * @throws IOException if reading datamap schemas from the store fails
 */
private synchronized void intializeDataMapCatalogs(DataMapProvider dataMapProvider)
    throws IOException {
  if (dataMapCatalogs == null) {
    // Build into a local map and publish it only once fully populated, so that
    // readers which reach dataMapCatalogs without holding the lock never
    // observe a partially initialized catalog map.
    Map<String, DataMapCatalog> catalogs = new ConcurrentHashMap<>();
    List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
    for (DataMapSchema schema : dataMapSchemas) {
      DataMapCatalog dataMapCatalog = catalogs.get(schema.getProviderName());
      if (dataMapCatalog == null) {
        dataMapCatalog = dataMapProvider.createDataMapCatalog();
        catalogs.put(schema.getProviderName(), dataMapCatalog);
      }
      try {
        dataMapCatalog.registerSchema(schema);
      } catch (Exception e) {
        // Best-effort: a schema that fails to register is skipped, not fatal.
        LOGGER.error(e, "Error while registering schema");
      }
    }
    dataMapCatalogs = catalogs;
  }
}

/**
* It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
*
Expand Down
Expand Up @@ -30,7 +30,8 @@ public enum DataMapClassProvider {
PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"),
LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene"),
BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter");
BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter"),
MV("org.apache.carbondata.core.datamap.MVDataMap", "mv");

/**
* Fully qualified class name of datamap
Expand Down
Expand Up @@ -152,7 +152,9 @@ public void setChildSchema(TableSchema childSchema) {
*/
public boolean isIndexDataMap() {
if (providerName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName()) ||
providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName())) {
providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName()) ||
providerName.equalsIgnoreCase(DataMapClassProvider.MV.getShortName()) ||
ctasQuery != null) {
return false;
} else {
return true;
Expand Down
157 changes: 157 additions & 0 deletions datamap/mv/plan/pom.xml
@@ -0,0 +1,157 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
       http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <!-- Inherit shared build configuration and versions from the CarbonData root POM. -->
  <parent>
    <groupId>org.apache.carbondata</groupId>
    <artifactId>carbondata-parent</artifactId>
    <version>1.4.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>carbondata-mv-plan</artifactId>
  <name>Apache CarbonData :: Materialized View Plan</name>

  <properties>
    <!-- NOTE(review): dev.path is presumably consumed by shared build tooling
         configured in the parent POM - confirm against the root pom.xml. -->
    <dev.path>${basedir}/../../../dev</dev.path>
  </properties>

  <dependencies>
    <!-- Brings in Spark/Catalyst classes used by the modular plan module. -->
    <dependency>
      <groupId>org.apache.carbondata</groupId>
      <artifactId>carbondata-spark2</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18</version>
        <!-- Note config is repeated in scalatest config -->
        <configuration>
          <skip>false</skip>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <!-- NOTE(review): -XX:MaxPermSize is ignored on JDK 8+ JVMs (PermGen
               was removed); harmless but obsolete. -->
          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
          <systemProperties>
            <java.awt.headless>true</java.awt.headless>
          </systemProperties>
          <testFailureIgnore>false</testFailureIgnore>
          <failIfNoTests>false</failIfNoTests>
        </configuration>
      </plugin>

      <!-- Checkstyle is skipped for this module. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <version>2.17</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>testCompile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test</phase>
          </execution>
          <!-- NOTE(review): compile is also bound to process-resources -
               presumably so Scala sources compile before Java; confirm this
               matches the convention used by the other CarbonData modules. -->
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Enforcer rules from the parent are skipped for this module. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-enforcer-plugin</artifactId>
        <version>1.4.1</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <!-- Duplicate-classpath detection is skipped for this module. -->
      <plugin>
        <groupId>com.ning.maven.plugins</groupId>
        <artifactId>maven-duplicate-finder-plugin</artifactId>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <!-- Note config is repeated in surefire config -->
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <testFailureIgnore>false</testFailureIgnore>
          <filereports>CarbonTestSuite.txt</filereports>
          <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
          </argLine>
          <stderr />
          <environmentVariables>
          </environmentVariables>
          <systemProperties>
            <java.awt.headless>true</java.awt.headless>
          </systemProperties>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.mv

import scala.language.implicitConversions

import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.carbondata.mv.plans._
import org.apache.carbondata.mv.plans.modular.{JoinEdge, ModularPlan}
import org.apache.carbondata.mv.plans.modular.Flags._
import org.apache.carbondata.mv.plans.util._

/**
 * A collection of implicit conversions that create a DSL for constructing
 * data structures for modular plans.
 */
package object dsl {

  /**
   * DSL operations available on a single [[ModularPlan]].
   */
  implicit class DslModularPlan(val modularPlan: ModularPlan) {

    /**
     * Wrap this plan in a modular Select node. The argument lists are curried
     * so call sites read like a SQL SELECT: output expressions, input
     * expressions, predicates, alias map, and join edges. This plan becomes
     * the single child of the new node.
     */
    def select(outputExprs: NamedExpression*)
      (inputExprs: Expression*)
      (predicateExprs: Expression*)
      (aliasMap: Map[Int, String])
      (joinEdges: JoinEdge*): ModularPlan = {
      modular.Select(
        outputExprs,
        inputExprs,
        predicateExprs,
        aliasMap,
        joinEdges,
        Seq(modularPlan),
        NoFlags,
        Seq.empty,
        Seq.empty)
    }

    /**
     * Wrap this plan in a modular GroupBy node with the given output, input
     * and predicate expressions (no alias information is attached).
     */
    def groupBy(outputExprs: NamedExpression*)
      (inputExprs: Expression*)
      (predicateExprs: Expression*): ModularPlan = {
      modular.GroupBy(outputExprs, inputExprs, predicateExprs, None, modularPlan, NoFlags, Seq.empty)
    }

    /** Return the harmonized form of this plan (delegates to `harmonized`). */
    def harmonize: ModularPlan = modularPlan.harmonized
  }

  /**
   * DSL operations available on a sequence of [[ModularPlan]]s.
   */
  implicit class DslModularPlans(val modularPlans: Seq[ModularPlan]) {

    /**
     * Build a modular Select node with all plans in this sequence as its
     * children. Same curried shape as [[DslModularPlan.select]].
     */
    def select(outputExprs: NamedExpression*)
      (inputExprs: Expression*)
      (predicateList: Expression*)
      (aliasMap: Map[Int, String])
      (joinEdges: JoinEdge*): ModularPlan = {
      modular.Select(
        outputExprs,
        inputExprs,
        predicateList,
        aliasMap,
        joinEdges,
        modularPlans,
        NoFlags,
        Seq.empty,
        Seq.empty)
    }

    /** Build a modular Union of all plans in this sequence. */
    def union(): ModularPlan = modular.Union(modularPlans, NoFlags, Seq.empty)
  }

  /**
   * DSL operations converting a Spark [[LogicalPlan]] into resolved,
   * modularized, or optimized forms.
   */
  implicit class DslLogical2Modular(val logicalPlan: LogicalPlan) {

    /** Run analysis only (resolve references) via the SimpleAnalyzer. */
    def resolveonly: LogicalPlan = analysis.SimpleAnalyzer.execute(logicalPlan)

    /** Convert the logical plan, taking the first generated modular plan. */
    def modularize: ModularPlan = modular.SimpleModularizer.modularize(logicalPlan).next

    /** Optimize the logical plan with the BirdcageOptimizer rule batch. */
    def optimize: LogicalPlan = BirdcageOptimizer.execute(logicalPlan)
  }

}

0 comments on commit ffddba7

Please sign in to comment.