In [None]:
import os
import androguard
import androguard.misc
import androguard.util
import ppdeep

from pathlib import Path

In [None]:
# Locate the sample APK relative to the repository root (two levels above the
# directory the notebook is run from).
PROJECT_ROOT = Path.cwd().parent.parent
PATH_TO_APK = (
    PROJECT_ROOT
    / "apks"
    / "0a0a78000e418ea28fa02e8c162c43396db6141ef8fe876db4027fef04bed663.apk"
)

# ppdeep.hash_from_file() type-checks its argument and rejects non-``str``
# paths such as pathlib.Path (verify against the installed ppdeep version), so
# pass the string form -- the same conversion used for AnalyzeAPK further down.
apk_fuzzy_hash = ppdeep.hash_from_file(str(PATH_TO_APK))

# The digest is colon-separated; keep the second field. Fall back to the whole
# digest if no separator is present, mirroring analyze_apk_wrapper().
hash_1 = apk_fuzzy_hash.split(":")[1] if ":" in apk_fuzzy_hash else apk_fuzzy_hash

print(f"APK Fuzzy Hash: {apk_fuzzy_hash}, Hash: {hash_1}")

In [None]:
# Analyze the APK file

# Silence androguard's logging below CRITICAL before running the analysis.
androguard.util.set_log("CRITICAL")

# AnalyzeAPK returns three objects; `a` is queried for manifest components
# below and `dx` is what the extract_* helpers in later cells consume.
a, d, dx = androguard.misc.AnalyzeAPK(str(PATH_TO_APK))

# Get size of the APK file
apk_size = os.path.getsize(PATH_TO_APK)

# ppdeep.hash_from_file() type-checks its argument and rejects non-``str``
# paths such as pathlib.Path (verify against the installed ppdeep version), so
# pass the string form, exactly as done for AnalyzeAPK above.
apk_fuzzy_hash = ppdeep.hash_from_file(str(PATH_TO_APK))

# Get permissions, activities, services, and receivers
activities = a.get_activities()
services = a.get_services()
receivers = a.get_receivers()
permissions = a.get_permissions()

# One display() call renders the four collections in order.
display(activities, services, receivers, permissions)

In [None]:
# Extract the unique external API calls referenced by the APK's code
def extract_api_calls(dx):
    """Return the sorted, de-duplicated external calls made by the analysed code.

    Walks every class returned by ``dx.get_classes()``, every method of each
    class, and every outgoing cross-reference of each method. A callee is kept
    only when ``is_external()`` is true for it.

    Each entry has the form ``"<class name>.<method name>"`` where the class
    name has its first and last character removed (presumably the leading
    ``L`` and trailing ``;`` of a Dalvik type descriptor -- confirm against
    androguard's output).

    Parameters:
    - dx: analysis object exposing ``get_classes()``

    Returns:
    - list of unique call strings in ascending order
    """
    external_targets = {
        f"{callee.get_class_name()[1:-1]}.{callee.name}"
        for class_analysis in dx.get_classes()
        for caller in class_analysis.get_methods()
        for _, callee, _ in caller.get_xref_to()
        if callee.is_external()
    }
    return sorted(external_targets)


api_calls = extract_api_calls(dx)
print(f"Total number of unique API calls: {len(api_calls)}")

# The heading promises a sample, so show only a short preview instead of
# flooding the cell output with every call; `api_calls` still holds them all.
API_CALLS_PREVIEW = 20
print("Sample API calls:")
for call in api_calls[:API_CALLS_PREVIEW]:
    print(f"  - {call}")

