Commit

[Bugfix] Support "starrocks.filter.query" for Spark SQL read (#92)
Signed-off-by: feihengye <qiangsheng.yet@gmail.com>
feihengye committed Nov 3, 2023
1 parent 81f7b48 commit 8bbc057
Showing 2 changed files with 44 additions and 1 deletion.
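
Before the diff, a hedged usage sketch of what this fix enables (not part of the commit; the table name, endpoints, and credentials are placeholders). Previously the connector overwrote the "starrocks.filter.query" option with Spark's pushed-down predicates; after this change the two are combined with "and":

import org.apache.spark.sql.SparkSession

object FilterQueryReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("filterQueryRead")
      .getOrCreate()

    // Option keys mirror the test added below; the table identifier,
    // FE endpoints, and credentials here are placeholder values.
    spark.sql(
      """CREATE TEMPORARY VIEW sr_table
        | USING starrocks
        |OPTIONS(
        |  "starrocks.table.identifier"="test_db.score_board",
        |  "starrocks.filter.query"="id=1",
        |  "starrocks.fe.http.url"="127.0.0.1:8030",
        |  "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
        |  "starrocks.user"="root",
        |  "starrocks.password"=""
        |)""".stripMargin)

    // Spark pushes the WHERE predicate down to the connector, which now
    // appends the user option instead of discarding it, so StarRocks is
    // asked to scan with (roughly): <pushed-down clause> and (id=1).
    spark.sql("SELECT * FROM sr_table WHERE name = '2'").show()

    spark.stop()
  }
}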
@@ -83,7 +83,11 @@ private[sql] class StarrocksRelation(
     }
 
     if (filters != null && filters.length > 0) {
-      paramWithScan += (ConfigurationOptions.STARROCKS_FILTER_QUERY -> filterWhereClause)
+      val userFilters = paramWithScan.get(ConfigurationOptions.STARROCKS_FILTER_QUERY)
+        .filter(filters => filters.nonEmpty)
+        .map(filters => " and (" + filters + ")")
+        .getOrElse("")
+      paramWithScan += (ConfigurationOptions.STARROCKS_FILTER_QUERY -> (filterWhereClause + userFilters))
     }
 
     new ScalaStarrocksRowRDD(sqlContext.sparkContext, paramWithScan.toMap, lazySchema)
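
A minimal standalone sketch of the merge logic above, runnable in the Scala REPL (it inlines "starrocks.filter.query", the key that ConfigurationOptions.STARROCKS_FILTER_QUERY evidently holds per the test below; the clause values are made up):

import scala.collection.mutable

val paramWithScan = mutable.Map("starrocks.filter.query" -> "city = 'beijing'")  // user-supplied option
val filterWhereClause = "`id` = 1"  // stand-in for the clause built from Spark's pushed-down filters

// Append the user-supplied filter, if present and non-empty, with "and".
val userFilters = paramWithScan.get("starrocks.filter.query")
  .filter(_.nonEmpty)
  .map(f => " and (" + f + ")")
  .getOrElse("")
paramWithScan += ("starrocks.filter.query" -> (filterWhereClause + userFilters))

// paramWithScan("starrocks.filter.query") is now "`id` = 1 and (city = 'beijing')"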
@@ -134,6 +134,45 @@ public void testSql() throws Exception {
         spark.stop();
     }
 
+    @Test
+    public void testMultiTableJoinSql() throws Exception {
+        String tableName = "testSql_" + genRandomUuid();
+        prepareScoreBoardTable(tableName);
+
+        SparkSession spark = SparkSession
+                .builder()
+                .master("local[1]")
+                .appName("testSql")
+                .getOrCreate();
+
+        List<List<Object>> expectedData = new ArrayList<>();
+        expectedData.add(Arrays.asList(1, "2", 3));
+        expectedData.add(Arrays.asList(2, "3", 4));
+
+        String ddl = String.format("CREATE TEMPORARY VIEW sr_table \n" +
+                " USING starrocks\n" +
+                "OPTIONS(\n" +
+                " \"starrocks.table.identifier\"=\"%s\",\n" +
+                " \"starrocks.filter.query\"=\"%s\",\n" +
+                " \"starrocks.fe.http.url\"=\"%s\",\n" +
+                " \"starrocks.fe.jdbc.url\"=\"%s\",\n" +
+                " \"starrocks.user\"=\"%s\",\n" +
+                " \"starrocks.password\"=\"%s\"\n" +
+                ")", String.join(".", DB_NAME, tableName), "id=1", FE_HTTP, FE_JDBC, USER, PASSWORD);
+        spark.sql(ddl);
+        spark.sql("INSERT INTO sr_table VALUES (1, \"2\", 3), (2, \"3\", 4)");
+
+        List<List<Object>> actualWriteData = scanTable(DB_CONNECTION, DB_NAME, tableName);
+        verifyResult(expectedData, actualWriteData);
+
+        List<List<Object>> joinExpectedData = new ArrayList<>();
+        joinExpectedData.add(Arrays.asList(1, "2", 3, 1, "2", 3));
+        List<Row> readRows = spark.sql("SELECT a.*, b.* FROM sr_table a join sr_table b on a.id=b.id").collectAsList();
+        verifyRows(joinExpectedData, readRows);
+
+        spark.stop();
+    }
+
     @Test
     public void testConditionalUpdates() throws Exception {
         String tableName = "testConditionalUpdates_" + genRandomUuid();
