Skip to content
Permalink
Browse files
add DataDirManager (#43)
  • Loading branch information
houzhizhen committed May 8, 2021
1 parent b666ff2 commit 1aaf9c0654fab58732c8388f14ad8376252d7460
Showing 7 changed files with 330 additions and 3 deletions.
@@ -33,9 +33,12 @@
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
import com.baidu.hugegraph.computer.core.network.NettyTransportProvider;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.config.ConfigListOption;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;
import com.baidu.hugegraph.util.Bytes;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public class ComputerOptions extends OptionHolder {
@@ -286,6 +289,28 @@ public static synchronized ComputerOptions instance() {
WorkerAggrManager.class
);

/**
 * Option "worker.received_buffers_bytes_limit": the upper bound, in bytes,
 * on the total size of all buffers holding received data. Per the option
 * description, once the buffers reach this limit they are merged into a
 * file.
 *
 * The default value is 100 * Bytes.MB and values are validated with
 * positiveInt().
 * NOTE(review): the option is typed Long while the validator is named
 * positiveInt() - confirm the validator accepts values above the int range.
 */
public static final ConfigOption<Long> WORKER_RECEIVED_BUFFERS_BYTES_LIMIT =
new ConfigOption<>(
"worker.received_buffers_bytes_limit",
"The limit bytes of buffers of received data, " +
"the total size of all buffers can't excess this limit. " +
"If received buffers reach this limit, they will be " +
"merged into a file.",
positiveInt(),
100 * Bytes.MB
);

/**
 * Option "worker.data_dirs": the list of directories into which received
 * vertices and messages can be persisted. In a configuration value the
 * directories are separated by ','.
 *
 * Values are validated with disallowEmpty(), elements are of type String,
 * and the default is the single directory "jobs".
 * NOTE(review): the bare `true` argument presumably marks the option as
 * required - confirm against the ConfigListOption constructor.
 */
public static final ConfigListOption<String> WORKER_DATA_DIRS =
new ConfigListOption<>(
"worker.data_dirs",
true,
"The directories separated by ',' that received " +
"vertices and messages can persist into.",
disallowEmpty(),
String.class,
ImmutableList.of("jobs")
);

public static final ConfigOption<Class<?>> MASTER_COMPUTATION_CLASS =
new ConfigOption<>(
"master.computation_class",
@@ -0,0 +1,36 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store;

import java.io.File;

/**
 * Source of local directories and files used to persist data such as
 * vertices, edges and messages.
 */
public interface DataFileGenerator {

/**
 * Picks the directory that the next piece of data should be persisted
 * into.
 *
 * @return the next data dir to persist data like vertices, edges and
 * messages.
 */
File nextDir();

/**
 * Picks the file that the next piece of data should be persisted into.
 *
 * @param type      label for the kind of data the file will hold
 *                  (for example "vertex" or "message")
 * @param superstep the superstep the data belongs to
 * @return the next file to persist data like vertices, edges and messages.
 */
File nextFile(String type, int superstep);
}
@@ -0,0 +1,109 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.util.Log;

/**
 * Manages the local directories a worker persists data into.
 *
 * On {@link #init(Config)} one sub-directory named after the job id is
 * created under every directory configured by
 * {@link ComputerOptions#WORKER_DATA_DIRS}. Directories are then handed out
 * in round-robin order by {@link #nextDir()}, and {@link #close(Config)}
 * deletes the per-job directories again.
 */
public class DataFileManager implements DataFileGenerator, Manager {

    private static final Logger LOG = Log.logger(DataFileManager.class);

    public static final String NAME = "data_dir";

    /** Per-job directories, in the (shuffled) order they are handed out. */
    private final List<File> dirs;
    /** Index into {@link #dirs} of the most recently returned directory. */
    private final AtomicInteger sequence;

    public DataFileManager() {
        this.dirs = new ArrayList<>();
        this.sequence = new AtomicInteger();
    }

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Creates the per-job directory under each configured data directory
     * and registers it for use by {@link #nextDir()}.
     *
     * @param config the config providing the job id and the data dirs
     * @throws ComputerException if any per-job directory can't be created
     */
    @Override
    public void init(Config config) {
        String jobId = config.get(ComputerOptions.JOB_ID);
        List<String> paths = config.get(ComputerOptions.WORKER_DATA_DIRS);
        LOG.info("The directories '{}' configured to persist data for job {}",
                 paths, jobId);
        for (String path : paths) {
            File dir = new File(path);
            File jobDir = new File(dir, jobId);
            this.mkdirs(jobDir);
            LOG.debug("Initialized directory '{}' to directory list", jobDir);
            this.dirs.add(jobDir);
        }
        /*
         * Shuffle dirs to prevent all workers of the same computer start
         * from same dir.
         */
        Collections.shuffle(this.dirs);
    }

    /**
     * Deletes every per-job directory created by {@link #init(Config)}.
     * Deletion is best-effort: failures are ignored.
     */
    @Override
    public void close(Config config) {
        for (File dir : this.dirs) {
            FileUtils.deleteQuietly(dir);
        }
    }

    /**
     * @return the next per-job directory in round-robin order
     * @throws ComputerException if no directory has been registered
     */
    @Override
    public File nextDir() {
        int size = this.dirs.size();
        if (size == 0) {
            throw new ComputerException("No data directory is available, " +
                                        "the manager must be initialized " +
                                        "before use");
        }
        /*
         * Keep the counter inside [0, size) so that it can never overflow
         * into a negative index, no matter how many times it is called.
         * The returned sequence is the same as (count % size).
         */
        int index = this.sequence.updateAndGet(i -> (i + 1) % size);
        return this.dirs.get(index);
    }

    /**
     * Returns a new, uniquely named file located at
     * {@code <next dir>/<type>/<superstep>/<random uuid>}. The parent
     * directories are created; the file itself is not.
     *
     * @param type      label for the kind of data the file will hold
     * @param superstep the superstep the data belongs to
     * @throws ComputerException if the parent directory can't be created
     */
    @Override
    public File nextFile(String type, int superstep) {
        File dir = this.nextDir();
        File labelDir = new File(dir, type);
        File superStepDir = new File(labelDir, Integer.toString(superstep));
        this.mkdirs(superStepDir);
        return new File(superStepDir, UUID.randomUUID().toString());
    }

    /**
     * Creates the directory named by specified dir, including any missing
     * parent directories. An already existing directory is accepted.
     *
     * @throws ComputerException if the directory can't be created, which
     *                           includes the case that the path exists but
     *                           is not a directory
     */
    private void mkdirs(File dir) {
        // mkdirs() also returns false if the directory already exists
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new ComputerException("Can't create dir %s",
                                        dir.getAbsolutePath());
        }
    }
}
@@ -33,8 +33,9 @@
import com.baidu.hugegraph.computer.core.graph.GraphTestSuite;
import com.baidu.hugegraph.computer.core.input.InputTestSuite;
import com.baidu.hugegraph.computer.core.io.IOTestSuite;
import com.baidu.hugegraph.computer.core.sort.SortTestSuite;
import com.baidu.hugegraph.computer.core.network.NetworkTestSuite;
import com.baidu.hugegraph.computer.core.sort.SortTestSuite;
import com.baidu.hugegraph.computer.core.store.StoreTestSuite;
import com.baidu.hugegraph.computer.core.worker.WorkerTestSuite;
import com.baidu.hugegraph.config.OptionSpace;
import com.baidu.hugegraph.util.Log;
@@ -52,7 +53,8 @@
InputTestSuite.class,
WorkerTestSuite.class,
SortTestSuite.class,
NetworkTestSuite.class
NetworkTestSuite.class,
StoreTestSuite.class
})
public class UnitTestSuite {

@@ -0,0 +1,125 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store;

import java.io.File;
import java.io.IOException;

import org.junit.Test;

import com.baidu.hugegraph.computer.core.UnitTestBase;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;

/**
 * Unit tests of {@link DataFileManager}: failure to create the per-job
 * directory, round-robin directory selection and data file layout.
 */
public class DataDirManagerTest extends UnitTestBase {

    /**
     * A configured data dir that is actually a regular file must make
     * init() fail, because no directory can be created beneath a file.
     */
    @Test
    public void testInitWithFile() throws IOException {
        File file = new File("exist");
        file.createNewFile();
        try {
            Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.WORKER_DATA_DIRS,
                "[" + file.getAbsolutePath() + "]"
            );
            DataFileManager dataFileManager = new DataFileManager();
            Assert.assertEquals(DataFileManager.NAME, dataFileManager.name());
            Assert.assertThrows(ComputerException.class, () -> {
                dataFileManager.init(config);
            }, e -> {
                Assert.assertContains("Can't create dir ", e.getMessage());
            });
        } finally {
            // Remove the temp file even if an assertion above fails
            file.delete();
        }
    }

    /**
     * A configured data dir the process can't write into must make init()
     * fail.
     */
    @Test
    public void testInitWithReadOnlyDir() throws IOException {
        Config config = UnitTestBase.updateWithRequiredOptions(
            ComputerOptions.JOB_ID, "local_001",
            ComputerOptions.WORKER_DATA_DIRS, "[/etc]"
        );
        DataFileManager dataFileManager = new DataFileManager();
        Assert.assertThrows(ComputerException.class, () -> {
            dataFileManager.init(config);
        }, e -> {
            Assert.assertContains("Can't create dir", e.getMessage());
        });
    }

    /**
     * With two data dirs configured, nextDir() must alternate between them.
     */
    @Test
    public void testNextDir() {
        Config config = UnitTestBase.updateWithRequiredOptions(
            RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090",
            ComputerOptions.JOB_ID, "local_001",
            ComputerOptions.JOB_WORKERS_COUNT, "1",
            ComputerOptions.JOB_PARTITIONS_COUNT, "2",
            ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
            ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "300"
        );
        DataFileManager dataFileManager = new DataFileManager();
        try {
            dataFileManager.init(config);

            File dir1 = dataFileManager.nextDir();
            File dir2 = dataFileManager.nextDir();
            File dir3 = dataFileManager.nextDir();
            File dir4 = dataFileManager.nextDir();
            File dir5 = dataFileManager.nextDir();
            Assert.assertNotEquals(dir1, dir2);
            Assert.assertEquals(dir1, dir3);
            Assert.assertEquals(dir3, dir5);
            Assert.assertEquals(dir2, dir4);
        } finally {
            // Delete the per-job dirs even if an assertion above fails
            dataFileManager.close(config);
        }
    }

    /**
     * nextFile() must place files under "type/superstep" directories and
     * return a distinct file on every call.
     */
    @Test
    public void testNextFile() {
        Config config = UnitTestBase.updateWithRequiredOptions(
            RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090",
            ComputerOptions.JOB_ID, "local_001",
            ComputerOptions.JOB_WORKERS_COUNT, "1",
            ComputerOptions.JOB_PARTITIONS_COUNT, "2",
            ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
            ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "300"
        );
        DataFileManager dataFileManager = new DataFileManager();
        try {
            dataFileManager.init(config);

            File vertexFile = dataFileManager.nextFile("vertex", -1);
            File vertexSuperstepDir = vertexFile.getParentFile();
            Assert.assertEquals("-1", vertexSuperstepDir.getName());
            File vertexRootDir = vertexSuperstepDir.getParentFile();
            Assert.assertEquals("vertex", vertexRootDir.getName());

            File messageFile = dataFileManager.nextFile("message", 0);
            File messageSuperstepDir = messageFile.getParentFile();
            Assert.assertEquals("0", messageSuperstepDir.getName());
            File messageRootDir = messageSuperstepDir.getParentFile();
            Assert.assertEquals("message", messageRootDir.getName());

            File messageFile2 = dataFileManager.nextFile("message", 0);
            Assert.assertNotEquals(messageFile, messageFile2);
        } finally {
            // Delete the per-job dirs even if an assertion above fails
            dataFileManager.close(config);
        }
    }
}
@@ -0,0 +1,30 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store;

import org.junit.runner.RunWith;
import org.junit.runners.Suite;

/**
 * JUnit suite grouping the unit tests of the store package. It currently
 * runs DataDirManagerTest only; add new store test classes to the
 * SuiteClasses list below.
 */
@RunWith(Suite.class)
@Suite.SuiteClasses({
DataDirManagerTest.class
})
public class StoreTestSuite {
}
@@ -52,7 +52,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.8.5</version>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>

0 comments on commit 1aaf9c0

Please sign in to comment.