Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ public boolean isTimeout() {
return System.currentTimeMillis() - createTimeMs > timeoutMs;
}

public long getCreateTimeMs() {
return createTimeMs;
}

public void setCreateTimeMs(long createTimeMs) {
this.createTimeMs = createTimeMs;
}

public boolean isExpire() {
return isDone() && (System.currentTimeMillis() - finishedTimeMs) / 1000 > Config.history_job_keep_max_second;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2427,12 +2427,12 @@ public int getAsInt() {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop value column optimize, direct modify table meta.
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
null, isDropIndex, jobId, false, propertyMap);
null, isDropIndex, jobId, false, propertyMap, null, null);
} else if (Config.enable_light_index_change && lightIndexChange) {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop inverted index and ngram_bf optimize, direct modify table meta firstly.
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, false, propertyMap);
alterIndexes, isDropIndex, jobId, false, propertyMap, null, null);
} else if (buildIndexChange) {
if (alterIndexes.isEmpty()) {
throw new DdlException("Altered index is empty. please check your alter stmt.");
Expand Down Expand Up @@ -3194,7 +3194,8 @@ public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
List<Index> alterIndexes, boolean isDropIndex,
long jobId, boolean isReplay, Map<String, String> propertyMap)
long jobId, boolean isReplay, Map<String, String> propertyMap,
Long createTimeMs, Long finishedTimeMs)
throws DdlException, AnalysisException {

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -3225,6 +3226,10 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
SchemaChangeJobV2 schemaChangeJob = AlterJobV2Factory.createSchemaChangeJobV2(
rawSql, jobId, db.getId(), olapTable.getId(),
olapTable.getName(), 1000);
if (createTimeMs != null) {
schemaChangeJob.setCreateTimeMs(createTimeMs);
}
long jobFinishedTimeMs = finishedTimeMs != null ? finishedTimeMs : System.currentTimeMillis();

for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
long originIndexId = entry.getKey();
Expand All @@ -3251,7 +3256,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
if (alterIndexes != null) {
if (!isReplay) {
TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo(rawSql, db.getId(),
olapTable.getId(), indexSchemaMap, indexes, alterIndexes, isDropIndex, jobId);
olapTable.getId(), indexSchemaMap, indexes, alterIndexes, isDropIndex, jobId,
schemaChangeJob.getCreateTimeMs(), jobFinishedTimeMs);
if (LOG.isDebugEnabled()) {
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
}
Expand Down Expand Up @@ -3284,7 +3290,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
Map<String, Long> indexNameToId = new HashMap<>(olapTable.getIndexNameToId());
TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(
rawSql, db.getId(), olapTable.getId(), olapTable.getBaseIndexId(),
indexSchemaMap, oldIndexSchemaMap, indexNameToId, indexes, jobId);
indexSchemaMap, oldIndexSchemaMap, indexNameToId, indexes, jobId,
schemaChangeJob.getCreateTimeMs(), jobFinishedTimeMs);
if (LOG.isDebugEnabled()) {
LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
}
Expand Down Expand Up @@ -3321,7 +3328,7 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
// add job after edit log
// set Job state then add job
schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED);
schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis());
schemaChangeJob.setFinishedTimeMs(jobFinishedTimeMs);
this.addAlterJobV2(schemaChangeJob);
}

Expand All @@ -3342,7 +3349,7 @@ public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId,
true, new HashMap<>());
true, new HashMap<>(), info.getCreateTimeMs(), info.getFinishedTimeMs());
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop or modify columns", e);
Expand Down Expand Up @@ -3483,7 +3490,8 @@ public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndi
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, true, new HashMap<>());
alterIndexes, isDropIndex, jobId, true, new HashMap<>(),
info.getCreateTimeMs(), info.getFinishedTimeMs());
} catch (UserException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop indexes", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,18 @@ public class TableAddOrDropColumnsInfo implements Writable {
private List<Index> indexes;
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "createTimeMs")
private Long createTimeMs;
@SerializedName(value = "finishedTimeMs")
private Long finishedTimeMs;
@SerializedName(value = "rawSql")
private String rawSql;

