
to_bigquery ideas - no intermediate storage #3

Open
ncclementi opened this issue Aug 9, 2021 · 13 comments
Comments

@ncclementi
Contributor

Currently, the to_bigquery presented in the gist uses temporary storage. I think this is not ideal, since the user would have to create that storage before they can write anything.

I was wondering if it would be possible to take a similar approach to what was done for dask-mongo, where write_gbq would use to_gbq() (from pandas-gbq) on the pandas DataFrame that comes from each partition. The partition handling would look something like:

def to_gbq(ddf, some_args):

    partitions = [
        write_gbq(partition, connection_args)
        for partition in ddf.to_delayed()
    ]

    dask.compute(partitions)

and write_gbq would have something of the form:

@delayed
def write_gbq(df, some_args):
    pandas_gbq.to_gbq(df, some_args)
@ncclementi ncclementi changed the title to_bigquery ideas - no intermetdiate storage to_bigquery ideas - no intermediate storage Aug 10, 2021
@ncclementi
Contributor Author

This is what I have so far. The code below works if the table already exists; however, if I try with a non-existing table it is only able to write one partition. I'm using the keyword if_exists='append'.

Exception: GenericGBQException('Reason: 409 POST https://bigquery.googleapis.com/bigquery/v2/projects/dask-bigquery/datasets/test_dataset/tables?prettyPrint=false: Already Exists: Table dask-bigquery:test_dataset.test_dask_write')

I think the reason this happens is that initially all the partitions try to create the table, and once one of them succeeds, all the others error out. For the case where the table doesn't exist and a user would like to use if_exists='append', I'm trying to see if we can create an empty table beforehand and then write to it.

from dask import delayed
import pandas as pd
import pandas_gbq
import dask.dataframe as dd
import dask 
from distributed import get_client
from typing import Dict

@delayed
def write_gbq(
    df: pd.DataFrame,
    destination_table,
    project_id,
    table_cond,
):

    pandas_gbq.to_gbq(
        df,
        destination_table=destination_table,
        project_id=project_id,
        if_exists=table_cond,
    )


def to_gbq(
    ddf: dd.DataFrame,
    *,
    destination_table: str,
    project_id: str,
    table_cond: str,
    compute_options: Dict = None,
):

    if compute_options is None:
        compute_options = {}

    partitions = [
        write_gbq(partition, destination_table, project_id, table_cond)
        for partition in ddf.to_delayed()
    ]

    try:
        client = get_client()
    except ValueError:
        # Using single-machine scheduler
        dask.compute(partitions, **compute_options)
    else:
        return client.compute(partitions, **compute_options)

Any ideas are welcome... I will put this into a separate branch soon; I'm still trying to figure out the best approach since the reading part is not ready/merged yet.

cc: @bnaul, @jrbourbeau

@ncclementi
Contributor Author

It seems we will have to provide some sort of schema to be able to create an empty table, or somehow write one row to create the table and then append the rest of the partitions

https://stackoverflow.com/questions/59179196/how-to-create-an-empty-table-with-specific-schema-in-gbq
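
For reference, a minimal sketch of pre-creating an empty table with an explicit schema via the google-cloud-bigquery client could look like the following (the project/dataset/table names and the schema are made-up placeholders, not from this issue):

from google.cloud import bigquery

# Sketch only: pre-create an empty table with an explicit schema so that every
# partition write can then use if_exists="append".
client = bigquery.Client()
schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("value", "FLOAT"),
]
table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)
client.create_table(table, exists_ok=True)  # no-op if the table already exists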

@bnaul
Contributor

bnaul commented Aug 11, 2021

We went down this road (with bigquery.Client.load_table_from_dataframe instead of pandas_gbq but same thing basically) but abandoned it for a couple of reasons in favor of a Parquet intermediary:

  1. you will quickly run into BQ rate limits with more than a couple of concurrent writes; GCS has basically no such limits.
  2. tasks that retry will duplicate data, whereas GCS writes are idempotent
  3. I think you can get around both of these with the Streaming API instead, but then you have to wait an indeterminate amount of time for all the data to actually appear in the table, which won’t work for many (most?) use cases

@ncclementi
Contributor Author

Thanks for your response @bnaul. Then we might want to hold off on implementing the writing, since relying on an intermediate storage step might not be ideal.

@ncclementi
Contributor Author

It seems we will have to provide some sort of schema to be able to create an empty table, or somehow write one row to create the table and then append the rest of the partitions

https://stackoverflow.com/questions/59179196/how-to-create-an-empty-table-with-specific-schema-in-gbq

I've been trying to implement this by creating an empty table with the proper schema inferred from the dask dataframe, but no luck: the table gets created but with no schema at all. I opened an issue describing the problem on the pandas-gbq end; leaving the link here for documentation purposes:

googleapis/python-bigquery-pandas#376

@bnaul
Contributor

bnaul commented Oct 13, 2021

@ncclementi @jrbourbeau there's a new(ish?) "Storage Write API" that's the analog of what we're currently using for reads: https://cloud.google.com/bigquery/docs/write-api#advantages. This bit in particular seems to address my comment above:

Exactly-once delivery semantics. The Storage Write API supports exactly-once semantics through the use of stream offsets. Unlike the insertAll method, the Storage Write API never writes two messages that have the same offset within a stream, if the client provides stream offsets when appending records.

I don't see anything about dataframe or pyarrow support though, only gRPC... Maybe @tswast could clarify whether there's anything in the works upstream that might facilitate using this API here?

@tswast

tswast commented Oct 13, 2021

The API semantics are a great fit, and I do eventually want to build a pandas DataFrame -> BQ Storage Write API connector.

Unfortunately, that's a tough task, as the only supported data format in the backend is Protocol Buffers v2 wire format (proto2). Converting from DataFrame -> proto2 is going to take some work, especially if we want to do it efficiently.

@mrocklin
Member

@tswast or @bnaul any update on this? Has GBQ grown anything like an Arrow bulk insert adapter?

@mrocklin
Member

If there isn't a nice way to push data in directly I'd be open to using an ephemeral Parquet dataset.

def to_gbq(df, table, ...):
    temp_filename = "gs://some-temporary-storage"
    try:
        df.to_parquet(temp_filename)
        bigquery.Client().load_table_from_uri(temp_filename + "/*", table)
    finally:
        gcsfs.GCSFileSystem().rm(temp_filename, recursive=True)

This is error prone though, and potentially in an expensive way. We'd maybe want that function to include a runtime warning?

Or maybe this is a bad idea generally, and we should resolve it with best practices documentation instead.
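
If a warning were added, it could be as small as the sketch below (the message text is illustrative, not something from this thread):

import warnings

# Illustrative only: alert users that data is staged in a temporary GCS
# location and that failed cleanup may leave billable objects behind.
warnings.warn(
    "to_gbq stages data in a temporary GCS location; if cleanup fails, "
    "orphaned objects may incur storage costs until deleted.",
    RuntimeWarning,
)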

@tswast

tswast commented Apr 12, 2023

@tswast or @bnaul any update on this? Has GBQ grown anything like an Arrow bulk insert adapter?

Unfortunately not. I found a public issue, but the corresponding internal issue is still in the backlog: https://issuetracker.google.com/249245481 (Arrow format support for BigQuery Storage Write API).

Ephemeral Parquet seems to be the way some of my colleagues over on the AI side of GCP are leaning too. You might be able to use some object lifecycle management to automatically delete temp files in case the program ends before cleanup can run: https://cloud.google.com/storage/docs/lifecycle-configurations#deletion-example
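
For anyone who wants to try that, a rough sketch of adding such a deletion rule with the google-cloud-storage client might look like this (the bucket name is a placeholder):

from google.cloud import storage

# Sketch only: delete any object older than one day, so orphaned intermediate
# Parquet files are removed even if the program dies before its own cleanup runs.
client = storage.Client()
bucket = client.get_bucket("my-dask-bigquery-tmp")
bucket.add_lifecycle_delete_rule(age=1)
bucket.patch()  # persist the updated lifecycle rules on the bucket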

@mrocklin
Member

You might be able to use some object lifecycle management to automatically delete temp files in case the program ends before cleanup can run.

Ah, that does sound like a more robust approach. Thanks for the suggestion.

@bnaul
Contributor

bnaul commented Apr 12, 2023

Yep, that's exactly what we do: we have a temporary bucket with a 24-hour lifespan, and we run this function for the upload (there might be some internal helpers in here, and it probably isn't 100% runnable, but you get the gist):

# Note: `bigquery_client` and `get_temporary_table` below are internal helpers
# that are not shown in this snippet.
import logging
import os
import time
from typing import List

import dask.dataframe as dd
import pyarrow
from dask.base import tokenize
from google.api_core.exceptions import ClientError
from google.cloud import bigquery


def dask_df_to_gbq(
    ddf: dd.DataFrame,
    project_id: str = None,
    dataset_id: str = None,
    table_id: str = None,
    bq_schema: List[bigquery.schema.SchemaField] = None,
    pa_schema: pyarrow.Schema = None,
    partition_by: str = None,
    cluster_by: List[str] = None,
    clear_existing: bool = True,
    retries: int = None,
    write_index: bool = False,
    row_group_size: int = None,
    write_disposition: str = "WRITE_EMPTY",
    job_id: str = None,
    block: bool = True,
):
    """Upload dask dataframe to BigQuery using GCS intermediary.

    Args:
        ddf: dask dataframe to upload
        project_id: BigQuery project
        dataset_id: BigQuery dataset within project
        table_id: BigQuery table within dataset
        pa_schema: parquet schema
        partition_by: (date or timestamp) field to partition by
        cluster_by: field to cluster by
        clear_existing: whether to delete the existing table
        retries: number of retries for dask computation
        write_index: whether to write index in parquet
        row_group_size: the number of rows within a group when saving the parquet file
    """

    dask_tmp_pattern = "gs://model_bigquery_tmp/dask_dataframe_tmp/{token}/{timestamp}/*.parquet"
    dask_tmp_path = dask_tmp_pattern.format(token=tokenize(ddf), timestamp=int(1e6 * time.time()))
    logging.info(f"Writing dask dataframe to {dask_tmp_path} ...")
    ddf.to_parquet(
        path=os.path.dirname(dask_tmp_path),
        engine="pyarrow",
        write_index=write_index,
        write_metadata_file=False,
        schema=pa_schema,
        row_group_size=row_group_size,
    )

    with bigquery_client(project_id) as bq_client:
        if table_id:
            if not dataset_id:
                raise ValueError("Cannot pass table_id without dataset_id")
            dataset_ref = bq_client.create_dataset(dataset_id, exists_ok=True)
            table_ref = dataset_ref.table(table_id)
            if clear_existing:
                bq_client.delete_table(table_ref, not_found_ok=True)
        else:
            table_ref = get_temporary_table(bq_client)
            logging.info("Loading to temporary table %s", table_ref.table_id)
        logging.info(
            "Loading %s to %s.%s.%s ...",
            dask_tmp_path,
            table_ref.project,
            table_ref.dataset_id,
            table_ref.table_id,
        )

        # Load the Parquet files written above directly into the destination table.
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            write_disposition=write_disposition,
        )
        if bq_schema:
            job_config.schema = bq_schema
        if partition_by:
            job_config.time_partitioning = bigquery.TimePartitioning(field=partition_by)
        if cluster_by:
            job_config.clustering_fields = cluster_by
        load_job = bq_client.load_table_from_uri(
            dask_tmp_path,
            table_ref,
            job_config=job_config,
            job_id=job_id,
        )

        if block:
            try:
                load_job.result()  # Waits for table load to complete.
            except ClientError:
                logging.error(f"Load job failed with the following errors: {load_job.errors}")
                raise
            logging.info(
                "Done importing %d rows from %s to %s.%s.%s",
                load_job.output_rows,
                dask_tmp_path,
                table_ref.project,
                table_ref.dataset_id,
                table_ref.table_id,
            )

    return load_job
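
For orientation, a call to the helper above might look roughly like this (the project, dataset, and table names are placeholders, not from the thread):

# Hypothetical usage; assumes GCP credentials are already configured.
load_job = dask_df_to_gbq(
    ddf,
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    write_disposition="WRITE_TRUNCATE",
)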

@dchudz

dchudz commented Apr 12, 2023

Brett's function looks much better, but in case it helps anyone (e.g. anyone wanting to walk through things in IPython like I just was), I was messing around and got this example working:

import coiled
c = coiled.Cluster(n_workers=4)
import pandas as pd
df = pd.DataFrame({"aaa": ["a" + str(i) for i in range(100)], "bbb": ["b" + str(i) for i in range(100)]})
client = c.get_client()
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=8)

creds_info = ... # redacted! This is a dict that looks like GCP creds JSON

parquet_location = "gs://david-demo-1-bucket/ab12345.parquet"
ddf.to_parquet(parquet_location, storage_options={"token": creds_info})

from google.cloud import bigquery
from google.oauth2.service_account import Credentials
creds = Credentials.from_service_account_info(info=creds_info)
bq_client = bigquery.Client(credentials=creds)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    autodetect=True,
)
uri = "gs://david-demo-1-bucket/ab12345.parquet/*"
load_job = bq_client.load_table_from_uri(
    parquet_location + "/*", "test_dataset.ab123456", job_config=job_config
)
load_job.result()
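
One thing this snippet does not do is clean up the intermediate Parquet files afterwards; a rough sketch of that with gcsfs (assuming the same credentials) might be:

import gcsfs

# Optional cleanup once the load job has finished; assumes gcsfs is installed
# and the same service-account credentials apply.
fs = gcsfs.GCSFileSystem(token=creds_info)
fs.rm(parquet_location, recursive=True)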
