In [77]:
# create dataset

from dataset_creator import FakeDataset, MISSING_SYMBOLS

filename = 'dataset.csv'
OUTLIER_PERCENTAGE = 0.1
DUPLICATE_PERCENTAGE = 0.15
MISSING_PERCENTAGE = 0.1


dataset = FakeDataset(dataset_size = 100)\
        .add_dominated_string_column(dominated_percentage=0.9)\
        .add_mishmashed_case(mishmashed_percentage=0.1)\
        .add_outliers_above(outlier_percentage = OUTLIER_PERCENTAGE)\
        .add_duplicates(duplicate_percentage = DUPLICATE_PERCENTAGE)\
        .add_missing(missing_percentage = MISSING_PERCENTAGE)\
        .to_csv(filename)

  self.data = self.data.append(self.data.iloc[self.data.sample(frac=duplicate_percentage).index, :])


In [78]:
# read data
import numpy as np
import pandas as pd

data = pd.read_csv(filename)
print(data.shape)
data.head()

(115, 8)


Unnamed: 0,name,surname,birthdate,results1,results2,category,email,gender
0,Michele,Parker,2010-01-20,553,none,A,elizabethwilson@example.com,
1,Dana,Cunningham,,24,,none,NONE,F
2,Jessica,Lopez,,99,0.23356275907386193,A,gsimmons@example.com,F
3,Ariana,Weiss,2015-09-28,10,-1.1893010411447136,b,johnwilliams@example.org,
4,Barbara,Johnson,1958-01-11,93,-0.6320832828351748,,salasjohn@example.com,


In [79]:
from collections import defaultdict
dataset_scores = defaultdict(lambda: 0)

## Check if there are any missing values

In [80]:
data.replace(MISSING_SYMBOLS, np.nan, inplace=True)

In [81]:
data.isna().sum()

name         12
surname      12
birthdate    11
results1     12
results2     12
category     12
email        12
gender       12
dtype: int64

In [82]:
data = data._convert(numeric=True, datetime=True).convert_dtypes()


In [83]:
dataset_scores["missing_percentage"] = data.isna().sum().sum()/data.size
dataset_scores["most_missing_column"] = data.isna().sum().max()/data.shape[0]

In [84]:
assert(round(dataset_scores["missing_percentage"],2) == MISSING_PERCENTAGE)

In [85]:
data.results1.astype("float")

0      553.0
1       24.0
2       99.0
3       10.0
4       93.0
       ...  
110     40.0
111     38.0
112    587.0
113     24.0
114     47.0
Name: results1, Length: 115, dtype: float64

## Check duplicates

In [86]:
dataset_scores["duplication_percentage"] = sum(data.duplicated())/ data.shape[0]

## Check outliers

For numerical values we use the same method as in box plot (outlier is more tham q3 + 1.5 IQR or less than q1 - 1.5 IQR)

In [87]:
numeric_cols = data.select_dtypes(include=['number']).columns
string_cols = data.select_dtypes(include=["string", "object"]).columns
outliers_nums = []

for col in numeric_cols:
    
    q1 = data[col].quantile(0.25)
    q3 = data[col].quantile(0.75)
    
    iqr = q3-q1
    
    upper_bound = q3 + (1.5*iqr)
    lower_bound = q1 - (1.5*iqr)

    outliers_nums.append(np.sum((data[col] > upper_bound) | (data[col] < lower_bound)))

For string columns we look for rare values (less than 5% of the observations)

In [88]:
for col in string_cols:
    if not len(data[col].unique())/len(data[col]) > 0.5 and\
            (rare := data["category"].value_counts().min()/data.shape[0]) < 0.05: # rare category
        outliers_nums.append(rare)

dataset_scores["outliers_percentage"] = sum(outliers_nums)/data.size
dataset_scores["most_outliers_column"] = max(outliers_nums)/data.shape[0]

