BigQuery: New BigQuery connector #2548

Merged 82 commits on Mar 12, 2021
Commits
496baaf
BigQuery: New BigQuery connector
armanbilge Dec 20, 2020
e35d856
Don't use Java 11 method
armanbilge Dec 20, 2020
59461f2
Remove another Java 11 method call
armanbilge Dec 20, 2020
716aba4
Reimplement PaginatedRequest
armanbilge Dec 21, 2020
c72b45b
Use syntax shorthand for implicits
armanbilge Dec 21, 2020
58f140c
Rename type class PageToken -> Paginated
armanbilge Dec 21, 2020
dcd1a77
Use FiniteDuration instead of int
armanbilge Dec 21, 2020
f0001fc
Make `query` robust to user exceptions
armanbilge Dec 21, 2020
e78bf25
Replace deprecated method call
armanbilge Jan 18, 2021
9c4722f
Refine LoadJob implementation
armanbilge Jan 18, 2021
e069a42
Remove silencer annotation (not working?)
armanbilge Jan 18, 2021
df16ca2
Create BigQueryRootJsonFormat
armanbilge Jan 18, 2021
36cd236
Reorganize SchemaWriter type classes
armanbilge Jan 18, 2021
5500ccb
Create ProductSchemaWriter
armanbilge Jan 18, 2021
7c17772
Fix bug
armanbilge Jan 18, 2021
cc8b55a
Add dependency to spray v1.3.6 for float patch
armanbilge Jan 19, 2021
c8adcb6
Add complete end-to-end hoverfly simulation
armanbilge Jan 19, 2021
befa0f1
Use syntax shorthand for implicits
armanbilge Jan 19, 2021
c2dcc14
Fix travis build
armanbilge Jan 20, 2021
12870ae
Finish implementation of model classes w/ Java DSLs
armanbilge Jan 20, 2021
4bced5a
Reorganize code to satisfy @tg44's nitpicks ;)
armanbilge Jan 20, 2021
49ea383
Implement Java DSL
armanbilge Jan 21, 2021
6ae4159
Use plural
armanbilge Jan 21, 2021
cbde365
Implement Jackson parsing for Java DSL; add Java DSL e2e test
armanbilge Jan 21, 2021
8d25b24
Separate query-related methods from jobs
armanbilge Jan 21, 2021
f9ec80c
Create examples for docs; fix some Java APIs
armanbilge Jan 22, 2021
47191f1
Replace non-JDK8 APIs
armanbilge Jan 22, 2021
6223da8
Write documentation and other fixes along the way
armanbilge Jan 22, 2021
da1ff97
Add BigQueryQueriesSpec and fix bugs
armanbilge Jan 23, 2021
e662063
Add a few more docs
armanbilge Jan 23, 2021
e6dde8e
Improve handling of settings/attributes
armanbilge Jan 23, 2021
442a26b
More docs
armanbilge Jan 23, 2021
8b284a8
Remove callback API
armanbilge Jan 23, 2021
d5351d2
Format BigQueryQueriesSpec
armanbilge Jan 23, 2021
2df788e
Silence warnings
armanbilge Jan 23, 2021
fac5589
Remove unused dependencies
armanbilge Jan 23, 2021
530c1ea
Minor polishing of APIs and docs
armanbilge Jan 24, 2021
eb14b66
Clarify Java docs
armanbilge Jan 25, 2021
d066a7a
Small doc fixes
armanbilge Feb 3, 2021
f17a65b
Document workaround load job bug
armanbilge Feb 3, 2021
96a086b
Rewrite Java DSL e2e test in Java
armanbilge Feb 3, 2021
3ec8c1a
Add docs for SchemaWriter
armanbilge Feb 4, 2021
b08e2e2
Small fixes for internal API
armanbilge Feb 4, 2021
4340706
Add @InternalApi annotations
armanbilge Feb 5, 2021
1898a29
Fix build
armanbilge Feb 22, 2021
5155ce4
Avoid reallocating Future Nones
armanbilge Feb 22, 2021
00d65f2
Remove unused import
armanbilge Feb 22, 2021
a561b9c
Show Jackson dependency only for Java
armanbilge Feb 23, 2021
3024e68
Check for specific exception
armanbilge Feb 23, 2021
ed71146
Cache parsed query
armanbilge Feb 24, 2021
f655c4e
Improve/optimize paginated request API
armanbilge Feb 24, 2021
88f10ef
Reorganize JsonFormats
armanbilge Feb 24, 2021
f26a2b1
More docs for SchemaWriters
armanbilge Feb 25, 2021
10fcd5d
Relax implicit format to reader/writer
armanbilge Feb 25, 2021
82c8768
Silence warnings
armanbilge Feb 25, 2021
9b4b55e
Small doc fixes
armanbilge Feb 25, 2021
b66c825
Provide early access to query job reference
armanbilge Feb 25, 2021
08bc206
Simplify async code (unneeded promise)
armanbilge Feb 25, 2021
09a7b2a
Mark constructors private/internal API
armanbilge Mar 2, 2021
692eab0
Update docs referencing latest Akka HTTP
armanbilge Mar 2, 2021
ac30537
Bump Akka HTTP version
armanbilge Mar 2, 2021
e9e9247
Add support for Date/Time types
armanbilge Mar 3, 2021
d6798dd
Fix typo
armanbilge Mar 3, 2021
ce3834b
Remove fetch depth limit for link validator
armanbilge Mar 3, 2021
00e973c
Revert use of HTTP 10.2.4-only method
armanbilge Mar 3, 2021
90ff755
Bug fix+tests for dry queries w/o job id
armanbilge Mar 4, 2021
e1e2999
Merge branch 'master' into wip-new-bigquery
armanbilge Mar 9, 2021
6d2b3e8
Fix names of implicit JSON readers/writers
armanbilge Mar 9, 2021
0a2cffc
Add support for BigQuery bytes type
armanbilge Mar 10, 2021
146f149
Disable pretty-printing of API response
armanbilge Mar 10, 2021
f474acb
Annotate internal APIs and make private
armanbilge Mar 10, 2021
5f657c7
Fix reference to Akka HTTP version
armanbilge Mar 10, 2021
5d79786
Enable fatal warnings
armanbilge Mar 10, 2021
b8c0c32
Fix exception parameter name w/ override
armanbilge Mar 10, 2021
a3add92
Add @ApiMayChange annotations
armanbilge Mar 10, 2021
f52677d
Replaced deprecated constructor
armanbilge Mar 10, 2021
253ddd5
Revert to deprecrated ctr; silence it
armanbilge Mar 11, 2021
f2c8872
Update @ApiMayChange annotations to point to #2548
armanbilge Mar 11, 2021
f26ef83
Move @silent to relevant line
armanbilge Mar 11, 2021
b29a690
Use pattern from Akka docs
armanbilge Mar 11, 2021
08ccbcb
Add covariance annotations to models
armanbilge Mar 11, 2021
d2c4285
Merge branch 'master' into wip-new-bigquery
armanbilge Mar 11, 2021
7 changes: 6 additions & 1 deletion build.sbt
@@ -173,7 +173,8 @@ lazy val googleCloudBigQuery = alpakkaProject(
"google-cloud-bigquery",
"google.cloud.bigquery",
Dependencies.GoogleBigQuery,
-Test / fork := true
+Test / fork := true,
+fatalWarnings := true
).disablePlugins(MimaPlugin).enablePlugins(spray.boilerplate.BoilerplatePlugin)

lazy val googleCloudPubSub = alpakkaProject(
@@ -354,6 +355,10 @@ lazy val docs = project
"javadoc.org.apache.kudu.base_url" -> s"https://kudu.apache.org/releases/${Dependencies.KuduVersion}/apidocs/",
"javadoc.org.apache.hadoop.base_url" -> s"https://hadoop.apache.org/docs/r${Dependencies.HadoopVersion}/api/",
"javadoc.software.amazon.awssdk.base_url" -> "https://sdk.amazonaws.com/java/api/latest/",
+"javadoc.com.fasterxml.jackson.annotation.base_url" -> "https://javadoc.io/doc/com.fasterxml.jackson.core/jackson-annotations/latest/",
+"javadoc.com.fasterxml.jackson.annotation.link_style" -> "direct",
+// Scala
+"scaladoc.spray.json.base_url" -> s"https://javadoc.io/doc/io.spray/spray-json_${scalaBinaryVersion.value}/latest/",
// Eclipse Paho client for MQTT
"javadoc.org.eclipse.paho.client.mqttv3.base_url" -> "https://www.eclipse.org/paho/files/javadoc/",
"javadoc.org.bson.codecs.configuration.base_url" -> "https://mongodb.github.io/mongo-java-driver/3.7/javadoc/",
189 changes: 127 additions & 62 deletions docs/src/main/paradox/google-cloud-bigquery.md

Large diffs are not rendered by default.

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema

import akka.stream.alpakka.googlecloud.bigquery.model.TableJsonProtocol.{RequiredMode, TableFieldSchema}

import scala.collection.immutable.Seq
import scala.reflect.{classTag, ClassTag}

trait ProductSchemasInstances { this: ProductSchemas with StandardSchemas =>
[# // Case classes with 1 parameters

def bigQuerySchema1[[#P1#], T <: Product: ClassTag](ev: ([#P1#]) => T)(implicit [#writer1: SchemaWriter[P1]#]): TableSchemaWriter[T] = new ProductSchemaWriter[T](schemaFields[[#P1#], T])

private def schemaFields[[#P1#], T <: Product: ClassTag](implicit [#writer1: SchemaWriter[P1]#]): Seq[TableFieldSchema] = {
val Array([#p1#]) = extractFieldNames(classTag[T])
Seq([#writer1.write(p1, RequiredMode)#])
}#

]
}
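
For readers unfamiliar with the sbt-boilerplate template syntax used in the file above: the outer `[# ... #]` block is repeated once per arity, and each inner `[#P1#]` expands to a comma-separated list (`P1, P2, ...`). The following is a minimal stand-alone sketch of roughly what the arity-2 expansion of `schemaFields` might look like. `Field`, the simplified `SchemaWriter`, the two implicit writers, and passing the field names explicitly (instead of calling `extractFieldNames`) are all assumptions made for illustration; they are not the connector's actual types.

```scala
object SchemaExpansionSketch {
  // Hypothetical stand-in for TableFieldSchema.
  final case class Field(name: String, fieldType: String, mode: String)

  // Hypothetical, simplified stand-in for the connector's SchemaWriter.
  trait SchemaWriter[T] {
    def write(name: String, mode: String): Field
  }

  implicit val stringWriter: SchemaWriter[String] = new SchemaWriter[String] {
    def write(name: String, mode: String): Field = Field(name, "STRING", mode)
  }

  implicit val longWriter: SchemaWriter[Long] = new SchemaWriter[Long] {
    def write(name: String, mode: String): Field = Field(name, "INTEGER", mode)
  }

  // Roughly what the template yields for two parameters: one implicit writer
  // per type parameter, each applied to the matching field name.
  def schemaFields2[P1, P2](p1: String, p2: String)(
      implicit writer1: SchemaWriter[P1],
      writer2: SchemaWriter[P2]
  ): Seq[Field] =
    Seq(writer1.write(p1, "REQUIRED"), writer2.write(p2, "REQUIRED"))
}
```

Calling `schemaFields2[String, Long]("name", "age")` with the sketch's implicits in scope yields one required `STRING` field followed by one required `INTEGER` field, in declaration order.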
@@ -2,21 +2,21 @@
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

-package akka.stream.alpakka.googlecloud.bigquery.scaladsl
+package akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray

-import spray.json.{JsObject, JsValue, JsonFormat, ProductFormats, RootJsonFormat, StandardFormats}
+import spray.json.{JsObject, JsValue, ProductFormats, StandardFormats}

import scala.reflect.{classTag, ClassTag}

trait BigQueryProductFormatsInstances { this: BigQueryProductFormats with ProductFormats with StandardFormats =>
[# // Case classes with 1 parameters

-def bigQueryJsonFormat1[[#P1: JsonFormat#], T <: Product: ClassTag](construct: ([#P1#]) => T): RootJsonFormat[T] = {
+def bigQueryJsonFormat1[[#P1: BigQueryJsonFormat#], T <: Product: ClassTag](construct: ([#P1#]) => T): BigQueryRootJsonFormat[T] = {
val Array([#p1#]) = extractFieldNames(classTag[T])
bigQueryJsonFormat(construct, [#p1#])
}
-def bigQueryJsonFormat[[#P1: JsonFormat#], T <: Product](construct: ([#P1#]) => T, [#fieldName1: String#]): RootJsonFormat[T] =
-new RootJsonFormat[T] {
+def bigQueryJsonFormat[[#P1: BigQueryJsonFormat#], T <: Product](construct: ([#P1#]) => T, [#fieldName1: String#]): BigQueryRootJsonFormat[T] =
+new BigQueryRootJsonFormat[T] {
def write(p: T) = {
val fields = new collection.mutable.ListBuffer[(String, JsValue)]
fields.sizeHint(1 * 2)
@@ -30,7 +30,8 @@ trait BigQueryProductFormatsInstances { this: BigQueryProductFormats with Produc
]
construct([#p1V#])
}
-}#
+}
+implicit def bigQueryTuple1Reader[[#P1: BigQueryJsonFormat#]]: BigQueryRootJsonReader[Tuple1[[#P1#]]] = bigQueryJsonFormat1(Tuple1[[#P1#]])#

]
}
43 changes: 38 additions & 5 deletions google-cloud-bigquery/src/main/resources/reference.conf
@@ -1,16 +1,49 @@
alpakka.google.bigquery {

credentials {
# Options: service-account, compute-engine
provider = service-account

service-account {
# If `path` is non-empty then read from file, otherwise fallback to config values
path = ""
path = ${?GOOGLE_APPLICATION_CREDENTIALS}
project-id = ""
client-email = ""
private-key = ""
scopes = ["https://www.googleapis.com/auth/bigquery"]
}

# Timeout for blocking call during settings initialization to compute engine metadata server
compute-engine.timeout = 1s
}

load-job {
# The minimum size of a chunk
chunk-size = 15 MB

# BigQuery has a hard limit of 1,500 load jobs per table per day (just over 1 job per minute)
# This sets the rate limit when loading data via BigQuery.insertAllAsync
per-table-quota = 1 minute
}

# The retry settings for requests to Google APIs
retry-settings {
max-retries = 6
min-backoff = 1 second
max-backoff = 1 minute
random-factor = 0.2
}

# An address of a proxy that will be used for all connections using HTTP CONNECT tunnel.
# forward-proxy {
# scheme = "https"
# host = "proxy"
# port = 8080
# credentials {
# username = "username"
# password = "password"
# }
# trustPem {
# pemPath = "/path/to/file.pem"
# }
# trust-pem = "/path/to/file.pem"
# }

}
}
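
The numeric defaults in this reference.conf can be sanity-checked with a little arithmetic. The sketch below copies the values from the config and computes (a) how many load jobs per day a `per-table-quota` of 1 minute allows, compared with the 1,500-per-day limit mentioned in the comment, and (b) the un-jittered delay per retry. The backoff model (delay doubling per retry, capped at `max-backoff`) is an assumption for illustration; the diff shown here does not say how the connector applies these settings, and `random-factor` jitter is left out.

```scala
import scala.concurrent.duration._

object ReferenceConfArithmetic {
  // Values copied from the reference.conf above.
  val dailyLoadJobLimit: Long = 1500L
  val perTableQuota: FiniteDuration = 1.minute
  val maxRetries: Int = 6
  val minBackoff: FiniteDuration = 1.second
  val maxBackoff: FiniteDuration = 1.minute

  // Load jobs per table per day if one job starts every `perTableQuota`.
  val jobsPerDay: Long = 1.day.toMillis / perTableQuota.toMillis

  // True when the configured rate stays under the daily limit.
  val withinQuota: Boolean = jobsPerDay <= dailyLoadJobLimit

  // Assumed model: the delay doubles with each retry and is capped at maxBackoff.
  def baseDelay(retry: Int): FiniteDuration = {
    val uncapped = minBackoff * (1L << retry)
    uncapped.min(maxBackoff)
  }

  // Un-jittered delays for retries 0 until maxRetries.
  val baseDelays: Seq[FiniteDuration] = (0 until maxRetries).map(baseDelay)
}
```

One job per minute gives 1,440 jobs per day, which leaves 60 jobs of headroom under the 1,500 limit; this is what the config comment means by "just over 1 job per minute".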
@@ -0,0 +1,39 @@
Review thread on this file:

Contributor:
It is possible that it is just me, but when I have multiple classes/objects in the same file, I either split them into multiple files or place the "sub"-classes into the "main" object of the file. So I would move the last 3 classes into BigQueryAttributes.

Contributor Author:
I think it's just you ;) e.g. Some and None are not inside of Option, same for Either/Left/Right.

Contributor Author (@armanbilge, Jan 21, 2021):
BTW, I just copied the class structure from the other connectors, S3 IIRC.

Contributor:
hmm...

https://docs.scala-lang.org/style/files.html says that a class/trait + object should be in one file, and that if you start to introduce ADTs you should lowercase the filename (like option.scala). No idea when I started to pack every subclass of a sealed trait into the companion object, but I think it started to develop when we tried out tagless final in some projects :D I told you it is nitpicking: I would pack it, you would not, and the Scala code style says we should lowercase the filename, which I would personally never do.

I can let it go :P

/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.bigquery

import akka.stream.Attributes.Attribute
import akka.stream.alpakka.googlecloud.bigquery.impl.BigQueryExt
import akka.stream.{Attributes, Materializer}

/**
* Akka Stream [[Attributes]] that are used when materializing BigQuery stream blueprints.
*/
object BigQueryAttributes {

/**
* [[BigQuerySettings]] to use for the BigQuery stream
*/
def settings(settings: BigQuerySettings): Attributes = Attributes(BigQuerySettingsValue(settings))

/**
* Config path which will be used to resolve [[BigQuerySettings]]
*/
def settingsPath(path: String): Attributes = Attributes(BigQuerySettingsPath(path))

/**
* Resolves the most specific [[BigQuerySettings]] for some [[Attributes]]
*/
def resolveSettings(attr: Attributes, mat: Materializer): BigQuerySettings =
attr.attributeList.collectFirst {
case BigQuerySettingsValue(settings) => settings
case BigQuerySettingsPath(path) => BigQueryExt(mat.system).settings(path)
} getOrElse {
BigQueryExt(mat.system).settings
}

private final case class BigQuerySettingsValue(settings: BigQuerySettings) extends Attribute
private final case class BigQuerySettingsPath(path: String) extends Attribute
}
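
The precedence implemented by `resolveSettings` can be seen in isolation with a small model that does not depend on Akka. This is only a sketch: `Attribute`, `SettingsValue`, `SettingsPath`, `Other`, and the use of plain strings in place of `BigQuerySettings` are hypothetical stand-ins, and it assumes the attribute list is ordered most specific first, as the doc comment on `resolveSettings` implies.

```scala
object SettingsResolutionSketch {
  // Hypothetical stand-ins for akka.stream.Attributes.Attribute and its subclasses.
  sealed trait Attribute
  final case class SettingsValue(settings: String) extends Attribute
  final case class SettingsPath(path: String) extends Attribute
  final case class Other(name: String) extends Attribute

  // The first settings-related attribute in the list wins; unrelated
  // attributes are skipped, and the default is used if none matches.
  def resolve(attrs: List[Attribute], fromConfig: String => String, default: String): String =
    attrs.collectFirst {
      case SettingsValue(settings) => settings
      case SettingsPath(path) => fromConfig(path)
    }.getOrElse(default)
}
```

Because `collectFirst` stops at the first match, a `SettingsPath` that appears before a `SettingsValue` in the list takes priority over it, and the extension-wide default is consulted only when the list contains neither.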

This file was deleted.
