DRILL-5751: Fix unit tests to use local file system even if it is not set by default
arina-ielchiieva committed Aug 30, 2017 · 1 parent a51c98b · commit 079aa8a
Showing 13 changed files with 196 additions and 174 deletions.
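
All of the changes below apply the same pattern: instead of relying on whatever fs.defaultFS the environment happens to provide, the tests build a Hadoop Configuration explicitly pinned to the local file system, obtain the FileSystem from that Configuration, and check paths through FileSystem/Path rather than java.io.File. The following minimal, standalone sketch illustrates that pattern; the class name and the path being checked are illustrative only and are not part of the commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalFileSystemUsageSketch {

  public static void main(String[] args) throws IOException {
    // Pin the default file system to the local one ("file:///"),
    // so the check does not depend on the environment's fs.defaultFS.
    Configuration configuration = new Configuration();
    configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
    FileSystem fs = FileSystem.get(configuration);

    // Use Hadoop Path + FileSystem.exists instead of java.io.File,
    // so the same check works against whichever file system backs the path.
    Path path = new Path("/tmp/drill-test/example.jar"); // illustrative path
    if (!fs.exists(path)) {
      throw new IOException(String.format("File %s does not exist on file system %s",
          path.toUri().getPath(), fs.getUri()));
    }
  }
}
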
@@ -255,8 +255,8 @@ String getBinaryName() {
* @throws IOException in case of binary or source absence or problems during copying jars
*/
void initRemoteBackup() throws IOException {
fs.getFileStatus(stagingBinary);
fs.getFileStatus(stagingSource);
checkPathExistence(stagingBinary);
checkPathExistence(stagingSource);
fs.mkdirs(remoteTmpDir);
FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf());
FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf());
@@ -315,6 +315,19 @@ void cleanUp() {
deleteQuietly(remoteTmpDir, true);
}

/**
* Checks if passed path exists on predefined file system.
*
* @param path path to be checked
* @throws IOException if path does not exist
*/
private void checkPathExistence(Path path) throws IOException {
if (!fs.exists(path)) {
throw new IOException(String.format("File %s does not exist on file system %s",
path.toUri().getPath(), fs.getUri()));
}
}

/**
* Deletes quietly file or directory, in case of errors, logs warning and proceeds.
*
@@ -83,7 +83,8 @@
public class BaseTestQuery extends ExecTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);

public static final String TEMP_SCHEMA = "dfs_test.tmp";
public static final String TEST_SCHEMA = "dfs_test";
public static final String TEMP_SCHEMA = TEST_SCHEMA + ".tmp";

private static final int MAX_WIDTH_PER_NODE = 2;

@@ -23,6 +23,7 @@
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry;
@@ -32,9 +33,11 @@
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -43,7 +46,6 @@
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
@@ -67,18 +69,28 @@
@RunWith(MockitoJUnitRunner.class)
public class TestDynamicUDFSupport extends BaseTestQuery {

private static final File jars = new File(TestTools.getWorkingPath() + "/src/test/resources/jars");
private static final Path jars = new Path(TestTools.getWorkingPath(), "src/test/resources/jars");
private static final String default_binary_name = "DrillUDF-1.0.jar";
private static final String default_source_name = JarUtil.getSourceName(default_binary_name);

@Rule
public final TemporaryFolder base = new TemporaryFolder();

private static FileSystem localFileSystem;

@BeforeClass
public static void init() throws IOException {
Configuration configuration = new Configuration();
configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
localFileSystem = FileSystem.get(configuration);
}

@Before
public void setup() {
Properties overrideProps = new Properties();
overrideProps.setProperty("drill.exec.udf.directory.root", base.getRoot().getPath());
overrideProps.setProperty("drill.tmp-dir", base.getRoot().getPath());
overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, base.getRoot().getPath());
overrideProps.setProperty(ExecConstants.DRILL_TMP_DIR, base.getRoot().getPath());
overrideProps.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS);
updateTestCluster(1, DrillConfig.create(overrideProps));
}

@@ -120,8 +132,11 @@ public void testDisableDynamicSupport() throws Exception {
@Test
public void testAbsentBinaryInStaging() throws Exception {
Path staging = getDrillbitContext().getRemoteFunctionRegistry().getStagingArea();
FileSystem fs = getDrillbitContext().getRemoteFunctionRegistry().getFs();
copyJar(fs, jars, staging, default_binary_name);

String summary = String.format("File %s does not exist", new Path(staging, default_binary_name).toUri().getPath());
String summary = String.format("File %s does not exist on file system %s",
new Path(staging, default_source_name).toUri().getPath(), fs.getUri());

testBuilder()
.sqlQuery("create function using jar '%s'", default_binary_name)
@@ -133,11 +148,13 @@ public void testAbsentBinaryInStaging() throws Exception {

@Test
public void testAbsentSourceInStaging() throws Exception {
Path staging = getDrillbitContext().getRemoteFunctionRegistry().getStagingArea();
copyJar(getDrillbitContext().getRemoteFunctionRegistry().getFs(), new Path(jars.toURI()),
staging, default_binary_name);
RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
FileSystem fs = remoteFunctionRegistry.getFs();
Path staging = remoteFunctionRegistry.getStagingArea();
copyJar(fs, jars, staging, default_binary_name);

String summary = String.format("File %s does not exist", new Path(staging, default_source_name).toUri().getPath());
String summary = String.format("File %s does not exist on file system %s",
new Path(staging, default_source_name).toUri().getPath(), fs.getUri());

testBuilder()
.sqlQuery("create function using jar '%s'", default_binary_name)
@@ -432,10 +449,11 @@ public void testLazyInit() throws Exception {

Path localUdfDirPath = Deencapsulation.getField(
getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
File localUdfDir = new File(localUdfDirPath.toUri().getPath());

assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists());
assertTrue("Source should exist in local udf directory", new File(localUdfDir, default_source_name).exists());
assertTrue("Binary should exist in local udf directory",
localFileSystem.exists(new Path(localUdfDirPath, default_binary_name)));
assertTrue("Source should exist in local udf directory",
localFileSystem.exists(new Path(localUdfDirPath, default_source_name)));
}

@Test
@@ -498,10 +516,11 @@ public void testDropFunction() throws Exception {

Path localUdfDirPath = Deencapsulation.getField(
getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
File localUdfDir = new File(localUdfDirPath.toUri().getPath());

assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists());
assertTrue("Source should exist in local udf directory", new File(localUdfDir, default_source_name).exists());
assertTrue("Binary should exist in local udf directory",
localFileSystem.exists(new Path(localUdfDirPath, default_binary_name)));
assertTrue("Source should exist in local udf directory",
localFileSystem.exists(new Path(localUdfDirPath, default_source_name)));

String summary = "The following UDFs in jar %s have been unregistered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]";
@@ -530,9 +549,9 @@ public void testDropFunction() throws Exception {
fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name)));

assertFalse("Binary should not be present in local udf directory",
new File(localUdfDir, default_binary_name).exists());
localFileSystem.exists(new Path(localUdfDirPath, default_binary_name)));
assertFalse("Source should not be present in local udf directory",
new File(localUdfDir, default_source_name).exists());
localFileSystem.exists(new Path(localUdfDirPath, default_source_name)));
}

@Test
@@ -549,7 +568,7 @@ public void testReRegisterTheSameJarWithDifferentContent() throws Exception {

Thread.sleep(1000);

Path src = new Path(jars.toURI().getPath(), "v2");
Path src = new Path(jars, "v2");
copyJarsToStagingArea(src, default_binary_name, default_source_name);
test("create function using jar '%s'", default_binary_name);
testBuilder()
@@ -887,11 +906,11 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
}

private void copyDefaultJarsToStagingArea() throws IOException {
copyJarsToStagingArea(new Path(jars.toURI()), default_binary_name, default_source_name);
copyJarsToStagingArea(jars, default_binary_name, default_source_name);
}

private void copyJarsToStagingArea(String binaryName, String sourceName) throws IOException {
copyJarsToStagingArea(new Path(jars.toURI()), binaryName, sourceName);
copyJarsToStagingArea(jars, binaryName, sourceName);
}

private void copyJarsToStagingArea(Path src, String binaryName, String sourceName) throws IOException {
@@ -731,6 +731,7 @@ public void runTestAndValidate(String selection, String validationSelection, Str
.go();

Configuration hadoopConf = new Configuration();
hadoopConf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
Path output = new Path(getDfsTestTmpSchemaLocation(), outputFile);
FileSystem fs = output.getFileSystem(hadoopConf);
for (FileStatus file : fs.listStatus(output)) {
@@ -196,53 +196,45 @@ public void testOneArgQueryDirFunctions() throws Exception {

@Test // DRILL-4720
public void testDirectoryUDFsWithAndWithoutMetadataCache() throws Exception {
FileSystem fs = null;
try {
fs = FileSystem.get(new Configuration());

// prepare test table with partitions
Path table = new Path(getTempDir("table_with_partitions"));
String tablePath = table.toUri().getPath();
Path dataFile = new Path(TestTools.getWorkingPath(),"src/test/resources/parquet/alltypes_required.parquet");
createPartitions(fs, table, dataFile, 2);

Map<String, String> configurations = ImmutableMap.<String, String>builder()
.put("mindir", "part_1")
.put("imindir", "part_1")
.put("maxdir", "part_2")
.put("imaxdir", "part_2")
.build();

String query = "select dir0 from dfs.`%s` where dir0 = %s('dfs', '%s') limit 1";

// run tests without metadata cache
for (Map.Entry<String, String> entry : configurations.entrySet()) {
testBuilder()
.sqlQuery(query, tablePath, entry.getKey(), tablePath)
.unOrdered()
.baselineColumns("dir0")
.baselineValues(entry.getValue())
.go()
;
}
Configuration configuration = new Configuration();
configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
FileSystem fs = FileSystem.get(configuration);
// prepare test table with partitions
Path table = new Path(getTempDir("table_with_partitions"));
String tablePath = table.toUri().getPath();
Path dataFile = new Path(TestTools.getWorkingPath(),"src/test/resources/parquet/alltypes_required.parquet");
createPartitions(fs, table, dataFile, 2);

Map<String, String> configurations = ImmutableMap.<String, String>builder()
.put("mindir", "part_1")
.put("imindir", "part_1")
.put("maxdir", "part_2")
.put("imaxdir", "part_2")
.build();

// generate metadata
test("refresh table metadata dfs.`%s`", tablePath);
String query = "select dir0 from dfs.`%s` where dir0 = %s('dfs', '%s') limit 1";

// run tests with metadata cache
for (Map.Entry<String, String> entry : configurations.entrySet()) {
testBuilder()
.sqlQuery(query, tablePath, entry.getKey(), tablePath)
.unOrdered()
.baselineColumns("dir0")
.baselineValues(entry.getValue())
.go();
}
// run tests without metadata cache
for (Map.Entry<String, String> entry : configurations.entrySet()) {
testBuilder()
.sqlQuery(query, tablePath, entry.getKey(), tablePath)
.unOrdered()
.baselineColumns("dir0")
.baselineValues(entry.getValue())
.go();
}

} finally {
if (fs != null) {
fs.close();
}
// generate metadata
test("refresh table metadata dfs.`%s`", tablePath);

// run tests with metadata cache
for (Map.Entry<String, String> entry : configurations.entrySet()) {
testBuilder()
.sqlQuery(query, tablePath, entry.getKey(), tablePath)
.unOrdered()
.baselineColumns("dir0")
.baselineValues(entry.getValue())
.go();
}
}

@@ -25,11 +25,13 @@
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.util.TestUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.File;
import java.util.Properties;
import java.util.UUID;

@@ -41,8 +43,10 @@ public class TemporaryTablesAutomaticDropTest extends BaseTestQuery {

private static final String session_id = "sessionId";

private FileSystem fs;

@Before
public void init() throws Exception {
public void setup() throws Exception {
new MockUp<UUID>() {
@Mock
public UUID randomUUID() {
@@ -52,47 +56,51 @@ public UUID randomUUID() {
Properties testConfigurations = cloneDefaultTestConfigProperties();
testConfigurations.put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, TEMP_SCHEMA);
updateTestCluster(1, DrillConfig.create(testConfigurations));

Configuration configuration = new Configuration();
configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(configuration);
}

@Test
public void testAutomaticDropWhenClientIsClosed() throws Exception {
File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed",
Path sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("client_closed",
getDfsTestTmpSchemaLocation());
updateClient("new_client");
assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
assertFalse("Session temporary location should be absent", fs.exists(sessionTemporaryLocation));
}

@Test
public void testAutomaticDropWhenDrillbitIsClosed() throws Exception {
File sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed",
Path sessionTemporaryLocation = createAndCheckSessionTemporaryLocation("drillbit_closed",
getDfsTestTmpSchemaLocation());
bits[0].close();
assertFalse("Session temporary location should be absent", sessionTemporaryLocation.exists());
assertFalse("Session temporary location should be absent", fs.exists(sessionTemporaryLocation));
}

@Test
public void testAutomaticDropOfSeveralSessionTemporaryLocations() throws Exception {
File firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location",
Path firstSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("first_location",
getDfsTestTmpSchemaLocation());
StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
String tempDir = TestUtilities.createTempDir();
try {
TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tempDir);
File secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir);
Path secondSessionTemporaryLocation = createAndCheckSessionTemporaryLocation("second_location", tempDir);
updateClient("new_client");
assertFalse("First session temporary location should be absent", firstSessionTemporaryLocation.exists());
assertFalse("Second session temporary location should be absent", secondSessionTemporaryLocation.exists());
assertFalse("First session temporary location should be absent", fs.exists(firstSessionTemporaryLocation));
assertFalse("Second session temporary location should be absent", fs.exists(secondSessionTemporaryLocation));
} finally {
TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, getDfsTestTmpSchemaLocation());
}
}

private File createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
private Path createAndCheckSessionTemporaryLocation(String suffix, String schemaLocation) throws Exception {
String temporaryTableName = "temporary_table_automatic_drop_" + suffix;
test("create TEMPORARY table %s.%s as select 'A' as c1 from (values(1))", TEMP_SCHEMA, temporaryTableName);
File sessionTemporaryLocation = new File(schemaLocation,
Path sessionTemporaryLocation = new Path(schemaLocation,
UUID.nameUUIDFromBytes(session_id.getBytes()).toString());
assertTrue("Session temporary location should exist", sessionTemporaryLocation.exists());
assertTrue("Session temporary location should exist", fs.exists(sessionTemporaryLocation));
return sessionTemporaryLocation;
}

@@ -288,7 +288,10 @@ public void createTableWithCustomUmask() throws Exception {
test("use %s", TEMP_SCHEMA);
String tableName = "with_custom_permission";
StorageStrategy storageStrategy = new StorageStrategy("000", false);
try (FileSystem fs = FileSystem.get(new Configuration())) {
Configuration configuration = new Configuration();
configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
FileSystem fs = FileSystem.get(configuration);
try {
test("alter session set `%s` = '%s'", ExecConstants.PERSISTENT_TABLE_UMASK, storageStrategy.getUmask());
test("create table %s as select 'A' from (values(1))", tableName);
Path tableLocation = new Path(getDfsTestTmpSchemaLocation(), tableName);