In [None]:
# Get all opcodes from bytecode using dx object
def extract_opcodes(dx):
    """Count how often each opcode value occurs in the analysed bytecode.

    Opcode list: https://source.android.com/docs/core/runtime/dalvik-bytecode#instructions

    Every non-external method that has code is walked instruction by
    instruction. Three parallel 768-slot tables are filled, indexed by opcode
    value; only values in ``0..255`` are recorded, so the remaining slots keep
    their initial ``None`` / ``0``. A processing summary is printed, and the
    first four per-method errors are reported (later ones are only counted).

    Parameters:
    - dx: analysis object exposing ``get_classes()``

    Returns:
    - dict with keys ``"opcodes"``, ``"mnemonics"`` and ``"counts"``
    """
    table_size = 768
    seen_values = [None] * table_size  # opcode value, or None if never seen
    seen_names = [None] * table_size  # mnemonic for each seen opcode value
    frequencies = [0] * table_size  # number of occurrences per opcode value

    n_classes = 0
    n_methods = 0
    n_instructions = 0
    n_errors = 0

    for class_analysis in dx.get_classes():
        n_classes += 1
        for method_analysis in class_analysis.get_methods():
            n_methods += 1
            # External methods are skipped before any code lookup
            if method_analysis.is_external():
                continue

            try:
                code = method_analysis.get_method().get_code()
                if not code:
                    continue

                for ins in code.get_bc().get_instructions():
                    n_instructions += 1
                    value = ins.get_op_value()
                    name = ins.get_name()

                    # Values outside 0..255 are counted as instructions but not tabulated
                    if not (0 <= value < 256):
                        continue
                    seen_values[value] = value
                    seen_names[value] = name
                    frequencies[value] += 1

            except Exception as exc:
                n_errors += 1
                if n_errors < 5:  # report only the first few failures
                    print(f"Error in method {method_analysis.name}: {str(exc)}")

    print(
        f"Processed {n_classes} classes, {n_methods} methods and {n_instructions} instructions"
    )
    print(f"Encountered {n_errors} errors")

    # Distinct opcode values that appeared at least once
    distinct = len([value for value in seen_values if value is not None])
    print(f"Number of different opcodes used: {distinct} out of 256 possible")

    return {
        "opcodes": seen_values,
        "mnemonics": seen_names,
        "counts": frequencies,
    }


# Run the opcode extraction on the `dx` object produced by the analysis cell
opcodes_results = extract_opcodes(dx)

# Indices of the opcode slots that were filled, and the sum of all counts
used_opcodes = [
    index
    for index, value in enumerate(opcodes_results["opcodes"])
    if value is not None
]
total_instructions = sum(opcodes_results["counts"])

print("\nOpcode usage information:")
print(f"Opcodes used: {len(used_opcodes)} out of 256")
print(f"Total instructions: {total_instructions}")

print("\nTop 10 most frequent opcodes:")
# (opcode, mnemonic, count) triples for every slot in 0..255 that was seen
opcode_info = []
for i in range(256):
    if opcodes_results["opcodes"][i] is None:
        continue
    opcode_info.append(
        (i, opcodes_results["mnemonics"][i], opcodes_results["counts"][i])
    )

# Most frequent first; the sort is stable, so ties keep ascending opcode order
sorted_opcodes = sorted(opcode_info, key=lambda entry: -entry[2])

for opcode, mnemonic, count in sorted_opcodes[:10]:
    percentage = (count / total_instructions) * 100
    print(f"  - Opcode {opcode} ({mnemonic}): {count} ({percentage:.2f}%)")

In [None]:
import os
import pandas as pd
import numpy as np
import ppdeep
import androguard.misc
import androguard.util
from tqdm import tqdm
import traceback
import json
import time
import multiprocessing
import gc
import logging


