Add shuffleSegmentPusher for data shuffle #8115

Merged (19 commits, Aug 5, 2019)
Changes from all commits

TaskConfig.java
@@ -126,6 +126,11 @@ public File getTaskWorkDir(String taskId)
return new File(getTaskDir(taskId), "work");
}

public File getTaskTempDir(String taskId)
{
return new File(getTaskDir(taskId), "temp");
}

public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
IntermediaryDataManager.java
@@ -33,18 +33,21 @@
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
@@ -57,12 +60,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This class manages intermediary segments for data shuffle between native parallel index tasks.
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers
* In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer)
* and phase 2 tasks read those files via HTTP.
*
* The directory where segment files are placed is structured as
@@ -75,11 +80,12 @@
@ManageLifecycle
public class IntermediaryDataManager
{
private static final Logger log = new Logger(IntermediaryDataManager.class);
private static final Logger LOG = new Logger(IntermediaryDataManager.class);

private final long intermediaryPartitionDiscoveryPeriodSec;
private final long intermediaryPartitionCleanupPeriodSec;
private final Period intermediaryPartitionTimeout;
private final TaskConfig taskConfig;
private final List<StorageLocation> shuffleDataLocations;
private final IndexingServiceClient indexingServiceClient;

@@ -108,6 +114,7 @@ public IntermediaryDataManager(
this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
this.taskConfig = taskConfig;
this.shuffleDataLocations = taskConfig
.getShuffleDataLocations()
.stream()
@@ -119,6 +126,7 @@
@LifecycleStart
public void start()
{
discoverSupervisorTaskPartitions();
supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
// Discover partitions for new supervisorTasks
supervisorTaskChecker.scheduleAtFixedRate(
@@ -127,7 +135,7 @@ public void start()
discoverSupervisorTaskPartitions();
}
catch (Exception e) {
log.warn(e, "Error while discovering supervisorTasks");
LOG.warn(e, "Error while discovering supervisorTasks");
}
},
intermediaryPartitionDiscoveryPeriodSec,
@@ -141,10 +149,10 @@ public void start()
deleteExpiredSupervisorTaskPartitionsIfNotRunning();
}
catch (InterruptedException e) {
log.error(e, "Error while cleaning up partitions for expired supervisors");
LOG.error(e, "Error while cleaning up partitions for expired supervisors");
}
catch (Exception e) {
log.warn(e, "Error while cleaning up partitions for expired supervisors");
LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
}
},
intermediaryPartitionCleanupPeriodSec,
@@ -163,9 +171,13 @@ public void stop() throws InterruptedException
supervisorTaskCheckTimes.clear();
}

/**
* IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data.
*/
private void discoverSupervisorTaskPartitions()
{
for (StorageLocation location : shuffleDataLocations) {
final Path locationPath = location.getPath().toPath().toAbsolutePath();
final MutableInt numDiscovered = new MutableInt(0);
final File[] dirsPerSupervisorTask = location.getPath().listFiles();
if (dirsPerSupervisorTask != null) {
@@ -174,13 +186,32 @@ private void discoverSupervisorTaskPartitions()
supervisorTaskCheckTimes.computeIfAbsent(
supervisorTaskId,
k -> {
for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
final String relativeSegmentPath = locationPath
.relativize(eachFile.toPath().toAbsolutePath())
.toString();
// StorageLocation keeps track of how much storage capacity is being used.
// Newly found files should be known to the StorageLocation to keep it up to date.
final File reservedFile = location.reserve(

[inline review] @himanshug (Contributor), Jul 29, 2019:
couldn't understand why we need to do this, can you please add some comments.

[author reply] Added.

relativeSegmentPath,
eachFile.getName(),
eachFile.length()
);
if (reservedFile == null) {
LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
}
}
numDiscovered.increment();
return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
}
);
}
}
log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
LOG.info(
"Discovered partitions for [%s] new supervisor tasks under location[%s]",
numDiscovered.getValue(),
location.getPath()
);
}
}
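
On the review question above about the reserve() calls: StorageLocation tracks how much of its configured capacity is in use, and files written before a restart are otherwise unknown to it, so each discovered file is registered to keep that accounting correct. A minimal sketch of the assumed bookkeeping follows; the class and method names are illustrative, not Druid's actual StorageLocation implementation.

import java.io.File;

// Hypothetical, simplified model of the capacity accounting that
// StorageLocation.reserve() is assumed to perform.
class CapacityTrackingLocation
{
  private final File basePath;
  private final long maxSizeBytes;
  private long currSizeBytes = 0;

  CapacityTrackingLocation(File basePath, long maxSizeBytes)
  {
    this.basePath = basePath;
    this.maxSizeBytes = maxSizeBytes;
  }

  // Returns the destination file if there is room, or null so that the
  // caller can try another location.
  synchronized File reserve(String relativePath, long sizeBytes)
  {
    if (currSizeBytes + sizeBytes > maxSizeBytes) {
      return null;
    }
    currSizeBytes += sizeBytes;
    return new File(basePath, relativePath);
  }

  // Called when a file is removed, e.g. during partition cleanup.
  synchronized void release(long sizeBytes)
  {
    currSizeBytes = Math.max(0, currSizeBytes - sizeBytes);
  }
}

Without reserving discovered files, intermediary data written before a restart would not count toward the used capacity, and the location could over-commit.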

@@ -203,7 +234,7 @@ private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
}
}

log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());

final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
@@ -215,7 +246,7 @@ private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
deletePartitions(supervisorTaskId);
}
catch (IOException e) {
log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
}
} else {
// If it's still running, update last access time.
@@ -227,17 +258,74 @@
/**
* Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per
* supervisorTaskId.
*
* This method is only useful for the new Indexer model. Tasks running in the existing middleManager should use the
* static addSegment method.
*/
public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
throws IOException
{
// Get or create the location iterator for supervisorTask.
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
k -> Iterators.cycle(shuffleDataLocations)
k -> {
final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
// Random start of the iterator
final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
IntStream.range(0, random).forEach(i -> cyclicIterator.next());
return cyclicIterator;
}
);
addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);

// Create a zipped segment in a temp directory.
final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);

try (final Closer resourceCloser = Closer.create()) {
if (taskTempDir.mkdirs()) {
resourceCloser.register(() -> {
try {
FileUtils.forceDelete(taskTempDir);
}
catch (IOException e) {
LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
}
});
}

// Temporary compressed file. Will be removed when taskTempDir is deleted.
final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
if (unzippedSizeBytes == 0) {
throw new IOE(
"Read 0 bytes from segmentDir[%s]",
segmentDir.getAbsolutePath()
);
}

// Try copying the zipped segment to one of the storage locations
for (int i = 0; i < shuffleDataLocations.size(); i++) {
final StorageLocation location = iterator.next();
final String partitionFilePath = getPartitionFilePath(
supervisorTaskId,
subTaskId,
segment.getInterval(),
segment.getShardSpec().getPartitionNum()
);
final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
if (destFile != null) {
FileUtils.forceMkdirParent(destFile);
org.apache.druid.java.util.common.FileUtils.writeAtomically(
destFile,
out -> Files.asByteSource(tempZippedFile).copyTo(out)
);
LOG.info(
"Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]",
segment.getId(),
subTaskId,
destFile
);
return unzippedSizeBytes;
}
}
throw new ISE("Can't find location to handle segment[%s]", segment);
}
}
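
The per-supervisorTask iterator above is created lazily with Iterators.cycle and advanced to a random starting offset. A self-contained sketch of that selection pattern, assuming Guava on the classpath; the class name is illustrative, not part of the PR.

import com.google.common.collect.Iterators;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

class RoundRobinPicker<T>
{
  private final List<T> items; // assumed non-empty
  private final Map<String, Iterator<T>> iterators = new ConcurrentHashMap<>();

  RoundRobinPicker(List<T> items)
  {
    this.items = items;
  }

  T next(String key)
  {
    final Iterator<T> it = iterators.computeIfAbsent(key, k -> {
      final Iterator<T> cyclic = Iterators.cycle(items);
      // Random start so that concurrent keys don't all begin at item 0.
      IntStream.range(0, ThreadLocalRandom.current().nextInt(items.size()))
               .forEach(i -> cyclic.next());
      return cyclic;
    });
    synchronized (it) { // Iterators.cycle is not thread-safe
      return it.next();
    }
  }
}

Each supervisorTaskId rotates through every location in turn, while the random start spreads the first writes of concurrent supervisor tasks across locations instead of piling them onto the first configured one.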

public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
@@ -259,7 +347,7 @@ public void deletePartitions(String supervisorTaskId) throws IOException
for (StorageLocation location : shuffleDataLocations) {
final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
if (supervisorTaskPath.exists()) {
log.info("Cleaning up [%s]", supervisorTaskPath);
LOG.info("Cleaning up [%s]", supervisorTaskPath);
for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
location.removeFile(eachFile);
}
@@ -269,54 +357,6 @@ public void deletePartitions(String supervisorTaskId) throws IOException
supervisorTaskCheckTimes.remove(supervisorTaskId);
}

/**
* Iterate through the given storage locations to find one which can handle the given segment.
*/
public static void addSegment(
Iterator<StorageLocation> cyclicIterator,
int numLocations,
String supervisorTaskId,
String subTaskId,
DataSegment segment,
File segmentFile
)
{
for (int i = 0; i < numLocations; i++) {
final StorageLocation location = cyclicIterator.next();
final File destFile = location.reserve(
getPartitionFilePath(
supervisorTaskId,
subTaskId,
segment.getInterval(),
segment.getShardSpec().getPartitionNum()
),
segment.getId(),
segmentFile.length()
);
if (destFile != null) {
try {
FileUtils.forceMkdirParent(destFile);
final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
if (copiedBytes == 0) {
throw new IOE(
"0 bytes copied after copying a segment file from [%s] to [%s]",
segmentFile.getAbsolutePath(),
destFile.getAbsolutePath()
);
} else {
return;
}
}
catch (IOException e) {
// Only log here to try other locations as well.
log.warn(e, "Failed to write segmentFile at [%s]", destFile);
location.removeFile(segmentFile);
}
}
}
throw new ISE("Can't find location to handle segment[%s]", segment);
}

private static String getPartitionFilePath(
String supervisorTaskId,
String subTaskId,
ShuffleDataSegmentPusher.java (new file)
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.worker;

import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

/**
* DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel
* indexing.
*/
public class ShuffleDataSegmentPusher implements DataSegmentPusher
{
private final String supervisorTaskId;
private final String subTaskId;
private final IntermediaryDataManager intermediaryDataManager;

public ShuffleDataSegmentPusher(
String supervisorTaskId,
String subTaskId,
IntermediaryDataManager intermediaryDataManager
)
{
this.supervisorTaskId = supervisorTaskId;
this.subTaskId = subTaskId;
this.intermediaryDataManager = intermediaryDataManager;
}

@Override
public String getPathForHadoop(String dataSource)
{
throw new UnsupportedOperationException();
}

@Override
public String getPathForHadoop()
{
throw new UnsupportedOperationException();
}

@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file);
return segment.withSize(unzippedSize)
.withBinaryVersion(SegmentUtils.getVersionFromDir(file));
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
throw new UnsupportedOperationException();
}
}
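
A hypothetical usage sketch, not part of the PR: how a phase-1 subtask might route a merged segment directory through the shuffle pusher instead of deep storage. Only ShuffleDataSegmentPusher, IntermediaryDataManager, and push() come from the diff above; the helper and its wiring are assumptions.

import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

class ShufflePushExample
{
  static DataSegment pushToShuffle(
      IntermediaryDataManager dataManager,
      String supervisorTaskId,
      String subTaskId,
      File mergedSegmentDir,
      DataSegment segment
  ) throws IOException
  {
    final DataSegmentPusher pusher =
        new ShuffleDataSegmentPusher(supervisorTaskId, subTaskId, dataManager);
    // useUniquePath does not apply to intermediary data; pass false.
    return pusher.push(mergedSegmentDir, segment, false);
  }
}

The returned segment carries the unzipped size and binary version; per the IntermediaryDataManager javadoc, phase-2 tasks later read these intermediary files over HTTP.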