In [1]:
import re

In [2]:
# Base splitter used by Tokenizer._base_split in "manual" mode (via findall).
# One capture group with five alternatives, tried in order:
#   1. optional non-whitespace prefix, a double-quoted span, optional non-whitespace suffix
#   2. the same shape with single quotes
#   3. a bare single-quoted span
#   4. a bare double-quoted span
#   5. a run of characters that are neither quotes nor whitespace
WHITESPACE_QUOTES_REGEX = re.compile(r"""([^\s]*?\".*?\"[^\s]*?|[^\s]*?\'.*?\'[^\s]*?|\'.*?\'|\".*?\"|[^\"\'\s]+)""")
# Whole token contains a ';' and no quote characters anywhere.
SEMICOLON_REGEX = re.compile(r"""^[^\"']*;[^\"']*$""")

# Flag of the form <dashes-or-slashes><name>=<value>: the name contains no '=' or
# quote, and the value is either empty or does not start with '='.
FLAG_SEP_EQUALS_REGEX = re.compile(r"""^[-\/]+[^\=\"\']+\=(([^=]{1}.*)|())$""")
# Flag of the form </ or -><name>:<value>: the name contains no ':', quote or '=',
# and the value contains no '='.
FLAG_SEP_COLON_REGEX = re.compile(r"""^[\/\-][^\:\"\'\=]+\:[^=]*$""")

# Each redirection pattern below accepts three token shapes: the operator between
# two non-empty parts, the operator at the start of the token, or the operator at
# the end of the token.
REDIRECTION_LEFT_REGEX = re.compile(r"""(^[^'"<>]+<[^'"<>]+$)|(^<[^<>]+$)|(^[^<]+<$)""")
DOUBLE_REDIRECTION_LEFT_REGEX = re.compile(r"""(^[^'"<>]+<<[^'"<>]+$)|(^<<[^<>]+$)|(^[^<]+<<$)""")
REDIRECTION_RIGHT_REGEX = re.compile(r"""(^[^'"<>]+>[^'"<>]+$)|(^>[^>]+$)|(^[^<>]+>$)""")
DOUBLE_REDIRECTION_RIGHT_REGEX = re.compile(r"""(^[^'"<>]+>>[^'"<>]+$)|(^>>[^>]+$)|(^[^<>]+>>$)""")

# Same three shapes (middle / leading / trailing) for '|' and '||'.
PIPE_REGEX = re.compile(r"""(^[^'"|]+\|[^'"|]+$)|(^\|[^\|]+$)|(^[^\|]+\|$)""")
DOUBLE_PIPE_REGEX = re.compile(r"""(^[^'"\|]+\|\|[^'"\|]+$)|(^\|\|[^\|]+$)|(^[^\|]+\|\|$)""")

# Lexers used by Tokenizer._custom_cmdline_split ("custom" mode). Both expose seven
# capture groups, unpacked there as (qs, qss, esc, pipe, word, white, fail):
#   qs    - content of a double-quoted span
#   qss   - content of a single-quoted span (Linux); always-empty group on Windows
#   esc   - backslash escape
#   pipe  - operator: '&', '&&', '|', '||', '>' with an optional leading digit, or '<'
#   word  - run of ordinary characters
#   white - run of whitespace
#   fail  - any other single character (treated as a lexing error by the caller)
RE_CMD_LEX_LINUX = re.compile(r""""((?:\\["\\]|[^"])*)"|'([^']*)'|(\\.)|(&&?|\|\|?|\d?\>|[<])|([^\s'"\\&|<>]+)|(\s+)|(.)""")
# Windows variant: also accepts a doubled '""' inside double quotes, makes the
# closing double quote optional, and only treats '\\' (before a quote) and '\"'
# as escapes.
RE_CMD_LEX_WINDOWS = re.compile(r""""((?:""|\\["\\]|[^"])*)"?()|(\\\\(?=\\*")|\\")|(&&?|\|\|?|\d?>|[<])|([^\s"&|<>]+)|(\s+)|(.)""")

# Placeholder inserted by Tokenizer._custom_cmdline_split for an empty argument;
# also reported by Tokenizer.get_metadata in "custom" mode.
EMPTY_STRING = "<empty_string>"

