Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
[REEF-1339] Adding IInputPartition.Cache() for data download and cache
Browse files Browse the repository at this point in the history
This addressed the issue by
  * Adding an `[Unstable]` Cache function.
  * Modifying existing implementations of IInputPartition to allow them to cache
    only on invocation of Cache instead of on initialization.

JIRA:
  [REEF-1339](https://issues.apache.org/jira/browse/REEF-1339)

Pull Request:
  This closes #968
  • Loading branch information
afchung authored and Markus Weimer committed Apr 28, 2016
1 parent c90f8aa commit d34441c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 75 deletions.
Expand Up @@ -24,28 +24,31 @@
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.IO.TempFileCreation;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
{
[ThreadSafe]
internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposable
{
private static readonly Logger Logger = Logger.GetLogger(typeof(FileSystemInputPartition<T>));

private readonly string _id;
private readonly IFileSystem _fileSystem;
private readonly IFileDeSerializer<T> _fileSerializer;
private readonly ISet<string> _filePaths;
private bool _isInitialized;
private readonly bool _copyToLocal;
private readonly object _lock = new object();
private readonly ITempFileCreator _tempFileCreator;
private readonly ISet<string> _localFiles = new HashSet<string>();
private readonly ISet<string> _remoteFilePaths;
private readonly bool _copyToLocal;

private Optional<ISet<string>> _localFiles;

[Inject]
private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> remoteFilePaths,
[Parameter(typeof(CopyToLocal))] bool copyToLocal,
IFileSystem fileSystem,
ITempFileCreator tempFileCreator,
Expand All @@ -54,86 +57,89 @@ internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposa
_id = id;
_fileSystem = fileSystem;
_fileSerializer = fileSerializer;
_filePaths = filePaths;
_tempFileCreator = tempFileCreator;
_isInitialized = false;
_remoteFilePaths = remoteFilePaths;
_copyToLocal = copyToLocal;
_localFiles = Optional<ISet<string>>.Empty();
}

/// <summary>
/// The identifier assigned to this partition at construction time.
/// </summary>
public string Id
{
    get
    {
        return _id;
    }
}

private void Initialize()
/// <summary>
/// Caches from the remote File System to a local disk.
/// </summary>
public void Cache()
{
lock (_lock)
{
if (!_isInitialized)
if (!_localFiles.IsPresent())
{
CopyFromRemote();
_isInitialized = true;
_localFiles = Optional<ISet<string>>.Of(Download());
}
}
}

/// <summary>
/// This method copy remote files to local and then deserialize the files.
/// It returns the IEnumerble of T, the details is defined in the Deserialize() method
/// provided by the Serializer
/// Downloads the remote file to local disk.
/// </summary>
/// <returns></returns>
public T GetPartitionHandle()
private ISet<string> Download()
{
if (_copyToLocal)
lock (_lock)
{
if (!_isInitialized)
var set = new HashSet<string>();
var localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));

foreach (var sourceFilePath in _remoteFilePaths)
{
Initialize();
var sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
Logger.Log(Level.Verbose, "sourceUri {0}: ", sourceUri);

var localFilePath = Path.Combine(localFileFolder, Guid.NewGuid().ToString("N").Substring(0, 8));
set.Add(localFilePath);

Logger.Log(Level.Verbose, "LocalFilePath {0}: ", localFilePath);
if (File.Exists(localFilePath))
{
File.Delete(localFilePath);
Logger.Log(Level.Warning, "localFile {0} already exists, deleting it. ", localFilePath);
}

_fileSystem.CopyToLocal(sourceUri, localFilePath);
}

return _fileSerializer.Deserialize(_localFiles);
return set;
}
return _fileSerializer.Deserialize(_filePaths);
}

private void CopyFromRemote()
/// <summary>
/// This method copies remote files to local if CopyToLocal is enabled, and then deserializes the files.
/// Otherwise, this method assumes that the files are remote, and that the injected IFileDeSerializer
/// can handle the remote file system access.
/// It returns the IEnumerable of T; the details are defined in the Deserialize() method
/// provided by the Serializer.
/// </summary>
/// <returns></returns>
public T GetPartitionHandle()
{
string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));

foreach (var sourceFilePath in _filePaths)
lock (_lock)
{
Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
if (!_fileSystem.Exists(sourceUri))
if (_copyToLocal)
{
throw new FileNotFoundException(string.Format(CultureInfo.CurrentCulture,
"Remote File {0} does not exists.", sourceUri));
}
if (!_localFiles.IsPresent())
{
Cache();
}

var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
_localFiles.Add(localFilePath);

Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath));
if (File.Exists(localFilePath))
{
File.Delete(localFilePath);
Logger.Log(Level.Warning, "localFile already exists, delete it: " + localFilePath);
// For now, assume IFileDeSerializer is local.
return _fileSerializer.Deserialize(_localFiles.Value);
}

