Skip to content
Permalink
Browse files
rename DataFileManager to FileManager (#54)
* rename DataFileManager to FileManager

* remove #nextFile() from FileGenerator
  • Loading branch information
houzhizhen committed May 28, 2021
1 parent a087d27 commit aaeec64991ad912b38f53b21d2f24ba7e5ee8487
Showing 5 changed files with 101 additions and 93 deletions.

This file was deleted.

@@ -0,0 +1,58 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.store;

import java.nio.file.Paths;

public interface FileGenerator {

    /**
     * Hands out one base directory per call.
     *
     * The user may configure several base directories, where each one
     * generally stands for a separate disk. They are handed out in turn
     * (round-robin).
     *
     * Example: with the configured base directories
     * ["/disk1/job_001/container_001", "/disk2/job_001/container_001"]
     * there are two base directories, one per disk. The first call returns
     * "/disk1/job_001/container_001", the second call returns
     * "/disk2/job_001/container_001", the third call returns
     * "/disk1/job_001/container_001" again, and so on.
     *
     * Note: do not request a single directory and then write many files
     * into it, otherwise the IO pressure can't be distributed over the
     * disks.
     *
     * @return The allocated local base directory.
     */
    String nextDirectory();

    /**
     * Hands out one base directory per call and appends the given path
     * segments to it as nested sub-directories.
     *
     * @param paths The path segments appended below the base directory.
     * @return The string form of "#nextDirectory()" joined with the
     *         given path segments.
     */
    default String nextDirectory(String... paths) {
        // Exactly one base directory is allocated for each invocation
        String baseDirectory = this.nextDirectory();
        return Paths.get(baseDirectory, paths).toString();
    }
}
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
@@ -35,16 +34,16 @@
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.util.Log;

public class DataFileManager implements DataFileGenerator, Manager {
public class FileManager implements FileGenerator, Manager {

private static final Logger LOG = Log.logger(DataFileManager.class);
private static final Logger LOG = Log.logger(FileManager.class);

public static final String NAME = "data_dir";

private List<File> dirs;
private List<String> dirs;
private AtomicInteger sequence;

public DataFileManager() {
public FileManager() {
this.dirs = new ArrayList<>();
this.sequence = new AtomicInteger();
}
@@ -56,6 +55,17 @@ public String name() {

@Override
public void init(Config config) {
/*
* Multiple workers running on the same computer will not see each
* other's directories.
* If running on K8S, the worker-container can only see the directories
* assigned to itself.
* If running on YARN, the worker-container's data-dir is set in the
* format "$ROOT/${job_id}/${container_id}", so each container can only
* see the directories assigned to itself too. The main class is
* responsible for converting the setting from YARN to
* HugeGraph-Computer.
*/
String jobId = config.get(ComputerOptions.JOB_ID);
List<String> paths = config.get(ComputerOptions.WORKER_DATA_DIRS);
LOG.info("The directories '{}' configured to persist data for job {}",
@@ -65,7 +75,7 @@ public void init(Config config) {
File jobDir = new File(dir, jobId);
this.mkdirs(jobDir);
LOG.debug("Initialized directory '{}' to directory list", jobDir);
this.dirs.add(jobDir);
this.dirs.add(jobDir.toString());
}
/*
* Shuffle dirs to prevent all workers of the same computer start from
@@ -76,31 +86,22 @@ public void init(Config config) {

@Override
public void close(Config config) {
for (File dir : this.dirs) {
FileUtils.deleteQuietly(dir);
for (String dir : this.dirs) {
FileUtils.deleteQuietly(new File(dir));
}
}

@Override
public File nextDir() {
public String nextDirectory() {
int index = this.sequence.incrementAndGet();
assert index >= 0;
return this.dirs.get(index % this.dirs.size());
}

@Override
public File nextFile(String type, int superstep) {
File dir = this.nextDir();
File labelDir = new File(dir, type);
File superStepDir = new File(labelDir, Integer.toString(superstep));
this.mkdirs(superStepDir);
return new File(superStepDir, UUID.randomUUID().toString());
}

/**
* Creates the directory named by specified dir.
*/
private void mkdirs(File dir) {
private static void mkdirs(File dir) {
if (!dir.mkdirs() && !dir.exists()) {
throw new ComputerException("Can't create dir %s",
dir.getAbsolutePath());
@@ -28,10 +28,9 @@
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;

public class DataDirManagerTest extends UnitTestBase {
public class FileManagerTest extends UnitTestBase {

@Test
public void testInitWithFile() throws IOException {
@@ -41,8 +40,8 @@ public void testInitWithFile() throws IOException {
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.WORKER_DATA_DIRS, "[" + file.getAbsolutePath() + "]"
);
DataFileManager dataFileManager = new DataFileManager();
Assert.assertEquals(DataFileManager.NAME, dataFileManager.name());
FileManager dataFileManager = new FileManager();
Assert.assertEquals(FileManager.NAME, dataFileManager.name());
Assert.assertThrows(ComputerException.class, () -> {
dataFileManager.init(config);
}, e -> {
@@ -52,12 +51,12 @@ public void testInitWithFile() throws IOException {
}

@Test
public void testInitWithReadOnlyDir() throws IOException {
public void testInitWithReadOnlyDir() {
Config config = UnitTestBase.updateWithRequiredOptions(
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.WORKER_DATA_DIRS, "[/etc]"
);
DataFileManager dataFileManager = new DataFileManager();
FileManager dataFileManager = new FileManager();
Assert.assertThrows(ComputerException.class, () -> {
dataFileManager.init(config);
}, e -> {
@@ -68,22 +67,19 @@ public void testInitWithReadOnlyDir() throws IOException {
@Test
public void testNextDir() {
Config config = UnitTestBase.updateWithRequiredOptions(
RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090",
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.JOB_WORKERS_COUNT, "1",
ComputerOptions.JOB_PARTITIONS_COUNT, "2",
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "300"
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]"
);
DataFileManager dataFileManager = new DataFileManager();
FileManager dataFileManager = new FileManager();

dataFileManager.init(config);

File dir1 = dataFileManager.nextDir();
File dir2 = dataFileManager.nextDir();
File dir3 = dataFileManager.nextDir();
File dir4 = dataFileManager.nextDir();
File dir5 = dataFileManager.nextDir();
String dir1 = dataFileManager.nextDirectory();
String dir2 = dataFileManager.nextDirectory();
String dir3 = dataFileManager.nextDirectory();
String dir4 = dataFileManager.nextDirectory();
String dir5 = dataFileManager.nextDirectory();
Assert.assertEquals(dir1, dir3);
Assert.assertEquals(dir3, dir5);
Assert.assertEquals(dir2, dir4);
@@ -92,33 +88,22 @@ public void testNextDir() {
}

@Test
public void testNextFile() {
public void testNextDirWithPaths() {
Config config = UnitTestBase.updateWithRequiredOptions(
RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090",
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.JOB_WORKERS_COUNT, "1",
ComputerOptions.JOB_PARTITIONS_COUNT, "2",
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "300"
ComputerOptions.JOB_ID, "local_001",
ComputerOptions.JOB_WORKERS_COUNT, "1",
ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]"
);
DataFileManager dataFileManager = new DataFileManager();
FileManager dataFileManager = new FileManager();

dataFileManager.init(config);

File vertexFile = dataFileManager.nextFile("vertex", -1);
File vertexSuperstepDir = vertexFile.getParentFile();
Assert.assertEquals("-1", vertexSuperstepDir.getName());
File vertexRootDir = vertexSuperstepDir.getParentFile();
Assert.assertEquals("vertex", vertexRootDir.getName());

File messageFile = dataFileManager.nextFile("message", 0);
File messageSuperstepDir = messageFile.getParentFile();
Assert.assertEquals("0", messageSuperstepDir.getName());
File messageRootDir = messageSuperstepDir.getParentFile();
Assert.assertEquals("message", messageRootDir.getName());
File dir1 = new File(dataFileManager.nextDirectory("vertex"));
Assert.assertEquals("vertex", dir1.getName());

File messageFile2 = dataFileManager.nextFile("message", 0);
Assert.assertNotEquals(messageFile, messageFile2);
File dir2 = new File(dataFileManager.nextDirectory("message", "1"));
Assert.assertEquals("1", dir2.getName());
Assert.assertEquals("message", dir2.getParentFile().getName());

dataFileManager.close(config);
}
@@ -24,7 +24,7 @@

@RunWith(Suite.class)
@Suite.SuiteClasses({
DataDirManagerTest.class
FileManagerTest.class
})
public class StoreTestSuite {
}

0 comments on commit aaeec64

Please sign in to comment.