Added index creation support (#35)
* Automatically add indexes to all primary keys

* Add interface for specifying which indexes to create

* Limit reflection to only our own tables
akariv authored and roll committed Dec 5, 2016
1 parent 82be737 commit 8c4434e
Showing 6 changed files with 117 additions and 19 deletions.
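Taken together, the changes surface as two new keyword arguments: reflect_only on Storage and indexes_fields on Storage.create. A minimal usage sketch based on the signatures in this diff; the in-memory SQLite URL and bucket names are illustrative, and it assumes the package exports Storage at the top level:

from sqlalchemy import create_engine
from jsontableschema_sql import Storage

engine = create_engine('sqlite://')  # illustrative in-memory database

# Only table names passing this predicate (plus the prefix check) are reflected
storage = Storage(engine=engine, prefix='test_',
                  reflect_only=lambda name: name.startswith('test_'))

descriptor = {
    'primaryKey': 'person_id',
    'fields': [
        {'name': 'person_id', 'type': 'integer',
         'constraints': {'required': True}},
        {'name': 'name', 'type': 'string'},
    ],
}

# Single bucket: indexes_fields is a list of field-name lists,
# producing one index per inner list
storage.create('names', descriptor, indexes_fields=[['person_id']])

# Multiple buckets take one such definition per bucket, e.g.
# storage.create(['a', 'b'], [desc_a, desc_b],
#                indexes_fields=[[['rating'], ['name']], []])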
2 changes: 2 additions & 0 deletions data/simple.csv
@@ -0,0 +1,2 @@
person_id,name
1,ulysses
17 changes: 17 additions & 0 deletions data/simple.json
@@ -0,0 +1,17 @@

{
  "primaryKey": "person_id",
  "fields": [
    {
      "name": "person_id",
      "type": "integer",
      "constraints": {
        "required": true
      }
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
15 changes: 12 additions & 3 deletions jsontableschema_sql/mappers.py
@@ -6,7 +6,7 @@

import six
from sqlalchemy import (
-    Column, PrimaryKeyConstraint, ForeignKeyConstraint,
+    Column, PrimaryKeyConstraint, ForeignKeyConstraint, Index,
    Text, VARCHAR, Float, Integer, Boolean, Date, Time, DateTime)
from sqlalchemy.dialects.postgresql import ARRAY, JSON, JSONB, UUID

@@ -27,13 +27,15 @@ def tablename_to_bucket(prefix, tablename):
    return None


-def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
+def descriptor_to_columns_and_constraints(prefix, bucket, descriptor, index_fields):
    """Convert descriptor to SQLAlchemy columns and constraints.
    """

    # Init
    columns = []
+    column_mapping = {}
    constraints = []
+    indexes = []
    tablename = bucket_to_tablename(prefix, bucket)

    # Mapping
@@ -61,6 +63,13 @@ def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
        nullable = not field.get('constraints', {}).get('required', False)
        column = Column(field['name'], column_type, nullable=nullable)
        columns.append(column)
+        column_mapping[field['name']] = column
+
+    # Indexes
+    for i, index_definition in enumerate(index_fields):
+        name = tablename + '_ix%03d' % i
+        index_columns = [column_mapping[field_name] for field_name in index_definition]
+        indexes.append(Index(name, *index_columns))

    # Primary key
    pk = descriptor.get('primaryKey', None)
@@ -87,7 +96,7 @@ def descriptor_to_columns_and_constraints(prefix, bucket, descriptor):
        constraint = ForeignKeyConstraint(fields, foreign_fields)
        constraints.append(constraint)

-    return (columns, constraints)
+    return (columns, constraints, indexes)


def columns_and_constraints_to_descriptor(prefix, tablename, columns, constraints):
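For readers less familiar with SQLAlchemy, here is a standalone sketch of the pattern the mapper now follows: each entry of index_fields is a sequence of field names, resolved through column_mapping and turned into a named Index, which Table accepts alongside columns and constraints. Table and column names are illustrative:

from sqlalchemy import Column, Index, Integer, MetaData, Table, Text

metadata = MetaData()
columns = [Column('person_id', Integer, nullable=False),
           Column('name', Text)]
column_mapping = {column.name: column for column in columns}

# One Index per field-name sequence, named '<tablename>_ix%03d' by position
index_fields = [('name',), ('person_id', 'name')]
indexes = [
    Index('test_names_ix%03d' % i, *[column_mapping[f] for f in fields])
    for i, fields in enumerate(index_fields)
]

# Index objects are schema items, so they can be passed to Table directly
table = Table('test_names', metadata, *(columns + indexes))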
60 changes: 49 additions & 11 deletions jsontableschema_sql/storage.py
@@ -23,24 +23,29 @@ class Storage(object):
        engine (object): SQLAlchemy engine
        dbschema (str): database schema name
        prefix (str): prefix for all buckets
+        reflect_only (callable): a boolean predicate to filter
+            the list of table names when reflecting
    """

    # Public

-    def __init__(self, engine, dbschema=None, prefix=''):
+    def __init__(self, engine, dbschema=None, prefix='', reflect_only=None):

        # Set attributes
        self.__connection = engine.connect()
        self.__dbschema = dbschema
        self.__prefix = prefix
        self.__descriptors = {}
+        if reflect_only is not None:
+            self.__only = reflect_only
+        else:
+            self.__only = lambda _: True

        # Create metadata
        self.__metadata = MetaData(
            bind=self.__connection,
-            schema=self.__dbschema,
-            reflect=True)
+            schema=self.__dbschema)
+        self.__reflect()

    def __repr__(self):
@@ -64,7 +69,24 @@ def buckets(self):

        return buckets

-    def create(self, bucket, descriptor, force=False):
+    def create(self, bucket, descriptor, force=False, indexes_fields=None):
+        """Create bucket(s) from descriptor(s).
+
+        Parameters
+        ----------
+        bucket: str/list
+            Bucket name or list of bucket names.
+        descriptor: dict/list
+            JSONTableSchema descriptor or list of descriptors.
+        indexes_fields: list
+            List of lists of field names to index (one index per
+            inner list), or a list of such definitions, one per bucket.
+
+        Raises
+        ------
+        RuntimeError
+            If a bucket already exists and ``force`` is not set.
+        """

        # Make lists
        buckets = bucket
@@ -73,6 +95,12 @@ def create(self, bucket, descriptor, force=False, indexes_fields=None):
        descriptors = descriptor
        if isinstance(descriptor, dict):
            descriptors = [descriptor]
+        if indexes_fields is None or len(indexes_fields) == 0:
+            indexes_fields = [()] * len(descriptors)
+        elif type(indexes_fields[0][0]) not in {list, tuple}:
+            indexes_fields = [indexes_fields]
+        assert len(indexes_fields) == len(descriptors)
+        assert len(buckets) == len(descriptors)

        # Check buckets for existence
        for bucket in reversed(self.buckets):
@@ -83,17 +111,17 @@ def create(self, bucket, descriptor, force=False, indexes_fields=None):
                self.delete(bucket)

        # Define buckets
-        for bucket, descriptor in zip(buckets, descriptors):
+        for bucket, descriptor, index_fields in zip(buckets, descriptors, indexes_fields):

            # Add to schemas
            self.__descriptors[bucket] = descriptor

-            # Crate table
+            # Create table
            jsontableschema.validate(descriptor)
            tablename = mappers.bucket_to_tablename(self.__prefix, bucket)
-            columns, constraints = mappers.descriptor_to_columns_and_constraints(
-                self.__prefix, bucket, descriptor)
-            Table(tablename, self.__metadata, *(columns+constraints))
+            columns, constraints, indexes = mappers.descriptor_to_columns_and_constraints(
+                self.__prefix, bucket, descriptor, index_fields)
+            Table(tablename, self.__metadata, *(columns+constraints+indexes))

        # Create tables, update metadata
        self.__metadata.create_all()
@@ -128,7 +156,7 @@ def delete(self, bucket=None, ignore=False):

        # Drop tables, update metadata
        self.__metadata.drop_all(tables=tables)
        self.__metadata.clear()
-        self.__metadata.reflect()
+        self.__reflect()

    def describe(self, bucket, descriptor=None):

@@ -208,3 +236,13 @@ def __get_table(self, bucket):
            tablename = '.'.join([self.__dbschema, tablename])

        return self.__metadata.tables[tablename]
+
+    def __reflect(self):
+        def only(name, _):
+            ret = (
+                self.__only(name) and
+                mappers.tablename_to_bucket(self.__prefix, name) is not None
+            )
+            return ret
+
+        self.__metadata.reflect(only=only)
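The private __reflect helper builds on SQLAlchemy's MetaData.reflect(only=...) hook: only may be a callable invoked as only(table_name, metadata), and any table for which it returns False is skipped, so unrelated tables never enter the metadata. A minimal sketch of that mechanism in isolation (SQLAlchemy 1.x style; the prefix is illustrative):

from sqlalchemy import MetaData, create_engine

engine = create_engine('sqlite://')  # illustrative
metadata = MetaData(bind=engine)

# Only tables whose names pass the predicate enter metadata.tables
metadata.reflect(only=lambda name, meta: name.startswith('test_'))
print(list(metadata.tables))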
4 changes: 2 additions & 2 deletions tests/test_mappers.py
@@ -17,7 +17,7 @@ def test_bucket_to_tablename():


def test_tablename_to_bucket():
    assert mappers.tablename_to_bucket('prefix_', 'prefix_bucket') == 'bucket'
-    assert mappers.tablename_to_bucket('prefix_', 'xxxxxx_bucket') == None
+    assert mappers.tablename_to_bucket('prefix_', 'xxxxxx_bucket') is None


def test_descriptor_to_columns_and_constraints_not_supported_type():
@@ -26,7 +26,7 @@ def test_descriptor_to_columns_and_constraints_not_supported_type():
    }
    with pytest.raises(TypeError):
        mappers.descriptor_to_columns_and_constraints(
-            'prefix_', 'bucket', descriptor)
+            'prefix_', 'bucket', descriptor, [])


def test_columns_and_constraints_to_descriptor_not_supported_type():
38 changes: 35 additions & 3 deletions tests/test_storage.py
@@ -9,7 +9,6 @@

import json
import pytest
from copy import deepcopy
-from decimal import Decimal
from tabulator import Stream
from jsontableschema import Schema
from sqlalchemy import create_engine
@@ -38,8 +37,9 @@ def test_storage():

    # Create buckets
    storage.create(
-        ['articles', 'comments'],
-        [articles_descriptor, comments_descriptor])
+        ['articles', 'comments'],
+        [articles_descriptor, comments_descriptor],
+        indexes_fields=[[['rating'], ['name'], ['created_datetime']], []])

    # Recreate bucket
    storage.create('comments', comments_descriptor, force=True)
@@ -73,10 +73,42 @@ def test_storage():
    with pytest.raises(RuntimeError):
        storage.delete('non_existent')

    # Delete buckets
    storage.delete()


+def test_only_parameter():
+    # Check the 'reflect_only' parameter
+
+    # Get resources
+    simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))
+
+    # Engine
+    engine = create_engine(os.environ['DATABASE_URL'], echo=True)
+
+    # Storage
+    storage = Storage(engine=engine, prefix='test_only_')
+
+    # Delete buckets
+    storage.delete()
+
+    # Create buckets
+    storage.create(
+        'names',
+        simple_descriptor,
+        indexes_fields=[['person_id']])
+
+    def only(table):
+        ret = 'name' not in table
+        return ret
+    engine = create_engine(os.environ['DATABASE_URL'], echo=True)
+    storage = Storage(engine=engine, prefix='test_only_', reflect_only=only)
+    # Delete a bucket hidden by reflect_only
+    with pytest.raises(RuntimeError):
+        storage.delete('names')

def test_storage_bigdata():

    # Generate schema/data
