In [None]:
import json
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Any, Set, Tuple, Callable
from collections import defaultdict

In [None]:
def load_playstore_info(file_path: str = "./android_metadata.json") -> Dict[str, int]:
    """
    Load Play Store metadata from a JSON file and extract the minimum installation count for each app.

    The file must contain a JSON object whose values are themselves objects.
    The count is read from each entry's 'minInstal' field; entries where that
    field is missing or explicitly null are reported as 0, so callers never
    receive None as a count.

    Args:
        file_path (str): Path to the JSON file containing app metadata.
    Returns:
        Dict[str, int]: Mapping from app name to minimum installation count.
    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON interchange text is UTF-8; do not depend on the platform default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    result = {}
    for app_name, metadata in data.items():
        # NOTE(review): 'minInstal' may be a truncated spelling of the real
        # metadata field name — confirm against android_metadata.json.
        count = metadata.get('minInstal', 0)
        # A JSON null would otherwise leak through as None.
        result[app_name] = 0 if count is None else count
    return result

# Module-level lookup table (app name -> install count) read by load_dataset().
# Uses the default path, so ./android_metadata.json is read when this cell runs.
playstore_infos = load_playstore_info()

In [None]:
@dataclass(frozen=True)
class Sink:
    """Represents a sink in the data flow analysis (e.g., a sensitive operation).

    Instances are immutable (frozen). Only `unit` and `method` take part in
    equality and hashing: `values` and `src_infos` are declared with
    compare=False / hash=False, so two sinks that differ only in those lists
    count as the same set element or dictionary key.
    """
    unit: str  # The code unit (e.g., statement or block)
    method: str  # The method containing the sink
    values: List[str] = field(default_factory=list, compare=False, hash=False)  # Values compared at the sink
    src_infos: List[str] = field(default_factory=list, compare=False, hash=False)  # Source info tags for the sink

@dataclass(frozen=True)
class Source:
    """Represents a source in the data flow analysis (e.g., user input or constant).

    Instances are immutable (frozen) and hashable; all three fields take part
    in equality and hashing, which lets a Source be used as a dictionary key.
    """
    unit: str  # The code unit (e.g., statement or block)
    method: str  # The method containing the source
    unitIndex: int  # Index of the unit in the method

@dataclass
class App:
    """Represents an analyzed app, including its sources, sinks, and classification logic.

    The two mappings are filled together by `load_file`: every (source, sink)
    pair read from the result file is recorded in both directions.
    """
    name: str  # App name (from filename)
    installations: int = -1  # Play Store installation count (default -1 if unknown)
    sourceToSinks: Dict[Source, Set[Sink]] = field(default_factory=dict)  # Mapping from source to set of sinks
    sinkToSources: Dict[Sink, Set[Source]] = field(default_factory=dict)  # Mapping from sink to set of sources

    @staticmethod
    def get_values(sinks) -> Set[str]:
        """Return the union of the `values` of every sink in `sinks`."""
        values = set()
        for sink in sinks:
            values.update(sink.values)
        return values

    def has_non_empty_comparisons(self) -> bool:
        """Return True if at least one sink's unit does not contain the text `("")`.

        Sinks whose unit contains `("")` are treated as empty-string checks and ignored.
        """
        return any('("")' not in sink.unit for sink in self.sinkToSources)

    def get_all_values(self) -> Set[str]:
        """Get all unique values from all sinks reachable through `sourceToSinks`."""
        all_values = set()
        for sinks in self.sourceToSinks.values():
            all_values.update(App.get_values(sinks))
        return all_values

    @staticmethod
    def parse_values(values: List[Dict[str, List[str]]]) -> Set[str]:
        """Flatten a list of {key: [strings]} dictionaries into a set of strings.

        The placeholder strings "", "{}", " " and "null" are dropped; the
        dictionary keys themselves are ignored.
        """
        placeholders = ("", "{}", " ", "null")
        parsed_values = set()
        for value in values:
            for val_list in value.values():
                parsed_values.update(val for val in val_list if val not in placeholders)
        return parsed_values

    @staticmethod
    def _first_present(record, *keys):
        """Return record[key] for the first key whose value is not None, else None."""
        for key in keys:
            found = record.get(key)
            if found is not None:
                return found
        return None

    @staticmethod
    def _parse_sink(sink_data) -> 'Sink':
        """Build a Sink from one sink record, accepting both key spellings.

        Each field is read from the lower-case key first ("unit", "method",
        "values") and falls back to the alternative spelling ("Unit",
        "SootMethod", "ValueSet") when the first is missing or null.
        """
        raw_values = App._first_present(sink_data, "values", "ValueSet")
        return Sink(
            unit=App._first_present(sink_data, "unit", "Unit"),
            method=App._first_present(sink_data, "method", "SootMethod"),
            # `or []` covers both an absent/null value list and an empty one.
            values=list(App.parse_values(raw_values or [])),
            src_infos=sink_data.get("src") or [],
        )

    @staticmethod
    def load_file(file_path: str) -> 'App':
        """Load an App object from a .njson file containing source/sink analysis results.

        The file is read line by line; each non-blank line must be a JSON
        object with a "source" entry and an optional "sinks" list. Blank lines,
        lines that are not valid JSON and lines without a source are skipped.
        A source is only recorded if it has at least one sink.

        Args:
            file_path (str): Path to the result file. The app name is the file
                name without its final extension.
        Returns:
            App: The populated app, or None when `file_path` does not end in
            ".njson" (callers must handle the None case).
        Raises:
            FileNotFoundError: If the file does not exist.
        """
        # Decide on the suffix before touching the filesystem.
        if not str(file_path).endswith(".njson"):
            return None

        path = Path(file_path)
        app = App(name=path.stem)

        try:
            with path.open("r", encoding="utf-8") as f:
                # Stream the file instead of materialising every line at once.
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                    except json.JSONDecodeError:
                        # skip malformed JSON
                        continue

                    # single source per line
                    src_data = obj.get("source")
                    if not src_data:
                        continue
                    src = Source(**src_data)

                    # zero or more sinks
                    for sink_data in obj.get("sinks") or []:
                        snk = App._parse_sink(sink_data)
                        # Record the pair in both directions.
                        app.sourceToSinks.setdefault(src, set()).add(snk)
                        app.sinkToSources.setdefault(snk, set()).add(src)
        except FileNotFoundError:
            raise FileNotFoundError(f"Could not find file: {file_path}")

        return app

    def get_dispatch_type(self, sinks: Set) -> str:
        """
        Determine the code-dispatch cardinality based on the number of conditions and unique actions.
        - one-to-two: at most one condition (<=> at most one sink) regardless of actions count
        - many-to-two: multiple conditions but only one action (all sinks in same method)
        - many-to-many: multiple conditions and multiple actions (sinks span multiple methods)
        """
        if len(sinks) <= 1:
            return 'one-to-two'
        # Treat each unique sink.method as a distinct action. With two or more
        # sinks there is always at least one action, so no other case exists.
        distinct_actions = {sink.method for sink in sinks}
        return 'many-to-two' if len(distinct_actions) == 1 else 'many-to-many'

    def classify_flow(self, sinks: Set) -> str:
        """
        Classify a validation flow into one of the four cases:
        - Secret Access Key
        - Master Password
        - Blacklist
        - Secret Command

        Uses the rules:
        Secret Access Key:
            (i)  one-to-two dispatch
            (ii) compared content all hardcoded

        Master Password:
            (i)  many-to-two dispatch
            (ii) compared content from multiple sources
            (iii) at least one hardcoded

        Blacklist:
            (i)  many-to-two dispatch
            (ii) all compared content from the same source

        Secret Command:
            (i)  many-to-many dispatch
            (ii) more than one hardcoded compared content

        Returns 'Unknown' when no rule matches. Rules are tried in the order
        listed above and the first match wins.
        """
        dispatch = self.get_dispatch_type(sinks)
        # Flatten all source-info tags from sinks
        all_srcs = {tag for sink in sinks for tag in sink.src_infos}
        # Count how many comparisons use a hardcoded string
        hardcoded_count = sum('HARDCODED_STR' in sink.src_infos for sink in sinks)

        if dispatch == 'one-to-two' and all_srcs == {'HARDCODED_STR'}:
            return 'Secret Access Key'

        # A single comparison against a List is also treated as many-to-two,
        # since the list is looped through.
        if dispatch == 'many-to-two' or (dispatch == 'one-to-two' and 'List' in all_srcs):
            # 'List' only marks the container; it does not count as a source.
            all_srcs.discard('List')
            if len(all_srcs) > 1 and hardcoded_count >= 1:
                return 'Master Password'
            if len(all_srcs) == 1:
                return 'Blacklist'

        if dispatch == 'many-to-many' and hardcoded_count > 1:
            return 'Secret Command'

        return 'Unknown'

    def _sinks_classified_as(self, label: str) -> List[Sink]:
        """Return the sinks of every flow classified as `label` that carries at least one value."""
        matched = set()
        for sinks in self.sourceToSinks.values():
            if self.classify_flow(sinks) == label and App.get_values(sinks):
                matched.update(sinks)
        return list(matched)

    def get_secret_access_key(self) -> List[Sink]:
        """Return all sinks classified as Secret Access Key."""
        return self._sinks_classified_as("Secret Access Key")

    def get_master_password(self) -> List[Sink]:
        """Return all sinks classified as Master Password."""
        return self._sinks_classified_as("Master Password")

    def get_blacklist(self) -> List[Sink]:
        """Return all sinks classified as Blacklist."""
        return self._sinks_classified_as("Blacklist")

    def get_secret_command(self) -> List[Sink]:
        """Return all sinks classified as Secret Command."""
        return self._sinks_classified_as("Secret Command")

    def classify_app(self):
        """
        Iterate through each source in the App and classify its sinks.
        Returns a mapping {classification: set of sinks}; flows classified as
        'Unknown' and flows without any value are left out.
        """
        classifications: Dict = {}
        for sinks in self.sourceToSinks.values():
            label = self.classify_flow(sinks)
            if label != "Unknown" and App.get_values(sinks):
                classifications.setdefault(label, set()).update(sinks)
        return classifications

In [None]:
def top_installed_apps_by_detector(
    apps: List[App],
    detector: Callable[[App], List[Any]],
    n: int = 1
) -> List[Tuple[str, int, Set[str]]]:
    """
    Return up to the top-n apps (by .installations desc) for which the
    findings returned by detector(app) carry at least one value.

    The detector is invoked exactly once per app. Apps with equal installation
    counts keep their relative order from `apps`.

    Args:
        apps (List[App]): List of App instances to check.
        detector (Callable): Function or bound method that takes an App and returns a List of findings.
        n (int): How many top apps to return.
    Returns:
        List[Tuple[str, int, Set[str]]]: List of (app_name, installations, finding_values) tuples.
    """
    # Keep only apps where the detector finds something, paired with the values found.
    candidates: List[Tuple[App, Set[str]]] = []
    for app in apps:
        finding_values = App.get_values(detector(app))
        if finding_values:
            candidates.append((app, finding_values))

    # sort by installations descending
    candidates.sort(key=lambda pair: pair[0].installations, reverse=True)

    # return top-n
    return [(app.name, app.installations, values) for app, values in candidates[:n]]

In [None]:
def load_dataset(path):
    """
    Load all App objects from a directory of .njson files.
    Sets the installation count from the module-level `playstore_infos` if available.
    Ignores files that cannot be loaded as App objects.

    Loading is best-effort: a file for which App.load_file raises or returns
    None is skipped. An app whose Play Store count cannot be converted to int
    is still kept, with its default installation count.

    Args:
        path (str): Directory containing .njson files for each app.
    Returns:
        List[App]: List of loaded App objects.
    """
    results = []
    for file in os.listdir(path):
        app_path = os.path.join(path, file)
        try:
            app = App.load_file(app_path)
        except Exception:
            # Deliberately tolerant of unreadable or malformed result files, but
            # unlike a bare `except` this lets KeyboardInterrupt/SystemExit through.
            continue
        if app is None:
            # Not a result file (load_file only accepts *.njson).
            continue
        if app.name in playstore_infos:
            try:
                app.installations = int(playstore_infos[app.name])
            except (TypeError, ValueError):
                # Unusable metadata must not discard the analysis result itself.
                pass
        results.append(app)
    return results

In [None]:
def compute_detection_stats(apps: List[App], total=None) -> Dict[str, Tuple[int, str]]:
    """
    Returns a dict mapping each detection name to a tuple:
      (number_of_apps_with_finding, percentage_of_total_apps)

    An app counts for a detection when the sinks returned by the matching
    detector carry at least one value. The total is also printed.

    Args:
        apps (List[App]): List of App objects to analyze.
        total (int, optional): Total number of apps to use for percentage calculation. Defaults to len(apps).
    Returns:
        Dict[str, Tuple[int, str]]: Mapping from detection name to (count, percent string),
        where the percent string has two decimals (e.g. "12.50%") and is "0.00%" when total is 0.
    """
    if total is None:
        total = len(apps)
    print(f"Total apps: {total}")

    # detection name -> callable returning that detection's sinks for one app
    detectors = {
        "access_keys":       lambda a: a.get_secret_access_key(),
        "master_passwords":  lambda a: a.get_master_password(),
        "blacklist_secrets": lambda a: a.get_blacklist(),
        "secret_commands":   lambda a: a.get_secret_command(),
    }

    stats = {}
    for name, find_sinks in detectors.items():
        count = sum(1 for app in apps if App.get_values(find_sinks(app)))
        # Guard the division so an empty denominator reports 0 instead of raising.
        pct = (count / total * 100) if total else 0.0
        stats[name] = (count, f"{pct:.2f}%")
    return stats

In [None]:
def get_key(dict, key):
    """
    Look up the key object stored in a dictionary that compares equal to `key`.

    Handy when the stored key is a custom object and the caller only holds an
    equal, but not identical, instance.

    Args:
        dict (dict): Dictionary whose keys are scanned in iteration order.
        key: Value to compare each stored key against with `==`.
    Returns:
        The first stored key equal to `key`, or None when there is none.
    """
    matches = (stored for stored in dict.keys() if stored == key)
    return next(matches, None)



In [None]:
def has_result(directory):
    """
    Count the number of unique result identifiers in a directory.

    A file ending in ".njson" or "_time.txt" contributes its name without that
    suffix, so "<id>.njson" and "<id>_time.txt" count once. Any other file with
    a dot in its name contributes its full name. Files without a dot are ignored.

    Args:
        directory (str): Directory to search for result files.
    Returns:
        int: Number of unique result identifiers found.
    """
    ids = set()
    for file in os.listdir(directory):
        # Strip only the trailing suffix; str.replace would also remove the same
        # text from the middle of a name and could merge distinct ids.
        if file.endswith(".njson"):
            ids.add(file[:-len(".njson")])
        elif file.endswith("_time.txt"):
            ids.add(file[:-len("_time.txt")])
        elif "." in file:
            ids.add(file)
    return len(ids)



In [None]:
def return_percentage(num, total):
    """
    Format a count together with its share of a total.

    Args:
        num (int): Numerator value.
        total (int): Denominator value.
    Returns:
        str: 'num (percent%)' with the percentage rounded to two decimals.
        When total equals 0 the fixed string "0 (0%)" is returned, whatever
        the value of num.
    """
    if total != 0:
        percentage = (num / total) * 100
        return f"{num} ({percentage:.2f}%)"
    # No meaningful ratio exists for an empty denominator.
    return "0 (0%)"

In [None]:
def get_non_empty_comparisons(apps: List[App]) -> int:
    """
    Tally the apps whose has_non_empty_comparisons() check succeeds.

    Args:
        apps (List[App]): List of App objects to check.
    Returns:
        int: Number of apps with non-empty comparisons.
    """
    return sum(1 for app in apps if app.has_non_empty_comparisons())

def get_backdoor_secrets(apps: List[App]) -> int:
    """
    Tally the apps that expose at least one backdoor secret, i.e. a master
    password, a secret access key or a secret command with a non-empty value set.

    Args:
        apps (List[App]): List of App objects to check.
    Returns:
        int: Number of apps with at least one backdoor secret.
    """
    def has_backdoor(app) -> bool:
        # Checked in this order; later detectors are skipped once one matches.
        if len(App.get_values(app.get_master_password())) > 0:
            return True
        if len(App.get_values(app.get_secret_access_key())) > 0:
            return True
        return len(App.get_values(app.get_secret_command())) > 0

    return sum(1 for app in apps if has_backdoor(app))

def get_blacklists(apps: List[App]) -> int:
    """
    Count the total number of blacklist sinks across all apps.

    For every app whose blacklist sinks carry at least one value, the number
    of those sinks (not the number of distinct values, and not 1 per app) is
    added to the total.

    Args:
        apps (List[App]): List of App objects to check.
    Returns:
        int: Total number of blacklist sinks found.
    """
    result = 0
    for app in apps:
        # Classify once per app; get_blacklist() re-classifies every flow.
        blacklist_sinks = app.get_blacklist()
        if App.get_values(blacklist_sinks):
            result += len(blacklist_sinks)
    return result

In [None]:
# Reference totals used as denominators for the completion percentages.
# NOTE(review): presumably the number of apps in the 2023 / 2024 input sets — confirm the origin of these figures.
dataset_23 = 10331
dataset_24 = 8702

In [None]:
# Number of unique result identifiers found in each year's output directory.
num_finished_results23 = has_result("../inputscope_2023/")
num_finished_results24 = has_result("../inputscope_2024/")

In [None]:
# Finished results as a share of each year's reference total.
print(return_percentage(num_finished_results23, dataset_23))
print(return_percentage(num_finished_results24, dataset_24))

In [None]:
# Parse every loadable .njson result of each year into App objects.
inputscope_2023 = load_dataset("../inputscope_2023/")
inputscope_2024 = load_dataset("../inputscope_2024/")



In [None]:
# Loaded apps as a share of the reference total of the SAME year
# (the years were previously crossed between numerator and denominator).
print(return_percentage(len(inputscope_2023), dataset_23))
print(return_percentage(len(inputscope_2024), dataset_24))

In [None]:
# Apps per year with at least one comparison that is not an empty-string check.
non_empty_comparison_23 = get_non_empty_comparisons(inputscope_2023)
non_empty_comparison_24 = get_non_empty_comparisons(inputscope_2024)

In [None]:
# 2023: apps with non-empty comparisons relative to the number of finished results.
print(return_percentage(non_empty_comparison_23, num_finished_results23))


In [None]:
# 2024: apps with non-empty comparisons relative to the number of finished results.
print(return_percentage(non_empty_comparison_24, num_finished_results24))

In [None]:
# 2023: apps with a backdoor secret relative to apps with non-empty comparisons.
print(return_percentage(get_backdoor_secrets(inputscope_2023), non_empty_comparison_23))


In [None]:
# 2024: apps with a backdoor secret relative to apps with non-empty comparisons.
print(return_percentage(get_backdoor_secrets(inputscope_2024), non_empty_comparison_24))

In [None]:
# Blacklist totals per year relative to apps with non-empty comparisons.
# NOTE(review): get_blacklists() adds one per blacklist sink, not one per app, so this
# ratio mixes sinks with apps and can exceed 100% — confirm that this is intended.
print(return_percentage(get_blacklists(inputscope_2023), non_empty_comparison_23))
print(return_percentage(get_blacklists(inputscope_2024), non_empty_comparison_24))

In [None]:
# 2023 detection counts, with percentages relative to apps with non-empty comparisons.
compute_detection_stats(inputscope_2023, total=non_empty_comparison_23)

In [None]:
# 2024 detection counts, with percentages relative to apps with non-empty comparisons.
compute_detection_stats(inputscope_2024, total=non_empty_comparison_24)

In [None]:
def build_app_name_map(apps: List[App]) -> Dict[str, App]:
    """
    Index a list of apps by their name for constant-time lookup.

    When several apps share a name, the one appearing last in `apps` wins.

    Args:
        apps (List[App]): List of App objects.
    Returns:
        Dict[str, App]: Mapping from app name to App instance.
    """
    name_to_app = {}
    for app in apps:
        name_to_app[app.name] = app
    return name_to_app

# Name-indexed views of both datasets, used for the per-app year-over-year comparison.
inputscope_2023_map = build_app_name_map(inputscope_2023)
inputscope_2024_map = build_app_name_map(inputscope_2024)

In [None]:
# 2023 detection counts, with percentages relative to all loaded 2023 apps.
compute_detection_stats(inputscope_2023)

In [None]:
# 2024 detection counts, with percentages relative to all loaded 2024 apps.
compute_detection_stats(inputscope_2024)

In [None]:
# Top 20 most-installed 2023 apps with a Secret Access Key finding.
top_installed_apps_by_detector(
    inputscope_2023,
    App.get_secret_access_key,
    n=20
)

In [None]:
# Top 20 most-installed 2023 apps with a Master Password finding.
top_installed_apps_by_detector(
    inputscope_2023,
    App.get_master_password,
    n=20
)

In [None]:
# Top 20 most-installed 2023 apps with a Blacklist finding.
top_installed_apps_by_detector(
    inputscope_2023,
    App.get_blacklist,
    n=20
)

In [None]:
# Top 20 most-installed 2023 apps with a Secret Command finding.
top_installed_apps_by_detector(
    inputscope_2023,
    App.get_secret_command,
    n=20
)

In [None]:
def _secret_values_by_kind(app):
    """Return {kind: set of values} for the four secret kinds, in reporting order."""
    return {
        "access_keys": App.get_values(app.get_secret_access_key()),
        "master_passwords": App.get_values(app.get_master_password()),
        "blacklist_secrets": App.get_values(app.get_blacklist()),
        "secret_commands": App.get_values(app.get_secret_command()),
    }


def _increment_count(counts, key):
    """Add one to counts[key], creating the entry on first use."""
    counts[key] = counts.get(key, 0) + 1


def _presence_prefix(in_23, in_24):
    """Map a pair of presence flags to 'only_23', 'only_24', 'both', or None when neither is set."""
    if in_23 and in_24:
        return "both"
    if in_23:
        return "only_23"
    if in_24:
        return "only_24"
    return None


def _count_single_year_app(counts, year, secrets):
    """Record an app that exists in one dataset only under '<year>' and '<year>_<kind>'."""
    if any(secrets.values()):
        _increment_count(counts, year)
    for kind, values in secrets.items():
        if values:
            _increment_count(counts, f"{year}_{kind}")


def one_to_one_comparison(results_23, results_24):
    """
    Compare two sets of app analysis results and count, per category, how many
    apps have secrets in 2023 only, in 2024 only, or in both.

    Apps are matched by name. The counter keys are:
      - apps present in both datasets:
          "only_23_with_24", "only_24_with_23", "both" for "has any secret", and
          "only_23_<kind>", "only_24_<kind>", "both_<kind>" per secret kind;
      - apps present only in 2023: "23" and "23_<kind>";
      - apps present only in 2024: "24" and "24_<kind>";
    where <kind> is one of access_keys, master_passwords, blacklist_secrets,
    secret_commands. A key appears in the result only if its count is non-zero.

    Args:
        results_23 (Dict[str, App]): Mapping of app name to App object for 2023 dataset.
        results_24 (Dict[str, App]): Mapping of app name to App object for 2024 dataset.
    Returns:
        Dict[str, int]: Mapping of comparison categories to counts.
    """
    # The overall "has any secret" counters use their own historical key names.
    overall_keys = {"only_23": "only_23_with_24", "only_24": "only_24_with_23", "both": "both"}

    results = {}
    for app_name, app in results_23.items():
        secrets_23 = _secret_values_by_kind(app)
        app_24 = results_24.get(app_name)

        if app_24 is None:
            _count_single_year_app(results, "23", secrets_23)
            continue

        secrets_24 = _secret_values_by_kind(app_24)

        overall = _presence_prefix(any(secrets_23.values()), any(secrets_24.values()))
        if overall is not None:
            _increment_count(results, overall_keys[overall])

        for kind, values_23 in secrets_23.items():
            prefix = _presence_prefix(bool(values_23), bool(secrets_24[kind]))
            if prefix is not None:
                _increment_count(results, f"{prefix}_{kind}")

    # Apps that only exist in the 2024 dataset.
    for app_name, app in results_24.items():
        if app_name not in results_23:
            _count_single_year_app(results, "24", _secret_values_by_kind(app))

    return results



In [None]:
# Year-over-year comparison of findings, matching apps by name.
one_to_one_comparison(inputscope_2023_map, inputscope_2024_map)