Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace py dependency with termcolor. #575

Merged
merged 3 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/pyshark/capture/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
if encryption_type and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS:
self.encryption = (decryption_key, encryption_type.lower())
else:
raise UnknownEncyptionStandardException("Only the following standards are supported: %s."
% ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS))
standards = ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS)
raise UnknownEncyptionStandardException(f"Only the following standards are supported: {standards}.")

def __getitem__(self, item):
"""Gets the packet in the given index.
Expand Down Expand Up @@ -342,7 +342,7 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):

self._log.debug(
"Creating TShark subprocess with parameters: " + " ".join(parameters))
self._log.debug("Executable: %s" % parameters[0])
self._log.debug("Executable: %s", parameters[0])
tshark_process = await asyncio.create_subprocess_exec(*parameters,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand All @@ -356,8 +356,7 @@ def _created_new_process(self, parameters, process, process_name="TShark"):
process_name + f" subprocess (pid {process.pid}) created")
if process.returncode is not None and process.returncode != 0:
raise TSharkCrashException(
"%s seems to have crashed. Try updating it. (command ran: '%s')" % (
process_name, " ".join(parameters)))
f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')")
self._running_processes.add(process)

async def _cleanup_subprocess(self, process):
Expand Down Expand Up @@ -442,8 +441,7 @@ def get_parameters(self, packet_count=None):
for preference_name, preference_value in self._override_prefs.items():
if all(self.encryption) and preference_name in ("wlan.enable_decryption", "uat:80211_keys"):
continue # skip if override preferences also given via --encryption options
params += ["-o",
"{0}:{1}".format(preference_name, preference_value)]
params += ["-o", f"{preference_name}:{preference_value}"]

if self._output_file:
params += ["-w", self._output_file]
Expand All @@ -465,4 +463,4 @@ def __iter__(self):
return self._packets_from_tshark_sync()

def __repr__(self):
return "<%s (%d packets)>" % (self.__class__.__name__, len(self._packets))
return f"<{self.__class__.__name__} ({len(self._packets)} packets)>"
7 changes: 3 additions & 4 deletions src/pyshark/capture/file_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __getitem__(self, packet_index):
self.next()
except StopIteration:
# We read the whole file, and there's still not such packet.
raise KeyError("Packet of index %d does not exist in capture" % packet_index)
raise KeyError(f"Packet of index {packet_index} does not exist in capture")
return super(FileCapture, self).__getitem__(packet_index)

def get_parameters(self, packet_count=None):
Expand All @@ -88,7 +88,6 @@ def _verify_capture_parameters(self):

def __repr__(self):
if self.keep_packets:
return "<%s %s>" % (self.__class__.__name__, self.input_filepath.as_posix())
return f"<{self.__class__.__name__} {self.input_filepath.as_posix()}>"
else:
return "<%s %s (%d packets)>" % (self.__class__.__name__, self.input_filepath.as_posix(),
len(self._packets))
return f"<{self.__class__.__name__} {self.input_filepath.as_posix()} ({len(self._packets)} packets)>"
4 changes: 2 additions & 2 deletions src/pyshark/capture/inmem_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ def _get_json_separators(self):
commas, parenthesis).
"""
if self._get_tshark_version() >= version.parse("2.6.7"):
return ("%s }" % os.linesep).encode(), ("}%s]" % os.linesep).encode(), 0
return f"{os.linesep} }}".encode(), f"}}{os.linesep}]".encode(), 0
else:
return ("}%s%s" % (os.linesep, os.linesep)).encode(), ("}%s%s]" % (os.linesep, os.linesep)).encode(), 1
return f'}}{os.linesep}{os.linesep}'.encode(), f"}}{os.linesep}{os.linesep}]", 1

def _write_packet(self, packet, sniff_time):
if sniff_time is None:
Expand Down
2 changes: 1 addition & 1 deletion src/pyshark/capture/live_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):

dumpcap_params = [get_process_path(process_name="dumpcap", tshark_path=self.tshark_path)] + self._get_dumpcap_parameters()

