Skip to content

Commit

Permalink
Add QueryableRALExecuteEngine and UpdatableRALExecuteEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jan 25, 2024
1 parent 18cb9c9 commit 1001990
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.distsql.handler.type.ral.query;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.ConnectionSizeAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.DatabaseAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.InstanceContextAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.util.DatabaseNameUtils;
import org.apache.shardingsphere.distsql.statement.ral.queryable.QueryableRALStatement;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.util.Collection;

/**
* Queryable RAL execute engine.
*/
@RequiredArgsConstructor
public abstract class QueryableRALExecuteEngine {

private final QueryableRALStatement sqlStatement;

private final String currentDatabaseName;

private final ContextManager contextManager;

@Getter
private Collection<String> columnNames;

@Getter
private Collection<LocalDataQueryResultRow> rows;

/**
* Execute query.
*
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void executeQuery() {
QueryableRALExecutor executor = TypedSPILoader.getService(QueryableRALExecutor.class, sqlStatement.getClass());
rows = getRows(executor);
columnNames = executor.getColumnNames();
}

@SuppressWarnings({"unchecked", "rawtypes"})
private Collection<LocalDataQueryResultRow> getRows(final QueryableRALExecutor executor) {
if (executor instanceof InstanceContextAwareQueryableRALExecutor) {
((InstanceContextAwareQueryableRALExecutor) executor).setInstanceContext(contextManager.getInstanceContext());
}
if (executor instanceof DatabaseAwareQueryableRALExecutor) {
((DatabaseAwareQueryableRALExecutor) executor).setDatabase(getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName)));
}
if (executor instanceof ConnectionSizeAwareQueryableRALExecutor) {
((ConnectionSizeAwareQueryableRALExecutor) executor).setConnectionSize(getConnectionSize());
}
return executor.getRows(sqlStatement, contextManager.getMetaDataContexts().getMetaData());
}

protected abstract ShardingSphereDatabase getDatabase(String databaseName);

protected abstract int getConnectionSize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.distsql.handler.type.ral.update;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.handler.util.DatabaseNameUtils;
import org.apache.shardingsphere.distsql.statement.ral.updatable.UpdatableRALStatement;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.sql.SQLException;

/**
* Updatable RAL execute engine.
*/
@RequiredArgsConstructor
public abstract class UpdatableRALExecuteEngine {

private final UpdatableRALStatement sqlStatement;

private final String currentDatabaseName;

/**
* Execute update.
*
* @throws SQLException SQL exception
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void executeUpdate() throws SQLException {
UpdatableRALExecutor executor = TypedSPILoader.getService(UpdatableRALExecutor.class, sqlStatement.getClass());
if (executor instanceof DatabaseAwareUpdatableRALExecutor) {
((DatabaseAwareUpdatableRALExecutor) executor).setDatabase(getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName)));
}
executor.executeUpdate(sqlStatement);
}

protected abstract ShardingSphereDatabase getDatabase(String databaseName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@

package org.apache.shardingsphere.proxy.backend.handler.distsql.ral;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.ConnectionSizeAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.DatabaseAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.aware.InstanceContextAwareQueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.query.QueryableRALExecuteEngine;
import org.apache.shardingsphere.distsql.statement.ral.queryable.QueryableRALStatement;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataMergedResult;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.distsql.DistSQLBackendHandler;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
Expand All @@ -35,7 +30,6 @@
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.util.DatabaseNameUtils;

import java.sql.SQLException;
import java.sql.Types;
Expand All @@ -46,50 +40,32 @@

/**
* Queryable RAL backend handler.
*
* @param <T> type of queryable RAL statement
*/
@RequiredArgsConstructor
public final class QueryableRALBackendHandler<T extends QueryableRALStatement> implements DistSQLBackendHandler {

private final T sqlStatement;
public final class QueryableRALBackendHandler extends QueryableRALExecuteEngine implements DistSQLBackendHandler {

private final ConnectionSession connectionSession;

private List<QueryHeader> queryHeaders;

private MergedResult mergedResult;

@SuppressWarnings("unchecked")
public QueryableRALBackendHandler(final QueryableRALStatement sqlStatement, final ConnectionSession connectionSession) {
super(sqlStatement, connectionSession.getDatabaseName(), ProxyContext.getInstance().getContextManager());
this.connectionSession = connectionSession;
}

@Override
public ResponseHeader execute() {
QueryableRALExecutor<T> executor = TypedSPILoader.getService(QueryableRALExecutor.class, sqlStatement.getClass());
mergedResult = getMergedResult(executor);
queryHeaders = createQueryHeader(executor.getColumnNames());
executeQuery();
mergedResult = new LocalDataMergedResult(getRows());
queryHeaders = createQueryHeader(getColumnNames());
return new QueryResponseHeader(queryHeaders);
}

private MergedResult getMergedResult(final QueryableRALExecutor<T> executor) {
if (executor instanceof InstanceContextAwareQueryableRALExecutor) {
((InstanceContextAwareQueryableRALExecutor<T>) executor).setInstanceContext(ProxyContext.getInstance().getContextManager().getInstanceContext());
}
if (executor instanceof DatabaseAwareQueryableRALExecutor) {
((DatabaseAwareQueryableRALExecutor<T>) executor).setDatabase(ProxyContext.getInstance().getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, connectionSession)));
}
if (executor instanceof ConnectionSizeAwareQueryableRALExecutor) {
((ConnectionSizeAwareQueryableRALExecutor<T>) executor).setConnectionSize(connectionSession.getDatabaseConnectionManager().getConnectionSize());
}
return createMergedResult(executor.getRows(sqlStatement, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()));
}

private List<QueryHeader> createQueryHeader(final Collection<String> columnNames) {
return columnNames.stream().map(each -> new QueryHeader("", "", each, each, Types.CHAR, "CHAR", 255, 0, false, false, false, false)).collect(Collectors.toList());
}

private MergedResult createMergedResult(final Collection<LocalDataQueryResultRow> rows) {
return new LocalDataMergedResult(rows);
}

@Override
public boolean next() throws SQLException {
return null != mergedResult && mergedResult.next();
Expand All @@ -103,4 +79,14 @@ public QueryResponseRow getRowData() throws SQLException {
}
return new QueryResponseRow(cells);
}

@Override
protected ShardingSphereDatabase getDatabase(final String databaseName) {
return ProxyContext.getInstance().getDatabase(databaseName);
}

@Override
protected int getConnectionSize() {
return connectionSession.getDatabaseConnectionManager().getConnectionSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class RALBackendHandlerFactory {
*/
public static ProxyBackendHandler newInstance(final RALStatement sqlStatement, final ConnectionSession connectionSession) {
return sqlStatement instanceof QueryableRALStatement
? new QueryableRALBackendHandler<>((QueryableRALStatement) sqlStatement, connectionSession)
: new UpdatableRALBackendHandler<>((UpdatableRALStatement) sqlStatement, connectionSession);
? new QueryableRALBackendHandler((QueryableRALStatement) sqlStatement, connectionSession)
: new UpdatableRALBackendHandler((UpdatableRALStatement) sqlStatement, connectionSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,37 @@

package org.apache.shardingsphere.proxy.backend.handler.distsql.ral;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.handler.type.ral.update.UpdatableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseAwareUpdatableRALExecutor;
import org.apache.shardingsphere.distsql.handler.type.ral.update.UpdatableRALExecuteEngine;
import org.apache.shardingsphere.distsql.statement.ral.updatable.UpdatableRALStatement;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.distsql.DistSQLBackendHandler;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.util.DatabaseNameUtils;

import java.sql.SQLException;

/**
* Updatable RAL backend handler.
*
* @param <T> type of SQL statement
*/
@RequiredArgsConstructor
public final class UpdatableRALBackendHandler<T extends UpdatableRALStatement> implements DistSQLBackendHandler {
public final class UpdatableRALBackendHandler extends UpdatableRALExecuteEngine implements DistSQLBackendHandler {

private final UpdatableRALStatement sqlStatement;

private final ConnectionSession connectionSession;
public UpdatableRALBackendHandler(final UpdatableRALStatement sqlStatement, final ConnectionSession connectionSession) {
super(sqlStatement, connectionSession.getDatabaseName());
this.sqlStatement = sqlStatement;
}

@SuppressWarnings("unchecked")
@Override
public ResponseHeader execute() throws SQLException {
UpdatableRALExecutor<T> updater = TypedSPILoader.getService(UpdatableRALExecutor.class, sqlStatement.getClass());
if (updater instanceof DatabaseAwareUpdatableRALExecutor) {
((DatabaseAwareUpdatableRALExecutor<T>) updater).setDatabase(ProxyContext.getInstance().getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, connectionSession)));
}
updater.executeUpdate((T) sqlStatement);
executeUpdate();
return new UpdateResponseHeader(sqlStatement);
}

@Override
protected ShardingSphereDatabase getDatabase(final String databaseName) {
return ProxyContext.getInstance().getDatabase(databaseName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class QueryableRALBackendHandlerTest {
@Test
void assertExecuteWithNoDatabase() {
when(ProxyContext.getInstance().getDatabase(null)).thenThrow(NoDatabaseSelectedException.class);
assertThrows(NoDatabaseSelectedException.class, () -> new QueryableRALBackendHandler<>(mock(ExportDatabaseConfigurationStatement.class), mock(ConnectionSession.class)).execute());
assertThrows(NoDatabaseSelectedException.class, () -> new QueryableRALBackendHandler(mock(ExportDatabaseConfigurationStatement.class), mock(ConnectionSession.class)).execute());
}

@Test
Expand All @@ -74,12 +74,12 @@ void assertExecuteWithUnknownDatabase() {
when(ProxyContext.getInstance().getDatabase("unknown")).thenThrow(UnknownDatabaseException.class);
ContextManager contextManager = new ContextManager(metaDataContexts, mock(InstanceContext.class));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
assertThrows(UnknownDatabaseException.class, () -> new QueryableRALBackendHandler<>(mock(ExportDatabaseConfigurationStatement.class), connectionSession).execute());
assertThrows(UnknownDatabaseException.class, () -> new QueryableRALBackendHandler(mock(ExportDatabaseConfigurationStatement.class), connectionSession).execute());
}

@Test
void assertExecuteWithAbstractStatement() {
assertThrows(ServiceProviderNotFoundException.class, () -> new QueryableRALBackendHandler<>(mock(QueryableRALStatement.class), mock(ConnectionSession.class)).execute());
assertThrows(ServiceProviderNotFoundException.class, () -> new QueryableRALBackendHandler(mock(QueryableRALStatement.class), mock(ConnectionSession.class)).execute());
}

@Test
Expand All @@ -89,7 +89,7 @@ void assertExecute() {
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
when(database.getSchema("foo_db")).thenReturn(new ShardingSphereSchema(createTableMap(), Collections.emptyMap()));
when(ProxyContext.getInstance().getDatabase("foo_db")).thenReturn(database);
assertDoesNotThrow(() -> new QueryableRALBackendHandler<>(createSqlStatement(), mock(ConnectionSession.class)).execute());
assertDoesNotThrow(() -> new QueryableRALBackendHandler(createSqlStatement(), mock(ConnectionSession.class)).execute());
}

private Map<String, ShardingSphereTable> createTableMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void assertEmptyResource() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
when(database.getName()).thenReturn("foo_db");
when(ProxyContext.getInstance().getDatabase("foo_db")).thenReturn(database);
UpdatableRALBackendHandler<?> backendHandler = new UpdatableRALBackendHandler<>(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
UpdatableRALBackendHandler backendHandler = new UpdatableRALBackendHandler(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
assertThrows(EmptyStorageUnitException.class, backendHandler::execute);
}

Expand All @@ -68,7 +68,7 @@ void assertMissingRequiredResources() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
when(ProxyContext.getInstance().databaseExists("foo_db")).thenReturn(true);
UpdatableRALBackendHandler<?> backendHandler = new UpdatableRALBackendHandler<>(
UpdatableRALBackendHandler backendHandler = new UpdatableRALBackendHandler(
new RefreshTableMetaDataStatement("t_order", "ds_1", null), mockConnectionSession("foo_db"));
assertThrows(MissingRequiredStorageUnitsException.class, backendHandler::execute);
}
Expand All @@ -81,7 +81,7 @@ void assertUpdate() throws SQLException {
when(database.getName()).thenReturn("foo_db");
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
when(ProxyContext.getInstance().getDatabase("foo_db")).thenReturn(database);
UpdatableRALBackendHandler<?> backendHandler = new UpdatableRALBackendHandler<>(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
UpdatableRALBackendHandler backendHandler = new UpdatableRALBackendHandler(new RefreshTableMetaDataStatement(), mockConnectionSession("foo_db"));
ResponseHeader actual = backendHandler.execute();
assertThat(actual, instanceOf(UpdateResponseHeader.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void setUp() {

@Test
void assertNotSupportedVariable() {
UpdatableRALBackendHandler<?> handler = new UpdatableRALBackendHandler<>(new SetDistVariableStatement("unsupported", "XXX"), connectionSession);
UpdatableRALBackendHandler handler = new UpdatableRALBackendHandler(new SetDistVariableStatement("unsupported", "XXX"), connectionSession);
assertThrows(UnsupportedVariableException.class, handler::execute);
}
}

0 comments on commit 1001990

Please sign in to comment.