## Step 1                   

In [1]:
from typing import Any, Dict, Union, List
from dataclasses import dataclass
from datetime import datetime

import pandas as pd


@dataclass
class Metric:
    """Base class for Metric"""

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {}


@dataclass
class CountTotal(Metric):
    """Total number of rows in DataFrame"""

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        return {"total": len(df)}


@dataclass
class CountZeros(Metric):
    """Number of zeros in choosen column"""

    column: str

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column] == 0)
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountNull(Metric):
    """Number of empty values in choosen columns"""

    columns: List[str]
    aggregation: str = "any"  # either "all", or "any"

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        
        mask = df[self.columns[0]].isna()
        if self.aggregation == "any":
            for column in self.columns[1:]:
                mask |= df[column].isna()
        else:
            for column in self.columns[1:]:
                mask &= df[column].isna()
        
        k = sum(mask)
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountDuplicates(Metric):
    """Number of duplicates in choosen columns"""

    columns: List[str]

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df.duplicated(subset=self.columns))
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountValue(Metric):
    """Number of values in choosen column"""

    column: str
    value: Union[str, int, float]

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column] == self.value)
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountBelowValue(Metric):
    """Number of values below threshold"""

    column: str
    value: float
    strict: bool = False

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column] < self.value if self.strict else df[self.column] <= self.value)
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountBelowColumn(Metric):
    """Count how often column X below Y"""

    column_x: str
    column_y: str
    strict: bool = False

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column_x] < df[self.column_y] if self.strict else df[self.column_x] <= df[self.column_y])
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountRatioBelow(Metric):
    """Count how often X / Y below Z"""

    column_x: str
    column_y: str
    column_z: str
    strict: bool = False

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        n = len(df)
        k = sum(df[self.column_x]/df[self.column_y] < df[self.column_z] if self.strict 
                else df[self.column_x]/df[self.column_y] <= df[self.column_z])
        return {"total": n, "count": k, "delta": k / n}


@dataclass
class CountCB(Metric):
    """Calculate lower/upper bounds for N%-confidence interval"""

    column: str
    conf: float = 0.95

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        alpha = 1 - self.conf
        lcb, ucb = df[self.column].quantile([alpha/2, 1-alpha/2])
        return {"lcb": lcb, "ucb": ucb}


@dataclass
class CountLag(Metric):
    """A lag between latest date and today"""

    column: str
    fmt: str = "%Y-%m-%d"

    def __call__(self, df: pd.DataFrame) -> Dict[str, Any]:
        a = datetime.today ()
        b = datetime.strptime(max(df[self.column]), self.fmt)
        lag = a - b
        return {"today": a.strftime(self.fmt), "last_day": b.strftime(self.fmt), "lag": lag.days}


In [2]:
df_sales = pd.read_csv('ke_daily_sales.csv')
df_sales.head()

Unnamed: 0,day,item_id,qty,price,revenue
0,2022-10-24,100,5,120.0,500.0
1,2022-10-24,100,6,120.0,720.0
2,2022-10-24,200,2,200.0,400.0
3,2022-10-24,300,10,85.0,850.0
4,2022-10-23,100,3,110.0,330.0