self._log.debug("Creating Dumpcap subprocess with parameters: %s" % " ".join(dumpcap_params))
self._log.debug("Creating Dumpcap subprocess with parameters: %s", " ".join(dumpcap_params))
dumpcap_process = await asyncio.create_subprocess_exec(*dumpcap_params, stdout=write,
stderr=subprocess.PIPE)
self._create_stderr_handling_task(dumpcap_process.stderr)
Expand Down
2 changes: 1 addition & 1 deletion src/pyshark/capture/remote_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
:param override_prefs: A dictionary of tshark preferences to override, {PREFERENCE_NAME: PREFERENCE_VALUE, ...}.
:param disable_protocol: Tells tshark to remove a dissector for a specifc protocol.
"""
interface = "rpcap://%s:%d/%s" % (remote_host, remote_port, remote_interface)
interface = f'rpcap://{remote_host}:{remote_port}/{remote_interface}'
super(RemoteCapture, self).__init__(
interface,
*args,
Expand Down
23 changes: 15 additions & 8 deletions src/pyshark/packet/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import sys
import functools

import termcolor


class Pickleable(object):
"""
Base class that implements getstate/setstate, since most of the classes are overriding getattr.
Expand All @@ -24,11 +30,12 @@ def __setstate__(self, data):
setattr(self, key, val)


class StrWriter:
"""A class which mocks the py.io.TerminalWriter to write to an internal buffer"""

def __init__(self):
self.buffer = ""

def write(self, text, *_, **__):
self.buffer += text
@functools.wraps(termcolor.colored)
def colored(text, *args, **kwargs):
try:
enable_color = sys.stdout.isatty()
except (AttributeError, NotImplementedError, FileNotFoundError):
enable_color = False
if enable_color:
return termcolor.colored(text, *args, **kwargs)
return text
2 changes: 1 addition & 1 deletion src/pyshark/packet/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, name=None, showname=None, value=None, show=None, hide=None, p
self.hide = False

def __repr__(self):
return '<LayerField %s: %s>' % (self.name, self.get_default_value())
return f'<LayerField {self.name}: {self.get_default_value()}>'

def get_default_value(self) -> str:
"""Gets the best 'value' string this field has."""
Expand Down
17 changes: 9 additions & 8 deletions src/pyshark/packet/layers/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import typing

import py
import io
import sys

from pyshark.packet import common

Expand Down Expand Up @@ -50,21 +50,22 @@ def __getattr__(self, item):

def pretty_print(self, writer=None):
if not writer:
writer = py.io.TerminalWriter()
writer = sys.stdout
if self.layer_name == DATA_LAYER_NAME:
writer.write('DATA')
return

writer.write('Layer %s:' % self.layer_name.upper() + os.linesep, yellow=True, bold=True)
text = f'Layer {self.layer_name.upper()}{os.linesep}:'
writer.write(common.colored(text, color="yellow", attrs=["bold"]))
self._pretty_print_layer_fields(writer)

def _pretty_print_layer_fields(self, terminal_writer: py.io.TerminalWriter):
def _pretty_print_layer_fields(self, terminal_writer: io.IOBase):
raise NotImplementedError()

def __repr__(self):
return '<%s Layer>' % self.layer_name.upper()
return f'<{self.layer_name.upper()} Layer>'

def __str__(self):
writer = common.StrWriter()
writer = io.StringIO()
self.pretty_print(writer=writer)
return writer.buffer
return writer.getvalue()
21 changes: 11 additions & 10 deletions src/pyshark/packet/layers/ek_layer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import abc
import os
import io

import py
import typing

from pyshark.packet.common import colored
from pyshark import ek_field_mapping
from pyshark.packet.layers.base import BaseLayer

Expand Down Expand Up @@ -106,24 +107,24 @@ def _field_has_subfields(self, field_ek_name):
return True
return False

def _pretty_print_layer_fields(self, terminal_writer: py.io.TerminalWriter):
def _pretty_print_layer_fields(self, file: io.IOBase):
for field_name in self.field_names:
field = self.get_field(field_name)
self._pretty_print_field(field_name, field, terminal_writer, indent=1)
self._pretty_print_field(field_name, field, file, indent=1)

