Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor update about sessions and added warning before deletion in expo… #1003

Merged
merged 1 commit into from Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 34 additions & 24 deletions androguard/cli/main.py
Expand Up @@ -19,6 +19,7 @@
from androguard.util import readFile
from androguard.ui import DynamicUI


def androaxml_main(inp, outp=None, resource=None):
ret_type = androconf.is_android(inp)
if ret_type == "APK":
Expand Down Expand Up @@ -81,7 +82,6 @@ def export_apps_to_format(filename,
decompiler_type=None,
form=None):
from androguard.misc import clean_file_name
from androguard.core.dex import DEX
from androguard.core.bytecode import method2dot, method2format
from androguard.decompiler import decompiler
print("Dump information {} in {}".format(filename, output))
Expand All @@ -90,9 +90,19 @@ def export_apps_to_format(filename,
print("Create directory %s" % output)
os.makedirs(output)
else:
print("Clean directory %s" % output)
androconf.rrmdir(output)
os.makedirs(output)
while True:
user_input = input(f"Do you want to clean the directory {output}? (Y/N): ").strip().lower()

if user_input == 'y':
print("Deleting...")
androconf.rrmdir(output)
os.makedirs(output)
break
elif user_input == 'n':
print("Not deleting.")
break
else:
print("Invalid input. Please enter Y or N.")

methods_filter_expr = None
if methods_filter:
Expand Down Expand Up @@ -137,7 +147,7 @@ def export_apps_to_format(filename,
for method in vm.get_methods():
if methods_filter_expr:
msig = "{}{}{}".format(method.get_class_name(), method.get_name(),
method.get_descriptor())
method.get_descriptor())
if not methods_filter_expr.search(msig):
continue

Expand All @@ -147,8 +157,8 @@ def export_apps_to_format(filename,
create_directory(filename_class)

print("Dump {} {} {} ...".format(method.get_class_name(),
method.get_name(),
method.get_descriptor()), end=' ')
method.get_name(),
method.get_descriptor()), end=' ')

filename = clean_file_name(os.path.join(filename_class, method.get_short_string()))

Expand Down Expand Up @@ -198,26 +208,21 @@ def androlyze_main(session, filename):
from colorama import Fore
import colorama
import atexit

from IPython.terminal.embed import embed

from traitlets.config import Config

from androguard.core.androconf import ANDROGUARD_VERSION, CONF
from androguard.session import Session
from androguard.core import dex, apk
from androguard.core.analysis.analysis import Analysis
from androguard.pentest import Pentest
from androguard.ui import DynamicUI
from androguard.misc import AnalyzeAPK

colorama.init()

if session:
logger.info("Restoring session '{}'...".format(session))
s = CONF['SESSION'] = Load(session)
logger.info("Successfully restored {}".format(s))
# TODO Restore a, d, dx etc...
logger.info("TODO: Restoring session '{}'...".format(session))
# s = CONF['SESSION'] = Load(session)
# logger.info("Successfully restored {}".format(s))
# TODO actually restore the session a, d, dx etc...
else:
s = CONF["SESSION"] = Session(export_ipython=True)

Expand All @@ -230,7 +235,9 @@ def androlyze_main(session, filename):
logger.info("Found the provided file is of type '{}'".format(filetype))

if filetype not in ['DEX', 'DEY', 'APK']:
logger.error(Fore.RED + "This file type is not supported by androlyze for auto loading right now!" + Fore.RESET, file=sys.stderr)
logger.error(
Fore.RED + "This file type is not supported by androlyze for auto loading right now!" + Fore.RESET,
file=sys.stderr)
logger.error("But your file is still available:")
logger.error(">>> filename")
logger.error(repr(filename))
Expand Down Expand Up @@ -323,7 +330,8 @@ def androsign_main(args_apk, args_hash, args_all, show):
print("Is signed v2: {}".format(a.is_signed_v2()))
print("Is signed v3: {}".format(a.is_signed_v3()))

certs = set(a.get_certificates_der_v3() + a.get_certificates_der_v2() + [a.get_certificate_der(x) for x in a.get_signature_names()])
certs = set(a.get_certificates_der_v3() + a.get_certificates_der_v2() + [a.get_certificate_der(x) for x in
a.get_signature_names()])
pkeys = set(a.get_public_keys_der_v3() + a.get_public_keys_der_v2())

if len(certs) > 0:
Expand Down Expand Up @@ -377,7 +385,7 @@ def androdis_main(offset, size, dex_file):

with open(dex_file, "rb") as fp:
buf = fp.read()

d = DEX(buf)

if size == 0 and offset == 0:
Expand All @@ -404,6 +412,7 @@ def androdis_main(offset, size, dex_file):

idx += i.get_length()


def androtrace_main(apk_file, list_modules, live=False, enable_ui=False):
from androguard.pentest import Pentest
from androguard.session import Session
Expand All @@ -429,8 +438,9 @@ def androtrace_main(apk_file, list_modules, live=False, enable_ui=False):
import time

time.sleep(1)

ui = DynamicUI(p.message_queue)

def inputhook(inputhook_context: InputHookContext):
while not inputhook_context.input_is_ready():
if ui.process_data():
Expand All @@ -444,8 +454,8 @@ def inputhook(inputhook_context: InputHookContext):
else:
logger.warning("Type 'e' to exit the strace ")
s = ""
while (s!='e') and (not p.is_detached()):
s = input("Type 'e' to exit:")
while (s != 'e') and (not p.is_detached()):
s = input("Type 'e' to exit:")


