Skip to content

Commit

Permalink
HADOOP-12159. Move DistCpUtils#compareFs() to org.apache.hadoop.fs.Fi…
Browse files Browse the repository at this point in the history
…leUtil and fix for HA namespaces (rchiang via rkanter)
  • Loading branch information
rkanter committed Jun 30, 2015
1 parent ad60807 commit aaafa0b
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 44 deletions.
3 changes: 3 additions & 0 deletions hadoop-common-project/hadoop-common/CHANGES.txt
Expand Up @@ -910,6 +910,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-10798. globStatus() should always return a sorted list of files HADOOP-10798. globStatus() should always return a sorted list of files
(cmccabe) (cmccabe)


HADOOP-12159. Move DistCpUtils#compareFs() to org.apache.hadoop.fs.FileUtil
and fix for HA namespaces (rchiang via rkanter)

Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -19,6 +19,9 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;


import java.io.*; import java.io.*;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Enumeration; import java.util.Enumeration;
Expand Down Expand Up @@ -1329,4 +1332,43 @@ public static String[] createJarWithClassPath(String inputClassPath, Path pwd,
unexpandedWildcardClasspath.toString()}; unexpandedWildcardClasspath.toString()};
return jarCp; return jarCp;
} }

public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
if (srcFs==null || destFs==null) {
return false;
}
URI srcUri = srcFs.getUri();
URI dstUri = destFs.getUri();
if (srcUri.getScheme()==null) {
return false;
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost!=null) && (dstHost!=null)) {
if (srcHost.equals(dstHost)) {
return srcUri.getPort()==dstUri.getPort();
}
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch (UnknownHostException ue) {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not compare file-systems. Unknown host: ", ue);
}
return false;
}
if (!srcHost.equals(dstHost)) {
return false;
}
} else if (srcHost==null && dstHost!=null) {
return false;
} else if (srcHost!=null) {
return false;
}
// check for ports
return srcUri.getPort()==dstUri.getPort();
}
} }
Expand Up @@ -17,16 +17,20 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;


import org.junit.Before; import org.junit.*;

import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
Expand All @@ -44,10 +48,12 @@
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.tools.tar.TarEntry; import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarOutputStream; import org.apache.tools.tar.TarOutputStream;
import org.junit.After;
import org.junit.Assert; import javax.print.attribute.URISyntax;
import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class TestFileUtil { public class TestFileUtil {
private static final Log LOG = LogFactory.getLog(TestFileUtil.class); private static final Log LOG = LogFactory.getLog(TestFileUtil.class);
Expand All @@ -64,6 +70,25 @@ public class TestFileUtil {
private final File dir2 = new File(del, DIR + "2"); private final File dir2 = new File(del, DIR + "2");
private final File partitioned = new File(TEST_DIR, "partitioned"); private final File partitioned = new File(TEST_DIR, "partitioned");


private InetAddress inet1;
private InetAddress inet2;
private InetAddress inet3;
private InetAddress inet4;
private InetAddress inet5;
private InetAddress inet6;
private URI uri1;
private URI uri2;
private URI uri3;
private URI uri4;
private URI uri5;
private URI uri6;
private FileSystem fs1;
private FileSystem fs2;
private FileSystem fs3;
private FileSystem fs4;
private FileSystem fs5;
private FileSystem fs6;

/** /**
* Creates multiple directories for testing. * Creates multiple directories for testing.
* *
Expand All @@ -80,6 +105,7 @@ public class TestFileUtil {
* file: part-r-00000, contents: "foo" * file: part-r-00000, contents: "foo"
* file: part-r-00001, contents: "bar" * file: part-r-00001, contents: "bar"
*/ */
@Ignore
private void setupDirs() throws IOException { private void setupDirs() throws IOException {
Assert.assertFalse(del.exists()); Assert.assertFalse(del.exists());
Assert.assertFalse(tmp.exists()); Assert.assertFalse(tmp.exists());
Expand Down Expand Up @@ -1096,4 +1122,82 @@ public void testCreateJarWithClassPath() throws Exception {
} }
} }
} }

