From 0665c053cef8b2db62e9ffb8038d5cb3d88599eb Mon Sep 17 00:00:00 2001 From: kunal642 Date: Thu, 21 Jun 2018 15:40:05 +0530 Subject: [PATCH] added s3 carbon file --- .../core/constants/CarbonCommonConstants.java | 11 +- .../core/datamap/DataMapStoreManager.java | 2 +- .../DiskBasedDataMapStatusProvider.java | 2 +- .../filesystem/AbstractDFSCarbonFile.java | 14 +- .../core/datastore/filesystem/CarbonFile.java | 2 +- .../datastore/filesystem/HDFSCarbonFile.java | 4 - .../datastore/filesystem/LocalCarbonFile.java | 4 +- .../datastore/filesystem/S3CarbonFile.java | 136 ++++++++++++++++++ .../impl/DefaultFileTypeProvider.java | 7 +- .../core/datastore/impl/FileFactory.java | 12 +- .../core/locks/CarbonLockFactory.java | 64 ++++++--- .../carbondata/core/locks/HdfsFileLock.java | 17 --- .../carbondata/core/locks/LocalFileLock.java | 10 -- .../carbondata/core/locks/MemoryLock.java | 86 +++++++++++ .../carbondata/core/locks/S3FileLock.java | 105 -------------- .../core/metadata/SegmentFileStore.java | 10 +- .../metadata/schema/table/CarbonTable.java | 9 +- .../core/util/CarbonProperties.java | 2 + .../writer/CarbonIndexFileMergeWriter.java | 3 +- .../carbon/AbsoluteTableIdentifierTest.java | 5 +- .../TestBlockletDataMapFactory.java | 6 +- docs/configuration-parameters.md | 1 + docs/faq.md | 2 +- .../examples/HadoopFileExample.scala | 1 + .../hadoop/api/CarbonInputFormat.java | 1 + .../apache/carbondata/api/CarbonStore.scala | 4 +- .../carbondata/spark/util/CommonUtil.scala | 7 +- .../sql/test/ResourceRegisterAndCopier.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala | 5 +- .../datamap/CarbonDropDataMapCommand.scala | 8 +- .../lcm/locks/LocalFileLockTest.java | 34 ++++- .../carbondata/lcm/locks/MemoryLockTest.java | 36 +++++ .../lcm/locks/ZooKeeperLockingTest.java | 3 +- .../sdk/file/CarbonReaderBuilder.java | 4 +- 34 files changed, 400 insertions(+), 219 deletions(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java create mode 100644 core/src/main/java/org/apache/carbondata/core/locks/MemoryLock.java delete mode 100644 core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java create mode 100644 processing/src/test/java/org/apache/carbondata/lcm/locks/MemoryLockTest.java diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 355bcb69d69..2a421858ca8 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -729,6 +729,13 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String LOCK_TYPE = "carbon.lock.type"; + /** + * Specifies the path where the lock files have to be created. + * By default, lock files are created in table path. + */ + @CarbonProperty + public static final String LOCK_PATH = "carbon.lock.path"; + /** * ZOOKEEPER_ENABLE_DEFAULT the default value for zookeeper will be true for carbon */ @@ -963,9 +970,9 @@ public final class CarbonCommonConstants { public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK"; /** - * S3LOCK TYPE + * MEMORY LOCK TYPE */ - public static final String CARBON_LOCK_TYPE_S3 = "S3LOCK"; + public static final String CARBON_LOCK_TYPE_MEMORY = "MEMORYLOCK"; /** * Invalid filter member log string diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 96d2b1ccde8..462cfa28908 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -407,7 +407,7 @@ public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) { try { carbonTable = CarbonTable .buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(), - identifier.getTablePath()); + identifier.getTablePath(), identifier.getCarbonTableIdentifier().getTableId()); } catch (IOException e) { LOGGER.error("failed to get carbon table from table Path"); // ignoring exception diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java index 83e141d6132..9db8fc8decc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java @@ -189,6 +189,6 @@ private static void writeLoadDetailsIntoFile(String location, private static ICarbonLock getDataMapStatusLock() { return CarbonLockFactory .getCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(), - LockUsage.DATAMAP_STATUS_LOCK); + LockUsage.DATAMAP_STATUS_LOCK, "1"); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 05f96c57c55..5ac92206e23 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -269,17 +269,7 @@ public boolean delete() { // append to a file only if file already exists else file not found // exception will be thrown by hdfs if (CarbonUtil.isFileExists(path)) { - if (FileFactory.FileType.S3 == fileType) { - DataInputStream dataInputStream = fileSystem.open(pt); - int count = dataInputStream.available(); - // create buffer - byte[] byteStreamBuffer = new byte[count]; - int bytesRead = dataInputStream.read(byteStreamBuffer); - stream = fileSystem.create(pt, true, bufferSize); - stream.write(byteStreamBuffer, 0, bytesRead); - } else { - stream = fileSystem.append(pt, bufferSize); - } + stream = fileSystem.append(pt, bufferSize); } else { stream = fileSystem.create(pt, true, bufferSize); } @@ -461,7 +451,7 @@ public boolean createNewFile(String filePath, FileFactory.FileType fileType, boo return fs.delete(path, true); } - @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType) + @Override public boolean mkdirs(String filePath) throws IOException { filePath = filePath.replace("\\", "/"); Path path = new Path(filePath); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java index a10413713f4..abfed3756d5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java @@ -144,7 +144,7 @@ boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean do boolean deleteFile(String filePath, FileFactory.FileType fileType) throws IOException; - boolean mkdirs(String filePath, FileFactory.FileType fileType) throws IOException; + boolean mkdirs(String filePath) throws IOException; DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType) throws IOException; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java index fc5420de399..8837d1b792d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java @@ -120,10 +120,6 @@ public boolean renameForce(String changetoName) { ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), org.apache.hadoop.fs.Options.Rename.OVERWRITE); return true; - } else if (fileStatus.getPath().toString().startsWith("s3n") - || fileStatus.getPath().toString().startsWith("s3a")) { - fs.delete(new Path(changetoName), true); - return fs.rename(fileStatus.getPath(), new Path(changetoName)); } else { return fs.rename(fileStatus.getPath(), new Path(changetoName)); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java index 5b6f657892c..f0794f4867d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java @@ -433,10 +433,10 @@ public boolean createNewFile(String filePath, FileFactory.FileType fileType, boo return FileFactory.deleteAllFilesOfDir(file); } - @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType) + @Override public boolean mkdirs(String filePath) throws IOException { filePath = filePath.replace("\\", "/"); - filePath = FileFactory.getUpdatedFilePath(filePath, fileType); + filePath = FileFactory.getUpdatedFilePath(filePath); File file = new File(filePath); return file.mkdirs(); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java new file mode 100644 index 00000000000..6aae337ed8a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +public class S3CarbonFile extends HDFSCarbonFile { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(HDFSCarbonFile.class.getName()); + + public S3CarbonFile(String filePath) { + super(filePath); + } + + public S3CarbonFile(String filePath, Configuration hadoopConf) { + super(filePath, hadoopConf); + } + + public S3CarbonFile(Path path) { + super(path); + } + + public S3CarbonFile(Path path, Configuration hadoopConf) { + super(path, hadoopConf); + } + + public S3CarbonFile(FileStatus fileStatus) { + super(fileStatus); + } + + @Override + public boolean renameForce(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(hadoopConf); + return fs.rename(fileStatus.getPath(), new Path(changetoName)); + } catch (IOException e) { + LOGGER.error("Exception occured: " + e.getMessage()); + return false; + } + } + + @Override + public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType) + throws IOException { + return getDataOutputStream(path, fileType, CarbonCommonConstants.BYTEBUFFER_SIZE, true); + } + + @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, + int bufferSize, boolean append) throws IOException { + Path pt = new Path(path); + FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration()); + FSDataOutputStream stream; + if (append) { + // append to a file only if file already exists else file not found + // exception will be thrown by hdfs + if (CarbonUtil.isFileExists(path)) { + DataInputStream dataInputStream = fileSystem.open(pt); + int count = dataInputStream.available(); + // create buffer + byte[] byteStreamBuffer = new byte[count]; + int bytesRead = dataInputStream.read(byteStreamBuffer); + stream = fileSystem.create(pt, true, bufferSize); + stream.write(byteStreamBuffer, 0, bytesRead); + } else { + stream = fileSystem.create(pt, true, bufferSize); + } + } else { + stream = fileSystem.create(pt, true, bufferSize); + } + return stream; + } + + @Override + public CarbonFile getParentFile() { + Path parent = fileStatus.getPath().getParent(); + return null == parent ? null : new S3CarbonFile(parent, hadoopConf); + } + + @Override + protected CarbonFile[] getFiles(FileStatus[] listStatus) { + if (listStatus == null) { + return new CarbonFile[0]; + } + CarbonFile[] files = new CarbonFile[listStatus.length]; + for (int i = 0; i < files.length; i++) { + files[i] = new S3CarbonFile(listStatus[i]); + } + return files; + } + + @Override + protected List getFiles(RemoteIterator listStatus) + throws IOException { + List carbonFiles = new ArrayList<>(); + while (listStatus.hasNext()) { + Path filePath = listStatus.next().getPath(); + carbonFiles.add(new S3CarbonFile(filePath)); + } + return carbonFiles; + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java index f54e9af1152..c4761c977bf 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java @@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile; import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile; +import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile; import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile; import org.apache.hadoop.conf.Configuration; @@ -47,8 +48,9 @@ public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) { case LOCAL: return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType)); case HDFS: - case S3: return new HDFSCarbonFile(path); + case S3: + return new S3CarbonFile(path); case ALLUXIO: return new AlluxioCarbonFile(path); case VIEWFS: @@ -63,8 +65,9 @@ public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Conf case LOCAL: return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType)); case HDFS: - case S3: return new HDFSCarbonFile(path, conf); + case S3: + return new S3CarbonFile(path, conf); case ALLUXIO: return new AlluxioCarbonFile(path); case VIEWFS: diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 5c46bcf6a73..6250fde4329 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -256,7 +256,7 @@ public static boolean deleteAllCarbonFilesOfDir(CarbonFile path) { } public static boolean mkdirs(String filePath, FileType fileType) throws IOException { - return getCarbonFile(filePath).mkdirs(filePath, fileType); + return getCarbonFile(filePath).mkdirs(filePath); } /** @@ -269,15 +269,7 @@ public static boolean mkdirs(String filePath, FileType fileType) throws IOExcept */ public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType) throws IOException { - if (FileType.S3 == fileType) { - CarbonFile carbonFile = getCarbonFile(path); - if (carbonFile.exists()) { - carbonFile.delete(); - } - return carbonFile.getDataOutputStream(path,fileType); - } else { - return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType); - } + return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType); } /** diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java index 3226a631158..b578e67c61f 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java @@ -39,8 +39,11 @@ public class CarbonLockFactory { */ private static String lockTypeConfigured; + private static String lockPath; + static { CarbonLockFactory.getLockTypeConfigured(); + getLockpath(); } /** @@ -52,44 +55,57 @@ public class CarbonLockFactory { */ public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier, String lockFile) { - - String tablePath = absoluteTableIdentifier.getTablePath(); + String tablePath; + if (lockPath.isEmpty()) { + tablePath = absoluteTableIdentifier.getTablePath(); + } else { + if (absoluteTableIdentifier + .getCarbonTableIdentifier().getTableId().isEmpty()) { + throw new RuntimeException("Table id is empty"); + } + tablePath = lockPath + CarbonCommonConstants.FILE_SEPARATOR + absoluteTableIdentifier + .getCarbonTableIdentifier().getTableId(); + } if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) { - return new ZooKeeperLocking(absoluteTableIdentifier, lockFile); - } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) || - tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) || - tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) { - lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3; - return new S3FileLock(absoluteTableIdentifier, lockFile); + return new ZooKeeperLocking(tablePath, lockFile); + } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) || tablePath + .startsWith(CarbonCommonConstants.S3N_PREFIX) || tablePath + .startsWith(CarbonCommonConstants.S3_PREFIX) || lockTypeConfigured + .equals(CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY)) { + lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY; + return new MemoryLock(absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName(), + lockFile); } else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS; - return new HdfsFileLock(absoluteTableIdentifier, lockFile); + return new HdfsFileLock(tablePath, lockFile); } else { lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL; - return new LocalFileLock(absoluteTableIdentifier, lockFile); + return new LocalFileLock(tablePath, lockFile); } } /** * * @param locFileLocation - * @param lockFile * @return carbon lock */ - public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) { + public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile, + String tableId) { + String lockFileLocation; + if (lockPath.isEmpty()) { + lockFileLocation = locFileLocation; + } else { + lockFileLocation = lockPath + CarbonCommonConstants.FILE_SEPARATOR + tableId; + } switch (lockTypeConfigured) { case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: - return new LocalFileLock(locFileLocation, lockFile); - + return new LocalFileLock(lockFileLocation, lockFile); case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: - return new ZooKeeperLocking(locFileLocation, lockFile); - + return new ZooKeeperLocking(lockFileLocation, lockFile); case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: - return new HdfsFileLock(locFileLocation, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_S3: - return new S3FileLock(locFileLocation, lockFile); - + return new HdfsFileLock(lockFileLocation, lockFile); + case CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY: + return new MemoryLock(tableId, lockFile); default: throw new UnsupportedOperationException("Not supported the lock type"); } @@ -105,4 +121,10 @@ private static void getLockTypeConfigured() { LOGGER.info("Configured lock type is: " + lockTypeConfigured); } + private static void getLockpath() { + lockPath = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.LOCK_PATH, "") + .toUpperCase(); + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java index 3c28f9d575f..ade4212ff45 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java @@ -23,7 +23,6 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.util.path.CarbonTablePath; /** @@ -57,22 +56,6 @@ public HdfsFileLock(String lockFileLocation, String lockFile) { initRetry(); } - /** - * @param lockFilePath - */ - public HdfsFileLock(String lockFilePath) { - this.lockFilePath = lockFilePath; - initRetry(); - } - - /** - * @param absoluteTableIdentifier - * @param lockFile - */ - public HdfsFileLock(AbsoluteTableIdentifier absoluteTableIdentifier, String lockFile) { - this(absoluteTableIdentifier.getTablePath(), lockFile); - } - /* (non-Javadoc) * @see org.apache.carbondata.core.locks.ICarbonLock#lock() */ diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java index 698356269d9..1148ae27ba0 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java @@ -27,7 +27,6 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -72,15 +71,6 @@ public LocalFileLock(String lockFileLocation, String lockFile) { initRetry(); } - /** - * @param tableIdentifier - * @param lockFile - */ - public LocalFileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) { - this(tableIdentifier.getTablePath(), lockFile); - initRetry(); - } - /** * Lock API for locking of the file channel of the lock file. * diff --git a/core/src/main/java/org/apache/carbondata/core/locks/MemoryLock.java b/core/src/main/java/org/apache/carbondata/core/locks/MemoryLock.java new file mode 100644 index 00000000000..c9d152659a9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/locks/MemoryLock.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.locks; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is used to handle the Memory based lock. + * This is acheived using maintaining a map of table id to list of locks acquired. + */ +public class MemoryLock extends AbstractCarbonLock { + + private static Map> locks = new ConcurrentHashMap<>(); + + private String tableId; + private String lockType; + + /** + * @param tableId + * @param lockFile + */ + public MemoryLock(String tableId, String lockFile) { + this.tableId = tableId; + this.lockType = lockFile; + initRetry(); + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#unlock() + */ + @Override public boolean unlock() { + List lockList = locks.get(tableId); + if (lockList == null || !lockList.contains(lockType)) { + return true; + } else { + if (lockList.remove(lockType)) { + if (lockList.size() == 0) { + locks.remove(tableId); + } else { + locks.put(tableId, lockList); + } + } + return true; + } + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#lock() + */ + @Override public boolean lock() { + if (locks.containsKey(tableId)) { + List lockList = locks.get(tableId); + if (lockList.contains(lockType)) { + return false; + } else { + lockList.add(lockType); + } + locks.put(tableId, lockList); + return true; + } else { + List lockList = new ArrayList<>(); + lockList.add(lockType); + locks.put(tableId, lockList); + return true; + } + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java deleted file mode 100644 index 464becb23b0..00000000000 --- a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.locks; - -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -/** - * This class is used to handle the S3 File locking. - * This is acheived using the concept of acquiring the data out stream using Append option. - */ -public class S3FileLock extends AbstractCarbonLock { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(S3FileLock.class.getName()); - /** - * lockFilePath is the location of the lock file. - */ - private String lockFilePath; - - /** - * lockFileDir is the directory of the lock file. - */ - private String lockFileDir; - - private DataOutputStream dataOutputStream; - - /** - * @param tableIdentifier - * @param lockFile - */ - public S3FileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) { - this(tableIdentifier.getTablePath(), lockFile); - } - - /** - * @param lockFileLocation - * @param lockFile - */ - public S3FileLock(String lockFileLocation, String lockFile) { - this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation); - this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile); - LOGGER.info("S3 lock path:" + this.lockFilePath); - initRetry(); - } - - /* (non-Javadoc) - * @see org.apache.carbondata.core.locks.ICarbonLock#unlock() - */ - @Override public boolean unlock() { - boolean status = false; - if (null != dataOutputStream) { - try { - dataOutputStream.close(); - status = true; - } catch (IOException e) { - status = false; - } - } - return status; - } - - /* (non-Javadoc) - * @see org.apache.carbondata.core.locks.ICarbonLock#lock() - */ - @Override public boolean lock() { - try { - if (!FileFactory.isFileExist(lockFileDir)) { - FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir)); - } - if (!FileFactory.isFileExist(lockFilePath)) { - FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath)); - } - dataOutputStream = - FileFactory.getDataOutputStreamUsingAppend(lockFilePath, - FileFactory.getFileType(lockFilePath)); - return true; - } catch (IOException e) { - LOGGER.error(e, e.getMessage()); - return false; - } - } - -} diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 0b1c1e34302..3d3b24550af 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -93,7 +93,7 @@ public static void writeSegmentFile(String tablePath, final String taskNo, Strin String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); if (!carbonFile.exists()) { - carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + carbonFile.mkdirs(writePath); } CarbonFile tempFolder = FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); @@ -178,7 +178,7 @@ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder); if (!carbonFile.exists()) { - carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder)); + carbonFile.mkdirs(segmentFileFolder); } String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT; // write segment info to new file. @@ -280,8 +280,8 @@ public static String getSegmentFilePath(String tablePath, String segmentFileName * @return boolean which determines whether status update is done or not. * @throws IOException */ - public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile) - throws IOException { + public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile, + String tableId) throws IOException { boolean status = false; String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); if (!FileFactory.isFileExist(tableStatusPath)) { @@ -289,7 +289,7 @@ public static boolean updateSegmentFile(String tablePath, String segmentId, Stri } String metadataPath = CarbonTablePath.getMetadataPath(tablePath); AbsoluteTableIdentifier absoluteTableIdentifier = - AbsoluteTableIdentifier.from(tablePath, null, null); + AbsoluteTableIdentifier.from(tablePath, null, null, tableId); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); int retryCount = CarbonLockUtil diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index f48ada09353..7c00c77fad8 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -258,11 +258,12 @@ public static CarbonTable buildDummyTable(String tablePath) throws IOException { return CarbonTable.buildFromTableInfo(tableInfoInfer); } - public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath) - throws IOException { - return SchemaReader - .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, dbName, tableName)); + public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath, + String tableId) throws IOException { + return SchemaReader.readCarbonTableFromStore( + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableId)); } + /** * @param tableInfo */ diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index dc50ab07494..a48c3655eda 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -388,6 +388,8 @@ private void validateLockType() { // CARBON_LOCK_TYPE_LOCAL and for the distributed one CARBON_LOCK_TYPE_HDFS case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: break; + case CarbonCommonConstants.CARBON_LOCK_TYPE_MEMORY: + break; case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: default: diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index cb53c0be5a8..c2930645692 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -154,7 +154,8 @@ private String writeMergeIndexFileBasedOnSegmentFile( String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path); - SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName); + SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName, + table.getCarbonTableIdentifier().getTableId()); for (CarbonFile file : indexFiles) { file.delete(); diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java index f34008d1ed2..3dfc515279b 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.carbon; +import java.util.UUID; + import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; @@ -93,7 +95,8 @@ public class AbsoluteTableIdentifierTest { @Test public void fromTablePathTest() { AbsoluteTableIdentifier absoluteTableIdentifierTest = - AbsoluteTableIdentifier.from("storePath/databaseName/tableName", "databaseName", "tableName"); + AbsoluteTableIdentifier.from("storePath/databaseName/tableName", "databaseName", "tableName", + UUID.randomUUID().toString()); Assert.assertTrue(absoluteTableIdentifierTest.getTablePath() .equals(absoluteTableIdentifier4.getTablePath())); } diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java index 526f6303271..3e3b6171896 100644 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; @@ -71,8 +72,9 @@ public class TestBlockletDataMapFactory { .getDeclaredConstructors()[0]; constructor.setAccessible(true); carbonTable = (CarbonTable) constructor.newInstance(); - absoluteTableIdentifier = - AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table"); + absoluteTableIdentifier = AbsoluteTableIdentifier + .from("/opt/store/default/carbon_table/", "default", "carbon_table", + UUID.randomUUID().toString()); Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier); Deencapsulation.setField(carbonTable, "tableInfo", tableInfo); blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema()); diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md index f81959ecb07..dadbf752a2d 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -106,6 +106,7 @@ This section provides the details of all the configurations required for CarbonD |---------------------------------------------|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | carbon.sort.file.write.buffer.size | 16384 | File write buffer size used during sorting. Minimum allowed buffer size is 10240 byte and Maximum allowed buffer size is 10485760 byte. | | carbon.lock.type | LOCALLOCK | This configuration specifies the type of lock to be acquired during concurrent operations on table. There are following types of lock implementation: - LOCALLOCK: Lock is created on local file system as file. This lock is useful when only one spark driver (thrift server) runs on a machine and no other CarbonData spark application is launched concurrently. - HDFSLOCK: Lock is created on HDFS file system as file. This lock is useful when multiple CarbonData spark applications are launched and no ZooKeeper is running on cluster and HDFS supports file based locking. | +| carbon.lock.path | TABLEPATH | This configuration specified the path where lock files have to be created. | carbon.sort.intermediate.files.limit | 20 | Minimum number of intermediate files after which merged sort can be started (minValue = 2, maxValue=50). | | carbon.block.meta.size.reserved.percentage | 10 | Space reserved in percentage for writing block meta data in CarbonData file. | | carbon.csv.read.buffersize.byte | 1048576 | csv reading buffer size. | diff --git a/docs/faq.md b/docs/faq.md index 9f74842a011..78fa37d30ef 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -76,7 +76,7 @@ The Apache CarbonData acquires lock on the files to prevent concurrent operation The property carbon.lock.type configuration specifies the type of lock to be acquired during concurrent operations on table. This property can be set with the following values : - **LOCALLOCK** : This Lock is created on local file system as file. This lock is useful when only one spark driver (thrift server) runs on a machine and no other CarbonData spark application is launched concurrently. - **HDFSLOCK** : This Lock is created on HDFS file system as file. This lock is useful when multiple CarbonData spark applications are launched and no ZooKeeper is running on cluster and the HDFS supports, file based locking. - +- **MEMORYLOCK** : This Lock is created in memory. This lock is useful when store is located on a storage solution like S3 where taking a lock on file is not supported. This lock can only ensure driver side concurrency. ## How to resolve Abstract Method Error? In order to build CarbonData project it is necessary to specify the spark profile. The spark profile sets the Spark Version. You need to specify the ``spark version`` while using Maven to build project. diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala index 7438638cae3..6a822a5ca70 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.examples import java.io.File +import java.util.UUID import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.{SaveMode, SparkSession} diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 3688026e693..bc1f485e196 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -108,6 +108,7 @@ public abstract class CarbonInputFormat extends FileInputFormat { private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; + public static final String TABLE_ID = "mapreduce.input.carboninputformat.tableId"; private static final String PARTITIONS_TO_PRUNE = "mapreduce.input.carboninputformat.partitions.to.prune"; private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap"; diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 456916ffe6e..26c32eeb4ac 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -142,7 +142,7 @@ object CarbonStore { LOGGER.audit(s"The clean files request has been received for $dbName.$tableName") var carbonCleanFilesLock: ICarbonLock = null val absoluteTableIdentifier = if (forceTableClean) { - AbsoluteTableIdentifier.from(tablePath, dbName, tableName) + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) } else { carbonTable.getAbsoluteTableIdentifier } @@ -317,7 +317,7 @@ object CarbonStore { tableName: String, storePath: String, segmentId: String): Boolean = { - val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName) + val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName, tableName) val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new SegmentStatusManager( identifier).getValidAndInvalidSegments diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 3995aa730f8..1cf4a5c9a3f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -824,19 +824,18 @@ object CommonUtil { if (tableFolder.isDirectory) { val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName - val identifier = - AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName) + val tableUniqueName = dbName + "_" + tableFolder.getName val tableStatusFile = CarbonTablePath.getTableStatusFilePath(tablePath) if (FileFactory.isFileExist(tableStatusFile, fileType)) { try { val carbonTable = CarbonMetadata.getInstance - .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) + .getCarbonTable(tableUniqueName) SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " + - s"${ identifier.getCarbonTableIdentifier.getTableUniqueName }") + s"${ tableUniqueName }") } } } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala index aeabdf19dcf..e5552dbcd59 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala @@ -49,7 +49,7 @@ object ResourceRegisterAndCopier { sys.error(s"""Provided path $hdfsPath does not exist""") } LOGGER.audit("Try downloading resource data") - val lock = new HdfsFileLock(hdfsPath + "/resource.lock") + val lock = new HdfsFileLock(hdfsPath, "/resource.lock") var bool = false try { bool = lockWithRetries(lock) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 5d53cccad57..cc639f2e26e 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -102,7 +102,7 @@ object CarbonDataRDDFactory { val lock = CarbonLockFactory.getCarbonLockObj( configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER, - LockUsage.SYSTEMLEVEL_COMPACTION_LOCK) + LockUsage.SYSTEMLEVEL_COMPACTION_LOCK, carbonTable.getCarbonTableIdentifier.getTableId) if (lock.lockWithRetries()) { LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" + @@ -519,7 +519,8 @@ object CarbonDataRDDFactory { SegmentFileStore.updateSegmentFile( carbonTable.getTablePath, carbonLoadModel.getSegmentId, - segmentFileName) + segmentFileName, + carbonTable.getCarbonTableIdentifier.getTableId) operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment", carbonLoadModel.getSegmentId) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index f1ed5d15150..7da86884b26 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -65,8 +65,6 @@ case class CarbonDropDataMapCommand( val carbonEnv = CarbonEnv.getInstance(sparkSession) val catalog = carbonEnv.carbonMetastore val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession) - val tableIdentifier = - AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase) catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) if (mainTable == null) { mainTable = try { @@ -78,6 +76,12 @@ case class CarbonDropDataMapCommand( null } } + val tableIdentifier = + AbsoluteTableIdentifier + .from(tablePath, + dbName.toLowerCase, + tableName.toLowerCase, + mainTable.getCarbonTableIdentifier.getTableId) // forceDrop will be true only when parent table schema updation has failed. // This method will forcefully drop child table instance from metastore. if (forceDrop) { diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java index 8d5f3d409fb..b6b3cceb32c 100644 --- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java +++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java @@ -17,7 +17,12 @@ package org.apache.carbondata.lcm.locks; import java.io.File; +import java.lang.reflect.Field; +import java.util.UUID; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.locks.LocalFileLock; import org.apache.carbondata.core.locks.LockUsage; @@ -32,11 +37,15 @@ */ public class LocalFileLockTest { + private String rootPath; + + private Class secretClass = CarbonLockFactory.class; + /** * @throws java.lang.Exception */ @Before public void setUp() throws Exception { - String rootPath = new File(this.getClass().getResource("/").getPath() + rootPath = new File(this.getClass().getResource("/").getPath() + "../../..").getCanonicalPath(); String storeLocation = rootPath + "/target/store"; CarbonProperties.getInstance() @@ -47,19 +56,22 @@ public class LocalFileLockTest { * @throws java.lang.Exception */ @After public void tearDown() throws Exception { + Field f = secretClass.getDeclaredField("lockPath"); + f.setAccessible(true); + f.set(secretClass, ""); } @Test public void testingLocalFileLockingByAcquiring2Locks() { AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName", - "tableName"); + "tableName", UUID.randomUUID().toString()); LocalFileLock localLock1 = - new LocalFileLock(absoluteTableIdentifier, + new LocalFileLock(absoluteTableIdentifier.getTablePath(), LockUsage.METADATA_LOCK); Assert.assertTrue(localLock1.lock()); LocalFileLock localLock2 = - new LocalFileLock(absoluteTableIdentifier, + new LocalFileLock(absoluteTableIdentifier.getTablePath(), LockUsage.METADATA_LOCK); Assert.assertTrue(!localLock2.lock()); @@ -68,4 +80,18 @@ public class LocalFileLockTest { Assert.assertTrue(localLock2.unlock()); } + @Test public void testConfigurablePathForLock() throws Exception { + Field f = secretClass.getDeclaredField("lockPath"); + f.setAccessible(true); + f.set(secretClass, rootPath + "/target/"); + AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier + .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName", + "tableName", "1"); + ICarbonLock carbonLock = + CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK); + carbonLock.lockWithRetries(); + assert(new File(rootPath + "/target/1/LockFiles/tablestatus.lock").exists()); + assert(!new File(absoluteTableIdentifier.getTablePath() + "/LockFiles").exists()); + } + } diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/MemoryLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/MemoryLockTest.java new file mode 100644 index 00000000000..acbfcbccfca --- /dev/null +++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/MemoryLockTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.lcm.locks; + +import org.apache.carbondata.core.locks.LockUsage; +import org.apache.carbondata.core.locks.MemoryLock; + +import org.junit.Assert; +import org.junit.Test; + +public class MemoryLockTest { + + @Test public void testMemoryLockOnTable() { + MemoryLock memoryLock1 = new MemoryLock("locktest_a", LockUsage.TABLE_STATUS_LOCK); + MemoryLock memoryLock2 = new MemoryLock("locktest_a", LockUsage.TABLE_STATUS_LOCK); + memoryLock1.lock(); + Assert.assertFalse(memoryLock2.lock()); + memoryLock1.unlock(); + assert(memoryLock2.lock()); + memoryLock2.unlock(); + } +} diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java index e7a060248fa..2234d81ac48 100644 --- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java +++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.util.Properties; +import java.util.UUID; /** * ZooKeeperLocking Test cases @@ -96,7 +97,7 @@ public void run() { AbsoluteTableIdentifier tableIdentifier = AbsoluteTableIdentifier .from(CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION), - "dbName", "tableName"); + "dbName", "tableName", UUID.randomUUID().toString()); ZooKeeperLocking zkl = new ZooKeeperLocking(tableIdentifier, LockUsage.METADATA_LOCK); diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index ebee41ad51a..3cad317a908 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; @@ -178,7 +179,8 @@ public CarbonReader build() throws IOException, InterruptedException { // DB name is not applicable for SDK reader as, table will be never registered. CarbonTable table; if (isTransactionalTable) { - table = CarbonTable.buildFromTablePath(tableName, "default", tablePath); + table = CarbonTable + .buildFromTablePath(tableName, "default", tablePath, UUID.randomUUID().toString()); } else { if (filterExpression != null) { table = CarbonTable.buildTable(tablePath, tableName);