Skip to content

Commit

Permalink
Samza-2287: Removing RegEx based matching while parsing the sql state…
Browse files Browse the repository at this point in the history
…ment (#1121)

* Removing RegEx while parsing the sql statement

* Adding few more parser tests

* Adding verbose logging
  • Loading branch information
srinipunuru authored Jul 31, 2019
1 parent 1e6f1fd commit 179d394
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@
import org.apache.samza.SamzaException;
import org.apache.samza.sql.interfaces.SamzaSqlDriver;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Utility class that is used to parse the Samza sql query to figure out the sources, sink etc..
*/
public class SamzaSqlQueryParser {
private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlQueryParser.class);

private SamzaSqlQueryParser() {
}
Expand Down Expand Up @@ -94,13 +97,6 @@ public String getSql() {
}

public static QueryInfo parseQuery(String sql) {

Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select .* from (.*))", Pattern.CASE_INSENSITIVE);
Matcher m = insertIntoSqlPattern.matcher(sql);
if (!m.matches()) {
throw new SamzaException("Invalid query format");
}

Planner planner = createPlanner();
SqlNode sqlNode;
try {
Expand All @@ -117,13 +113,20 @@ public static QueryInfo parseQuery(String sql) {
sink = sqlInsert.getTargetTable().toString();
if (sqlInsert.getSource() instanceof SqlSelect) {
SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
selectQuery = m.group(2);
selectQuery = sqlSelect.toString();
LOG.info("Parsed select query {} from sql {}", selectQuery, sql);
sources = getSourcesFromSelectQuery(sqlSelect);
} else {
throw new SamzaException("Sql query is not of the expected format");
String msg = String.format("Sql query is not of the expected format. Select node expected, found %s",
sqlInsert.getSource().getClass().toString());
LOG.error(msg);
throw new SamzaException(msg);
}
} else {
throw new SamzaException("Sql query is not of the expected format");
String msg = String.format("Sql query is not of the expected format. Insert node expected, found %s",
sqlNode.getClass().toString());
LOG.error(msg);
throw new SamzaException(msg);
}

return new QueryInfo(selectQuery, sources, sink, sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,66 @@

import org.apache.samza.SamzaException;
import org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo;
import org.junit.Assert;
import org.junit.Test;

import junit.framework.Assert;

public class TestSamzaSqlQueryParser {

@Test
public void testParseQuery() {
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar");
Assert.assertEquals("log.foo", queryInfo.getSink());
Assert.assertEquals(queryInfo.getSelectQuery(), "select * from tracking.bar", queryInfo.getSelectQuery());
Assert.assertEquals(1, queryInfo.getSources().size());
Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
}

@Test
public void testParseGroupyByQuery() {
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo select b.pageKey, count(*) from tracking.bar as b group by b.pageKey");
Assert.assertEquals("log.foo", queryInfo.getSink());
Assert.assertEquals(1, queryInfo.getSources().size());
Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
}

@Test
public void testParseUnNestSubQuery() {
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo SELECT * FROM unnest(SELECT int_array_field1 FROM tracking.bar) ");
Assert.assertEquals("log.foo", queryInfo.getSink());
Assert.assertEquals(1, queryInfo.getSources().size());
Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
}

@Test
public void testParseJoinSubQuery() {
String sql =
"Insert into testavro.enrichedPageViewTopic"
+ " select p.name as profileName, pv.pageKey"
+ " from (SELECT * FROM testavro.PAGEVIEW pv1 where pv1.field1='foo') as pv"
+ " join testavro.PROFILE.`$table` as p"
+ " on p.id = pv.profileId";
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
Assert.assertEquals(2, queryInfo.getSources().size());
Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
}

@Test
public void testParseJoinUnNestQuery() {
String sql =
"Insert into testavro.enrichedPageViewTopic"
+ " select p.name as profileName, pv.pageKey"
+ " from unnest(SELECT int_array_field1 FROM testavro.PAGEVIEW) as pv"
+ " join testavro.PROFILE.`$table` as p"
+ " on p.id = pv.profileId";
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
Assert.assertEquals(2, queryInfo.getSources().size());
Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getSources().get(1));
}

@Test
public void testParseJoinQuery() {
String sql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void testEndToEndWithProjection() throws Exception {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
+ " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));
Expand Down

0 comments on commit 179d394

Please sign in to comment.