@Ignore
public void setupCompareFs() {
// Set up Strings
String host1 = "1.2.3.4";
String host2 = "2.3.4.5";
int port1 = 7000;
int port2 = 7001;
String uris1 = "hdfs://" + host1 + ":" + Integer.toString(port1) + "/tmp/foo";
String uris2 = "hdfs://" + host1 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris3 = "hdfs://" + host2 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris4 = "hdfs://" + host2 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris5 = "file:///" + host1 + ":" + Integer.toString(port1) + "/tmp/foo";
String uris6 = "hdfs:///" + host1 + "/tmp/foo";
// Set up URI objects
try {
uri1 = new URI(uris1);
uri2 = new URI(uris2);
uri3 = new URI(uris3);
uri4 = new URI(uris4);
uri5 = new URI(uris5);
uri6 = new URI(uris6);
} catch (URISyntaxException use) {
}
// Set up InetAddress
inet1 = mock(InetAddress.class);
when(inet1.getCanonicalHostName()).thenReturn(host1);
inet2 = mock(InetAddress.class);
when(inet2.getCanonicalHostName()).thenReturn(host1);
inet3 = mock(InetAddress.class);
when(inet3.getCanonicalHostName()).thenReturn(host2);
inet4 = mock(InetAddress.class);
when(inet4.getCanonicalHostName()).thenReturn(host2);
inet5 = mock(InetAddress.class);
when(inet5.getCanonicalHostName()).thenReturn(host1);
inet6 = mock(InetAddress.class);
when(inet6.getCanonicalHostName()).thenReturn(host1);

// Link of InetAddress to corresponding URI
try {
when(InetAddress.getByName(uris1)).thenReturn(inet1);
when(InetAddress.getByName(uris2)).thenReturn(inet2);
when(InetAddress.getByName(uris3)).thenReturn(inet3);
when(InetAddress.getByName(uris4)).thenReturn(inet4);
when(InetAddress.getByName(uris5)).thenReturn(inet5);
} catch (UnknownHostException ue) {
}

fs1 = mock(FileSystem.class);
when(fs1.getUri()).thenReturn(uri1);
fs2 = mock(FileSystem.class);
when(fs2.getUri()).thenReturn(uri2);
fs3 = mock(FileSystem.class);
when(fs3.getUri()).thenReturn(uri3);
fs4 = mock(FileSystem.class);
when(fs4.getUri()).thenReturn(uri4);
fs5 = mock(FileSystem.class);
when(fs5.getUri()).thenReturn(uri5);
fs6 = mock(FileSystem.class);
when(fs6.getUri()).thenReturn(uri6);
}

@Test
public void testCompareFsNull() throws Exception {
setupCompareFs();
assertEquals(FileUtil.compareFs(null,fs1),false);
assertEquals(FileUtil.compareFs(fs1,null),false);
}

@Test
public void testCompareFsDirectories() throws Exception {
setupCompareFs();
assertEquals(FileUtil.compareFs(fs1,fs1),true);
assertEquals(FileUtil.compareFs(fs1,fs2),false);
assertEquals(FileUtil.compareFs(fs1,fs5),false);
assertEquals(FileUtil.compareFs(fs3,fs4),true);
assertEquals(FileUtil.compareFs(fs1,fs6),false);
}
} }
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Cluster;
Expand Down Expand Up @@ -347,7 +348,7 @@ private void configureOutputFormat(Job job) throws IOException {
workDir = new Path(workDir, WIP_PREFIX + targetPath.getName() workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+ rand.nextInt()); + rand.nextInt());
FileSystem workFS = workDir.getFileSystem(configuration); FileSystem workFS = workDir.getFileSystem(configuration);
if (!DistCpUtils.compareFs(targetFS, workFS)) { if (!FileUtil.compareFs(targetFS, workFS)) {
throw new IllegalArgumentException("Work path " + workDir + throw new IllegalArgumentException("Work path " + workDir +
" and target path " + targetPath + " are in different file system"); " and target path " + targetPath + " are in different file system");
} }
Expand Down
Expand Up @@ -468,43 +468,4 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
return (sourceChecksum == null || targetChecksum == null || return (sourceChecksum == null || targetChecksum == null ||
sourceChecksum.equals(targetChecksum)); sourceChecksum.equals(targetChecksum));
} }

/* see if two file systems are the same or not
*
*/
public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
URI srcUri = srcFs.getUri();
URI dstUri = destFs.getUri();
if (srcUri.getScheme() == null) {
return false;
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost != null) && (dstHost != null)) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch(UnknownHostException ue) {
if (LOG.isDebugEnabled())
LOG.debug("Could not compare file-systems. Unknown host: ", ue);
return false;
}
if (!srcHost.equals(dstHost)) {
return false;
}
}
else if (srcHost == null && dstHost != null) {
return false;
}
else if (srcHost != null) {
return false;
}

//check for ports

return srcUri.getPort() == dstUri.getPort();
}
} }

0 comments on commit aaafa0b

Please sign in to comment.