def extract_api_calls(dx):
    """Collect the unique external calls referenced by the APK's methods.

    For every method of every class in ``dx``, each outgoing cross-reference
    whose target reports ``is_external()`` contributes one string of the form
    ``"<class name>.<method name>"``. The class name is taken from
    ``get_class_name()`` with its first and last character dropped.

    Note: this re-defines the function of the same name from an earlier cell
    with identical behaviour.

    Parameters:
    - dx: analysis object exposing ``get_classes()``

    Returns:
    - alphabetically sorted list of unique call strings
    """
    seen = set()

    for class_analysis in dx.get_classes():
        for caller in class_analysis.get_methods():
            for _, callee, _ in caller.get_xref_to():
                # Only calls that leave the APK's own code are of interest
                if not callee.is_external():
                    continue
                owner = callee.get_class_name()[1:-1]
                seen.add(f"{owner}.{callee.name}")

    ordered = list(seen)
    ordered.sort()
    return ordered


def extract_opcodes(dx):
    """Build per-opcode frequency tables for the APK's bytecode.

    Iterates over the instructions of every non-external method that has code
    and records, for opcode values in ``0..255``, the value itself, its
    mnemonic and how many times it occurred. The tables have 768 slots; slots
    that are never hit keep ``None`` (values/mnemonics) or ``0`` (counts).
    A failure while processing a method is printed and the method is skipped.

    Note: this re-defines the function of the same name from an earlier cell,
    without the summary counters and printouts.

    Parameters:
    - dx: analysis object exposing ``get_classes()``

    Returns:
    - dict with keys ``"opcodes"``, ``"mnemonics"`` and ``"counts"``
    """
    slots = 768
    values_by_opcode = [None] * slots
    names_by_opcode = [None] * slots
    hits_by_opcode = [0] * slots

    for class_analysis in dx.get_classes():
        for method_analysis in class_analysis.get_methods():
            if method_analysis.is_external():
                continue

            try:
                body = method_analysis.get_method().get_code()
                if not body:
                    continue

                for ins in body.get_bc().get_instructions():
                    value = ins.get_op_value()
                    label = ins.get_name()

                    # Only values 0..255 are tabulated
                    if value < 0 or value >= 256:
                        continue
                    values_by_opcode[value] = value
                    names_by_opcode[value] = label
                    hits_by_opcode[value] += 1
            except Exception as exc:
                print(f"Error in method {method_analysis.name}: {str(exc)}")

    return {
        "opcodes": values_by_opcode,
        "mnemonics": names_by_opcode,
        "counts": hits_by_opcode,
    }


def analyze_apk_wrapper(args):
    """Extract the feature dictionary for a single APK (``Pool.map`` friendly).

    The function takes one tuple argument so it can be mapped over a list of
    work items, and it never raises for analysis problems: failures are
    reported through the returned error message instead.

    Parameters:
    - args: ``(apk_path, is_malware)`` where ``apk_path`` is a ``str`` or
      path-like object and ``is_malware`` is the truthy/falsy label

    Returns:
    - ``(apk_filename, features, None)`` on success
    - ``(apk_filename, None, error_msg)`` on failure. The file name is
      reported whenever it could be derived, so callers can tell which APK
      failed; it is ``None`` only if the path itself was unusable.
    """
    apk_path, is_malware = args
    apk_filename = None
    try:
        # Normalise to a plain string: ppdeep.hash_from_file() type-checks for
        # ``str`` (verify against the installed ppdeep version).
        apk_path_str = os.fspath(apk_path)

        # The file name (the sample's sha256 in this dataset) is the result key
        apk_filename = os.path.basename(apk_path_str)

        # Best effort: lowering log verbosity must never abort the analysis
        # (e.g. if a handler was already configured in this worker).
        try:
            androguard.util.set_log("CRITICAL")
        except Exception:
            pass

        # Load and analyze the APK
        a, d, dx = androguard.misc.AnalyzeAPK(apk_path_str)

        # Get basic file information
        apk_size = os.path.getsize(apk_path_str)

        # Keep the second colon-separated field of the fuzzy hash, or the
        # whole digest if it contains no separator
        apk_fuzzy_hash = ppdeep.hash_from_file(apk_path_str)
        fuzzy_hash_1 = (
            apk_fuzzy_hash.split(":")[1] if ":" in apk_fuzzy_hash else apk_fuzzy_hash
        )

        # Build feature dictionary with both counts and full lists
        features = {
            "file_size": apk_size,
            "fuzzy_hash": fuzzy_hash_1,
            "activities_list": a.get_activities(),
            "services_list": a.get_services(),
            "receivers_list": a.get_receivers(),
            "permissions_list": a.get_permissions(),
            "api_calls_list": extract_api_calls(dx),
            "opcode_counts": extract_opcodes(dx)["counts"],
            "is_malware": 1 if is_malware else 0,
        }

        # Drop the analysis objects before returning to help with memory
        del a, d, dx
        gc.collect()

        return apk_filename, features, None
    except Exception as e:
        error_msg = f"Error analyzing {apk_path}: {str(e)}"
        return apk_filename, None, error_msg


