Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-1339] Adding IInputPartition.Cache() for data download and cache #968

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,28 +24,31 @@
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.IO.TempFileCreation;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
{
[ThreadSafe]
internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposable
{
private static readonly Logger Logger = Logger.GetLogger(typeof(FileSystemInputPartition<T>));

private readonly string _id;
private readonly IFileSystem _fileSystem;
private readonly IFileDeSerializer<T> _fileSerializer;
private readonly ISet<string> _filePaths;
private bool _isInitialized;
private readonly bool _copyToLocal;
private readonly object _lock = new object();
private readonly ITempFileCreator _tempFileCreator;
private readonly ISet<string> _localFiles = new HashSet<string>();
private readonly ISet<string> _remoteFilePaths;
private readonly bool _copyToLocal;

private Optional<ISet<string>> _localFiles;

[Inject]
private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> remoteFilePaths,
[Parameter(typeof(CopyToLocal))] bool copyToLocal,
IFileSystem fileSystem,
ITempFileCreator tempFileCreator,
Expand All @@ -54,86 +57,96 @@ internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposa
_id = id;
_fileSystem = fileSystem;
_fileSerializer = fileSerializer;
_filePaths = filePaths;
_tempFileCreator = tempFileCreator;
_isInitialized = false;
_remoteFilePaths = remoteFilePaths;
_copyToLocal = copyToLocal;
_localFiles = Optional<ISet<string>>.Empty();
}

public string Id
{
get { return _id; }
}

private void Initialize()
/// <summary>
/// Caches from the remote File System to a local disk.
/// </summary>
public void Cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cache is to download files from remote to local. Not cache the data into memory. Then what is the essential difference from original Initial method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what "cache" in this context means. It seems that the original usage of the term "cache" here meant to load into disk. @dkm2110 can you comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did not have any concept of cache earlier. But the concept here seems to be that we will load data in to disk which is desirable. Still I am confused what are you trying to say?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkm2110 who is "you?"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You = @afchung

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been talking about cache, do we? Even the method name is called Cache. That's was why I was asking in Jira this morning what does cache mean.
There are two layers as my understanding:

  1. Data file Downloading from remote to local. Currently this is configurable.
  2. Data cache, - get IEnumerable, iterate it so that it is cached.

Markus replied it is
2. Data cache, - get IEnumerable, iterate it so that it is cached.

If this Jira is to address 1 and REEF-1357 is to address 2. Then we are fine. But may need to update the title of the Jira.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwang98052 1) is also a cache type where we add HDD to remote file. Does that make sense and removes confusion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwang98052 OK, so are you proposing deserialize the files and iterate through to get the type of T?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that iteration is done as part of DeSerialize currently. Where to desrialize and get the type of T all depends on user and default implementations we provide. For example, we might have tw implementation of HDD cache where one involved simple downloading to disk while some other customized implementation might involve reading from remote row by row, converting in to some intermediate format and then writing to disk. Similarly, Memory cache might also have two implementations - in one we simply load the full raw data in memory while in some customized implementation we might also do parsing and then load only parsed data in memory. It all depends on the user now. And we will provide some commonly known and used interpretations of these cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkm2110 do you expect the user to iterate the data or REEF provide default implementation? If it is controlled by user, we should be able to just do it now in ContextStarthandler without this change.

{
lock (_lock)
{
if (!_isInitialized)
if (_localFiles.IsPresent())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, we cannot use copyToLocal flag to determine if the files are already in local. _localFiles need to be properly initialized so that it can represent files are already in local.

{
CopyFromRemote();
_isInitialized = true;
return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this into a method of its own? e.g. Download?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per discussion below, for now, we will simply assume that Cache() in this class assumes a disk level cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was more concerned about the relative length of this method. The if/else would be the only thing in this method, and the actual download logic would be in the helper methods:

 if (!_localFiles.IsPresent()){ 
  Download();
}

var set = new HashSet<string>();
string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));

foreach (var sourceFilePath in _remoteFilePaths)
{
Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
if (!_fileSystem.Exists(sourceUri))
{
throw new FileNotFoundException(string.Format(CultureInfo.CurrentCulture,
"Remote File {0} does not exists.", sourceUri));
}

var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
set.Add(localFilePath);

Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath));
if (File.Exists(localFilePath))
{
File.Delete(localFilePath);
Logger.Log(Level.Warning, "localFile already exists, delete it: " + localFilePath);
}

_fileSystem.CopyToLocal(sourceUri, localFilePath);
if (File.Exists(localFilePath))
{
Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "File {0} is Copied to local {1}.", sourceUri, localFilePath));
}
else
{
string msg = string.Format(CultureInfo.CurrentCulture,
"The IFilesystem completed the copy of `{0}` to `{1}`. But the file `{1}` does not exist.", sourceUri, localFilePath);
Exceptions.Throw(new FileLoadException(msg), msg, Logger);
}
}

_localFiles = Optional<ISet<string>>.Of(set);
}
}

/// <summary>
/// This method copy remote files to local and then deserialize the files.
/// This method copies remote files to local if CopyToLocal is enabled, and then deserializes the files.
/// Otherwise, this method assumes that the files are remote, and that the injected IFileDeSerializer
/// can handle the remote file system access.
/// It returns the IEnumerble of T, the details is defined in the Deserialize() method
/// provided by the Serializer
/// </summary>
/// <returns></returns>
public T GetPartitionHandle()
{
if (_copyToLocal)
{
if (!_isInitialized)
{
Initialize();
}

return _fileSerializer.Deserialize(_localFiles);
}
return _fileSerializer.Deserialize(_filePaths);
}

