
Adding s3a schema and s3a implem to hdfs storage module. #3940

Closed
b-slim wants to merge 3 commits

Conversation

@b-slim (Contributor) commented Feb 15, 2017

This PR adds the s3a file scheme to the list of file systems supported by the Hadoop indexer.
It also adds the hadoop-aws implementation jars to the hdfs deep storage module, which will be used to load segments indexed with the s3a scheme.
FYI: tagged as 0.10, but there is no rush to have it in.



@b-slim added the Feature label Feb 15, 2017
@b-slim added this to the 0.10.0 milestone Feb 15, 2017
@nishantmonu51 (Member) left a comment:
👍 , LGTM.

pom.xml Outdated
@@ -71,7 +71,7 @@
<netty.version>4.1.6.Final</netty.version>
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
-<hadoop.compile.version>2.3.0</hadoop.compile.version>
+<hadoop.compile.version>2.7.0</hadoop.compile.version>
@gianm (Contributor):
Why not 2.7.3?

@@ -421,6 +421,7 @@ public long push() throws IOException
case "hdfs":
case "viewfs":
case "maprfs":
case "s3a":
@gianm (Contributor):
Any reason s3a is here instead of in the "s3_zip" section? I think s3a should be treated like s3 deep storage, not hdfs.

@b-slim (Author):
@gianm as you can see here, I have added the hadoop-aws jars to be able to read segments with the hdfs loadSpec. That makes it more uniform IMO. The idea here is to have one deep storage module that reads all the file systems supported by Hadoop.

@gianm (Contributor):
I don't think we really want uniformity. There's no good reason for people with S3 deep storage to have to load the druid-hdfs-storage extension or hadoop classes on their historical nodes.

The idea behind how things work now is that even if a hadoop indexing job uses the hadoop fs classes to push data to s3 deep storage, you'd still configure your historicals with s3 deep storage, not hdfs.

-String segmentDir = "hdfs".equals(fileSystem.getScheme()) || "viewfs".equals(fileSystem.getScheme())
+String segmentDir = "hdfs".equals(fileSystem.getScheme())
+    || "viewfs".equals(fileSystem.getScheme())
+    || "s3a".equals(fileSystem.getScheme())
@gianm (Contributor):
Similar comment here.

@b-slim (Author):
@gianm how about the issue with ":"? It is still not supported by Hadoop, though.

@gianm (Contributor):
s3a shouldn't really be here… probably just like determining the loadSpec, the directory should be based on the kind of deep storage configured and not on the scheme. I think adding a getStorageDir to DataSegmentPusher, and getting rid of DataSegmentPusherUtil, would solve that. It could use java 8 default methods to prevent any pusher other than HDFS from having to override it.
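For illustration only, here is a minimal sketch of the kind of Java 8 default method being described; the getStorageDir name comes from the comment above, while the method body and path layout are assumptions, not the code that was eventually merged:

    import io.druid.timeline.DataSegment;

    public interface DataSegmentPusher
    {
      // ... existing methods such as push() and getPathForHadoop() elided ...

      // Hedged sketch: a shared default storage-directory layout, so that only the
      // HDFS pusher (which must avoid characters like ":" in paths) needs to override it.
      default String getStorageDir(DataSegment segment)
      {
        return String.join(
            "/",
            segment.getDataSource(),
            segment.getInterval().getStart() + "_" + segment.getInterval().getEnd(),
            segment.getVersion(),
            String.valueOf(segment.getShardSpec().getPartitionNum())
        );
      }
    }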

@b-slim (Author):
Hmm, this method is used everywhere... anyway, I will change that.

@b-slim (Author) commented Feb 17, 2017



indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java, line 424 at r1 (raw file):

Previously, gianm (Gian Merlino) wrote…

I don't think we really want uniformity. There's no good reason for people with S3 deep storage to have to load the druid-hdfs-storage extension or hadoop classes on their historical nodes.

The idea behind how things work now is that even if a hadoop indexing job uses the hadoop fs classes to push data to s3 deep storage, you'd still configure your historicals with s3 deep storage, not hdfs.

@gianm for the long term, we are trying to use only the hdfs pusher/puller for all Hadoop-related file systems; the good reason is to have one module that does everything, though. I get that the S3 module was developed in the absence of the current hadoop-aws module, but I guess it is always good to move away from it and have one loadSpec for all the Hadoop file systems.



@b-slim (Author) commented Feb 17, 2017



pom.xml, line 74 at r1 (raw file):

Previously, gianm (Gian Merlino) wrote…

Why not 2.7.3?

Done.



@gianm (Contributor) commented Feb 17, 2017

@gianm for the long term, we are trying to use only the hdfs pusher/puller for all Hadoop-related file systems; the good reason is to have one module that does everything, though. I get that the S3 module was developed in the absence of the current hadoop-aws module, but I guess it is always good to move away from it and have one loadSpec for all the Hadoop file systems.

Just because you use the s3a fs to push data to S3 doesn't mean S3 is a hadoop file system. It's not. It's S3.

@b-slim (Author) commented Feb 17, 2017

@gianm I get your point, but what I am trying to say is: if we are using Hadoop implementations to push the files, wouldn't it make sense to use Hadoop implementations to pull/kill, etc.?



@b-slim (Author) commented Feb 17, 2017



a discussion (no related file):
@gianm let me explain this more. What I am trying to achieve is the following: ideally, any segment I pushed with the hdfs pusher can be read back with the hdfs puller. For instance, here I can use the hdfs pusher with a file system such as s3a; then, since the loadSpec is hdfs, the only way to read it back is via the hdfs puller. So in general, what I want to get to is: if I use pusher X to push data, I can read it back with puller X.



@gianm (Contributor) commented Feb 17, 2017

What I am trying to achieve is the following: ideally, any segment I pushed with the hdfs pusher can be read back with the hdfs puller. For instance, here I can use the hdfs pusher with a file system such as s3a; then, since the loadSpec is hdfs, the only way to read it back is via the hdfs puller. So in general, what I want to get to is: if I use pusher X to push data, I can read it back with puller X.

Your argument is reasonable, but I think the reason we are disagreeing is that in a Hadoop-based indexing job you're not actually using the hdfs pusher. It doesn't use the pushers at all, it has its own hard coded pushing code that is based purely on the scheme.

Put another way, a user could configure S3 deep storage in two ways:

    # option 1: "hdfs" deep storage pointed at an S3 filesystem
    druid.storage.type=hdfs
    druid.storage.storageDirectory=s3n://foo/bar

    # option 2: native "s3" deep storage
    druid.storage.type=s3
    druid.storage.bucket=foo
    druid.storage.baseKey=bar

In non-Hadoop based indexing jobs, where the pushers actually do get used, the former gets loadSpec "hdfs" and the latter gets "s3_zip" even though the segments end up in the same place. In Hadoop-based indexing jobs the loadSpec for both will be "s3_zip" since it only looks at the scheme.

It seems to me that it would be best for the user to have similar control over the loadSpec in Hadoop-based jobs, rather than forcing one choice or another. Does that sound right to you and would you be open to working on that?

For my part, as long as the user has no choice in loadSpec used for S3 storages in Hadoop-based jobs, I think they should stay what they currently are, which is "s3_zip", for all S3 schemes (s3, s3n, s3a).

@b-slim (Author) commented Feb 17, 2017

@gianm I agree about:

It seems to me that it would be best for the user to have similar control over the loadSpec in Hadoop-based jobs, rather than forcing one choice or another. Does that sound right to you and would you be open to working on that?

OK, it seems like to make this work we need to let the user select the pusher/puller. Hence, if the S3 module is loaded, the loadSpec will be s3_zip, and if the hdfs storage module is loaded, it will be the hdfs loadSpec. Do you agree with this approach?

@gianm (Contributor) commented Feb 17, 2017

Yeah that seems like a good approach. Although rather than based on loaded modules I would do it based on which deep storage is configured as the main one -- since I think it's possible to load two deep storage modules at once.

To do that I think we need to resolve this todo in JobHelper:

    // TODO: Make this a part of Pushers or Pullers
    switch (outputFS.getScheme()) {
      case "hdfs":
      case "viewfs":
      case "maprfs":
        loadSpec = ImmutableMap.<String, Object>of(
            "type", "hdfs",
            "path", indexOutURI.toString()
        );
        break;
      case "gs":
        loadSpec = ImmutableMap.<String, Object>of(
            "type", "google",
            "bucket", indexOutURI.getHost(),
            "path", indexOutURI.getPath().substring(1) // remove the leading "/"
        );
        break;
      case "s3":
      case "s3n":
        loadSpec = ImmutableMap.<String, Object>of(
            "type", "s3_zip",
            "bucket", indexOutURI.getHost(),
            "key", indexOutURI.getPath().substring(1) // remove the leading "/"
        );
        break;
      case "file":
        loadSpec = ImmutableMap.<String, Object>of(
            "type", "local",
            "path", indexOutURI.getPath()
        );
        break;
      default:
        throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
    }

Instead of the big switch statement, it should probably call a method like makeLoadSpec on the DataSegmentPusher for whatever deep storage is primary.
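As a rough sketch of that delegation: the makeLoadSpec(URI) signature and HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER both appear later in this PR, but the class wrapper and wiring below are assumptions, not the merged code.

    import com.google.common.collect.ImmutableMap;
    import java.net.URI;
    import java.util.Map;

    // Pusher-side half of the sketch: the HDFS pusher would produce the same loadSpec
    // that the switch statement builds today for hdfs/viewfs/maprfs URIs.
    public class HdfsDataSegmentPusherSketch
    {
      public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
      {
        return ImmutableMap.<String, Object>of(
            "type", "hdfs",
            "path", finalIndexZipFilePath.toString()
        );
      }

      // JobHelper-side half: ask whichever pusher backs the primary deep storage,
      // instead of switching on outputFS.getScheme():
      //   loadSpec = HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER.makeLoadSpec(indexOutURI);
    }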

@gianm (Contributor) commented Feb 17, 2017

That would mean using "hdfs" deep storage to write to s3 would let the HdfsDataSegmentPusher make an "hdfs" load spec, and using the "s3" deep storage to write to s3 would let the S3DataSegmentPusher make a "s3_zip" load spec.

@gianm modified the milestones: 0.10.1, 0.10.0 Feb 21, 2017
@gianm (Contributor) commented Feb 21, 2017

Bumping to 0.10.1 as discussed in dev sync today.

@b-slim (Author) commented Feb 23, 2017

@gianm check out the new approach and let me know what you think.

@@ -146,4 +148,19 @@ public DataSegment call() throws Exception
}
}
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
@nishantmonu51 (Member):
Minor nit: for each pusher, the makeLoadSpec logic seems duplicated with the push method; this can be extracted to a common method.

Contributor:
Agree with @nishantmonu51, to avoid duplicated code please use this in push, or else have them both call a common helper method.
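To make the request concrete, here is a hedged sketch of a pusher reusing makeLoadSpec from push instead of building the map twice; the s3_zip shape matches the JobHelper switch quoted earlier, but the class name and surrounding code are illustrative assumptions, not this PR's diff.

    import com.google.common.collect.ImmutableMap;
    import java.net.URI;
    import java.util.Map;

    // Sketch: build the loadSpec in exactly one place so push() and makeLoadSpec()
    // cannot drift apart.
    public class S3PusherLoadSpecSketch
    {
      public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
      {
        return ImmutableMap.<String, Object>of(
            "type", "s3_zip",
            "bucket", finalIndexZipFilePath.getHost(),
            "key", finalIndexZipFilePath.getPath().substring(1) // drop the leading "/"
        );
      }

      // push() would then do something like:
      //   segment = segment.withLoadSpec(makeLoadSpec(URI.create("s3n://" + bucket + "/" + key)));
      // instead of constructing the ImmutableMap a second time.
    }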

@gianm (Contributor) commented Mar 16, 2017

@b-slim sorry to let this drop for so long, I am taking another look now.

@@ -174,4 +175,17 @@ public DataSegment call() throws Exception
}
}
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
Contributor:
Please either use this in uploadDataSegment or else have them use a common helper method.

@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new IAE("not supported");
Contributor:
Please use spaces instead of tabs for indenting. Also this should technically be an UnsupportedOperationException.

@@ -146,4 +148,19 @@ public DataSegment call() throws Exception
}
}
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
Contributor:
Agree with @nishantmonu51, to avoid duplicated code please use this in push, or else have them both call a common helper method.

@@ -142,6 +146,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
}
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
Contributor:
Please use this in push too, or else have them both call a common helper method.

@@ -149,4 +151,14 @@ public DataSegment call() throws Exception
throw Throwables.propagate(e);
}
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
Contributor:
Similar comment to other pushers about code duplication.

@@ -741,7 +741,8 @@ public void doRun()
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate
-)
+),
+config.DATA_SEGMENT_PUSHER
Contributor:
It's a static, so this would be more clear as HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER.

@gianm (Contributor) commented Mar 19, 2017:

s3a shouldn't really be here… probably just like determining the loadSpec, the directory should be based on the kind of deep storage configured and not on the scheme. I think adding a getStorageDir to DataSegmentPusher, and getting rid of DataSegmentPusherUtil, would solve that. It could use java 8 default methods to prevent any pusher other than HDFS from having to override it.

Oops, this comment was in the wrong spot. Moved to https://github.com/druid-io/druid/pull/3940/files#r107766956

@gianm (Contributor) commented Mar 23, 2017

@b-slim are you able to pick this back up?

@b-slim (Author) commented Mar 23, 2017

@gianm thanks for the review, give me a couple of days.

@gianm (Contributor) commented Mar 23, 2017

Cool, thanks for the update.

One other suggestion: how about adding a config to S3DataSegmentPusherConfig that controls whether getPathForHadoop() uses s3n or s3a? IMO, it should be s3n by default (for compatibility) but some users would want that to flip over to s3a. Something like useS3AForHadoop?
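A hedged sketch of that suggestion follows; the property name comes from the comment above, while the class shape, getter, and the commented-out pusher logic are assumptions rather than committed code.

    import com.fasterxml.jackson.annotation.JsonProperty;

    // Sketch: an opt-in flag on the S3 pusher config. It defaults to false so that
    // getPathForHadoop() keeps emitting s3n:// paths for existing deployments, and
    // flips the scheme to s3a:// when enabled. Existing properties (bucket, baseKey,
    // ...) are elided here.
    public class S3DataSegmentPusherConfigSketch
    {
      @JsonProperty
      private boolean useS3AForHadoop = false;

      public boolean isUseS3AForHadoop()
      {
        return useS3AForHadoop;
      }

      // The pusher would then pick the scheme roughly like:
      //   final String scheme = config.isUseS3AForHadoop() ? "s3a" : "s3n";
      //   return String.format("%s://%s/%s", scheme, config.getBucket(), config.getBaseKey());
    }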

dclim added a commit to implydata/druid-public that referenced this pull request Mar 23, 2017
dclim added a commit to implydata/druid-public that referenced this pull request Mar 24, 2017
@b-slim (Author) commented Mar 25, 2017

@gianm, I opened a new PR due to the number of conflicting files; can we make sure it gets reviewed ASAP to avoid extra work fixing conflicts?

@b-slim b-slim closed this Mar 25, 2017
@b-slim (Author) commented Mar 25, 2017

tracked by #4116

@iainlbc commented May 24, 2017

+1

@b-slim deleted the update_hadoop_compile branch April 26, 2018 01:28