@@ -27,17 +27,19 @@
@todo only test_time seems to work correctly with database backend test settings
"""
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.tests.base import GeoNodeLiveTestSupport
import os.path
from django.conf import settings
from django.db import connections
from geonode.maps.models import Map
from geonode.layers.models import Layer
from geonode.upload.models import Upload
from geonode.people.models import Profile
from geonode.documents.models import Document
from geonode.base.models import Link
from geonode.catalogue import get_catalogue
from geonode.tests.utils import upload_step, Client
from geonode.upload.utils import _ALLOW_TIME_STEP
from geonode.geoserver.helpers import ogc_server_settings, cascading_delete
@@ -56,6 +58,7 @@
import time
import json
import urllib
import urllib2
import logging
import tempfile
import unittest
@@ -67,6 +70,20 @@
GEOSERVER_URL = ogc_server_settings.LOCATION
GEOSERVER_USER, GEOSERVER_PASSWD = ogc_server_settings.credentials
DB_HOST = settings.DATABASES['default']['HOST']
DB_PORT = settings.DATABASES['default']['PORT']
DB_NAME = settings.DATABASES['default']['NAME']
DB_USER = settings.DATABASES['default']['USER']
DB_PASSWORD = settings.DATABASES['default']['PASSWORD']
DATASTORE_URL = 'postgis://{}:{}@{}:{}/{}'.format(
    DB_USER,
    DB_PASSWORD,
    DB_HOST,
    DB_PORT,
    DB_NAME
)
postgis_db = dj_database_url.parse(DATASTORE_URL, conn_max_age=5)
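# dj_database_url.parse() turns the postgis:// URL above into a Django
# DATABASES-style dict; UploaderBase.setUp() below registers an equivalent dict
# under the 'datastore' alias so uploaded vector layers can target PostGIS.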
logging.getLogger('south').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@@ -100,9 +117,9 @@ def get_wms(version='1.1.1', type_name=None, username=None, password=None):
    return WebMapService(url)


class UploaderBase(GeoNodeBaseTestSupport):
class UploaderBase(GeoNodeLiveTestSupport):

    settings_overrides = []
    type = 'layer'

    @classmethod
    def setUpClass(cls):
@@ -114,8 +131,6 @@ def tearDownClass(cls):
        os.unlink('integration_settings.py')

    def setUp(self):
        # super(UploaderBase, self).setUp()
        # await startup
        cl = Client(
            GEONODE_URL, GEONODE_USER, GEONODE_PASSWD
@@ -135,37 +150,32 @@ def setUp(self):
            GEOSERVER_URL + 'rest', GEOSERVER_USER, GEOSERVER_PASSWD
        )

        settings.DATABASES['default']['NAME'] = DB_NAME
        connections['default'].settings_dict['ATOMIC_REQUESTS'] = False
        connections['default'].connect()

        self._tempfiles = []

        # createlayer must use postgis as a datastore
        # set temporary settings to use a postgis datastore
        DB_HOST = settings.DATABASES['default']['HOST']
        DB_PORT = settings.DATABASES['default']['PORT']
        DB_NAME = settings.DATABASES['default']['NAME']
        DB_USER = settings.DATABASES['default']['USER']
        DB_PASSWORD = settings.DATABASES['default']['PASSWORD']
        settings.DATASTORE_URL = 'postgis://{}:{}@{}:{}/{}'.format(
            DB_USER,
            DB_PASSWORD,
            DB_HOST,
            DB_PORT,
            DB_NAME
        )
        postgis_db = dj_database_url.parse(
            settings.DATASTORE_URL, conn_max_age=600)
        settings.DATABASES['datastore'] = postgis_db
        settings.OGC_SERVER['default']['DATASTORE'] = 'datastore'
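        # With OGC_SERVER['default']['DATASTORE'] pointing at the 'datastore' alias,
        # uploaded vector layers should be written to the PostGIS database configured
        # above rather than to file-based GeoServer stores; tearDown() reverts this.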
    def _post_teardown(self):
        pass

    def tearDown(self):
        # super(UploaderBase, self).tearDown()
        connections.databases['default']['ATOMIC_REQUESTS'] = False

        map(os.unlink, self._tempfiles)

        # move to original settings
        settings.OGC_SERVER['default']['DATASTORE'] = ''
        del settings.DATABASES['datastore']

        # Cleanup
        Upload.objects.all().delete()
        Layer.objects.all().delete()
        Map.objects.all().delete()
        Document.objects.all().delete()

        if settings.OGC_SERVER['default'].get(
                "GEOFENCE_SECURITY_ENABLED", False):
            from geonode.security.utils import purge_geofence_all
            purge_geofence_all()
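            # purge_geofence_all() should drop any GeoFence security rules left on
            # GeoServer by the test run, so later tests start from a clean state.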
    def check_layer_geonode_page(self, path):
        """ Check that the final layer page renders correctly after
        a layer is uploaded """
@@ -263,13 +273,8 @@ def finish_upload(
        if not is_raster and not skip_srs:
            self.assertTrue(upload_step('srs') in current_step)
            # if all is good, the srs step will redirect to the final page
            resp = self.client.get(current_step)
            content = json.loads(resp.read())
            if not content.get('url') and content.get(
                    'redirect_to',
                    current_step) == upload_step('final'):
                resp = self.client.get(content.get('redirect_to'))
            final_step = current_step.replace('srs', 'final')
            resp = self.client.make_request(final_step)
        else:
            self.assertTrue(upload_step('final') in current_step)
            resp = self.client.get(current_step)
@@ -297,13 +302,13 @@ def check_upload_model(self, original_name):
        upload = None
        try:
            upload = Upload.objects.filter(name=str(original_name)).last()
            # Making sure the Upload object is present on the DB and
            # the import session is COMPLETE
            if upload:
                self.assertTrue(upload.complete)
        except Upload.DoesNotExist:
            self.fail('expected to find Upload object for %s' % original_name)
        # Making sure the Upload object is present on the DB and
        # the import session is COMPLETE
        self.assertTrue(upload.complete)

    def check_layer_complete(self, layer_page, original_name):
        '''check everything to verify the layer is complete'''
        self.check_layer_geonode_page(layer_page)
@@ -326,13 +331,12 @@ def check_layer_complete(self, layer_page, original_name):
            caps_found = True
        except BaseException:
            pass
        if caps_found:
            self.check_layer_geoserver_rest(layer_name)
            self.check_upload_model(layer_name)
        else:
        if not caps_found:
            logger.warning(
                "Could not recognize Layer %s on GeoServer WMS" %
                "Could not recognize Layer %s on GeoServer WMS Capa" %
                original_name)
        self.check_layer_geoserver_rest(layer_name)
        self.check_upload_model(layer_name)
    def check_invalid_projection(self, layer_name, resp, data):
        """ Makes sure that we got the correct response from a layer
@@ -414,22 +418,86 @@ def make_csv(self, *rows):
class TestUpload(UploaderBase):
    settings_overrides = []

    def test_shp_upload(self):
        """ Tests if a vector layer can be upload to a running GeoNode GeoServer"""
        """ Tests if a vector layer can be uploaded to a running GeoNode/GeoServer"""
        layer_name = 'san_andres_y_providencia_water'
        fname = os.path.join(
            GOOD_DATA,
            'vector',
            'san_andres_y_providencia_water.shp')
        self.upload_file(fname, self.complete_upload,
                         check_name='san_andres_y_providencia_water')
        test_layer = Layer.objects.all().first()
        if test_layer:
            layer_attributes = test_layer.attributes
            self.assertIsNotNone(layer_attributes)
            self.assertTrue(layer_attributes.count() > 0)
            '%s.shp' % layer_name)
        self.upload_file(fname,
                         self.complete_upload,
                         check_name='%s' % layer_name)
        test_layer = Layer.objects.using('default').get(name='%s' % layer_name)
        layer_attributes = test_layer.attributes
        self.assertIsNotNone(layer_attributes)
        self.assertTrue(layer_attributes.count() > 0)
        # Links
        _def_link_types = ['original', 'metadata']
        _links = Link.objects.filter(link_type__in=_def_link_types)
        # Check 'original' and 'metadata' links exist
        self.assertIsNotNone(
            _links,
            "No 'original' and 'metadata' links have been found"
        )
        self.assertTrue(
            _links.count() > 0,
            "No 'original' and 'metadata' links have been found"
        )
        # Check original links in csw_anytext
        _post_migrate_links_orig = Link.objects.filter(
            resource=test_layer.resourcebase_ptr,
            resource_id=test_layer.resourcebase_ptr.id,
            link_type='original'
        )
        self.assertTrue(
            _post_migrate_links_orig.count() > 0,
            "No 'original' links has been found for the layer '{}'".format(
                test_layer.alternate
            )
        )
        for _link_orig in _post_migrate_links_orig:
            self.assertIn(
                _link_orig.url,
                test_layer.csw_anytext,
                "The link URL {0} is not present in the 'csw_anytext' attribute of the layer '{1}'".format(
                    _link_orig.url,
                    test_layer.alternate
                )
            )
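        # Each 'original' link URL is expected to appear in csw_anytext, the aggregated
        # text field the catalogue uses for full-text search, so the download links
        # remain discoverable through CSW queries.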
        # Check catalogue
        catalogue = get_catalogue()
        record = catalogue.get_record(test_layer.uuid)
        self.assertIsNotNone(record)
        self.assertTrue(
            hasattr(record, 'links'),
            "No records have been found in the catalogue for the resource '{}'".format(
                test_layer.alternate
            )
        )
        # Check 'metadata' links for each record
        for mime, name, metadata_url in record.links['metadata']:
            try:
                _post_migrate_link_meta = Link.objects.get(
                    resource=test_layer.resourcebase_ptr,
                    url=metadata_url,
                    name=name,
                    extension='xml',
                    mime=mime,
                    link_type='metadata'
                )
            except Link.DoesNotExist:
                _post_migrate_link_meta = None
            self.assertIsNotNone(
                _post_migrate_link_meta,
                "No '{}' links have been found in the catalogue for the resource '{}'".format(
                    name,
                    test_layer.alternate
                )
            )
    def test_raster_upload(self):
        """ Tests if a raster layer can be uploaded to a running GeoNode GeoServer"""
@@ -438,10 +506,9 @@ def test_raster_upload(self):
                         check_name='relief_san_andres')
        test_layer = Layer.objects.all().first()
        if test_layer:
            layer_attributes = test_layer.attributes
            self.assertIsNotNone(layer_attributes)
            self.assertTrue(layer_attributes.count() > 0)
        layer_attributes = test_layer.attributes
        self.assertIsNotNone(layer_attributes)
        self.assertTrue(layer_attributes.count() > 0)

    def test_zipped_upload(self):
        """Test uploading a zipped shapefile"""
@@ -506,8 +573,8 @@ def test_extension_not_implemented(self):
        if unsupported_path.endswith('.pyc'):
            unsupported_path = unsupported_path.rstrip('c')
        # with self.assertRaises(HTTPError):
        self.client.upload_file(unsupported_path)
        with self.assertRaises(urllib2.HTTPError):
            self.client.upload_file(unsupported_path)

    def test_csv(self):
        '''make sure a csv upload fails gracefully/normally when not activated'''
@@ -526,8 +593,6 @@ def test_csv(self):
            'Vector datastore not enabled')


class TestUploadDBDataStore(UploaderBase):

    settings_overrides = []

    def test_csv(self):
        """Override the baseclass test and verify a correct CSV upload"""
@@ -546,8 +611,9 @@ def test_csv(self):
            csrfmiddlewaretoken=self.client.get_csrf_token())
        resp = self.client.make_request(csv_step, form_data)
        content = json.loads(resp.read())
        logger.info(content)
        self.assertEquals(resp.code, 200)
        self.assertTrue(upload_step('srs') in content['redirect_to'])
        self.assertEquals(content['status'], 'incomplete')
    def test_time(self):
        """Verify that uploading a time-based shapefile works properly"""