#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
In-memory persistent stub for the Python datastore API. Gets, queries,
and searches are implemented as in-memory scans over all entities.
Stores entities across sessions as pickled proto bufs in a single file. On
startup, all entities are read from the file and loaded into memory. On
every Put(), the file is wiped and all entities are written from scratch.
Clients can also manually Read() and Write() the file themselves.
Transactions are serialized through __tx_lock. Each transaction acquires it
when it begins and releases it when it commits or rolls back. This is
important, since there are other member variables like __tx_snapshot that are
per-transaction, so they should only be used by one tx at a time.
"""
import datetime
import logging
import os
import pickle
import struct
import sys
import tempfile
import threading
import types
import warnings
from google.appengine.api import api_base_pb
from google.appengine.api import datastore
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api import users
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_index
from google.appengine.runtime import apiproxy_errors
from google.net.proto import ProtocolBuffer
from google.appengine.datastore import entity_pb
# os.tempnam(), which this module calls when writing pickled files, raises a
# RuntimeWarning with this message; ignore it.
warnings.filterwarnings('ignore', 'tempnam is a potential security risk')
# Hash Reference and Query protocol buffers by their encoded form. Query PBs
# are used as dictionary keys for the query history in this module;
# presumably entity keys are Reference PBs used the same way -- TODO confirm.
entity_pb.Reference.__hash__ = lambda self: hash(self.Encode())
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
class DatastoreFileStub(object):
""" Persistent stub for the Python datastore API.
Stores all entities in memory, and persists them to a file as pickled
protocol buffers. A DatastoreFileStub instance handles a single app's data
and is backed by files on disk.
"""
def __init__(self, app_id, datastore_file, history_file,
             require_indexes=False):
  """Sets up empty in-memory state, then loads the backing files via Read().

  Args:
    app_id: string, must not be empty.
    datastore_file: string, stores all entities across sessions. Use None
        not to use a file.
    history_file: string, stores query history. Use None as with
        datastore_file.
    require_indexes: bool, default False. If True, composite indexes must
        exist in index.yaml for queries that need them.
  """
  assert isinstance(app_id, types.StringTypes)
  assert app_id != ''

  # Configuration supplied by the caller.
  self.__app_id = app_id
  self.__datastore_file = datastore_file
  self.__history_file = history_file
  self.__require_indexes = require_indexes

  # In-memory state; everything starts out empty.
  self.__entities = {}
  self.__tx_snapshot = {}
  self.__queries = {}
  self.__transactions = {}
  self.__indexes = {}
  self.__query_history = {}

  # Counters start at 1; a lock is created alongside each of them.
  self.__next_id = 1
  self.__id_lock = threading.Lock()
  self.__next_cursor = 1
  self.__cursor_lock = threading.Lock()
  self.__next_tx_handle = 1
  self.__tx_handle_lock = threading.Lock()
  self.__next_index_id = 1
  self.__index_id_lock = threading.Lock()

  # Locks for the remaining shared state.
  self.__tx_lock = threading.Lock()
  self.__entities_lock = threading.Lock()
  self.__file_lock = threading.Lock()
  self.__indexes_lock = threading.Lock()

  # Must come last: loading relies on the attributes set above.
  self.Read()
def Clear(self):
""" Clears the datastore by deleting all currently stored entities and
queries. """
self.__entities = {}
self.__queries = {}
self.__transactions = {}
self.__query_history = {}
def Read(self):
""" Reads the datastore and history files into memory.
The in-memory query history is cleared, but the datastore is *not*
cleared; the entities in the files are merged into the entities in memory.
If you want them to overwrite the in-memory datastore, call Clear() before
calling Read().
If the datastore file contains an entity with the same app name, kind, and
key as an entity already in the datastore, the entity from the file
overwrites the entity in the datastore.
Also raises __next_id to one greater than the highest numeric id found in
the file, when that id is not already below __next_id.
Raises:
datastore_errors.InternalError: if a stored entity or query cannot be
decoded into its protocol buffer.
"""
# Exception types treated as "the stored bytes are corrupt or from another
# version" when decoding a protocol buffer below.
pb_exceptions = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
TypeError, ValueError)
error_msg = ('Data in %s is corrupt or a different version. '
'Try running with the --clear_datastore flag.\n%r')
if self.__datastore_file and self.__datastore_file != '/dev/null':
for encoded_entity in self.__ReadPickled(self.__datastore_file):
try:
entity = entity_pb.EntityProto(encoded_entity)
except pb_exceptions, e:
raise datastore_errors.InternalError(error_msg %
(self.__datastore_file, e))
# Entities are bucketed by (app, kind), where kind is the type of the
# last element of the key path; within a bucket they are keyed by key.
last_path = entity.key().path().element_list()[-1]
app_kind = (entity.key().app(), last_path.type())
kind_dict = self.__entities.setdefault(app_kind, {})
kind_dict[entity.key()] = entity
# Keep the id counter ahead of every numeric id that was loaded.
if last_path.has_id() and last_path.id() >= self.__next_id:
self.__next_id = last_path.id() + 1
# NOTE(review): indentation is not recoverable in this view; confirm whether
# the history reset and load below are nested under the datastore-file check
# above or run unconditionally.
self.__query_history = {}
for encoded_query, count in self.__ReadPickled(self.__history_file):
try:
query_pb = datastore_pb.Query(encoded_query)
except pb_exceptions, e:
raise datastore_errors.InternalError(error_msg %
(self.__history_file, e))
# Counts for queries that compare equal are summed.
if query_pb in self.__query_history:
self.__query_history[query_pb] += count
else:
self.__query_history[query_pb] = count
def Write(self):
""" Writes out the datastore and history files. Be careful! If the files
already exist, this method overwrites them!
"""
self.__WriteDatastore()
self.__WriteHistory()
def __WriteDatastore(self):
""" Writes out the datastore file. Be careful! If the file already exist,
this method overwrites it!
"""
if self.__datastore_file and self.__datastore_file != '/dev/null':
encoded = []
for kind_dict in self.__entities.values():
for entity in kind_dict.values():
encoded.append(entity.Encode())
self.__WritePickled(encoded, self.__datastore_file)
def __WriteHistory(self):
""" Writes out the history file. Be careful! If the file already exist,
this method overwrites it!
"""
if self.__history_file and self.__history_file != '/dev/null':
encoded = [(query.Encode(), count)
for query, count in self.__query_history.items()]
self.__WritePickled(encoded, self.__history_file)
def __ReadPickled(self, filename):
"""Reads a pickled object from the given file and returns it.
"""
self.__file_lock.acquire()
try:
try:
if filename and filename != '/dev/null' and os.path.isfile(filename):
return pickle.load(open(filename, 'rb'))
else:
logging.warning('Could not read datastore data from %s', filename)
except (AttributeError, LookupError, NameError, TypeError,
ValueError, struct.error, pickle.PickleError), e:
raise datastore_errors.InternalError(
'Could not read data from %s. Try running with the '
'--clear_datastore flag. Cause:\n%r' % (filename, e))
finally:
self.__file_lock.release()
return []
def __WritePickled(self, obj, filename, openfile=file):
  """Pickles the object and writes it to the given file.

  The object is pickled (protocol 1) into a temporary file created in the
  destination's directory, which is then renamed over the destination while
  holding __file_lock.

  Args:
    obj: the object to pickle. Nothing is written if it is falsy.
    filename: destination path. Nothing is written if it is empty or
        '/dev/null'.
    openfile: callable taking (path, mode) and returning a writable file
        object that has a .name attribute; defaults to the file builtin.

  Raises:
    OSError: if the temporary file cannot be moved into place.
  """
  # NOTE(review): because a falsy obj returns early, an empty list is never
  # written and any previous file contents stay on disk -- confirm intended.
  if not filename or filename == '/dev/null' or not obj:
    return

  tmpfile = openfile(os.tempnam(os.path.dirname(filename)), 'wb')
  try:
    pickle.dump(obj, tmpfile, 1)
  finally:
    # Close the handle even if pickling fails.
    tmpfile.close()

  self.__file_lock.acquire()
  try:
    try:
      os.rename(tmpfile.name, filename)
    except OSError:
      # Presumably for platforms where rename cannot replace an existing
      # file: remove the destination (best effort) and retry once.
      try:
        os.remove(filename)
      except OSError:
        pass
      os.rename(tmpfile.name, filename)
  finally:
    self.__file_lock.release()
def MakeSyncCall(self, service, call, request, response):
""" The main RPC entry point. service must be 'datastore_v3'. So far, the
supported calls are 'Get', 'Put', 'RunQuery', 'Next', and 'Count'.
"""
assert service == 'datastore_v3'
explanation = []
assert request.IsInitialized(explanation), explanation
(getattr(self, "_Dynamic_" + call))(request, response)
assert response.IsInitialized(explanation), explanation
def ResolveAppId(self, app):
  """Maps the local-app placeholder to this stub's app id.

  Args:
    app: app name; must not be the empty string.

  Returns:
    Our app_id if app equals datastore._LOCAL_APP_ID, otherwise app
    unchanged.
  """
  assert app != ''
  if app != datastore._LOCAL_APP_ID:
    return app
  return self.__app_id
def QueryHistory(self):
"""Returns a dict that maps Query PBs to times they've been run.
"""
return dict((pb, times) for pb, times in self.__query_history.items()
if pb.app() == self.__app_id)
def _Dynamic_Put(self, put_request, put_response):
  """Stores a copy of every entity in the request.

  Each entity is copied and its key's app is resolved via ResolveAppId().
  When the last path element has id 0 and no name, the next id is assigned
  and the entity group is set to the root path element; otherwise the
  entity must already carry a non-empty entity group. Unless the request is
  part of a transaction, the datastore file is rewritten afterwards.

  Args:
    put_request: request PB providing entity_list() and has_transaction().
    put_response: response PB; the keys of the stored copies are appended
        to its key_list() in request order.
  """
  clones = []
  for entity in put_request.entity_list():
    clone = entity_pb.EntityProto()
    clone.CopyFrom(entity)
    clones.append(clone)

    assert clone.has_key()
    assert clone.key().path().element_size() > 0

    app = self.ResolveAppId(clone.key().app())
    clone.mutable_key().set_app(app)

    last_path = clone.key().path().element_list()[-1]
    if last_path.id() == 0 and not last_path.has_name():
      # Release the id lock even if assigning the id raises.
      self.__id_lock.acquire()
      try:
        last_path.set_id(self.__next_id)
        self.__next_id += 1
      finally:
        self.__id_lock.release()

      assert clone.entity_group().element_size() == 0
      group = clone.mutable_entity_group()
      root = clone.key().path().element(0)
      group.add_element().CopyFrom(root)
    else:
      assert (clone.has_entity_group() and
              clone.entity_group().element_size() > 0)

  self.__entities_lock.acquire()
  try:
    for clone in clones:
      key = clone.key()
      kind = key.path().element_list()[-1].type()
      # Bucket by this clone's own (already resolved) app rather than by
      # whichever app the last entity of the request happened to have.
      kind_dict = self.__entities.setdefault((key.app(), kind), {})
      kind_dict[key] = clone
  finally:
    self.__entities_lock.release()

  if not put_request.has_transaction():
    self.__WriteDatastore()

  put_response.key_list().extend([c.key() for c in clones])
def _Dynamic_Get(self, get_request, get_response):
  """Looks up each requested key among the stored entities.

  One result is added to the response per requested key, in order. The
  stored entity is copied into the result when one is found; otherwise the
  result is left as added. Each key's app is resolved via ResolveAppId()
  and written back onto the key in the request.

  Args:
    get_request: request PB providing key_list().
    get_response: response PB that receives one add_entity() per key.
  """
  for key in get_request.key_list():
    app = self.ResolveAppId(key.app())
    key.set_app(app)
    tail = key.path().element_list()[-1]

    result = get_response.add_entity()
    try:
      stored = self.__entities[app, tail.type()][key]
    except KeyError:
      # Unknown kind or key: nothing to copy for this result.
      continue

    if stored:
      result.mutable_entity().CopyFrom(stored)
def _Dynamic_Delete(self, delete_request, delete_response):
  """Deletes the entities named by the request's keys.

  Keys that are not stored are ignored. Each key's app is resolved via
  ResolveAppId() and written back onto the key in the request. A
  (app, kind) bucket is removed once its last entity is deleted. Unless the
  request is part of a transaction, the datastore file is rewritten once,
  after all keys have been processed. All of this happens while holding
  __entities_lock.

  Args:
    delete_request: request PB providing key_list() and has_transaction().
    delete_response: not used by this method.
  """
  self.__entities_lock.acquire()
  try:
    for key in delete_request.key_list():
      app = self.ResolveAppId(key.app())
      key.set_app(app)
      app_kind = (app, key.path().element_list()[-1].type())

      # Only the lookups themselves may legitimately miss.
      try:
        kind_dict = self.__entities[app_kind]
        del kind_dict[key]
      except KeyError:
        continue

      if not kind_dict:
        del self.__entities[app_kind]

    # Each write dumps the whole datastore, so do it once per request.
    if not delete_request.has_transaction():
      self.__WriteDatastore()
  finally:
    self.__entities_lock.release()
def _Dynamic_RunQuery(self, query, query_result):
if not self.__tx_lock.acquire(False):
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST, "Can't query inside a transaction.")
else:
self.__tx_lock.release()
app = self.ResolveAppId(query.app())
if self.__require_indexes:
required_index = datastore_index.CompositeIndexForQuery(query)
if required_index is not None:
kind, ancestor, props, num_eq_filters = required_index
required_key = kind, ancestor, props
indexes = self.__indexes.get(app)
if not indexes:
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST,
"This query requires a composite index, but none are defined. "
"You must create an index.yaml file in your application root.")
eq_filters_set = set(props[:num_eq_filters])
remaining_filters = props[num_eq_filters:]
for index in indexes:
definition = datastore_admin.ProtoToIndexDefinition(index)
index_key = datastore_index.IndexToKey(definition)
if required_key == index_key:
break
if num_eq_filters > 1 and (kind, ancestor) == index_key[:2]:
this_props = index_key[2]
this_eq_filters_set = set(this_props[:num_eq_filters])
this_remaining_filters = this_props[num_eq_filters:]
if (eq_filters_set == this_eq_filters_set and
remaining_filters == this_remaining_filters):
break
else:
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST,
"This query requires a composite index that is not defined. "
"You must update the index.yaml file in your application root.")
try:
query.set_app(app)
results = self.__entities[app, query.kind()].values()
results = [datastore.Entity._FromPb(pb) for pb in results]
except KeyError:
results = []
if query.has_ancestor():
ancestor_path = query.ancestor().path().element_list()
def is_descendant(entity):
path = entity.key()._Key__reference.path().element_list()
return path[:len(ancestor_path)] == ancestor_path
results = filter(is_descendant, results)
operators = {datastore_pb.Query_Filter.LESS_THAN: '<',
datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
datastore_pb.Query_Filter.GREATER_THAN: '>',
datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
datastore_pb.Query_Filter.EQUAL: '==',
}
for filt in query.filter_list():
assert filt.op() != datastore_pb.Query_Filter.IN
prop = filt.property(0).name().decode('utf-8')
op = operators[filt.op()]
def passes(entity):
""" Returns True if the entity passes the filter, False otherwise. """
entity_vals = entity.get(prop, [])
if type(entity_vals) is not types.ListType:
entity_vals = [entity_vals]
entity_property_list = [datastore_types.ToPropertyPb(prop, value)
for value in entity_vals]
for entity_prop in entity_property_list:
fixed_entity_val = datastore_types.FromPropertyPb(entity_prop)
for filter_prop in filt.property_list():
filter_val = datastore_types.FromPropertyPb(filter_prop)
comp = u'%r %s %r' % (fixed_entity_val, op, filter_val)
logging.log(logging.DEBUG - 1,
'Evaling filter expression "%s"', comp)
if eval(comp):
return True
return False
results = filter(passes, results)
for order in query.order_list():
prop = order.property().decode('utf-8')
results = [entity for entity in results if prop in entity]
def order_compare(a, b):
""" Return a negative, zero or positive number depending on whether
entity a is considered smaller than, equal to, or larger than b,
according to the query's orderings. """
for o in query.order_list():
prop = o.property().decode('utf-8')
a_values =