public TableAddOrDropColumnsInfo(String rawSql, long dbId, long tableId, long baseIndexId,
Map<Long, LinkedList<Column>> indexSchemaMap,
Map<Long, List<Column>> oldIndexSchemaMap,
Map<String, Long> indexNameToId,
List<Index> indexes, long jobId) {
List<Index> indexes, long jobId, Long createTimeMs, Long finishedTimeMs) {
this.rawSql = rawSql;
this.dbId = dbId;
this.tableId = tableId;
Expand All @@ -69,6 +73,8 @@ public TableAddOrDropColumnsInfo(String rawSql, long dbId, long tableId, long ba
this.indexNameToId = indexNameToId;
this.indexes = indexes;
this.jobId = jobId;
this.createTimeMs = createTimeMs;
this.finishedTimeMs = finishedTimeMs;
}

public long getDbId() {
Expand All @@ -91,6 +97,14 @@ public long getJobId() {
return jobId;
}

public Long getCreateTimeMs() {
return createTimeMs;
}

public Long getFinishedTimeMs() {
return finishedTimeMs;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ public class TableAddOrDropInvertedIndicesInfo implements Writable {
private boolean isDropInvertedIndex;
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "createTimeMs")
private Long createTimeMs;
@SerializedName(value = "finishedTimeMs")
private Long finishedTimeMs;
@SerializedName(value = "rawSql")
private String rawSql;

public TableAddOrDropInvertedIndicesInfo(String rawSql, long dbId, long tableId,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
List<Index> alterInvertedIndexes, boolean isDropInvertedIndex,
long jobId) {
long jobId, Long createTimeMs, Long finishedTimeMs) {
this.rawSql = rawSql;
this.dbId = dbId;
this.tableId = tableId;
Expand All @@ -65,6 +69,8 @@ public TableAddOrDropInvertedIndicesInfo(String rawSql, long dbId, long tableId,
this.alterInvertedIndexes = alterInvertedIndexes;
this.isDropInvertedIndex = isDropInvertedIndex;
this.jobId = jobId;
this.createTimeMs = createTimeMs;
this.finishedTimeMs = finishedTimeMs;
}

public long getDbId() {
Expand Down Expand Up @@ -95,6 +101,14 @@ public long getJobId() {
return jobId;
}

public Long getCreateTimeMs() {
return createTimeMs;
}

public Long getFinishedTimeMs() {
return finishedTimeMs;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public void testSerialization() throws IOException {
long dbId = 12345678;
long tableId = 87654321;
long jobId = 23456781;
long createTimeMs = 1234L;
long finishedTimeMs = 5678L;
LinkedList<Column> fullSchema = new LinkedList<>();
fullSchema.add(new Column("testCol1", ScalarType.createType(PrimitiveType.INT)));
fullSchema.add(new Column("testCol2", ScalarType.createType(PrimitiveType.VARCHAR)));
Expand All @@ -75,7 +77,7 @@ public void testSerialization() throws IOException {

TableAddOrDropColumnsInfo tableAddOrDropColumnsInfo1 = new TableAddOrDropColumnsInfo(
"", dbId, tableId, tableId,
indexSchemaMap, oldIndexSchemaMap, indexNameToId, indexes, jobId);
indexSchemaMap, oldIndexSchemaMap, indexNameToId, indexes, jobId, createTimeMs, finishedTimeMs);

String c1Json = GsonUtils.GSON.toJson(tableAddOrDropColumnsInfo1);
Text.writeString(out, c1Json);
Expand All @@ -93,6 +95,9 @@ public void testSerialization() throws IOException {
Assert.assertEquals(tableAddOrDropColumnsInfo1.getTableId(), tableAddOrDropColumnsInfo2.getTableId());
Assert.assertEquals(tableAddOrDropColumnsInfo1.getIndexSchemaMap(),
tableAddOrDropColumnsInfo2.getIndexSchemaMap());
Assert.assertEquals(tableAddOrDropColumnsInfo1.getCreateTimeMs(), tableAddOrDropColumnsInfo2.getCreateTimeMs());
Assert.assertEquals(tableAddOrDropColumnsInfo1.getFinishedTimeMs(),
tableAddOrDropColumnsInfo2.getFinishedTimeMs());

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.persist;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.info.IndexType;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class TableAddOrDropInvertedIndicesInfoTest {
private static String fileName = "./TableAddOrDropInvertedIndicesInfoTest";

@Test
public void testSerialization() throws IOException {
File file = new File(fileName);
file.createNewFile();

DataOutputStream out = new DataOutputStream(new FileOutputStream(file));

long dbId = 12345678;
long tableId = 87654321;
long jobId = 23456781;
long createTimeMs = 1234L;
long finishedTimeMs = 5678L;
LinkedList<Column> fullSchema = new LinkedList<>();
fullSchema.add(new Column("testCol1", ScalarType.createType(PrimitiveType.INT)));
fullSchema.add(new Column("testCol2", ScalarType.createType(PrimitiveType.VARCHAR)));

Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();
indexSchemaMap.put(tableId, fullSchema);

List<Index> indexes = Lists.newArrayList(
new Index(0, "index", Lists.newArrayList("testCol1"), IndexType.INVERTED, null, "xxxxxx"));
List<Index> alterIndexes = Lists.newArrayList(
new Index(1, "index_1", Lists.newArrayList("testCol2"), IndexType.INVERTED, null, "yyyyyy"));

TableAddOrDropInvertedIndicesInfo info1 = new TableAddOrDropInvertedIndicesInfo(
"", dbId, tableId, indexSchemaMap, indexes, alterIndexes, false, jobId, createTimeMs, finishedTimeMs);

Text.writeString(out, GsonUtils.GSON.toJson(info1));
out.flush();
out.close();

DataInputStream in = new DataInputStream(new FileInputStream(file));
String readJson = Text.readString(in);
TableAddOrDropInvertedIndicesInfo info2 = GsonUtils.GSON.fromJson(readJson,
TableAddOrDropInvertedIndicesInfo.class);

Assert.assertEquals(info1.getDbId(), info2.getDbId());
Assert.assertEquals(info1.getTableId(), info2.getTableId());
Assert.assertEquals(info1.getIndexSchemaMap(), info2.getIndexSchemaMap());
Assert.assertEquals(info1.getCreateTimeMs(), info2.getCreateTimeMs());
Assert.assertEquals(info1.getFinishedTimeMs(), info2.getFinishedTimeMs());
}

@After
public void tearDown() {
File file = new File(fileName);
file.delete();
}
}
Loading