Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-1339] Adding IInputPartition.Cache() for data download and cache #968

Closed
wants to merge 4 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,13 +41,14 @@ internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposa
private readonly IFileDeSerializer<T> _fileSerializer;
private readonly object _lock = new object();
private readonly ITempFileCreator _tempFileCreator;
private readonly ISet<string> _filePaths;
private readonly ISet<string> _remoteFilePaths;
private readonly bool _copyToLocal;

private Optional<ISet<string>> _localFiles;

[Inject]
private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> remoteFilePaths,
[Parameter(typeof(CopyToLocal))] bool copyToLocal,
IFileSystem fileSystem,
ITempFileCreator tempFileCreator,
Expand All @@ -57,24 +58,19 @@ internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, IDisposa
_fileSystem = fileSystem;
_fileSerializer = fileSerializer;
_tempFileCreator = tempFileCreator;
_filePaths = filePaths;

if (!copyToLocal)
{
// Implies that the files are already local.
_localFiles = Optional<ISet<string>>.Of(filePaths);
}
else
{
_localFiles = Optional<ISet<string>>.Empty();
}
_remoteFilePaths = remoteFilePaths;
_copyToLocal = copyToLocal;
_localFiles = Optional<ISet<string>>.Empty();
}

public string Id
{
get { return _id; }
}

/// <summary>
/// Caches from the remote File System to a local disk.
/// </summary>
public void Cache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cache is to download files from remote to local. Not cache the data into memory. Then what is the essential difference from original Initial method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what "cache" in this context means. It seems that the original usage of the term "cache" here meant to load into disk. @dkm2110 can you comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did not have any concept of cache earlier. But the concept here seems to be that we will load data in to disk which is desirable. Still I am confused what are you trying to say?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkm2110 who is "you?"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You = @afchung

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been talking about cache, do we? Even the method name is called Cache. That's was why I was asking in Jira this morning what does cache mean.
There are two layers as my understanding:

  1. Data file Downloading from remote to local. Currently this is configurable.
  2. Data cache, - get IEnumerable, iterate it so that it is cached.

Markus replied it is
2. Data cache, - get IEnumerable, iterate it so that it is cached.

If this Jira is to address 1 and REEF-1357 is to address 2. Then we are fine. But may need to update the title of the Jira.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwang98052 1) is also a cache type where we add HDD to remote file. Does that make sense and removes confusion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jwang98052 OK, so are you proposing deserialize the files and iterate through to get the type of T?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that iteration is done as part of DeSerialize currently. Where to desrialize and get the type of T all depends on user and default implementations we provide. For example, we might have tw implementation of HDD cache where one involved simple downloading to disk while some other customized implementation might involve reading from remote row by row, converting in to some intermediate format and then writing to disk. Similarly, Memory cache might also have two implementations - in one we simply load the full raw data in memory while in some customized implementation we might also do parsing and then load only parsed data in memory. It all depends on the user now. And we will provide some commonly known and used interpretations of these cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dkm2110 do you expect the user to iterate the data or REEF provide default implementation? If it is controlled by user, we should be able to just do it now in ContextStarthandler without this change.

{
lock (_lock)
Expand All @@ -88,7 +84,7 @@ public void Cache()
string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));

foreach (var sourceFilePath in _filePaths)
foreach (var sourceFilePath in _remoteFilePaths)
{
Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
Expand Down Expand Up @@ -127,7 +123,9 @@ public void Cache()
}

/// <summary>
/// This method copy remote files to local and then deserialize the files.
/// This method copies remote files to local if CopyToLocal is enabled, and then deserializes the files.
/// Otherwise, this method assumes that the files are remote, and that the injected IFileDeSerializer
/// can handle the remote file system access.
/// It returns the IEnumerble of T, the details is defined in the Deserialize() method
/// provided by the Serializer
/// </summary>
Expand All @@ -136,12 +134,21 @@ public T GetPartitionHandle()
{
lock (_lock)
{
if (!_localFiles.IsPresent())
if (_copyToLocal)
{
Cache();
}
if (!_localFiles.IsPresent())
{
Cache();
}

return _fileSerializer.Deserialize(_localFiles.Value);
// For now, assume IFileDeSerializer is local.
return _fileSerializer.Deserialize(_localFiles.Value);
}
else
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need else.

{
// For now, assume IFileDeSerializer is remote.
return _fileSerializer.Deserialize(_remoteFilePaths);
}
}
}

Expand Down