Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Adding tests and fixing bugs the tests found

  • Loading branch information...
commit 76e20fd99ee832ac7c9ec0dbfa5c98d1032d4dd5 1 parent 04b65ea
@arfrank authored
View
3  app.yaml
@@ -8,6 +8,9 @@ handlers:
- url: /examples/twiml
static_dir: examples/twiml
+- url: /test.*
+ script: gaeunit.py
+
#Handler for all callbacks as of right now.
- url: /Callbacks/.*
script: handlers/callbacks.py
View
470 gaeunit.py
@@ -0,0 +1,470 @@
+#!/usr/bin/env python
+'''
+GAEUnit: Google App Engine Unit Test Framework
+
+Usage:
+
+1. Put gaeunit.py into your application directory. Modify 'app.yaml' by
+ adding the following mapping below the 'handlers:' section:
+
+ - url: /test.*
+ script: gaeunit.py
+
+2. Write your own test cases by extending unittest.TestCase.
+
+3. Launch the development web server. To run all tests, point your browser to:
+
+ http://localhost:8080/test (Modify the port if necessary.)
+
+ For plain text output add '?format=plain' to the above URL.
+ See README.TXT for information on how to run specific tests.
+
+4. The results are displayed as the tests are run.
+
+Visit http://code.google.com/p/gaeunit for more information and updates.
+
+------------------------------------------------------------------------------
+Copyright (c) 2008-2009, George Lei and Steven R. Farley. All rights reserved.
+
+Distributed under the following BSD license:
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+------------------------------------------------------------------------------
+'''
+
+__author__ = "George Lei and Steven R. Farley"
+__email__ = "George.Z.Lei@Gmail.com"
+__version__ = "#Revision: 1.2.8 $"[11:-2]
+__copyright__= "Copyright (c) 2008-2009, George Lei and Steven R. Farley"
+__license__ = "BSD"
+__url__ = "http://code.google.com/p/gaeunit"
+
+import sys
+import os
+import unittest
+import time
+import logging
+import cgi
+import django.utils.simplejson
+
+from google.appengine.ext import webapp
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.api import datastore_file_stub
+from google.appengine.ext.webapp.util import run_wsgi_app
+
+_LOCAL_TEST_DIR = 'test' # location of files
+_WEB_TEST_DIR = '/test' # how you want to refer to tests on your web server
+
+# or:
+# _WEB_TEST_DIR = '/u/test'
+# then in app.yaml:
+# - url: /u/test.*
+# script: gaeunit.py
+
+
+##############################################################################
+# Main request handler
+##############################################################################
+
+
+class MainTestPageHandler(webapp.RequestHandler):
+ def get(self):
+ unknown_args = [arg for arg in self.request.arguments()
+ if arg not in ("format", "package", "name")]
+ if len(unknown_args) > 0:
+ errors = []
+ for arg in unknown_args:
+ errors.append(_log_error("The request parameter '%s' is not valid." % arg))
+ self.error(404)
+ self.response.out.write(" ".join(errors))
+ return
+
+ format = self.request.get("format", "html")
+ if format == "html":
+ self._render_html()
+ elif format == "plain":
+ self._render_plain()
+ else:
+ error = _log_error("The format '%s' is not valid." % cgi.escape(format))
+ self.error(404)
+ self.response.out.write(error)
+
+ def _render_html(self):
+ suite, error = _create_suite(self.request)
+ if not error:
+ self.response.out.write(_MAIN_PAGE_CONTENT % (_test_suite_to_json(suite), _WEB_TEST_DIR, __version__))
+ else:
+ self.error(404)
+ self.response.out.write(error)
+
+ def _render_plain(self):
+ self.response.headers["Content-Type"] = "text/plain"
+ runner = unittest.TextTestRunner(self.response.out)
+ suite, error = _create_suite(self.request)
+ if not error:
+ self.response.out.write("====================\n" \
+ "GAEUnit Test Results\n" \
+ "====================\n\n")
+ _run_test_suite(runner, suite)
+ else:
+ self.error(404)
+ self.response.out.write(error)
+
+
+##############################################################################
+# JSON test classes
+##############################################################################
+
+
+class JsonTestResult(unittest.TestResult):
+ def __init__(self):
+ unittest.TestResult.__init__(self)
+ self.testNumber = 0
+
+ def render_to(self, stream):
+ result = {
+ 'runs': self.testsRun,
+ 'total': self.testNumber,
+ 'errors': self._list(self.errors),
+ 'failures': self._list(self.failures),
+ }
+
+ stream.write(django.utils.simplejson.dumps(result).replace('},', '},\n'))
+
+ def _list(self, list):
+ dict = []
+ for test, err in list:
+ d = {
+ 'desc': test.shortDescription() or str(test),
+ 'detail': err,
+ }
+ dict.append(d)
+ return dict
+
+
+class JsonTestRunner:
+ def run(self, test):
+ self.result = JsonTestResult()
+ self.result.testNumber = test.countTestCases()
+ startTime = time.time()
+ test(self.result)
+ stopTime = time.time()
+ timeTaken = stopTime - startTime
+ return self.result
+
+
+class JsonTestRunHandler(webapp.RequestHandler):
+ def get(self):
+ self.response.headers["Content-Type"] = "text/javascript"
+ test_name = self.request.get("name")
+ _load_default_test_modules()
+ suite = unittest.defaultTestLoader.loadTestsFromName(test_name)
+ runner = JsonTestRunner()
+ _run_test_suite(runner, suite)
+ runner.result.render_to(self.response.out)
+
+
+# This is not used by the HTML page, but it may be useful for other client test runners.
+class JsonTestListHandler(webapp.RequestHandler):
+ def get(self):
+ self.response.headers["Content-Type"] = "text/javascript"
+ suite, error = _create_suite(self.request)
+ if not error:
+ self.response.out.write(_test_suite_to_json(suite))
+ else:
+ self.error(404)
+ self.response.out.write(error)
+
+
+##############################################################################
+# Module helper functions
+##############################################################################
+
+
+def _create_suite(request):
+ package_name = request.get("package")
+ test_name = request.get("name")
+
+ loader = unittest.defaultTestLoader
+ suite = unittest.TestSuite()
+
+ error = None
+
+ try:
+ if not package_name and not test_name:
+ modules = _load_default_test_modules()
+ for module in modules:
+ suite.addTest(loader.loadTestsFromModule(module))
+ elif test_name:
+ _load_default_test_modules()
+ suite.addTest(loader.loadTestsFromName(test_name))
+ elif package_name:
+ package = reload(__import__(package_name))
+ module_names = package.__all__
+ for module_name in module_names:
+ suite.addTest(loader.loadTestsFromName('%s.%s' % (package_name, module_name)))
+
+ if suite.countTestCases() == 0:
+ raise Exception("'%s' is not found or does not contain any tests." % \
+ (test_name or package_name or 'local directory: \"%s\"' % _LOCAL_TEST_DIR))
+ except Exception, e:
+ error = str(e)
+ _log_error(error)
+
+ return (suite, error)
+
+
+def _load_default_test_modules():
+ if not _LOCAL_TEST_DIR in sys.path:
+ sys.path.append(_LOCAL_TEST_DIR)
+ module_names = [mf[0:-3] for mf in os.listdir(_LOCAL_TEST_DIR) if mf.endswith(".py")]
+ return [reload(__import__(name)) for name in module_names]
+
+
+def _get_tests_from_suite(suite, tests):
+ for test in suite:
+ if isinstance(test, unittest.TestSuite):
+ _get_tests_from_suite(test, tests)
+ else:
+ tests.append(test)
+
+
+def _test_suite_to_json(suite):
+ tests = []
+ _get_tests_from_suite(suite, tests)
+ test_tuples = [(type(test).__module__, type(test).__name__, test._testMethodName) \
+ for test in tests]
+ test_dict = {}
+ for test_tuple in test_tuples:
+ module_name, class_name, method_name = test_tuple
+ if module_name not in test_dict:
+ mod_dict = {}
+ method_list = []
+ method_list.append(method_name)
+ mod_dict[class_name] = method_list
+ test_dict[module_name] = mod_dict
+ else:
+ mod_dict = test_dict[module_name]
+ if class_name not in mod_dict:
+ method_list = []
+ method_list.append(method_name)
+ mod_dict[class_name] = method_list
+ else:
+ method_list = mod_dict[class_name]
+ method_list.append(method_name)
+
+ return django.utils.simplejson.dumps(test_dict)
+
+
+def _run_test_suite(runner, suite):
+ """Run the test suite.
+
+ Preserve the current development apiproxy, create a new apiproxy and
+ replace the datastore with a temporary one that will be used for this
+ test suite, run the test suite, and restore the development apiproxy.
+ This isolates the test datastore from the development datastore.
+
+ """
+ original_apiproxy = apiproxy_stub_map.apiproxy
+ try:
+ apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
+ temp_stub = datastore_file_stub.DatastoreFileStub('GAEUnitDataStore', None, None, trusted=True)
+ apiproxy_stub_map.apiproxy.RegisterStub('datastore', temp_stub)
+ # Allow the other services to be used as-is for tests.
+ for name in ['user', 'urlfetch', 'mail', 'memcache', 'images']:
+ apiproxy_stub_map.apiproxy.RegisterStub(name, original_apiproxy.GetStub(name))
+ runner.run(suite)
+ finally:
+ apiproxy_stub_map.apiproxy = original_apiproxy
+
+
+def _log_error(s):
+ logging.warn(s)
+ return s
+
+
+################################################
+# Browser HTML, CSS, and Javascript
+################################################
+
+
+# This string uses Python string formatting, so be sure to escape percents as %%.
+_MAIN_PAGE_CONTENT = """
+<html>
+<head>
+ <style>
+ body {font-family:arial,sans-serif; text-align:center}
+ #title {font-family:"Times New Roman","Times Roman",TimesNR,times,serif; font-size:28px; font-weight:bold; text-align:center}
+ #version {font-size:87%%; text-align:center;}
+ #weblink {font-style:italic; text-align:center; padding-top:7px; padding-bottom:7px}
+ #results {padding-top:20px; margin:0pt auto; text-align:center; font-weight:bold}
+ #testindicator {width:750px; height:16px; border-style:solid; border-width:2px 1px 1px 2px; background-color:#f8f8f8;}
+ #footerarea {text-align:center; font-size:83%%; padding-top:25px}
+ #errorarea {padding-top:25px}
+ .error {border-color: #c3d9ff; border-style: solid; border-width: 2px 1px 2px 1px; width:750px; padding:1px; margin:0pt auto; text-align:left}
+ .errtitle {background-color:#c3d9ff; font-weight:bold}
+ </style>
+ <script language="javascript" type="text/javascript">
+ var testsToRun = %s;
+ var totalRuns = 0;
+ var totalErrors = 0;
+ var totalFailures = 0;
+
+ function newXmlHttp() {
+ try { return new XMLHttpRequest(); } catch(e) {}
+ try { return new ActiveXObject("Msxml2.XMLHTTP"); } catch (e) {}
+ try { return new ActiveXObject("Microsoft.XMLHTTP"); } catch (e) {}
+ alert("XMLHttpRequest not supported");
+ return null;
+ }
+
+ function requestTestRun(moduleName, className, methodName) {
+ var methodSuffix = "";
+ if (methodName) {
+ methodSuffix = "." + methodName;
+ }
+ var xmlHttp = newXmlHttp();
+ xmlHttp.open("GET", "%s/run?name=" + moduleName + "." + className + methodSuffix, true);
+ xmlHttp.onreadystatechange = function() {
+ if (xmlHttp.readyState != 4) {
+ return;
+ }
+ if (xmlHttp.status == 200) {
+ var result = eval("(" + xmlHttp.responseText + ")");
+ totalRuns += parseInt(result.runs);
+ totalErrors += result.errors.length;
+ totalFailures += result.failures.length;
+ document.getElementById("testran").innerHTML = totalRuns;
+ document.getElementById("testerror").innerHTML = totalErrors;
+ document.getElementById("testfailure").innerHTML = totalFailures;
+ if (totalErrors == 0 && totalFailures == 0) {
+ testSucceed();
+ } else {
+ testFailed();
+ }
+ var errors = result.errors;
+ var failures = result.failures;
+ var details = "";
+ for(var i=0; i<errors.length; i++) {
+ details += '<p><div class="error"><div class="errtitle">ERROR ' +
+ errors[i].desc +
+ '</div><div class="errdetail"><pre>'+errors[i].detail +
+ '</pre></div></div></p>';
+ }
+ for(var i=0; i<failures.length; i++) {
+ details += '<p><div class="error"><div class="errtitle">FAILURE ' +
+ failures[i].desc +
+ '</div><div class="errdetail"><pre>' +
+ failures[i].detail +
+ '</pre></div></div></p>';
+ }
+ var errorArea = document.getElementById("errorarea");
+ errorArea.innerHTML += details;
+ } else {
+ document.getElementById("errorarea").innerHTML = xmlHttp.responseText;
+ testFailed();
+ }
+ };
+ xmlHttp.send(null);
+ }
+
+ function testFailed() {
+ document.getElementById("testindicator").style.backgroundColor="red";
+ }
+
+ function testSucceed() {
+ document.getElementById("testindicator").style.backgroundColor="green";
+ }
+
+ function runTests() {
+ // Run each test asynchronously (concurrently).
+ var totalTests = 0;
+ for (var moduleName in testsToRun) {
+ var classes = testsToRun[moduleName];
+ for (var className in classes) {
+ // TODO: Optimize for the case where tests are run by class so we don't
+ // have to always execute each method separately. This should be
+ // possible when we have a UI that allows the user to select tests
+ // by module, class, and method.
+ //requestTestRun(moduleName, className);
+ methods = classes[className];
+ for (var i = 0; i < methods.length; i++) {
+ totalTests += 1;
+ var methodName = methods[i];
+ requestTestRun(moduleName, className, methodName);
+ }
+ }
+ }
+ document.getElementById("testtotal").innerHTML = totalTests;
+ }
+
+ </script>
+ <title>GAEUnit: Google App Engine Unit Test Framework</title>
+</head>
+<body onload="runTests()">
+ <div id="headerarea">
+ <div id="title">GAEUnit: Google App Engine Unit Test Framework</div>
+ <div id="version">Version %s</div>
+ </div>
+ <div id="resultarea">
+ <table id="results"><tbody>
+ <tr><td colspan="3"><div id="testindicator"> </div></td</tr>
+ <tr>
+ <td>Runs: <span id="testran">0</span>/<span id="testtotal">0</span></td>
+ <td>Errors: <span id="testerror">0</span></td>
+ <td>Failures: <span id="testfailure">0</span></td>
+ </tr>
+ </tbody></table>
+ </div>
+ <div id="errorarea"></div>
+ <div id="footerarea">
+ <div id="weblink">
+ <p>
+ Please visit the <a href="http://code.google.com/p/gaeunit">project home page</a>
+ for the latest version or to report problems.
+ </p>
+ <p>
+ Copyright 2008-2009 <a href="mailto:George.Z.Lei@Gmail.com">George Lei</a>
+ and <a href="mailto:srfarley@gmail.com>Steven R. Farley</a>
+ </p>
+ </div>
+ </div>
+</body>
+</html>
+"""
+
+
+##############################################################################
+# Script setup and execution
+##############################################################################
+
+
+application = webapp.WSGIApplication([('%s' % _WEB_TEST_DIR, MainTestPageHandler),
+ ('%s/run' % _WEB_TEST_DIR, JsonTestRunHandler),
+ ('%s/list' % _WEB_TEST_DIR, JsonTestListHandler)],
+ debug=True)
+
+def main():
+ run_wsgi_app(application)
+
+if __name__ == '__main__':
+ main()
View
2  handlers/calls.py
@@ -107,7 +107,7 @@ def post(self,API_VERSION,ACCOUNT_SID,*args):
PhoneNumberSid = Phone_Number.Sid,
AccountSid = ACCOUNT_SID,
Status = 'queued',
- Direction = 'outgoing-api'
+ Direction = 'outbound-api'
)
Call.put()
response_data = Call.get_dict()
View
2  handlers/main.py
@@ -120,7 +120,7 @@ def post(self,Sid):
Valid = True
for arg in self.request.arguments():
if Valid:
- Valid,TwilioCode,TwilioMsg = self.data['PhoneNumber'].validate(self.request, arg, self.request.get( arg ,None))
+ Valid,TwilioCode,TwilioMsg = self.data['PhoneNumber'].validate(self.request, arg, self.request.get( arg ,None),{})
setattr(self.data['PhoneNumber'], arg, self.data['PhoneNumber'].sanitize( self.request, arg, self.request.get( arg ,None)))
if Valid:
View
83 helpers/parameters.py
@@ -1,10 +1,37 @@
import urlparse
import re
import logging
+
+from models import incoming_phone_numbers, outgoing_caller_ids
+def arg_or_request(arg_value,request, arg_name, default = None):
+ return arg_value if ((arg_value is not None and arg_value != '') or request is None) else request.get(arg_name, default)
#Parses the given phone number, and then makes sure that it can be put into a valid twilio format.
#Returns the twilio formatted phone_number and whether or not it went well, Valid or not
+
def parse_phone_number(phone_number):
- return phone_number, True
+ #TAKEN FROM DIVE INTO PYTHON
+ # http://diveintopython.org/
+ phonePattern = re.compile(r'''
+ (\+1)* # optional +1 capture don't match beginning of string, number can start anywhere
+ (\d{3}) # area code is 3 digits (e.g. '800')
+ \D* # optional separator is any number of non-digits
+ (\d{3}) # trunk is 3 digits (e.g. '555')
+ \D* # optional separator
+ (\d{4}) # rest of number is 4 digits (e.g. '1212')
+ \D* # optional separator
+ (\d*) # extension is optional and can be any number of digits
+ $ # end of string
+ ''', re.VERBOSE)
+ try:
+ phoneGroups = phonePattern.search(phone_number).groups()
+ pn = '+1'+str(phoneGroups[1])+str(phoneGroups[2])+str(phoneGroups[3])
+ except Exception, e:
+ logging.info('having trouble parsing phone number: '+phone_number)
+ return phone_number, False
+ else:
+ logging.info('successful parsing phone number: '+phone_number)
+ return pn, True
+ #should actually parse # and check for truth
def valid_to_phone_number(phone_number,required = False):
if phone_number is None:
@@ -14,23 +41,51 @@ def valid_to_phone_number(phone_number,required = False):
if Valid:
return True, 0, ''
else:
- return False, 21401, 'http://www.twilio.com/docs/errors/21401'
+ if required:
+ return False, 21401, 'http://www.twilio.com/docs/errors/21401'
+ else:
+ return True, 21401, 'http://www.twilio.com/docs/errors/21401'
-def valid_from_phone_number(phone_number,required = False):
- if phone_number is None and required:
+def valid_from_phone_number(phone_number, required = False, Direction = 'outbound-api', SMS = False):
+ if (phone_number is None or phone_number == '') and required:
return False, 21603, 'http://www.twilio.com/docs/errors/21603'
else:
+# logging.info('from phone number not none, and required')
number_parsed, Valid = parse_phone_number(phone_number)
if Valid:
- return True, 0, ''
+ logging.info('valid from phone number, but is it outgoing')
+ logging.info('Direction is: '+str(Direction))
+ if Direction in ['outbound-call','outbound-api','outbound-reply']:
+ logging.info('outgoing direction from phone number')
+ #need to check numbers
+ #first check if we have that phone number as an incoming phone number
+ PN = incoming_phone_numbers.Incoming_Phone_Number.all().filter('PhoneNumber =',number_parsed).get()
+ if PN is None:
+ logging.info('no incoming phone number')
+ if SMS:
+ return False, 14108, 'http://www.twilio.com/docs/error/14108'
+ else:
+ PN = outgoing_caller_ids.Outgoing_Caller_Ids.all().filter('PhoneNumber =',number_parsed).get()
+ if PN is None:
+ return False, 14108, 'http://www.twilio.com/docs/error/14108'
+ else:
+ return True, 0, ''
+ else:
+ return True, 0, ''
+ else:
+ return True, 0, ''
else:
+ logging.info('not a valid from phone number')
return False, 21401, 'http://www.twilio.com/docs/errors/21401'
def valid_body(body, required=True):
- if body is None and required:
+ if (body is None or body == '') and required:
return False, 14103, 'http://www.twilio.com/docs/errors/14103'
else:
- return True, 0, ''
+ if body is not None and len(body) > 160:
+ return False, 21605, 'http://www.twilio.com/docs/errors/21605'
+ else:
+ return True, 0, ''
def required(required_list,request):
Valid = True
@@ -77,9 +132,9 @@ def check_url(URL):
return True
# Checks for the normal callback url to make sure they are valid
-def standard_urls(request,StandardArgName):
- if request.get(StandardArgName, None) is not None:
- if check_url(request.get(StandardArgName,None)):
+def standard_urls(ArgValue):
+ if ArgValue is not None:
+ if check_url(ArgValue):
return True, 0, ''
else:
return False, 21502, 'http://www.twilio.com/docs/errors/21502'
@@ -89,12 +144,12 @@ def standard_urls(request,StandardArgName):
#What should this pass back?
#Passed,Twilio error code, Twilio error message
#Checks fallback urls for validity, normal callback being created (cant have fallback without normal callback)
-def fallback_urls(request, FallbackArgName, StandardArgName, Instance, method = 'Voice'):
+def fallback_urls(fallback_arg_value, standard_arg_value, StandardArgName, Instance, method = 'Voice'):
# need to check that its a valid url,
- if request.get(FallbackArgName,None) is not None and request.get(FallbackArgName,None) != '':
- if check_url(request.get(FallbackArgName,None)):
+ if fallback_arg_value != '': #if not passed, then returns ''
+ if check_url(fallback_arg_value):
# need to check that a standard url is passed, or set already
- if (request.get(StandardArgName,None) is not None and request.get(StandardArgName,None) != '') or (getattr(Instance,StandardArgName) is not None and getattr(Instance,StandardArgName) != ''):
+ if standard_arg_value != '' or getattr(Instance,StandardArgName) is not None:
return True, 0, ''
else:
#Hack to check for sms fallback missing or not.
View
BIN  helpers/parameters.pyc
Binary file not shown
View
2  models/accounts.py
@@ -44,7 +44,7 @@ def sanitize(self, request, arg_name, arg_value):
else:
return arg_value
- def validate(self, request, arg_name, arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
validators = {
'FriendlyName' : parameters.friendlyname_length(request.get('FriendlyName',''))
}
View
BIN  models/accounts.pyc
Binary file not shown
View
4 models/base.py
@@ -28,7 +28,7 @@ def new(cls, request, AccountSid = None, **kwargs):
arg_length = len(kwargs)
for keyword in kwargs:
if hasattr(cls,keyword) and kwargs[keyword] is not None:
- Valid, TwilioCode, TwilioMsg = cls().validate( request, keyword, kwargs[keyword] )
+ Valid, TwilioCode, TwilioMsg = cls().validate( request, keyword, kwargs[keyword], **kwargs)
if not Valid:
break
else:
@@ -49,5 +49,5 @@ def new(cls, request, AccountSid = None, **kwargs):
def sanitize(self, request, arg_name, arg_value):
return arg_value
- def validate(self, request, arg_name, arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
return True, 0, ''
View
BIN  models/base.pyc
Binary file not shown
View
6 models/calls.py
@@ -67,10 +67,10 @@ def new(cls, ParentCallSid, AccountSid,To,From,PhoneNumberSid,Status,StartTime =
)
"""
- def validate(self, request, arg_name,arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
validators = {
'To' : parameters.valid_to_phone_number(arg_value if arg_value is not None else request.get('To',None),required=True),
- 'From' : parameters.valid_from_phone_number(arg_value if arg_value is not None else request.get('From',None),required=True)
+ 'From' : parameters.valid_from_phone_number(arg_value if arg_value is not None else request.get('From',None),required=True, self = self)
}
if arg_name in validators:
return validators[arg_name]
@@ -125,7 +125,7 @@ def disconnect(self,StatusCallback = None,StatusCallbackMethod = 'POST'):
self.Status = 'complete'
self.EndTime = datetime.datetime.now()
self.Duration = (self.EndTime - self.StartTime).seconds
- if self.Direction == 'outgoing-api' or self.Direction == 'outbound-dial':
+ if self.Direction == 'outbound-api' or self.Direction == 'outbound-dial':
#should be dependent on country code, but will need more work
self.Price = self.Duration * (0.02)
elif self.Direction == 'inbound':
View
2  models/conferences.py
@@ -32,7 +32,7 @@ def sanitize(self, request, arg_name, arg_value):
else:
return arg_value
- def validate(self, request, arg_name, arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
validators = {
'FriendlyName' : parameters.friendlyname_length(request.get('FriendlyName',''))
}
View
10 models/incoming_phone_numbers.py
@@ -1,12 +1,4 @@
-from google.appengine.ext import db
-
-from models import base, phone_numbers
-
-from random import random
-
-from hashlib import sha256
-
-from helpers import parameters
+from models import phone_numbers
class Incoming_Phone_Number(phone_numbers.Phone_Number):
pass
View
10 models/messages.py
@@ -44,12 +44,12 @@ def error(self):
self.Price = 0.00
self.put()
- def validate(self, request, arg_name,arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
validators = {
- 'To' : parameters.valid_to_phone_number(arg_value if arg_value is not None else request.get('To',None),required=True),
- 'From' : parameters.valid_from_phone_number(arg_value if arg_value is not None else request.get('From',None),required=True),
- 'Body' : parameters.valid_body(arg_value if arg_value is not None else request.get('Body',None),required=True),
- 'StatusCallback' : arg_value if (arg_value is not None or request is None) else parameters.standard_urls(request,'StatusCallback')
+ 'To' : parameters.valid_to_phone_number(parameters.arg_or_request(arg_value, request, arg_name),required=True),
+ 'From' : parameters.valid_from_phone_number(parameters.arg_or_request(arg_value, request, arg_name),required=True, Direction = kwargs['Direction'] if 'Direction' in kwargs else None, SMS = True),
+ 'Body' : parameters.valid_body(parameters.arg_or_request(arg_value, request, arg_name), required=True),
+ 'StatusCallback' : parameters.standard_urls(parameters.arg_or_request(arg_value, request, arg_name))
}
if arg_name in validators:
return validators[arg_name]
View
10 models/outgoing_caller_ids.py
@@ -1,12 +1,4 @@
-from google.appengine.ext import db
-
-from models import base, phone_numbers
-
-from random import random
-
-from hashlib import sha256
-
-from helpers import parameters
+from models import phone_numbers
class Outgoing_Caller_Id(phone_numbers.Phone_Number):
pass
View
2  models/participants.py
@@ -38,7 +38,7 @@ def sanitize(self, request, arg_name, arg_value):
else:
return arg_value
- def validate(self, request, arg_name, arg_value):
+ def validate(self, request, arg_name,arg_value, **kwargs):
validators = {
'Muted' : parameters.allow_boolean(arg_value if arg_value is not None else request.get( arg_name, None) )
}
View
68 models/phone_numbers.py
@@ -3,8 +3,6 @@
from random import random
from hashlib import sha256
-from helpers import parameters
-
"""
Sid A 34 character string that uniquely idetifies this resource.
DateCreated The date that this resource was created, given as GMT RFC 2822 format.
@@ -48,20 +46,34 @@ def new_Sid(self):
return 'PN'+sha256(str(random())).hexdigest()
#Validators for all properties that are user-editable.
- def validate(self, request, arg_name, arg_value):
+ def validate(self, request, arg_name, arg_value, **kwargs):
+ from helpers import parameters
+
validators = {
- 'FriendlyName' : parameters.friendlyname_length(request.get('FriendlyName','')),
- 'VoiceCallerIdLookup' : parameters.allowed_boolean(request.get('VoiceCallerIdLookup',None)),
- 'VoiceUrl' : parameters.standard_urls(request,'VoiceUrl'),
- 'VoiceMethod' : parameters.phone_allowed_methods(arg_value,['GET','POST']),
- 'VoiceFallbackUrl' : parameters.fallback_urls(request, 'VoiceFallbackUrl', 'VoiceUrl', self, 'Voice'),
- 'VoiceFallbackMethod' : parameters.phone_allowed_methods(arg_value,['GET','POST']),
- 'StatusCallback' : parameters.standard_urls(request,'StatusCallback'),
- 'StatusCallbackMethod' : parameters.phone_allowed_methods(arg_value,['GET','POST']),
- 'SmsUrl' : parameters.standard_urls(request,'SmsUrl'),
- 'SmsMethod' : parameters.sms_allowed_methods(arg_value,['GET','POST']),
- 'SmsFallbackUrl' : parameters.fallback_urls(request, 'SmsFallbackUrl', 'SmsUrl', self, 'SMS'),
- 'SmsFallbackMethod' : parameters.sms_allowed_methods(arg_value,['GET','POST'])
+ 'FriendlyName' : parameters.friendlyname_length(parameters.arg_or_request(arg_value, request, arg_name)),
+
+ 'VoiceCallerIdLookup' : parameters.allowed_boolean(parameters.arg_or_request(arg_value, request, arg_name,False)),
+
+ 'VoiceUrl' : parameters.standard_urls(parameters.arg_or_request(arg_value, request, arg_name)),
+
+ 'VoiceMethod' : parameters.phone_allowed_methods(parameters.arg_or_request(arg_value, request, arg_name,'POST'),['GET','POST']),
+
+ 'VoiceFallbackUrl' : parameters.fallback_urls(parameters.arg_or_request(arg_value, request, arg_name,''), parameters.arg_or_request(arg_value, request, 'VoiceUrl',''), 'VoiceUrl', self, 'Voice'),
+
+ 'VoiceFallbackMethod' : parameters.phone_allowed_methods(parameters.arg_or_request(arg_value, request, arg_name,'POST'),['GET','POST']),
+
+ 'StatusCallback' : parameters.standard_urls(parameters.arg_or_request(arg_value, request, arg_name)),
+
+ 'StatusCallbackMethod' : parameters.phone_allowed_methods(parameters.arg_or_request(arg_value, request, arg_name,'POST'),['GET','POST']),
+
+ 'SmsUrl' : parameters.standard_urls(parameters.arg_or_request(arg_value, request, arg_name)),
+
+ 'SmsMethod' : parameters.sms_allowed_methods(parameters.arg_or_request(arg_value, request, arg_name,'POST'),['GET','POST']),
+
+ 'SmsFallbackUrl' : parameters.fallback_urls(parameters.arg_or_request(arg_value, request, arg_name,''), parameters.arg_or_request(arg_value, request, 'SmsUrl',''), 'SmsUrl', self, 'SMS'),
+
+ 'SmsFallbackMethod' : parameters.sms_allowed_methods(parameters.arg_or_request(arg_value, request, arg_name,'POST'),['GET','POST'])
+
}
if arg_name in validators:
@@ -70,19 +82,21 @@ def validate(self, request, arg_name, arg_value):
return True, 0, ''
#to be used, but for now will leave as is, minus standardizing how I do method saving
def sanitize(self, request, arg_name, arg_value):
+ from helpers import parameters
+
sanitizers = {
- 'FriendlyName' : request.get('FriendlyName',None),
- 'VoiceCallerIdLookup' : request.get('VoiceCallerIdLookup',None),
- 'VoiceUrl' : request.get('VoiceUrl',None),
- 'VoiceMethod' : request.get('VoiceMethod','POST').upper(),
- 'VoiceFallbackUrl' : request.get('VoiceFallbackUrl',None),
- 'VoiceFallbackMethod' : request.get('VoiceFallbackMethod','POST').upper(),
- 'StatusCallback' : request.get('StatusCallback',None),
- 'StatusCallbackMethod' : request.get('StatusCallbackMethod','POST').upper(),
- 'SmsUrl' : request.get('SmsUrl',None),
- 'SmsMethod' : request.get('SmsMethod','POST').upper(),
- 'SmsFallbackUrl' : request.get('SmsFallbackUrl',None),
- 'SmsFallbackMethod' : request.get('SmsFallbackMethod','POST').upper()
+ 'FriendlyName' : parameters.arg_or_request(arg_value, request, arg_name),
+ 'VoiceCallerIdLookup' : parameters.arg_or_request(arg_value, request, arg_name,False),
+ 'VoiceUrl' : parameters.arg_or_request(arg_value, request, arg_name),
+ 'VoiceMethod' : parameters.arg_or_request(arg_value, request, arg_name,'POST').upper(),
+ 'VoiceFallbackUrl' : parameters.arg_or_request(arg_value, request, arg_name),
+ 'VoiceFallbackMethod' : parameters.arg_or_request(arg_value, request, arg_name,'POST').upper(),
+ 'StatusCallback' : parameters.arg_or_request(arg_value, request, arg_name),
+ 'StatusCallbackMethod' : parameters.arg_or_request(arg_value, request, arg_name,'POST').upper(),
+ 'SmsUrl' :parameters.arg_or_request(arg_value, request, arg_name),
+ 'SmsMethod' : parameters.arg_or_request(arg_value, request, arg_name,'POST').upper(),
+ 'SmsFallbackUrl' : parameters.arg_or_request(arg_value, request, arg_name),
+ 'SmsFallbackMethod' : parameters.arg_or_request(arg_value, request, arg_name,'POST').upper()
}
if arg_name in sanitizers:
return sanitizers[arg_name]
View
0  tests/handlers/base_handler.py → test/handlers/base_handler.py
File renamed without changes
View
0  tests/models/base.py → test/models/base.py
File renamed without changes
View
51 test/models_test.py
@@ -0,0 +1,51 @@
+import unittest
+import logging
+from google.appengine.ext import db
+import models
+
+class MessagesModel(unittest.TestCase):
+ def setUp(self):
+ #create account
+ self.Account = models.accounts.Account.new(key_name='email@email.com',email='email@email.com',password='password')
+ self.Account.put()
+ self.PhoneNumber,Valid,TwilioCode, TwilioMsg = models.incoming_phone_numbers.Incoming_Phone_Number.new(PhoneNumber = '+13015559999', request = None, AccountSid = self.Account.Sid)
+ self.PhoneNumber.put()
+ self.FakeToNumber = '+12405551234'
+ self.BodyText = 'Fake Body Text'
+ self.LongBodyText = 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec a diam lectus. Sed sit amet ipsum mauris. Maecenas congue ligula ac quam viverra nec consectetur ante hendrerit. Donec et mollis dolor. Praesent et diam eget libero egestas mattis sit amet vitae augue. Nam tincidunt congue enim, ut porta lorem lacinia consectetur. Donec ut libero sed arcu vehicula ultricies a non tortor. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean ut gravida lorem. Ut turpis felis, pulvinar a semper sed, adipiscing id dolor. Pellentesque auctor nisi id magna consequat sagittis. Curabitur dapibus enim sit amet elit pharetra tincidunt feugiat nisl imperdiet. Ut convallis libero in urna ultrices accumsan. Donec sed odio eros. Donec viverra mi quis quam pulvinar at malesuada arcu rhoncus. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In rutrum accumsan ultricies. Mauris vitae nisi at sem facilisis semper ac in est.'
+
+ def test_Message_creation_success(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = self.PhoneNumber.PhoneNumber, Body = self.BodyText, Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertTrue(Valid)
+ self.assertEqual(Message.To,self.FakeToNumber)
+ self.assertEqual(Message.From,self.PhoneNumber.PhoneNumber)
+ self.assertEqual(Message.Body,self.BodyText)
+
+ def test_Message_creation_to_failure(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = 'adsfadsfasdf', From = self.PhoneNumber.PhoneNumber, Body = 'Good body text', Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertFalse(Valid)
+ self.assertEqual(TwilioCode, 21401)
+
+ def test_Message_creation_from_failure_mistyped_number(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = '+240555a123', Body = 'Good body text', Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertFalse(Valid)
+ self.assertEqual(TwilioCode, 21401)
+
+ def test_Message_creation_from_failure_not_allowed_number(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = '+5555555555', Body = 'Good body text', Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertFalse(Valid)
+ self.assertEqual(TwilioCode, 14108)
+
+ def test_Message_creation_body_blank_failure(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = self.PhoneNumber.PhoneNumber, Body = '', Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertFalse(Valid)
+ self.assertEqual(TwilioCode, 14103)
+
+ def test_Message_creation_body_long_failure(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = self.PhoneNumber.PhoneNumber, Body = self.LongBodyText, Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+ self.assertFalse(Valid)
+ self.assertEqual(TwilioCode, 21605)
+
+ def test_Message_creation_callback_failure(self):
+ Message, Valid, TwilioCode, TwilioMsg = models.messages.Message.new(To = self.FakeToNumber, From = self.PhoneNumber.PhoneNumber, Body = 'Good body text', Direction = 'outbound-api', request = None, AccountSid = self.Account.Sid)
+
View
1,311 webtest/__init__.py
@@ -0,0 +1,1311 @@
+# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+"""
+Routines for testing WSGI applications.
+
+Most interesting is TestApp
+"""
+
+import sys
+import random
+import urllib
+import urlparse
+import mimetypes
+import time
+import cgi
+import os
+#import webbrowser
+from Cookie import BaseCookie
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+import re
+from webob import Response, Request
+from wsgiref.validate import validator
+
+__all__ = ['TestApp']
+
+def tempnam_no_warning(*args):
+ """
+ An os.tempnam with the warning turned off, because sometimes
+ you just need to use this and don't care about the stupid
+ security warning.
+ """
+ return os.tempnam(*args)
+
+class NoDefault(object):
+ pass
+
+try:
+ sorted
+except NameError:
+ def sorted(l):
+ l = list(l)
+ l.sort()
+ return l
+
+class AppError(Exception):
+ pass
+
+class TestApp(object):
+
+ # for py.test
+ disabled = True
+
+ def __init__(self, app, extra_environ=None, relative_to=None):
+ """
+ Wraps a WSGI application in a more convenient interface for
+ testing.
+
+ ``app`` may be an application, or a Paste Deploy app
+ URI, like ``'config:filename.ini#test'``.
+
+ ``extra_environ`` is a dictionary of values that should go
+ into the environment for each request. These can provide a
+ communication channel with the application.
+
+ ``relative_to`` is a directory, and filenames used for file
+ uploads are calculated relative to this. Also ``config:``
+ URIs that aren't absolute.
+ """
+ if isinstance(app, (str, unicode)):
+ from paste.deploy import loadapp
+ # @@: Should pick up relative_to from calling module's
+ # __file__
+ app = loadapp(app, relative_to=relative_to)
+ self.app = app
+ self.relative_to = relative_to
+ if extra_environ is None:
+ extra_environ = {}
+ self.extra_environ = extra_environ
+ self.reset()
+
+ def reset(self):
+ """
+ Resets the state of the application; currently just clears
+ saved cookies.
+ """
+ self.cookies = {}
+
+ def _make_environ(self, extra_environ=None):
+ environ = self.extra_environ.copy()
+ environ['paste.throw_errors'] = True
+ if extra_environ:
+ environ.update(extra_environ)
+ return environ
+
+ def get(self, url, params=None, headers=None, extra_environ=None,
+ status=None, expect_errors=False):
+ """
+ Get the given url (well, actually a path like
+ ``'/page.html'``).
+
+ ``params``:
+ A query string, or a dictionary that will be encoded
+ into a query string. You may also include a query
+ string on the ``url``.
+
+ ``headers``:
+ A dictionary of extra headers to send.
+
+ ``extra_environ``:
+ A dictionary of environmental variables that should
+ be added to the request.
+
+ ``status``:
+ The integer status code you expect (if not 200 or 3xx).
+ If you expect a 404 response, for instance, you must give
+ ``status=404`` or it will be an error. You can also give
+ a wildcard, like ``'3*'`` or ``'*'``.
+
+ ``expect_errors``:
+ If this is not true, then if anything is written to
+ ``wsgi.errors`` it will be an error. If it is true, then
+ non-200/3xx responses are also okay.
+
+ Returns a ``webob.Response`` object.
+ """
+ environ = self._make_environ(extra_environ)
+ # Hide from py.test:
+ __tracebackhide__ = True
+ if params:
+ if not isinstance(params, (str, unicode)):
+ params = urllib.urlencode(params, doseq=True)
+ if '?' in url:
+ url += '&'
+ else:
+ url += '?'
+ url += params
+ url = str(url)
+ if '?' in url:
+ url, environ['QUERY_STRING'] = url.split('?', 1)
+ else:
+ environ['QUERY_STRING'] = ''
+ req = TestRequest.blank(url, environ)
+ if headers:
+ req.headers.update(headers)
+ return self.do_request(req, status=status,
+ expect_errors=expect_errors)
+
+ def _gen_request(self, method, url, params='', headers=None, extra_environ=None,
+ status=None, upload_files=None, expect_errors=False):
+ """
+ Do a generic request.
+ """
+ environ = self._make_environ(extra_environ)
+ # @@: Should this be all non-strings?
+ if isinstance(params, (list, tuple, dict)):
+ params = urllib.urlencode(params)
+ if hasattr(params, 'items'):
+ params = urllib.urlencode(params.items())
+ if upload_files:
+ params = cgi.parse_qsl(params, keep_blank_values=True)
+ content_type, params = self.encode_multipart(
+ params, upload_files)
+ environ['CONTENT_TYPE'] = content_type
+ elif params:
+ environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded')
+ if '?' in url:
+ url, environ['QUERY_STRING'] = url.split('?', 1)
+ else:
+ environ['QUERY_STRING'] = ''
+ environ['CONTENT_LENGTH'] = str(len(params))
+ environ['REQUEST_METHOD'] = method
+ environ['wsgi.input'] = StringIO(params)
+ req = TestRequest.blank(url, environ)
+ if headers:
+ req.headers.update(headers)
+ return self.do_request(req, status=status,
+ expect_errors=expect_errors)
+
+ def post(self, url, params='', headers=None, extra_environ=None,
+ status=None, upload_files=None, expect_errors=False):
+ """
+ Do a POST request. Very like the ``.get()`` method.
+ ``params`` are put in the body of the request.
+
+ ``upload_files`` is for file uploads. It should be a list of
+ ``[(fieldname, filename, file_content)]``. You can also use
+ just ``[(fieldname, filename)]`` and the file content will be
+ read from disk.
+
+ Returns a ``webob.Response`` object.
+ """
+ return self._gen_request('POST', url, params=params, headers=headers,
+ extra_environ=extra_environ,status=status,
+ upload_files=upload_files,
+ expect_errors=expect_errors)
+
+ def put(self, url, params='', headers=None, extra_environ=None,
+ status=None, upload_files=None, expect_errors=False):
+ """
+ Do a PUT request. Very like the ``.get()`` method.
+ ``params`` are put in the body of the request.
+
+ ``upload_files`` is for file uploads. It should be a list of
+ ``[(fieldname, filename, file_content)]``. You can also use
+ just ``[(fieldname, filename)]`` and the file content will be
+ read from disk.
+
+ Returns a ``webob.Response`` object.
+ """
+ return self._gen_request('PUT', url, params=params, headers=headers,
+ extra_environ=extra_environ,status=status,
+ upload_files=upload_files,
+ expect_errors=expect_errors)
+
+ def delete(self, url, headers=None, extra_environ=None,
+ status=None, expect_errors=False):
+ """
+ Do a DELETE request. Very like the ``.get()`` method.
+ ``params`` are put in the body of the request.
+
+ Returns a ``webob.Response`` object.
+ """
+ return self._gen_request('DELETE', url, params=params, headers=headers,
+ extra_environ=extra_environ,status=status,
+ upload_files=None, expect_errors=expect_errors)
+
+ def encode_multipart(self, params, files):
+ """
+ Encodes a set of parameters (typically a name/value list) and
+ a set of files (a list of (name, filename, file_body)) into a
+ typical POST body, returning the (content_type, body).
+ """
+ boundary = '----------a_BoUnDaRy%s$' % random.random()
+ lines = []
+ for key, value in params:
+ lines.append('--'+boundary)
+ lines.append('Content-Disposition: form-data; name="%s"' % key)
+ lines.append('')
+ lines.append(value)
+ for file_info in files:
+ key, filename, value = self._get_file_info(file_info)
+ lines.append('--'+boundary)
+ lines.append('Content-Disposition: form-data; name="%s"; filename="%s"'
+ % (key, filename))
+ fcontent = mimetypes.guess_type(filename)[0]
+ lines.append('Content-Type: %s' %
+ fcontent or 'application/octet-stream')
+ lines.append('')
+ lines.append(value)
+ lines.append('--' + boundary + '--')
+ lines.append('')
+ body = '\r\n'.join(lines)
+ content_type = 'multipart/form-data; boundary=%s' % boundary
+ return content_type, body
+
+ def _get_file_info(self, file_info):
+ if len(file_info) == 2:
+ # It only has a filename
+ filename = file_info[1]
+ if self.relative_to:
+ filename = os.path.join(self.relative_to, filename)
+ f = open(filename, 'rb')
+ content = f.read()
+ f.close()
+ return (file_info[0], filename, content)
+ elif len(file_info) == 3:
+ return file_info
+ else:
+ raise ValueError(
+ "upload_files need to be a list of tuples of (fieldname, "
+ "filename, filecontent) or (fieldname, filename); "
+ "you gave: %r"
+ % repr(file_info)[:100])
+
+ def do_request(self, req, status, expect_errors):
+ """
+ Executes the given request (``req``), with the expected
+ ``status``. Generally ``.get()`` and ``.post()`` are used
+ instead.
+ """
+ __tracebackhide__ = True
+ errors = StringIO()
+ req.environ['wsgi.errors'] = errors
+ if self.cookies:
+ c = BaseCookie()
+ for name, value in self.cookies.items():
+ c[name] = value
+ req.environ['HTTP_COOKIE'] = str(c).split(': ', 1)[1]
+ req.environ['paste.testing'] = True
+ req.environ['paste.testing_variables'] = {}
+ app = validator(self.app)
+ old_stdout = sys.stdout
+ out = CaptureStdout(old_stdout)
+ try:
+ sys.stdout = out
+ start_time = time.time()
+ ## FIXME: should it be an option to not catch exc_info?
+ res = req.get_response(app, catch_exc_info=True)
+ end_time = time.time()
+ finally:
+ sys.stdout = old_stdout
+ sys.stderr.write(out.getvalue())
+ res.app = app
+ res.test_app = self
+ # We do this to make sure the app_iter is exausted:
+ res.body
+ res.errors = errors.getvalue()
+ total_time = end_time - start_time
+ for name, value in req.environ['paste.testing_variables'].items():
+ if hasattr(res, name):
+ raise ValueError(
+ "paste.testing_variables contains the variable %r, but "
+ "the response object already has an attribute by that "
+ "name" % name)
+ setattr(res, name, value)
+ if not expect_errors:
+ self._check_status(status, res)
+ self._check_errors(res)
+ res.cookies_set = {}
+ for header in res.headers.getall('set-cookie'):
+ c = BaseCookie(header)
+ for key, morsel in c.items():
+ self.cookies[key] = morsel.value
+ res.cookies_set[key] = morsel.value
+ return res
+
+ def _check_status(self, status, res):
+ __tracebackhide__ = True
+ if status == '*':
+ return
+ if isinstance(status, (list, tuple)):
+ if res.status_int not in status:
+ raise AppError(
+ "Bad response: %s (not one of %s for %s)\n%s"
+ % (res.status, ', '.join(map(str, status)),
+ res.request.url, res.body))
+ return
+ if status is None:
+ if res.status_int >= 200 and res.status_int < 400:
+ return
+ raise AppError(
+ "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s"
+ % (res.status, res.request.url,
+ res.body))
+ if status != res.status_int:
+ raise AppError(
+ "Bad response: %s (not %s)" % (res.status, status))
+
+ def _check_errors(self, res):
+ errors = res.errors
+ if errors:
+ raise AppError(
+ "Application had errors logged:\n%s" % errors)
+
+class CaptureStdout(object):
+
+ def __init__(self, actual):
+ self.captured = StringIO()
+ self.actual = actual
+
+ def write(self, s):
+ self.captured.write(s)
+ self.actual.write(s)
+
+ def flush(self):
+ self.actual.flush()
+
+ def writelines(self, lines):
+ for item in lines:
+ self.write(item)
+
+ def getvalue(self):
+ return self.captured.getvalue()
+
+class TestResponse(Response):
+
+ """
+ Instances of this class are return by ``TestApp``
+ """
+
+ _forms_indexed = None
+
+
+ def forms__get(self):
+ """
+ Returns a dictionary of ``Form`` objects. Indexes are both in
+ order (from zero) and by form id (if the form is given an id).
+ """
+ if self._forms_indexed is None:
+ self._parse_forms()
+ return self._forms_indexed
+
+ forms = property(forms__get,
+ doc="""
+ A list of <form>s found on the page (instances of
+ ``Form``)
+ """)
+
+ def form__get(self):
+ forms = self.forms
+ if not forms:
+ raise TypeError(
+ "You used response.form, but no forms exist")
+ if 1 in forms:
+ # There is more than one form
+ raise TypeError(
+ "You used response.form, but more than one form exists")
+ return forms[0]
+
+ form = property(form__get,
+ doc="""
+ Returns a single ``Form`` instance; it
+ is an error if there are multiple forms on the
+ page.
+ """)
+
+ _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)(.*?)>', re.S|re.I)
+
+ def _parse_forms(self):
+ forms = self._forms_indexed = {}
+ form_texts = []
+ started = None
+ for match in self._tag_re.finditer(self.body):
+ end = match.group(1) == '/'
+ tag = match.group(2).lower()
+ if tag != 'form':
+ continue
+ if end:
+ assert started, (
+ "</form> unexpected at %s" % match.start())
+ form_texts.append(self.body[started:match.end()])
+ started = None
+ else:
+ assert not started, (
+ "Nested form tags at %s" % match.start())
+ started = match.start()
+ assert not started, (
+ "Danging form: %r" % self.body[started:])
+ for i, text in enumerate(form_texts):
+ form = Form(self, text)
+ forms[i] = form
+ if form.id:
+ forms[form.id] = form
+
+ def follow(self, **kw):
+ """
+ If this request is a redirect, follow that redirect. It
+ is an error if this is not a redirect response. Returns
+ another response object.
+ """
+ assert self.status_int >= 300 and self.status_int < 400, (
+ "You can only follow redirect responses (not %s)"
+ % self.status)
+ location = self.headers['location']
+ type, rest = urllib.splittype(location)
+ host, path = urllib.splithost(rest)
+ # @@: We should test that it's not a remote redirect
+ return self.test_app.get(location, **kw)
+
+ def click(self, description=None, linkid=None, href=None,
+ anchor=None, index=None, verbose=False):
+ """
+ Click the link as described. Each of ``description``,
+ ``linkid``, and ``url`` are *patterns*, meaning that they are
+ either strings (regular expressions), compiled regular
+ expressions (objects with a ``search`` method), or callables
+ returning true or false.
+
+ All the given patterns are ANDed together:
+
+ * ``description`` is a pattern that matches the contents of the
+ anchor (HTML and all -- everything between ``<a...>`` and
+ ``</a>``)
+
+ * ``linkid`` is a pattern that matches the ``id`` attribute of
+ the anchor. It will receive the empty string if no id is
+ given.
+
+ * ``href`` is a pattern that matches the ``href`` of the anchor;
+ the literal content of that attribute, not the fully qualified
+ attribute.
+
+ * ``anchor`` is a pattern that matches the entire anchor, with
+ its contents.
+
+ If more than one link matches, then the ``index`` link is
+ followed. If ``index`` is not given and more than one link
+ matches, or if no link matches, then ``IndexError`` will be
+ raised.
+
+ If you give ``verbose`` then messages will be printed about
+ each link, and why it does or doesn't match. If you use
+ ``app.click(verbose=True)`` you'll see a list of all the
+ links.
+
+ You can use multiple criteria to essentially assert multiple
+ aspects about the link, e.g., where the link's destination is.
+ """
+ __tracebackhide__ = True
+ found_html, found_desc, found_attrs = self._find_element(
+ tag='a', href_attr='href',
+ href_extract=None,
+ content=description,
+ id=linkid,
+ href_pattern=href,
+ html_pattern=anchor,
+ index=index, verbose=verbose)
+ return self.goto(found_attrs['uri'])
+
+ def clickbutton(self, description=None, buttonid=None, href=None,
+ button=None, index=None, verbose=False):
+ """
+ Like ``.click()``, except looks for link-like buttons.
+ This kind of button should look like
+ ``<button onclick="...location.href='url'...">``.
+ """
+ __tracebackhide__ = True
+ found_html, found_desc, found_attrs = self._find_element(
+ tag='button', href_attr='onclick',
+ href_extract=re.compile(r"location\.href='(.*?)'"),
+ content=description,
+ id=buttonid,
+ href_pattern=href,
+ html_pattern=button,
+ index=index, verbose=verbose)
+ return self.goto(found_attrs['uri'])
+
+ def _find_element(self, tag, href_attr, href_extract,
+ content, id,
+ href_pattern,
+ html_pattern,
+ index, verbose):
+ content_pat = _make_pattern(content)
+ id_pat = _make_pattern(id)
+ href_pat = _make_pattern(href_pattern)
+ html_pat = _make_pattern(html_pattern)
+
+ _tag_re = re.compile(r'<%s\s+(.*?)>(.*?)</%s>' % (tag, tag),
+ re.I+re.S)
+
+ def printlog(s):
+ if verbose:
+ print s
+
+ found_links = []
+ total_links = 0
+ for match in _tag_re.finditer(self.body):
+ el_html = match.group(0)
+ el_attr = match.group(1)
+ el_content = match.group(2)
+ attrs = _parse_attrs(el_attr)
+ if verbose:
+ printlog('Element: %r' % el_html)
+ if not attrs.get(href_attr):
+ printlog(' Skipped: no %s attribute' % href_attr)
+ continue
+ el_href = attrs[href_attr]
+ if href_extract:
+ m = href_extract.search(el_href)
+ if not m:
+ printlog(" Skipped: doesn't match extract pattern")
+ continue
+ el_href = m.group(1)
+ attrs['uri'] = el_href
+ if el_href.startswith('#'):
+ printlog(' Skipped: only internal fragment href')
+ continue
+ if el_href.startswith('javascript:'):
+ printlog(' Skipped: cannot follow javascript:')
+ continue
+ total_links += 1
+ if content_pat and not content_pat(el_content):
+ printlog(" Skipped: doesn't match description")
+ continue
+ if id_pat and not id_pat(attrs.get('id', '')):
+ printlog(" Skipped: doesn't match id")
+ continue
+ if href_pat and not href_pat(el_href):
+ printlog(" Skipped: doesn't match href")
+ continue
+ if html_pat and not html_pat(el_html):
+ printlog(" Skipped: doesn't match html")
+ continue
+ printlog(" Accepted")
+ found_links.append((el_html, el_content, attrs))
+ if not found_links:
+ raise IndexError(
+ "No matching elements found (from %s possible)"
+ % total_links)
+ if index is None:
+ if len(found_links) > 1:
+ raise IndexError(
+ "Multiple links match: %s"
+ % ', '.join([repr(anc) for anc, d, attr in found_links]))
+ found_link = found_links[0]
+ else:
+ try:
+ found_link = found_links[index]
+ except IndexError:
+ raise IndexError(
+ "Only %s (out of %s) links match; index %s out of range"
+ % (len(found_links), total_links, index))
+ return found_link
+
+ def goto(self, href, method='get', **args):
+ """
+ Go to the (potentially relative) link ``href``, using the
+ given method (``'get'`` or ``'post'``) and any extra arguments
+ you want to pass to the ``app.get()`` or ``app.post()``
+ methods.
+
+ All hostnames and schemes will be ignored.
+ """
+ scheme, host, path, query, fragment = urlparse.urlsplit(href)
+ # We
+ scheme = host = fragment = ''
+ href = urlparse.urlunsplit((scheme, host, path, query, fragment))
+ href = urlparse.urljoin(self.request.url, href)
+ method = method.lower()
+ assert method in ('get', 'post'), (
+ 'Only "get" or "post" are allowed for method (you gave %r)'
+ % method)
+ if method == 'get':
+ method = self.test_app.get
+ else:
+ method = self.test_app.post
+ return method(href, **args)
+
+ _normal_body_regex = re.compile(r'[ \n\r\t]+')
+
+ _normal_body = None
+
+ def normal_body__get(self):
+ if self._normal_body is None:
+ self._normal_body = self._normal_body_regex.sub(
+ ' ', self.body)
+ return self._normal_body
+
+ normal_body = property(normal_body__get,
+ doc="""
+ Return the whitespace-normalized body
+ """.strip())
+
+ def unicode_normal_body__get(self):
+ if not self.charset:
+ raise AttributeError(
+ "You cannot access Response.unicode_normal_body unless charset is set")
+ return self.normal_body.decode(self.charset)
+
+ unicode_normal_body = property(
+ unicode_normal_body__get, doc="""
+ Return the whitespace-normalized body, as unicode
+ """.strip())
+
+ def __contains__(self, s):
+ """
+ A response 'contains' a string if it is present in the body
+ of the response. Whitespace is normalized when searching
+ for a string.
+ """
+ if not isinstance(s, basestring):
+ if hasattr(s, '__unicode__'):
+ s = unicode(s)
+ else:
+ s = str(s)
+ if isinstance(s, unicode):
+ body = self.unicode_body
+ normal_body = self.unicode_normal_body
+ else:
+ body = self.body
+ normal_body = self.normal_body
+ return s in body or s in normal_body
+
+ def mustcontain(self, *strings, **kw):
+ """
+ Assert that the response contains all of the strings passed
+ in as arguments.
+
+ Equivalent to::
+
+ assert string in res
+ """
+ if 'no' in kw:
+ no = kw['no']
+ del kw['no']
+ if isinstance(no, basestring):
+ no = [no]
+ else:
+ no = []
+ if kw:
+ raise TypeError(
+ "The only keyword argument allowed is 'no'")
+ for s in strings:
+ if not s in self:
+ print >> sys.stderr, "Actual response (no %r):" % s
+ print >> sys.stderr, self
+ raise IndexError(
+ "Body does not contain string %r" % s)
+ for no_s in no:
+ if no_s in self:
+ print >> sys.stderr, "Actual response (has %r)" % s
+ print >> sys.stderr, self
+ raise IndexError(
+ "Body contains string %r" % s)
+
+ def __str__(self):
+ simple_body = '\n'.join([l for l in self.body.splitlines()
+ if l.strip()])
+ return 'Response: %s\n%s\n%s' % (
+ self.status,
+ '\n'.join(['%s: %s' % (n, v) for n, v in self.headerlist]),
+ simple_body)
+
+ def __repr__(self):
+ # Specifically intended for doctests
+ if self.content_type:
+ ct = ' %s' % self.content_type
+ else:
+ ct = ''
+ if self.body:
+ br = repr(self.body)
+ if len(br) > 18:
+ br = br[:10]+'...'+br[-5:]
+ body = ' body=%s/%s' % (br, len(self.body))
+ else:
+ body = ' no body'
+ if self.location:
+ location = ' location: %s' % self.location
+ else:
+ location = ''
+ return ('<' + self.status + ct + location + body + '>')
+
+ def html(self):
+ """
+ Returns the response as a `BeautifulSoup
+ <http://www.crummy.com/software/BeautifulSoup/documentation.html>`_
+ object.
+
+ Only works with HTML responses; other content-types raise
+ AttributeError.
+ """
+ if 'html' not in self.content_type:
+ raise AttributeError(
+ "Not an HTML response body (content-type: %s)"
+ % self.content_type)
+ try:
+ from BeautifulSoup import BeautifulSoup
+ except ImportError:
+ raise ImportError(
+ "You must have BeautifulSoup installed to use response.html")
+ soup = BeautifulSoup(self.body)
+ return soup
+
+ html = property(html, doc=html.__doc__)
+
+ def xml(self):
+ """
+ Returns the response as an `ElementTree
+ <http://python.org/doc/current/lib/module-xml.etree.ElementTree.html>`_
+ object.
+
+ Only works with XML responses; other content-types raise
+ AttributeError
+ """
+ if 'xml' not in self.content_type:
+ raise AttributeError(
+ "Not an XML response body (content-type: %s)"
+ % self.content_type)
+ try:
+ from xml.etree import ElementTree
+ except ImportError:
+ try:
+ import ElementTree
+ except ImportError:
+ try:
+ from elementtree import ElementTree
+ except ImportError:
+ raise ImportError(
+ "You must have ElementTree installed (or use Python 2.5) to use response.xml")
+ return ElementTree.XML(self.body)
+
+ xml = property(xml, doc=xml.__doc__)
+
+ def lxml(self):
+ """
+ Returns the response as an `lxml object
+ <http://codespeak.net/lxml/>`_. You must have lxml installed
+ to use this.
+
+ If this is an HTML response and you have lxml 2.x installed,
+ then an ``lxml.html.HTML`` object will be returned; if you
+ have an earlier version of lxml then a ``lxml.HTML`` object
+ will be returned.
+ """
+ if ('html' not in self.content_type
+ and 'xml' not in self.content_type):
+ raise AttributeError(
+ "Not an XML or HTML response body (content-type: %s)"
+ % self.content_type)
+ try:
+ from lxml import etree
+ except ImportError:
+ raise ImportError(
+ "You must have lxml installed to use response.lxml")
+ try:
+ from lxml.html import fromstring
+ except ImportError:
+ fromstring = etree.HTML
+ ## FIXME: would be nice to set xml:base, in some fashion
+ if self.content_type == 'text/html':
+ return fromstring(self.body)
+ else:
+ return etree.XML(self.body)
+
+ lxml = property(lxml, doc=lxml.__doc__)
+
+ def json(self):
+ """
+ Return the response as a JSON response. You must have
+ `simplejson
+ <http://svn.red-bean.com/bob/simplejson/tags/simplejson-1.7/docs/index.html>`_
+ installed to use this.
+
+ The content type must be application/json to use this.
+ """
+ if self.content_type != 'application/json':
+ raise AttributeError(
+ "Not a JSON response body (content-type: %s)"
+ % self.content_type)
+ try:
+ from simplejson import loads
+ except ImportError:
+ raise ImportError(
+ "You must have simplejson installed to use response.json")
+ return loads(self.body)
+
+ json = property(json, doc=json.__doc__)
+
+ def showbrowser(self):
+ """
+ Show this response in a browser window (for debugging purposes,
+ when it's hard to read the HTML).
+ """
+ fn = tempnam_no_warning(None, 'webtest-page') + '.html'
+ f = open(fn, 'wb')
+ f.write(self.body)
+ f.close()
+ url = 'file:' + fn.replace(os.sep, '/')
+ webbrowser.open_new(url)
+
+class TestRequest(Request):
+
+ # for py.test
+ disabled = True
+ ResponseClass = TestResponse
+
+########################################
+## Form objects
+########################################
+
+class Form(object):
+
+ """
+ This object represents a form that has been found in a page.
+ This has a couple useful attributes:
+
+ ``text``:
+ the full HTML of the form.
+
+ ``action``:
+ the relative URI of the action.
+
+ ``method``:
+ the method (e.g., ``'GET'``).
+
+ ``id``:
+ the id, or None if not given.
+
+ ``fields``:
+ a dictionary of fields, each value is a list of fields by
+ that name. ``<input type=\"radio\">`` and ``<select>`` are
+ both represented as single fields with multiple options.
+ """
+
+ # @@: This really should be using Mechanize/ClientForm or
+ # something...
+
+ _tag_re = re.compile(r'<(/?)([a-z0-9_\-]*)([^>]*?)>', re.I)
+
+ def __init__(self, response, text):
+ self.response = response
+ self.text = text
+ self._parse_fields()
+ self._parse_action()
+
+ def _parse_fields(self):
+ in_select = None
+ in_textarea = None
+ fields = {}
+ for match in self._tag_re.finditer(self.text):
+ end = match.group(1) == '/'
+ tag = match.group(2).lower()
+ if tag not in ('input', 'select', 'option', 'textarea',
+ 'button'):
+ continue
+ if tag == 'select' and end:
+ assert in_select, (
+ '%r without starting select' % match.group(0))
+ in_select = None
+ continue
+ if tag == 'textarea' and end:
+ assert in_textarea, (
+ "</textarea> with no <textarea> at %s" % match.start())
+ in_textarea[0].value = html_unquote(self.text[in_textarea[1]:match.start()])
+ in_textarea = None
+ continue
+ if end:
+ continue
+ attrs = _parse_attrs(match.group(3))
+ if 'name' in attrs:
+ name = attrs.pop('name')
+ else:
+ name = None
+ if tag == 'option':
+ in_select.options.append((attrs.get('value'),
+ 'selected' in attrs))
+ continue
+ if tag == 'input' and attrs.get('type') == 'radio':
+ field = fields.get(name)
+ if not field:
+ field = Radio(self, tag, name, match.start(), **attrs)
+ fields.setdefault(name, []).append(field)
+ else:
+ field = field[0]
+ assert isinstance(field, Radio)
+ field.options.append((attrs.get('value'),
+ 'checked' in attrs))
+ continue
+ tag_type = tag
+ if tag == 'input':
+ tag_type = attrs.get('type', 'text').lower()
+ FieldClass = Field.classes.get(tag_type, Field)
+ field = FieldClass(self, tag, name, match.start(), **attrs)
+ if tag == 'textarea':
+ assert not in_textarea, (
+ "Nested textareas: %r and %r"
+ % (in_textarea, match.group(0)))
+ in_textarea = field, match.end()
+ elif tag == 'select':
+ assert not in_select, (
+ "Nested selects: %r and %r"
+ % (in_select, match.group(0)))
+ in_select = field
+ fields.setdefault(name, []).append(field)
+ self.fields = fields
+
+ def _parse_action(self):
+ self.action = None
+ for match in self._tag_re.finditer(self.text):
+ end = match.group(1) == '/'
+ tag = match.group(2).lower()
+ if tag != 'form':
+ continue
+ if end:
+ break
+ attrs = _parse_attrs(match.group(3))
+ self.action = attrs.get('action', '')
+ self.method = attrs.get('method', 'GET')
+ self.id = attrs.get('id')
+ # @@: enctype?
+ else:
+ assert 0, "No </form> tag found"
+ assert self.action is not None, (
+ "No <form> tag found")
+
+ def __setitem__(self, name, value):
+ """
+ Set the value of the named field. If there is 0 or multiple
+ fields by that name, it is an error.
+
+ Setting the value of a ``<select>`` selects the given option
+ (and confirms it is an option). Setting radio fields does the
+ same. Checkboxes get boolean values. You cannot set hidden
+ fields or buttons.
+
+ Use ``.set()`` if there is any ambiguity and you must provide
+ an index.
+ """
+ fields = self.fields.get(name)
+ assert fields is not None, (
+ "No field by the name %r found (fields: %s)"
+ % (name, ', '.join(map(repr, self.fields.keys()))))
+ assert len(fields) == 1, (
+ "Multiple fields match %r: %s"
+ % (name, ', '.join(map(repr, fields))))
+ fields[0].value = value
+
+ def __getitem__(self, name):
+ """
+ Get the named field object (ambiguity is an error).
+ """
+ fields = self.fields.get(name)
+ assert fields is not None, (
+ "No field by the name %r found" % name)
+ assert len(fields) == 1, (
+ "Multiple fields match %r: %s"
+ % (name, ', '.join(map(repr, fields))))
+ return fields[0]
+
+ def set(self, name, value, index=None):
+ """
+ Set the given name, using ``index`` to disambiguate.
+ """
+ if index is None:
+ self[name] = value
+ else:
+ fields = self.fields.get(name)
+ assert fields is not None, (
+ "No fields found matching %r" % name)
+ field = fields[index]
+ field.value = value
+
+ def get(self, name, index=None, default=NoDefault):
+ """
+ Get the named/indexed field object, or ``default`` if no field
+ is found.
+ """
+ fields = self.fields.get(name)
+ if fields is None and default is not NoDefault:
+ return default
+ if index is None:
+ return self[name]
+ else:
+ fields = self.fields.get(name)
+ assert fields is not None, (
+ "No fields found matching %r" % name)
+ field = fields[index]
+ return field
+
+ def select(self, name, value, index=None):
+ """
+ Like ``.set()``, except also confirms the target is a
+ ``<select>``.
+ """
+ field = self.get(name, index=index)
+ assert isinstance(field, Select)
+ field.value = value
+
+ def submit(self, name=None, index=None, **args):
+ """
+ Submits the form. If ``name`` is given, then also select that
+ button (using ``index`` to disambiguate)``.
+
+ Any extra keyword arguments are passed to the ``.get()`` or
+ ``.post()`` method.
+ """
+ fields = self.submit_fields(name, index=index)
+ return self.response.goto(self.action, method=self.method,
+ params=fields, **args)
+
+ def submit_fields(self, name=None, index=None):
+ """
+ Return a list of ``[(name, value), ...]`` for the current
+ state of the form.
+ """
+ submit = []
+ if name is not None:
+ field = self.get(name, index=index)
+ submit.append((field.name, field.value_if_submitted()))
+ for name, fields in self.fields.items():
+ for field in fields:
+ value = field.value
+ if value is None:
+ continue
+ submit.append((name, value))
+ return submit
+
+
+_attr_re = re.compile(r'([^= \n\r\t]+)[ \n\r\t]*(?:=[ \n\r\t]*(?:"([^"]*)"|([^"][^ \n\r\t>]*)))?', re.S)
+
+def _parse_attrs(text):
+ attrs = {}
+ for match in _attr_re.finditer(text):
+ attr_name = match.group(1).lower()
+ attr_body = match.group(2) or match.group(3)
+ attr_body = html_unquote(attr_body or '')
+ attrs[attr_name] = attr_body
+ return attrs
+
+class Field(object):
+
+ """
+ Field object.
+ """
+
+ # Dictionary of field types (select, radio, etc) to classes
+ classes = {}
+
+ settable = True
+
+ def __init__(self, form, tag, name, pos,
+ value=None, id=None, **attrs):
+ self.form = form
+ self.tag = tag
+ self.name = name
+ self.pos = pos
+ self._value = value
+ self.id = id
+ self.attrs = attrs
+
+ def value__set(self, value):
+ if not self.settable:
+ raise AttributeError(
+ "You cannot set the value of the <%s> field %r"
+ % (self.tag, self.name))
+ self._value = value
+
+ def force_value(self, value):
+ """
+ Like setting a value, except forces it even for, say, hidden
+ fields.
+ """
+ self._value = value
+
+ def value__get(self):
+ return self._value
+
+ value = property(value__get, value__set)
+
+class Select(Field):
+
+ """
+ Field representing ``<select>``
+ """
+
+ def __init__(self, *args, **attrs):
+ super(Select, self).__init__(*args, **attrs)
+ self.options = []
+ self.multiple = attrs.get('multiple')
+ assert not self.multiple, (
+ "<select multiple> not yet supported")
+ # Undetermined yet:
+ self.selectedIndex = None
+
+ def value__set(self, value):
+ for i, (option, checked) in enumerate(self.options):
+ if option == str(value):
+ self.selectedIndex = i
+ break
+ else:
+ raise ValueError(
+ "Option %r not found (from %s)"
+ % (value, ', '.join(
+ [repr(o) for o, c in self.options])))
+
+ def value__get(self):
+ if self.selectedIndex is not None:
+ return self.options[self.selectedIndex][0]
+ else:
+ for option, checked in self.options:
+ if checked:
+ return option
+ else:
+ if self.options:
+ return self.options[0][0]
+ else:
+ return None
+
+ value = property(value__get, value__set)
+
+Field.classes['select'] = Select
+
+class Radio(Select):
+
+ """
+ Field representing ``<input type="radio">``
+ """
+
+Field.classes['radio'] = Radio
+
+class Checkbox(Field):
+
+ """
+ Field representing ``<input type="checkbox">``
+ """
+
+ def __init__(self, *args, **attrs):
+ super(Checkbox, self).__init__(*args, **attrs)
+ self.checked = 'checked' in attrs
+
+ def value__set(self, value):
+ self.checked = not not value
+
+ def value__get(self):
+ if self.checked:
+ if self._value is None:
+ # @@: 'on'?
+ return 'checked'
+ else:
+ return self._value
+ else:
+ return None
+
+ value = property(value__get, value__set)
+
+Field.classes['checkbox'] = Checkbox
+
+class Text(Field):
+ """
+ Field representing ``<input type="text">``
+ """
+
+Field.classes['text'] = Text
+
+class Textarea(Text):
+ """
+ Field representing ``<textarea>``
+ """
+
+Field.classes['textarea'] = Textarea
+
+class Hidden(Text):
+ """
+ Field representing ``<input type="hidden">``
+ """
+
+Field.classes['hidden'] = Hidden
+
+class Submit(Field):
+ """
+ Field representing ``<input type="submit">`` and ``<button>``
+ """
+
+ settable = False
+
+ def value__get(self):
+ return None
+
+ value = property(value__get)
+
+ def value_if_submitted(self):
+ return self._value
+
+Field.classes['submit'] = Submit
+
+Field.classes['button'] = Submit
+
+Field.classes['image'] = Submit
+
+########################################
+## Utility functions
+########################################
+
+def _popget(d, key, default=None):
+ """
+ Pop the key if found (else return default)
+ """
+ if key in d:
+ return d.pop(key)
+ return default
+
+def _space_prefix(pref, full, sep=None, indent=None, include_sep=True):
+ """
+ Anything shared by pref and full will be replaced with spaces
+ in full, and full returned.
+ """
+ if sep is None:
+ sep = os.path.sep
+ pref = pref.split(sep)
+ full = full.split(sep)
+ padding = []
+ while pref and full and pref[0] == full[0]:
+ if indent is None:
+ padding.append(' ' * (len(full[0]) + len(sep)))
+ else:
+ padding.append(' ' * indent)
+ full.pop(0)
+ pref.pop(0)
+ if padding:
+ if include_sep:
+ return ''.join(padding) + sep + sep.join(full)
+ else:
+ return ''.join(padding) + sep.join(full)
+ else:
+ return sep.join(full)
+
+def _make_pattern(pat):
+ if pat is None:
+ return None
+ if isinstance(pat, (str, unicode)):
+ pat = re.compile(pat)
+ if hasattr(pat, 'search'):
+ return pat.search
+ if callable(pat):
+ return pat
+ assert 0, (
+ "Cannot make callable pattern object out of %r" % pat)
+
+def html_unquote(v):
+ """