private void CopyFromRemote()
{
string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));

foreach (var sourceFilePath in _filePaths)
lock (_lock)
{
Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
if (!_fileSystem.Exists(sourceUri))
if (_copyToLocal)
{
throw new FileNotFoundException(string.Format(CultureInfo.CurrentCulture,
"Remote File {0} does not exists.", sourceUri));
}
if (!_localFiles.IsPresent())
{
Cache();
}

var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
_localFiles.Add(localFilePath);

Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath));
if (File.Exists(localFilePath))
{
File.Delete(localFilePath);
Logger.Log(Level.Warning, "localFile already exists, delete it: " + localFilePath);
// For now, assume IFileDeSerializer is local.
return _fileSerializer.Deserialize(_localFiles.Value);
}

_fileSystem.CopyToLocal(sourceUri, localFilePath);
if (File.Exists(localFilePath))
{
Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "File {0} is Copied to local {1}.", sourceUri, localFilePath));
}
else
{
string msg = string.Format(CultureInfo.CurrentCulture,
"The IFilesystem completed the copy of `{0}` to `{1}`. But the file `{1}` does not exist.", sourceUri, localFilePath);
Exceptions.Throw(new FileLoadException(msg), msg, Logger);
}
// For now, assume IFileDeSerializer is remote.
return _fileSerializer.Deserialize(_remoteFilePaths);
}
}

Expand All @@ -144,11 +157,14 @@ private void CopyFromRemote()
/// </summary>
public void Dispose()
{
if (_localFiles.Count > 0)
lock (_lock)
{
foreach (var fileName in _localFiles)
if (_localFiles.IsPresent())
{
File.Delete(fileName);
foreach (var fileName in _localFiles.Value)
{
File.Delete(fileName);
}
}
}
}
Expand Down
Expand Up @@ -42,7 +42,6 @@ public interface IFileDeSerializer<T>
/// If there is any IO error, IOException could be thrown.
/// </summary>
/// <param name="filePaths"></param>
/// <param name="local"></param>
/// <returns></returns>
T Deserialize(ISet<string> filePaths);
}
Expand Down
10 changes: 8 additions & 2 deletions lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

using System;
using Org.Apache.REEF.Utilities.Attributes;

namespace Org.Apache.REEF.IO.PartitionedData
{
Expand All @@ -24,13 +24,19 @@ namespace Org.Apache.REEF.IO.PartitionedData
/// </summary>
/// <typeparam name="T">Generic Type representing data pointer.
/// For example, for data in local file it can be file pointer </typeparam>
public interface IInputPartition<T>
public interface IInputPartition<T>
{
/// <summary>
/// The id of the partition.
/// </summary>
string Id { get; }

/// <summary>
/// Caches the data locally, cached location is based on the implementation.
/// </summary>
[Unstable("0.14", "Contract may change.")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Contract will change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why will? It won't necessarily change.

void Cache();

/// <summary>
/// Gives a pointer to the underlying partition.
/// </summary>
Expand Down
Expand Up @@ -20,47 +20,76 @@
using System.IO;
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Attributes;

namespace Org.Apache.REEF.IO.PartitionedData.Random
{
/// <summary>
/// An implementation of IInputPartition that returns a configurable number of random doubles.
/// </summary>
[ThreadSafe]
internal sealed class RandomInputPartition : IInputPartition<Stream>
{
private readonly object _lock = new object();
private readonly string _id;
private readonly byte[] _randomData;
private readonly int _numberOfDoubles;

private Optional<byte[]> _randomData;

[Inject]
private RandomInputPartition([Parameter(typeof(PartitionId))] string id,
private RandomInputPartition(
[Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(NumberOfDoublesPerPartition))] int numberOfDoubles)
{
_id = id;
_randomData = new byte[numberOfDoubles * 8];
var random = new System.Random();

for (var i = 0; i < numberOfDoubles; ++i)
{
var randomDouble = random.NextDouble();
var randomDoubleAsBytes = BitConverter.GetBytes(randomDouble);
Debug.Assert(randomDoubleAsBytes.Length == 8, "randomDoubleAsBytes.Length should be 8.");
for (var j = 0; j < 8; ++j)
{
var index = (i * 8) + j;
Debug.Assert(index < _randomData.Length, "Index should be less than _randomData.Length.");
_randomData[index] = randomDoubleAsBytes[j];
}
}
_numberOfDoubles = numberOfDoubles;
_randomData = Optional<byte[]>.Empty();
}

public string Id
{
get { return _id; }
}

public void Cache()
{
lock (_lock)
{
if (_randomData.IsPresent())
{
return;
}

var random = new System.Random();
var generatedData = new byte[_numberOfDoubles * sizeof(long)];
for (var i = 0; i < _numberOfDoubles; ++i)
{
var randomDouble = random.NextDouble();
var randomDoubleAsBytes = BitConverter.GetBytes(randomDouble);
Debug.Assert(randomDoubleAsBytes.Length == 8, "randomDoubleAsBytes.Length should be 8.");
for (var j = 0; j < sizeof(long); ++j)
{
var index = (i * 8) + j;
Debug.Assert(index < generatedData.Length, "Index should be less than _randomData.Length.");
generatedData[index] = randomDoubleAsBytes[j];
}
}
_randomData = Optional<byte[]>.Of(generatedData);
}
}

public Stream GetPartitionHandle()
{
return new MemoryStream(_randomData, false);
lock (_lock)
{
if (!_randomData.IsPresent())
{
Cache();
}

return new MemoryStream(_randomData.Value, false);
}
}
}
}