[CARBONDATA-3369] Fix issues during concurrent execution of Create table If not exists

Create table if not exists has the following problems when run concurrently from different drivers:

1. It sometimes fails with the error "Table <db.table> already exists".
2. The driver whose create table call failed still holds the table with the wrong path or schema, and subsequent operations refer to that wrong path.
3. The stale path created during create table is never deleted. [Since version 1.5.0, the table is created in a new folder using a UUID if a folder with the table name already exists.]

This PR fixes the above 3 issues.

This closes #3198
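As a rough illustration of the race described above, here is a minimal, hypothetical driver-side sketch that mirrors the test added in this commit. It assumes CarbonData is on the classpath and that the SparkSession is CarbonData-enabled (for example, created via CarbonSession); the object name ConcurrentCreateSketch and the table name t_concurrent are illustrative only and not part of the commit.

import java.util.concurrent.{Callable, Executors, TimeUnit}

import org.apache.spark.sql.SparkSession

object ConcurrentCreateSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; in the reported scenario the concurrent calls
    // come from separate drivers, and the session must be CarbonData-enabled.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("concurrent-create-if-not-exists")
      .getOrCreate()

    val pool = Executors.newFixedThreadPool(4)
    // Fire the same CREATE TABLE IF NOT EXISTS from several threads at once.
    val futures = (1 to 4).map { _ =>
      pool.submit(new Callable[String] {
        override def call(): String = {
          // Before this fix, one of these calls could still fail with
          // "Table ... already exists" despite IF NOT EXISTS.
          spark.sql("create table if not exists t_concurrent(name string) stored by 'carbondata'")
          "PASS"
        }
      })
    }

    pool.shutdown()
    pool.awaitTermination(30L, TimeUnit.SECONDS)
    futures.foreach(f => assert(f.get() == "PASS"))
    spark.stop()
  }
}

The fix in the diff below handles the AnalysisException raised in this situation: the losing driver clears its cached (wrong) table entry from the metastore, deletes the stale folder it created if its path differs from the winning driver's table path, and only rethrows when IF NOT EXISTS was not specified.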
KanakaKumar authored and ravipesala committed May 8, 2019
1 parent 3268a45 commit 157de1d
Showing 2 changed files with 68 additions and 1 deletion.
@@ -17,6 +17,8 @@

package org.apache.carbondata.spark.testsuite.createTable

import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

@@ -51,11 +53,45 @@ class TestCreateTableIfNotExists extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage.contains("Operation not allowed, when source table is carbon table"))
}

test("test create table if not exist concurrently") {

val executorService: ExecutorService = Executors.newFixedThreadPool(10)
var futures: List[Future[_]] = List()
for (i <- 0 until 3) {
futures = futures :+ runAsync()
}

executorService.shutdown()
executorService.awaitTermination(30L, TimeUnit.SECONDS)

futures.foreach { future =>
assertResult("PASS")(future.get.toString)
}

def runAsync(): Future[String] = {
executorService.submit(new Callable[String] {
override def call() = {
// Create table
var result = "PASS"
try {
sql("create table IF NOT EXISTS TestIfExists(name string) stored by 'carbondata'")
} catch {
case exception: Exception =>
result = exception.getMessage
}
result
}
})
}
}


override def afterAll {
sql("use default")
sql("drop table if exists test")
sql("drop table if exists sourceTable")
sql("drop table if exists targetTable")
sql("drop table if exists TestIfExists")
}

}
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.MetadataCommand

@@ -166,7 +167,37 @@ case class CarbonCreateTableCommand(
""".stripMargin)
}
} catch {
case e: AnalysisException => throw e
case e: AnalysisException =>
// AnalysisException is thrown with a 'table already exists' message in case of concurrent drivers
if (e.getMessage().contains("already exists")) {

// Clear the cache first
CarbonEnv.getInstance(sparkSession).carbonMetaStore
.removeTableFromMetadata(dbName, tableName)

// Delete the folders created by this call if the actual path is different
val actualPath = CarbonEnv
.getCarbonTable(TableIdentifier(tableName, Option(dbName)))(sparkSession)
.getTablePath

if (!actualPath.equalsIgnoreCase(tablePath)) {
LOGGER.error(
"Table already exists with path: " + actualPath + ", so deleting stale path: " + tablePath)
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(tablePath))
}

// No need to throw for create if not exists
if (ifNotExistsSet) {
LOGGER.error(e, e)
} else {
throw e
}
} else {
throw e
}

case e: Exception =>
// call the drop table to delete the created table.
try {
