Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.relational.it.db.it;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.Statement;

import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
public class IoTDBAsofJoinTableIT {
private static final String DATABASE_NAME = "test";

private static final String[] sql =
new String[] {
"create database test",
"use test",
"create table table1(device string tag, value int32 field)",
"insert into table1(time,device,value) values(2020-01-01 00:00:01.000,'d1',1)",
"insert into table1(time,device,value) values(2020-01-01 00:00:03.000,'d1',3)",
"insert into table1(time,device,value) values(2020-01-01 00:00:05.000,'d1',5)",
"insert into table1(time,device,value) values(2020-01-01 00:00:08.000,'d2',8)",
"create table table2(device string tag, value int32 field)",
"insert into table2(time,device,value) values(2020-01-01 00:00:02.000,'d1',20)",
"insert into table2(time,device,value) values(2020-01-01 00:00:03.000,'d1',30)",
"insert into table2(time,device,value) values(2020-01-01 00:00:04.000,'d2',40)",
"insert into table2(time,device,value) values(2020-01-01 00:00:05.000,'d2',50)"
};
String[] expectedHeader =
new String[] {"time1", "device1", "value1", "time2", "device2", "value2"};
;
String[] retArray;

@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().initClusterEnvironment();
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setMaxTsBlockLineNumber(2)
.setMaxNumberOfPointsInPage(5);
insertData();
}

@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

private static void insertData() {
try (Connection connection = EnvFactory.getEnv().getTableConnection();
Statement statement = connection.createStatement()) {
for (String sql : sql) {
statement.execute(sql);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void innerJoinTest() {
retArray =
new String[] {
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
"2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
};
tableResultSetEqualTest(
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
+ "FROM \n"
+ "table1 t1 ASOF INNER JOIN table2 t2\n"
+ "ON\n"
+ "t1.time>=t2.time\n"
+ "order by time1",
expectedHeader,
retArray,
DATABASE_NAME);

retArray =
new String[] {
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
};
tableResultSetEqualTest(
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
+ "FROM \n"
+ "table1 t1 ASOF(tolerance 2s) INNER JOIN table2 t2\n"
+ "ON\n"
+ "t1.time>=t2.time\n"
+ "order by time1",
expectedHeader,
retArray,
DATABASE_NAME);

retArray =
new String[] {
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:03.000Z,d1,30,",
};
tableResultSetEqualTest(
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
+ "FROM \n"
+ "table1 t1 ASOF(tolerance 2s) INNER JOIN table2 t2\n"
+ "ON\n"
+ "t1.device=t2.device AND t1.time>=t2.time\n"
+ "order by time1",
expectedHeader,
retArray,
DATABASE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2213,6 +2213,121 @@ public void lastCacheTest() {
repeatTest(sql, expectedHeader, retArray, DATABASE_NAME, 3);
}

@Test
public void asofJoinTest() {
expectedHeader = new String[] {"time", "device", "level", "time", "device", "level"};
retArray =
new String[] {
"1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.100Z,d1,l5,",
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d999,null,",
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,null,l999,",
"1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.010Z,d11,l11,",
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,",
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,null,l999,",
"1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
"1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,"
};
// test single join condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.time>table1.time "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
expectedHeader,
retArray,
DATABASE_NAME);
// test expr and '>=' in ASOF condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.time>=table1.time+1 "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
expectedHeader,
retArray,
DATABASE_NAME);

retArray =
new String[] {
"1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.000Z,d1,l1,",
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
"1971-01-01T00:00:00.100Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
"1971-04-26T17:46:40.000Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
"1971-01-01T00:00:00.500Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
"1971-04-26T17:46:40.020Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
"1971-01-01T00:00:01.000Z,d1,l4,1970-01-01T00:00:00.080Z,d1,l4,",
"1971-04-26T18:01:40.000Z,d1,l4,1970-01-01T00:00:00.080Z,d1,l4,",
"1971-01-01T00:00:10.000Z,d1,l5,1970-01-01T00:00:00.100Z,d1,l5,",
"1971-08-20T11:33:20.000Z,d1,l5,1970-01-01T00:00:00.100Z,d1,l5,"
};
// test multi join conditions
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.device=table1.device and table1.level=table0.level and table0.time>table1.time "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
expectedHeader,
retArray,
DATABASE_NAME);
// test expr and '>=' in ASOF condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.device=table1.device and table1.level=table0.level and table0.time>=table1.time+1 "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
expectedHeader,
retArray,
DATABASE_NAME);

retArray =
new String[] {
"1970-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.010Z,d11,l11,",
"1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.030Z,d11,l11,",
"1970-01-01T00:00:00.040Z,d1,l3,1970-01-01T00:00:00.080Z,d1,l4,",
"1970-01-01T00:00:00.080Z,d1,l4,1970-01-01T00:00:00.100Z,d1,l5,",
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,d1,l1,",
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,d999,null,",
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,null,l999,",
"1970-01-01T00:00:00.000Z,d2,l1,1970-01-01T00:00:00.010Z,d11,l11,",
"1970-01-01T00:00:00.020Z,d2,l2,1970-01-01T00:00:00.030Z,d11,l11,",
"1970-01-01T00:00:00.040Z,d2,l3,1970-01-01T00:00:00.080Z,d1,l4,"
};
// test single join condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.time<table1.time "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
expectedHeader,
retArray,
DATABASE_NAME);
// test expr and '<=' in ASOF condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.time<=table1.time-1 "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
expectedHeader,
retArray,
DATABASE_NAME);

retArray =
new String[] {
"1970-01-01T00:00:00.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
};
// test multi join conditions
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.device=table1.device and table1.level=table0.level and table0.time<table1.time "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
expectedHeader,
retArray,
DATABASE_NAME);
// test expr and '>=' in ASOF condition
tableResultSetEqualTest(
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
+ "table0.device=table1.device and table1.level=table0.level and table0.time<=table1.time-1 "
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void exceptionTest() {
String errMsg = TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " + ONLY_SUPPORT_EQUI_JOIN;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator;

import org.apache.tsfile.read.common.block.TsBlock;

import java.util.Optional;

// This Comparator is used to handle the case where the join condition is l<=r, making its interface
// consistent with the case that l<r, such unity helps to avoid branch judgments during the process
// of operator.
public class AscLongTypeIgnoreEqualJoinKeyComparator implements JoinKeyComparator {

private static final AscLongTypeIgnoreEqualJoinKeyComparator INSTANCE =
new AscLongTypeIgnoreEqualJoinKeyComparator();

private AscLongTypeIgnoreEqualJoinKeyComparator() {
// hide constructor
}

public static AscLongTypeIgnoreEqualJoinKeyComparator getInstance() {
return INSTANCE;
}

@Override
public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
return Optional.empty();
}

return Optional.of(
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
<= right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}

@Override
public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
return Optional.empty();
}

return Optional.of(
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
== right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}

@Override
public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
return Optional.empty();
}

return Optional.of(
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
< right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
}
Loading
Loading