Skip to content

Commit

Permalink
adds very weak PowerShell support
Browse files Browse the repository at this point in the history
  • Loading branch information
huettenhain committed Jun 20, 2021
1 parent addc7d8 commit fcc36d8
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 17 deletions.
63 changes: 46 additions & 17 deletions refinery/lib/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@
above example would not be separated by a dash.
"""
import copy
import json
import base64
import os

from typing import Iterable, BinaryIO, Callable, Optional, Tuple, Dict, ByteString, Any
from refinery.lib.structures import MemoryFile
from refinery.lib.powershell import is_powershell_process
from refinery.lib.tools import isbuffer

try:
import msgpack
Expand All @@ -132,15 +136,35 @@
]


class BytesEncoder(json.JSONEncoder):
def default(self, obj):
if isbuffer(obj):
return {'_bin': base64.b85encode(obj).decode('ascii')}
return super().default(obj)


class BytesDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

def object_hook(self, obj):
if isinstance(obj, dict) and len(obj) == 1 and '_bin' in obj:
return base64.b85decode(obj['_bin'])
return obj


MAGIC = bytes.fromhex(os.environ.get('REFINERY_FRAME_MAGIC', 'C0CAC01AC0DE'))
ISPS1 = is_powershell_process()

if ISPS1:
MAGIC = B'[[%s]]' % base64.b85encode(MAGIC)


class Chunk(bytearray):
"""
Represents the individual chunks in a frame. The `refinery.units.Unit.filter` method
receives an iterable of `refinery.lib.frame.Chunk`s.
"""

Magic = bytes.fromhex(
os.environ.get('REFINERY_FRAME_MAGIC', 'C0CAC01AC0DE'))

def __init__(
self,
data: ByteString,
Expand Down Expand Up @@ -259,14 +283,21 @@ def unpack(cls, stream):
"""
Classmethod to read a serialized chunk from an unpacker stream.
"""
path, view, meta, fill, data = next(stream)
item = next(stream)
if ISPS1:
item = json.loads(item, cls=BytesDecoder)
path, view, meta, fill, data = item
return cls(data, path, view=view, meta=meta, fill=fill)

def pack(self):
"""
Return the serialized representation of this chunk.
"""
return msgpack.packb((self._path, self._view, self._meta, self._fill, self))
obj = (self._path, self._view, self._meta, self._fill, self)
if ISPS1:
packed = json.dumps(obj, cls=BytesEncoder, separators=(',', ':')) + '\n'
return packed.encode('utf8')
return msgpack.packb(obj)

def __repr__(self) -> str:
layer = '/'.join('#' if not s else str(p) for p, s in zip(self._path, self._view))
Expand Down Expand Up @@ -317,18 +348,15 @@ class FrameUnpacker:
iterator over `[BOO, BAZ]`.
"""
def __init__(self, stream: Optional[BinaryIO]):
import msgpack
self.finished = False
self.trunk = ()
self._next = Chunk(bytearray(), ())
buffer = stream and stream.read(len(Chunk.Magic)) or B''
if buffer == Chunk.Magic:
buffer = stream and stream.read(len(MAGIC)) or B''
if buffer == MAGIC:
self.framed = True
self.stream = stream
self.unpacker = msgpack.Unpacker(
max_buffer_size=0xFFFFFFFF,
use_list=False
)
if not ISPS1:
self.unpacker = msgpack.Unpacker(max_buffer_size=0xFFFFFFFF, use_list=False)
self._advance()
self.gauge = len(self._next.path)
else:
Expand All @@ -341,7 +369,8 @@ def __init__(self, stream: Optional[BinaryIO]):
def _advance(self) -> None:
while not self.finished:
try:
self._next = Chunk.unpack(self.unpacker)
stream = self.stream if ISPS1 else self.unpacker
self._next = Chunk.unpack(stream)
break
except StopIteration:
pass
Expand Down Expand Up @@ -483,7 +512,7 @@ def __iter__(self):
if self.unpack.finished:
return
if self.nesting > 0:
yield Chunk.Magic
yield MAGIC
rest = (0,) * (self.nesting - 1)
while self.unpack.nextframe():
for k, chunk in enumerate(self._apply_filter()):
Expand All @@ -496,7 +525,7 @@ def __iter__(self):
for chunk in self._apply_filter():
yield from self._generate_bytes(chunk)
elif self.nesting == 0:
yield Chunk.Magic
yield MAGIC
while self.unpack.nextframe():
for chunk in self._apply_filter():
if not chunk.visible:
Expand All @@ -508,7 +537,7 @@ def __iter__(self):
gauge = max(self.unpack.gauge + self.nesting, 0)
trunk = None
if gauge:
yield Chunk.Magic
yield MAGIC
while self.unpack.nextframe():
while True:
for chunk in self._apply_filter():
Expand Down
72 changes: 72 additions & 0 deletions refinery/lib/powershell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Windows-specific module to determine whether the current Python process is running in a PowerShell process.
"""
from __future__ import annotations

import ctypes
import functools
import os

from typing import Iterable, get_type_hints


class FieldsFromTypeHints(type(ctypes.Structure)):
def __new__(cls, name, bases, namespace):
class AnnotationDummy:
__annotations__ = namespace.get('__annotations__', {})
annotations = get_type_hints(AnnotationDummy)
namespace['_fields_'] = list(annotations.items())
return type(ctypes.Structure).__new__(cls, name, bases, namespace)


class PROCESSENTRY32(ctypes.Structure, metaclass=FieldsFromTypeHints):
dwSize : ctypes.c_uint32
cntUsage : ctypes.c_uint32
th32ProcessID : ctypes.c_uint32
th32DefaultHeapID : ctypes.POINTER(ctypes.c_ulong)
th32ModuleID : ctypes.c_uint32
cntThreads : ctypes.c_uint32
th32ParentProcessID : ctypes.c_uint32
pcPriClassBase : ctypes.c_long
dwFlags : ctypes.c_uint32
szExeFile : ctypes.c_char * 260


def get_parent_processes() -> Iterable[str]:
k32 = ctypes.windll.kernel32
entry = PROCESSENTRY32()
entry.dwSize = ctypes.sizeof(PROCESSENTRY32)
snap = k32.CreateToolhelp32Snapshot(2, 0)
if not snap:
raise RuntimeError('could not create snapshot')
try:
if not k32.Process32First(snap, ctypes.byref(entry)):
raise RuntimeError('could not iterate processes')
processes = {
entry.th32ProcessID: (
entry.th32ParentProcessID,
bytes(entry.szExeFile).decode('latin1')
) for _ in iter(
functools.partial(k32.Process32Next, snap, ctypes.byref(entry)), 0)
}
finally:
k32.CloseHandle(snap)
pid = os.getpid()
while pid in processes:
pid, path = processes[pid]
yield path


def is_powershell_process() -> bool:
if os.name != 'nt':
return False
for process in get_parent_processes():
name, _ = os.path.splitext(process)
name = name.lower()
if name == 'cmd':
return False
if name == 'powershell':
return True
return False
15 changes: 15 additions & 0 deletions test/lib/test_powershell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.lib.powershell import get_parent_processes, is_powershell_process

from .. import TestBase


class TestPowerShellDetection(TestBase):

def test_process_trace(self):
processes = list(get_parent_processes())
self.assertTrue(any('python' in p for p in processes))

def test_not_running_in_powershell(self):
self.assertFalse(is_powershell_process())

0 comments on commit fcc36d8

Please sign in to comment.