In [1]:
import great_expectations as ge
import pandas as pd
import numpy as np
import datetime
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker

In [2]:
## Put some random data in sufficient for replicating the issue

In [3]:
arr = np.random.normal(0, 1, size=(100, 10))

In [4]:
date = np.full(100, datetime.datetime.now())
diffs = np.array([datetime.timedelta(days=int(k)) for k in np.random.choice([-2, -1, 0], size=100)])
dates = date + diffs

In [5]:
missings = np.random.choice([np.nan, 0,1,2,3,4], size=100)

In [6]:
df = pd.DataFrame(arr)
df["dates"] = dates
df["missings"] = missings

In [7]:
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,dates,missings
0,0.129241,1.066469,1.462701,-1.340763,-0.71836,-0.030121,-0.432858,0.160536,0.480938,2.055168,2019-09-11 23:14:04.248406,3.0
1,-0.546883,-0.948041,0.857218,0.417211,-2.47552,0.182786,1.145868,1.706848,0.285012,-1.139483,2019-09-10 23:14:04.248406,0.0
2,0.732611,1.080726,1.799009,-1.324832,-0.042106,-0.633771,-0.637599,1.497173,0.182029,0.901961,2019-09-11 23:14:04.248406,0.0
3,0.04009,-0.280354,0.015777,1.014472,1.557504,2.096489,-0.548691,-0.354212,-0.193882,-1.376235,2019-09-09 23:14:04.248406,2.0
4,0.517361,0.007613,0.560541,-0.802848,-0.899714,0.310679,-1.913699,0.024944,0.160441,0.293889,2019-09-11 23:14:04.248406,2.0


In [8]:
engine = sa.create_engine("mssql+pymssql://USER:PASS@HOST.rds.amazonaws.com:1433/DB")

In [9]:
df.to_sql("ge_community_replication_20190910_01", engine, if_exists="replace")

In [10]:
engine.execute(
    sa.select([sa.column("dates"), sa.column("missings")]).select_from(sa.table("ge_community_replication_20190910_01")).limit(10)).fetchall()

[(datetime.datetime(2019, 9, 11, 23, 14, 4, 247000), 3.0),
 (datetime.datetime(2019, 9, 10, 23, 14, 4, 247000), 0.0),
 (datetime.datetime(2019, 9, 11, 23, 14, 4, 247000), 0.0),
 (datetime.datetime(2019, 9, 9, 23, 14, 4, 247000), 2.0),
 (datetime.datetime(2019, 9, 11, 23, 14, 4, 247000), 2.0),
 (datetime.datetime(2019, 9, 9, 23, 14, 4, 247000), 1.0),
 (datetime.datetime(2019, 9, 9, 23, 14, 4, 247000), 3.0),
 (datetime.datetime(2019, 9, 9, 23, 14, 4, 247000), 4.0),
 (datetime.datetime(2019, 9, 11, 23, 14, 4, 247000), 1.0),
 (datetime.datetime(2019, 9, 10, 23, 14, 4, 247000), 4.0)]

In [11]:
gedf = ge.dataset.SqlAlchemyDataset('ge_community_replication_20190910_01', engine)

In [12]:
gedf.expect_column_to_exist("dates")
gedf.expect_column_to_exist("missings")

{'success': True}

In [13]:
## Indeed this basic expectation fails because sqlalchemy doesn't translate our extremely convoluted logic correctly.
## The extra complication was added in response to an issue on Redshift. See:
#  https://github.com/great-expectations/great_expectations/issues/392
gedf.expect_column_values_to_be_between("missings", -4, 4)

