Skip to content

Commit

Permalink
fix meta
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Apr 8, 2024
1 parent 3fb042c commit 8ed7e2d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand All @@ -33,13 +35,20 @@
* It is used to edit the job final state.
*/
public class LoadJobFinalOperation extends TxnCommitAttachment implements Writable {
@SerializedName(value = "id")
private long id;
@SerializedName(value = "loadingStatus")
private EtlStatus loadingStatus = new EtlStatus();
@SerializedName(value = "progress")
private int progress;
@SerializedName(value = "loadStartTimestamp")
private long loadStartTimestamp;
@SerializedName(value = "finishTimestamp")
private long finishTimestamp;
@SerializedName(value = "jobState")
private JobState jobState;
// optional
@SerializedName(value = "failMsg")
private FailMsg failMsg;

public LoadJobFinalOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment {
@SerializedName(value = "loadedRows")
private long loadedRows;
@SerializedName(value = "filteredRows")
private long filteredRows;
// optional
@SerializedName(value = "errorLogUrl")
private String errorLogUrl;

public MiniLoadTxnCommitAttachment() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand All @@ -33,11 +35,17 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {

private long jobId;
private TUniqueId taskId;
@SerializedName(value = "filteredRows")
private long filteredRows;
@SerializedName(value = "loadedRows")
private long loadedRows;
@SerializedName(value = "unselectedRows")
private long unselectedRows;
@SerializedName(value = "receivedBytes")
private long receivedBytes;
@SerializedName(value = "taskExecutionTimeMs")
private long taskExecutionTimeMs;
@SerializedName(value = "progress")
private RoutineLoadProgress progress;
private String errorLogUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.loadv2.MiniLoadTxnCommitAttachment;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalSyncJob;
Expand All @@ -117,6 +120,7 @@
import org.apache.doris.system.BrokerHbResponse;
import org.apache.doris.system.FrontendHbResponse;
import org.apache.doris.system.HeartbeatResponse;
import org.apache.doris.transaction.TxnCommitAttachment;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -331,6 +335,14 @@ public class GsonUtils {
.registerSubtype(Partition.class, Partition.class.getSimpleName())
.registerSubtype(CloudPartition.class, CloudPartition.class.getSimpleName());

// runtime adapter for class "TxnCommitAttachment".
private static RuntimeTypeAdapterFactory<TxnCommitAttachment> txnCommitAttachmentTypeAdapterFactory
= RuntimeTypeAdapterFactory.of(TxnCommitAttachment.class, "clazz")
.registerDefaultSubtype(TxnCommitAttachment.class)
.registerSubtype(LoadJobFinalOperation.class, LoadJobFinalOperation.class.getSimpleName())
.registerSubtype(MiniLoadTxnCommitAttachment.class, MiniLoadTxnCommitAttachment.class.getSimpleName())
.registerSubtype(RLTaskTxnCommitAttachment.class, RLTaskTxnCommitAttachment.class.getSimpleName());

// the builder of GSON instance.
// Add any other adapters if necessary.
private static final GsonBuilder GSON_BUILDER = new GsonBuilder().addSerializationExclusionStrategy(
Expand All @@ -356,6 +368,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
.registerTypeAdapterFactory(mtmvSnapshotTypeAdapterFactory)
.registerTypeAdapterFactory(constraintTypeAdapterFactory)
.registerTypeAdapterFactory(txnCommitAttachmentTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter())
.registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.doris.transaction;

import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
Expand All @@ -40,11 +42,12 @@
public class TransactionStateTest {

private static String fileName = "./TransactionStateTest";
private static String fileName2 = "./TransactionStateTest2";

@After
public void tearDown() {
File file = new File(fileName);
file.delete();
new File(fileName).delete();
new File(fileName2).delete();
}

@Test
Expand Down Expand Up @@ -76,4 +79,39 @@ LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0
in.close();
}

@Test
public void testSerDeWithTxnCommitAttachment() throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();

// 1. Write objects to file
File file = new File(fileName2);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));

UUID uuid = UUID.randomUUID();
TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L),
3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()),
LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"),
TransactionStatus.COMMITTED, "", 100, 50000L,
new LoadJobFinalOperation(1000L, null, 0, 0, 0, JobState.FINISHED, null),
100, 200, 300, 400);

transactionState.write(out);
out.flush();
out.close();

// 2. Read objects from file
DataInputStream in = new DataInputStream(new FileInputStream(file));
TransactionState readTransactionState = TransactionState.read(in);

Assert.assertEquals(TransactionState.LoadJobSourceType.BATCH_LOAD_JOB,
readTransactionState.getTxnCommitAttachment().sourceType);
Assert.assertTrue(readTransactionState.getTxnCommitAttachment() instanceof LoadJobFinalOperation);
LoadJobFinalOperation loadJobFinalOperation
= (LoadJobFinalOperation) (readTransactionState.getTxnCommitAttachment());
Assert.assertEquals(1000L, loadJobFinalOperation.getId());
in.close();
}
}

0 comments on commit 8ed7e2d

Please sign in to comment.