Change INSERT INTO to streaming
* A non-streaming INSERT INTO now uses the streaming plan, which is the same as the plan of a streaming insert.
  It also records the load info and returns the label of the insert stmt.
* PARTITION is now supported in the INSERT INTO stmt; only result rows that fall into the target partitions are loaded. A usage sketch follows below.
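To make the two points concrete, here is a hedged JDBC sketch, not part of this commit: the host, port, database, table, and column names are all made up, and the SHOW LOAD filter follows the syntax described in the InsertStmt Javadoc further down. Doris speaks the MySQL protocol, so a plain JDBC connection suffices.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class InsertIntoPartitionExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical FE endpoint and credentials.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/example_db", "root", "");
             Statement stmt = conn.createStatement()) {
            // PARTITION clause: only rows falling into p1 or p2 are loaded;
            // rows outside the target partitions are filtered out.
            stmt.executeUpdate(
                    "INSERT INTO target_tbl PARTITION (p1, p2) (c1, c2) "
                            + "SELECT c1, c2 FROM source_tbl");
            // The insert now records a load job labeled "insert_<uuid>",
            // which can be inspected afterwards with SHOW LOAD.
            try (ResultSet rs = stmt.executeQuery(
                    "SHOW LOAD WHERE LABEL LIKE \"insert_\"")) {
                while (rs.next()) {
                    System.out.println(rs.getString("Label") + ": " + rs.getString("State"));
                }
            }
        }
    }
}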
EmmyMiao87 committed May 23, 2019
1 parent 5e245e0 commit 23c4323
Showing 7 changed files with 214 additions and 59 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/broker_reader.cpp
@@ -94,6 +94,7 @@ Status BrokerReader::open() {
try {
client->openReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
RETURN_IF_ERROR(client.reopen());
client->openReader(response, request);
}
@@ -143,6 +144,7 @@ Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
try {
client->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
RETURN_IF_ERROR(client.reopen());
LOG(INFO) << "retry reading from broker: " << broker_addr << ". reason: " << e.what();
client->pread(response, request);
@@ -197,6 +199,7 @@ void BrokerReader::close() {
try {
client->closeReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
usleep(1000 * 1000);
status = client.reopen();
if (!status.ok()) {
LOG(WARNING) << "Close broker reader failed. broker=" << broker_addr
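The three hunks above all apply the same fix: on a thrift transport error, back off one second, reopen the broker client, and retry the call once. A minimal Java sketch of that pattern, assuming illustrative stand-in types (BrokerClient and TransportException are not the actual classes; the real change is in C++ on the BE side):

interface BrokerClient {
    byte[] pread(long offset, int length) throws TransportException;
    void reopen() throws TransportException;
}

class TransportException extends Exception {
    TransportException(String message) { super(message); }
}

class RetryingBrokerReader {
    private final BrokerClient client;

    RetryingBrokerReader(BrokerClient client) {
        this.client = client;
    }

    byte[] read(long offset, int length) throws TransportException, InterruptedException {
        try {
            return client.pread(offset, length);
        } catch (TransportException e) {
            // Mirrors the usleep(1000 * 1000) added above: sleep one second,
            // rebuild the connection, then retry exactly once.
            Thread.sleep(1000);
            client.reopen();
            return client.pread(offset, length);
        }
    }
}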
40 changes: 24 additions & 16 deletions fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -43,6 +43,7 @@
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -57,7 +58,22 @@
import java.util.Set;
import java.util.UUID;

// InsertStmt used to
/**
 * INSERT INTO loads data from the result of a query stmt into a target table.
 * <p>
 * syntax:
 * INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt
 * <p>
 * table_name: the name of the target table
 * partition_info: PARTITION (p1,p2)
 * the partition info of the target table
 * col_list: (c1,c2)
 * the column list of the target table
 * plan_hints: [STREAMING,SHUFFLE_HINT]
 * The streaming plan is used by both streaming and non-streaming insert stmts.
 * The only difference is that a non-streaming insert also records the load info in LoadManager and returns the label.
 * The user can check the load info with a SHOW LOAD stmt.
 */
public class InsertStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(InsertStmt.class);

@@ -239,16 +255,13 @@ public void analyze(Analyzer analyzer) throws UserException {
// if all previous job finished
UUID uuid = UUID.randomUUID();
String jobLabel = "insert_" + uuid;
LoadJobSourceType sourceType = isStreaming ? LoadJobSourceType.INSERT_STREAMING
: LoadJobSourceType.FRONTEND;
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
jobLabel, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
if (isStreaming) {
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
sink.init(loadId, transactionId, db.getId());
}
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
sink.init(loadId, transactionId, db.getId());
}
}

@@ -498,13 +511,8 @@ public DataSink createDataSink() throws AnalysisException {
return dataSink;
}
if (targetTable instanceof OlapTable) {
if (isStreaming) {
dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple);
dataPartition = dataSink.getOutputPartition();
} else {
dataSink = new DataSplitSink((OlapTable) targetTable, olapTuple);
dataPartition = dataSink.getOutputPartition();
}
dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, Joiner.on(",").join(targetPartitions));
dataPartition = dataSink.getOutputPartition();
} else if (targetTable instanceof BrokerTable) {
BrokerTable table = (BrokerTable) targetTable;
// TODO(lingbin): think use which one if have more than one path
@@ -525,7 +533,7 @@
}

public void finalize() throws UserException {
if (isStreaming) {
if (targetTable instanceof OlapTable) {
((OlapTableSink) dataSink).finalize();
}
}
105 changes: 105 additions & 0 deletions fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.doris.load.loadv2;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * This class records the finished info of an insert load job.
 * It is created after the txn belonging to the insert load job becomes visible.
 * The state of an insert load job is always FINISHED, so it will never be scheduled by the JobScheduler.
 */
public class InsertLoadJob extends LoadJob {

private long tableId;

// only for log replay
public InsertLoadJob() {
super();
this.jobType = EtlJobType.INSERT;
}

public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) {
this.label = label;
this.dbId = dbId;
this.tableId = tableId;
this.createTimestamp = createTimestamp;
this.loadStartTimestamp = createTimestamp;
this.finishTimestamp = System.currentTimeMillis();
this.state = JobState.FINISHED;
this.progress = 100;
this.jobType = EtlJobType.INSERT;
}

@Override
protected Set<String> getTableNames() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
database.readLock();
try {
Table table = database.getTable(tableId);
if (table == null) {
throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
}
return new HashSet<>(Arrays.asList(table.getName()));
} finally {
database.readUnlock();
}
}

@Override
void executeJob() {
}

@Override
public void onTaskFinished(TaskAttachment attachment) {
}

@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(tableId);
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
tableId = in.readLong();
}
}
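As the class comment says, an insert load job is born FINISHED. A hedged illustration of creating one, assuming the Doris FE classes above are on the classpath; only the constructor signature is taken from the diff, the ids are made up, and the label format matches the "insert_" + UUID labels generated by InsertStmt.analyze():

import java.util.UUID;

public class InsertLoadJobExample {
    public static void main(String[] args) {
        long dbId = 10001L;     // made-up ids, for illustration only
        long tableId = 10002L;
        InsertLoadJob job = new InsertLoadJob(
                "insert_" + UUID.randomUUID(), dbId, tableId, System.currentTimeMillis());
        // Per the constructor above: the job starts FINISHED with progress 100
        // and loadStartTimestamp == createTimestamp, so the JobScheduler never runs it.
    }
}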
10 changes: 6 additions & 4 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -235,7 +235,7 @@ protected static void checkDataSourceInfo(Database db, List<DataDescription> dat
}
}

abstract Set<String> getTableNames();
abstract Set<String> getTableNames() throws MetaNotFoundException;

public void isJobTypeRead(boolean jobTypeRead) {
isJobTypeRead = jobTypeRead;
@@ -331,7 +331,7 @@ public void cancelJobWithoutCheck(FailMsg failMsg) {
logFinalOperation();
}

public void cancelJob(FailMsg failMsg) throws DdlException {
public void cancelJob(FailMsg failMsg) throws DdlException, MetaNotFoundException {
writeLock();
try {
// check
@@ -357,7 +357,7 @@ public void cancelJob(FailMsg failMsg) throws DdlException {
logFinalOperation();
}

private void checkAuth() throws DdlException {
private void checkAuth() throws DdlException, MetaNotFoundException {
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new DdlException("Db does not exist. id: " + dbId);
@@ -467,7 +467,7 @@ public void unprotectReadEndOperation(LoadJobFinalOperation loadJobFinalOperatio
failMsg = loadJobFinalOperation.getFailMsg();
}

public List<Comparable> getShowInfo() throws DdlException {
public List<Comparable> getShowInfo() throws DdlException, MetaNotFoundException {
readLock();
try {
// check auth
@@ -537,6 +537,8 @@ public static LoadJob read(DataInput in) throws IOException {
EtlJobType type = EtlJobType.valueOf(Text.readString(in));
if (type == EtlJobType.BROKER) {
job = new BrokerLoadJob();
} else if (type == EtlJobType.INSERT) {
job = new InsertLoadJob();
} else {
throw new IOException("Unknown load type: " + type.name());
}
72 changes: 54 additions & 18 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -27,11 +27,14 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -43,6 +46,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -87,7 +91,7 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
throw new DdlException("LoadManager only support the broker load.");
}
loadJob = BrokerLoadJob.fromLoadStmt(stmt);
addLoadJob(loadJob);
createLoadJob(loadJob);
// submit it
loadJobScheduler.submitJob(loadJob);
} finally {
@@ -98,12 +102,19 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
}

public void replayCreateLoadJob(LoadJob loadJob) {
addLoadJob(loadJob);
createLoadJob(loadJob);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("msg", "replay create load job")
.build());
}

private void createLoadJob(LoadJob loadJob) {
addLoadJob(loadJob);
// Add the callback before the txn is created, because on replay the callback runs without a txn begin.
// register txn state listener
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
}

private void addLoadJob(LoadJob loadJob) {
idToLoadJob.put(loadJob.getId(), loadJob);
long dbId = loadJob.getDbId();
@@ -115,12 +126,31 @@ private void addLoadJob(LoadJob loadJob) {
labelToLoadJobs.put(loadJob.getLabel(), new ArrayList<>());
}
labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
// add callback before txn created, because callback will be performed on replay without txn begin
// register txn state listener
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
}

public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
long createTimestamp) throws MetaNotFoundException {

// get db id
Database db = Catalog.getCurrentCatalog().getDb(dbName);
if (db == null) {
throw new MetaNotFoundException("Database[" + dbName + "] does not exist");
}

LoadJob loadJob;
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp);
break;
default:
return;
}
addLoadJob(loadJob);
// persist the job
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
}
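recordFinishedLoadJob has no caller in this diff; presumably the insert execution path invokes it once the insert's transaction becomes visible. A hedged sketch of such a call site — the onInsertTxnVisible wrapper and the getLoadManager() accessor are assumptions, not code from this commit:

// Hypothetical call site, executed after the insert's transaction is visible.
// recordFinishedLoadJob resolves the db by name, builds an already-FINISHED
// InsertLoadJob, registers it, and writes it to the edit log for replay.
void onInsertTxnVisible(String label, String dbName, long tableId, long createTimestamp)
        throws MetaNotFoundException {
    Catalog.getCurrentCatalog().getLoadManager().recordFinishedLoadJob(
            label, dbName, tableId, EtlJobType.INSERT, createTimestamp);
}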

public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, MetaNotFoundException {
Database db = Catalog.getInstance().getDb(stmt.getDbName());
if (db == null) {
throw new DdlException("Db does not exist. name: " + stmt.getDbName());
@@ -221,17 +251,23 @@ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
List<LoadJob> loadJobList = Lists.newArrayList();
// check label value
if (accurateMatch) {
if (!labelToLoadJobs.containsKey(labelValue)) {
return loadJobInfos;
}
loadJobList.addAll(labelToLoadJobs.get(labelValue));
}
// non-accurate match
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (entry.getKey().contains(labelValue)) {
loadJobList.addAll(entry.getValue());
if (Strings.isNullOrEmpty(labelValue)) {
loadJobList.addAll(labelToLoadJobs.values()
.stream().flatMap(Collection::stream).collect(Collectors.toList()));
} else {
// check label value
if (accurateMatch) {
if (!labelToLoadJobs.containsKey(labelValue)) {
return loadJobInfos;
}
loadJobList.addAll(labelToLoadJobs.get(labelValue));
} else {
// non-accurate match
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (entry.getKey().contains(labelValue)) {
loadJobList.addAll(entry.getValue());
}
}
}
}

@@ -243,7 +279,7 @@ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
}
// add load job info
loadJobInfos.add(loadJob.getShowInfo());
} catch (DdlException e) {
} catch (DdlException | MetaNotFoundException e) {
continue;
}
}
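The reworked label filtering in getLoadJobInfosByDb has three cases: an empty label shows every job in the db, an accurate match returns only the exact label, and anything else falls back to substring matching. It also appears to fix a duplicate-add in the old path, where the accurate-match branch fell through into the substring loop. A standalone Java sketch of the same rules, simplified to plain strings in place of LoadJob objects:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LabelFilterSketch {
    static List<String> filter(Map<String, List<String>> labelToJobs,
                               String labelValue, boolean accurateMatch) {
        List<String> jobs = new ArrayList<>();
        if (labelValue == null || labelValue.isEmpty()) {
            // Empty pattern: every job in the db.
            for (List<String> value : labelToJobs.values()) {
                jobs.addAll(value);
            }
        } else if (accurateMatch) {
            // Exact label only.
            jobs.addAll(labelToJobs.getOrDefault(labelValue, Collections.emptyList()));
        } else {
            // Substring match, as in entry.getKey().contains(labelValue).
            for (Map.Entry<String, List<String>> entry : labelToJobs.entrySet()) {
                if (entry.getKey().contains(labelValue)) {
                    jobs.addAll(entry.getValue());
                }
            }
        }
        return jobs;
    }

    public static void main(String[] args) {
        Map<String, List<String>> labelToJobs = new TreeMap<>();
        labelToJobs.put("insert_aa", Arrays.asList("job1"));
        labelToJobs.put("broker_bb", Arrays.asList("job2"));
        System.out.println(filter(labelToJobs, "", false));          // [job2, job1]
        System.out.println(filter(labelToJobs, "insert_aa", true));  // [job1]
        System.out.println(filter(labelToJobs, "bb", false));        // [job2]
    }
}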
5 changes: 0 additions & 5 deletions fe/src/main/java/org/apache/doris/planner/Planner.java
@@ -149,11 +149,6 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer
if (statment instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) statment;
insertStmt.prepareExpressions();
if (insertStmt.getOlapTuple() != null && !insertStmt.isStreaming()) {
singleNodePlan = new OlapRewriteNode(plannerContext.getNextNodeId(), singleNodePlan, insertStmt);
singleNodePlan.init(analyzer);
resultExprs = insertStmt.getResultExprs();
}
}

// TODO chenhao16 , no used materialization work