Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] expect_day_count_to_be_close_to_equivalent_week_day_mean #7782

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,25 @@
from great_expectations.expectations.metrics import ColumnAggregateMetricProvider
from great_expectations.expectations.metrics.metric_provider import metric_value

TODAY: datetime = datetime(year=2022, month=8, day=10)
TODAY_STR: str = datetime.strftime(TODAY, "%Y-%m-%d")
TODAY_EXAMPLE: datetime = datetime(year=2022, month=8, day=10)
TODAY_EXAMPLE_STR: str = datetime.strftime(TODAY_EXAMPLE, "%Y-%m-%d")
date_format = "%Y-%m-%d"

DAYS_AGO = {
3: TODAY - timedelta(days=3),
7: TODAY - timedelta(days=7),
14: TODAY - timedelta(days=14),
21: TODAY - timedelta(days=21),
28: TODAY - timedelta(days=28),
}
METRIC_SAMPLE_LIMIT = 60

# Day offsets of the same weekday in each of the four weeks before a given date.
FOUR_PREVIOUS_WEEKS = [7, 14, 21, 28]


def get_days_ago_dict(current_date: datetime) -> dict:
    """Map day offsets to the dates that many days before ``current_date``.

    The result always contains the offset ``3`` followed by every offset in
    ``FOUR_PREVIOUS_WEEKS``, in that order.

    Args:
        current_date: The reference date the offsets are subtracted from.

    Returns:
        A dict ``{offset_in_days: current_date - timedelta(days=offset_in_days)}``.
    """
    # Iterate the shared constant instead of indexing its four positions by
    # hand, so the mapping stays in sync if FOUR_PREVIOUS_WEEKS is ever edited.
    offsets = [3, *FOUR_PREVIOUS_WEEKS]
    return {days: current_date - timedelta(days=days) for days in offsets}


def generate_data_sample(n_appearances: dict):
data = []
for d, n in n_appearances.items():
Expand Down Expand Up @@ -72,11 +76,11 @@ def _sqlalchemy(

# get counts for dates
query = (
sa.select(sa.func.Date(column), sa.func.count())
sa.select([sa.func.Date(column), sa.func.count()])
.group_by(sa.func.Date(column))
.select_from(selectable)
.order_by(sa.func.Date(column).desc())
.limit(30)
.limit(METRIC_SAMPLE_LIMIT)
)
results = sqlalchemy_engine.execute(query).fetchall()
return results
Expand All @@ -87,56 +91,56 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)

# Default values
default_kwarg_values = {"threshold": 0.25}

example_days_ago_dict = get_days_ago_dict(TODAY_EXAMPLE)
examples = [
{
# column a - good counts - 3 rows for every day
"data": {
"column_a": generate_data_sample(
{
TODAY: 3,
DAYS_AGO[7]: 3,
DAYS_AGO[14]: 3,
DAYS_AGO[21]: 3,
DAYS_AGO[28]: 3,
TODAY_EXAMPLE: 3,
example_days_ago_dict[7]: 3,
example_days_ago_dict[14]: 3,
example_days_ago_dict[21]: 3,
example_days_ago_dict[28]: 3,
}
),
"column_b": generate_data_sample(
{
TODAY: 2,
DAYS_AGO[7]: 4,
DAYS_AGO[14]: 3,
DAYS_AGO[21]: 3,
DAYS_AGO[28]: 3,
TODAY_EXAMPLE: 2,
example_days_ago_dict[7]: 4,
example_days_ago_dict[14]: 3,
example_days_ago_dict[21]: 3,
example_days_ago_dict[28]: 3,
}
),
"column_datetime": generate_data_sample(
{
TODAY: 3,
DAYS_AGO[7]: 2,
DAYS_AGO[7].replace(hour=11): 1,
DAYS_AGO[14]: 2,
DAYS_AGO[14].replace(hour=10, minute=40): 1,
DAYS_AGO[21]: 3,
DAYS_AGO[28]: 3,
TODAY_EXAMPLE: 3,
example_days_ago_dict[7]: 2,
example_days_ago_dict[7].replace(hour=11): 1,
example_days_ago_dict[14]: 2,
example_days_ago_dict[14].replace(hour=10, minute=40): 1,
example_days_ago_dict[21]: 3,
example_days_ago_dict[28]: 3,
}
),
"column_current_zero": generate_data_sample(
{
TODAY: 0,
DAYS_AGO[7]: 4,
DAYS_AGO[14]: 4,
DAYS_AGO[21]: 4,
DAYS_AGO[28]: 3,
TODAY_EXAMPLE: 0,
example_days_ago_dict[7]: 4,
example_days_ago_dict[14]: 4,
example_days_ago_dict[21]: 4,
example_days_ago_dict[28]: 3,
}
),
"column_past_mean_zero": generate_data_sample(
{
TODAY: 15,
DAYS_AGO[7]: 0,
DAYS_AGO[14]: 0,
DAYS_AGO[21]: 0,
DAYS_AGO[28]: 0,
TODAY_EXAMPLE: 15,
example_days_ago_dict[7]: 0,
example_days_ago_dict[14]: 0,
example_days_ago_dict[21]: 0,
example_days_ago_dict[28]: 0,
}
),
},
Expand All @@ -147,7 +151,18 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)
"include_in_gallery": False,
"in": {
"column": "column_a",
"run_date": TODAY_STR,
"run_date": TODAY_EXAMPLE_STR,
"threshold": default_kwarg_values["threshold"],
},
"out": {"success": True},
},
{
"title": "positive test",
"exact_match_out": False,
"include_in_gallery": False,
"in": {
"column": "column_datetime",
"run_date": TODAY_EXAMPLE_STR,
"threshold": default_kwarg_values["threshold"],
},
"out": {"success": True},
Expand All @@ -158,7 +173,7 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)
"include_in_gallery": False,
"in": {
"column": "column_datetime",
"run_date": TODAY_STR,
"run_date": TODAY_EXAMPLE_STR,
"threshold": default_kwarg_values["threshold"],
},
"out": {"success": True},
Expand All @@ -169,7 +184,7 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)
"include_in_gallery": False,
"in": {
"column": "column_b",
"run_date": TODAY_STR,
"run_date": TODAY_EXAMPLE_STR,
},
"out": {"success": False},
},
Expand All @@ -179,7 +194,7 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)
"include_in_gallery": False,
"in": {
"column": "column_current_zero",
"run_date": TODAY_STR,
"run_date": TODAY_EXAMPLE_STR,
},
"out": {"success": False},
},
Expand All @@ -189,7 +204,7 @@ class ExpectDayCountToBeCloseToEquivalentWeekDayMean(ColumnAggregateExpectation)
"include_in_gallery": False,
"in": {
"column": "column_past_mean_zero",
"run_date": TODAY_STR,
"run_date": TODAY_EXAMPLE_STR,
},
"out": {"success": False},
},
Expand Down Expand Up @@ -223,15 +238,32 @@ def _validate(
execution_engine: ExecutionEngine = None,
):

