-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathdans_aggregate.py
177 lines (131 loc) · 6.14 KB
/
dans_aggregate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Analysis module for the DANS file metadata. Run python -m analysis.dans_scrape first to collect the file metadata into
a scrape ndjson log, then run this script to analyze it using python -m analysis.dans_analyze
"""
import datetime
import json
import logging
from argparse import ArgumentParser
from os.path import splitext
from typing import Dict
from jsonpath_ng.ext import parse
from ruamel.yaml import CommentedMap
from tqdm import tqdm
from analysis.config import load_config
# Unordered key/val
FileTimeStats = Dict[str, Dict[str, int]]
def main(config: CommentedMap) -> int:
    """
    Iterates over all pages in the datasets index of the Archaeology Datastation (~120k results in ~12k pages)

    :param config: an analysis.config.Config instance

    :return: 0
    """
    start = datetime.datetime.now()
    dans_cfg = config['data']['dans']
    file_stats: FileTimeStats = {}

    # Count the lines in the scrape log up front so tqdm can show real progress.
    # sum() also handles an empty log file, where the original enumerate-based
    # count left record_count unbound and raised a NameError.
    with open(dans_cfg['scrape_log_path'], 'rt') as f:
        record_count = sum(1 for _ in f)
    logging.info(f'Log has {record_count} entries')

    unusable_datasets: Dict[str, int] = {}
    with open(dans_cfg['scrape_log_path'], 'rt') as f:
        for json_record in tqdm(f, total=record_count):
            record = json.loads(json_record)

            # Skip records that fail validation, tallying the reason per kind
            reason = explain_valid_dataset(record, dans_cfg)
            if reason != "Valid":
                unusable_datasets[reason] = unusable_datasets.get(reason, 0) + 1
                continue

            # Aggregate the per-dataset file-type counts into per-month buckets
            content_type_counts = extract_content_type_counts(record, dans_cfg)
            year_month = extract_year_month(record, dans_cfg)
            for content_type, count in content_type_counts.items():
                monthly = file_stats.setdefault(content_type, {})
                monthly[year_month] = monthly.get(year_month, 0) + count

    out_path = dans_cfg['filetype_monthly_aggregate_path']
    with open(out_path, 'wt') as f:
        json.dump(file_stats, f, indent=2)
    # Log the write only after it actually happened (the original logged first)
    logging.info(f'Wrote aggregation to {out_path}')

    logging.info(f'Unusable dataset reasons out of {record_count}: {json.dumps(unusable_datasets, indent=2)}')
    end = datetime.datetime.now()
    logging.info(f'Script took {end - start}')
    return 0
def explain_valid_dataset(
        ds_metadata: dict,  # type: ignore[type-arg]
        dans_cfg: Dict[str, str]
) -> str:
    """
    Analyses a metadata record from the archaeology datastation REST API to validate it for usage in this analysis

    :param ds_metadata: Dictionary with keys and values from the Dataverse version API
    :param dans_cfg: Simple key-value settings, found under config.yaml data -> dans

    :return: 'Valid' if the record is usable, otherwise a human-readable reason why it is not
    """
    # sanity check: the Dataverse API wraps the version list in a 'data' key
    if 'data' not in ds_metadata.keys():
        return 'Metadata has no "data" key'

    versions = ds_metadata['data']
    # sanity check again
    if len(versions) == 0:
        return 'Metadata has no versions'

    if 'datasetPersistentId' not in versions[0].keys():
        return 'Metadata version 1 has no persistent identifier'

    # Return invalid if there is no single version 1.0 of the dataset.
    # .get() guards against versions that lack a version number key entirely,
    # which would have raised a KeyError with direct indexing.
    first_version_candidates = [version for version in versions
                                if version.get('versionNumber') == 1
                                and version.get('versionMinorNumber') == 0]
    if len(first_version_candidates) != 1:
        return 'No single version 1.0 for dataset'

    # The configured JSONPath must match exactly one publication date
    date_jsonpath = parse(dans_cfg['date_json_path'])
    matches = date_jsonpath.find(ds_metadata)
    if len(matches) != 1:
        return "No date found"

    return 'Valid'
def extract_content_type_counts(
ds_metadata: dict, # type: ignore[type-arg]
dans_cfg: CommentedMap
) -> Dict[str, int]:
"""
Collects the filenames of the first version of the dataset.
:param ds_metadata: Metadata dictionary for a dataset, gotten from the archaeology dataverse REST API
:param dans_cfg: The DANS scrape and analysis configuration, from the config.yaml `dans` section
:return: Either a tuple with a list of file types and a date for them,
or None if the data does not match the criteria
"""
# Since the dataset is already validated, we can safely access the first 'data' entry with version 1
content_types: Dict[str, int] = {}
first_version = [version for version in ds_metadata['data']
if version['versionNumber'] == 1 and version['versionMinorNumber'] == 0][0]
for file in first_version['files']:
# Filter out unwanted files
if file['label'] in dans_cfg['file_skip_list']:
continue
filetype = splitext(file['label'])[1].lower()
if filetype in dans_cfg['filetype_mapping'].keys():
filetype = dans_cfg['filetype_mapping'][filetype]
content_types.setdefault(filetype, 0)
content_types[filetype] += 1
return content_types
def extract_year_month(
        ds_metadata: dict,  # type: ignore[type-arg]
        dans_cfg: Dict[str, str]
) -> str:
    """
    Collects the correct date for the first version files.
    It aggregates the file metadata into a counter per file type, per month

    :param ds_metadata: Metadata dictionary for a dataset, gotten from the archaeology dataverse REST API
    :param dans_cfg: The DANS scrape and analysis configuration, from the config.yaml `dans` section

    :return: The extracted year and month in a single string formatted as "YYYY-mm"

    :raises ValueError: if the configured JSONPath does not match exactly one date
    """
    date_jsonpath = parse(dans_cfg['date_json_path'])
    matches = date_jsonpath.find(ds_metadata)
    # A record that passed explain_valid_dataset has exactly one date; anything else is an error here
    if len(matches) != 1:
        raise ValueError(f"No date found for {ds_metadata=}")
    queried_date = matches[0].value
    # The date is an ISO-8601 "YYYY-mm-dd..." string, so the first 7 characters are the year-month
    return str(queried_date[:7])
if __name__ == '__main__':
    # Command-line entry point: parse the config path, load it, and run the aggregation,
    # propagating main()'s return value as the process exit code.
    argument_parser = ArgumentParser('Performs the Data Archiving and Networked Services file metadata aggregation')
    argument_parser.add_argument('-c', '--config', default='config.yaml')
    parsed_args = argument_parser.parse_args()
    raise SystemExit(main(load_config(parsed_args.config)))