def _list_apk_files(directory):
    """Return the paths of the regular files directly inside ``directory``, sorted."""
    candidates = (os.path.join(directory, entry) for entry in os.listdir(directory))
    return sorted(path for path in candidates if os.path.isfile(path))


def analyze_apks_mp(
    benign_dir, malware_dir, max_workers=None, chunk_size=10, benign_offset=0
):
    """
    Analyze APK files using multiprocessing instead of concurrent.futures

    APKs are processed in chunks, each chunk in its own short-lived worker
    pool. Intermediate results are pickled to
    ``apk_analysis_temp_<n>.pkl`` every 20 chunks and after the last chunk,
    and failures are written to ``failed_apks.txt``.

    Parameters:
    - benign_dir: Directory containing benign APKs
    - malware_dir: Directory containing malware APKs
    - max_workers: Maximum number of parallel workers (default: half of the
      available cores, at least 1)
    - chunk_size: Number of APKs to process in each batch (must be >= 1)
    - benign_offset: Number of benign APKs (in sorted path order) to skip,
      e.g. to resume an interrupted run. Defaults to 0 (process everything);
      expected to be non-negative. This replaces a previously hard-coded
      skip of the first 9120 benign files.

    Returns:
    - Tuple ``(df, df_csv, failed_apks)``:
      ``df`` is a DataFrame indexed by APK file name with the raw features,
      ``df_csv`` is a copy whose list columns are JSON strings,
      ``failed_apks`` is a list of ``(apk_filename, error_message)`` pairs.

    Raises:
    - ValueError: if ``chunk_size`` is smaller than 1
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # Sorted listings make the processing order (and therefore the meaning of
    # `benign_offset`) reproducible; os.listdir() order is arbitrary.
    benign_apks = _list_apk_files(benign_dir)[benign_offset:]
    malware_apks = _list_apk_files(malware_dir)

    all_apks = [(path, False) for path in benign_apks] + [
        (path, True) for path in malware_apks
    ]
    results = {}
    failed_apks = []

    # If max_workers is not specified, use half of available cores
    if max_workers is None:
        max_workers = max(1, multiprocessing.cpu_count() // 2)

    print(
        f"Processing {len(all_apks)} APKs ({len(benign_apks)} benign, {len(malware_apks)} malware)"
    )
    print(f"Using {max_workers} worker processes")

    # Split APKs into chunks to avoid memory issues
    chunks = [all_apks[i : i + chunk_size] for i in range(0, len(all_apks), chunk_size)]

    total_processed = 0
    with tqdm(total=len(all_apks), desc="Analyzing APKs") as pbar:
        for chunk_idx, chunk in enumerate(chunks):
            try:
                # A fresh pool per chunk; leaving the `with` block tears the
                # workers down, so no extra close()/join() is needed.
                with multiprocessing.Pool(processes=max_workers) as pool:
                    chunk_results = pool.map(analyze_apk_wrapper, chunk)
            except Exception as e:
                print(f"Error processing chunk {chunk_idx + 1}/{len(chunks)}: {str(e)}")
                traceback.print_exc()

                # Mark all APKs in this chunk as failed
                for apk_path, _ in chunk:
                    failed_apks.append(
                        (
                            os.path.basename(apk_path),
                            f"Chunk processing error: {str(e)}",
                        )
                    )
            else:
                # pool.map preserves input order, so each result can be paired
                # with its work item to recover the file name of a failure.
                for (apk_path, _), (apk_filename, features, error) in zip(
                    chunk, chunk_results
                ):
                    if features:
                        results[apk_filename] = features
                    else:
                        failed_name = apk_filename or os.path.basename(apk_path)
                        failed_apks.append((failed_name, error))
                        print(f"Error with {failed_name}: {error}")
                total_processed += len(chunk)
            finally:
                gc.collect()

            # Advance the bar exactly once per chunk, whatever the outcome
            pbar.update(len(chunk))

            # Save intermediate results periodically. This is kept outside the
            # pool's try-block so a failed save cannot mark analysed APKs as
            # failed.
            is_checkpoint = (chunk_idx + 1) % 20 == 0 or chunk_idx == len(chunks) - 1
            if is_checkpoint and results:
                try:
                    temp_df = pd.DataFrame.from_dict(results, orient="index")
                    temp_df.to_pickle(f"apk_analysis_temp_{total_processed}.pkl")
                    print(
                        f"\nSaved intermediate results ({len(results)} APKs processed so far)"
                    )
                except Exception as e:
                    print(f"Could not save intermediate results: {str(e)}")
                    traceback.print_exc()

    # Log failed APKs
    if failed_apks:
        with open("failed_apks.txt", "w") as f:
            for apk, error in failed_apks:
                f.write(f"{apk}: {error}\n")
        print(
            f"\n{len(failed_apks)} APKs failed to process. See failed_apks.txt for details."
        )

    # Create DataFrame from results
    df = pd.DataFrame.from_dict(results, orient="index")

    # Convert list columns to JSON strings for CSV storage
    df_csv = df.copy()
    list_columns = [
        "activities_list",
        "services_list",
        "receivers_list",
        "permissions_list",
        "api_calls_list",
    ]

    for col in list_columns:
        if col in df_csv.columns:
            df_csv[col] = df_csv[col].apply(
                lambda x: json.dumps(x) if isinstance(x, (list, dict)) else x
            )

    # Fill NaN values
    df = df.fillna(0)
    df_csv = df_csv.fillna(0)

    return df, df_csv, failed_apks


# Example usage
# Driver: build the full dataset from the benign and malware APK folders
if __name__ == "__main__":
    # Input directories, relative to PROJECT_ROOT defined in an earlier cell
    APK_ROOT = PROJECT_ROOT / "apks" / "20k"
    BENIGN_DIR = APK_ROOT / "benign_apks"
    MALWARE_DIR = APK_ROOT / "malware_apks"

    # Run the chunked multiprocessing analysis
    df, df_csv, failed_apks = analyze_apks_mp(
        benign_dir=BENIGN_DIR,
        malware_dir=MALWARE_DIR,
        max_workers=16,  # Adjust based on your system
        chunk_size=16,  # Process in smaller batches
    )

    # Persist both representations: pickle keeps the Python lists, the CSV
    # holds the JSON-encoded list columns
    df.to_pickle("apk_analysis_results.pkl")
    df_csv.to_csv("apk_analysis_results.csv")

    # Summary of what was produced
    n_samples, n_features = df.shape
    n_benign = int((df["is_malware"] == 0).sum())
    n_malware = int((df["is_malware"] == 1).sum())
    print(
        f"\nAnalysis complete. Generated dataset with {n_samples} samples and {n_features} features."
    )
    print(f"Benign samples: {n_benign}")
    print(f"Malware samples: {n_malware}")
    print(f"Failed samples: {len(failed_apks)}")

    # Preview of the first rows
    print("\nSample of the dataframe:")
    print(df.head())
