Merged
15 changes: 9 additions & 6 deletions .gitignore
@@ -1,11 +1,14 @@
.DS_Store
*.egg
/.env/
/.vscode/
*.pyc
.cache/
*.iml
/pybigquery.egg-info
/dist
/build
/dist
/pybigquery.egg-info
.idea/
.cache/
.pytest_cache/
env
venv/
*.egg
*.iml
*.pyc
1 change: 1 addition & 0 deletions dev_requirements.txt
@@ -3,4 +3,5 @@ google-cloud-bigquery>=1.6.0
future==0.16.0

pytest==3.2.2
pytest-flake8==1.0.6
pytz==2017.2
33 changes: 21 additions & 12 deletions pybigquery/parse_url.py
@@ -1,12 +1,13 @@
import re
GROUP_DELIMITER = re.compile(r'\s*\,\s*')
KEY_VALUE_DELIMITER = re.compile(r'\s*\:\s*')

from google.cloud.bigquery import QueryJobConfig
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition, QueryPriority, SchemaUpdateOption

from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
from google.cloud.bigquery.dataset import DatasetReference

GROUP_DELIMITER = re.compile(r'\s*\,\s*')
KEY_VALUE_DELIMITER = re.compile(r'\s*\:\s*')


def parse_boolean(bool_string):
bool_string = bool_string.lower()
@@ -17,17 +18,22 @@ def parse_boolean(bool_string):
else:
raise ValueError()

def parse_url(url):

def parse_url(url): # noqa: C901
query = url.query

# use_legacy_sql (legacy)
if 'use_legacy_sql' in query: raise ValueError("legacy sql is not supported by this dialect")
if 'use_legacy_sql' in query:
raise ValueError("legacy sql is not supported by this dialect")
# allow_large_results (legacy)
if 'allow_large_results' in query: raise ValueError("allow_large_results is only allowed for legacy sql, which is not supported by this dialect")
if 'allow_large_results' in query:
raise ValueError("allow_large_results is only allowed for legacy sql, which is not supported by this dialect")
# flatten_results (legacy)
if 'flatten_results' in query: raise ValueError("flatten_results is only allowed for legacy sql, which is not supported by this dialect")
if 'flatten_results' in query:
raise ValueError("flatten_results is only allowed for legacy sql, which is not supported by this dialect")
# maximum_billing_tier (deprecated)
if 'maximum_billing_tier' in query: raise ValueError("maximum_billing_tier is a deprecated argument")
if 'maximum_billing_tier' in query:
raise ValueError("maximum_billing_tier is a deprecated argument")

project_id = url.host
location = None
@@ -77,7 +83,8 @@ def parse_url(url):

# default_dataset
if 'default_dataset' in query or 'dataset_id' in query or 'project_id' in query:
raise ValueError("don't pass default_dataset, dataset_id, project_id in url query, instead use the url host and database")
raise ValueError(
"don't pass default_dataset, dataset_id, project_id in url query, instead use the url host and database")

# destination
if 'destination' in query:
@@ -88,13 +95,15 @@
try:
dest_project, dest_dataset, dest_table = query['destination'].split('.')
except ValueError:
raise ValueError("url query destination parameter should be fully qualified with project, dataset, and table")
raise ValueError(
"url query destination parameter should be fully qualified with project, dataset, and table")

job_config.destination = TableReference(DatasetReference(dest_project, dest_dataset), dest_table)

# destination_encryption_configuration
if 'destination_encryption_configuration' in query:
job_config.destination_encryption_configuration = EncryptionConfiguration(query['destination_encryption_configuration'])
job_config.destination_encryption_configuration = EncryptionConfiguration(
query['destination_encryption_configuration'])

# dry_run
if 'dry_run' in query:
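For reference, a minimal standalone sketch of the destination handling reflowed above, using the same split-and-wrap pattern; the fully qualified name here is hypothetical:

from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import TableReference

# Hypothetical fully qualified destination, as it would arrive in the URL query.
destination = "some-project.some-dataset.some-table"
try:
    dest_project, dest_dataset, dest_table = destination.split('.')
except ValueError:
    raise ValueError(
        "url query destination parameter should be fully qualified with project, dataset, and table")

# Same construction as the diff: a DatasetReference wrapped in a TableReference.
table_ref = TableReference(DatasetReference(dest_project, dest_dataset), dest_table)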
20 changes: 15 additions & 5 deletions pybigquery/sqlalchemy_bigquery.py
@@ -7,9 +7,9 @@

from google import auth
from google.cloud import bigquery
from google.cloud.bigquery import dbapi, QueryJobConfig
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
from google.cloud.bigquery.table import TableReference
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from sqlalchemy.exc import NoSuchTableError
@@ -92,6 +92,7 @@ def format_label(self, label, name=None):
result = self.quote(name)
return result


_type_map = {
'STRING': types.String,
'BOOLEAN': types.Boolean,
@@ -310,7 +311,6 @@ def _add_default_dataset_to_job_config(job_config, project_id, dataset_id):

job_config.default_dataset = '{}.{}'.format(project_id, dataset_id)


def _create_client_from_credentials(self, credentials, default_query_job_config, project_id):
if project_id is None:
project_id = credentials.project_id
@@ -426,10 +426,20 @@ def _table_reference(self, provided_schema_name, provided_table_name,
raise ValueError("Did not understand schema: {}".format(provided_schema_name))
if (dataset_id_from_schema and dataset_id_from_table and
dataset_id_from_schema != dataset_id_from_table):
raise ValueError("dataset_id specified in schema and table_name disagree: got {} in schema, and {} in table_name".format(dataset_id_from_schema, dataset_id_from_table))
raise ValueError(
"dataset_id specified in schema and table_name disagree: "
"got {} in schema, and {} in table_name".format(
dataset_id_from_schema, dataset_id_from_table
)
)
if (project_id_from_schema and project_id_from_table and
project_id_from_schema != project_id_from_table):
raise ValueError("project_id specified in schema and table_name disagree: got {} in schema, and {} in table_name".format(project_id_from_schema, project_id_from_table))
raise ValueError(
"project_id specified in schema and table_name disagree: "
"got {} in schema, and {} in table_name".format(
project_id_from_schema, project_id_from_table
)
)
project_id = project_id_from_schema or project_id_from_table or client_project
dataset_id = dataset_id_from_schema or dataset_id_from_table or self.dataset_id

9 changes: 8 additions & 1 deletion setup.cfg
@@ -1,8 +1,15 @@
[egg_info]
tag_build =

[flake8]
application-import-names = pybigquery
import-order-style = pep8
inline-quotes = double
max-line-length = 120
max-complexity = 18

[tool:pytest]
addopts= --tb short --capture no
addopts= --tb short --capture no --color=yes --flake8
python_files=test/*test_*.py

[sqla_testing]
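The new [flake8] block configures the pytest-flake8 plugin that the --flake8 flag in addopts enables, so lint violations surface as test failures. A minimal sketch of driving the same run programmatically, assuming pytest and pytest-flake8 are installed:

import pytest

# Equivalent to running pytest with the addopts above; --flake8 adds a
# flake8 check for each collected Python file.
pytest.main(['--tb', 'short', '--flake8'])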
1 change: 1 addition & 0 deletions setup.py
@@ -7,6 +7,7 @@ def readme():
with io.open("README.rst", "r", encoding="utf8") as f:
return f.read()


setup(
name="pybigquery",
version='0.4.15',
2 changes: 1 addition & 1 deletion test/conftest.py
@@ -1,3 +1,3 @@
from sqlalchemy.dialects import registry

registry.register("bigquery", "pybigquery.sqlalchemy_bigquery", "BigQueryDialect")
registry.register("bigquery", "pybigquery.sqlalchemy_bigquery", "BigQueryDialect")
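Registering the dialect here lets the test suite use bigquery:// URLs without relying on the package's installed entry points; a minimal usage sketch with a hypothetical project name:

from sqlalchemy import create_engine

# After registration, create_engine can resolve the bigquery:// scheme.
engine = create_engine('bigquery://some-project')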
16 changes: 9 additions & 7 deletions test/test_parse_url.py
@@ -55,13 +55,15 @@ def test_basic(url_with_everything):
assert credentials_path == '/some/path/to.json'
assert isinstance(job_config, QueryJobConfig)


@pytest.mark.parametrize('param, value', [
('clustering_fields', ['a', 'b', 'c']),
('create_disposition', 'CREATE_IF_NEEDED'),
('destination', TableReference(DatasetReference('different-project', 'different-dataset'), 'table')),
('destination_encryption_configuration', lambda enc: enc.kms_key_name == EncryptionConfiguration('some-configuration').kms_key_name),
('destination_encryption_configuration',
lambda enc: enc.kms_key_name == EncryptionConfiguration('some-configuration').kms_key_name),
('dry_run', True),
('labels', { 'a': 'b', 'c': 'd' }),
('labels', {'a': 'b', 'c': 'd'}),
('maximum_bytes_billed', 1000),
('priority', 'INTERACTIVE'),
('schema_update_options', ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']),
@@ -77,11 +79,6 @@ def test_all_values(url_with_everything, param, value):
else:
assert config_value == value

# def test_malformed():
# location, dataset_id, arraysize, credentials_path, job_config = parse_url(make_url('bigquery:///?credentials_path=a'))

# print(credentials_path)
# assert False

@pytest.mark.parametrize("param, value", [
('arraysize', 'not-int'),
@@ -100,13 +97,15 @@ def test_bad_values(param, value):
with pytest.raises(ValueError):
parse_url(url)


def test_empty_url():
for value in parse_url(make_url('bigquery://')):
assert value is None

for value in parse_url(make_url('bigquery:///')):
assert value is None


def test_empty_with_non_config():
url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&credentials_path=/some/path/to.json'))
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
@@ -118,6 +117,7 @@ def test_empty_with_non_config():
assert credentials_path == '/some/path/to.json'
assert job_config is None


def test_only_dataset():
url = parse_url(make_url('bigquery:///some-dataset'))
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
@@ -131,6 +131,7 @@
# we can't actually test that the dataset is on the job_config,
# since we take care of that afterwards, when we have a client to fill in the project


@pytest.mark.parametrize('disallowed_arg', [
'use_legacy_sql',
'allow_large_results',
@@ -145,6 +146,7 @@ def test_disallowed(disallowed_arg):
with pytest.raises(ValueError):
parse_url(url)


@pytest.mark.parametrize('not_implemented_arg', [
'query_parameters',
'table_definitions',
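As the tests above show, parse_url returns a six-tuple of (project_id, location, dataset_id, arraysize, credentials_path, job_config); a minimal call sketch with hypothetical names:

from sqlalchemy.engine.url import make_url
from pybigquery.parse_url import parse_url

url = make_url('bigquery://some-project/some-dataset?location=some-location')
project_id, location, dataset_id, arraysize, credentials_path, job_config = parse_url(url)
assert project_id == 'some-project'
assert dataset_id == 'some-dataset'
assert location == 'some-location'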
24 changes: 15 additions & 9 deletions test/test_sqlalchemy_bigquery.py
@@ -158,6 +158,7 @@ def session_using_test_dataset(engine_using_test_dataset):
def inspector(engine):
return inspect(engine)


@pytest.fixture(scope='session')
def inspector_using_test_dataset(engine_using_test_dataset):
return inspect(engine_using_test_dataset)
@@ -197,8 +198,8 @@ def test_dry_run(engine, api_client):
sql = 'SELECT * FROM sample_one_row'
with pytest.raises(BadRequest) as excinfo:
api_client.dry_run_query(sql)

assert 'Table name "sample_one_row" missing dataset while no default dataset is set in the request.' in str(excinfo.value.message)
expected_message = 'Table name "sample_one_row" missing dataset while no default dataset is set in the request.'
assert expected_message in str(excinfo.value.message)


def test_engine_with_dataset(engine_using_test_dataset):
@@ -278,7 +279,7 @@ def test_reflect_select_shared_table(engine):

def test_reflect_table_does_not_exist(engine):
with pytest.raises(NoSuchTableError):
table = Table('test_pybigquery.table_does_not_exist', MetaData(bind=engine), autoload=True)
Table('test_pybigquery.table_does_not_exist', MetaData(bind=engine), autoload=True)

assert Table('test_pybigquery.table_does_not_exist', MetaData(bind=engine)).exists() is False

@@ -313,8 +314,8 @@ def test_nested_labels(engine, table):
col = table.c.integer
exprs = [
sqlalchemy.func.sum(
sqlalchemy.func.sum(col.label("inner")
).label("outer")).over(),
sqlalchemy.func.sum(col.label("inner")).label("outer")
).over(),
sqlalchemy.func.sum(
sqlalchemy.case([[
sqlalchemy.literal(True),
@@ -345,7 +346,10 @@ def test_session_query(session, table, session_using_test_dataset, table_using_t
table.c.string,
col_concat,
func.avg(table.c.integer),
func.sum(case([(table.c.boolean == True, 1)], else_=0))
Collaborator

FYI: I was getting a test failure for "is True". Changing to compare with equality and a sqlalchemy literal fixed the test case.

Contributor Author

Thanks a lot!

func.sum(case(
[(table.c.boolean == sqlalchemy.literal(True), 1)],
else_=0
))
)
.group_by(table.c.string, col_concat)
.having(func.avg(table.c.integer) > 10)
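A standalone sketch of the comparison fix discussed in the review thread above: flake8 rejects `== True` (E712), rewriting it as `is True` broke the generated SQL, and the merged form compares against an explicit sqlalchemy literal. The table definition here is a hypothetical stand-in for the reflected test table:

import sqlalchemy
from sqlalchemy import Boolean, Column, MetaData, Table, case, func

# Hypothetical stand-in for the reflected BigQuery table used in the test.
table = Table('sample', MetaData(), Column('boolean', Boolean))

# `table.c.boolean is True` is always False in Python (it never builds a SQL
# expression), and `== True` trips flake8's E712; comparing with an explicit
# literal keeps the SQL expression intact and the linter quiet.
expr = func.sum(case(
    [(table.c.boolean == sqlalchemy.literal(True), 1)],
    else_=0
))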
@@ -442,7 +446,7 @@ def test_dml(engine, session, table_dml):

def test_create_table(engine):
meta = MetaData()
table = Table(
Table(
'test_pybigquery.test_table_create', meta,
Column('integer_c', sqlalchemy.Integer, doc="column description"),
Column('float_c', sqlalchemy.Float),
@@ -472,6 +476,7 @@ class TableTest(Base):
Base.metadata.create_all(engine)
Base.metadata.drop_all(engine)


def test_schemas_names(inspector, inspector_using_test_dataset):
datasets = inspector.get_schema_names()
assert 'test_pybigquery' in datasets
@@ -507,10 +512,10 @@ def test_view_names(inspector, inspector_using_test_dataset):


def test_get_indexes(inspector, inspector_using_test_dataset):
for table in ['test_pybigquery.sample', 'test_pybigquery.sample_one_row']:
for _ in ['test_pybigquery.sample', 'test_pybigquery.sample_one_row']:
indexes = inspector.get_indexes('test_pybigquery.sample')
assert len(indexes) == 2
assert indexes[0] == {'name':'partition', 'column_names': ['timestamp'], 'unique': False}
assert indexes[0] == {'name': 'partition', 'column_names': ['timestamp'], 'unique': False}
assert indexes[1] == {'name': 'clustering', 'column_names': ['integer', 'string'], 'unique': False}


@@ -555,6 +560,7 @@ def test_table_reference(dialect, provided_schema_name,
assert ref.dataset_id == 'dataset'
assert ref.project == 'project'


@pytest.mark.parametrize('provided_schema_name,provided_table_name,client_project',
[
('project.dataset', 'other_dataset.table', 'project'),