Merge pull request #6 from fgreg/SDAP-32
SDAP-32 CassandraProxy time series tiles to support time arrays
SDAP-27 Refactor data-access to use nexusproto
fgreg committed Mar 12, 2018
2 parents ca3f45d + 6522656 commit 2004ea0
Showing 10 changed files with 61 additions and 36 deletions.
1 change: 1 addition & 0 deletions data-access/.gitignore
@@ -34,6 +34,7 @@ __pycache__/
 
 # C extensions
 *.so
+*.c
 
 # Distribution / packaging
 .Python
2 changes: 1 addition & 1 deletion data-access/nexustiles/config/datastores.ini
@@ -17,4 +17,4 @@ host=localhost:8983
 core=nexustiles
 
 [datastore]
-store=s3
+store=cassandra
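
For context, the store value chosen here is what NexusTileService.__init__ dispatches on (see the nexustiles.py hunk further down). A sketch of the stanza with the recognized values, assuming nothing beyond what this diff shows:

[datastore]
; one of: cassandra, s3, dynamo (dispatched in nexustiles.py below)
store=cassandra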
17 changes: 11 additions & 6 deletions data-access/nexustiles/dao/CassandraProxy.pyx
@@ -17,7 +17,7 @@ import uuid
 from ConfigParser import NoOptionError
 from multiprocessing.synchronize import Lock
 
-import nexusproto.NexusContent_pb2 as nexusproto
+import nexusproto.DataTile_pb2 as nexusproto
 import numpy as np
 from cassandra.cqlengine import columns, connection, CQLEngineException
 from cassandra.cqlengine.models import Model
@@ -99,19 +99,24 @@ class NexusTileData(Model):
         time_series_tile = self._get_nexus_tile().time_series_tile
 
         time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data))
-        time_data = np.array([time_series_tile.time])
+        time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1)
         latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude))
         longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude))
 
-        tile_data = self._to_standard_index(time_series_tile_data,
-                                            (len(time_data), len(latitude_data), len(longitude_data)))
-
+        reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
+        idx = np.arange(len(latitude_data))
+        reshaped_array[:, idx, idx] = time_series_tile_data
+        tile_data = reshaped_array
         # Extract the meta data
         meta_data = {}
         for meta_data_obj in time_series_tile.meta_data:
             name = meta_data_obj.name
             meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
-            reshaped_meta_array = self._to_standard_index(meta_array, tile_data.shape)
+
+            reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
+            idx = np.arange(len(latitude_data))
+            reshaped_meta_array[:, idx, idx] = meta_array
+
             meta_data[name] = reshaped_meta_array
 
         return latitude_data, longitude_data, time_data, tile_data, meta_data
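
The reshaping above is the core of SDAP-32: a time series tile appears to store, for each time step, one value per matched (latitude, longitude) pair, so the (time, point) array belongs on the lat/lon diagonal of a dense (time, lat, lon) grid, with every off-diagonal cell left masked. A minimal standalone sketch of that indexing trick, with made-up shapes and values:

import numpy as np

# 3 time steps, 4 matched (lat, lon) points per step -- the (time, point)
# values land on the lat/lon diagonal of a (time, lat, lon) masked array.
time_data = np.array([0.0, 1.0, 2.0])
latitude_data = np.array([10.0, 10.5, 11.0, 11.5])
longitude_data = np.array([-40.0, -39.5, -39.0, -38.5])
time_series_tile_data = np.ma.masked_invalid(np.arange(12.0).reshape(3, 4))

reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data)))
idx = np.arange(len(latitude_data))
reshaped_array[:, idx, idx] = time_series_tile_data

assert reshaped_array.shape == (3, 4, 4)
assert reshaped_array[2, 1, 1] == time_series_tile_data[2, 1]
assert reshaped_array[0, 1, 2] is np.ma.masked  # off-diagonal cells stay masked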
2 changes: 1 addition & 1 deletion data-access/nexustiles/dao/DynamoProxy.pyx
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import uuid
-import nexusproto.NexusContent_pb2 as nexusproto
+import nexusproto.DataTile_pb2 as nexusproto
 from nexusproto.serialization import from_shaped_array
 import numpy as np
 import boto3
11 changes: 6 additions & 5 deletions data-access/nexustiles/dao/S3Proxy.pyx
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import uuid
-import nexusproto.NexusContent_pb2 as nexusproto
-from nexusproto.serialization import from_shaped_array
-import numpy as np
+
 import boto3
+import nexusproto.DataTile_pb2 as nexusproto
+import numpy as np
+from nexusproto.serialization import from_shaped_array
+
 
 class NexusTileData(object):
     __nexus_tile = None
@@ -127,7 +129,6 @@ class S3Proxy(object):
         self.__nexus_tile = None
 
     def fetch_nexus_tiles(self, *tile_ids):
-
         tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
                     (isinstance(tile_id, str) or isinstance(tile_id, unicode))]
         res = []
@@ -137,4 +138,4 @@ class S3Proxy(object):
             nexus_tile = NexusTileData(data, str(tile_id))
             res.append(nexus_tile)
 
-        return res
\ No newline at end of file
+        return res
16 changes: 8 additions & 8 deletions data-access/nexustiles/nexustiles.py
@@ -21,10 +21,10 @@
 import numpy as np
 import numpy.ma as ma
 import pkg_resources
-from dao.CassandraProxy import CassandraProxy
-from dao.S3Proxy import S3Proxy
-from dao.DynamoProxy import DynamoProxy
-from dao.SolrProxy import SolrProxy
+import dao.CassandraProxy
+import dao.S3Proxy
+import dao.DynamoProxy
+import dao.SolrProxy
 from pytz import timezone
 from shapely.geometry import MultiPolygon, box
 
@@ -73,16 +73,16 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
         if not skipDatastore:
             datastore = self._config.get("datastore", "store")
             if datastore == "cassandra":
-                self._datastore = CassandraProxy(self._config)
+                self._datastore = dao.CassandraProxy.CassandraProxy(self._config)
             elif datastore == "s3":
-                self._datastore = S3Proxy(self._config)
+                self._datastore = dao.S3Proxy.S3Proxy(self._config)
             elif datastore == "dynamo":
-                self._datastore = DynamoProxy(self._config)
+                self._datastore = dao.DynamoProxy.DynamoProxy(self._config)
             else:
                 raise ValueError("Error reading datastore from config file")
 
         if not skipMetadatastore:
-            self._metadatastore = SolrProxy(self._config)
+            self._metadatastore = dao.SolrProxy.SolrProxy(self._config)
 
     def get_dataseries_list(self, simple=False):
         if simple:
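
A likely reason for switching from "from dao.X import X" to "import dao.X": each DAO module and the class it exports share a name, so the qualified form keeps module and class distinct and defers the class lookup to call time. A small sketch of the resulting pattern (make_datastore is a hypothetical helper, not SDAP API):

import dao.CassandraProxy
import dao.DynamoProxy
import dao.S3Proxy

def make_datastore(config, store):
    # Mirrors the dispatch in NexusTileService.__init__ above.
    if store == "cassandra":
        return dao.CassandraProxy.CassandraProxy(config)
    elif store == "s3":
        return dao.S3Proxy.S3Proxy(config)
    elif store == "dynamo":
        return dao.DynamoProxy.DynamoProxy(config)
    raise ValueError("Error reading datastore from config file")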
2 changes: 1 addition & 1 deletion data-access/requirements.txt
@@ -14,7 +14,7 @@ futures==3.1.1
 ipython==5.3.0
 ipython-genutils==0.2.0
 jmespath==0.9.3
-nexusproto==0.4
+nexusproto==1.0.0-SNAPSHOT
 numpy==1.11.1
 pathlib2==2.2.1
 pexpect==4.2.1
2 changes: 1 addition & 1 deletion data-access/setup.py
@@ -36,7 +36,7 @@
         'cassandra-driver==3.5.0',
         'solrpy==0.9.7',
         'requests',
-        'nexusproto==0.4',
+        'nexusproto==1.0.0-SNAPSHOT',
         'shapely'
     ],
 
14 changes: 11 additions & 3 deletions data-access/tests/nexustiles_test.py
@@ -29,11 +29,14 @@ def setUp(self):
 keyspace=nexustiles
 local_datacenter=datacenter1
 protocol_version=3
-port=32769
+port=9042
 
 [solr]
-host=localhost:8986
-core=nexustiles""")
+host=localhost:8983
+core=nexustiles
+
+[datastore]
+store=cassandra""")
         cp = ConfigParser.RawConfigParser()
         cp.readfp(config)
 
@@ -68,6 +71,11 @@ def test_sorted_box(self):
         for tile in tiles:
             print tile.min_time
 
+    def test_time_series_tile(self):
+        tiles = self.tile_service.find_tile_by_id("055c0b51-d0fb-3f39-b48a-4f762bf0c994")
+        for tile in tiles:
+            print tile.get_summary()
+
 
 # from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
 # import numpy as np
30 changes: 20 additions & 10 deletions tools/deletebyquery/deletebyquery.py
@@ -29,6 +29,7 @@
 
 solr_connection = None
 solr_collection = None
+SOLR_UNIQUE_KEY = None
 
 cassandra_cluster = None
 cassandra_session = None
@@ -45,6 +46,8 @@ def init(args):
     solr_connection = SolrConnection(args.solr)
     global solr_collection
     solr_collection = solr_connection[args.collection]
+    global SOLR_UNIQUE_KEY
+    SOLR_UNIQUE_KEY = args.solrIdField
 
     dc_policy = RoundRobinPolicy()
     token_policy = TokenAwarePolicy(dc_policy)
@@ -64,6 +67,7 @@ def delete_by_query(args):
     if args.query:
         se = SearchOptions()
         se.commonparams.q(args.query) \
+            .fl(SOLR_UNIQUE_KEY) \
             .fl('id')
 
         for fq in args.filterquery if args.filterquery is not None else []:
@@ -72,7 +76,8 @@
         query = se
     elif args.jsonparams:
         se = SearchOptions(**json.loads(args.jsonparams))
-        se.commonparams.fl('id')
+        se.commonparams.fl(SOLR_UNIQUE_KEY) \
+            .fl('id')
         query = se
     else:
         raise RuntimeError("either query or jsonparams is required")
@@ -82,7 +87,7 @@
     solr_docs = do_solr_query(query)
 
     if confirm_delete(len(solr_docs)):
-        deleted_ids = do_delete(solr_docs)
+        deleted_ids = do_delete(solr_docs, query)
         logging.info("Deleted tile IDs %s" % json.dumps([str(doc_id) for doc_id in deleted_ids], indent=2))
     else:
         logging.info("Exiting")
@@ -123,7 +128,7 @@ def check_query(query):
         return False
     else:
         se = SearchOptions()
-        se.commonparams.q('id:%s' % sample(solr_response.result.response.docs, 1)[0]['id'])
+        se.commonparams.q('%s:%s' % (SOLR_UNIQUE_KEY, sample(solr_response.result.response.docs, 1)[0][SOLR_UNIQUE_KEY]))
         logging.info(json.dumps(solr_collection.search(se).result.response.docs[0], indent=2))
         return check_query(query)
 
@@ -132,7 +137,7 @@ def do_solr_query(query):
     doc_ids = []
 
     next_cursor_mark = "*"
-    query.commonparams.sort('id asc')
+    query.commonparams.sort('%s asc' % SOLR_UNIQUE_KEY)
     while True:
         query.commonparams.remove_param('cursorMark')
         query.commonparams.add_params(cursorMark=next_cursor_mark)
@@ -154,11 +159,11 @@ def do_solr_query(query):
     return doc_ids
 
 
-def do_delete(doc_ids):
+def do_delete(doc_ids, query):
     logging.info("Executing Cassandra delete...")
     delete_from_cassandra(doc_ids)
     logging.info("Executing Solr delete...")
-    delete_from_solr(doc_ids)
+    delete_from_solr(query)
     return doc_ids
 
 
@@ -170,12 +175,11 @@ def delete_from_cassandra(doc_ids):
 
     for (success, result) in results:
         if not success:
-            logging.warn("Could not delete tile %s" % result)
+            logging.warning("Could not delete tile %s" % result)
 
 
-def delete_from_solr(doc_ids):
-    for doc_id in doc_ids:
-        solr_collection.delete({'q': "id:%s" % doc_id}, commit=False)
+def delete_from_solr(query):
+    solr_collection.delete(query, commit=False)
     solr_collection.commit()
 
 
@@ -193,6 +197,12 @@ def parse_args():
                         required=True,
                         metavar='nexustiles')
 
+    parser.add_argument('--solrIdField',
+                        help='The name of the unique ID field for this collection.',
+                        required=False,
+                        default='solr_id_s',
+                        metavar='solr_id_s')
+
     parser.add_argument('--cassandra',
                         help='The hostname(s) or IP(s) of the Cassandra server(s).',
                         required=True,
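
Two aspects of this refactor worth calling out: delete_from_solr now issues a single delete-by-query with the original query object instead of one delete per tile ID, and the unique-key field is configurable via --solrIdField rather than hardcoded to id. A hypothetical invocation (the query flag name is assumed from args.query, the dataset field is made up, and any required options not visible in this diff are omitted):

python deletebyquery.py \
    --solr localhost:8983 \
    --collection nexustiles \
    --cassandra localhost \
    --solrIdField solr_id_s \
    --query 'dataset_s:MY_DATASET'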
