Added initial elasticsearch datastore

1 parent f599825 · commit d64f38dd2db742d980dab3a67143e8328f5b6d83 · Alberto Paro committed Apr 13, 2012
Showing with 336 additions and 88 deletions.
  1. +1 −0 .gitignore
  2. +3 −1 brewery/ds/__init__.py
  3. +24 −23 brewery/ds/base.py
  4. +244 −0 brewery/ds/elasticsearch_streams.py
  5. +7 −7 brewery/ds/mongo_streams.py
  6. +20 −20 brewery/ds/sql_streams.py
  7. +37 −37 brewery/streams.py
1 .gitignore
@@ -4,6 +4,7 @@
sandbox/*
.DS_Store
build/*
+.idea/*
env/
dist/
*.tmproj
4 brewery/ds/__init__.py
@@ -6,6 +6,7 @@
from brewery.ds.xls_streams import *
from brewery.ds.gdocs_streams import *
from brewery.ds.mongo_streams import *
+from brewery.ds.elasticsearch_streams import *
from brewery.ds.stream_auditor import *
from brewery.ds.yaml_dir_streams import *
from brewery.ds.sql_streams import *
@@ -24,11 +25,12 @@
"CSVDataTarget",
"XLSDataSource",
"MongoDBDataSource",
+ "ESDataSource",
"GoogleSpreadsheetDataSource",
"YamlDirectoryDataSource",
"YamlDirectoryDataTarget",
"SQLDataSource",
"SQLDataTarget",
"StreamAuditor",
"SimpleHTMLDataTarget"
-)
+)
47 brewery/ds/base.py
@@ -6,13 +6,13 @@
# Data sources
# ============
-#
+#
# Should implement:
# * fields
# * prepare()
# * rows() - returns iterable with value tuples
# * records() - returns iterable with dictionaries of key-value pairs
-#
+#
# Data targets
# ============
# Should implement:
@@ -26,15 +26,15 @@
import urllib2
import urlparse
import brewery.dq
-import copy
+from brewery.metadata import collapse_record, Field
def open_resource(resource, mode = None):
"""Get file-like handle for a resource. Conversion:
-
+
* if resource is a string and it is not URL or it is file:// URL, then opens a file
* if resource is URL then opens urllib2 handle
* otherwise assume that resource is a file-like handle
-
+
Returns tuple: (handle, should_close) where `handle` is file-like object and `should_close` is
a flag indicating whether the returned handle should be closed or not. Handles that should be
closed are those opened by this method, that is, resources referenced by a string or URL.
@@ -55,17 +55,18 @@ def open_resource(resource, mode = None):
handle = resource
return (handle, should_close)
+
class DataStream(object):
"""Shared methods for data targets and data sources"""
-
+
def __init__(self):
"""
A data stream object – abstract class.
-
+
The subclasses should provide:
-
+
* `fields`
-
+
`fields` are :class:`FieldList` objects representing fields passed
through the receiving stream - either read from data source
(:meth:`DataSource.rows`) or written to data target
@@ -75,12 +76,12 @@ def __init__(self):
accessor).
The subclasses might override:
-
+
* `initialize()`
* `finalize()`
-
+
The class supports context management, for example::
-
+
with ds.CSVDataSource("output.csv") as s:
for row in s.rows():
print row
@@ -94,31 +95,31 @@ def initialize(self):
"""Delayed stream initialisation code. Subclasses might override this
method to implement file or handle opening, connecting to a database,
doing web authentication, ... By default this method does nothing.
-
+
The method does not take any arguments, it expects pre-configured
object.
"""
pass
def finalize(self):
"""Subclasses might put finalisation code here, for example:
-
+
* closing a file stream
* sending data over network
* writing a chart image to a file
-
+
Default implementation does nothing.
"""
pass
-
+
# Context management
#
# See: http://docs.python.org/reference/datamodel.html#context-managers
#
def __enter__(self):
self.initialize()
return self
-
+
def __exit__(self, exc_type, exc_value, traceback):
self.finalize()
@@ -146,17 +147,17 @@ def read_fields(self, limit = 0, collapse = False):
provide metadata directly, such as CSV files, document based databases or directories with
structured files. Does nothing in relational databases, as fields are represented by table
columns and table metadata can be obtained from the database easily.
-
+
Note that this method can be quite costly, as by default all records within dataset are read
and analysed.
-
+
After executing this method, stream ``fields`` is set to the newly read field list and may
be configured (set more appropriate data types for example).
-
+
:Arguments:
- `limit`: read only specified number of records from dataset to guess field properties
- `collapse`: whether records are collapsed into flat structure or not
-
+
Returns: tuple with Field objects. Order of fields is datastore adapter specific.
"""
@@ -221,7 +222,7 @@ def __init__(self):
def append(self, object):
"""Append an object into dataset. Object can be a tuple, array or a dict object. If tuple
or array is used, then value position should correspond to field position in the field list,
- if dict is used, the keys should be valid field names.
+ if dict is used, the keys should be valid field names.
"""
raise NotImplementedError()
-
+
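For orientation, a minimal sketch of the stream lifecycle described in the DataStream docstrings above; the CSV file name is hypothetical, and CSVDataSource is used only because the docstring example above uses it:

    import brewery.ds as ds

    # Explicit lifecycle: initialize() prepares the stream, finalize() releases it.
    source = ds.CSVDataSource("data.csv")
    source.initialize()
    for row in source.rows():
        print row
    source.finalize()

    # Equivalent form using the context manager: __enter__ calls initialize()
    # and __exit__ calls finalize(), as documented above.
    with ds.CSVDataSource("data.csv") as source:
        for row in source.rows():
            print row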
244 brewery/ds/elasticsearch_streams.py
@@ -0,0 +1,244 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import base
+from brewery import dq
+import time
+from brewery.metadata import expand_record
+
+try:
+ from pyes.es import ES
+except ImportError:
+    from brewery.utils import MissingPackage
+    ES = MissingPackage("pyes", "ElasticSearch streams", "http://www.elasticsearch.org/")
+
+class ESDataSource(base.DataSource):
+ """docstring for ClassName
+ """
+ def __init__(self, document_type, database=None, host=None, port=None,
+ expand=False, **elasticsearch_args):
+ """Creates a ElasticSearch data source stream.
+
+ :Attributes:
+ * document_type: elasticsearch document_type name
+ * database: database name
+ * host: elasticsearch database server host, default is ``localhost``
+        * port: elasticsearch port, default is ``9200``
+        * expand: expand dictionary values and treat children as top-level keys with dot '.'
+        separated key path to the child.
+ """
+ self.document_type = document_type
+ self.database_name = database
+ self.host = host
+ self.port = port
+ self.elasticsearch_args = elasticsearch_args
+ self.expand = expand
+ self.connection = None
+ self._fields = None
+
+ def initialize(self):
+ """Initialize ElasticSearch source stream:
+ """
+ args = self.elasticsearch_args.copy()
+ server = ""
+ if self.host:
+ server = self.host
+ if self.port:
+ server += ":" + self.port
+
+ self.connection = ES(server, **args)
+ self.connection.default_indices = self.database_name
+ self.connection.default_types = self.document_type
+
+ def read_fields(self, limit=0):
+ keys = []
+ probes = {}
+
+ def probe_record(record, parent=None):
+ for key, value in record.items():
+ if parent:
+ full_key = parent + "." + key
+ else:
+ full_key = key
+
+ if self.expand and type(value) == dict:
+ probe_record(value, full_key)
+ continue
+
+ if not full_key in probes:
+ probe = dq.FieldTypeProbe(full_key)
+ probes[full_key] = probe
+ keys.append(full_key)
+ else:
+ probe = probes[full_key]
+ probe.probe(value)
+
+        from pyes.query import MatchAllQuery
+        # probe documents via the connection (document_type is only a type name);
+        # when no limit is given, fall back to a 200-document sample
+        for record in self.connection.search(MatchAllQuery(), size=limit or 200):
+ probe_record(record)
+
+ fields = []
+
+ for key in keys:
+ probe = probes[key]
+ field = base.Field(probe.field)
+
+ storage_type = probe.unique_storage_type
+ if not storage_type:
+ field.storage_type = "unknown"
+ elif storage_type == "unicode":
+ field.storage_type = "string"
+ else:
+ field.storage_type = "unknown"
+ field.concrete_storage_type = storage_type
+
+ # FIXME: Set analytical type
+
+ fields.append(field)
+
+ self._fields = list(fields)
+ return self._fields
+
+ def rows(self):
+ if not self.connection:
+ raise RuntimeError("Stream is not initialized")
+ from pyes.query import MatchAllQuery
+ fields = self.field_names
+ results = self.connection.search(MatchAllQuery(), search_type="scan", timeout="5m", size="200")
+ return ESRowIterator(results, fields)
+
+ def records(self):
+ if not self.connection:
+ raise RuntimeError("Stream is not initialized")
+ from pyes.query import MatchAllQuery
+ results = self.connection.search(MatchAllQuery(), search_type="scan", timeout="5m", size="200")
+ return ESRecordIterator(results, self.expand)
+
+class ESRowIterator(object):
+ """Wrapper for ElasticSearch ResultSet to be able to return rows() as tuples and records() as
+ dictionaries"""
+ def __init__(self, resultset, field_names):
+ self.resultset = resultset
+ self.field_names = field_names
+
+ def __getitem__(self, index):
+ record = self.resultset.__getitem__(index)
+
+ array = []
+
+ for field in self.field_names:
+ value = record
+ for key in field.split('.'):
+ if key in value:
+ value = value[key]
+ else:
+ break
+ array.append(value)
+
+ return tuple(array)
+
+class ESRecordIterator(object):
+ """Wrapper for ElasticSearch ResultSet to be able to return rows() as tuples and records() as
+ dictionaries"""
+ def __init__(self, resultset, expand=False):
+ self.resultset = resultset
+ self.expand = expand
+
+ def __getitem__(self, index):
+ def expand_record(record, parent=None):
+ ret = {}
+ for key, value in record.items():
+ if parent:
+ full_key = parent + "." + key
+ else:
+ full_key = key
+
+ if type(value) == dict:
+ expanded = expand_record(value, full_key)
+ ret.update(expanded)
+ else:
+ ret[full_key] = value
+ return ret
+
+ record = self.resultset.__getitem__(index)
+ if not self.expand:
+ return record
+ else:
+ return expand_record(record)
+
+class ESDataTarget(base.DataTarget):
+ """docstring for ClassName
+ """
+ def __init__(self, document_type, database="test", host="127.0.0.1", port="9200",
+ truncate=False, expand=False, **elasticsearch_args):
+ """Creates a ElasticSearch data target stream.
+
+ :Attributes:
+        * document_type: ElasticSearch document_type name
+        * database: database name
+        * host: ElasticSearch database server host, default is ``127.0.0.1``
+        * port: ElasticSearch port, default is ``9200``
+        * expand: expand dictionary values and treat children as top-level keys with dot '.'
+        separated key path to the child.
+ * truncate: delete existing data in the document_type. Default: False
+ """
+ self.document_type = document_type
+ self.database_name = database
+ self.host = host
+ self.port = port
+ self.elasticsearch_args = elasticsearch_args
+ self.expand = expand
+ self.truncate = truncate
+ self._fields = None
+
+ def initialize(self):
+ """Initialize ElasticSearch source stream:
+ """
+ from pyes.es import ES
+ from pyes.exceptions import IndexAlreadyExistsException
+
+ args = self.elasticsearch_args.copy()
+ server = ""
+ if self.host:
+ server = self.host
+ if self.port:
+ server += ":" + self.port
+
+ create = args.pop("create", False)
+ replace = args.pop("replace", False)
+
+ self.connection = ES(server, **args)
+ self.connection.default_indices = self.database_name
+ self.connection.default_types = self.document_type
+
+ created = False
+ if create:
+ try:
+ self.connection.create_index(self.database_name)
+ self.connection.refresh(self.database_name)
+ created = True
+ except IndexAlreadyExistsException:
+ pass
+
+ if replace and not created:
+ self.connection.delete_index_if_exists(self.database_name)
+ time.sleep(2)
+ self.connection.create_index(self.database_name)
+ self.connection.refresh(self.database_name)
+
+ if self.truncate:
+ self.connection.delete_mapping(self.database_name, self.document_type)
+ self.connection.refresh(self.database_name)
+
+ def append(self, obj):
+ record = obj
+ if not isinstance(obj, dict):
+ record = dict(zip(self.field_names, obj))
+
+ if self.expand:
+ record = expand_record(record)
+
+ id = record.get('id') or record.get('_id')
+ self.connection.index(record, self.database_name, self.document_type, id, bulk=True)
+
+ def finalize(self):
+ self.connection.flush_bulk(forced=True)
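For orientation, a hedged usage sketch of the two new classes, based only on the constructor signatures and docstrings in this file; the index ("database"), document type and record contents are hypothetical, and pyes must be installed:

    import brewery.ds as ds

    # Write documents into an ElasticSearch index. create=True is passed through
    # **elasticsearch_args and picked up in initialize() to create the index.
    target = ds.ESDataTarget("contacts", database="crm", host="127.0.0.1", port="9200",
                             create=True)
    target.initialize()
    target.append({"name": "Jane", "city": "Prague"})
    target.finalize()   # flush_bulk() sends the buffered index requests

    # Read the documents back; records() yields dictionaries.
    source = ds.ESDataSource("contacts", database="crm", host="127.0.0.1", port="9200")
    source.initialize()
    for record in source.records():
        print record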
14 brewery/ds/mongo_streams.py
@@ -6,7 +6,7 @@
try:
import pymongo
-except:
+except ImportError:
from brewery.utils import MissingPackage
pymongo = MissingPackage("pymongo", "MongoDB streams", "http://www.mongodb.org/downloads/")
@@ -16,7 +16,7 @@ class MongoDBDataSource(base.DataSource):
def __init__(self, collection, database=None, host=None, port=None,
expand=False, **mongo_args):
"""Creates a MongoDB data source stream.
-
+
:Attributes:
* collection: mongo collection name
* database: database name
@@ -118,7 +118,7 @@ def records(self):
return MongoDBRecordIterator(iterator, self.expand)
class MongoDBRowIterator(object):
- """Wrapper for pymongo.cursor.Cursor to be able to return rows() as tuples and records() as
+ """Wrapper for pymongo.cursor.Cursor to be able to return rows() as tuples and records() as
dictionaries"""
def __init__(self, cursor, field_names):
self.cursor = cursor
@@ -164,7 +164,7 @@ def collapse_record(record, parent=None):
return ret
class MongoDBRecordIterator(object):
- """Wrapper for pymongo.cursor.Cursor to be able to return rows() as tuples and records() as
+ """Wrapper for pymongo.cursor.Cursor to be able to return rows() as tuples and records() as
dictionaries"""
def __init__(self, cursor, expand=False):
self.cursor = cursor
@@ -175,7 +175,7 @@ def __iter__(self):
def next(self):
record = self.cursor.next()
-
+
if not record:
raise StopIteration
@@ -200,7 +200,7 @@ def __init__(self, collection, database=None, host=None, port=None,
separated key path to the child..
* truncate: delete existing data in the collection. Default: False
"""
-
+
self.collection_name = collection
self.database_name = database
self.host = host
@@ -228,7 +228,7 @@ def initialize(self):
if self.truncate:
self.collection.remove()
-
+
self.field_names = self.fields.names()
def append(self, obj):
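The MongoDB streams follow the same pattern; a brief hedged sketch with hypothetical collection and database names (pymongo required):

    import brewery.ds as ds

    source = ds.MongoDBDataSource("contacts", database="crm",
                                  host="localhost", port=27017)
    source.initialize()
    for record in source.records():
        print record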
40 brewery/ds/sql_streams.py
@@ -51,10 +51,10 @@ def split_table_schema(table_name):
else:
return (None, split[0])
-
+
class SQLContext(object):
"""Holds context of SQL store operations."""
-
+
def __init__(self, url = None, connection = None, schema = None):
"""Creates a SQL context"""
@@ -71,26 +71,26 @@ def __init__(self, url = None, connection = None, schema = None):
engine = sqlalchemy.create_engine(url)
self.connection = engine.connect()
self.should_close = True
-
+
self.metadata = sqlalchemy.MetaData()
self.metadata.bind = self.connection.engine
self.schema = schema
-
+
def close(self):
if self.should_close and self.connection:
self.connection.close()
-
+
def table(self, name, autoload=True):
"""Get table by name"""
-
- return sqlalchemy.Table(name, self.metadata,
+
+ return sqlalchemy.Table(name, self.metadata,
autoload=autoload, schema=self.schema)
def fields_from_table(table):
"""Get fields from a table. Field types are normalized to the Brewery
data types. Analytical type is set according to a default conversion
dictionary."""
-
+
fields = []
for column in table.columns:
@@ -118,14 +118,14 @@ def concrete_storage_type(field, type_map={}):
dictionary"""
concrete_type = field.concrete_storage_type
-
+
if not isinstance(concrete_type, sqlalchemy.types.TypeEngine):
if type_map:
concrete_type = type_map.get(field.storage_type)
if not concrete_type:
concrete_type = concrete_sql_type_map.get(field.storage_type)
-
+
if not concrete_type:
raise ValueError("unable to find concrete storage type for field '%s' "
"of type '%s'" % (field.name, field.storage_type))
@@ -139,13 +139,13 @@ def __init__(self, connection=None, url=None,
table=None, statement=None, schema=None, autoinit = True,
**options):
"""Creates a relational database data source stream.
-
+
:Attributes:
* url: SQLAlchemy URL - either this or connection should be specified
* connection: SQLAlchemy database connection - either this or url should be specified
* table: table name
* statement: SQL statement to be used as a data source (not supported yet)
- * autoinit: initialize on creation, no explicit initialize() is
+ * autoinit: initialize on creation, no explicit initialize() is
needed
* options: SQL alchemy connect() options
"""
@@ -174,7 +174,7 @@ def __init__(self, connection=None, url=None,
self.context = None
self.table = None
self.fields = None
-
+
if autoinit:
self.initialize()
@@ -219,7 +219,7 @@ def __init__(self, connection=None, url=None,
buffer_size=None, fields=None, concrete_type_map=None,
**options):
"""Creates a relational database data target stream.
-
+
:Attributes:
* url: SQLAlchemy URL - either this or connection should be specified
* connection: SQLAlchemy database connection - either this or url should be specified
@@ -235,9 +235,9 @@ def __init__(self, connection=None, url=None,
* buffer_size: size of INSERT buffer - how many records are collected before they are
inserted using multi-insert statement. Default is 1000
* fields : fieldlist for a new table
-
+
Note: avoid auto-detection when you are reading from remote URL stream.
-
+
"""
if not options:
options = {}
@@ -254,7 +254,7 @@ def __init__(self, connection=None, url=None,
self.table = None
self.fields = fields
-
+
self.concrete_type_map = concrete_type_map
if id_key_name:
@@ -271,7 +271,7 @@ def initialize(self):
"""Initialize source stream:
"""
- self.context = SQLContext(url=self.url,
+ self.context = SQLContext(url=self.url,
connection=self.connection,
schema=self.schema)
@@ -285,7 +285,7 @@ def initialize(self):
if not self.fields:
self.fields = fields_from_table(self.table)
-
+
self.field_names = self.fields.names()
self.insert_command = self.table.insert()
@@ -311,7 +311,7 @@ def _create_table(self):
if self.add_id_key:
id_key_name = self.id_key_name or 'id'
- sequence_name = "seq_" + name + "_" + id_key_name
+ sequence_name = "seq_" + self.table_name + "_" + id_key_name
sequence = sqlalchemy.schema.Sequence(sequence_name, optional=True)
col = sqlalchemy.schema.Column(id_key_name,
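A hedged sketch of moving rows between tables with the SQL streams touched above; the URLs and table names are hypothetical, the keyword names follow the source docstring, and the corresponding keywords for the target (as well as the ``fields`` attribute on the source) are assumptions:

    import brewery.ds as ds

    # autoinit=True (the default) reflects the source table on construction
    source = ds.SQLDataSource(url="sqlite:///input.sqlite", table="contacts")

    # buffer_size is how many records are collected before a multi-insert is
    # issued (the docstring above gives 1000 as the default); the target table
    # is assumed to exist already, since creation options are not shown here
    target = ds.SQLDataTarget(url="sqlite:///output.sqlite", table="contacts",
                              fields=source.fields, buffer_size=1000)
    target.initialize()
    for row in source.rows():
        target.append(row)
    target.finalize()
    source.finalize()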
74 brewery/streams.py
@@ -4,6 +4,7 @@
import threading
import traceback
import sys
+from brewery.nodes.base import node_dictionary
from brewery.utils import get_logger
from brewery.nodes import *
from brewery.common import *
@@ -69,7 +70,7 @@ class Pipe(SimpleDataPipe):
"""Data pipe:
Contains a buffer for data that should be transferred to another node.
Data are sent to the other node when the buffer is full. Pipe is one-directional where
- one thread is sending data to another thread. There is only one backward signalling: closing
+ one thread is sending data to another thread. There is only one backward signalling: closing
the pipe from remote object.
@@ -226,7 +227,7 @@ def __init__(self, nodes=None, connections=None):
* `nodes` - dictionary with keys as node names and values as nodes
* `connections` - list of two-item tuples. Each tuple contains source and target node
or source and target node name.
- * `stream` - another stream or
+ * `stream` - another stream or
"""
super(Stream, self).__init__(nodes, connections)
self.logger = get_logger()
@@ -236,33 +237,33 @@ def __init__(self, nodes=None, connections=None):
def fork(self):
"""Creates a construction fork of the stream. Used for constructing streams in functional
fashion. Example::
-
+
stream = Stream()
fork = stream.fork()
fork.csv_source("fork.csv")
fork.formatted_printer()
stream.run()
-
+
Fork responds to node names as functions. The function arguments are the same as node
constructor (__init__ method) arguments. Each call will append new node to the fork and
will connect the new node to the previous node in the fork.
-
+
To configure current node you can use ``fork.node``, like::
-
+
fork.csv_source("fork.csv")
fork.node.read_header = True
-
+
To set actual node name use ``set_name()``::
fork.csv_source("fork.csv")
fork.set_name("source")
-
+
...
-
+
source_node = stream.node("source")
-
+
To fork a fork, just call ``fork()``
"""
@@ -279,7 +280,7 @@ def update(self, nodes = None, connections = None):
# FIXME: use either node type identifier or fully initialized node, not
# node class (Warning: might break some existing code,
# depreciate it first
-
+
nodes = nodes or {}
connections = connections or []
@@ -314,26 +315,26 @@ def configure(self, config = {}):
`config` is a list of dictionaries with keys: ``node`` - node name, ``parameter`` - node parameter
name, ``value`` - parameter value
-
+
.. warning:
-
- This method might change to a list of dictionaries where one
+
+ This method might change to a list of dictionaries where one
dictionary will represent one node, keys will be attributes.
-
+
"""
-
+
# FIXME: this is wrong, it should be a single dict per node (or not?)
# List of attributes:
# * can reflect a form for configuring whole stream
# * can have attribute order regardless of their node ownership
# List of nodes:
# * bundled attributes in a single dictionary
# FIXME: this is inconsistent with node configuration! node.config()
-
+
configurations = {}
# Collect configurations for each node
-
+
for attribute in config:
node_name = attribute["node"]
attribute_name = attribute["attribute"]
@@ -352,15 +353,15 @@ def configure(self, config = {}):
for (node_name, config) in configurations.items():
node = self.coalesce_node(node_name)
node.configure(config)
-
+
def _initialize(self):
"""Initializes the data processing stream:
-
+
* sorts nodes based on connection dependencies
* creates pipes between nodes
* initializes each node
* initializes pipe fields
-
+
"""
self.logger.info("initializing stream")
@@ -404,12 +405,12 @@ def _initialize(self):
def run(self):
"""Run all nodes in the stream.
-
+
Each node is being wrapped and run in a separate thread.
-
+
When an exception occurs, the stream is stopped and all caught exceptions are stored in
attribute `exceptions`.
-
+
"""
self._initialize()
@@ -503,20 +504,20 @@ def _finalize(self):
for node in self.sorted_nodes():
self.logger.debug("finalizing node %s" % node_label(node))
node.finalize()
-
+
def node_label(node):
"""Debug label for a node: node identifier with python object id."""
return "%s(%s)" % (node.identifier() or str(type(node)), id(node))
-
+
class _StreamNodeThread(threading.Thread):
def __init__(self, node):
"""Creates a stream node thread.
-
+
:Attributes:
* `node`: a Node object
* `exception`: attribute will contain exception if one occurs during run()
* `traceback`: will contain traceback if exception occurs
-
+
"""
super(_StreamNodeThread, self).__init__()
self.node = node
@@ -526,7 +527,7 @@ def __init__(self, node):
def run(self):
"""Wrapper method for running a node"""
-
+
label = node_label(self.node)
self.logger.debug("%s: start" % label)
try:
@@ -585,31 +586,31 @@ def fork(self):
def merge(self, obj, **kwargs):
"""Joins two streams using the MergeNode (please refer to the node documentaton
for more information).
-
+
`obj` is a fork or a node to be merged. `kwargs` are MergeNode configuration arguments,
such as `joins`.
-
+
"""
raise NotImplementedError
# if type(obj) == StreamFork:
# node = obj.node
# else:
# node = obj
- #
+ #
# self.stream.append(node)
- #
+ #
# merge = MergeNode(**kwargs)
# self.stream.append(merge)
# self.stream.connect()
def append(self, obj):
"""Appends data from nodes using AppendNode"""
- raise NotImplementedError
+ raise NotImplementedError
def __getattr__(self, name):
"""Returns node class"""
# FIXME: use create_node here
-
+
class_dict = node_dictionary()
node_class = class_dict[name]
@@ -633,5 +634,4 @@ def create_builder():
"""Creates a stream builder for incremental stream building."""
stream = Stream()
return stream.fork()
-
-
+
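Finally, a hedged sketch of building and configuring a stream with the fork builder documented above; the node name and CSV file are hypothetical, and note that configure() reads the key ``attribute`` for each entry, even though the docstring mentions ``parameter``:

    from brewery.streams import Stream

    stream = Stream()
    fork = stream.fork()
    fork.csv_source("fork.csv")    # "fork.csv" is a hypothetical input file
    fork.set_name("source")
    fork.formatted_printer()

    # Each entry configures one attribute of one node, using the keys that
    # Stream.configure() reads: "node", "attribute" and "value".
    stream.configure([
        {"node": "source", "attribute": "read_header", "value": True}
    ])
    stream.run()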
