add interface to auto clean mysql pendingSegments table #3831

Closed · wants to merge 5 commits

Changes from all commits
94 changes: 94 additions & 0 deletions api/src/main/java/io/druid/timeline/TaskDataSegment.java
@@ -0,0 +1,94 @@
package io.druid.timeline;
Contributor: missing license header
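For reference, Druid source files of this era open with the project's Apache License 2.0 contributor header. A sketch of its general shape is below; this is illustrative only, and the exact wording should be copied from an existing file in the repo:

/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */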

Contributor: @haoxiang47 there are many whitespace changes, can we please revert them? https://github.com/druid-io/druid/pull/3831/files?w=1 indicates there's only new code added.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval;

import java.util.Map;

/**
* Created by haoxiang on 16/11/11.
Contributor: we generally don't have author info

*/
public class TaskDataSegment
{

private final String type;
private final String id;
private final String groupId;
private final String dataSource;
private final Map<String, Object> ioConfig;
private final Interval interval;

@JsonCreator
public TaskDataSegment(
Contributor: Are we sure this class is needed? What is the class used for serde when writing to the pendingSegments table?

@JsonProperty("type") String type,
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") Map<String, Object> ioConfig,
@JsonProperty("interval") Interval interval
)
{
this.type = type;
this.id = id;
this.groupId = groupId;
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.interval = interval;
}

/**
Contributor: this is a weird comment

* Get dataSource
*
* @return the dataSource
*/

@JsonProperty
public String getType()
{
return type;
}

@JsonProperty
public String getId()
{
return id;
}

@JsonProperty
public String getGroupId()
{
return groupId;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@JsonProperty
public Map<String, Object> getIoConfig()
{
return ioConfig;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@Override
public String toString()
{
return "TaskDataSegment{" +
"type=" + type +
", id=" + id +
", groupId=" + groupId +
", dataSource=" + dataSource +
'}';
}


}
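Regarding the serde question above, here is a minimal sketch of how this POJO could read a task payload with Jackson. The JSON shape, field values, and the use of a plain ObjectMapper are assumptions for illustration; real payloads in the tasks table carry more fields than the six this class models.

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.timeline.TaskDataSegment;

public class TaskDataSegmentSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical payload; real task payloads also contain dataSchema, tuningConfig, etc.
    final String payload = "{"
        + "\"type\":\"index_kafka\","
        + "\"id\":\"index_kafka_wikipedia_abc123\","
        + "\"groupId\":\"index_kafka_wikipedia\","
        + "\"dataSource\":\"wikipedia\","
        + "\"ioConfig\":{\"baseSequenceName\":\"index_kafka_wikipedia_abc123\"}"
        + "}";

    // Ignore fields TaskDataSegment does not model, since real payloads contain more than these.
    final ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    final TaskDataSegment task = mapper.readValue(payload, TaskDataSegment.class);
    System.out.println(task.getIoConfig().get("baseSequenceName"));
  }
}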
@@ -28,6 +28,7 @@
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import org.joda.time.Interval;

import java.io.IOException;
@@ -39,10 +40,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
final private Set<DataSegment> published = Sets.newConcurrentHashSet();
final private Set<DataSegment> nuked = Sets.newConcurrentHashSet();
final private List<DataSegment> unusedSegments;
final private List<TaskDataSegment> taskDataSegments;
Contributor: inactiveTaskSegments


public TestIndexerMetadataStorageCoordinator()
{
unusedSegments = Lists.newArrayList();
taskDataSegments = Lists.newArrayList();
}

@Override
@@ -120,6 +123,12 @@ public void deleteSegments(Set<DataSegment> segments)
nuked.addAll(segments);
}

@Override
public List<TaskDataSegment> getNotActiveTask(final Interval interval)
{
Contributor: InactiveTasks*


return ImmutableList.copyOf(taskDataSegments);
}

@Override
public void updateSegmentMetadata(Set<DataSegment> segments) throws IOException
{
@@ -21,6 +21,7 @@

import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import org.joda.time.Interval;

import java.io.IOException;
@@ -141,4 +142,11 @@ SegmentPublishResult announceHistoricalSegments(
* @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval
*/
List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval);

/**
* Get all tasks whose active property is 0; used to delete stale entries from the pendingSegments table.
*
* @param interval Filter to tasks that fall within this interval. Start is inclusive, end is exclusive.
* @return The list of TaskDataSegment objects used to match payloads in the pendingSegments table.
*/
List<TaskDataSegment> getNotActiveTask(final Interval interval);
Contributor: InactiveTasks

}
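To illustrate how the two new calls are meant to work together, here is a minimal sketch of a cleanup routine. The concrete coordinator type and its package are assumed from the Druid source tree, and the wiring into a scheduled duty is not part of this PR:

import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; // package assumed
import io.druid.timeline.TaskDataSegment;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.List;

public class PendingSegmentsCleanupSketch
{
  public static void cleanUp(IndexerSQLMetadataStorageCoordinator coordinator, Interval interval) throws IOException
  {
    // 1) Fetch payloads of tasks whose "active" column is 0.
    final List<TaskDataSegment> inactiveTasks = coordinator.getNotActiveTask(interval);

    // 2) Remove pendingSegments rows whose sequence_name starts with each task's baseSequenceName.
    final boolean allDeleted = coordinator.deletePendingSegments(inactiveTasks);
    if (!allDeleted) {
      // Some tasks had no matching pendingSegments rows; log or retry as appropriate.
    }
  }
}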
@@ -43,6 +43,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.LinearShardSpec;
@@ -879,6 +880,54 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw
}
}

public boolean deletePendingSegments(final List<TaskDataSegment> segments) throws IOException
{
return connector.getDBI().inTransaction(
new TransactionCallback<Boolean>()
{
@Override
public Boolean inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException
{
// Count the tasks for which at least one matching pendingSegments row was deleted.
// Declared locally rather than as a field on the anonymous callback so it cannot accumulate across invocations.
int res = 0;
for (final TaskDataSegment segment : segments) {
if (deletePendingSegment(segment)) {
res += 1;
}
}

return res == segments.size();
}
}
);
}


public boolean deletePendingSegment(final TaskDataSegment taskDataSegment)
{
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
int rows = handle.createStatement(
String.format(
"DELETE FROM %1$s WHERE sequence_name LIKE '%2$s%%'",
dbTables.getPendingSegmentsTable(),
taskDataSegment.getIoConfig().get("baseSequenceName")
)
)
.execute();
log.info("Deleted pendingSegments rows for sequenceName[%s].", taskDataSegment.getIoConfig().get("baseSequenceName"));

return rows > 0;
}
}
);
}
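As a concrete example of the statement above: for a task whose ioConfig contains baseSequenceName = "index_kafka_wikipedia_abc123" and a metadata store using the default table prefix, the generated SQL would be roughly DELETE FROM druid_pendingSegments WHERE sequence_name LIKE 'index_kafka_wikipedia_abc123%' (the table name and sequence name here are illustrative).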


@Override
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
@@ -929,4 +978,55 @@ public List<DataSegment> fold(
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}


@Override
public List<TaskDataSegment> getNotActiveTask(final Interval interval)
{
List<TaskDataSegment> notActiveTasks = connector.inReadOnlyTransaction(
new TransactionCallback<List<TaskDataSegment>>()
{
@Override
public List<TaskDataSegment> inTransaction(Handle handle, TransactionStatus transactionStatus)
throws Exception
{
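// Note: the interval argument is not applied in the WHERE clause below; every task with active = 0 is returned.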
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE active = :active",
dbTables.getTasksTable()
)
)
.bind("active", 0)
.map(ByteArrayMapper.FIRST)
.fold(
Lists.<TaskDataSegment>newArrayList(),
new Folder3<List<TaskDataSegment>, byte[]>()
{
@Override
public List<TaskDataSegment> fold(
List<TaskDataSegment> accumulator,
byte[] payload,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
accumulator.add(jsonMapper.readValue(payload, TaskDataSegment.class));
return accumulator;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);

}
}
);

log.info("Found %,d tasks for interval %s.", notActiveTasks.size(), interval);
return notActiveTasks;
}

}
@@ -28,6 +28,7 @@
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
@@ -99,6 +100,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
100
);

private final TaskDataSegment defaultTaskSegment = new TaskDataSegment(
"kafka",
"12345",
"test",
"datasource_test_1",
ImmutableMap.<String, Object>of("baseSequenceName", "test_baseSequenceName"),
Interval.parse("2015-01-01T00Z/2015-01-02T00Z")
);

private final Set<DataSegment> SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
private final AtomicLong metadataUpdateCounter = new AtomicLong();
private IndexerSQLMetadataStorageCoordinator coordinator;
@@ -713,4 +723,16 @@ public void testDeleteDataSourceMetadata() throws IOException

Assert.assertNull("getDataSourceMetadataNullAfterDelete", coordinator.getDataSourceMetadata("fooDataSource"));
}

@Test
public void testDeletePendingSegments() throws IOException
{
Assert.assertTrue("deletePendingSegment",coordinator.deletePendingSegments(
coordinator.getNotActiveTask(
defaultTaskSegment.getInterval()
)
)
);

}
}