Google BigQuery integration - WIP #1155

Closed · wants to merge 4 commits

1 change: 1 addition & 0 deletions .travis.yml
@@ -50,6 +50,7 @@ jobs:
- env:
- PRE_CMD="docker-compose up -d geode"
- CMD=+geode/testChanged
- env: CMD=+google-cloud-bigquery/testChanged
- env: CMD=+google-cloud-pub-sub/testChanged
- env:
- PRE_CMD="docker-compose up -d gcloud-pubsub-client"
7 changes: 7 additions & 0 deletions build.sbt
@@ -117,6 +117,13 @@ lazy val ftp = alpakkaProject(
lazy val geode =
alpakkaProject("geode", "geode", Dependencies.Geode, fork in Test := true, parallelExecution in Test := false)

lazy val googleCloudBigQuery = alpakkaProject(
"google-cloud-bigquery",
"google.cloud.bigquery",
Dependencies.GoogleBigQuery,
fork in Test := true
)

lazy val googleCloudPubSub = alpakkaProject(
"google-cloud-pub-sub",
"google.cloud.pubsub",
101 changes: 101 additions & 0 deletions docs/src/main/paradox/google-cloud-bigquery.md
@@ -0,0 +1,101 @@
# Google Cloud BigQuery

The [Google Cloud BigQuery](https://cloud.google.com/bigquery/) connector provides a way to connect to Google BigQuery,
run queries on large datasets and get the results streamed.

### Reported issues

[Tagged issues at Github](https://github.com/akka/alpakka/labels/p%3Agoogle-cloud-bigquery)

## Artifacts

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-google-cloud-bigquery_$scalaBinaryVersion$
version=$version$
}

## Usage

Imports that may be needed for the following code samples:

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #imports }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #imports }

First of all, you will need a config to work with.

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #init-config }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #init-config }

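Roughly, this amounts to the following (a minimal sketch: `createSession` is a hypothetical stand-in, since the real `GoogleSession` construction lives in the connector's `impl` package; see the #init-config snippet for the actual call):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.google.cloud.bigquery.BigQueryFlowModels.BigQueryProjectConfig

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// createSession is hypothetical; a GoogleSession is built from service-account credentials.
val session = createSession("sa@project.iam.gserviceaccount.com", "-----BEGIN PRIVATE KEY-----...", "projectId")
val config = new BigQueryProjectConfig("projectId", "datasetName", session)
```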

The connector has a fire-and-forget style API for metadata requests.
(These give back basic information about the tables and the fields. If you need more information, sadly you need to write some parsers yourself (see below).)

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #list-tables-and-fields }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #list-tables-and-fields }

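Judging by the parsers in this PR's client package, the elements have these shapes:

```scala
import akka.stream.alpakka.google.cloud.bigquery.client.TableDataQueryJsonProtocol.Field
import akka.stream.alpakka.google.cloud.bigquery.client.TableListQueryJsonProtocol.{QueryTableModel, TableReference}

// A table listing yields elements like:
val table = QueryTableModel(TableReference("users"), "TABLE")
// A field listing yields elements like:
val field = Field("name", "STRING")
```
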
For the rawest representation there is a built-in "csvStyle" source.
This will return a header (the field names), and then each row as a list of Strings.

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #csv-style }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #csv-style }

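As a sketch of the emitted shape (`csvStyleSource` stands in for the source built in the snippet above, and an implicit materializer from the config section is assumed to be in scope):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

val csvStyleSource: Source[Seq[String], NotUsed] = ??? // built as in the #csv-style snippet

// The first element is the header, e.g. Seq("id", "name");
// every following element is a row, e.g. Seq("1", "Alice").
val rows: Future[Seq[Seq[String]]] = csvStyleSource.runWith(Sink.seq)
```
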
There is a more sophisticated way to get data from a database.
If you want to get a stream of your own classes, you can add your converter function too.

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #run-query }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #run-query }

If you just want to use the built-in paging implementation, or you have some specific need, you can call the raw API.
The next example shows how you can access some [dryRun](https://cloud.google.com/bigquery/query-plan-explanation) data with the raw API and the helpers.

Scala
: @@snip [snip](/google-cloud-bigquery/src/test/scala/docs/scaladsl/GoogleBigQuerySourceDoc.scala) { #dry-run }

Java
: @@snip [snip](/google-cloud-bigquery/src/test/java/docs/javadsl/GoogleBigQuerySourceDoc.java) { #dry-run }

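For instance, the helpers from this PR can be combined like this (the `parseDryRun` function is illustrative, not part of the connector):

```scala
import akka.stream.alpakka.google.cloud.bigquery.client.BigQueryCommunicationHelper
import akka.stream.alpakka.google.cloud.bigquery.client.DryRunJsonProtocol._
import spray.json.JsObject

import scala.util.Try

// dryRun = true makes BigQuery validate and estimate the query without running it.
val request = BigQueryCommunicationHelper.createQueryRequest("SELECT 1", "projectId", dryRun = true)

def parseDryRun(result: JsObject): Option[DryRunResponse] =
  Try(result.convertTo[DryRunResponse]).toOption
```
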
### Config

The config will handle your session and your service token. (See the creation code above.)

If you create multiple requests to the same source (likely to happen) you should create the config once and reuse it.
If you call multiple BigQuery sources (not likely to happen) it is worth caching the configs, so you can save a lot of unneeded authorization requests.

### Cancel on timeout

All of the provided functionality can fire a callback when the **downstream** signals a stop.

This is useful if you want to implement some timeout in the downstream, and try to lower your costs by stopping the long-running jobs.

(Google doesn't provide any insurance about cost reduction, but at least we could try. [Read this for more information.](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel))

Scala
: You can use the built-in @scaladoc[BigQueryCallbacks](akka.stream.alpakka.google.cloud.bigquery.scaladsl.BigQueryCallbacks)

Java
: You can use the built-in @scaladoc[BigQueryCallbacks](akka.stream.alpakka.google.cloud.bigquery.javadsl.BigQueryCallbacks)

(Review note: a @scala[@scaladoc[BigQueryCallbacks](akka.stream.alpakka.google.cloud.bigquery.scaladsl.BigQueryCallbacks)]@java... directive could be used to get this in-line with the text.)
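
As a rough, hypothetical sketch (the `GoogleBigQuerySource.runQuery` name, its parameter order, and `tryToStopJob` are assumptions based on the snippets above, not spelled out in this diff):

```scala
import akka.stream.alpakka.google.cloud.bigquery.scaladsl.BigQueryCallbacks

// All names below are assumed for illustration; see the snippets for the real API.
// If the downstream cancels (e.g. an idle timeout), try to cancel the BigQuery job too.
val source = GoogleBigQuerySource.runQuery(
  "SELECT name FROM dataset.users",
  parserFn,
  BigQueryCallbacks.tryToStopJob(config),
  config
)
```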

### Parsers

The parser function is a `JsObject => Option[T]` function.
This is needed, because there is a possibility that the response doesn't contain any data, and in that case we need to retry the request with some delay.

Your parser function needs to be bulletproof and the code in the examples may not show the best solutions.

If you return `None` in every error case, your stream will be polling forever!

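As an illustration (the `User` class and the field indices are made up; the `QueryResponse` model comes from this PR's client package), a parser might look like:

```scala
import akka.stream.alpakka.google.cloud.bigquery.client.QueryJsonProtocol._
import spray.json.JsObject

import scala.util.Try

case class User(id: String, name: String)

// Returning None signals "no data yet" and makes the stream retry with a delay,
// so map only the retryable cases to None, not every failure.
def parseUsers(result: JsObject): Option[Seq[User]] =
  Try {
    val response = result.convertTo[QueryResponse]
    response.rows.getOrElse(Seq.empty).map(row => User(row.f(0).v, row.f(1).v))
  }.toOption
```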

## Running the examples

To run the example code you will need to configure a project, create/initialize tables in Google BigQuery and provide your own credentials.
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
@@ -31,6 +31,7 @@ The [Alpakka project](https://developer.lightbend.com/docs/alpakka/current/) is
* [File](file.md)
* [FS2](external/fs2.md)
* [FTP](ftp.md)
* [Google Cloud BigQuery](google-cloud-bigquery.md)
* [Google Cloud Pub/Sub](google-cloud-pub-sub.md)
* [Google Cloud Pub/Sub gRPC](google-cloud-pub-sub-grpc.md)
* [Google Firebase Cloud Messaging](google-fcm.md)
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery

import akka.stream.alpakka.google.cloud.bigquery.impl.GoogleSession

object BigQueryFlowModels {

class BigQueryProjectConfig(val projectId: String, val dataset: String, val session: GoogleSession)
// Review note: GoogleSession is part of impl, but exposed here.

}
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest}
import akka.stream.alpakka.google.cloud.bigquery.client.QueryJsonProtocol.{queryRequestFormat, queryResponseFormat, QueryRequest, QueryResponse}
import akka.stream.alpakka.google.cloud.bigquery.client.TableDataQueryJsonProtocol.{tableDataQueryResponseFormat, TableDataQueryResponse}
import akka.stream.alpakka.google.cloud.bigquery.client.TableListQueryJsonProtocol.{tableListQueryResponseFormat, TableListQueryResponse}
import spray.json._

import scala.util.Try

object BigQueryCommunicationHelper {

def createQueryRequest(query: String, projectId: String, dryRun: Boolean) =
HttpRequest(HttpMethods.POST, GoogleEndpoints.queryUrl(projectId), entity = createQueryBody(query, dryRun))

def createQueryBody(query: String, dryRun: Boolean) =
HttpEntity(ContentTypes.`application/json`, QueryRequest(query, dryRun = Some(dryRun)).toJson.compactPrint)

def parseQueryResult(result: JsObject): Option[(Seq[String], Seq[Seq[String]])] =
Try {
val queryResponse = result.convertTo[QueryResponse]
val fields = queryResponse.schema.fields.map(_.name)
val rows = queryResponse.rows.fold(Seq[Seq[String]]())(rowSeq => rowSeq.map(row => row.f.map(_.v)))

(fields, rows)
}.toOption

def parseTableListResult(result: JsObject): Option[Seq[TableListQueryJsonProtocol.QueryTableModel]] =
Try(result.convertTo[TableListQueryResponse].tables).toOption

def parseFieldListResults(result: JsObject): Option[Seq[TableDataQueryJsonProtocol.Field]] =
Try(result.convertTo[TableDataQueryResponse].schema.fields).toOption
}
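
For example, feeding `parseQueryResult` a response-shaped `JsObject` yields the header and the rows:

```scala
import spray.json._

val sample =
  """{"schema": {"fields": [{"name": "id"}, {"name": "name"}]},
    | "rows": [{"f": [{"v": "1"}, {"v": "Alice"}]}]}""".stripMargin.parseJson.asJsObject

BigQueryCommunicationHelper.parseQueryResult(sample)
// => Some((Seq("id", "name"), Seq(Seq("1", "Alice"))))
```
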
@@ -0,0 +1,14 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

import spray.json.{DefaultJsonProtocol, JsonFormat}

object DryRunJsonProtocol extends DefaultJsonProtocol {

case class DryRunResponse(totalBytesProcessed: String, jobComplete: Boolean, cacheHit: Boolean)

implicit val dryRunFormat: JsonFormat[DryRunResponse] = jsonFormat3(DryRunResponse)
}
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

object GoogleEndpoints {

val bigQueryV2Url: String = "https://www.googleapis.com/bigquery/v2"

def queryUrl(projectId: String): String =
s"$bigQueryV2Url/projects/$projectId/queries"

def tableListUrl(projectId: String, dataset: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset/tables"

def fieldListUrl(projectId: String, dataset: String, tableName: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset/tables/$tableName"

def insertIntoUrl(projectId: String, dataset: String, tableName: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset/tables/$tableName/insertAll"

def createTableUrl(projectId: String, dataset: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset/tables"

def deleteTableUrl(projectId: String, dataset: String, tableName: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset/tables/$tableName"

def testConnectionUrl(projectId: String, dataset: String): String =
s"$bigQueryV2Url/projects/$projectId/datasets/$dataset"

def cancellationUrl(projectId: String, jobId: String): String =
s"$bigQueryV2Url/projects/$projectId/jobs/$jobId/cancel"
}
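
For example:

```scala
GoogleEndpoints.queryUrl("my-project")
// https://www.googleapis.com/bigquery/v2/projects/my-project/queries

GoogleEndpoints.fieldListUrl("my-project", "my_dataset", "users")
// https://www.googleapis.com/bigquery/v2/projects/my-project/datasets/my_dataset/tables/users
```
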
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

import spray.json.{DefaultJsonProtocol, JsBoolean, JsNull, JsObject, JsString, JsValue, JsonFormat}

object QueryJsonProtocol extends DefaultJsonProtocol {

case class QueryRequest(query: String,
useLegacySql: Boolean = false,
maxResults: Option[Int] = None,
dryRun: Option[Boolean] = None)

case class QueryResponse(schema: Schema, rows: Option[Seq[Row]])
case class Schema(fields: Seq[FieldSchema])
case class FieldSchema(name: String)
case class Row(f: Seq[RowValue])
case class RowValue(v: String)

implicit val queryRequestFormat: JsonFormat[QueryRequest] = jsonFormat4(QueryRequest)

// BigQuery wraps every cell as {"v": ...}; this format normalizes boolean-like values to "1"/"0" and keeps everything else as a String.
implicit val rowValueFormat: JsonFormat[RowValue] = new JsonFormat[RowValue] {
override def write(obj: RowValue): JsValue = JsObject("v" -> JsString(obj.v))

override def read(json: JsValue): RowValue = json.asJsObject.getFields("v") match {
case Seq(JsString(value)) =>
if (value == "true") RowValue("1")
else if (value == "false") RowValue("0")
else RowValue(value)
case Seq(JsNull) => RowValue(null)
case Seq(JsBoolean(b)) => RowValue(if (b) "1" else "0")
case Seq(value) => RowValue(value.toString)
case _ => RowValue(json.toString)
}
}
implicit val rowFormat: JsonFormat[Row] = jsonFormat1(Row)
implicit val fieldFormat: JsonFormat[FieldSchema] = jsonFormat1(FieldSchema)
implicit val schemaFormat: JsonFormat[Schema] = jsonFormat1(Schema)
implicit val queryResponseFormat: JsonFormat[QueryResponse] = jsonFormat2(QueryResponse)

}
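
A quick illustration of the normalization:

```scala
import akka.stream.alpakka.google.cloud.bigquery.client.QueryJsonProtocol._
import spray.json._

"""{"f": [{"v": "true"}, {"v": "42"}]}""".parseJson.convertTo[Row]
// => Row(Seq(RowValue("1"), RowValue("42")))
```
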
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

import spray.json.{DefaultJsonProtocol, JsonFormat}

object TableDataQueryJsonProtocol extends DefaultJsonProtocol {

case class TableDataQueryResponse(schema: TableSchema)
case class TableSchema(fields: Seq[Field])
case class Field(name: String, `type`: String)

implicit val fieldFormat: JsonFormat[Field] = jsonFormat2(Field)
implicit val tableSchemaFormat: JsonFormat[TableSchema] = jsonFormat1(TableSchema)
implicit val tableDataQueryResponseFormat: JsonFormat[TableDataQueryResponse] = jsonFormat1(TableDataQueryResponse)
}
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.client

import spray.json.{DefaultJsonProtocol, JsonFormat}

object TableListQueryJsonProtocol extends DefaultJsonProtocol {

case class TableListQueryResponse(tables: Seq[QueryTableModel])
case class QueryTableModel(tableReference: TableReference, `type`: String)
case class TableReference(tableId: String)

implicit val tableReferenceFormat: JsonFormat[TableReference] = jsonFormat1(TableReference)
implicit val queryTableModelFormat: JsonFormat[QueryTableModel] = jsonFormat2(QueryTableModel)
implicit val tableListQueryResponseFormat: JsonFormat[TableListQueryResponse] = jsonFormat1(TableListQueryResponse)
}
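
For example, a BigQuery table-list response parses as:

```scala
import akka.stream.alpakka.google.cloud.bigquery.client.TableListQueryJsonProtocol._
import spray.json._

"""{"tables": [{"tableReference": {"tableId": "users"}, "type": "TABLE"}]}""".parseJson
  .convertTo[TableListQueryResponse]
// => TableListQueryResponse(Seq(QueryTableModel(TableReference("users"), "TABLE")))
```
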
@@ -0,0 +1,83 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.google.cloud.bigquery.impl

import akka.NotUsed
import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model.HttpRequest
import akka.stream._
import akka.stream.alpakka.google.cloud.bigquery.impl.pagetoken.{AddPageToken, EndOfStreamDetector}
import akka.stream.alpakka.google.cloud.bigquery.impl.parser.Parser
import akka.stream.alpakka.google.cloud.bigquery.impl.parser.Parser.PagingInfo
import akka.stream.alpakka.google.cloud.bigquery.impl.sendrequest.SendRequestWithOauthHandling
import akka.stream.alpakka.google.cloud.bigquery.impl.util.{Delay, FlowInitializer, OnFinishCallback}
import akka.stream.scaladsl.{GraphDSL, Source, Zip}
import spray.json.JsObject

import scala.concurrent.ExecutionContext

object BigQueryStreamSource {

// Adapts the user's PagingInfo callback to the (Boolean, PagingInfo) elements flowing through the graph.
private[bigquery] def callbackConverter(onFinishCallback: PagingInfo => NotUsed): ((Boolean, PagingInfo)) => Unit =
(t: (Boolean, PagingInfo)) => { onFinishCallback(t._2); () }

private[bigquery] def apply[T](httpRequest: HttpRequest,
parserFn: JsObject => Option[T],
onFinishCallback: PagingInfo => NotUsed,
googleSession: GoogleSession,
http: HttpExt)(
implicit mat: Materializer
): Source[T, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

implicit val ec: ExecutionContext = mat.executionContext

val in = builder.add(Source.repeat(httpRequest))
val requestSender = builder.add(SendRequestWithOauthHandling(googleSession, http))
val parser = builder.add(Parser(parserFn))
val upstreamFinishHandler =
builder.add(OnFinishCallback[(Boolean, PagingInfo)](callbackConverter(onFinishCallback)))
val endOfStreamDetector = builder.add(EndOfStreamDetector())
val flowInitializer = builder.add(FlowInitializer((false, PagingInfo(None, None))))
val delay = builder.add(Delay[(Boolean, PagingInfo)](_._1, 60))
val zip = builder.add(Zip[HttpRequest, (Boolean, PagingInfo)]())
val addPageTokenNode = builder.add(AddPageToken())

in ~> zip.in0
requestSender ~> parser.in
parser.out1 ~> upstreamFinishHandler
upstreamFinishHandler ~> endOfStreamDetector
endOfStreamDetector ~> delay
delay ~> flowInitializer
flowInitializer ~> zip.in1
zip.out ~> addPageTokenNode
addPageTokenNode ~> requestSender

SourceShape(parser.out0)

/*
+--------+ +------------+ +-------+ +------+
|Request | |AddPageToken| |Request| |Parser|
|Repeater+---------->+ +--------->+Sender +-------->+ +-----+(response)+----->
| | | | | | | |
+--------+ +-----+------+ +-------+ +---+--+
^ |
| |
| +-----------+ +----+------+
| | Flow | | UpStream |
+<----+Initializer| | Finish |
| | (single) | | Handler |
| +-----------+ +----+------+
| |
| +-----+ +-----------+ |
| |Delay| |EndOfStream| |
+-------+ +<------+ Detector +<----+
| | | |
+-----+ +-----------+
*/
})
}
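
A minimal sketch of how these pieces compose (usable only from within the bigquery package, since `apply` is `private[bigquery]`; obtaining the `GoogleSession` is elided):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.alpakka.google.cloud.bigquery.client.BigQueryCommunicationHelper
import akka.stream.alpakka.google.cloud.bigquery.impl.parser.Parser.PagingInfo

implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()

// session: GoogleSession must come from the connector's config; its construction is elided here.
def csvSource(session: GoogleSession) =
  BigQueryStreamSource(
    BigQueryCommunicationHelper.createQueryRequest("SELECT 1", "projectId", dryRun = false),
    BigQueryCommunicationHelper.parseQueryResult, // JsObject => Option[(fields, rows)]
    (_: PagingInfo) => NotUsed, // no-op finish callback
    session,
    Http()
  )
```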