In [None]:
import os
import zipfile
import json
import pickle
import hashlib
from fingerprint_parser.shell_attributes_parser import ShellAttributeParser
from fingerprint_parser.sdk_attributes_parser import SdkAttributeParser
from fingerprint_parser.cp_attributes_parser import CpAttributeParser

# Root directory that is walked for 'fingerprints' folders holding the .zip archives.
INPUT_DATA_DIR = "YOUR_INPUT_DATA_DIR"
# Directory that receives one cleaned JSON file per processed archive.
PREPARE_DIR = "YOUR_PREPARED_DATA_OUTPUT_DIR"
# Directory that receives, per fingerprint, the sorted list of SDK attribute keys.
STRUCTURE_DIR = "YOUR_PREPARED_DATA_STRUCTURE_DIR"

In [None]:
def save_json_file(data, file_path):
    """Write ``data`` to ``file_path`` as JSON indented by four spaces.

    The target file is opened in write mode, so any existing content is
    replaced.
    """
    with open(file_path, 'w') as out_handle:
        json.dump(data, out_handle, indent=4)
          
def hash_sdk_structure(l):
    """Return the SHA-256 hex digest of the pickled form of ``l``.

    NOTE(review): the digest is computed over ``pickle.dumps`` output, so it
    presumably depends on the default pickle protocol of the running Python
    version - confirm before comparing hashes produced by different
    interpreters.
    """
    pickled_structure = pickle.dumps(l)
    digest = hashlib.sha256(pickled_structure)
    return digest.hexdigest()

def check_device_virtual(data):
    """
    Decide whether a fingerprint looks like it comes from an emulator or VM.

    ``data`` is a mapping of attribute names to values. The relevant entries
    are read with ``data.get`` (missing ones default to an empty string),
    converted with ``str`` and lower-cased, so every check is
    case-insensitive. Returns ``True`` as soon as any single heuristic
    matches, ``False`` otherwise.

    NOTE(review): the "nox" markers are plain substring checks, so a value
    containing e.g. "knox" also matches - confirm this is intended.
    """
    def _lowered(attribute_name):
        # Normalise one attribute to a lower-case string.
        return str(data.get(attribute_name, "")).lower()

    build_prefix = "android.os.Build."
    manufacturer = _lowered(build_prefix + "MANUFACTURER")
    model = _lowered(build_prefix + "MODEL")
    hardware = _lowered(build_prefix + "HARDWARE")
    fingerprint = _lowered(build_prefix + "FINGERPRINT")
    product = _lowered(build_prefix + "PRODUCT")
    board = _lowered(build_prefix + "BOARD")
    brand = _lowered(build_prefix + "BRAND")
    device = _lowered(build_prefix + "DEVICE")
    kernel = _lowered("kernel_information")
    system_logs = _lowered("system_logs")

    model_markers = ("google_sdk", "droid4x", "emulator", "android sdk built for x86")
    emulator_hardware = ("goldfish", "vbox86")
    emulator_products = ("sdk", "google_sdk", "sdk_x86", "vbox86p")
    log_markers = ("vbox", "virtual", "qemu", "vmware", "hypervisor", "kvm", "xen", "bochs", "nox")

    # Each entry is one independent heuristic; a single hit is enough.
    signals = (
        "genymotion" in manufacturer,
        any(marker in model for marker in model_markers),
        hardware in emulator_hardware,
        any("nox" in field for field in (hardware, product, board)),
        fingerprint.startswith("generic"),
        product in emulator_products,
        brand.startswith("generic") and device.startswith("generic"),
        any(arch in kernel for arch in ("x86", "amd64")),
        any(marker in system_logs for marker in log_markers),
    )
    return any(signals)

In [None]:
def _store_parsed_value(cleaned_data, key, value):
    """Store one parsed attribute value in ``cleaned_data``, flattening containers.

    Falsy values (``None``, empty containers, ``0``, ``""``) are dropped.
    A dict is flattened into ``"<key>.<subkey>"`` entries, a single-element
    list is unwrapped to its only element, and any other value is stored
    under ``key`` unchanged.
    """
    if not value:
        return
    if isinstance(value, dict):
        for sub_key, sub_value in value.items():
            cleaned_data[f"{key}.{sub_key}"] = sub_value
    elif isinstance(value, list) and len(value) == 1:
        cleaned_data[key] = value[0]
    else:
        cleaned_data[key] = value


