diff --git a/data/simple.csv b/data/simple.csv
new file mode 100644
index 0000000..01a0403
--- /dev/null
+++ b/data/simple.csv
@@ -0,0 +1,2 @@
+person_id,name
+1,ulysses
diff --git a/data/simple.json b/data/simple.json
new file mode 100644
index 0000000..7741818
--- /dev/null
+++ b/data/simple.json
@@ -0,0 +1,17 @@
+
+{
+    "primaryKey": "person_id",
+    "fields": [
+        {
+            "name": "person_id",
+            "type": "integer",
+            "constraints": {
+                "required": true
+            }
+        },
+        {
+            "name": "name",
+            "type": "string"
+        }
+    ]
+}
diff --git a/jsontableschema_sql/mappers.py b/jsontableschema_sql/mappers.py
index dd43f6c..7849962 100644
--- a/jsontableschema_sql/mappers.py
+++ b/jsontableschema_sql/mappers.py
@@ -6,7 +6,7 @@
 
 import six
 from sqlalchemy import (
-    Column, PrimaryKeyConstraint, ForeignKeyConstraint,
+    Column, PrimaryKeyConstraint, ForeignKeyConstraint, Index,
     Text, VARCHAR, Float, Integer, Boolean, Date, Time, DateTime)
 from sqlalchemy.dialects.postgresql import ARRAY, JSON, JSONB, UUID
@@ -27,13 +27,15 @@ def tablename_to_bucket(prefix, tablename):
     return None
 
 
-def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
+def descriptor_to_columns_and_constraints(prefix, bucket, descriptor, index_fields):
     """Convert descriptor to SQLAlchemy columns and constraints.
     """
 
     # Init
     columns = []
+    column_mapping = {}
     constraints = []
+    indexes = []
     tablename = bucket_to_tablename(prefix, bucket)
 
     # Mapping
@@ -61,6 +63,13 @@ def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
         nullable = not field.get('constraints', {}).get('required', False)
         column = Column(field['name'], column_type, nullable=nullable)
         columns.append(column)
+        column_mapping[field['name']] = column
+
+    # Indexes
+    for i, index_definition in enumerate(index_fields):
+        name = tablename + '_ix%03d' % i
+        index_columns = [column_mapping[field_name] for field_name in index_definition]
+        indexes.append(Index(name, *index_columns))
 
     # Primary key
     pk = descriptor.get('primaryKey', None)
@@ -87,7 +96,7 @@ def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
             constraint = ForeignKeyConstraint(fields, foreign_fields)
             constraints.append(constraint)
 
-    return (columns, constraints)
+    return (columns, constraints, indexes)
 
 
 def columns_and_constraints_to_descriptor(prefix, tablename, columns, constraints):
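
The mapper change above is the core of the feature: each entry in `index_fields` is a list of column names that becomes one `sqlalchemy.Index`, named `<tablename>_ix000`, `<tablename>_ix001`, and so on. A minimal sketch of the new return shape, not part of the patch; the `'test_'` prefix and `'names'` bucket are illustrative, and the descriptor mirrors data/simple.json above:

from jsontableschema_sql import mappers

descriptor = {
    'primaryKey': 'person_id',
    'fields': [
        {'name': 'person_id', 'type': 'integer',
         'constraints': {'required': True}},
        {'name': 'name', 'type': 'string'},
    ],
}

# One index definition: a single-column index on "name"
columns, constraints, indexes = mappers.descriptor_to_columns_and_constraints(
    'test_', 'names', descriptor, [['name']])

# Index names follow the tablename + '_ix%03d' pattern used by the mapper
assert [index.name for index in indexes] == ['test_names_ix000']
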
diff --git a/jsontableschema_sql/storage.py b/jsontableschema_sql/storage.py
index 18d9220..6df9fa0 100644
--- a/jsontableschema_sql/storage.py
+++ b/jsontableschema_sql/storage.py
@@ -23,24 +23,29 @@ class Storage(object):
         engine (object): SQLAlchemy engine
         dbschema (str): database schema name
         prefix (str): prefix for all buckets
-
+        reflect_only (callable): a boolean predicate to filter
+            the list of table names when reflecting
     """
 
     # Public
 
-    def __init__(self, engine, dbschema=None, prefix=''):
+    def __init__(self, engine, dbschema=None, prefix='', reflect_only=None):
 
         # Set attributes
         self.__connection = engine.connect()
         self.__dbschema = dbschema
         self.__prefix = prefix
         self.__descriptors = {}
+        if reflect_only is not None:
+            self.__only = reflect_only
+        else:
+            self.__only = lambda _: True
 
         # Create metadata
         self.__metadata = MetaData(
             bind=self.__connection,
-            schema=self.__dbschema,
-            reflect=True)
+            schema=self.__dbschema)
+        self.__reflect()
 
 
     def __repr__(self):
@@ -64,7 +69,24 @@ def buckets(self):
 
         return buckets
 
-    def create(self, bucket, descriptor, force=False):
+    def create(self, bucket, descriptor, force=False, indexes_fields=None):
+        """Create one or more buckets from the given descriptors.
+
+        Parameters
+        ----------
+        bucket: str/list
+            Bucket name or list of bucket names.
+        descriptor: dict/list
+            JSONTableSchema descriptor or list of descriptors.
+        indexes_fields: list
+            List of lists (or tuples) of field names to index,
+            or one such list per bucket when creating several.
+
+        Raises
+        ------
+        RuntimeError
+            If a bucket already exists.
+        """
 
         # Make lists
         buckets = bucket
@@ -73,6 +95,12 @@ def create(self, bucket, descriptor, force=False):
         descriptors = descriptor
         if isinstance(descriptor, dict):
             descriptors = [descriptor]
+        if indexes_fields is None or len(indexes_fields) == 0:
+            indexes_fields = [()] * len(descriptors)
+        elif not isinstance(indexes_fields[0][0], (list, tuple)):
+            indexes_fields = [indexes_fields]
+        assert len(indexes_fields) == len(descriptors)
+        assert len(buckets) == len(descriptors)
 
         # Check buckets for existence
         for bucket in reversed(self.buckets):
@@ -83,17 +111,17 @@
                 self.delete(bucket)
 
         # Define buckets
-        for bucket, descriptor in zip(buckets, descriptors):
+        for bucket, descriptor, index_fields in zip(buckets, descriptors, indexes_fields):
 
             # Add to schemas
             self.__descriptors[bucket] = descriptor
 
-            # Crate table
+            # Create table
             jsontableschema.validate(descriptor)
             tablename = mappers.bucket_to_tablename(self.__prefix, bucket)
-            columns, constraints = mappers.descriptor_to_columns_and_constraints(
-                self.__prefix, bucket, descriptor)
-            Table(tablename, self.__metadata, *(columns+constraints))
+            columns, constraints, indexes = mappers.descriptor_to_columns_and_constraints(
+                self.__prefix, bucket, descriptor, index_fields)
+            Table(tablename, self.__metadata, *(columns+constraints+indexes))
 
         # Create tables, update metadata
         self.__metadata.create_all()
@@ -128,7 +156,7 @@
         # Drop tables, update metadata
         self.__metadata.drop_all(tables=tables)
         self.__metadata.clear()
-        self.__metadata.reflect()
+        self.__reflect()
 
 
     def describe(self, bucket, descriptor=None):
@@ -208,3 +236,13 @@
             tablename = '.'.join(self.__dbschema, tablename)
 
         return self.__metadata.tables[tablename]
+
+    def __reflect(self):
+        def only(name, _):
+            ret = (
+                self.__only(name) and
+                mappers.tablename_to_bucket(self.__prefix, name) is not None
+            )
+            return ret
+
+        self.__metadata.reflect(only=only)
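
Taken together, the two new `Storage` parameters look like this in use. A hedged sketch, not part of the patch, assuming a `DATABASE_URL` environment variable as in the tests below; the `'demo_'` prefix and `'names'` bucket are illustrative:

import io
import json
import os

from sqlalchemy import create_engine
from jsontableschema_sql import Storage

engine = create_engine(os.environ['DATABASE_URL'])
descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))

# Create a bucket with a single-column index on person_id
storage = Storage(engine=engine, prefix='demo_')
storage.create('names', descriptor, indexes_fields=[['person_id']])

# A second Storage that reflects only tables whose name contains 'names';
# all other tables, even ones matching the prefix, stay invisible to it
filtered = Storage(engine=engine, prefix='demo_',
                   reflect_only=lambda name: 'names' in name)
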
diff --git a/tests/test_mappers.py b/tests/test_mappers.py
index b1f2487..3cf91fe 100644
--- a/tests/test_mappers.py
+++ b/tests/test_mappers.py
@@ -17,7 +17,7 @@ def test_bucket_to_tablename():
 
 def test_tablename_to_bucket():
     assert mappers.tablename_to_bucket('prefix_', 'prefix_bucket') == 'bucket'
-    assert mappers.tablename_to_bucket('prefix_', 'xxxxxx_bucket') == None
+    assert mappers.tablename_to_bucket('prefix_', 'xxxxxx_bucket') is None
 
 
 def test_descriptor_to_columns_and_constraints_not_supported_type():
@@ -26,7 +26,7 @@ def test_descriptor_to_columns_and_constraints_not_supported_type():
     }
     with pytest.raises(TypeError):
         mappers.descriptor_to_columns_and_constraints(
-            'prefix_', 'bucket', descriptor)
+            'prefix_', 'bucket', descriptor, [])
 
 
 def test_columns_and_constraints_to_descriptor_not_supported_type():
diff --git a/tests/test_storage.py b/tests/test_storage.py
index aacf2fc..7f1281c 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -9,7 +9,6 @@
 import json
 import pytest
 from copy import deepcopy
-from decimal import Decimal
 from tabulator import Stream
 from jsontableschema import Schema
 from sqlalchemy import create_engine
@@ -38,8 +37,9 @@ def test_storage():
 
     # Create buckets
     storage.create(
-        ['articles', 'comments'],
-        [articles_descriptor, comments_descriptor])
+        ['articles', 'comments'],
+        [articles_descriptor, comments_descriptor],
+        indexes_fields=[[['rating'], ['name'], ['created_datetime']], []])
 
     # Recreate bucket
     storage.create('comments', comments_descriptor, force=True)
@@ -73,10 +73,42 @@ def test_storage():
     with pytest.raises(RuntimeError):
         storage.delete('non_existent')
+
     # Delete buckets
     storage.delete()
 
 
+def test_only_parameter():
+    # Check the reflect_only parameter
+
+    # Get resources
+    simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))
+
+    # Engine
+    engine = create_engine(os.environ['DATABASE_URL'], echo=True)
+
+    # Storage
+    storage = Storage(engine=engine, prefix='test_only_')
+
+    # Delete buckets
+    storage.delete()
+
+    # Create buckets
+    storage.create(
+        'names',
+        simple_descriptor,
+        indexes_fields=[['person_id']])
+
+    # Reflect again, hiding every table whose name contains 'name'
+    def only(table):
+        return 'name' not in table
+    engine = create_engine(os.environ['DATABASE_URL'], echo=True)
+    storage = Storage(engine=engine, prefix='test_only_', reflect_only=only)
+
+    # The bucket exists but is hidden from this storage, so delete() raises
+    with pytest.raises(RuntimeError):
+        storage.delete('names')
+
+
 def test_storage_bigdata():
 
     # Generate schema/data
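
To confirm the index DDL actually reached the database, SQLAlchemy's inspector can list it. A sketch, not part of the patch, assuming the `names` bucket created by `test_only_parameter` above (table `test_only_names`, index `test_only_names_ix000`):

import os

from sqlalchemy import create_engine, inspect

engine = create_engine(os.environ['DATABASE_URL'])
inspector = inspect(engine)

# Expect one entry along the lines of:
# {'name': 'test_only_names_ix000', 'column_names': ['person_id'], ...}
for index in inspector.get_indexes('test_only_names'):
    print(index['name'], index['column_names'])
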