In [3]:
class Tokenizer:
    """Split a shell command line into a flat list of tokens.

    Tokenization happens in three stages (see ``tokenize``):

    1. a base split of the command line (``_base_split``), either with
       ``WHITESPACE_QUOTES_REGEX`` ("manual" mode) or with the OS-specific
       lexer regex ("custom" mode);
    2. a per-token split on separators detected inside the token
       (``;``, ``=``, ``:``, ``<``, ``<<``, ``>``, ``>>``, ``|``, ``||``);
    3. a merge step that re-joins operators split apart by the earlier
       stages (``>`` ``>`` into ``>>``, and ``&`` with the token after it
       when it follows ``>``).

    Args:
        mode: ``"manual"`` or ``"custom"``; selects the base splitter.
        os_name: ``"windows"`` or ``"linux"`` (case-insensitive); selects the
            lexer used in custom mode and whether ``;`` is a separator.

    Raises:
        AttributeError: if ``mode`` or ``os_name`` is not one of the
            supported values.
    """

    def __init__(self, mode="manual", os_name="windows"):
        if mode not in ["manual", "custom"]:
            raise AttributeError("Wrong mode. Possible values: 'manual', 'custom'")
        self.mode = mode
        if os_name.lower() not in ["windows", "linux"]:
            raise AttributeError("Wrong os_name. Possible values: 'windows', 'linux'")
        # Stored lower-cased so later comparisons can be exact.
        self.os_name = os_name.lower()

    def get_metadata(self):
        """Return the tokenizer configuration as a dict.

        Always contains ``"mode"`` and ``"os_name"``; in custom mode it also
        contains ``"empty_string_tag"`` (the ``EMPTY_STRING`` placeholder).
        """
        metadata = {
            "mode": self.mode,
            "os_name": self.os_name,
        }
        if self.mode == "custom":
            metadata["empty_string_tag"] = EMPTY_STRING
        return metadata

    def _get_splitters(self, token):
        """Detect which separators ``token`` should be split on.

        Returns:
            A pair ``(one_char_splitters, multichar_splitters)``. Entries are
            regex fragments: one-character entries are meant to be placed in a
            character class, multi-character entries are used as alternatives.
        """
        one_char_splitters = []
        multichar_splitters = []

        # ';' is a separator on Linux (both modes) and on Windows in manual
        # mode only.
        semicolon_allowed = self.os_name == "linux" or (
            self.os_name == "windows" and self.mode == "manual"
        )
        if semicolon_allowed and SEMICOLON_REGEX.match(token):
            one_char_splitters.append(";")
        if FLAG_SEP_EQUALS_REGEX.match(token):
            one_char_splitters.append("=")
        if FLAG_SEP_COLON_REGEX.match(token):
            one_char_splitters.append(r"\:")

        # For each operator family the doubled form is checked first, so a
        # token is never split on both '<<' and '<' (same for '>' and '|').
        if DOUBLE_REDIRECTION_LEFT_REGEX.match(token):
            multichar_splitters.append("<<")
        elif REDIRECTION_LEFT_REGEX.match(token):
            one_char_splitters.append("<")

        if DOUBLE_REDIRECTION_RIGHT_REGEX.match(token):
            multichar_splitters.append(">>")
        elif REDIRECTION_RIGHT_REGEX.match(token):
            one_char_splitters.append(">")

        if DOUBLE_PIPE_REGEX.match(token):
            multichar_splitters.append(r"\|\|")
        elif PIPE_REGEX.match(token):
            one_char_splitters.append(r"\|")
        return one_char_splitters, multichar_splitters

    def _get_split_regex(self, one_char_splitters, multichar_splitters):
        """Build the pattern string used to split a token.

        One-character splitters are combined into a single character class,
        multi-character splitters are appended as alternatives, and the whole
        pattern is wrapped in a capture group so ``re.split`` keeps the
        separators in its result.

        Returns:
            The pattern string, or ``None`` when there is nothing to split on.
        """
        splitters = []
        if one_char_splitters:
            splitters.append(f"[{''.join(one_char_splitters)}]")
        if multichar_splitters:
            splitters.extend(multichar_splitters)
        if not splitters:
            return None
        return "(" + "|".join(splitters) + ")"

    def _base_split(self, sentence):
        """Perform the first, coarse split of ``sentence`` according to ``self.mode``."""
        if self.mode == "manual":
            return WHITESPACE_QUOTES_REGEX.findall(sentence)
        elif self.mode == "custom":
            return self._custom_cmdline_split(sentence)
        return []

    def _custom_cmdline_split(self, s):
        """Multi-platform variant of shlex.split() for command-line splitting.
        For use with subprocess, for argv injection etc. Using fast REGEX.

        The OS-specific lexer regex is applied with ``findall``; each match
        fills exactly one of seven groups (or none, for an empty quoted
        string). Adjacent pieces are concatenated into one argument until
        whitespace or an operator is seen.

        Returns:
            The list of arguments; operators are emitted as separate items.

        Raises:
            ValueError: if the lexer hits a character no other group accepts.
        """
        args = []
        accu = None  # collects pieces of one arg
        if self.os_name == "linux":
            re_cmd_lex = RE_CMD_LEX_LINUX
        elif self.os_name == "windows":
            re_cmd_lex = RE_CMD_LEX_WINDOWS
        else:
            return args

        # True once a double-quoted piece has been seen in the current
        # argument; reset at every whitespace/operator boundary.
        is_quote = False
        for qs, qss, esc, pipe, word, white, fail in re_cmd_lex.findall(s):
            if word:
                pass  # most frequent
            elif esc:
                # Keep only the escaped character, dropping the backslash.
                word = esc[1]
            elif white or pipe:
                # Argument boundary: flush what was collected so far.
                if accu is not None:
                    args.append(accu)
                if pipe:
                    args.append(pipe)
                is_quote = False
                accu = None
                continue
            elif fail:
                raise ValueError("invalid or incomplete shell string")
            elif qs:
                word = qs.replace('\\"', '"').replace("\\\\", "\\")
                if self.os_name == "windows":
                    # Inside Windows double quotes, '""' stands for one '"'.
                    word = word.replace('""', '"')
                is_quote = True
            elif qss:
                word = qss  # may be even empty; must be last
            else:
                # Every group is empty, e.g. an empty quoted string. Emit the
                # placeholder unless a double-quoted piece is already part of
                # this argument, in which case ``word`` stays "".
                if not is_quote:
                    word = EMPTY_STRING
            accu = (accu or "") + word
        if accu is not None:
            args.append(accu)
        return args

    def _merge_tokens(self, tokens):
        """Re-join operators that earlier stages split into separate tokens.

        Two passes are applied, in this order:

        1. every ``">", "&", x`` triple becomes ``">", "&" + x``;
        2. every ``">", ">"`` pair becomes ``">>"``.

        All occurrences are merged, scanning left to right without overlap.
        The input list is not modified; a new list is returned.
        """

        def _merge_ampersand(tokens):
            merged = []
            i = 0
            n = len(tokens)
            while i < n:
                # Needs a token after '&' to attach it to.
                if tokens[i] == ">" and i + 2 < n and tokens[i + 1] == "&":
                    merged.append(tokens[i])
                    merged.append(tokens[i + 1] + tokens[i + 2])
                    i += 3
                else:
                    merged.append(tokens[i])
                    i += 1
            return merged

        def _merge_redirections(tokens):
            merged = []
            i = 0
            n = len(tokens)
            while i < n:
                if tokens[i] == ">" and i + 1 < n and tokens[i + 1] == ">":
                    merged.append(tokens[i] + tokens[i + 1])
                    i += 2
                else:
                    merged.append(tokens[i])
                    i += 1
            return merged

        tokens = _merge_ampersand(tokens)
        tokens = _merge_redirections(tokens)
        return tokens

    def tokenize(self, sentence):
        """Tokenize a command line.

        Args:
            sentence: the command line string.

        Returns:
            The list of tokens after the base split, the per-token separator
            split (empty pieces dropped) and the operator merge.
        """
        tokens = []
        for token in self._base_split(sentence):
            one_char_splitters, multichar_splitters = self._get_splitters(token)
            split_regex = self._get_split_regex(one_char_splitters, multichar_splitters)
            if split_regex:
                # maxsplit is the number of distinct separator kinds found,
                # so only the leftmost matches are split.
                parts = re.split(
                    split_regex,
                    token,
                    maxsplit=len(one_char_splitters) + len(multichar_splitters),
                )
                tokens.extend(filter(None, parts))
            else:
                tokens.append(token)
        tokens = self._merge_tokens(tokens)
        return tokens

In [5]:
#from tokenize import Tokenizer

if __name__ == "__main__":
    # Windows example: the command line is lexed by the custom regex splitter.
    windows_cmd = 'cmd.exe /C dir C:\\'
    windows_tokenizer = Tokenizer(os_name='windows', mode="custom")
    windows_tokens = windows_tokenizer.tokenize(windows_cmd)
    print(windows_cmd, '->', windows_tokens)

    # Linux example: the command line goes through the manual whitespace/quote splitter.
    linux_cmd = 'ls -alh /home >log.txt'
    linux_tokenizer = Tokenizer(os_name='linux', mode="manual")
    linux_tokens = linux_tokenizer.tokenize(linux_cmd)
    print(linux_cmd, '->', linux_tokens)

cmd.exe /C dir C:\ -> ['cmd.exe', '/C', 'dir', 'C:\\']
ls -alh /home >log.txt -> ['ls', '-alh', '/home', '>', 'log.txt']
