# Exporting data to BigQuery

Google provides a [BigQuery connector](https://cloud.google.com/dataproc/docs/concepts/connectors/bigquery) for Hadoop that [can be used with Spark](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example),
allowing us to easily export data from Cognite Data Fusion to BigQuery.

In order to do so "natively" (without having to call the `bq` command line tool) we'll use Scala instead of Python, and convert our data to the GSON objects required by the BigQuery connector.
For an example of using `bq` together with Python, instead of Scala, see the PySpark version of the code in [this tutorial](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example).

Import this notebook to Databricks before continuing. If you are using Jupyter or another notebook environment, you will have to adapt
the credential setup and the installation of the Google Hadoop connectors yourself.

To run this you need the following:
- Databricks CLI installed and configured.
- API key for a Cognite Data Fusion project. You can use a key for [Open Industrial Data](https://openindustrialdata.com/) (the `publicdata` project), saved in a secret in Databricks.
- Access to a Google Cloud Platform project, with sufficient permissions to create a service account with access to BigQuery, or access to such a service account's key.

In [2]:
%scala
val secret_scope = "your_scope"
val project = "publicdata"
val resourceType = "events"

val gcpProjectId = "your-project-id"
val bigQueryDataset = "your_bigquery_dataset"
val bigQueryTable = "events"
val gcsBucket = "databricks-bq"  // used for temporary storage

## Installing Google Hadoop connectors and credentials on the cluster

We need to install the [Cloud Storage](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage) and the BigQuery Hadoop connectors on our cluster.
In addition, all nodes in the cluster need a Google service account credentials file.

One way to do this is to upload an init script like the one below to DBFS, [the Databricks File System](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html).
First you need to create a [Google Service Account and key file](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) with the "BigQuery Data Owner" and "BigQuery Job User" roles.

The service account must also be given full permissions to the Google Cloud Storage bucket you specified in `gcsBucket` above.

Copy the contents of the key file into the following init script.

```
#!/bin/bash

mkdir -p /google
cat <<EOF >/google/credentials.json
{
  "type": "service_account",
  "project_id": "some-gcp-project",
  "private_key_id": "2e95aed5e9588518a476111c1111222233334444",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAABC/ABCDEFGHIJK\nLMNOPQRSTUVWXYZ/ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUV\nABCDEFGHIJKLMNOPQRSTUVWXYZABCD+3sVnFOW7ggurUCDcpIo13rjA2oZ7aOhNG\nABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKL\novFhQkBCERGTvhK3fkGMDdaGJ+vh7yecY59T9F4PivcbGB/5syHYwAiRrb7PzJ/Y\n/k1/lzjFDdtyyo4qyoit16H6VKROesEmEnBQW8lrrvxfo+UEEcpphF4V4AsACZRa\nnrbfBgFBAgMBAAECggEAAkrs7mvhpa6b+Z9h4qhDXHGc8QisYmxzjQM9rqD98XKI\nmTh1eR6Rz+KuPqHjiQTePZ5THWxHEeP7uo9V2nGRitDo/jvqCjjYoUm0ZGsR15gi\nc1zQk+c4r8dS/uIu7BUWJFEC/wMn3WsM/GzZH99VQHSevcHcMuzYjOnkq2DdEfSQ\nrqXKRHHDA4ydPMGHuwbOh0ir4i3rwa0cfxv6hkshcV/Y8ef2VTSB34Ga8O8PfHJV\nd+LRGbDS6poL0pp4Z5m+3OYyWgwW85Wz9jlCpVs7hDUROTrUkClbTEv42wfY12BJ\nwgZpSWr29Y+C2cUeFZpNVkKmvHyrJSoUeGrd0rQhAQKBgQDPaI20cgK8NMqypM3r\n3+iCAe3wKk8gSLyAG7OchErqReGIlHtPMaehzllrFfBVS/viq9hCKPv0udIdMA3M\n/k4tg21ipPw4I6knHQIHNfMmfzUTfjNxnwmLv1qtV83pbCO21x7aG9+0QgG7MDZU\n5dEYiiA/FPriDVvYThx74KdiIQKBgQC1a52sjfreihdFmx+5LWQQYTaKLH0yqmUK\ntJX1MbJ4uxB29yUnRsVDJFK0DXxJw+LjN7hBrnbJxKvJChUnM7P5PjS4xVQN3B6T\nnHBBYAWlGfA2of+Hew7RkbMSj3FpwI/6pA39zL6aIjvlRTkigDLPqGzTBs3WSTUu\nrtOvPLz7IQKBgEGjENU+D2eIPW1zgkdXQLmD6szKVugcnKreGWU66IpjOxCCDNPv\nHuGx79JXywrzVO9S+slVNwcnlzrtbjClehAlO4SwObF6d5mNMIsfo6dXMnDMy3L1\npYu4LvYUh3GLa3H9eiIEGDNvgCTBCTCs2hMuAy5zcUIAgERf09vQKKiBAoGBALLr\nsZNOa04yroTtWjoMtBUbauiWu9rOBdyrAQHSw4siIjjRwYic2UtAdXgxnJQ7ZnZk\nq4nLgEe5eRpKoVYKPcLgQKrBogYRQu6daHBxPN/5Vtjqp9J74L96jDsP0PBVAypT\nxaC63newr5WL0a9e872tA7aTjUl65cJmlUrMAi1BAoGBAJmLlOMjMMoqTzHCX0EB\nW8p+lYqap05eD7M7M9YXUI5kKx4EZb2hHs9U9QzO2kDDX4sYMsUQs1S8++dhnJ3Z\nABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVABCDEFGHIJKLMNOP\nlN8OQNMZJ16xRiIgOMk7eAM2\n-----END PRIVATE KEY-----\n",
  "client_email": "databricks-service-account@some-gcp-project.iam.gserviceaccount.com",
  "client_id": "111111111111111111111",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/databricks-service-account%40some-gcp-project.iam.gserviceaccount.com"
}
EOF

# -f makes curl fail on HTTP errors instead of saving an error page as the jar.
curl -fsSL -o /databricks/jars/bigquery-connector-hadoop2-latest.jar https://storage.googleapis.com/hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
curl -fsSL -o /databricks/jars/gcs-connector-hadoop2-latest.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar
```

Save this using your favorite text editor and use the [Databricks CLI](https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html) to upload it: `dbfs cp my-init-script.sh dbfs:/init_scripts/init_google.sh`

Then edit your cluster, expand the "Advanced Options" section and click the "Init Scripts" tab. In "Init Script Path", enter the location of your init script (for example, `dbfs:/init_scripts/init_google.sh`) and click "Add".

**Note that everyone with access to DBFS will be able to read this file.**
There's currently no way to get around this, so make sure that it's acceptable that everyone with access to your Databricks
workspace can access your service account's credentials.

We also need to add the following to the Spark config of our cluster (also in the "Advanced Options" section):

```
fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable true
google.cloud.auth.service.account.json.keyfile /google/credentials.json
spark.hadoop.fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
spark.hadoop.fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.google.cloud.auth.service.account.enable true
spark.hadoop.google.cloud.auth.service.account.json.keyfile /google/credentials.json
```

Then click "Confirm and restart" at the top of the cluster page.

## Configure the BigQuery export settings

We'll set up the configuration to export to our destination table in BigQuery, and configure our GCS bucket to be used as temporary storage.

This works by having Spark export the data in JSON format to a GCS bucket, from which it is then loaded by a BigQuery import job. The details of the intermediate export to GCS are taken care of by the Hadoop connector.
It also supports a "direct" mode which does not use GCS, but it's not recommended for loading larger amounts of data.

`Spark -> GCS -> BigQuery`

We'll configure the bucket to be cleaned up after the export, and use the `WRITE_TRUNCATE` mode for the import to completely replace any existing data.

Make sure a [BigQuery dataset](https://cloud.google.com/bigquery/docs/datasets) with the name you specified in `bigQueryDataset` exists. If not, please create it now before continuing.

In [5]:
%scala
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat

val conf = spark.sparkContext.hadoopConfiguration
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, gcpProjectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, gcsBucket)

val outputTableId = s"$gcpProjectId:$bigQueryDataset.$bigQueryTable"
val outputGcsPath = s"gs://$gcsBucket/hadoop/tmp/bigquery/$bigQueryTable"

conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName)
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_TRUNCATE")
conf.set(BigQueryConfiguration.OUTPUT_CLEANUP_TEMP_KEY, "true")

## Create a data frame for the source data

We set the `partitions` option to `100` to load data in parallel. Since we'll be using this data twice (first to infer the metadata schema, then to load it into BigQuery), we also call `.cache()`.

As of this writing the default is 20 partitions, but we may have many more cores than that, which is why we increase it.

In [7]:
%scala
val sourceDf = spark.read.format("com.cognite.spark.datasource")
  .option("apikey", dbutils.secrets.get(secret_scope, project))
  .option("type", resourceType)
  .option("partitions", "100")
  .load()
  .cache()

## Make metadata compatible with BigQuery

BigQuery has [limitations on field names](https://cloud.google.com/bigquery/docs/schemas#column_names) that are not always compatible with what can be found in the metadata field.
We'll register a [User Defined Function](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html) (UDF) to rename the keys to valid BigQuery field names.

In addition, BigQuery does not allow two field names that differ only in case, such as `ABC` and `abc`, so we convert all field names to lowercase here.

In [9]:
%scala
import org.apache.spark.sql.functions.to_json
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.struct

import org.apache.spark.sql.functions.udf

val bigQueryCompatibleKeys = (m: Map[String, String]) => {
  if (m != null) {
    // Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long
    m.filter(f => f._1 != null).map { case (k, v) =>
      val newKey = k.replaceAll("[^\\w_]", "_") match {
        case x if !x.matches("^[a-zA-Z_].*") => s"_${x}"
        case x => x
      }
      (newKey.take(128).toLowerCase, v) }
  } else {
    m
  }
}
val bigQueryCompatibleKeysUDF = udf(bigQueryCompatibleKeys)
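
As a quick illustration of the renaming rules, here is a standalone sketch of the same per-key logic that runs without Spark. The names `BigQueryKeys` and `toBigQueryKey` are hypothetical and exist only for this example.

```scala
object BigQueryKeys {
  // Hypothetical helper mirroring the per-key logic of bigQueryCompatibleKeys.
  def toBigQueryKey(key: String): String = {
    // Replace every character that is not an ASCII letter, digit, or underscore.
    val replaced = key.replaceAll("[^\\w_]", "_")
    // Prefix an underscore if the key does not start with a letter or underscore.
    val prefixed = if (replaced.matches("^[a-zA-Z_].*")) replaced else s"_$replaced"
    // Truncate to 128 characters, then lowercase.
    prefixed.take(128).toLowerCase
  }
}
```

With made-up keys, `BigQueryKeys.toBigQueryKey("Work Order-ID")` gives `work_order_id`, and `BigQueryKeys.toBigQueryKey("1st stage")` gives `_1st_stage`.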

## Use schema inference on the metadata field

After renaming the metadata fields we'll use Spark's support for schema inference on JSON to get a schema for our metadata.
BigQuery also supports schema auto-detection, but [it only uses 100 rows](https://cloud.google.com/bigquery/docs/schema-detect#auto-detect) which is not nearly enough for our use case.

Using Spark we can run schema inference over the full data set.
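
To see why a small sample can miss fields, consider the following sketch. It is only an illustration with made-up data, not how Spark or BigQuery actually infer schemas: the "schema" here is just the set of keys seen, and `SchemaSampling` and `inferredKeys` are hypothetical names.

```scala
object SchemaSampling {
  // Collect the set of keys seen in the given rows (a stand-in for an inferred schema).
  def inferredKeys(rows: Seq[Map[String, String]]): Set[String] =
    rows.flatMap(_.keys).toSet

  // Made-up data: 1000 rows, where a rare key only appears in every 250th row.
  val rows: Seq[Map[String, String]] = (1 to 1000).map { i =>
    val common = Map("type" -> "workorder")
    if (i % 250 == 0) common + ("rare_key" -> s"value-$i") else common
  }

  // Keys found when looking only at the first 100 rows.
  val fromSample: Set[String] = inferredKeys(rows.take(100))
  // Keys found when looking at every row.
  val fromAll: Set[String] = inferredKeys(rows)
}
```

Here `fromSample` contains only `type`, while `fromAll` also contains `rare_key`. Rows carrying `rare_key` would not fit a schema built from the sample alone.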

In [11]:
%scala
val jsonDf = spark.read.json(
  sourceDf
    .where(col("metadata").isNotNull)
    .withColumn("metadata", bigQueryCompatibleKeysUDF(col("metadata")))
    .select(to_json(col("metadata"))).as[String]
)

## Convert the inferred schema to a BigQuery table schema

In order to use the schema with BigQuery we'll need to convert it to a `BigQueryTableSchema`.
This is mostly a simple matter of mapping [Spark SQL's data types](https://spark.apache.org/docs/latest/sql-reference.html) to [BigQuery's SQL data types](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types).
Arrays are converted to BigQuery fields with the field mode set to `REPEATED`.

The only difficulty is the `metadata` field, where we use the inferred schema from the previous step to create a BigQuery `RECORD` field.
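
The mapping rules can be sketched on a simplified model before looking at the real code. The types below (`SimpleType`, `BqField`, and so on) are hypothetical stand-ins invented for this example, not Spark or connector classes. The sketch only shows how arrays become `REPEATED` fields and structs become `RECORD` fields.

```scala
object TypeMappingSketch {
  // Hypothetical, simplified stand-ins for Spark SQL data types.
  sealed trait SimpleType
  case object SString extends SimpleType
  case object SLong extends SimpleType
  case object SDouble extends SimpleType
  case object SBoolean extends SimpleType
  final case class SArray(element: SimpleType) extends SimpleType
  final case class SStruct(fields: List[(String, SimpleType)]) extends SimpleType

  // Hypothetical, simplified stand-in for a BigQuery field schema.
  final case class BqField(
    name: String,
    bqType: String,
    mode: String = "NULLABLE",
    fields: List[BqField] = Nil)

  def toBqField(name: String, t: SimpleType): BqField = t match {
    case SString  => BqField(name, "STRING")
    case SLong    => BqField(name, "INT64")
    case SDouble  => BqField(name, "FLOAT64")
    case SBoolean => BqField(name, "BOOL")
    // An array keeps its element's type (and nested fields) but is marked REPEATED.
    case SArray(element) => toBqField(name, element).copy(mode = "REPEATED")
    // A struct becomes a RECORD whose fields are mapped recursively.
    case SStruct(fs) =>
      BqField(name, "RECORD", fields = fs.map { case (n, ft) => toBqField(n, ft) })
  }
}
```

For example, an array of longs named `assetIds` maps to a field of type `INT64` with mode `REPEATED`, and a struct maps to a `RECORD` with one nested field per struct member.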

In [13]:
%scala
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types._
import collection.JavaConverters._

def sparkTypeToBigQueryType(c: StructField): BigQueryTableFieldSchema = {
  val fieldSchema = new BigQueryTableFieldSchema()
  fieldSchema.setName(c.name)
  val bqType = c.dataType match {
    case BinaryType => fieldSchema.setType("BYTES")
    case BooleanType => fieldSchema.setType("BOOL")
    //case CalendarIntervalType
    //case DateType => "DATE"
    case DoubleType => fieldSchema.setType("FLOAT64")
    case FloatType => fieldSchema.setType("FLOAT64")
    case IntegerType => fieldSchema.setType("INT64")
    case LongType => fieldSchema.setType("INT64")
    //case NullType => "STRING"
    case ShortType => fieldSchema.setType("INT64")
    case StringType => fieldSchema.setType("STRING")
    //case TimestampType
    case ArrayType(t, _) =>
      val elementType = sparkTypeToBigQueryType(c.copy(dataType = t))
      fieldSchema.setType(elementType.getType)
      if (elementType.getType() == "RECORD" && elementType.getFields() != null) {
        fieldSchema.setFields(elementType.getFields())
      }
      fieldSchema.setMode("REPEATED")
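    case StructType(fields) =>
      fieldSchema.setType("RECORD")
      fieldSchema.setFields(fields.map(sparkTypeToBigQueryType).toList.asJava)
    // Fail with a clear message instead of a MatchError for types not mapped above.
    case other =>
      throw new IllegalArgumentException(s"Unsupported Spark type for field ${c.name}: $other")
  }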
  fieldSchema
}

val bigQuerySchema = new BigQueryTableSchema()
bigQuerySchema.setFields(sourceDf.schema.map(sf =>
  if (sf.name == "metadata") {
    val fieldSchema = new BigQueryTableFieldSchema()
    fieldSchema.setName(sf.name)
    fieldSchema.setType("RECORD")
    val fields = jsonDf.schema.map(sf1 => {
      sparkTypeToBigQueryType(sf1)
    })
    fieldSchema.setFields(fields.toList.asJava)
    fieldSchema
  } else {
    sparkTypeToBigQueryType(sf)
  }
).toList.asJava)

## Configure the BigQuery output

The output uses the newline-delimited JSON format, which is not the most efficient format but is very well supported.
We also use the BigQuery schema we created in the previous step.
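
For readers unfamiliar with the format, the sketch below shows what newline-delimited JSON looks like: one JSON object per line, with no enclosing array and no commas between records. The connector produces these files itself, so this code is not needed for the export. `NdjsonSketch` and its functions are hypothetical, and the string escaping is deliberately minimal (quotes, backslashes, and newlines only).

```scala
object NdjsonSketch {
  // Minimal JSON string escaping for this illustration only.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\""

  // Render one flat record of string fields as a single-line JSON object.
  def toJsonLine(record: Seq[(String, String)]): String =
    record.map { case (k, v) => s"${quote(k)}:${quote(v)}" }.mkString("{", ",", "}")

  // Join the records with newlines: one JSON object per line.
  def toNdjson(records: Seq[Seq[(String, String)]]): String =
    records.map(toJsonLine).mkString("\n")
}
```

Calling `NdjsonSketch.toNdjson` with two made-up records, `Seq("type" -> "workorder")` and `Seq("type" -> "alarm")`, produces two lines: `{"type":"workorder"}` followed by `{"type":"alarm"}`.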

In [15]:
%scala
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

BigQueryOutputConfiguration.configure(
    conf,
    outputTableId,
    bigQuerySchema,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

## Write to BigQuery

There is one final complication before our data will be accepted by BigQuery. After renaming the keys to lowercase we can end up with multiple keys of the same name. This is technically valid JSON, but BigQuery does not allow it.

We can get around this by writing the row as JSON using the [Jackson ObjectMapper](https://www.baeldung.com/jackson-object-mapper-tutorial), which removes duplicate keys,
and then reading it using the [GSON JsonParser](http://tutorials.jenkov.com/java-json/gson-jsonparser.html) to get a GSON JsonObject, which is the format required by the BigQuery Hadoop connector.

**Note that this means that when several keys end up with the same name, only one of their values is kept, and which one is effectively arbitrary**, which can be an issue for certain use cases.
An alternative would be to use a more sophisticated way of doing the renaming.
Please let us know if you have such a use case, or if you implement such an alternative.
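
One possible shape for such an alternative is sketched below: instead of letting colliding keys overwrite each other, it appends a numeric suffix (`_2`, `_3`, ...) to later keys. This is only a sketch and is not used in this notebook. `DedupKeys` and `renameWithoutCollisions` are hypothetical names, and the sketch does not re-check the 128-character limit after adding a suffix.

```scala
object DedupKeys {
  // Hypothetical helper: applies `rename` to every key and appends _2, _3, ...
  // to keys that would otherwise collide after renaming.
  def renameWithoutCollisions(
      m: Map[String, String],
      rename: String => String): Map[String, String] = {
    // Sort by original key so that suffixes are assigned deterministically.
    m.toSeq.sortBy(_._1).foldLeft(Map.empty[String, String]) { case (acc, (k, v)) =>
      val base = rename(k)
      // Try the plain name first, then base_2, base_3, ... until one is free.
      val candidates = Iterator.single(base) ++ Iterator.from(2).map(i => s"${base}_$i")
      val unique = candidates.find(c => !acc.contains(c)).get
      acc + (unique -> v)
    }
  }
}
```

With the made-up input `Map("ABC" -> "1", "abc" -> "2")` and `_.toLowerCase` as the renaming function, the result is `Map("abc" -> "1", "abc_2" -> "2")`, so both values are kept.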

Finally we use `saveAsNewAPIHadoopDataset` with the output configuration from the previous step to get the data into BigQuery.

In [17]:
%scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.core.`type`.TypeReference
import com.google.gson.JsonParser

import java.util.HashMap

sourceDf
  .withColumn("metadata", bigQueryCompatibleKeysUDF(col("metadata")))
  .select(to_json(struct(sourceDf.schema.map(c => col(c.name)): _*)).alias("json")) // Convert the whole row to JSON.
  .rdd
  .mapPartitions { p =>
    val mapper = new ObjectMapper()
    // Use HashMap as the target type so it matches the declared type of `map` below.
    val typeRef = new TypeReference[HashMap[String, Object]](){}
    val parser = new JsonParser()
    p.map { v =>
      // Remove duplicate keys.
      val map: HashMap[String, Object] = mapper.readValue(v.getString(0), typeRef)
      val string = mapper.writeValueAsString(map)
      // Convert to a GSON JsonObject.
      // Use null as the Hadoop data set key, which is not used by the BigQuery connector.
      (null, parser.parse(string).getAsJsonObject())
    }
  }
  .saveAsNewAPIHadoopDataset(conf)

## Use BigQuery

Your data is now available for querying from BigQuery!

Here are some examples you can try using `bq` or the BigQuery UI:

- Assets (all assets of type "valve"):

  ```
  SELECT a.* FROM `your_bigquery_dataset.assets` a
  CROSS JOIN unnest(a.types) AS type
  WHERE type.name = "valve"
  ```

- Assets (descriptions of all assets related to, and including, "VRD - PH 1STSTGDISCHCLR GAS OUT EQ"):

  ```
  select id, description from `your_bigquery_dataset.assets`
  where exists (
    select * from unnest(path) as x inner join `your_bigquery_dataset.assets` a on x = a.id where a.description = 'VRD - PH 1STSTGDISCHCLR GAS OUT EQ'
  ) order by 1
  ```

- Events (number of events by type and subtype):

  ```
  SELECT count(*), type, subtype FROM `your_bigquery_dataset.events` GROUP BY type, subtype ORDER BY 1 DESC
  ```

- Timeseries metadata (number of time series by engineering unit):

  ```
  SELECT metadata.engunits, count(*) FROM `your_bigquery_dataset.timeseries` GROUP BY metadata.engunits ORDER BY 2 DESC
  ```