Merge pull request #227 from earthgecko/py3
upload_data - flux and webapp
earthgecko committed May 21, 2020
2 parents 978b2c3 + 421542e commit ae8cfbf
Showing 4 changed files with 88 additions and 18 deletions.
18 changes: 15 additions & 3 deletions docs/upload-data-to-flux.rst
@@ -2,6 +2,8 @@
upload_data to Flux - EXPERIMENTAL
==================================

**POST variables may change as this is an experimental feature**

**SIMPLE** data files can be uploaded via the /upload_data HTTP/S endpoint for
Flux to process and feed to Graphite. A number of things need to be enabled and
running to allow for processing data file uploads, which are not enabled by
@@ -15,15 +17,18 @@ Skyline currently allows for the uploading of data files in the following formats:

- csv (tested)
- xlsx (tested)
- xls (not tested)

Seeing as data files can be large, the following archive formats are accepted:

- gz (tested)
- zip (tested)
- zip (partly tested, multiple data files per archive have not been thoroughly tested)

A single file or archive can be uploaded or many data files can be uploaded in
a single archive. An `info.json` must also be included in the archive, more
on that below. **HOWEVER**, if you wish to determine the status of the upload
programmatically, you will want to upload one data file per upload, otherwise
making sense of the upload_status will be very difficult.

So you could upload `data.csv`, `data.csv.gz` or `data.zip` with the `data.csv`
file inside the zip archive.
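
As a sketch, a single data file upload could be made with Python requests as
follows. This is illustrative only: the URL, the credentials and the
`data_file` and `archive` field names are assumptions, while `format` and
`ignore_submitted_timestamps` are form fields added in this change::

    import requests

    # a minimal upload sketch, not the definitive client: the URL, auth and
    # the data_file/archive field names are assumptions for illustration
    url = 'https://skyline.example.org/upload_data'
    data = {
        'format': 'csv',
        'archive': 'gz',  # assumed field name for the archive type
        'ignore_submitted_timestamps': 'false',
    }
    with open('data.csv.gz', 'rb') as f:
        files = {'data_file': f}  # assumed field name for the uploaded file
        response = requests.post(url, data=data, files=files,
                                 auth=('username', 'password'))
    print(response.status_code)
    print(response.text)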
@@ -94,12 +99,19 @@ info.json

Your date/time column MUST be named `date` in the `columns_to_metrics` mapping.

For convenience's sake you can also add two additional elements to the info.json:
For convenience's sake you can also add additional elements to the info.json:

- `"debug": "true"` which outputs additional information regarding the imported
dataframe in the flux.log to aid with debugging.
- `"dryrun": "true"` which runs through the processing but does not submit data
to Graphite.
- `"ignore_submitted_timestamps": "true"`, a check is normally done of the last
timestamp submitted to flux for the metric to ensure that data is not
submitted multiple times. If you wish to override this check to resubmit data,
update or override already submitted data pass this in the info.json. However
do **note**, if you wish to resubmit data that is **NOT IN THE** latest
Graphite retention (old data) this will not have the desired affect as
Graphite down sampling (aggregation) will already have occurred.
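
For example, an info.json combining the mapping with these optional elements
could be generated with the following sketch; the `parent_metric_namespace`
key name and all of the values are illustrative assumptions::

    import json

    # an illustrative info.json only: debug, dryrun, resample_method and
    # ignore_submitted_timestamps are documented above, the other keys and
    # all of the values are assumptions for this sketch
    info = {
        'parent_metric_namespace': 'test.data_upload',
        'columns_to_metrics': 'date,metric_1,metric_2',  # MUST include date
        'resample_method': 'mean',
        'debug': 'true',
        'dryrun': 'true',
        'ignore_submitted_timestamps': 'true',
    }
    with open('info.json', 'w') as fh:
        json.dump(info, fh, indent=4)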

This tells Skyline what the parent metric namespace should be, which would
result in metrics:
52 changes: 41 additions & 11 deletions skyline/flux/uploaded_data_worker.py
@@ -70,7 +70,7 @@
'UTC', 'Universal', 'Zulu'
]

ALLOWED_EXTENSIONS = {'json', 'csv', 'xlsx'}
ALLOWED_EXTENSIONS = {'json', 'csv', 'xlsx', 'xls'}


class UploadedDataWorker(Process):
@@ -275,6 +275,15 @@ def allowed_file(filename):
except:
resample_method = 'mean'
upload_status.append(['resample_method', resample_method])
# @added 20200521 - Feature #3538: webapp - upload_data endpoint
# Feature #3550: flux.uploaded_data_worker
# Added the ability to ignore_submitted_timestamps and not
# check flux.last metric timestamp
try:
ignore_submitted_timestamps = upload_dict['ignore_submitted_timestamps']
except:
ignore_submitted_timestamps = False
upload_status.append(['ignore_submitted_timestamps', ignore_submitted_timestamps])
except:
logger.error(traceback.format_exc())
logger.error('error :: uploaded_data_worker :: failed to determine required variables for the upload_dict - %s ' % str(upload_dict))
@@ -576,6 +585,13 @@ def allowed_file(filename):
dryrun = True
except:
dryrun = False
try:
ignore_submitted_timestamps_str = upload_info['ignore_submitted_timestamps']
if ignore_submitted_timestamps_str == 'true':
ignore_submitted_timestamps = True
logger.info('uploaded_data_worker :: determined ignore_submitted_timestamps from the upload_info dict - %s' % str(ignore_submitted_timestamps))
except:
logger.info('uploaded_data_worker :: ignore_submitted_timestamps was not passed in the upload info')

if upload_dict and processing_upload_failed:
logger.info('uploaded_data_worker :: failed to process upload - %s' % str(upload_dict))
@@ -643,7 +659,7 @@ def allowed_file(filename):
upload_status = new_upload_status(upload_status, processing_filename, failure_reason)

if successful:
if data_format == 'xlsx':
if data_format == 'xlsx' or data_format == 'xls':
try:
if LOCAL_DEBUG or debug_enabled_in_info:
logger.info('running - pd.read_excel(' + file_to_process + ', skiprows=' + str(skip_rows) + ', header=' + str(header_row) + ', usecols=' + str(columns_to_process) + ')')
@@ -661,18 +677,20 @@
logger.debug(df.info())
else:
df = pd.read_excel(file_to_process, skiprows=skip_rows, header=header_row, usecols=columns_to_process)
logger.info('uploaded_data_worker :: pandas dataframe created from xlsx - %s' % str(file_to_process))
logger.info('uploaded_data_worker :: pandas dataframe created from %s - %s' % (
data_format, str(file_to_process)))
# Unfortunately this if df is not a reliable test
# if df.info() is None:
# logger.error('error :: uploaded_data_worker :: df.info() returns None')
# df = None
except:
logger.error(traceback.format_exc())
failure_reason = 'pandas failed to parse the xlsx data file - %s' % str(file_to_process)
failure_reason = 'failed - pandas failed to parse the %s data file - %s' % (
data_format, str(file_to_process))
logger.error('error :: uploaded_data_worker :: %s' % failure_reason)
successful = False
if upload_status:
upload_status = new_upload_status(upload_status, processing_filename, 'failed to read xlsx data')
upload_status = new_upload_status(upload_status, processing_filename, failure_reason)
if data_format == 'csv':
try:
if LOCAL_DEBUG or debug_enabled_in_info:
@@ -926,12 +944,17 @@ def allowed_file(filename):
if data_df_successful and timeseries and metric:
cache_key = 'flux.last.%s' % metric
redis_last_metric_data = None
try:
redis_last_metric_data = self.redis_conn_decoded.get(cache_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: uploaded_data_worker :: failed to determine last_flux_timestamp from Redis key %s' % cache_key)
last_flux_timestamp = None
# @added 20200521 - Feature #3538: webapp - upload_data endpoint
# Feature #3550: flux.uploaded_data_worker
# Added the ability to ignore_submitted_timestamps and not
# check flux.last metric timestamp
if not ignore_submitted_timestamps:
try:
redis_last_metric_data = self.redis_conn_decoded.get(cache_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: uploaded_data_worker :: failed to determine last_flux_timestamp from Redis key %s' % cache_key)
last_flux_timestamp = None
if redis_last_metric_data:
try:
last_metric_data = literal_eval(redis_last_metric_data)
@@ -1228,6 +1251,13 @@ def allowed_file(filename):
if dryrun:
logger.info('uploaded_data_worker :: DRYRUN :: faking updating %s with %s' % (
cache_key, str(metric_data)))
# @added 20200521 - Feature #3538: webapp - upload_data endpoint
# Feature #3550: flux.uploaded_data_worker
# Added the ability to ignore_submitted_timestamps and not
# check flux.last metric timestamp
elif ignore_submitted_timestamps:
logger.info('uploaded_data_worker :: ignore_submitted_timestamps :: not updating %s with %s' % (
cache_key, str(metric_data)))
else:
self.redis_conn.set(cache_key, str(metric_data))
logger.info('uploaded_data_worker :: set the metric Redis key - %s - %s' % (
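
The effect of the change above can be summarised in a short sketch: when
ignore_submitted_timestamps is set, the flux.last.<metric> Redis key is
neither consulted before submission nor updated afterwards. The helper name
below is hypothetical and the structure of the key's value is assumed from
the literal_eval usage in this diff:

    from ast import literal_eval

    def get_last_flux_timestamp(redis_conn_decoded, metric, ignore_submitted_timestamps):
        # hypothetical helper: return the last timestamp submitted to flux
        # for metric, or None if the check is skipped or no key exists
        if ignore_submitted_timestamps:
            return None
        cache_key = 'flux.last.%s' % metric
        last_flux_timestamp = None
        try:
            redis_last_metric_data = redis_conn_decoded.get(cache_key)
            if redis_last_metric_data:
                # assumed value, e.g. "{'timestamp': 1590069600, 'value': 1.0}"
                last_metric_data = literal_eval(redis_last_metric_data)
                last_flux_timestamp = int(last_metric_data['timestamp'])
        except Exception:
            last_flux_timestamp = None
        return last_flux_timestamp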
12 changes: 12 additions & 0 deletions skyline/webapp/templates/upload_data.html
@@ -105,6 +105,7 @@ <h4><span class="logo"><span class="sky">Upload ::</span> <span class="re">data<
<td><select name="format">
<option value="csv">csv</option>
<option value="xlsx">xlsx</option>
<option value="xls">xls</option>
</select><br>
</tr>
<tr>
@@ -173,6 +174,17 @@ <h4><span class="logo"><span class="sky">Upload ::</span> <span class="re">data<
If your data file has a sample rate of more than 1 data point per metric per 60 seconds, a pandas resample at <code>1Min</code> is applied to the data (see the sketch below this table).<br>
If the data needs to be resampled, you can select to resample it by the mean (default) or the sum.<br>
</tr>
<tr>
<td>ignore_submitted_timestamps [optional]</td>
<td><select name="ignore_submitted_timestamps">
<option value="false">false</option>
<option value="true">true</option>
</select><br>
A check is normally done of the last timestamp submitted to Flux for the metric to ensure that data is not submitted multiple times.<br>
If you wish to override this check to resubmit data, or to update or overwrite already submitted data, set this to <code>true</code>.<br>
<strong>Note</strong>: If you wish to resubmit data that is <code>NOT IN THE</code> latest Graphite retention (old data) this will not have the desired effect.<br>
If you are submitting data for the first time, you should probably set this to <code>true</code>.
</tr>
</tbody>
</table>
<br>
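
The 1Min resample described in the form help text above amounts to something
like this pandas sketch; the real logic lives in uploaded_data_worker.py and
is not fully shown in this diff:

    import pandas as pd

    # df stands in for uploaded data indexed by its parsed date column
    df = pd.DataFrame(
        {'metric_1': [1.0, 2.0, 3.0]},
        index=pd.to_datetime(['2020-05-21 10:00:10',
                              '2020-05-21 10:00:40',
                              '2020-05-21 10:01:20']))
    resample_method = 'sum'  # 'mean' is the default
    if resample_method == 'sum':
        resampled_df = df.resample('1Min').sum()
    else:
        resampled_df = df.resample('1Min').mean()
    print(resampled_df)  # two 1Min rows: 10:00 -> 3.0 and 10:01 -> 3.0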
24 changes: 20 additions & 4 deletions skyline/webapp/webapp.py
@@ -189,8 +189,8 @@
DATA_UPLOADS_PATH = settings.DATA_UPLOADS_PATH
except:
DATA_UPLOADS_PATH = '/tmp/skyline/data_uploads'
ALLOWED_EXTENSIONS = {'json', 'csv', 'xlsx', 'zip', 'gz'}
ALLOWED_FORMATS = {'csv', 'xlsx'}
ALLOWED_EXTENSIONS = {'json', 'csv', 'xlsx', 'xls', 'zip', 'gz'}
ALLOWED_FORMATS = {'csv', 'xlsx', 'xls'}
# @modified 20200520 - Bug #3552: flux.uploaded_data_worker - tar.gz
# tar.gz needs more work
# ALLOWED_ARCHIVES = {'none', 'zip', 'gz', 'tar_gz'}
@@ -4685,6 +4685,11 @@ def upload_data():
resample_method = 'mean'
json_response = False
flux_identifier = None
# @added 20200521 - Feature #3538: webapp - upload_data endpoint
# Feature #3550: flux.uploaded_data_worker
# Added the ability to ignore_submitted_timestamps and not
# check flux.last metric timestamp
ignore_submitted_timestamps = False

upload_id = None
data_dict = {}
@@ -4879,6 +4884,15 @@ def upload_data():
if resample_method_str == 'sum':
resample_method = 'sum'
logger.info('handling upload_data POST with variable columns_to_process - %s' % str(columns_to_process))
# @added 20200521 - Feature #3538: webapp - upload_data endpoint
# Feature #3550: flux.uploaded_data_worker
# Added the ability to ignore_submitted_timestamps and not
# check flux.last metric timestamp
if 'ignore_submitted_timestamps' in request.form:
ignore_submitted_timestamps_str = request.form['ignore_submitted_timestamps']
if ignore_submitted_timestamps_str == 'true':
ignore_submitted_timestamps = True
logger.info('handling upload_data POST with variable json_response - %s' % str(json_response))

if 'info_file' not in request.files:
create_info_file = True
@@ -4903,7 +4917,8 @@
"columns_to_ignore": columns_to_ignore,
"columns_to_process": columns_to_process,
"info_file_in_archive": info_file_in_archive,
"resample_method": resample_method
"resample_method": resample_method,
"ignore_submitted_timestamps": ignore_submitted_timestamps,
}

info_file_saved = False
@@ -4989,7 +5004,8 @@
"info_file_in_archive": info_file_in_archive,
"skip_rows": skip_rows,
"header_row": header_row,
"resample_method": resample_method
"resample_method": resample_method,
"ignore_submitted_timestamps": ignore_submitted_timestamps,
}
try:
REDIS_CONN.sadd('flux.uploaded_data', str(upload_data_dict))
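
As the last hunk shows, the webapp hands an upload off to flux by sadd-ing
the str() of the upload_data_dict to the flux.uploaded_data Redis set, which
uploaded_data_worker consumes. A rough sketch of that handoff, assuming a
smembers/srem consumer loop (the actual worker loop is not part of this diff)
and illustrative dict contents:

    from ast import literal_eval

    import redis

    redis_conn = redis.StrictRedis(host='127.0.0.1', port=6379,
                                   decode_responses=True)

    # producer side, as in webapp.py above (dict contents illustrative)
    upload_data_dict = {
        'upload_id': 'example.upload',
        'format': 'csv',
        'ignore_submitted_timestamps': False,
    }
    redis_conn.sadd('flux.uploaded_data', str(upload_data_dict))

    # hypothetical consumer side
    for upload_str in redis_conn.smembers('flux.uploaded_data'):
        upload_dict = literal_eval(upload_str)
        ignore_submitted_timestamps = upload_dict.get('ignore_submitted_timestamps', False)
        # ... process the upload, honouring ignore_submitted_timestamps ...
        redis_conn.srem('flux.uploaded_data', upload_str)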
