-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.py
168 lines (148 loc) · 6.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from cumulus_process import Process, s3
from cumulus_logger import CumulusLogger
from .dmrpp_options import DMRppOptions
import os
from re import search
import logging
import subprocess
class DMRPPGenerator(Process):
    """
    Class to generate dmrpp files from HDF and netCDF files
    The input will be *.nc *.nc4 *.hdf
    The output *.nc.dmrpp *.nc4.dmrpp *.hdf.dmrpp
    """

    def __init__(self, **kwargs):
        """
        Resolve the processing regex from the collection meta and initialise the parent Process
        :param kwargs: keyword arguments forwarded unchanged to the parent constructor
        """
        # Assigned before the parent constructor runs (original ordering kept;
        # presumably the parent may consult input_keys during init — TODO confirm).
        self.processing_regex = kwargs.get('config', {}) \
            .get('collection', {}) \
            .get('meta', {}) \
            .get('dmrpp_processing_regex', '.*\\.(((?i:(h|hdf)))(e)?5|nc(4)?)(\\.bz2|\\.gz|\\.Z)?')
        super(DMRPPGenerator, self).__init__(**kwargs)
        # Guarantee exactly one trailing slash on the working path
        self.path = self.path.rstrip('/') + "/"
        self.logger = CumulusLogger(name="DMRPP-Generator")

    @property
    def input_keys(self):
        """
        Regex patterns keyed by input kind
        :return: dict whose 'input_files' pattern is the processing regex followed by
                 an optional .cmr.xml or .json suffix
        """
        return {
            'input_files': f"{self.processing_regex}(\\.cmr\\.xml|\\.json)?$"
        }

    @staticmethod
    def get_file_type(filename, files):
        """
        Get custom file type, default to metadata
        :param filename: Granule file name
        :param files: list of collection files
        :return: type of the first collection file whose regex matches, else 'metadata'
        """
        for collection_file in files:
            # An entry without a regex matches any file name. The previous
            # fallback '*.' is not a valid pattern and raised re.error.
            if search(collection_file.get('regex', '.*'), filename):
                return collection_file.get('type', 'metadata')
        return 'metadata'

    @staticmethod
    def get_bucket(filename, files, buckets):
        """
        Extract the bucket from the files
        :param filename: Granule file name
        :param files: list of collection files
        :param buckets: Object holding buckets info, indexed by bucket type
        :return: Bucket object of the first matching collection file, or the
                 "public" bucket when nothing matches
        :raises KeyError: if the matching entry has no 'bucket' key or the
                          bucket type is absent from buckets
        """
        bucket_type = "public"
        for collection_file in files:
            # Same fallback as get_file_type: '.*' instead of the invalid '*.'
            if search(collection_file.get('regex', '.*'), filename):
                bucket_type = collection_file['bucket']
                break
        return buckets[bucket_type]

    def upload_file_to_s3(self, filename, uri):
        """
        Upload a local file to s3 if collection payload provided
        :param filename: path of the local file
        :param uri: destination s3 uri
        :return: result of s3.upload, or None when the upload failed (the error is logged)
        """
        try:
            return s3.upload(filename, uri, extra={})
        except Exception as e:
            # Best effort: a failed upload is reported but does not abort processing
            self.logger.error("Error uploading file %s: %s" % (os.path.basename(filename), str(e)))
            return None

    def _build_dmrpp_file(self, source_file, output_file_path, collection_files, buckets):
        """
        Build the granule file record describing one generated output file
        :param source_file: granule file entry the output was generated from
        :param output_file_path: local path of the generated file
        :param collection_files: list of collection files
        :param buckets: Object holding buckets info
        :return: dict with name, path, url_path, bucket, size, type, filepath and filename
        """
        name = os.path.basename(output_file_path)
        staging_dir = self.config.get('fileStagingDir')
        url_path = source_file.get('url_path', staging_dir)
        # The generated file is placed in the same directory as its source file
        filepath = os.path.dirname(source_file.get('filepath', url_path))
        dmrpp_file = {
            "name": name,
            "path": staging_dir,
            "url_path": url_path,
            "bucket": self.get_bucket(name, collection_files, buckets)['name'],
            "size": os.path.getsize(output_file_path),
            "type": self.get_file_type(name, collection_files)
        }
        dmrpp_file['filepath'] = f"{filepath}/{name}".lstrip('/')
        dmrpp_file['filename'] = f's3://{dmrpp_file["bucket"]}/{dmrpp_file["filepath"]}'
        return dmrpp_file

    def process(self):
        """
        Override the processing wrapper
        Generates the dmrpp output for every granule file matching the processing
        regex, uploads each output and appends its record to the granule files
        :return: the input payload, with the generated files added to each granule
        """
        # Default to an empty collection, as __init__ does, instead of failing on None
        collection = self.config.get('collection', {})
        collection_files = collection.get('files', [])
        collection_meta = collection.get('meta', {})
        dmrpp_meta = collection_meta.get('dmrpp', {})
        buckets = self.config.get('buckets')
        granules = self.input['granules']
        # A regex given in the dmrpp meta overrides the one resolved in __init__
        self.processing_regex = dmrpp_meta.get('dmrpp_regex', self.processing_regex)
        filename_pattern = f"{self.processing_regex}$"
        for granule in granules:
            dmrpp_files = []
            for file_ in granule['files']:
                if not search(filename_pattern, file_['filename']):
                    self.logger.debug(f"regex {self.processing_regex} does not match filename {file_['filename']}")
                    continue
                self.logger.debug(f"reges {self.processing_regex} matches filename to process {file_['filename']}")
                output_file_paths = self.dmrpp_generate(input_file=file_['filename'],
                                                        dmrpp_meta=dmrpp_meta)
                for output_file_path in output_file_paths:
                    # Skip empty entries before the path is used at all
                    if not output_file_path:
                        continue
                    dmrpp_file = self._build_dmrpp_file(file_, output_file_path,
                                                        collection_files, buckets)
                    dmrpp_files.append(dmrpp_file)
                    self.upload_file_to_s3(output_file_path, dmrpp_file['filename'])
            # Extend only after the inner loop so the list is not modified while iterated
            granule['files'] += dmrpp_files
        return self.input

    def get_dmrpp_command(self, dmrpp_meta, input_path, output_filename, local=False):
        """
        Getting the command line to create DMRPP files
        :param dmrpp_meta: dmrpp options from the collection meta; non-dict values are treated as {}
        :param input_path: path placed right after the options in the command
        :param output_filename: data file path; '.dmrpp' is appended for the -o target
                                and its basename is the last argument
        :param local: when True a '-u file://<output_filename>' option is added
        :return: the get_dmrpp command as a single-space separated string
        """
        dmrpp_meta = dmrpp_meta if isinstance(dmrpp_meta, dict) else {}
        dmrpp_options = DMRppOptions(self.path)
        options = dmrpp_options.get_dmrpp_option(dmrpp_meta=dmrpp_meta)
        local_option = f"-u file://{output_filename}" if local else ""
        dmrpp_cmd = f"get_dmrpp {options} {input_path} -o {output_filename}.dmrpp {local_option} {os.path.basename(output_filename)}"
        # Collapse the repeated blanks left behind by empty option strings
        return " ".join(dmrpp_cmd.split())

    def add_missing_files(self, dmrpp_meta, file_name):
        """
        Report the '.missing' side file when it was requested and generated
        :param dmrpp_meta: dmrpp options from the collection meta; non-dict values are treated as {}
        :param file_name: path of the expected missing-data file
        :return: [file_name] if the file exists and the options contain {'flag': '-M'}, else []
        """
        # If the missing file was not generated
        if not os.path.isfile(file_name):
            return []
        # If it was generated and the flag was set
        dmrpp_meta = dmrpp_meta if isinstance(dmrpp_meta, dict) else {}
        options = dmrpp_meta.get('options', [])
        if {'flag': '-M'} in options:
            return [file_name]
        return []

    @staticmethod
    def run_command(cmd):
        """
        Run cmd as a system command
        :param cmd: command line string; it is split on whitespace, no shell is involved
        :return: subprocess.CompletedProcess with stdout and stderr captured
        """
        out = subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return out

    def dmrpp_generate(self, input_file, local=False, dmrpp_meta=None):
        """
        Generate DMRPP from S3 file
        :param input_file: s3 uri of the data file, or a local path when local is True
        :param local: when True the file is not downloaded and the stdlib logging module is used
        :param dmrpp_meta: dmrpp options from the collection meta; non-dict values are treated as {}
        :return: list with the '.dmrpp' path plus the '.dmrpp.missing' path when applicable,
                 or [] when any step raised
        """
        # Force dmrpp_meta to be an object
        dmrpp_meta = dmrpp_meta if isinstance(dmrpp_meta, dict) else {}
        # If not running locally use Cumulus logger
        logger = logging if local else self.logger
        # Stays None until the command has run, so the error handler can tell
        # whether there is any command output to report
        cmd_output = None
        try:
            file_name = input_file if local else s3.download(input_file, path=self.path)
            cmd = self.get_dmrpp_command(dmrpp_meta, self.path, file_name, local)
            cmd_output = self.run_command(cmd)
            if cmd_output.stderr:
                logger.error(f"DMRPP: command {cmd} returned {cmd_output.stderr}")
            out_files = [f"{file_name}.dmrpp"] + self.add_missing_files(dmrpp_meta, f'{file_name}.dmrpp.missing')
            return out_files
        except Exception as ex:
            # getattr keeps the handler from raising when the failure happened
            # before the command ran (e.g. during the download)
            stdout = getattr(cmd_output, 'stdout', '')
            stderr = getattr(cmd_output, 'stderr', '')
            logger.error(f"DMRPP error {ex}: {stdout} {stderr}")
            return []
if __name__ == "__main__":
    # Manual smoke run: instantiate the generator with an empty payload and
    # build a get_dmrpp command for a sample file; the result is discarded.
    generator = DMRPPGenerator(input=[], config={})
    sample_meta = {
        "options": [
            {"flag": "-s", "opt": "htp://localhost/config.conf", "download": "true"},
            {"flag": "-M"},
        ]
    }
    generator.get_dmrpp_command(sample_meta, generator.path, "file_name.nc")