From ed5b133fa3aaad22efc870ade734b6cc8a36a7a2 Mon Sep 17 00:00:00 2001
From: Harish Butani
Date: Fri, 9 Sep 2016 11:30:14 -0700
Subject: [PATCH] selectQueryItr set fromNext for pagination, simplify
 pagination logic

---
 .../druid/DruidSelectResultIterator.scala      | 23 +++++++------------
 .../sparklinedata/druid/DruidQuerySpec.scala   |  9 +++++---
 2 files changed, 14 insertions(+), 18 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/sources/druid/DruidSelectResultIterator.scala b/src/main/scala/org/apache/spark/sql/sources/druid/DruidSelectResultIterator.scala
index c97a54f..ae03354 100644
--- a/src/main/scala/org/apache/spark/sql/sources/druid/DruidSelectResultIterator.scala
+++ b/src/main/scala/org/apache/spark/sql/sources/druid/DruidSelectResultIterator.scala
@@ -94,6 +94,12 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
     val pagingIdentifiersJV: JsonAST.JValue = jValDeser.deserialize(jp, ctxt)
     nextPagingIdentifiers = pagingIdentifiersJV.extract[Map[String, Int]]
 
+//    nextPagingIdentifiers = nextPagingIdentifiers.map {
+//      case (s,i) if !selectSpec.descending => (s, i + 1)
+//      case (s,i) if selectSpec.descending => (s, i - 1)
+//    }
+
+    // 1.2.2 events field
     t = jp.nextToken // FIELD_NAME, jp.getCurrentName == events
     t = jp.nextToken // START_ARRAY
@@ -118,21 +124,8 @@ private class DruidSelectResultIterator(val useSmile : Boolean,
       val o: JsonAST.JValue = jValDeser.deserialize(jp, ctxt)
       val r = o.extract[SelectResultRow]
       t = jp.nextToken()
-      if (!thisRoundHadData) {
-        // if this the first row in this round,
-        // check that it's offset is greater
-        // than the nextPagingIdentifiers value
-        val nextOffset = nextPagingIdentifiers(r.segmentId)
-        if (r.offset >= nextOffset) {
-          finished = true
-          null
-        } else {
-          thisRoundHadData = true
-          r
-        }
-      } else {
-        r
-      }
+      thisRoundHadData = true
+      r
     }
   }
 
diff --git a/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala b/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala
index e6a70e3..f1fe1ef 100644
--- a/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala
+++ b/src/main/scala/org/sparklinedata/druid/DruidQuerySpec.scala
@@ -917,7 +917,8 @@ case class SearchQuerySpecWithSegIntervals(
 }
 
 case class PagingSpec(pagingIdentifiers : Map[String, Int],
-                      threshold : Int)
+                      threshold : Int,
+                      fromNext : Option[Boolean] = None)
 
 abstract class SelectSpec extends QuerySpec {
   self : Product =>
@@ -956,6 +957,8 @@ abstract class SelectSpec extends QuerySpec {
     case dc => dc.name
   }
 
+  def descending : Boolean
+
 }
 
 case class SelectSpecWithIntervals(queryType: String,
@@ -984,7 +987,7 @@ case class SelectSpecWithIntervals(queryType: String,
   )
 
   def withPagingIdentifier(ps : Map[String, Int]) = {
-    copy(pagingSpec = pagingSpec.copy(pagingIdentifiers = ps))
+    copy(pagingSpec = pagingSpec.copy(pagingIdentifiers = ps, fromNext = Some(true)))
   }
 
   override def setIntervals(ins : List[Interval]) = this.copy(intervals = ins.map(_.toString))
@@ -1021,7 +1024,7 @@ case class SelectSpecWithSegmentIntervals(queryType: String,
   extends SelectSpec {
 
   def withPagingIdentifier(ps : Map[String, Int]) = {
-    copy(pagingSpec = pagingSpec.copy(pagingIdentifiers = ps))
+    copy(pagingSpec = pagingSpec.copy(pagingIdentifiers = ps, fromNext = Some(true)))
   }
 
   override def setSegIntervals(segInAssignments: List[(DruidSegmentInfo, Interval)]): QuerySpec = {