Reporting data export format and streaming changes.
MessageStackHandler is updated to have a default empty implementation
for 'outgoingMessage'. Subclasses are updated to remove their own empty
implementations of this previously abstract method.

Binding is updated to support streaming using new 'toStream' /
'fromStream' methods. Areas that previously converted to an OM model and
then immediately to bytes are updated to skip the intermediate step. The
namespace processing that was previously performed using 'replaceAll' is
now performed via a new NamespaceMappingXMLStreamWriter class.
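
The NamespaceMappingXMLStreamWriter class itself is Java and is not shown on this page. As a rough Python analogy only, the sketch below rewrites namespace URIs while SAX events stream through, instead of running a string 'replaceAll' over the serialized document. The names NamespaceMappingFilter and remap_namespaces and the 'urn:old' / 'urn:new' URIs are hypothetical and do not come from the commit.

```python
import io
import xml.sax
import xml.sax.handler
from xml.sax.saxutils import XMLFilterBase, XMLGenerator


class NamespaceMappingFilter(XMLFilterBase):
    """Hypothetical filter: remaps namespace URIs on SAX events in flight."""

    def __init__(self, mapping, downstream):
        XMLFilterBase.__init__(self)
        self.mapping = mapping
        # Forward (rewritten) events to the downstream content handler.
        self.setContentHandler(downstream)

    def _map(self, uri):
        # URIs without a mapping entry pass through unchanged.
        return self.mapping.get(uri, uri)

    def startPrefixMapping(self, prefix, uri):
        XMLFilterBase.startPrefixMapping(self, prefix, self._map(uri))

    def startElementNS(self, name, qname, attrs):
        uri, local = name
        XMLFilterBase.startElementNS(self, (self._map(uri), local), qname, attrs)

    def endElementNS(self, name, qname):
        uri, local = name
        XMLFilterBase.endElementNS(self, (self._map(uri), local), qname)


def remap_namespaces(source, mapping):
    """Parse a byte stream and return the XML text with element namespaces remapped."""
    out = io.StringIO()
    handler = NamespaceMappingFilter(mapping, XMLGenerator(out, 'utf-8'))
    parser = xml.sax.make_parser()
    parser.setFeature(xml.sax.handler.feature_namespaces, True)
    parser.setContentHandler(handler)
    parser.parse(source)
    return out.getvalue()


result = remap_namespaces(
    io.BytesIO(b'<Export xmlns="urn:old"><Item>1</Item></Export>'),
    {'urn:old': 'urn:new'})
```

This sketch only remaps element namespaces and prefix declarations; namespaced attributes would need the same treatment.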

The reporting Export class is modified to support streaming exports. The
'eureport-export-data' command is updated to support compression and
streaming. A patched version of the Python gzip module is included to
work around a bug that prevents streaming of gzip data (fixed in Python
3.2). The reporting Import class is modified to support streaming import
of data. The RestfulMarshallingHandler has a new 'streamResponse' method
that can be called from service implementations to stream data. In this
case the method will return 'null' to indicate that the response has
been handled. The 'streamResponse' method uses HTTP chunked encoding and
supports gzip and deflate data compression.
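
To illustrate what streaming a gzip response means on the client side, here is a minimal Python 3 sketch that decompresses a gzip stream chunk by chunk without holding the whole body in memory. It uses zlib.decompressobj rather than the patched GzipFile that this commit ships, and the function name iter_gunzip and the chunk size are assumptions made for the example.

```python
import gzip
import io
import zlib


def iter_gunzip(source, chunk_size=8192):
    """Yield decompressed chunks from a file-like object containing gzip data."""
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        data = decompressor.decompress(chunk)
        if data:
            yield data
    # Emit anything still buffered inside the decompressor.
    tail = decompressor.flush()
    if tail:
        yield tail


# Usage: round-trip an in-memory payload in 1 KiB reads.
payload = b"<Export>" + b"x" * 100000 + b"</Export>"
compressed = gzip.compress(payload)
restored = b"".join(iter_gunzip(io.BytesIO(compressed), chunk_size=1024))
```

Any object with a read(size) method can serve as the source, which is the property the export command relies on when it wraps the HTTP response.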

A new 'eureport-delete-data' command has been added and can be used to
delete (presumably exported) reporting data. The Entities class has been
updated to support bulk deletion of data. The reporting messages,
binding and service are updated to support reporting data deletion.

An EventFactory class has been added for creation of persistent
reporting events from exported data. ExportUtils is updated to provide
functions mapping between the export model and the internal persistence
events. The new ReportedAction and ReportedUsage classes are the
external representation of the internal reporting persistence events. A
JiBX binding has been added for reporting exports and is imported for
use in the reporting service binding. A new ReportingEventIdGenerator
has been added to allow exported reporting data to be saved to the data
warehouse with the original identifiers (so data can be merged).
ReportingEventSupport no longer extends AbstractPersistent; this means
that we no longer have various unused fields/columns and that we can
preserve the original identifier and created timestamp for each event.

EventRecord has been updated to allow reuse of Eucalyptus code
in the data warehouse without depending on Mule classes.
sjones4 committed Oct 4, 2012
1 parent 5963109 commit 01bc820
Showing 49 changed files with 3,065 additions and 306 deletions.
32 changes: 32 additions & 0 deletions clc/eucadmin/bin/eureport-delete-data
@@ -0,0 +1,32 @@
#!/usr/bin/python

# Copyright 2011-2012 Eucalyptus Systems, Inc.
#
# Redistribution and use of this software in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from eucadmin.deletereportdata import DeleteReportData

if __name__ == "__main__":
    r = DeleteReportData()
    r.main_cli()
47 changes: 47 additions & 0 deletions clc/eucadmin/eucadmin/deletereportdata.py
@@ -0,0 +1,47 @@
# Copyright 2011-2012 Eucalyptus Systems, Inc.
#
# Redistribution and use of this software in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from boto.roboto.param import Param
from eucadmin.reportsrequest import ReportsRequest

class DeleteReportData(ReportsRequest):
    Description = 'Delete reporting data'

    Params = [
        Param(name='End',
              short_name='e', long_name='end-date',
              ptype='datetime', optional=False,
              doc='the exclusive end date for the report data deletion (e.g. 2012-08-26)'),
        ]

    def cli_formatter(self, data):
        deleted = getattr( data, 'DeletedCount', None )
        if deleted is None:
            raise IOError('Error processing response')
        print 'Deleted ' + deleted + ' reporting data entries.'

    def process_args(self, **args):
        super(DeleteReportData, self).process_args( **args )
        self.process_date_param('End')
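
The process_args method above calls process_date_param, which is not defined in this file. The same commit removes a method of that name from generatereport.py, so it presumably now lives on ReportsRequest (an assumption, since reportsrequest.py is not shown here). A standalone sketch of the removed logic, written as a plain function over a dict rather than as a method:

```python
def process_date_param(request_params, arg):
    """Append a midnight time to a date-only request parameter, if present."""
    # Mirrors the method removed from generatereport.py: a value such as
    # '2012-08-26' becomes '2012-08-26T00:00:00'.
    if arg in request_params:
        request_params[arg] += 'T00:00:00'
    return request_params


params = process_date_param({'End': '2012-08-26'}, 'End')
```

A parameter that was not supplied is left alone, so optional dates such as the export command's Start and End can be omitted.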
120 changes: 111 additions & 9 deletions clc/eucadmin/eucadmin/exportreportdata.py
@@ -23,14 +23,38 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from StringIO import StringIO
from eucadmin.patched_gzip import GzipFile
import zlib
from boto.roboto.param import Param
from eucadmin.reportsrequest import ReportsRequest
import os
from eucadmin.reportsrequest import ReportsRequest, ReportsServiceClass
from xml.sax.saxutils import XMLGenerator, XMLFilterBase
import os, sys, xml.sax, xml.sax.handler, boto

class GzipReportsServiceClass(ReportsServiceClass):
    def build_base_http_request(self, method, path, auth_path,
                                params=None, headers=None, data='', host=None):
        headers = { 'Accept-Encoding': 'gzip' }
        return super(GzipReportsServiceClass, self)\
            .build_base_http_request( method, path, auth_path, params, headers, data, host )

class ExportReportData(ReportsRequest):
    ServiceClass = GzipReportsServiceClass
    Description = 'Export reporting data'

    Params = [
        Param(name='Start',
              short_name='s', long_name='start-date',
              ptype='datetime',
              doc='the inclusive start date for the exported data (e.g. 2012-08-19)'),
        Param(name='End',
              short_name='e', long_name='end-date',
              ptype='datetime',
              doc='the exclusive end date for the exported data (e.g. 2012-08-26)'),
        Param(name='Dependencies',
              short_name='d', long_name='dependencies',
              ptype='boolean',
              doc='include event dependencies from outside the requested period'),
        Param(name='Force',
              short_name='F', long_name='force',
              ptype='boolean', request_param=False,
@@ -49,19 +73,97 @@ def check_export_file(self):
raise IOError(msg)

    def cli_formatter(self, data):
        export = getattr( data, 'Data', None )
        if export is None:
            raise IOError('Error reading export response')
        if self.file is not None:
            f = open( self.file, 'w')
            f.write( export )
            f.close()
            print 'Exported data to ' + self.file

    # Override send to handle large response streaming and compression
    def send(self, verb='GET', **args):
        self.process_args(**args)
        self.process_filters()
        conn = self.get_connection(**self.connection_args)
        self.http_response = conn.make_request(self.name(),
                                               self.request_params,
                                               verb=verb)
        if self.http_response.status == 200:
            # Handle content encoding
            if self.http_response.getheader( 'Content-Encoding', 'identity' ) == 'gzip':
                source = GzipFile(fileobj=self.http_response)
            else:
                source = self.http_response
            # Process response without reading it all into memory
            if self.file is not None:
                f = open( self.file, 'w')
                self.body = self.write_export( f, source )
                f.close()
            else:
                self.body = self.write_export( sys.stdout, source )
            # Process non-export content in regular way
            boto.log.debug(self.body)
            self.aws_response = boto.jsonresponse.Element(list_marker=self.list_markers,
                                                          item_marker=self.item_markers)
            h = boto.jsonresponse.XmlHandler(self.aws_response, self)
            self.parse(h, StringIO(self.body))
            return self.aws_response
        else:
            print export
            boto.log.error('%s %s' % (self.http_response.status,
                                      self.http_response.reason))
            boto.log.error('%s' % self.body)
            raise conn.ResponseError(self.http_response.status,
                                     self.http_response.reason,
                                     self.body)

    def parse(self, handler, data ):
        parser = xml.sax.make_parser()
        parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        parser.setContentHandler(handler)
        parser.setErrorHandler(handler)
        input_source = xml.sax.InputSource()
        input_source.setByteStream(data)
        parser.parse(input_source)

    def write_export(self, file, source):
        generator = ExportExtractor(file)
        self.parse(generator, source)
        return generator.getReply()

    def process_args(self, **args):
        super(ExportReportData, self).process_args( **args )
        self.file = self.args['file']
        self.force = self.args['force']
        self.process_date_param('Start')
        self.process_date_param('End')
        self.check_export_file()

class ExportExtractor(XMLFilterBase):
    def __init__(self, file):
        XMLFilterBase.__init__(self)
        self.generator = XMLGenerator(file, 'UTF-8')
        self.generator.startPrefixMapping(u'', u'http://www.eucalyptus.com/ns/reporting/export/2012-08-24/')
        self.replyData = StringIO()
        self.replyGenerator = XMLGenerator( self.replyData, 'UTF-8' )
        self.switchTarget( self.replyGenerator )

    def startDocument(self):
        self.generator.startDocument()
        XMLFilterBase.startDocument(self)

    def endElementNS(self, name, qname):
        XMLFilterBase.endElementNS(self, name, qname)
        namespace, element = name
        if namespace == u'http://www.eucalyptus.com/ns/reporting/export/2012-08-24/' and element == u'Export':
            self.switchTarget( self.replyGenerator )

    def startElementNS(self, name, qname, attrs):
        namespace, element = name
        if namespace == u'http://www.eucalyptus.com/ns/reporting/export/2012-08-24/' and element == u'Export':
            self.switchTarget( self.generator )
        XMLFilterBase.startElementNS(self, name, qname, attrs)

    def switchTarget(self, target):
        self._cont_handler = target
        self._dtd_handler = target
        self._ent_handler = target
        self._err_handler = target

    def getReply(self):
        return self.replyData.getvalue()
8 changes: 2 additions & 6 deletions clc/eucadmin/eucadmin/generatereport.py
@@ -44,11 +44,11 @@ class GenerateReport(ReportsRequest):
        Param(name='Start',
              short_name='s', long_name='start-date',
              ptype='datetime',
              doc='The inclusive start date for the report period (e.g. 2012-08-19)'),
              doc='the inclusive start date for the report period (e.g. 2012-08-19)'),
        Param(name='End',
              short_name='e', long_name='end-date',
              ptype='datetime',
              doc='The exclusive end date for the report period (e.g. 2012-08-26)'),
              doc='the exclusive end date for the report period (e.g. 2012-08-26)'),
        Param(name='Force',
              short_name='F', long_name='force',
              ptype='boolean', request_param=False,
@@ -78,10 +78,6 @@ def cli_formatter(self, data):
        else:
            print report

    def process_date_param(self, arg):
        if arg in self.request_params:
            self.request_params[arg] += 'T00:00:00'

    def process_args(self, **args):
        super(GenerateReport, self).process_args( **args )
        self.file = self.args['file']