ProgrammingError: (pymssql.ProgrammingError) (102, b"Incorrect syntax near '='.DB-Lib error message 20018, severity 15:\nGeneral SQL Server error: Check messages from the SQL Server\n")
[SQL: SELECT TOP 20 missings 
FROM ge_community_replication_20190910_01 
WHERE NOT (missings >= %(missings_1)s AND missings <= %(missings_2)s) AND (CASE WHEN (missings IS NULL) THEN %(param_1)s ELSE %(param_2)s END = 1 OR (missings IN (NULL)) = 0)]
[parameters: {'missings_1': -4, 'missings_2': 4, 'param_1': 0, 'param_2': 1}]
(Background on this error at: http://sqlalche.me/e/f405)

In [14]:
# Clearly, a cleaner version of this query would work
engine.execute("SELECT TOP 20 missings FROM ge_community_replication_20190910_01 WHERE ((missings > 2) OR (missings < -4) AND missings IS NOT NULL)").fetchall()

[(3.0,),
 (3.0,),
 (4.0,),
 (4.0,),
 (4.0,),
 (3.0,),
 (4.0,),
 (4.0,),
 (3.0,),
 (3.0,),
 (3.0,),
 (3.0,),
 (3.0,),
 (4.0,),
 (3.0,),
 (3.0,),
 (3.0,),
 (4.0,),
 (3.0,),
 (3.0,)]

In [15]:
## Grabbing the code we can isolate the confusion...
column = "missings"
table = "ge_community_replication_20190910_01"
ignore_values = [None]
unexpected_count_limit = 20
expected_condition = sa.not_(sa.or_(sa.column(column) > 4, sa.column(column) < -4))
unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(sa.not_(expected_condition),
                sa.or_(
                    # SA normally evaluates `== None` as `IS NONE`. However `sa.in_()`
                    # replaces `None` as `NULL` in the list and incorrectly uses `== NULL`
                    sa.case([
                        (
                            sa.column(column).is_(None),
                            False
                        )
                    ], else_=True) if None in ignore_values else False,
                    # Ignore any other values that are in the ignore list
                    sa.column(column).in_(ignore_values) == False))
    ).limit(unexpected_count_limit)
).fetchall()

ProgrammingError: (pymssql.ProgrammingError) (102, b"Incorrect syntax near '='.DB-Lib error message 20018, severity 15:\nGeneral SQL Server error: Check messages from the SQL Server\n")
[SQL: SELECT TOP 20 missings 
FROM ge_community_replication_20190910_01 
WHERE NOT (NOT (missings > %(missings_1)s OR missings < %(missings_2)s)) AND (CASE WHEN (missings IS NULL) THEN %(param_1)s ELSE %(param_2)s END = 1 OR (missings IN (NULL)) = 0)]
[parameters: {'missings_1': 4, 'missings_2': -4, 'param_1': 0, 'param_2': 1}]
(Background on this error at: http://sqlalche.me/e/f405)

In [16]:
## ...by progressively simplifying

column = "missings"
table = "ge_community_replication_20190910_01"
ignore_values = [None]
unexpected_count_limit = 20
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))
unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(
            sa.not_(expected_condition),
            sa.not_(sa.column(column) is None)
        )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(3.0,), (3.0,), (4.0,), (4.0,), (4.0,), (3.0,), (4.0,), (4.0,), (3.0,), (3.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,)]


In [17]:
str(sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(
            sa.not_(expected_condition),
            sa.not_(sa.column(column).in_([None]))
        )
    ).limit(unexpected_count_limit)
)

'SELECT missings \nFROM ge_community_replication_20190910_01 \nWHERE NOT (NOT (missings > :missings_1 OR missings < :missings_2)) AND missings NOT IN (NULL)\n LIMIT :param_1'

In [18]:
## Which lets us find specificially what the problem is...
column = "missings"
table = "ge_community_replication_20190910_01"
ignore_values = [None]
unexpected_count_limit = 20
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))
unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(
            sa.not_(expected_condition),
            sa.not_(sa.column(column).in_([None]))
        )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)


[]


In [19]:
## Which lets us find specificially what the problem is...
column = "missings"
table = "ge_community_replication_20190910_01"
ignore_values = [3]
unexpected_count_limit = 20
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))
ignore_conditions = [
    sa.column(column).isnot(None),
    sa.not_(sa.column(column).in_(ignore_values))
]

unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(
            sa.not_(expected_condition),
            *ignore_conditions
        )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,), (4.0,)]


In [20]:
## Which lets us find specificially what the problem is...
column = "missings"
table = "ge_community_replication_20190910_01"
ignore_values = [None]
unexpected_count_limit = 20
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))
unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(
            sa.not_(expected_condition),
            sa.not_(sa.column(column).is_(None))
        )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(3.0,), (3.0,), (4.0,), (4.0,), (4.0,), (3.0,), (4.0,), (4.0,), (3.0,), (3.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,)]


In [21]:
## ... but the new simpler logic compiles clearly for postgres, redshift, and mssql

ignore_values = [None]

ignore_values_conditions = []
if len(ignore_values) > 0 and None not in ignore_values or len(ignore_values) > 1 and None in ignore_values:
    ignore_values_conditions += [
        sa.column(column).in_([val for val in ignore_values if val is not None])
    ]
if None in ignore_values:
    ignore_values_conditions += [sa.column(column).is_(None)]

if len(ignore_values_conditions) > 1:
    ignore_values_condition = sa.or_(*ignore_values_conditions)
