Commit

Change scala long to java long
kunal642 committed Aug 6, 2019
1 parent c773e1b commit 57c6b18
Showing 3 changed files with 8 additions and 6 deletions.
@@ -427,7 +427,8 @@ Long getDistributedCount(CarbonTable table,
       }
       return dataMapJob.executeCountJob(dataMapFormat);
     } catch (Exception e) {
-      DataMapJob dataMapJob = (DataMapJob) DataMapUtil.getEmbeddedJob();
+      LOG.error("Failed to get count from index server. Initializing fallback", e);
+      DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
       if (dataMapJob == null) {
         throw new ExceptionInInitializerError("Unable to create " + DataMapUtil.EMBEDDED_JOB_NAME);
       }
@@ -111,7 +111,7 @@ class DistributedDataMapJob extends AbstractDataMapJob {
   }
 
   override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
-    IndexServer.getClient.getCount(dataMapFormat)
+    IndexServer.getClient.getCount(dataMapFormat).get()
   }
 }

@@ -136,7 +136,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
   }
 
   override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
-    IndexServer.getCount(dataMapFormat)
+    IndexServer.getCount(dataMapFormat).get()
   }
 
 }
@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
@@ -59,7 +60,7 @@ trait ServerInterface {
   def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String], jobGroupId: String = ""): Unit
 
-  def getCount(request: DistributableDataMapFormat): Long
+  def getCount(request: DistributableDataMapFormat): LongWritable
 
 }

@@ -105,7 +106,7 @@ object IndexServer extends ServerInterface {
     })
   }
 
-  def getCount(request: DistributableDataMapFormat): Long = {
+  def getCount(request: DistributableDataMapFormat): LongWritable = {
     doAs {
       if (!request.isFallbackJob) {
         sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
@@ -116,7 +117,7 @@
       if (!request.isFallbackJob) {
         DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
       }
-      splits.map(_._2.toLong).sum
+      new LongWritable(splits.map(_._2.toLong).sum)
     }
   }
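
Why the return type moves from Long to LongWritable: getCount is exposed over Hadoop RPC (note the ProtocolInfo/RPC imports in the server file above), and Hadoop's IPC layer expects Writable values rather than raw Scala/Java primitives, so the server wraps the summed count and the two job implementations above unwrap it with .get(). A minimal standalone sketch of that wrap/unwrap pattern follows; the object and method names are illustrative, not CarbonData APIs, and only LongWritable comes from hadoop-common.

import org.apache.hadoop.io.LongWritable

// Standalone sketch of the pattern used in this commit:
// the server wraps the computed count in a Writable so it can cross
// the Hadoop IPC boundary, and the caller unwraps it with get().
object CountWrapExample {

  // server side: sum the per-split counts and wrap the result
  def serverCount(splitCounts: Seq[Long]): LongWritable =
    new LongWritable(splitCounts.sum)

  // client side: unbox back to a java.lang.Long, as executeCountJob does
  def clientCount(result: LongWritable): java.lang.Long =
    result.get()

  def main(args: Array[String]): Unit = {
    val wrapped = serverCount(Seq(3L, 5L, 7L))
    println(clientCount(wrapped)) // prints 15
  }
}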

