Skip to content

Commit

Permalink
Refactor from HighLevelGraph (HLG) to dd.from_map
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-rakowski committed Jun 25, 2024
1 parent f76e5ca commit e30edbd
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions dask_bigquery/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
from contextlib import contextmanager
from functools import partial

import dask.dataframe as dd
import gcsfs
import pandas as pd
import pyarrow
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from google.api_core import client_info as rest_client_info
from google.api_core import exceptions
from google.api_core.gapic_v1 import client_info as grpc_client_info
Expand Down Expand Up @@ -205,20 +203,7 @@ def make_create_read_session_request():
pyarrow.py_buffer(session.arrow_schema.serialized_schema)
)
meta = schema.empty_table().to_pandas(**arrow_options)

label = "read-gbq-"
output_name = label + tokenize(
project_id,
dataset_id,
table_id,
row_filter,
read_kwargs,
)

layer = DataFrameIOLayer(
output_name,
meta.columns,
[stream.name for stream in session.streams],
return dd.from_map(
partial(
bigquery_read,
make_create_read_session_request=make_create_read_session_request,
Expand All @@ -227,12 +212,37 @@ def make_create_read_session_request():
arrow_options=arrow_options,
credentials=credentials,
),
label=label,
[stream.name for stream in session.streams],
meta=meta,
)
divisions = tuple([None] * (len(session.streams) + 1))

graph = HighLevelGraph({output_name: layer}, {output_name: set()})
return new_dd_object(graph, output_name, meta, divisions)
# label = "read-gbq-"
# output_name = label + tokenize(
# project_id,
# dataset_id,
# table_id,
# row_filter,
# read_kwargs,
# )

# layer = DataFrameIOLayer(
# output_name,
# meta.columns,
# [stream.name for stream in session.streams],
# partial(
# bigquery_read,
# make_create_read_session_request=make_create_read_session_request,
# project_id=project_id,
# read_kwargs=read_kwargs,
# arrow_options=arrow_options,
# credentials=credentials,
# ),
# label=label,
# )
# divisions = tuple([None] * (len(session.streams) + 1))

# graph = HighLevelGraph({output_name: layer}, {output_name: set()})
# return new_dd_object(graph, output_name, meta, divisions)


def to_gbq(
Expand Down

0 comments on commit e30edbd

Please sign in to comment.