diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 08e93b5cdea..444a32f5a00 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -427,7 +427,8 @@ Long getDistributedCount(CarbonTable table,
       }
       return dataMapJob.executeCountJob(dataMapFormat);
     } catch (Exception e) {
-      DataMapJob dataMapJob = (DataMapJob) DataMapUtil.getEmbeddedJob();
+      LOG.error("Failed to get count from index server. Initializing fallback", e);
+      DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
       if (dataMapJob == null) {
         throw new ExceptionInInitializerError("Unable to create " + DataMapUtil.EMBEDDED_JOB_NAME);
       }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index f6c39998f4b..1fee051d0f9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -111,7 +111,7 @@ class DistributedDataMapJob extends AbstractDataMapJob {
   }
 
   override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
-    IndexServer.getClient.getCount(dataMapFormat)
+    IndexServer.getClient.getCount(dataMapFormat).get()
   }
 }
 
@@ -136,7 +136,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
   }
 
   override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
-    IndexServer.getCount(dataMapFormat)
+    IndexServer.getCount(dataMapFormat).get()
   }
 }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index b5cf896371c..01ef698e04e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
@@ -59,7 +60,7 @@ trait ServerInterface {
   def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String],
       jobGroupId: String = ""): Unit
 
-  def getCount(request: DistributableDataMapFormat): Long
+  def getCount(request: DistributableDataMapFormat): LongWritable
 }
@@ -105,7 +106,7 @@ object IndexServer extends ServerInterface {
     })
   }
 
-  def getCount(request: DistributableDataMapFormat): Long = {
+  def getCount(request: DistributableDataMapFormat): LongWritable = {
     doAs {
       if (!request.isFallbackJob) {
         sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
@@ -116,7 +117,7 @@ object IndexServer extends ServerInterface {
       if (!request.isFallbackJob) {
         DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
      }
-      splits.map(_._2.toLong).sum
+      new LongWritable(splits.map(_._2.toLong).sum)
     }
   }
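
Note on the Long -> LongWritable change (a hedged reading; the patch itself gives no rationale): Hadoop's Writable-based RPC engine can marshal primitives, Writable implementations, and a handful of other declared types, but not a boxed java.lang.Long return value, so returning a plain Long across the index-server RPC boundary would fail to serialize. Wrapping the sum in LongWritable makes the return value RPC-safe, and the call sites in DataMapJobs.scala unwrap it with .get(). The standalone Scala sketch below (not part of the patch; it assumes only hadoop-common on the classpath) demonstrates the Writable round trip and the unwrapping:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.io.LongWritable

// Standalone sketch: LongWritable implements Hadoop's Writable contract
// (write/readFields), which is what lets it cross the RPC boundary where a
// boxed java.lang.Long cannot.
object LongWritableRoundTrip {
  def main(args: Array[String]): Unit = {
    // Serialize, as the index server side would when returning the count.
    val out = new ByteArrayOutputStream()
    new LongWritable(42L).write(new DataOutputStream(out))

    // Deserialize into a fresh instance, as the RPC client side would.
    val received = new LongWritable()
    received.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray)))

    // Callers unwrap the primitive with get(), mirroring the .get() calls
    // added to executeCountJob in DataMapJobs.scala.
    assert(received.get() == 42L)
  }
}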