def preapre_fingerprint(data, filename):
    """Clean one raw fingerprint and return it as a flat dictionary.

    Args:
        data: iterable of dictionaries; only the first key/value pair of each
            dictionary is used (the raw dump is presumably a list of
            single-entry dicts - confirm against the collector output).
        filename: name of the form ``"<uuid>_<timestamp>.<ext>"``. It supplies
            the ``uuid`` and ``timestamp`` fields and is also used as the name
            of the structure file written to ``STRUCTURE_DIR``.

    Returns:
        The cleaned attribute dictionary, or an empty dict when no key ending
        with ``".ANDROID_ID"`` was seen (the fingerprint is then treated as
        incomplete and nothing is written to disk).

    Raises:
        ValueError: if ``filename`` has no ``"_"`` separator or the timestamp
            part is not an integer.

    Side effects:
        For a complete fingerprint, the sorted list of SDK attribute keys is
        saved as JSON to ``STRUCTURE_DIR/<filename>``.
    """
    completeness_suffix = ".ANDROID_ID"

    # Split on the LAST underscore so that a uuid containing "_" still parses.
    uuid, timestamp = filename.rsplit("_", 1)
    timestamp = int(timestamp.split(".")[0])

    cleaned_data = {}
    is_incomplete = True
    sdk_keys = set()

    for item in data:
        # Only the first key/value pair of each dictionary is considered.
        first_pair = next(iter(item.items()), None)
        if first_pair is None:
            continue
        key, value = first_pair

        # Mark as complete if a key ends with the required suffix
        if key.endswith(completeness_suffix):
            is_incomplete = False

        if ShellAttributeParser.isShellAttribute(key):
            parsed = ShellAttributeParser.parse(key, value, timestamp)
            # The extended ringtone list is stored under the plain name.
            target_key = "ringtones_list" if key == "ringtones_list_ext" else key
            _store_parsed_value(cleaned_data, target_key, parsed)
        elif SdkAttributeParser.isSdkAttribute(key):
            # Every SDK key counts towards the structure, even when its
            # parsed value turns out to be empty.
            sdk_keys.add(key)
            _store_parsed_value(cleaned_data, key, SdkAttributeParser.parse(key, value))
        elif CpAttributeParser.isCpAttribute(key):
            _store_parsed_value(cleaned_data, key, CpAttributeParser.parse(key, value))
        else:
            # Unrecognised attributes are kept verbatim.
            cleaned_data[key] = value

    # Return an empty dict if the fingerprint is incomplete
    if is_incomplete:
        return {}

    sdk_structure = sorted(sdk_keys)
    cleaned_data["structureSdk"] = hash_sdk_structure(sdk_structure)
    cleaned_data["nbSdk"] = len(sdk_structure)
    cleaned_data["timestamp"] = timestamp
    cleaned_data["uuid"] = uuid
    cleaned_data["isDeviceVirtual"] = check_device_virtual(cleaned_data)
    if "isDeviceRooted" not in cleaned_data:
        cleaned_data["isDeviceRooted"] = "unknown"
    if "isDeveloperModeEnabled" not in cleaned_data:
        cleaned_data["isDeveloperModeEnabled"] = -1  # means unknown

    # save the sdk structure
    structure_file_path = os.path.join(STRUCTURE_DIR, filename)
    save_json_file(sdk_structure, structure_file_path)

    return cleaned_data

In [None]:
def extract_and_clean_archives(directory):
    """
    Clean every '.zip' fingerprint archive found directly in ``directory``.

    For each archive whose cleaned output does not exist yet in
    ``PREPARE_DIR``, the ``data.json`` member is read straight from the
    archive, passed through ``preapre_fingerprint`` and, when the result is
    non-empty, saved as ``<archive stem>.json`` in ``PREPARE_DIR``. Archives
    without a ``data.json`` member are ignored.

    ``data.json`` is parsed in memory rather than extracted to disk, so no
    temporary file is created in ``directory`` and an unrelated ``data.json``
    already located there is never overwritten or deleted.

    Finally the minimum/maximum number of entries of the raw and cleaned
    fingerprints processed in this call are printed; the minima stay ``inf``
    when nothing was processed.
    """
    os.makedirs(PREPARE_DIR, exist_ok=True)
    os.makedirs(STRUCTURE_DIR, exist_ok=True)

    min_cleaned_data_size = float('inf')
    min_data_size = float('inf')
    max_cleaned_data_size = 0
    max_data_size = 0

    # Iterate through the files in the directory
    for filename in os.listdir(directory):
        # Only zip archives are of interest
        if not filename.endswith('.zip'):
            continue

        file_path = os.path.join(directory, filename)
        cleaned_file_name = f"{filename.split('.')[0]}.json"
        cleaned_file_path = os.path.join(PREPARE_DIR, cleaned_file_name)

        # Skip if the cleaned file already exists
        if os.path.exists(cleaned_file_path):
            print(f"Skipping '{filename}' as it is already processed.")
            continue

        with zipfile.ZipFile(file_path, 'r') as archive:
            if 'data.json' not in archive.namelist():
                continue
            # json.loads accepts bytes and detects the UTF encoding itself,
            # so the result does not depend on the locale's default encoding.
            data = json.loads(archive.read('data.json'))

        cleaned_data = preapre_fingerprint(data, cleaned_file_name)
        if not cleaned_data:
            # Incomplete fingerprint: nothing to save.
            continue

        min_data_size = min(min_data_size, len(data))
        max_data_size = max(max_data_size, len(data))
        min_cleaned_data_size = min(min_cleaned_data_size, len(cleaned_data))
        max_cleaned_data_size = max(max_cleaned_data_size, len(cleaned_data))

        # Save the cleaned JSON data with a new name
        save_json_file(cleaned_data, cleaned_file_path)
        print(f"Processed '{filename}' and saved cleaned data")

    print("max data size :", max_data_size)
    print("min data size :", min_data_size)
    print("max cleaned data size :", max_cleaned_data_size)
    print("min cleaned data size :", min_cleaned_data_size)
                    
def process_all_fingerprint_folders(dumps_directory):
    """
    Walk ``dumps_directory`` recursively and clean the archives of every
    directory whose path ends with 'fingerprints'.
    """
    # os.walk is consumed lazily, so each folder is processed as it is found.
    fingerprint_dirs = (
        current_dir
        for current_dir, _subdirs, _filenames in os.walk(dumps_directory)
        if current_dir.endswith('fingerprints')
    )
    for fingerprint_dir in fingerprint_dirs:
        print(f"Processing directory: {fingerprint_dir}")
        extract_and_clean_archives(fingerprint_dir)


In [None]:
# Run the pipeline: walk INPUT_DATA_DIR and clean every 'fingerprints' folder found.
process_all_fingerprint_folders(INPUT_DATA_DIR)