prepare release 0.19.0
davidrabinowitz committed Feb 25, 2021
1 parent 29fc02c commit 67cbc83
Showing 3 changed files with 113 additions and 23 deletions.
10 changes: 10 additions & 0 deletions CHANGES.md
@@ -1,5 +1,15 @@
# Release Notes

## 0.19.0 - 2021-02-24
* Issue #247: Allowing the results of an arbitrary SELECT query to be loaded from BigQuery.
* Issue #310: Allowing the expiration time of materialized data to be configured.
* PR #283: Implemented Datasource v2 write support.
* Improved Spark 3 compatibility.
* BigQuery API has been upgraded to version 1.127.4
* BigQuery Storage API has been upgraded to version 1.10.0
* Guava has been upgraded to version 30.1-jre
* Netty has been upgraded to version 4.1.52.Final

## 0.18.1 - 2021-01-21
* Issue #248: Reducing the size of the URI list when writing to BigQuery. This allows larger DataFrames (>10,000 partitions) to be safely written.
* Issue #296: Removed redundant packaged slf4j-api.
124 changes: 102 additions & 22 deletions README.md
@@ -35,7 +35,7 @@ Since BigQuery is [backed by a columnar datastore](https://cloud.google.com/blog

#### Predicate Filtering

The Storage API supports arbitrary pushdown of predicate filters. Connector version 0.8.0-beta and above support pushdown of arbitrary filters to BigQuery.

There is a known issue in Spark that prevents pushdown of filters on nested fields. For example, filters like `address.city = "Sunnyvale"` will not get pushed down to BigQuery.
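
As an illustration, here is a minimal sketch contrasting the two cases (the table and column names are hypothetical):

```
# Hypothetical table with a top-level `country` column and a nested `address` struct.
df = spark.read.format("bigquery").load("my_dataset.customers")

pushed_down = df.where("country = 'US'")             # top-level field: filter is pushed down to BigQuery
not_pushed = df.where("address.city = 'Sunnyvale'")  # nested field: filter is evaluated by Spark instead
```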

@@ -68,21 +68,21 @@ The latest version of the connector is publicly available in the following links

| version | Link |
| --- | --- |
| Scala 2.11 | `gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.18.1.jar` ([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.18.1.jar)) |
| Scala 2.12 | `gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.18.1.jar` ([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.18.1.jar)) |
| Scala 2.11 | `gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.19.0.jar` ([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.19.0.jar)) |
| Scala 2.12 | `gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.19.0.jar` ([HTTP link](https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.19.0.jar)) |

The connector is also available from the
[Maven Central](https://repo1.maven.org/maven2/com/google/cloud/spark/)
repository. It can be used with the `--packages` option or the
`spark.jars.packages` configuration property. Use the following values:

| version | Connector Artifact |
| --- | --- |
| Scala 2.11 | `com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.18.1` |
| Scala 2.12 | `com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.1` |
| Scala 2.11 | `com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.19.0` |
| Scala 2.12 | `com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.19.0` |

If you want to keep up with the latest version of the connector, the following links can be used. Notice that for production
environments where the connector version should be pinned, one of the above links should be used.

| version | Link |
| --- | --- |
@@ -117,7 +117,7 @@ https://codelabs.developers.google.com/codelabs/pyspark-bigquery

The connector uses the cross language [Spark SQL Data Source API](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources):

### Reading data from BigQuery
### Reading data from a BigQuery table

```
df = spark.read \
  .format("bigquery") \
  .load("bigquery-public-data.samples.shakespeare")
```

@@ -132,7 +132,75 @@
```
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
```

See [Shakespeare.scala](src/main/scala/com/google/cloud/spark/bigquery/examples/Shakespeare.scala) and [shakespeare.py](examples/python/shakespeare.py) for more information.
For more information, see additional code samples in
[Python](examples/python/shakespeare.py),
[Scala](connector/src/main/scala/com/google/cloud/spark/bigquery/examples/Shakespeare.scala)
and
[Java](connector/src/main/java/com/google/cloud/spark/bigquery/examples/JavaShakespeare.java).

### Reading data from a BigQuery query

The connector allows you to run any
[Standard SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax)
SELECT query on BigQuery and fetch its results directly to a Spark DataFrame.
This is easily done as described in the following code sample:
```
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
```
This yields the following result:
```
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
```
A second option is to use the `query` option like this:
```
df = spark.read.format("bigquery").option("query", sql).load()
```

Notice that the execution should be faster as only the result is transmitted
over the wire. In a similar fashion the queries can include JOINs more
efficiently than running joins on Spark, or use other BigQuery features such as
[subqueries](https://cloud.google.com/bigquery/docs/reference/standard-sql/subqueries),
[BigQuery user defined functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions),
[wildcard tables](https://cloud.google.com/bigquery/docs/reference/standard-sql/wildcard-table-reference),
[BigQuery ML](https://cloud.google.com/bigquery-ml/docs)
and more.

In order to use this feature the following configurations MUST be set (see the sketch below):
* `viewsEnabled` must be set to `true`.
* `materializationDataset` must be set to a dataset where the GCP user has table
creation permission. `materializationProject` is optional.
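
For example, a minimal sketch of the same setup, assuming the settings are passed as per-read options instead of the Spark configuration shown above (`<dataset>` is a placeholder):

```
df = spark.read.format("bigquery") \
    .option("viewsEnabled", "true") \
    .option("materializationDataset", "<dataset>") \
    .option("query", sql) \
    .load()
```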

**Important:** This feature is implemented by running the query on BigQuery and
saving the result into a temporary table, from which Spark will read the
results. This may add additional costs to your BigQuery account.
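
As a sketch, the lifetime of that temporary table can be bounded with the `materializationExpirationTimeInMinutes` option introduced in this release, in addition to the settings shown above (the value below is only an example):

```
df = spark.read.format("bigquery") \
    .option("materializationExpirationTimeInMinutes", "120") \
    .option("query", sql) \
    .load()
```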

### Writing data to BigQuery
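
A minimal write sketch, assuming the indirect write path with a temporary GCS bucket (`some-bucket` and the table name are placeholders):

```
df.write.format("bigquery") \
    .option("temporaryGcsBucket", "some-bucket") \
    .save("dataset.table")
```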

@@ -185,7 +253,7 @@ The API Supports a number of options to configure the read
<td><code>table</code>
</td>
<td>The BigQuery table in the format <code>[[project:]dataset.]table</code>.
It is recommended to use the <code>path</code> parameter of
<code>load()</code>/<code>save()</code> instead. This option has been
deprecated and will be removed in a future version.
<br/><strong>(Deprecated)</strong>
@@ -257,6 +325,18 @@ The API Supports a number of options to configure the read
</td>
<td>Read</td>
</tr>
<tr valign="top">
<td><code>materializationExpirationTimeInMinutes</code>
</td>
<td>The expiration time of the temporary table holding the materialized data
of a view or a query, in minutes. Notice that the connector may re-use
the temporary table, due to the use of a local cache and in order to reduce
BigQuery computation, so very low values may cause errors. The value must
be a positive integer.
<br/>(Optional. Defaults to 1440, or 24 hours)
</td>
<td>Read</td>
</tr>
<tr valign="top">
<td><code>readDataFormat</code>
</td>
@@ -310,7 +390,7 @@ The API Supports a number of options to configure the read
<td><code>persistentGcsBucket</code>
</td>
<td>The GCS bucket that holds the data before it is loaded to
BigQuery. If informed, the data won't be deleted after writing the data
into BigQuery.
</td>
<td>Write</td>
@@ -380,7 +460,7 @@ The API Supports a number of options to configure the read
<tr valign="top">
<td><code>clusteredFields</code>
</td>
<td>Comma-separated list of non-repeated, top-level columns. Clustering is only supported for partitioned tables
<br/>(Optional).
</td>
<td>Write</td>
@@ -531,14 +611,14 @@ When casting to Timestamp TIME have the same TimeZone issues as DATETIME

#### Spark ML Data Types Support

The Spark ML [Vector](https://spark.apache.org/docs/2.4.5/api/python/pyspark.ml.html#pyspark.ml.linalg.Vector) and
[Matrix](https://spark.apache.org/docs/2.4.5/api/python/pyspark.ml.html#pyspark.ml.linalg.Matrix) are supported,
including their dense and sparse versions. The data is saved as a BigQuery RECORD. Notice that a suffix is added to
the field's description, which includes the Spark type of the field.

In order to write those types to BigQuery, use the ORC or Avro intermediate format, and have them as a column of the
Row (i.e. not a field in a struct).
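
For example, a sketch (the table and bucket names are hypothetical) of writing a `Vector` column as a top-level column using the ORC intermediate format:

```
from pyspark.ml.linalg import Vectors

# The vector is a top-level column of the Row, not nested inside a struct.
ml_df = spark.createDataFrame([(1, Vectors.dense([0.1, 0.2]))], ["id", "features"])
ml_df.write.format("bigquery") \
    .option("intermediateFormat", "orc") \
    .option("temporaryGcsBucket", "some-bucket") \
    .save("my_dataset.vectors_table")
```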

### Filtering

The connector automatically computes the column selection and pushdown filters from the DataFrame's `SELECT` statement, e.g.
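
For instance, a minimal sketch using the public shakespeare sample (its `word` and `word_count` columns are real; exact pushdown behaviour depends on the Spark version):

```
df = spark.read.format("bigquery").load("bigquery-public-data.samples.shakespeare")
# Only the selected columns are read, and the filter is pushed down where possible.
df.select("word", "word_count").where("word_count > 100").show()
```
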
@@ -581,7 +661,7 @@ val df = spark.read.format("bigquery")

### Configuring Partitioning

By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API.
This can be configured explicitly with the <code>[parallelism](#properties)</code> property. BigQuery may limit the number of partitions based on server constraints.
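
A sketch of setting it explicitly (the value is only an example):

```
df = spark.read.format("bigquery") \
    .option("parallelism", "100") \
    .load("bigquery-public-data.samples.shakespeare")
```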

### Reading From Views
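
Reading a view requires the `viewsEnabled` option, and the connector materializes the view into a temporary table before reading; a minimal sketch (the view name is hypothetical):

```
df = spark.read.format("bigquery") \
    .option("viewsEnabled", "true") \
    .load("my_project.my_dataset.my_view")
```
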
@@ -614,7 +694,7 @@ using the following code:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.18.1")\
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.19.0")\
.getOrCreate()
df = spark.read.format("bigquery")\
.load("dataset.table")
```

@@ -623,15 +703,15 @@
**Scala:**
```scala
val spark = SparkSession.builder
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.18.1")
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.19.0")
.getOrCreate()
val df = spark.read.format("bigquery")
.load("dataset.table")
```

In case the Spark cluster is using Scala 2.12 (it's optional for Spark 2.4.x,
mandatory in 3.0.x), then the relevant package is
com.google.cloud.spark:spark-bigquery-with-dependencies_**2.12**:0.18.1. In
com.google.cloud.spark:spark-bigquery-with-dependencies_**2.12**:0.19.0. In
order to know which Scala version is used, please run the following code:

**Python:**
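
A sketch of one way to check it from PySpark (this goes through Spark's internal JVM gateway, so treat it as a convenience rather than a stable API):

```
# Prints the Scala version of the JVM backing this Spark session.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
```
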
@@ -655,14 +735,14 @@ To include the connector in your project:

```
<dependency>
<groupId>com.google.cloud.spark</groupId>
<artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
<version>0.18.1</version>
<version>0.19.0</version>
</dependency>
```

### SBT

```sbt
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.18.1"
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.19.0"
```

## Building the Connector
@@ -713,7 +793,7 @@ spark.conf.set("credentialsFile", "</path/to/key/file>")

Another alternative to passing the credentials is to pass the access token used for authenticating
the API calls to the Google Cloud Platform APIs. You can get the access token by running
`gcloud auth application-default print-access-token`.

```
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
@@ -24,7 +24,7 @@ lazy val nettyTcnativeVersion = "2.0.34.Final"

lazy val commonSettings = Seq(
organization := "com.google.cloud.spark",
version := "0.18.2-SNAPSHOT",
version := "0.19.0",
scalaVersion := scala211Version,
crossScalaVersions := Seq(scala211Version, scala212Version)
)