run_date: str = self.get_success_kwargs(configuration).get("run_date")
threshold: float = float(
self.get_success_kwargs(configuration).get("threshold")
)
run_date_str = self.get_success_kwargs(configuration).get("run_date")

run_date = datetime.strptime(run_date_str, date_format)

threshold = float(self.get_success_kwargs(configuration).get("threshold"))

day_counts_dict = get_counts_per_day_as_dict(metrics, run_date)
run_date_count: int = day_counts_dict[run_date]
days_ago_dict = get_days_ago_dict(run_date)

equivalent_previous_days: List[datetime] = [
days_ago_dict[i] for i in FOUR_PREVIOUS_WEEKS
]

assert min(equivalent_previous_days) > (
datetime.today() - timedelta(METRIC_SAMPLE_LIMIT)
), (
f"Data includes only up to {METRIC_SAMPLE_LIMIT} days prior to today ({datetime.today()}), "
f"but 4 weeks before the given run_date is {min(equivalent_previous_days)}",
)

diff_fraction = get_diff_fraction(run_date_count, day_counts_dict)
day_counts_dict = get_counts_per_day_as_dict(
metrics, run_date_str, equivalent_previous_days
)
run_date_count: int = day_counts_dict[run_date_str]
diff_fraction = get_diff_fraction(
run_date_count, day_counts_dict, equivalent_previous_days
)

if diff_fraction > threshold:
msg = (
Expand All @@ -249,10 +281,10 @@ def _validate(
return {"success": success, "result": {"details": msg}}


def get_counts_per_day_as_dict(metrics: dict, run_date: str) -> dict:
equivalent_previous_days: List[datetime] = [
DAYS_AGO[i] for i in FOUR_PREVIOUS_WEEKS
]
def get_counts_per_day_as_dict(
metrics: dict, run_date: str, equivalent_previous_days: list
) -> dict:

equivalent_previous_days_str: List[str] = [
datetime.strftime(i, date_format) for i in equivalent_previous_days
]
Expand All @@ -268,15 +300,15 @@ def get_counts_per_day_as_dict(metrics: dict, run_date: str) -> dict:
return day_counts_dict


def get_diff_fraction(run_date_count: int, day_counts_dict: dict) -> float:
def get_diff_fraction(
run_date_count: int, day_counts_dict: dict, equivalent_previous_days: list
) -> float:
"""
Calculates the fractional difference between current and past average row counts (how much is the
difference relative to the average).
Added +1 to both numerator and denominator, to account for cases when previous average is 0.
"""
equivalent_previous_days: List[datetime] = [
DAYS_AGO[i] for i in FOUR_PREVIOUS_WEEKS
]

equivalent_previous_days_str: List[str] = [
datetime.strftime(i, date_format) for i in equivalent_previous_days
]
Expand Down
Loading