In [1]:
import socket
import sys
import unittest
from unittest.mock import patch
import fickling.analysis as analysis
from fickling.pickle import Pickled
import builtins
import os
from IPython.display import Markdown
import pickle
import ast
import pickletools
from typing import Optional, TextIO, Tuple

In [2]:
# step 1 - install fickling
!pip3 install fickling



In [3]:
!pip3 install termcolor



In [4]:
import ast
import json
import os
import re
import subprocess
import sys
from fickling.pickle import Pickled
import pickle
import _pickle as cPickle

from termcolor import colored
# -attacks
# https://www.cadosecurity.com/linux-attack-techniques-dynamic-linker-hijacking-with-ld-preload/
# https://www.cybertriage.com/blog/training/how-to-detect-running-malware-intro-to-incident-response-triage-part-7/

# https://www.beyondtrust.com/blog/entry/important-linux-files-protect
# Sensitive Linux paths. The scanner looks these up with plain substring search,
# so entries must be literal text: directory prefixes are used instead of glob
# patterns ('/etc/rc.' covers '/etc/rc.local', '/etc/rc.d', and a literal '/etc/rc.*').
BAD_LIBRARY = {
    '/etc/hosts', '/bin/sh', '/etc/passwd', '/etc/pam.conf', '/proc', '/etc/shadow', '/etc/profile',
    '~/.bash_profile', '~/.bash_login',
    # These two were previously fused into one string that could never match.
    '~/.profile', '/home/user/.bashrc',
    '/etc/bash.bashrc', '/etc/profile.d/', '/etc/system.d',
    '/etc/rc.', '/etc/init.',
    '/etc/resolv.conf', '/etc/gshadow', '/etc/pam.d', '/bin', '/sbin',
}
# This technique is often called DLL injection on Windows.
# With DLL injection, the attacker creates a malicious library with the same name and API as the good one.
# The program loads the malicious library, which in turn loads the good one and calls it as needed
# to perform the operations the original program expects.
BAD_CALLS = {'os', 'shutil', 'sys', 'requests', 'net', 'func', 'args', 'keywords'}
# Substrings treated as signs of code execution or shell usage.
BAD_SIGNAL = {'eval', 'compile', 'rm ', 'cat ', 'nc ', 'exec', 'open', 'run'}
# File extensions of executable or loadable code.
BAD_FILES = {'.py', '.exe', '.dll', '.so'}
# https://redcanary.com/threat-detection-report/techniques/powershell/
# PowerShell -encodedcommand switch
# This detection analytic looks for the execution of powershell.exe with command lines that include
# variations of the -encodedcommand argument; PowerShell will recognize and accept anything from -e onward,
# and it will show up outside of the encoded bits.
# NOTE(review): not referenced by the scanner in the visible part of this notebook.
BAD_COMMAND = {'powershell.exe', '-e', '-en', '-enc', '-enco', 'ls', 'base64'}
# Obfuscation and escape characters
# Obfuscation can disrupt detection logic by splitting commands or parameters or inserting extra characters
# (that are ignored by PowerShell).
# Monitor for the execution of PowerShell with unusually high counts of characters like ^, +, $, and %.
# NOTE(review): not referenced by the scanner in the visible part of this notebook.
BAD_CHARACTER = {'^', '+', '$', '%'}
# Suspicious PowerShell cmdlets
# Many of our PowerShell detection analytics look for cmdlets, methods, and switches that may indicate
# malicious activity. The following analytic is by no means exhaustive but offers a few valuable examples
# of suspicious cmdlets and other oft-abused features to look out for:
BAD_CMD = {'-nop', '-noni', 'invoke-expression', 'iex', '.downloadstring', 'downloadfile'}
# Module and dunder names whose presence in a pickle is treated as suspicious.
BAD_MODULE = {"__init__", "__new__", "__reduce__", "__builtin__", "os", "subprocess", "sys", "builtins", "socket"}
BAD_IMPORT = {'module', 'names', 'level'}