def _pretty_print_field(self, field_name, field, terminal_writer, indent=0):
def _pretty_print_field(self, field_name, field, file, indent=0):
prefix = "\t" * indent
if isinstance(field, EkMultiField):
terminal_writer.write(f"{prefix}{field_name}: ", green=True, bold=True)
file.write(colored(f"{prefix}{field_name}: ", "green", attrs=["bold"]))
if field.value is not None:
terminal_writer.write(str(field.value))
terminal_writer.write(os.linesep)
file.write(str(field.value))
file.write(os.linesep)
for subfield in field.subfields:
self._pretty_print_field(subfield, field.get_field(subfield), terminal_writer,
self._pretty_print_field(subfield, field.get_field(subfield), file,
indent=indent + 1)
else:
terminal_writer.write(f"{prefix}{field_name}: ", green=True, bold=True)
terminal_writer.write(f"{field}{os.linesep}")
file.write(colored(f"{prefix}{field_name}: ", "green", attrs=["bold"]))
file.write(f"{field}{os.linesep}")

def _get_possible_layer_prefixes(self):
"""Gets the possible prefixes for a field under this layer.
Expand Down
22 changes: 11 additions & 11 deletions src/pyshark/packet/layers/json_layer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import io

import py

from pyshark.packet.common import colored
from pyshark.packet.fields import LayerField
from pyshark.packet.fields import LayerFieldsContainer
from pyshark.packet.layers.base import BaseLayer
Expand Down Expand Up @@ -53,7 +53,7 @@ def get_field(self, name):
# Might be a "fake" field in JSON
is_fake = self._is_fake_field(name)
if not is_fake:
raise AttributeError("No such field %s" % name)
raise AttributeError(f"No such field {name}")
field = self._make_wrapped_field(name, field, is_fake=is_fake)
self._wrapped_fields[name] = field
return field
Expand All @@ -79,12 +79,12 @@ def has_field(self, dotted_name) -> bool:
return False
return True

def _pretty_print_layer_fields(self, terminal_writer: py.io.TerminalWriter):
def _pretty_print_layer_fields(self, file: io.IOBase):
for field_line in self._get_all_field_lines():
if ':' in field_line:
field_name, field_line = field_line.split(':', 1)
terminal_writer.write(field_name + ':', green=True, bold=True)
terminal_writer.write(field_line, bold=True)
file.write(colored(field_name + ':', "green", ["bold"]))
file.write(colored(field_line, attrs=["bold"]))

def _get_all_field_lines(self):
"""Returns all lines that represent the fields of the layer (both their names and values)."""
Expand Down Expand Up @@ -147,12 +147,12 @@ def _convert_showname_field_names_to_field_names(self):

def _get_internal_field_by_name(self, name):
"""Gets the field by name, or None if not found."""
field = self._all_fields.get(name, self._all_fields.get('%s.%s' % (self._full_name, name)))
field = self._all_fields.get(name, self._all_fields.get(f"{self._full_name}.{name}"))
if field is not None:
return field
for field_name in self._all_fields:
# Specific name
if field_name.endswith('.%s' % name):
if field_name.endswith(f'.{name}'):
return self._all_fields[field_name]

def _is_fake_field(self, name):
Expand All @@ -165,7 +165,7 @@ def _is_fake_field(self, name):
# }
# }
# So in this case we must create a fake layer for "bar".
field_full_name = '%s.%s.' % (self._full_name, name)
field_full_name = f"{self._full_name}.{name}."
for name, field in self._all_fields.items():
if name.startswith(field_full_name):
return True
Expand All @@ -180,7 +180,7 @@ def _make_wrapped_field(self, name, field, is_fake=False, full_name=None):
it.
"""
if not full_name:
full_name = '%s.%s' % (self._full_name, name)
full_name = f"{self._full_name}.{name}"

if is_fake:
# Populate with all fields that are supposed to be inside of it
Expand All @@ -189,7 +189,7 @@ def _make_wrapped_field(self, name, field, is_fake=False, full_name=None):
if isinstance(field, dict):
if name.endswith('_tree'):
name = name.replace('_tree', '')
full_name = '%s.%s' % (self._full_name, name)
full_name = f'{self._full_name}.{name}'
return JsonLayer(name, field, full_name=full_name, is_intermediate=is_fake)
elif isinstance(field, list):
# For whatever reason in list-type object it goes back to using the original parent name
Expand Down
12 changes: 6 additions & 6 deletions src/pyshark/packet/layers/xml_layer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import typing
import io

import py

from pyshark.packet.common import colored
from pyshark.packet.fields import LayerField, LayerFieldsContainer
from pyshark.packet.layers import base

Expand Down Expand Up @@ -96,12 +96,12 @@ def _sanitize_field_name(self, field_name):
field_name = field_name.replace(self._field_prefix, '')
return field_name.replace('.', '_').replace('-', '_').lower()

def _pretty_print_layer_fields(self, terminal_writer: py.io.TerminalWriter):
def _pretty_print_layer_fields(self, file: io.IOBase):
for field_line in self._get_all_field_lines():
if ':' in field_line:
field_name, field_line = field_line.split(':', 1)
terminal_writer.write(field_name + ':', green=True, bold=True)
terminal_writer.write(field_line, bold=True)
file.write(colored(field_name + ':', "green", attrs=["bold"]))
file.write(colored(field_line, attrs=["bold"]))

def _get_all_fields_with_alternates(self):
all_fields = list(self._all_fields.values())
Expand All @@ -127,7 +127,7 @@ def _get_field_repr(self, field):
elif field.show:
return field.show
elif field.raw_value:
return "%s: %s" % (self._sanitize_field_name(field.name), field.raw_value)
return f"{self._sanitize_field_name(field.name)}: {field.raw_value}"

def get_field_by_showname(self, showname) -> typing.Union[LayerFieldsContainer, None]:
"""Gets a field by its "showname"
Expand Down
6 changes: 3 additions & 3 deletions src/pyshark/packet/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __repr__(self):
if self.transport_layer != self.highest_layer and self.transport_layer is not None:
transport_protocol = self.transport_layer + '/'

return '<%s%s Packet>' % (transport_protocol, self.highest_layer)
return f'<{transport_protocol}{self.highest_layer} Packet>'

def __str__(self):
s = self._packet_string
Expand All @@ -107,7 +107,7 @@ def __str__(self):
@property
def _packet_string(self):
"""A simple pretty string that represents the packet."""
return 'Packet (Length: %s)%s' % (self.length, os.linesep)
return f'Packet (Length: {self.length}){os.linesep}'

def pretty_print(self):
for layer in self.layers:
Expand All @@ -122,7 +122,7 @@ def __getattr__(self, item):
for layer in self.layers:
if layer.layer_name == item:
return layer
raise AttributeError("No attribute named %s" % item)
raise AttributeError(f"No attribute named {item}")

@property
def highest_layer(self) -> BaseLayer:
Expand Down
2 changes: 1 addition & 1 deletion src/pyshark/packet/packet_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, structure, values):
def __repr__(self):
protocol, src, dst = self._fields.get('Protocol', '?'), self._fields.get('Source', '?'),\
self._fields.get('Destination', '?')
return '<%s %s: %s to %s>' % (self.__class__.__name__, protocol, src, dst)
return f'<{self.__class__.__name__} {protocol}: {src} to {dst}>'

def __str__(self):
return self.summary_line
Expand Down
5 changes: 2 additions & 3 deletions src/pyshark/tshark/output_parser/tshark_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ def _get_json_separators(self):
commas, parenthesis).
"""
if not self._tshark_version or self._tshark_version >= version.parse("3.0.0"):
return ("%s },%s" % (os.linesep, os.linesep)).encode(), ("}%s]" % os.linesep).encode(), (
1 + len(os.linesep))
return f"{os.linesep} }},{os.linesep}".encode(), f"}}{os.linesep}]".encode(), 1 + len(os.linesep)
else:
return ("}%s%s ," % (os.linesep, os.linesep)).encode(), ("}%s%s]" % (os.linesep, os.linesep)).encode(), 1
return f"}}{os.linesep}{os.linesep} ,".encode(), f"}}{os.linesep}{os.linesep}]".encode(), 1


def duplicate_object_hook(ordered_pairs):
Expand Down
Loading