From afb0dcdca866317365e338eebf5277a24d94796d Mon Sep 17 00:00:00 2001 From: dhatchayani Date: Mon, 3 Jul 2017 11:53:11 +0530 Subject: [PATCH] [CARBONDATA-1193] ViewFS Support - improvement --- .../filesystem/AbstractDFSCarbonFile.java | 77 +++++++++++++++- .../filesystem/AlluxioCarbonFile.java | 77 +--------------- .../datastore/filesystem/HDFSCarbonFile.java | 88 ++----------------- .../filesystem/ViewFSCarbonFile.java | 85 ++---------------- .../core/datastore/impl/FileFactory.java | 14 ++- .../filesystem/ViewFsCarbonFileTest.java | 6 +- .../spark/load/CarbonLoaderUtil.java | 2 +- .../carbondata/spark/util/CommonUtil.scala | 3 +- 8 files changed, 112 insertions(+), 240 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 8a86d924078..991539a1765 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -20,6 +20,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -30,8 +32,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; +import org.apache.hadoop.hdfs.DistributedFileSystem; -public abstract class AbstractDFSCarbonFile implements CarbonFile { +public abstract class AbstractDFSCarbonFile implements CarbonFile { /** * LOGGER */ @@ -124,6 +128,77 @@ public boolean renameTo(String changetoName) { } } + @Override public boolean renameForce(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), + org.apache.hadoop.fs.Options.Rename.OVERWRITE); + return true; + } else if (fs instanceof ViewFileSystem) { + fs.delete(new Path(changetoName), true); + fs.rename(fileStatus.getPath(), new Path(changetoName)); + return true; + } else { + return false; + } + } catch (IOException e) { + LOGGER.error("Exception occured" + e.getMessage()); + return false; + } + } + + public CarbonFile[] getFiles(FileStatus[] listStatus, FileFactory.FileType fileType) { + if (listStatus == null) { + return new CarbonFile[0]; + } + CarbonFile[] files = new CarbonFile[listStatus.length]; + for (int i = 0; i < files.length; i++) { + files[i] = FileFactory.getCarbonFile(listStatus[i], fileType); + } + return files; + } + + public CarbonFile[] listFiles(FileFactory.FileType fileType) { + FileStatus[] listStatus = null; + try { + if (null != fileStatus && fileStatus.isDirectory()) { + Path path = fileStatus.getPath(); + listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); + } else { + return new CarbonFile[0]; + } + } catch (IOException ex) { + LOGGER.error("Exception occured" + ex.getMessage()); + return new CarbonFile[0]; + } + return getFiles(listStatus, fileType); + } + + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + CarbonFile[] files = listFiles(); + if (files != null && files.length >= 1) { + List fileList = new ArrayList(files.length); + for (int i = 0; i < files.length; i++) { + if (fileFilter.accept(files[i])) { + fileList.add(files[i]); + } + } + if (fileList.size() >= 1) { + return fileList.toArray(new CarbonFile[fileList.size()]); + } else { + return new CarbonFile[0]; + } + } + return files; + } + + public CarbonFile getParentFile(FileFactory.FileType fileType) { + Path parent = fileStatus.getPath().getParent(); + return null == parent ? null : FileFactory.getCarbonFile(parent.toString(), fileType); + } + public boolean delete() { FileSystem fs; try { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java index c3ccd0cb62b..3f2708d59fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java @@ -17,27 +17,13 @@ package org.apache.carbondata.core.datastore.filesystem; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; - public class AlluxioCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName()); public AlluxioCarbonFile(String filePath) { super(filePath); @@ -51,78 +37,23 @@ public AlluxioCarbonFile(FileStatus fileStatus) { super(fileStatus); } - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new AlluxioCarbonFile(listStatus[i]); - } - return files; - } - @Override public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return new CarbonFile[0]; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); + return super.listFiles(FileFactory.FileType.ALLUXIO); } @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List fileList = new ArrayList(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; + return super.listFiles(fileFilter); } @Override public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new AlluxioCarbonFile(parent); + return super.getParentFile(FileFactory.FileType.ALLUXIO); } @Override public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } + return super.renameForce(changetoName); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java index 19bdc7bba30..c9299d81484 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java @@ -17,25 +17,12 @@ package org.apache.carbondata.core.datastore.filesystem; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; public class HDFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(HDFSCarbonFile.class.getName()); public HDFSCarbonFile(String filePath) { super(filePath); @@ -49,78 +36,19 @@ public HDFSCarbonFile(FileStatus fileStatus) { super(fileStatus); } - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new HDFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return new CarbonFile[0]; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); + @Override public CarbonFile[] listFiles() { + return super.listFiles(FileFactory.FileType.HDFS); } - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List fileList = new ArrayList(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + return super.listFiles(fileFilter); } - @Override - public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new HDFSCarbonFile(parent); + @Override public CarbonFile getParentFile() { + return super.getParentFile(FileFactory.FileType.HDFS); } - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } + @Override public boolean renameForce(String changetoName) { + return super.renameForce(changetoName); } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java index e05112d0ba2..add5069967d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java @@ -16,25 +16,12 @@ */ package org.apache.carbondata.core.datastore.filesystem; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.viewfs.ViewFileSystem; public class ViewFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName()); public ViewFSCarbonFile(String filePath) { super(filePath); @@ -48,77 +35,19 @@ public ViewFSCarbonFile(FileStatus fileStatus) { super(fileStatus); } - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new ViewFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return new CarbonFile[0]; - } - } catch (IOException ex) { - LOGGER.error("Exception occured" + ex.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); + @Override public CarbonFile[] listFiles() { + return super.listFiles(FileFactory.FileType.VIEWFS); } - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List fileList = new ArrayList(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + return super.listFiles(fileFilter); } @Override public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new ViewFSCarbonFile(parent); + return super.getParentFile(FileFactory.FileType.VIEWFS); } - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof ViewFileSystem) { - fs.delete(new Path(changetoName), true); - fs.rename(fileStatus.getPath(), new Path(changetoName)); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured" + e.getMessage()); - return false; - } + @Override public boolean renameForce(String changetoName) { + return super.renameForce(changetoName); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 347783413cd..8bf85716b4f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.BZip2Codec; @@ -87,8 +88,17 @@ else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { return FileType.LOCAL; } - public static CarbonFile getCarbonFile(String path) { - return getCarbonFile(path, getFileType(path)); + public static CarbonFile getCarbonFile(FileStatus listStatus, FileType fileType) { + switch (fileType) { + case HDFS: + return new HDFSCarbonFile(listStatus); + case ALLUXIO: + return new AlluxioCarbonFile(listStatus); + case VIEWFS: + return new ViewFSCarbonFile(listStatus); + default: + return null; + } } public static CarbonFile getCarbonFile(String path, FileType fileType) { diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/ViewFsCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/ViewFsCarbonFileTest.java index ba661b14194..086f6b9efdd 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/ViewFsCarbonFileTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/ViewFsCarbonFileTest.java @@ -1,7 +1,6 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -24,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.viewfs.ViewFileSystem; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -230,7 +228,7 @@ public void testGetParentFIle() { new MockUp() { @Mock public FileSystem getFileSystem(Configuration conf) throws IOException { - return new DistributedFileSystem(); + return new ViewFileSystem(); } }; @@ -249,7 +247,7 @@ public Path getPath() { } }; - new MockUp() { + new MockUp() { @Mock public FileStatus getFileStatus(Path f) throws IOException { diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 905f9777d5d..e6849140b9c 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -318,7 +318,7 @@ public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry, // For insert overwrite, we will delete the old segment folder immediately // So collect the old segments here String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName()); - staleFolders.add(FileFactory.getCarbonFile(path)); + staleFolders.add(FileFactory.getCarbonFile(path, FileFactory.getFileType(path))); } } } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 705daeaa9fa..2d42d17dd55 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -720,7 +720,8 @@ object CommonUtil { load.getLoadStatus.equals(LoadStatusType.INSERT_OVERWRITE.getMessage)) { load.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE) staleFolders :+ FileFactory.getCarbonFile( - carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName)) + carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName), + FileFactory.getFileType(carbonTablePath.toString)) loadInprogressExist = true } }