In [3]:
df_visits = pd.read_csv('ke_visits.csv')
df_visits.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9 entries, 0 to 8
Data columns (total 5 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   day       9 non-null      object
 1   item_id   9 non-null      int64 
 2   views     9 non-null      int64 
 3   clicks    9 non-null      int64 
 4   payments  9 non-null      int64 
dtypes: int64(4), object(1)
memory usage: 488.0+ bytes


In [4]:
ct_dict = {"total": 9}
ct = CountTotal()
assert ct(df_visits) == ct_dict

cz_dict = {"total": 9, "count": 1, "delta": 1 / 9}
cz = CountZeros(column='views')
assert cz(df_visits) == cz_dict

cn_dict = {'total': 9, 'count': 1, 'delta': 1/9}
df_visits.loc[2,'views'] = None
cn = CountNull(columns=['item_id', 'views'], aggregation="any")
assert cn(df_visits) == cn_dict

#для проверки поменять 'today' на текущую дату
cl_dict = {'today': '2023-10-09', 'last_day': '2022-10-24', 'lag': 350}
cl = CountLag(column ='day')
assert cl(df=df_sales) == cl_dict

AssertionError: 

## Step 2

In [5]:
"""Report checklist."""

from metrics import (
    CountTotal,
    CountLag,
    CountDuplicates,
    CountNull,
    CountRatioBelow,
    CountCB,
    CountZeros,
    CountBelowValue,
    CountBelowColumn,
)

# Checklist contains checks consist of:
# - table_name
# - metric
# - limits

CHECKLIST = [
    # Table with sales ["day", "item_id", "qty", "revenue", "price"]
    ("sales", CountTotal(), {"total": (1, 1e6)}),
    ("sales", CountLag("day"), {"lag": (0, 3)}),
    ("sales", CountDuplicates(["day", "item_id"]), {"total": (0, 0)}),
    ("sales", CountNull(["qty"]), {"total": (0, 0)}),
    ("sales", CountRatioBelow("revenue", "price", "qty", False), {"delta": (0, 0.05)}),
    ("sales", CountCB("revenue"), {}),
    ("sales", CountZeros("qty"), {"delta": (0, 0.3)}),
    ("sales", CountBelowValue("price", 100.0), {"delta": (0, 0.3)}),
    # Table with clickstream ["dt", "item_id", "views", "clicks", "payments"]
    ("relevance", CountTotal(), {"total": (1, 1e6)}),
    ("relevance", CountLag("dt"), {"lag": (0, 3)}),
    ("relevance", CountZeros("views"), {"delta": (0, 0.2)}),
    ("relevance", CountZeros("clicks"), {"delta": (0, 0.5)}),
    ("relevance", CountNull(["views", "clicks", "payments"]), {"delta": (0, 0.1)}),
    ("relevance", CountBelowValue("views", 10), {"delta": (0, 0.5)}),
    ("relevance", CountBelowColumn("clicks", "views"), {"total": (0, 0)}),
    ("relevance", CountBelowColumn("payments", "clicks"), {"total": (0, 0)}),
]

In [35]:
"""DQ Report."""

from typing import Dict, List, Tuple, Union, Callable
from dataclasses import dataclass
#from user_input.metrics import Metric

import pandas as pd

LimitType = Dict[str, Tuple[float, float]]
CheckType = Tuple[str, Metric, LimitType]



def memoize(func: Callable) -> Callable:
    """Memoize function"""
    cache = {}
    def wrapped(*argv, **kwargs):
        #print(cache)
        key = str(argv) + str(kwargs)
        if key not in cache:
            cache[key] = func(*argv, **kwargs)
        
        return cache[key]
    return wrapped

@dataclass
class Report:
    """DQ report class."""

    checklist: List[CheckType]
    engine: str = "pandas"
    
    @memoize
    def fit(self, tables: Dict[str, pd.DataFrame]) -> Dict:
        """Calculate DQ metrics and build report."""
        self.report_ = {}
        report = self.report_

        # Check if engine supported
        if self.engine != "pandas":
            raise NotImplementedError("Only pandas API currently supported!")
        
        result = []
        passed = 0
        failed = 0
        errors = 0
        for table_name, metric, limits in self.checklist:
            df = tables[table_name]
            error = ''
            try:
                values = metric(df)
                if len(limits) == 0:
                    status = '.'
                    passed+=1
                else:
                    # limits: {"total": (1, 1e6), "": () ... }
                    for column_limit, limit in limits.items():
                        value = values[column_limit]
                        if value >= limit[0] and value <= limit[1]:
                            status = '.'
                            passed+=1
                        else:
                            status = 'F'
                            failed+=1
                  
            except Exception as e:
                status = 'E'
                error = type(e).__name__ 
                errors+=1
        
            result.append({'table_name': table_name,
                           'metric': str(metric),
                           'limits': str(limits),
                           'values': values,
                           'status': status,
                           'error': error})
        
        
        report['title'] = 'DQ Report for tables ' + str(sorted(list(tables.keys())))
        report['result'] = pd.DataFrame(result)
        report['total'] = len(result)
        report['passed'] = passed
        report['passed_pct'] = round(passed / report['total'] * 100, 2)
        report['failed'] = failed
        report['failed_pct'] = round(failed / report['total'] * 100, 2)
        report['errors'] = errors
        report['errors_pct'] = round(errors / report['total'] * 100, 2)
        

        return report

    def to_str(self) -> None:
        """Convert report to string format."""
        report = self.report_

        msg = (
            "This Report instance is not fitted yet. "
            "Call 'fit' before using this method."
        )

        assert isinstance(report, dict), msg

        pd.set_option("display.max_rows", 500)
        pd.set_option("display.max_columns", 500)
        pd.set_option("display.max_colwidth", 20)
        pd.set_option("display.width", 1000)

        return (
            f"{report['title']}\n\n"
            f"{report['result']}\n\n"
            f"Passed: {report['passed']} ({report['passed_pct']}%)\n"
            f"Failed: {report['failed']} ({report['failed_pct']}%)\n"
            f"Errors: {report['errors']} ({report['errors_pct']}%)\n"
            "\n"
            f"Total: {report['total']}"
        )


In [36]:
cl = Report(CHECKLIST)

In [37]:
cl.fit({'sales': df_sales, 'relevance': df_visits})

{'title': "DQ Report for tables ['relevance', 'sales']",
 'result':    table_name               metric               limits               values status     error
 0       sales         CountTotal()  {'total': (1, 10...         {'total': 7}      .          
 1       sales  CountLag(column=...      {'lag': (0, 3)}  {'today': '2023-...      F          
 2       sales  CountDuplicates(...    {'total': (0, 0)}  {'total': 7, 'co...      F          
 3       sales  CountNull(column...    {'total': (0, 0)}  {'total': 7, 'co...      F          
 4       sales  CountRatioBelow(...  {'delta': (0, 0....  {'total': 7, 'co...      F          
 5       sales  CountCB(column='...                   {}  {'lcb': 49.50000...      .          
 6       sales  CountZeros(colum...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
 7       sales  CountBelowValue(...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          
 8   relevance         CountTotal()  {'total': (1, 10...         {'total': 9} 

In [38]:
cl.to_str()

"DQ Report for tables ['relevance', 'sales']\n\n   table_name               metric               limits               values status     error\n0       sales         CountTotal()  {'total': (1, 10...         {'total': 7}      .          \n1       sales  CountLag(column=...      {'lag': (0, 3)}  {'today': '2023-...      F          \n2       sales  CountDuplicates(...    {'total': (0, 0)}  {'total': 7, 'co...      F          \n3       sales  CountNull(column...    {'total': (0, 0)}  {'total': 7, 'co...      F          \n4       sales  CountRatioBelow(...  {'delta': (0, 0....  {'total': 7, 'co...      F          \n5       sales  CountCB(column='...                   {}  {'lcb': 49.50000...      .          \n6       sales  CountZeros(colum...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          \n7       sales  CountBelowValue(...  {'delta': (0, 0.3)}  {'total': 7, 'co...      .          \n8   relevance         CountTotal()  {'total': (1, 10...         {'total': 9}      .          \n9 