In [4]:
# set up a connection to a PostgreSQL database running in neighbour container

import sqlalchemy

connection_string = 'postgresql://postgres:example@db:5432/postgres'

db = sqlalchemy.create_engine(connection_string)
conn = db.connect()

In [5]:
# create a database schema with one table in it

from sqlalchemy import Column, Integer, Text
from sqlalchemy.dialects.postgresql import JSON, JSONB
from sqlalchemy.sql import select, and_, or_, not_

meta = sqlalchemy.MetaData(conn)
jsontable = sqlalchemy.Table("jsonbsample", meta,
                 Column('data', JSONB))
meta.drop_all()
meta.create_all() 

In [6]:
# insert some sample data for experiments

conn.execute(jsontable.insert(), [
    {'data': {
        'name': 'obj-1',
        'tags': ['tag-1', 'tag-2'],
        'attributes': {
            'attr-1': 'value-1',
            'attr-num': 123
        }
    }},
    {'data': {
        'name': 'obj-2',
        'tags': ['tag-2', 'tag-3'],
        'attributes': {
            'attr-1': 'value-2'
        }
    }},
    {'data': {
        'name': 'obj-2',
        'tags': ['tag-2', 'tag-3'],
        'attributes': {
            'attr-1': 'value-2',
            'attr-num': 456
        }
    }}
])


<sqlalchemy.engine.result.ResultProxy at 0x7f96a80cbbe0>

In [7]:
# select back all the inserted objects, just to see what's currently in db

q1 = select([jsontable])
conn.execute(q1).fetchall()

[({'name': 'obj-1', 'tags': ['tag-1', 'tag-2'], 'attributes': {'attr-1': 'value-1', 'attr-num': 123}},),
 ({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2'}},),
 ({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2', 'attr-num': 456}},)]

In [8]:
# query by attribute values deep inside object structure

q2 = select([jsontable]).where(
    jsontable.c.data[('attributes', 'attr-1')].astext == 'value-1'
)
print(q2)
conn.execute(q2).fetchall()

SELECT jsonbsample.data 
FROM jsonbsample 
WHERE (jsonbsample.data #>> %(data_1)s) = %(param_1)s


[({'name': 'obj-1', 'tags': ['tag-1', 'tag-2'], 'attributes': {'attr-1': 'value-1', 'attr-num': 123}},)]

In [10]:
# query using ILIKE operator

q3 = select([jsontable]).where(
    jsontable.c.data[('attributes', 'attr-1')].astext.ilike('%lUE-2%')
)
print(q3)
conn.execute(q3).fetchall()


SELECT jsonbsample.data 
FROM jsonbsample 
WHERE jsonbsample.data #>> %(data_1)s ILIKE %(param_1)s


[({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2'}},),
 ({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2', 'attr-num': 456}},)]

In [11]:
# filter by presence of a specified tag inside array of tags

q4 = jsontable.select().where(
    jsontable.c.data['tags'].has_key('tag-3')
)
print(q4)
conn.execute(q4).fetchall()


SELECT jsonbsample.data 
FROM jsonbsample 
WHERE ((jsonbsample.data -> %(data_1)s)) ? %(param_1)s


[({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2'}},),
 ({'name': 'obj-2', 'tags': ['tag-2', 'tag-3'], 'attributes': {'attr-1': 'value-2', 'attr-num': 456}},)]