def androdump_main(package_name, list_modules):
Expand Down
3 changes: 2 additions & 1 deletion androguard/core/bytecode.py
Expand Up @@ -3,6 +3,7 @@
from struct import pack
import textwrap
import json
from loguru import logger

from androguard.core.androconf import CONF, color_range
from androguard.core.dex.dex_types import Kind, Operand
Expand Down Expand Up @@ -461,7 +462,7 @@ def method2format(output, _format="png", mx=None, raw=None):
d = pydot.graph_from_dot_data(buff)
if len(d) > 1:
# Not sure what to do in this case?!
logger.warnig("The graph generated for '{}' has too many subgraphs! "
logger.warning("The graph generated for '{}' has too many subgraphs! "
"Only plotting the first one.".format(output))
for g in d:
getattr(g, "write_" + _format.lower())(output)
Expand Down
54 changes: 19 additions & 35 deletions androguard/session.py
Expand Up @@ -9,37 +9,19 @@

from loguru import logger


class Session:
"""
A Session is able to store multiple APK, DEX or ODEX files and can be pickled
to disk in order to resume work later.

The main function used in Sessions is probably :meth:`add`, which adds files
to the session and performs analysis on them.

Afterwards, the files can be gathered using methods such as
:meth:`get_objects_apk`, :meth:`get_objects_dex` or :meth:`get_classes`.

example::

s = Session()
digest = s.add("some.apk")

print("SHA256 of the file: {}".format(digest))

a, d, dx = s.get_objects_apk("some.apk", digest)
print(a.get_package())

# Reset the Session for a fresh set of files
s.reset()

digest2 = s.add("classes.dex")
print("SHA256 of the file: {}".format(digest2))
for h, d, dx in s.get_objects_dex():
print("SHA256 of the DEX file: {}".format(h))
A Session is able to store in a database, basic information about APK, DEX or ODEX files.
Additionally, it offers the possibility to store actions done when using the 'pentest' module.

NOTE: an attempt to move from pickling to dataset was started here:
https://github.com/androguard/androguard/commit/4dd0dc8c4b55605af863925faf16e8eb35f13e45
but is NOT finished!

>>> Should we go back to pickling or proceed further with the dataset ?<<<
"""

def __init__(self, export_ipython=False):
"""
Create a new Session object
Expand All @@ -62,7 +44,6 @@ def __init__(self, export_ipython=False):
self.table_session.insert(dict(id=self.session_id))
logger.info("Creating new session [{}]".format(self.session_id))


def save(self, filename=None):
"""
Save the current session, see also :func:`~androguard.session.Save`.
Expand Down Expand Up @@ -118,11 +99,13 @@ def show(self):
print("\t{}: {}".format(d, a))

def insert_event(self, call, callee, params, ret):
self.table_pentest.insert(dict(session_id=str(self.session_id), call=call, callee=callee, params=params, ret=ret))
self.table_pentest.insert(
dict(session_id=str(self.session_id), call=call, callee=callee, params=params, ret=ret))

def insert_system_event(self, call, callee, information, params):
self.table_system.insert(dict(session_id=str(self.session_id), call=call, callee=callee, information=information, params=params))

self.table_system.insert(
dict(session_id=str(self.session_id), call=call, callee=callee, information=information, params=params))

def addAPK(self, filename, data):
"""
Add an APK file to the Session and run analysis on it.
Expand All @@ -134,8 +117,8 @@ def addAPK(self, filename, data):
digest = hashlib.sha256(data).hexdigest()

logger.info("add APK {}:{}".format(filename, digest))
self.table_information.insert(dict(session_id=str(self.session_id), filename=filename, digest=digest, type="APK"))

self.table_information.insert(
dict(session_id=str(self.session_id), filename=filename, digest=digest, type="APK"))

newapk = apk.APK(data, True)
self.analyzed_apk[digest] = [newapk]
Expand Down Expand Up @@ -169,7 +152,8 @@ def addDEX(self, filename, data, dx=None, postpone_xref=False):
digest = hashlib.sha256(data).hexdigest()
logger.info("add DEX:{}".format(digest))

self.table_information.insert(dict(session_id=str(self.session_id), filename=filename, digest=digest, type="DEX"))
self.table_information.insert(
dict(session_id=str(self.session_id), filename=filename, digest=digest, type="DEX"))

logger.debug("Parsing format ...")
d = dex.DEX(data)
Expand Down Expand Up @@ -207,7 +191,8 @@ def addODEX(self, filename, data, dx=None):
digest = hashlib.sha256(data).hexdigest()
logger.info("add ODEX:%s" % digest)

self.table_information.insert(dict(session_id=str(self.session_id), filename=filename, digest=digest, type="ODEX"))
self.table_information.insert(
dict(session_id=str(self.session_id), filename=filename, digest=digest, type="ODEX"))

d = dex.ODEX(data)
logger.debug("added ODEX:%s" % digest)
Expand Down Expand Up @@ -415,4 +400,3 @@ def get_objects_dex(self):
# TODO: there is no variant like get_objects_apk
for digest, d in self.analyzed_dex.items():
yield digest, d, self.analyzed_vms[digest]