We also look for dominant values (more than 80% of the observations in column) and columns with unique values (eg. id, email), which may be not useful in further predictions.

In [89]:
for col in data.columns:  
    if len(data[col].unique())/len(data[col]) > 0.5: # column with rather unique values
        dataset_scores["unique_columns"] += 1      
    if data[col].value_counts().max()/data.shape[0] > 0.8: # dominant category
        dataset_scores["dominated_columns"] += 1
        
dataset_scores["dominated_columns"] /= len(data.columns)
dataset_scores["unique_columns"] /= len(data.columns)

## Check mishmashed formats

In [90]:
mishmashed_cases = []
for col in string_cols:
    unique_in_data = len(data["category"].unique())
    truly_unique = len(data["category"].map(lambda x: x.lower() if not pd.isna(x) else x).unique())

    mishmashed_cases.append((unique_in_data - truly_unique)/truly_unique)

dataset_scores["max_mishmashed_case"] = max(mishmashed_cases)

Other ideas: correlation, not good dates, mishmashed formats, upper and lower cased, duplicates, is it actual, are all values the same...

## Aggregate scores

In [91]:
dataset_scores

defaultdict(<function __main__.<lambda>()>,
            {'missing_percentage': 0.10326086956521739,
             'most_missing_column': 0.10434782608695652,
             'duplication_percentage': 0.017391304347826087,
             'outliers_percentage': 0.013100189035916825,
             'most_outliers_column': 0.10434782608695652,
             'unique_columns': 0.75,
             'dominated_columns': 0.125,
             'max_mishmashed_case': 0.75})

In [92]:
weights = {
    "missing_percentage": 10, # many missing values is difficult to handle
    "most_missing_column": 2, # if 1 we had a column with huge amount of missing values, we'd have to drop it
    "duplication_percentage": 4, # many duplicates means less data
    "outliers_percentage": 2, # outliers may be removed or cause problems with predictions
    "most_outliers_column": 1,
    "unique_columns": 5, # if all columns are unique, we can't do much with it
    "dominated_columns": 3, # if a column has one dominant category, it may be not very useful
    "max_mishmashed_case": 1 # our data may be dirty and require a lot of cleaning
}

assert weights.keys() == dataset_scores.keys()

In [93]:
final_score = 0
for name, score in dataset_scores.items():
    final_score += score * weights[name]

final_score /= sum(weights.values())
final_score = 1 - final_score # 1 is the best score, 0 – the worst
final_score

dataset_scores["dataset_quality_score"] = final_score

# Create badges

In [94]:
# save obtained scores to json
import json
 
filename="./badge_data.json"
json_object = json.dumps(dataset_scores, indent=4)
 
with open(filename, "w") as outfile:
    outfile.write(json_object)

In [95]:
repo_url = "https://github.com/annapanfil/data_quality_labeler" #todo: get dinamically

ownername = repo_url.split("/")[3]
repo_name = repo_url.split("/")[4]
branch = "main"


print("To add badges paste this to your readme.md file:")
for badge in dataset_scores.keys():
    print(f"![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2F{ownername}%2F{repo_name}%2F{branch}%2F{filename}&query=%24.{badge}&label={badge})")


To add badges paste this to your readme.md file:
![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2Fannapanfil%2Fdata_quality_labeler%2Fmain%2F./badge_data.json&query=%24.missing_percentage&label=missing_percentage)
![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2Fannapanfil%2Fdata_quality_labeler%2Fmain%2F./badge_data.json&query=%24.most_missing_column&label=most_missing_column)
![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2Fannapanfil%2Fdata_quality_labeler%2Fmain%2F./badge_data.json&query=%24.duplication_percentage&label=duplication_percentage)
![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw.githubusercontent.com%2Fannapanfil%2Fdata_quality_labeler%2Fmain%2F./badge_data.json&query=%24.outliers_percentage&label=outliers_percentage)
![DQ Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fraw