elif len(ignore_values_conditions) == 1:
    ignore_values_condition = ignore_values_conditions[0]
else:
    ignore_values_condition = False
    
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))
    
unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(sa.not_(expected_condition),
                sa.not_(ignore_values_condition)
                )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(3.0,), (3.0,), (4.0,), (4.0,), (4.0,), (3.0,), (4.0,), (4.0,), (3.0,), (3.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,)]


In [22]:
engine = sa.create_engine("postgresql://test:test@localhost:5432/postgres")

In [23]:
df.to_sql("ge_community_replication_20190910_01", engine, if_exists="replace")

In [24]:
## ... but the new simpler logic compiles clearly for postgres, redshift, and mssql

ignore_values = [None]

ignore_values_conditions = []
if len(ignore_values) > 0 and None not in ignore_values or len(ignore_values) > 1 and None in ignore_values:
    ignore_values_conditions += [
        sa.column(column).in_([val for val in ignore_values if val is not None])
    ]
if None in ignore_values:
    ignore_values_conditions += [sa.column(column).is_(None)]

if len(ignore_values_conditions) > 1:
    ignore_values_condition = sa.or_(*ignore_values_conditions)
elif len(ignore_values_conditions) == 1:
    ignore_values_condition = ignore_values_conditions[0]
else:
    ignore_values_condition = sa.literal(False)
    
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))

unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(sa.not_(expected_condition),
                sa.not_(ignore_values_condition)
                )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(3.0,), (3.0,), (4.0,), (4.0,), (4.0,), (3.0,), (4.0,), (4.0,), (3.0,), (3.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,)]


In [25]:
print(
    engine.execute(
        sa.select([
            sa.column(column)
        ]).select_from(sa.table(table)).where(ignore_values_condition)).fetchall())

[(None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,)]


In [26]:
# ... and now check with redshift, since issue https://github.com/great-expectations/great_expectations/issues/392 
# had raised an issue in the past that was causing this situation in the first place

In [27]:
engine = sa.create_engine("redshift+psycopg2://USER:PASS@HOST.redshift.amazonaws.com:5439/DB")

In [28]:
df.to_sql("ge_community_replication_20190910_01", engine, if_exists="replace", index=False)

In [29]:
column = "missings"
table = "ge_community_replication_20190910_01"
unexpected_count_limit = 20

ignore_values = [None]

ignore_values_conditions = []
if len(ignore_values) > 0 and None not in ignore_values or len(ignore_values) > 1 and None in ignore_values:
    ignore_values_conditions += [
        sa.column(column).in_([val for val in ignore_values if val is not None])
    ]
if None in ignore_values:
    ignore_values_conditions += [sa.column(column).is_(None)]

if len(ignore_values_conditions) > 1:
    ignore_values_condition = sa.or_(*ignore_values_conditions)
elif len(ignore_values_conditions) == 1:
    ignore_values_condition = ignore_values_conditions[0]
else:
    ignore_values_condition = sa.literal(False)
    
expected_condition = sa.not_(sa.or_(sa.column(column) > 2, sa.column(column) < -4))

unexpected_query_results = engine.execute(
    sa.select([sa.column(column)]).select_from(sa.table(table)).where(
        sa.and_(sa.not_(expected_condition),
                sa.not_(ignore_values_condition)
                )
    ).limit(unexpected_count_limit)
).fetchall()
print(unexpected_query_results)

[(3.0,), (3.0,), (4.0,), (4.0,), (4.0,), (3.0,), (4.0,), (4.0,), (3.0,), (3.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,), (3.0,), (4.0,), (3.0,), (3.0,)]


In [30]:
# .. finally, because of the original issue, we need to ensure we're working correctly with set
gedf = ge.dataset.SqlAlchemyDataset("ge_community_replication_20190910_01", engine)

In [31]:
gedf.expect_column_values_to_be_in_set("missings", [0, 1, 2])

{'success': False,
 'result': {'element_count': 100,
  'missing_count': 18,
  'missing_percent': 0.18,
  'unexpected_count': 38,
  'unexpected_percent': 0.38,
  'unexpected_percent_nonmissing': 0.4634146341463415,
  'partial_unexpected_list': [3.0,
   3.0,
   4.0,
   4.0,
   4.0,
   3.0,
   4.0,
   4.0,
   3.0,
   3.0,
   3.0,
   3.0,
   3.0,
   4.0,
   3.0,
   3.0,
   3.0,
   4.0,
   3.0,
   3.0]}}