class scan_pickle_file:
    """Naive keyword scanner for pickle files.

    The file's bytes are turned into text with ``str()`` and searched, by plain
    substring matching, for every entry of the module-level ``BAD_*`` sets.
    """

    def __init__(self):
        pass

    @staticmethod
    def scann(scan):
        """Scan the file at path ``scan`` and print a report.

        Declared static because it is invoked on the class itself
        (``scan_pickle_file.scann(path)``); calling it on an instance works too.

        Args:
            scan: Path of the file to scan; it is opened in binary mode.

        Returns:
            None. One line is printed per matched pattern, followed by a red
            "SCAN FAILED" banner, or a green "SCAN PASSED!" banner when nothing
            matched.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        with open(scan, 'rb') as f:
            print()
            # str() of a bytes object is its "b'...'" repr, so non-printable bytes
            # are searched in their escaped form rather than as raw characters.
            text = str(f.read())
        print("\n******scanning-pickle******")

        # (label prefix, label suffix, patterns) in the order the report lists them.
        categories = (
            ("malicious file (", "): ", BAD_FILES),
            ("malicious lib (", "): ", BAD_LIBRARY),
            ("library call (", ".): ", BAD_CALLS),
            ("malicious signal (", "): ", BAD_SIGNAL),
            ("malicious cmd command (", "): ", BAD_CMD),
            ("malicious module (", "): ", BAD_MODULE),
            ("malicious import (", "): ", BAD_IMPORT),
        )

        found_any = False
        for prefix, suffix, patterns in categories:
            for pattern in patterns:
                if pattern in text:
                    found_any = True
                    # This is a presence check, not an occurrence count, so the
                    # reported number is always 1.
                    print(prefix + pattern + suffix + "1")

        if found_any:
            print(colored("SCAN FAILED\n", "red"))
        else:
            print(colored("SCAN PASSED!", "green"))

In [5]:
if sys.version_info < (3, 9):
    from astunparse import unparse
else:
    from ast import unparse

from fickling.pickle import Interpreter, Pickled
class cdr:
    """Sanitiser that rewrites a pickle file, keeping only deny-list-clean string values."""

    # Deny-list applied with substring matching, so benign text that merely
    # contains one of these fragments (e.g. "post" contains "os") is dropped too.
    _UNSAFE_SUBSTRINGS = (
        "eval", "exec", "compile", "open", "__builtin__",
        "os", "subprocess", "sys", "builtins", "socket",
    )

    def __init__(self):
        pass

    def code(self, pickled: "Pickled") -> str:
        """
        Returns the string representation of the code object that was pickled.
        """
        # NOTE(review): assumes `pickled` supports lookup by the key 'code' and that
        # the result exposes `co_code` as UTF-8 decodable bytes - confirm against fickling.
        code = pickled['code']
        return code.co_code.decode('utf-8')

    def check_safety(
        self, pickled: "Pickled", filename, stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None
    ) -> str:
        """Strip a pickle file down to its deny-list-clean strings, in place.

        Every opcode of the file is inspected with ``pickletools.genops``. String
        opcode arguments that contain none of ``_UNSAFE_SUBSTRINGS`` are kept, and
        the file is then overwritten with a pickle of those strings joined by
        newlines. Nothing is unpickled while doing so.

        Args:
            pickled: Parsed pickle; only its ``properties`` attribute is accessed.
            filename: Path of the pickle file to read and then overwrite.
            stdout: Accepted for interface compatibility; not used.
            stderr: Accepted for interface compatibility; not used.

        Returns:
            The string ``"True"`` when at least one string survived, otherwise the
            string ``"False"`` (strings, not booleans).
            NOTE(review): the value does not indicate whether anything unsafe was
            found or removed - confirm what callers expect.

        Raises:
            OSError: If the file cannot be read or written.
            ValueError: If ``pickletools.genops`` cannot parse the data; the file
                is left untouched in that case because it is only rewritten afterwards.
        """
        # NOTE(review): the value is unused; the access is retained in case evaluating
        # the property has effects callers rely on - confirm and drop.
        _ = pickled.properties

        with open(filename, 'rb') as f:
            data = f.read()

        # genops yields (opcode, arg, position); only str arguments are candidates.
        safe_lines = [
            arg
            for _opcode, arg, _pos in pickletools.genops(data)
            if isinstance(arg, str) and not any(bad in arg for bad in self._UNSAFE_SUBSTRINGS)
        ]

        # The file is rewritten even when nothing survived (it then holds an empty string).
        with open(filename, 'wb') as f:
            pickle.dump('\n'.join(safe_lines), f)

        return "True" if safe_lines else "False"


In [6]:

def setUpClass():
    """Write the sample pickle files, benign and malicious, into the working directory.

    The malicious samples are objects whose ``__reduce__`` names a callable
    (exec, eval, compile, os.system, socket.socket) plus its arguments, so the
    call would happen when such a file is unpickled. ``unsafe.pkl`` is a benign
    list into which four exec snippets are inserted through fickling.
    """

    class ExecuteCode(object):
        def __reduce__(self):
            return (builtins.exec, ("import os; os.system('echo I am executing arbitrary code!')",))

    class EvalCode(object):
        def __reduce__(self):
            return (eval, ("['a', 'b', 'c']",))

    class CompileCode(object):
        def __reduce__(self):
            return (compile, ("print('I execute code that runs on your computer')", "<string>", "exec"),)

    class OpenFile(object):
        def __reduce__(self):
            return (builtins.exec, ("f = open('/etc/passwd', 'r'); print(f.read()); f.close()",))

    class Os(object):
        def __reduce__(self):
            import os
            return (os.system, ("echo 'I execute code that runs on your computer'",))

    class MalSocket(object):
        def __reduce__(self):
            return socket.socket, (socket.AF_INET, socket.SOCK_STREAM)

    # The socket sample is deleted first if a previous run left it behind.
    socket_sample = 'malicious_socket.pkl'
    if os.path.exists(socket_sample):
        os.remove(socket_sample)

    # (target file, object to pickle), written in this order.
    samples = [
        (socket_sample, MalSocket()),
        ('malicious_exec.pkl', (ExecuteCode(), ['a', 'b', 'c'])),
        ('student_file.pkl', ['Alice','Bob','Elena','Jane','Kyle']),
        ('malicious_eval.pkl', EvalCode()),
        ('malicious_compile.pkl', CompileCode()),
        ('malicious_open.pkl', OpenFile()),
        ('fruits.pkl', ['apple', 'banana', 'orange']),
        ('person_dictionary.pkl', {'name': 'John', 'age': 30, 'city': 'New York'}),
        ('malicious_os.pkl', Os()),
    ]
    for target, payload in samples:
        with open(target, 'wb') as handle:
            pickle.dump(payload, handle)

    # Build the tampered sample: start from a harmless list and inject exec calls.
    tampered = Pickled.load(pickle.dumps(['Alice','Bob','Elena','Jane','Kyle']))
    injected_snippets = (
        "with open('/etc/passwd','r') as r: print(r.readlines())",
        "with open('/etc/group','r') as r: print(r.readlines())",
        "import module print('malicious')",
        "import os  os.system('echo Malicious code!')",
    )
    for snippet in injected_snippets:
        tampered.insert_python_exec(snippet)

    with open('unsafe.pkl', 'wb') as handle:
        tampered.dump(handle)

In [7]:
# Generate the sample .pkl files (relative paths, so they land in the working directory).
setUpClass()

In [8]:
def process_file(filename):
    """Analyse a pickle file with fickling and, if it is flagged, scan and sanitise it.

    Steps: run ``analysis.check_safety`` on the file and display the result; an
    empty result string counts as clean. Otherwise run the keyword scanner, let
    ``cdr().check_safety`` rewrite the file in place, analyse it again, and, if it
    is now clean, unpickle and display what is left in the file.

    NOTE(review): ``sys.stdout`` is patched with a mock for the whole run, so
    anything printed in the meantime (including the scanner's report) is not
    shown - confirm this is intended.

    Args:
        filename: Path of the pickle file; it may be overwritten.
    """
    stem = os.path.splitext(filename)[0]
    display(Markdown("-------------------------- " + stem + " ----------------------------------"))

    def _load_and_analyse():
        # Re-read the file from disk and run fickling's safety analysis on it.
        with open(filename, 'rb') as handle:
            raw = handle.read()
        parsed = Pickled.load(raw)
        return parsed, analysis.check_safety(parsed)

    with patch('sys.stdout'):
        pickled_obj, first_verdict = _load_and_analyse()
        display(str(first_verdict))  # Expecting clean
        if str(first_verdict) == '':
            display(Markdown("clean"))
            return

        display(Markdown("not clean"))
        scan_pickle_file.scann(filename)
        print("Now removing the malicious data....")
        with patch('sys.stdout'):
            cdr().check_safety(pickled_obj, filename)
            # Analyse the rewritten file once more.
            _, second_verdict = _load_and_analyse()
            display(str(second_verdict))  # Expecting clean

        if str(second_verdict) != '':
            display(Markdown("not clean"))
            return

        display(Markdown("clean"))
        display(Markdown("\nThe clean data left in the file:"))
        # Only reached after the second analysis reported nothing.
        with open(filename, 'rb') as handle:
            leftover = pickle.load(handle)
        display(str(leftover))



In [9]:
# Sample whose __reduce__ calls exec; process_file rewrites the file in place if it is flagged.
filename = "malicious_exec.pkl"
process_file(filename)


-------------------------- malicious_exec ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Call to `exec(...)` is almost certainly evidence of a malicious pickle file\nVariable `_var0` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

'a\nb\nc'

In [10]:
# Student-name list with four exec snippets injected by setUpClass().
filename = "unsafe.pkl"
process_file(filename)


-------------------------- unsafe ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Call to `exec(...)` is almost certainly evidence of a malicious pickle file\nCall to `exec(...)` is almost certainly evidence of a malicious pickle file\nCall to `exec(...)` is almost certainly evidence of a malicious pickle file\nCall to `exec(...)` is almost certainly evidence of a malicious pickle file\nVariable `_var0` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file\nVariable `_var1` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file\nVariable `_var3` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file\nVariable `_var2` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

"import module print('malicious')\nAlice\nBob\nElena\nJane\nKyle"

In [11]:
# Sample whose __reduce__ calls compile.
filename = "malicious_compile.pkl"
process_file(filename)


-------------------------- malicious_compile ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Call to `compile(...)` is almost certainly evidence of a malicious pickle file\nVariable `_var0` is assigned value `compile(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

'<string>'

In [12]:
# Sample whose __reduce__ calls exec on code that opens /etc/passwd.
filename = "malicious_open.pkl"
process_file(filename)


-------------------------- malicious_open ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Call to `exec(...)` is almost certainly evidence of a malicious pickle file\nVariable `_var0` is assigned value `exec(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

''

In [13]:
# Sample whose __reduce__ calls os.system.
filename = "malicious_os.pkl"
process_file(filename)

-------------------------- malicious_os ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Variable `_var0` is assigned value `system(...)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

'nt'

In [14]:
# Sample whose __reduce__ calls eval.
filename = "malicious_eval.pkl"
process_file(filename)


-------------------------- malicious_eval ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'Call to `eval("[\'a\', \'b\', \'c\']")` is almost certainly evidence of a malicious pickle file\nVariable `_var0` is assigned value `eval("[\'a\', \'b\', \'c\']")` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

"['a', 'b', 'c']"

In [15]:
# Sample whose __reduce__ constructs a socket.socket.
filename = "malicious_socket.pkl"
process_file(filename)


-------------------------- malicious_socket ----------------------------------


Do not unpickle this file if it is from an untrusted source!


'`from socket import socket` is suspicious and indicative of an overtly malicious pickle file\n`from socket import AF_INET` is suspicious and indicative of an overtly malicious pickle file\n`from socket import SOCK_STREAM` is suspicious and indicative of an overtly malicious pickle file\nVariable `_var0` is assigned value `socket(AF_INET, SOCK_STREAM)` but unused afterward; this is suspicious and indicative of a malicious pickle file'

not clean


Do not unpickle this file if it is from an untrusted source!


''

clean


The clean data left in the file:

'AF_INET\nSOCK_STREAM'

In [17]:
# Benign sample: a plain list of names.
filename = "student_file.pkl"
process_file(filename)


-------------------------- student_file ----------------------------------


Do not unpickle this file if it is from an untrusted source!


''

clean

In [18]:
# Benign sample: a plain list of fruit names.
filename = "fruits.pkl"
process_file(filename)


-------------------------- fruits ----------------------------------


Do not unpickle this file if it is from an untrusted source!


''

clean

In [19]:
# Benign sample: a plain dictionary.
filename = "person_dictionary.pkl"
process_file(filename)


-------------------------- person_dictionary ----------------------------------


Do not unpickle this file if it is from an untrusted source!


''

clean