In [1]:
import matplotlib
import seaborn as sns

import pandas as pd
import matplotlib.pyplot as plt
import shutil
import csv
import os
from enum import Enum, auto

class DataQualityIssue(Enum):
    """Kinds of data-quality problem a dataset can be flagged with.

    Values are consecutive integers starting at 1 (the same numbering
    ``enum.auto()`` assigns).
    """
    STRUCTURAL_INTEGRITY_ISSUES = 1
    UN_TIMELY = 2
    EMPTY = 3
    INCONSISTENT = 4
    DUPLICATE = 5
    FORMAT_ERROR = 6


class Timeliness(Enum):
    """Verdict on whether a dataset was updated as often as it claims.

    Each member's value is the Chinese label used in reports, and ``str()``
    of a member yields exactly that label.
    """
    TIMELY = '及时'  # last update falls within the declared interval
    UNTIMELY = '不及时'  # last update is older than the declared interval
    UNDETERMINED = '无法判断'  # no usable interval to judge against

    def __str__(self):
        label = self.value
        return label

# Use a font with CJK glyphs so the Chinese titles/labels render.
matplotlib.rcParams['font.sans-serif'] = ['SimHei']
# Draw minus signs with the ASCII hyphen (presumably because SimHei lacks U+2212).
matplotlib.rcParams['axes.unicode_minus'] = False

# Root folder of this province's audit output. Set the CNDATA_AUDIT_ROOT
# environment variable to run the notebook on a machine with a different
# layout; the default reproduces the original absolute paths.
output_root = os.environ.get(
    'CNDATA_AUDIT_ROOT',
    r'D:\Data\workspace\python\projects\CNDataAuditOutput\sichuan')
dataset_dir = os.path.join(output_root, 'datasets')
empties_dir = os.path.join(output_root, 'empty_datasets')
integrity_issues_dir = os.path.join(output_root, 'structural_issues_datasets')
catalog_path = os.path.join(output_root, 'dataset_catalog.json')

# Column dtypes applied when loading the catalog JSON.
catalog_dtype = {'name': str, 'id': str, 'URL': str, 'owner': str, 'category': 'category',
                 'published': 'datetime64[ns]', 'updated': 'datetime64[ns]',
                 'frequency': 'category', 'sample_data': object}
# Cell strings treated as missing values when reading dataset CSVs.
null_values = ['无', '未知', '/', '-', '', ' ', '&nbsp;', 'null', 'NULL', 'N/A']


def read_csv(dataset_name, directory=dataset_dir):
    """Load one dataset CSV into a DataFrame.

    The file is decoded as GBK with undecodable bytes dropped, and the
    strings listed in ``null_values`` are treated as missing.
    """
    path = dataset_file_path(dataset_name, directory)
    options = dict(encoding='gbk', na_values=null_values, encoding_errors='ignore')
    return pd.read_csv(path, **options)

def dataset_files(directory=dataset_dir):
    """Lazily yield the names of the ``.csv`` files directly inside ``directory``.

    The directory listing is taken immediately; only the filtering is lazy.
    """
    entries = os.listdir(directory)
    return (entry for entry in entries if entry.endswith('.csv'))

def dataset_file_path(filename: str, directory=dataset_dir):
    """Build the path of a dataset file, appending ``.csv`` when it is missing."""
    csv_name = filename if filename.endswith('.csv') else filename + '.csv'
    return os.path.join(directory, csv_name)

def dataset_completeness(dataset_df):
    """Percentage of non-missing cells in a DataFrame.

    Args:
        dataset_df: The dataset to score.

    Returns:
        ``100 * (non-missing cells) / (total cells)``, or NaN when the frame
        has no cells at all (no rows or no columns). Previously that case
        divided zero by zero.
    """
    total_cells = dataset_df.size
    if total_cells == 0:
        return float('nan')
    missing_cells = dataset_df.isna().sum().sum()
    return ((total_cells - missing_cells) / total_cells) * 100

def exists(dataset_name, directory=dataset_dir):
    """Return True if the CSV for ``dataset_name`` is present in ``directory``."""
    path = dataset_file_path(dataset_name, directory)
    return os.path.exists(path)


In [2]:
catalog_df = pd.read_json(catalog_path, dtype=catalog_dtype)
# DataFrame.info() prints its report itself and returns None; wrapping it in
# print() only appended a stray "None" line to the cell output.
catalog_df.info()
print(catalog_df['category'].value_counts())
print(catalog_df['frequency'].value_counts())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3777 entries, 0 to 3776
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   name         3777 non-null   object        
 1   id           3777 non-null   object        
 2   URL          3777 non-null   object        
 3   owner        3777 non-null   object        
 4   category     3777 non-null   category      
 5   published    3777 non-null   datetime64[ns]
 6   updated      3777 non-null   datetime64[ns]
 7   frequency    3777 non-null   category      
 8   sample_data  3776 non-null   object        
dtypes: category(2), datetime64[ns](2), object(5)
memory usage: 215.1+ KB
None
category
社保就业         946
医疗卫生         463
市场监管         360
生活服务         355
教育文化         234
生态环境         219
工业农业         189
公共安全         140
信用服务         132
财税金融         124
交通运输         110
城建住房         107
能源资源          80
社会救助          78
机构团体          53
商贸流通        

In [None]:
def compare_datasets(catalog_df):
    """Compare dataset names found on disk with those listed in the catalog.

    Returns:
        A pair of sets: (names on disk but not in the catalog,
        names in the catalog but with no file on disk).
    """
    on_disk = {os.path.splitext(f)[0] for f in dataset_files()}
    in_catalog = set(catalog_df['name'].tolist())
    return on_disk - in_catalog, in_catalog - on_disk

# Report how far the downloaded files and the catalog disagree.
datasets_not_in_catalog, datasets_in_catalog_not_found = compare_datasets(catalog_df)
print(f"Datasets not in catalog: {len(datasets_not_in_catalog)}")
print(f"Datasets in catalog but not found in files: {len(datasets_in_catalog_not_found)}")

def remove_files(names):
    """Delete the CSV file of every dataset in ``names`` from ``dataset_dir``.

    Missing files are reported and skipped. Prints the number of files
    actually deleted.
    """
    removed = 0
    for name in names:
        path = dataset_file_path(name)
        if not exists(name):
            print(f"File not found: {path}")
            continue
        os.remove(path)
        removed += 1
    print(removed)

def remove_records(names):
    """Return a copy of the catalog without the rows whose ``name`` is in ``names``.

    Every column except ``sample_data`` is converted to ``str``, while
    ``sample_data`` keeps its original objects (dicts). Prints the number of
    rows kept.
    """
    keep_mask = ~catalog_df['name'].isin(names)
    kept_rows = catalog_df[keep_mask]
    as_strings = kept_rows.astype(str)
    # Restore the dicts instead of their str() rendering.
    as_strings['sample_data'] = kept_rows['sample_data']
    print(len(as_strings))
    return as_strings

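# Optional clean-up (destructive, kept disabled):
# remove_files(datasets_not_in_catalog)
# `duplicates` is only defined inside remove_duplicated_datasets_by_name(); recompute it first:
# duplicates = catalog_df[catalog_df.duplicated('name', keep=False)]
# filtered = remove_records(set(duplicates['name'].tolist()))
# filtered.to_json('updated_catalog.json', orient='records', force_ascii=False)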

In [None]:
def remove_duplicated_datasets_by_name():
    """Delete the files of datasets whose name appears more than once in the catalog.

    ``keep=False`` marks every occurrence of a repeated name, so no copy is
    kept. Prints the duplicated records before deleting.
    """
    is_duplicate = catalog_df.duplicated('name', keep=False)
    duplicates = catalog_df[is_duplicate]
    print("Duplicate records based on 'name':")
    print(len(duplicates), duplicates['name'].sort_values())
    duplicate_names = set(duplicates['name'].tolist())
    remove_files(duplicate_names)

# WARNING: deletes files on disk (via remove_files -> os.remove). On a re-run the
# files are already gone and remove_files reports them as "File not found".
remove_duplicated_datasets_by_name()

In [None]:
def move_empty_datasets(destination_dir=empties_dir):
    """Move datasets whose catalog sample data is the placeholder ``{'null': 'null'}``.

    Args:
        destination_dir: Directory that receives the files; created if
            missing. Defaults to ``empties_dir``.

    Files that are not present in ``dataset_dir`` are reported and skipped
    rather than aborting the whole run.
    """
    os.makedirs(destination_dir, exist_ok=True)
    is_empty_sample = catalog_df['sample_data'].apply(lambda x: x == {'null': 'null'})

    for name in catalog_df.loc[is_empty_sample, 'name']:
        file_name = f"{name}.csv"
        source_path = dataset_file_path(file_name)
        if not os.path.exists(source_path):
            print(f"File not found: {source_path}")
            continue
        # Bug fix: the destination previously resolved to dataset_dir, i.e. the
        # source path itself, so nothing was ever moved into the target folder.
        destination_path = dataset_file_path(file_name, directory=destination_dir)
        shutil.move(source_path, destination_path)
        print(f"Moved '{file_name}' to {destination_dir}")

In [65]:
def evaluate_csv_structural_integrity():
    """Check every dataset CSV for rows whose field count differs from the header.

    Returns:
        A DataFrame with one row per file: '文件名' (file name without
        extension) and '首个异常行标' (first inconsistent data-row index,
        or -1 if none).
    """
    first_bad_row = {
        stem: detect_csv_structural_issues(dataset_file_path(stem))
        for stem in (os.path.splitext(f)[0] for f in dataset_files())
    }
    return pd.DataFrame(list(first_bad_row.items()), columns=['文件名', '首个异常行标'])

def detect_csv_structural_issues(file_path):
    """Find the first data row whose field count differs from the header's.

    The file is decoded as GBK, with undecodable bytes silently dropped.

    Args:
        file_path: Path of the CSV file to inspect.

    Returns:
        The index of the first mismatching row, counting the first row after
        the header as 1, or -1 when every row matches. An empty file has no
        header to compare against and also yields -1. Before this fix, an
        empty file made ``next()`` raise StopIteration.
    """
    # newline='' lets the csv module handle line endings itself, as its
    # documentation requires for files passed to csv.reader.
    with open(file_path, encoding='gbk', errors='ignore', newline='') as file:
        reader = csv.reader(file)
        header = next(reader, None)
        if header is None:
            return -1
        expected_columns = len(header)
        for row_index, row in enumerate(reader, 1):
            if len(row) != expected_columns:
                return row_index
    return -1

def move_structural_issues(destination_dir=integrity_issues_dir):
    """Move every dataset CSV that has a row with the wrong number of fields.

    Args:
        destination_dir: Directory that receives the flagged files; created if
            missing. Defaults to ``integrity_issues_dir`` (backward-compatible:
            callers may still pass it explicitly).
    """
    os.makedirs(destination_dir, exist_ok=True)
    # dataset_files() lists the directory up front, so moving files while
    # iterating does not disturb the iteration.
    for file in dataset_files():
        source = dataset_file_path(file)
        if detect_csv_structural_issues(source) == -1:
            continue
        target = dataset_file_path(file, directory=destination_dir)
        shutil.move(source, target)
        print(f"Moved '{source}' to '{target}'")


integrity_df = evaluate_csv_structural_integrity()
# To quarantine the malformed files (moves them on disk), run:
# move_structural_issues(integrity_issues_dir)
print(integrity_df['首个异常行标'].value_counts())
integrity_df[integrity_df['首个异常行标'] != -1]

In [5]:
from joblib import Parallel, delayed

def evaluate_completeness(catalog_df):
    """Add a 'completeness' column (percentage of non-missing cells) to ``catalog_df`` in place.

    Datasets whose CSV file is missing get None. The files are read in
    parallel with joblib, using ``n_jobs=-1`` (all available CPUs).
    """
    def score(name):
        if not exists(name):
            return None
        return dataset_completeness(read_csv(name))

    jobs = (delayed(score)(name) for name in catalog_df['name'])
    catalog_df['completeness'] = Parallel(n_jobs=-1)(jobs)


evaluate_completeness(catalog_df)
# Optional distribution plot (run after the evaluation): catalog_df['completeness'].hist()
catalog_df['completeness'].value_counts(dropna=False)

completeness
100.000000    1385
NaN            805
83.333333       50
80.000000       39
85.714286       33
              ... 
53.703704        1
70.897436        1
90.277778        1
99.914286        1
99.988235        1
Name: count, Length: 1092, dtype: int64

In [6]:
from datetime import timedelta


def evaluate_timeliness(catalog_df, evaluated_at=pd.Timestamp('2024-04-30 20:00:00'),
                        frequency_to_days=None):
    """Add a 'timeliness' column to ``catalog_df`` in place.

    A dataset is TIMELY when ``evaluated_at - updated`` is at most the number
    of days its declared 'frequency' allows. Otherwise it is UNTIMELY.
    It is UNDETERMINED when the frequency is not in the map (e.g. '不定期')
    or when 'updated' is missing.

    Args:
        catalog_df: Catalog with 'frequency' and 'updated' columns.
        evaluated_at: Reference time. Defaults to the original download time.
        frequency_to_days: Mapping of frequency label to allowed age in days;
            defaults to the built-in map below.

    NOTE(review): '实时' maps to 0 days, so it only counts as timely when
    'updated' is at or after ``evaluated_at``. Confirm this is intended.
    """
    if frequency_to_days is None:
        frequency_to_days = {'实时': 0, '每天': 1, '每周': 7, '每月': 30,
                             '每季度': 90, '每半年': 183, '每年': 365}

    def timeliness(dataset, now, days_map):
        days = days_map.get(dataset['frequency'], None)
        updated = dataset['updated']
        # A NaT 'updated' used to fall through the comparison (NaT <= x is
        # False) and be reported as UNTIMELY; it cannot be judged.
        if days is None or pd.isna(updated):
            return Timeliness.UNDETERMINED
        return (Timeliness.TIMELY
                if now - updated <= timedelta(days=days)
                else Timeliness.UNTIMELY)

    catalog_df['timeliness'] = catalog_df.apply(timeliness, axis=1,
                                                args=(evaluated_at, frequency_to_days))

def timeliness_freq_distribution(df=None):
    """Count update frequencies separately for timely and untimely datasets.

    Args:
        df: Catalog frame with 'timeliness' and 'frequency' columns. Defaults
            to the global ``catalog_df`` (the previous, implicit behaviour).

    Returns:
        A pair ``(timely_counts, untimely_counts)`` of ``value_counts()``
        Series indexed by frequency.
    """
    if df is None:
        df = catalog_df
    timely_df = df[df['timeliness'] == Timeliness.TIMELY]
    untimely_df = df[df['timeliness'] == Timeliness.UNTIMELY]
    return timely_df['frequency'].value_counts(), untimely_df['frequency'].value_counts()


def print_timeliness_freq_distribution():
    """Print the frequency breakdown of timely and untimely datasets."""
    timely_counts, untimely_counts = timeliness_freq_distribution()
    sections = (('Frequency distribution for timely updates:', timely_counts),
                ('Frequency distribution for untimely updates:', untimely_counts))
    for heading, counts in sections:
        print(heading)
        print(counts)


def visualize_timeliness_distribution(save_path=None, evaluated_at='2024-04-30 20:00:00'):
    """Draw side-by-side pie charts of update frequency for timely vs. untimely datasets.

    Args:
        save_path: If given, the figure is also written to this path.
        evaluated_at: Evaluation time printed under the charts. The default
            is the timestamp the timeliness evaluation uses by default; keep
            the two in sync.
    """
    def autopct_format(values):
        # The total is fixed for a given pie, so compute it once, not per wedge.
        total = sum(values)

        def my_format(pct):
            val = int(round(pct * total / 100.0))
            return '{v:d} ({p:.2f}%)'.format(v=val, p=pct) if pct > 0 else ''
        return my_format

    timely_freq_counts, untimely_freq_counts = timeliness_freq_distribution()
    # Drop zero-count frequencies so they do not appear as empty wedges/labels.
    panels = [
        (timely_freq_counts[timely_freq_counts > 0], '及时更新的数据集更新频率分布'),
        (untimely_freq_counts[untimely_freq_counts > 0], '未及时更新的数据集更新频率分布'),
    ]

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16, 9))
    for ax, (counts, title) in zip(axes, panels):
        ax.pie(counts, labels=counts.index,
               autopct=autopct_format(counts), startangle=140)
        ax.set_title(title)
    fig.text(0.5, 0.01, f'评估时间：{evaluated_at}', ha='center', va='bottom', fontsize=10)
    fig.tight_layout()
    if save_path is not None:
        # Save this figure explicitly rather than whatever pyplot considers current.
        fig.savefig(save_path)
    plt.show()


# Classify every dataset, then show the frequency breakdown and overall counts.
evaluate_timeliness(catalog_df)
print_timeliness_freq_distribution()
# Optional pie charts: visualize_timeliness_distribution(save_path='timeliness_distribution.png')
catalog_df['timeliness'].value_counts(dropna=False)

Frequency distribution for timely updates:
frequency
每年     980
每半年     15
每月       1
不定期      0
实时       0
每周       0
每天       0
每季度      0
Name: count, dtype: int64
Frequency distribution for untimely updates:
frequency
每天     330
实时     246
每月     212
每半年    102
每季度     54
每周      18
每年       1
不定期      0
Name: count, dtype: int64


timeliness
无法判断    1818
及时       996
不及时      963
Name: count, dtype: int64

In [49]:
def evaluate_data_quality(catalog_df):
    """Run the completeness and timeliness evaluations, adding their columns to ``catalog_df`` in place."""
    for evaluate in (evaluate_completeness, evaluate_timeliness):
        evaluate(catalog_df)

evaluate_data_quality(catalog_df)

In [53]:
def view(catalog_df):
    """Select the columns shown in the catalog quality summary, in display order."""
    summary_columns = ['name', 'owner', 'category', 'completeness', 'timeliness']
    return catalog_df[summary_columns]

catalog_view = view(catalog_df)
# Fixed random_state so the displayed sample is identical on every Restart & Run All.
catalog_view.sample(5, random_state=0)

Unnamed: 0,name,owner,category,timeliness,completeness
2374,食盐定点生产企业证书,省经济和信息化厅,市场监管,及时,100.0
1297,资阳市_经信局_工业园区营业收入,资阳市经济和信息化局,工业农业,无法判断,100.0
999,四川省大气环境重点排污单位信息,生态环境厅,生态环境,无法判断,88.0
2842,巴中市_南江县人民政府办公室_公共财政支出预算经济科目款明细表,南江县人民政府办公室,生活服务,无法判断,100.0
778,南充市_嘉陵区_嘉陵区图书馆借书流通信息,南充市,生活服务,及时,