_fileSystem.CopyToLocal(sourceUri, localFilePath);
if (File.Exists(localFilePath))
{
Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "File {0} is Copied to local {1}.", sourceUri, localFilePath));
}
else
{
string msg = string.Format(CultureInfo.CurrentCulture,
"The IFilesystem completed the copy of `{0}` to `{1}`. But the file `{1}` does not exist.", sourceUri, localFilePath);
Exceptions.Throw(new FileLoadException(msg), msg, Logger);
}
// For now, assume IFileDeSerializer is remote.
return _fileSerializer.Deserialize(_remoteFilePaths);
}
}

Expand All @@ -144,11 +150,14 @@ private void CopyFromRemote()
/// </summary>
public void Dispose()
{
if (_localFiles.Count > 0)
lock (_lock)
{
foreach (var fileName in _localFiles)
if (_localFiles.IsPresent())
{
File.Delete(fileName);
foreach (var fileName in _localFiles.Value)
{
File.Delete(fileName);
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

using System;
using Org.Apache.REEF.Utilities.Attributes;

namespace Org.Apache.REEF.IO.PartitionedData
{
Expand All @@ -24,13 +24,19 @@ namespace Org.Apache.REEF.IO.PartitionedData
/// </summary>
/// <typeparam name="T">Generic Type representing data pointer.
/// For example, for data in local file it can be file pointer </typeparam>
public interface IInputPartition<T>
public interface IInputPartition<T>
{
/// <summary>
/// The id of the partition.
/// </summary>
string Id { get; }

/// <summary>
/// Caches the data locally. The cache location depends on the implementation.
/// </summary>
[Unstable("0.14", "Contract may change.")]
void Cache();

/// <summary>
/// Gives a pointer to the underlying partition.
/// </summary>
Expand Down
Expand Up @@ -20,47 +20,76 @@
using System.IO;
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;

namespace Org.Apache.REEF.IO.PartitionedData.Random
{
/// <summary>
/// An implementation of IInputPartition that returns a configurable number of random doubles.
/// </summary>
[ThreadSafe]
internal sealed class RandomInputPartition : IInputPartition<Stream>
{
private readonly object _lock = new object();
private readonly string _id;
private readonly byte[] _randomData;
private readonly int _numberOfDoubles;

private Optional<byte[]> _randomData;

[Inject]
private RandomInputPartition([Parameter(typeof(PartitionId))] string id,
private RandomInputPartition(
[Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(NumberOfDoublesPerPartition))] int numberOfDoubles)
{
_id = id;
_randomData = new byte[numberOfDoubles * 8];
var random = new System.Random();

for (var i = 0; i < numberOfDoubles; ++i)
{
var randomDouble = random.NextDouble();
var randomDoubleAsBytes = BitConverter.GetBytes(randomDouble);
Debug.Assert(randomDoubleAsBytes.Length == 8, "randomDoubleAsBytes.Length should be 8.");
for (var j = 0; j < 8; ++j)
{
var index = (i * 8) + j;
Debug.Assert(index < _randomData.Length, "Index should be less than _randomData.Length.");
_randomData[index] = randomDoubleAsBytes[j];
}
}
_numberOfDoubles = numberOfDoubles;
_randomData = Optional<byte[]>.Empty();
}

/// <summary>
/// The identifier assigned to this partition at construction time.
/// </summary>
public string Id
{
    get
    {
        return _id;
    }
}

/// <summary>
/// Generates the random doubles for this partition and stores their byte
/// representation in <c>_randomData</c>. The data is generated only if
/// <c>_randomData</c> is not already present; otherwise the call returns
/// without doing any work.
/// </summary>
public void Cache()
{
    lock (_lock)
    {
        if (_randomData.IsPresent())
        {
            return;
        }

        var random = new System.Random();

        // One fixed-width slot of sizeof(double) bytes per generated value.
        var generatedData = new byte[_numberOfDoubles * sizeof(double)];
        for (var i = 0; i < _numberOfDoubles; ++i)
        {
            var randomDoubleAsBytes = BitConverter.GetBytes(random.NextDouble());
            Debug.Assert(randomDoubleAsBytes.Length == sizeof(double), "randomDoubleAsBytes.Length should be 8.");

            // BlockCopy validates the destination range itself, so no separate
            // per-byte index assertion is needed.
            Buffer.BlockCopy(randomDoubleAsBytes, 0, generatedData, i * sizeof(double), sizeof(double));
        }

        _randomData = Optional<byte[]>.Of(generatedData);
    }
}

public Stream GetPartitionHandle()
{
return new MemoryStream(_randomData, false);
lock (_lock)
{
if (!_randomData.IsPresent())
{
Cache();
}

return new MemoryStream(_randomData.Value, false);
}
}
}
}

0 comments on commit d34441c

Please sign in to comment.