Permalink
Browse files

python 3 changes exceptions, queue and asesrtEqual

1) import Queue is now lower caps

2) exceptions
except KeyError as e:

3) assertEquals is deprecated but assertEqual does exactly same thing
  • Loading branch information...
1 parent d767d19 commit 149cc8c7c5b099464a9182afc43b367e4f6073d2 @ByReaL ByReaL committed Jul 15, 2016
View
@@ -29,3 +29,11 @@ OctoPrint.egg-info
*.orig
*.codekit
+
+venv
+venv26
+venv27
+venv33
+venv34
+venv35
+venv36
@@ -8,7 +8,10 @@
import datetime
import logging
import subprocess
-import Queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
import threading
import collections
@@ -120,7 +123,7 @@ def __init__(self):
self._registeredListeners = collections.defaultdict(list)
self._logger = logging.getLogger(__name__)
- self._queue = Queue.Queue()
+ self._queue = queue.Queue()
self._worker = threading.Thread(target=self._work)
self._worker.daemon = True
self._worker.start()
@@ -300,7 +303,7 @@ def eventCallback(self, event, payload):
else:
processedCommand = self._processCommand(command, payload)
self.executeCommand(processedCommand, commandType, debug=debug)
- except KeyError, e:
+ except KeyError as e:
self._logger.warn("There was an error processing one or more placeholders in the following command: %s" % command)
def executeCommand(self, command, commandType, debug=False):
@@ -324,7 +327,7 @@ def commandExecutioner(command):
commandExecutioner(c)
else:
commandExecutioner(command)
- except subprocess.CalledProcessError, e:
+ except subprocess.CalledProcessError as e:
self._logger.warn("Command failed with return code %i: %s" % (e.returncode, str(e)))
except:
self._logger.exception("Command failed")
@@ -7,7 +7,10 @@
import logging
-import Queue as queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
import os
import threading
import collections
@@ -9,7 +9,10 @@
import re
import threading
import math
-import Queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
from serial import SerialTimeoutException
@@ -42,8 +45,8 @@ def __init__(self, seriallog_handler=None, read_timeout=5.0, write_timeout=10.0)
self._write_timeout = write_timeout
self.incoming = CharCountingQueue(settings().getInt(["devel", "virtualPrinter", "rxBuffer"]), name="RxBuffer")
- self.outgoing = Queue.Queue()
- self.buffered = Queue.Queue(maxsize=settings().getInt(["devel", "virtualPrinter", "commandBuffer"]))
+ self.outgoing = queue.Queue()
+ self.buffered = queue.Queue(maxsize=settings().getInt(["devel", "virtualPrinter", "commandBuffer"]))
for item in ['start\n', 'Marlin: Virtual Marlin!\n', '\x80\n', 'SD card ok\n']:
self._send(item)
@@ -121,7 +124,7 @@ def _clearQueue(self, queue):
try:
while queue.get(block=False):
continue
- except Queue.Empty:
+ except queue.Empty:
pass
def _processIncoming(self):
@@ -131,7 +134,7 @@ def _processIncoming(self):
try:
data = self.incoming.get(timeout=0.01)
- except Queue.Empty:
+ except queue.Empty:
if self._sendWait and time.time() > next_wait_timeout:
self._send("wait")
next_wait_timeout = time.time() + self._waitInterval
@@ -839,7 +842,7 @@ def _processBuffer(self):
while self.buffered is not None:
try:
line = self.buffered.get(timeout=0.5)
- except Queue.Empty:
+ except queue.Empty:
continue
if line is None:
@@ -865,7 +868,7 @@ def write(self, data):
try:
self.incoming.put(data, timeout=self._write_timeout)
self._seriallog.info("<<< {}".format(data.strip()))
- except Queue.Full:
+ except queue.Full:
self._logger.info("Incoming queue is full, raising SerialTimeoutException")
raise SerialTimeoutException()
@@ -877,7 +880,7 @@ def readline(self):
line = self.outgoing.get(timeout=self._read_timeout)
self._seriallog.info(">>> {}".format(line.strip()))
return line
- except Queue.Empty:
+ except queue.Empty:
return ""
def close(self):
@@ -904,10 +907,10 @@ def _send(self, line):
if self.outgoing is not None:
self.outgoing.put(line)
-class CharCountingQueue(Queue.Queue):
+class CharCountingQueue(queue.Queue):
def __init__(self, maxsize, name=None):
- Queue.Queue.__init__(self, maxsize=maxsize)
+ queue.Queue.__init__(self, maxsize=maxsize)
self._size = 0
self._name = name
@@ -918,7 +921,7 @@ def put(self, item, block=True, timeout=None):
if not block:
if self._qsize() + item_size >= self.maxsize:
- raise Queue.Full
+ raise queue.Full
elif timeout is None:
while self._qsize() + item_size >= self.maxsize:
self.not_full.wait()
@@ -929,7 +932,7 @@ def put(self, item, block=True, timeout=None):
while self._qsize() + item_size >= self.maxsize:
remaining = endtime - time.time()
if remaining <= 0.0:
- raise Queue.Full
+ raise queue.Full
self.not_full.wait(remaining)
self._put(item)
@@ -804,7 +804,7 @@ def _sendInitialStateUpdate(self, callback):
"messages": list(self._messages)
})
callback.on_printer_send_initial_data(data)
- except Exception, err:
+ except Exception as err:
import sys
sys.stderr.write("ERROR: %s\n" % str(err))
pass
@@ -93,7 +93,7 @@ def executeSystemCommand(source, command):
error = "Command failed with return code {}:\nSTDOUT: {}\nSTDERR: {}".format(returncode, stdout_text, stderr_text)
logger.warn(error)
return make_response(error, 500)
- except Exception, e:
+ except Exception as e:
if not ignore:
error = "Command failed: {}".format(str(e))
logger.warn(error)
@@ -11,7 +11,10 @@
import datetime
import sys
import shutil
-import Queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
import requests
import octoprint.util as util
@@ -318,7 +321,7 @@ def __init__(self, post_roll=0, fps=25):
self._fps = fps
self._capture_mutex = threading.Lock()
- self._capture_queue = Queue.Queue()
+ self._capture_queue = queue.Queue()
self._capture_queue_active = True
self._capture_queue_thread = threading.Thread(target=self._capture_queue_worker)
@@ -18,7 +18,10 @@
from functools import wraps
import warnings
import contextlib
-import Queue as queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
logger = logging.getLogger(__name__)
@@ -10,7 +10,10 @@
import time
import re
import threading
-import Queue as queue
+try:
+ import queue
+except ImportError:
+ import Queue as queue
import logging
import serial
import octoprint.plugin
@@ -1464,7 +1467,7 @@ def _detectPort(self, close):
self._log("Connecting to: %s" % (p))
programmer.connect(p)
serial_obj = programmer.leaveISP()
- except ispBase.IspError as (e):
+ except ispBase.IspError as e:
error_message = "Error while connecting to %s: %s" % (p, str(e))
self._log(error_message)
self._logger.exception(error_message)
@@ -67,13 +67,13 @@ def test_full_extension_tree(self):
self.assertItemsEqual(["amf"], full["model"]["amf"])
def test_get_mimetype(self):
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.stl"), "application/sla")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.gcode"), "text/plain")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.unknown"), "application/octet-stream")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_yes"), "application/mime_map_yes")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_no"), "application/octet-stream")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_yes"), "application/mime_detect_yes")
- self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_no"), "application/octet-stream")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.stl"), "application/sla")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.gcode"), "text/plain")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.unknown"), "application/octet-stream")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.mime_map_yes"), "application/mime_map_yes")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.mime_map_no"), "application/octet-stream")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.mime_detect_yes"), "application/mime_detect_yes")
+ self.assertEqual(octoprint.filemanager.get_mime_type("foo.mime_detect_no"), "application/octet-stream")
def test_valid_file_type(self):
self.assertTrue(octoprint.filemanager.valid_file_type("foo.stl", type="model"))
@@ -87,11 +87,11 @@ def test_valid_file_type(self):
self.assertFalse(octoprint.filemanager.valid_file_type("foo.unknown"))
def test_get_file_type(self):
- self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gcode"))
- self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gco"))
- self.assertEquals(["machinecode", "foo"], octoprint.filemanager.get_file_type("foo.f"))
- self.assertEquals(["model", "stl"], octoprint.filemanager.get_file_type("foo.stl"))
- self.assertEquals(["model", "amf"], octoprint.filemanager.get_file_type("foo.amf"))
+ self.assertEqual(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gcode"))
+ self.assertEqual(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gco"))
+ self.assertEqual(["machinecode", "foo"], octoprint.filemanager.get_file_type("foo.f"))
+ self.assertEqual(["model", "stl"], octoprint.filemanager.get_file_type("foo.stl"))
+ self.assertEqual(["model", "amf"], octoprint.filemanager.get_file_type("foo.amf"))
self.assertIsNone(octoprint.filemanager.get_file_type("foo.unknown"))
def test_hook_failure(self):
@@ -105,7 +105,7 @@ def hook():
octoprint.filemanager.get_all_extensions()
- self.assertEquals(1, len(logger.mock_calls))
+ self.assertEqual(1, len(logger.mock_calls))
class FileManagerTest(unittest.TestCase):
@@ -165,7 +165,7 @@ def test_add_file(self):
file_path = self.file_manager.add_file(octoprint.filemanager.FileDestinations.LOCAL, "test.file", wrapper)
- self.assertEquals(("", "test.file"), file_path)
+ self.assertEqual(("", "test.file"), file_path)
self.local_storage.add_file.assert_called_once_with("test.file", wrapper, printer_profile=test_profile, allow_overwrite=False, links=None)
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
@@ -180,7 +180,7 @@ def test_add_folder(self):
folder_path = self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
- self.assertEquals(("", "test_folder"), folder_path)
+ self.assertEqual(("", "test_folder"), folder_path)
self.local_storage.add_folder.assert_called_once_with("test_folder", ignore_existing=True)
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
@@ -191,7 +191,7 @@ def test_add_folder_not_ignoring_existing(self):
self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", ignore_existing=False)
self.fail("Expected an exception to occur!")
except RuntimeError as e:
- self.assertEquals("already there", e.message)
+ self.assertEqual("already there", e.message)
self.local_storage.add_folder.assert_called_once_with("test_folder", ignore_existing=False)
def test_remove_folder(self):
@@ -333,7 +333,7 @@ def test_get_metadata(self):
metadata = self.file_manager.get_metadata(octoprint.filemanager.FileDestinations.LOCAL, "test.file")
- self.assertEquals(metadata, expected)
+ self.assertEqual(metadata, expected)
self.local_storage.get_metadata.assert_called_once_with("test.file")
@mock.patch("octoprint.filemanager.util.atomic_write")
@@ -384,9 +384,9 @@ def add_file(path, file_obj, printer_profile=None, links=None, allow_overwrite=F
# mock slice method on slicing manager
def slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile_id=None, position=None, callback_args=None, overrides=None, on_progress=None, on_progress_args=None, on_progress_kwargs=None):
- self.assertEquals("some_slicer", slicer_name)
- self.assertEquals("prefix/source.file", source_path)
- self.assertEquals("tmp.file", dest_path)
+ self.assertEqual("some_slicer", slicer_name)
+ self.assertEqual("prefix/source.file", source_path)
+ self.assertEqual("tmp.file", dest_path)
self.assertIsNone(profile)
self.assertIsNone(overrides)
self.assertIsNone(printer_profile_id)
@@ -415,15 +415,15 @@ def slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile
# assert that the generated gcode was manipulated as required
expected_atomic_write_calls = [mock.call("prefix/dest.file", "wb")]
- self.assertEquals(mocked_atomic_write.call_args_list, expected_atomic_write_calls)
+ self.assertEqual(mocked_atomic_write.call_args_list, expected_atomic_write_calls)
#mocked_open.return_value.write.assert_called_once_with(";Generated from source.file aabbccddeeff\r")
# assert that shutil was asked to copy the concatenated multistream
- self.assertEquals(1, len(mocked_shutil.call_args_list))
+ self.assertEqual(1, len(mocked_shutil.call_args_list))
shutil_call_args = mocked_shutil.call_args_list[0]
self.assertTrue(isinstance(shutil_call_args[0][0], octoprint.filemanager.util.MultiStream))
multi_stream = shutil_call_args[0][0]
- self.assertEquals(2, len(multi_stream.streams))
+ self.assertEqual(2, len(multi_stream.streams))
self.assertTrue(isinstance(multi_stream.streams[0], io.BytesIO))
# assert that the temporary file was deleted
@@ -457,9 +457,9 @@ def path_on_disk(path):
# mock slice method on slicing manager
def slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile_id=None, position=None, callback_args=None, overrides=None, on_progress=None, on_progress_args=None, on_progress_kwargs=None):
- self.assertEquals("some_slicer", slicer_name)
- self.assertEquals("prefix/source.file", source_path)
- self.assertEquals("tmp.file", dest_path)
+ self.assertEqual("some_slicer", slicer_name)
+ self.assertEqual("prefix/source.file", source_path)
+ self.assertEqual("tmp.file", dest_path)
self.assertIsNone(profile)
self.assertIsNone(overrides)
self.assertIsNone(printer_profile_id)
Oops, something went wrong.

0 comments on commit 149cc8c

Please sign in to comment.