## 🏷️ Phase 2: Task-Driven Customization Phase

Label-Driven Filtering + Task-Driven Refinement

In [None]:
# Imports
import ast
import datetime
import json
import os
import sys

from   dotenv   import load_dotenv
import pandas   as pd

# Custom Imports
sys.path.append('../')
import AnalysisUtils
import LLMUtils 

##### Parameters

In [None]:
# Folder for temporary files; the initialization cells create it when it is missing
TMP_PATH = "../0_Data/TMP"

#### Initialization

In [None]:
# Read the clock once so that the START banner and the baseline used for the
# elapsed-time report at the end of the notebook refer to the same instant
initTime = datetime.datetime.now()
print("⚡ START: {} ⚡".format(initTime.strftime("%Y-%m-%d %H:%M:%S")))

In [None]:
# Make sure the temporary folder is available and report which case applied
tmpFolderPresent = os.path.exists(TMP_PATH)
if tmpFolderPresent:
	print("--- 📁✅ Folder already exists: {}\n".format(TMP_PATH))
else:
	os.makedirs(TMP_PATH)
	print("--- 📁🆕 Folder created       : {}\n".format(TMP_PATH))

In [None]:
# Load .env Info
# NOTE(review): presumably this supplies the API credentials that LLMUtils needs — confirm
load_dotenv()

#### 📥 1) Load Data 

In [None]:
# Model to be used
# (MODEL is also interpolated into the names of the CSV/JSON input files loaded below)
MODEL = "gpt-4o-mini"

# Number of clusters
# (selects the label column "clusterID_<N_CLUSTERS>" of the methods DataFrame)
N_CLUSTERS = 150

# TEST
# NOTE(review): this assignment overrides the value 150 set just above, so the
# notebook currently runs with 2 clusters — presumably a leftover test setting; confirm
N_CLUSTERS = 2

In [None]:
# Location of the labelled-methods CSV for the selected model
DATA_PATH = "./0_PipelineData/4_methodsPrivacyLabels_{}.csv".format(MODEL)

# Read the whole CSV into memory
methodsDF = pd.read_csv(DATA_PATH)

# Report how many methods (rows) were loaded
print("--- #️⃣ Methods Size: {}".format(len(methodsDF)))

# Preview the first rows (shown as the cell output in the notebook)
methodsDF.head(3)

In [None]:
# Count how many methods carry each privacy label in the column that matches
# the current number of clusters
clusterID = "clusterID_{}".format(N_CLUSTERS)

# value_counts() yields one entry per distinct label (index) with its frequency (value)
labelCounts = methodsDF[clusterID].value_counts()
print("--- 🔍 Privacy Label Counts for column {}:".format(clusterID))

# len(labelCounts) is the number of distinct labels. labelCounts.nunique() would
# instead count the distinct *frequencies*, under-reporting whenever two labels
# occur the same number of times.
print("--- #️⃣ Number of unique labels : {}".format(len(labelCounts)))
for label, count in labelCounts.items():
	print("------ {:<35}: {}".format(label, count))

#### 🔒 2) Read Privacy-Labels

In [None]:
# JSON file holding the predefined privacy labels for the selected model
PRIVACY_LABELS_FILE = "./0_PipelineData/4_privacyLabels_{}.json".format(MODEL)

# Parse the JSON document; later cells use the result as a {label: description} mapping
with open(PRIVACY_LABELS_FILE, 'r') as labelsFile:
	privacyLabelsDict = json.load(labelsFile)

#### 🖥️ 3A) Using LLM to get a sublist of privacy-labels

In [None]:
# Announce which model is about to be contacted
print("\n--- ⭕ LLM Init & Check")
print("--- ⭕ Model: {}".format(MODEL))

# OpenAI (PAYING): build the client, then send a one-off request and show its reply
llmInterface = LLMUtils.ChatGPTInterface(model=MODEL, pricing=0.150)
pingReply    = llmInterface.sendRequest("Ping!")
print(pingReply)

In [None]:
# Paths
# (folder that holds one "<sha256>.txt" documentation file per method, read in step 3B)
DOCUMENTATION_PATH = "../0_Data/methodsDocumentationFiles/"

# Number of replies to get
# (passed to AnalysisUtils.labelAndroidMethod for every method in step 3B)
NUM_REPLIES = 5

# Max requests to send to LLMs
# (upper bound of the retry loop in step 3A; also passed to AnalysisUtils.labelAndroidMethod)
MAX_REQUESTS = 10

Prompts

In [None]:
# Prompt template for step 3A (label-driven filtering).
# It is filled with str.format and has two positional "{}" placeholders:
#   1st -> the user's task description
#   2nd -> the list of privacy labels, one "- <label>: <description>" line per label
# The reply is expected to be a bracketed list of label names.
PROMPT_PHASE5_A = """
You are an expert in Android security and privacy.

### Task Description  
Some Android API methods may return data that can be exploited by an attacker, either directly or through inference from correlated information.

You will receive a list of privacy-related labels that describe how such return values can be used to infer private user information.

A user has described their final task in natural language as follows:
"{}"

Your goal is to identify and return only the relevant labels from the list. These may include:
- DIRECT MATCHES to the described task.
- INDIRECT MATCHES where data can be used to infer the target information related to the task.

Think step by step and use your understanding of inference, data correlation, and side-channel analysis.

### Output Constraints:
- Return only the relevant labels as a list in square brackets, e.g., ["LABEL_1", "LABEL_2"].
- Do not include explanations, reasoning, or formatting outside the list.

### Privacy Labels:
{}
"""

In [None]:
# Prompt template for step 3B (task-driven refinement).
# It is filled with str.format and has one positional "{}" placeholder for the
# user's task description; the method-specific details are added afterwards by
# AndroidMethod.addAllToPrompt. The reply is expected to be SOURCE or NOT_SOURCE.
PROMPT_PHASE5_B = """
You are an Android security expert specializing in taint analysis with FlowDroid.  

### **Task Description**  
The return value of some Android API methods can be exploited by an attacker for an advantage, either in isolation or in combination with other data.

A FlowDroid user has described their final task in natural language as follows:  
"{}"

Your task is to determine whether the given method should be classified as a **SOURCE** in the context of the user's task i.e., whether running FlowDroid on this method may result in a taint flow from the method that satisfies the user's task. 

### **Instructions**  
1. Base your classification solely on the user's task description.
2. Analyze the method strictly within the provided context.  
3. Classify the method as either:
	- **SOURCE** (if it serves as an information source in the given context).  
	- **NOT_SOURCE** (if it does not).  
4. **Output only the classification label** (**SOURCE** or **NOT_SOURCE**)—no explanations. 
"""

Tasks

In [None]:
# Candidate task descriptions; exactly one of them is selected as USER_TASK below
# and inserted into both prompt templates.

### USER TASK 1 ###
# Three phrasings of the location task, from most detailed (A) to shortest (C)
USER_TASK_1_A = "Which Android API methods return information which could reveal the user’s physical location? This includes precise location (e.g., GPS, Wi-Fi, cell tower data), approximate location (e.g., IP address, network configuration), or broad geographic indicators such as timezone, country, SIM/network region, settings. Include any methods that could assist an attacker in estimating user location at any granularity, even indirectly."
USER_TASK_1_B = "Which APIs return data that an attacker could use to infer the location of the user/device, even indirectly?"
USER_TASK_1_C = "Which APIs return location data?"

### USER TASK 2 ###
# Microphone-related task
USER_TASK_2 = "Which APIs access microphone-related data (e.g., audio input state or stream), which could be useful for an attacker aiming to eavesdrop or gather information about the user’s environment without their consent?"

### USER TASK 3 ###
# Clipboard-related task
USER_TASK_3 = "Which APIs interact with clipboard content, potentially exposing copied sensitive information such as passwords, addresses, or personal data?"

# Choose the User Task
# (this is the task used by steps 3A and 3B)
USER_TASK = USER_TASK_2

In [None]:
def _parseLabelList(answer):
	"""Parse an LLM reply into a list of labels.

	The reply is parsed with ast.literal_eval, which accepts Python literals
	only, so text returned by the model is never executed as code.

	Returns the parsed list, or None when the reply is not a string holding a
	list literal.
	"""
	try:
		parsed = ast.literal_eval(answer.strip())
	except (AttributeError, SyntaxError, TypeError, ValueError, MemoryError, RecursionError):
		# Not a string, not valid Python syntax, or not a plain literal
		return None
	return parsed if isinstance(parsed, list) else None


# Name of the DataFrame column that holds the labels for the chosen number of clusters
clusterID = "clusterID_{}".format(N_CLUSTERS)

# Print info
print("--- 🔹 CLUSTERD ID: {}".format(clusterID))

# All the privacy labels known from the JSON file
privacyLabelsIDs = list(privacyLabelsDict.keys())
print("--- ⭕ All Privacy Labels [{}]: {}".format(len(privacyLabelsIDs), privacyLabelsIDs))

# Insert the user task and the "- <label>: <description>" list in the prompt
promptPhase5a = PROMPT_PHASE5_A.format(
	USER_TASK,
	"\n".join("- {}: {}".format(key, description) for key, description in privacyLabelsDict.items()),
)

# Testing purposes
print("--- ⭕ New Prompt Phase 5A:")
print(promptPhase5a)

# To store the filtered privacy labels; stays empty unless a valid reply arrives
filteredPrivacyLabels = []

# Ask again until the reply parses as a list, at most MAX_REQUESTS times
for iteration in range(MAX_REQUESTS):
	# Send the request to the LLM
	answer = llmInterface.sendRequest(promptPhase5a)

	# Testing purposes
	print("--- ⭕ LLM Response")
	print(answer)

	# Keep the reply only when it is a valid list
	parsedLabels = _parseLabelList(answer)
	if parsedLabels is not None:
		filteredPrivacyLabels = parsedLabels
		break
else:
	# Runs only when no attempt produced a valid list (the loop never hit `break`)
	print("--- ❌ Reached max iterations without valid response")

# Print the filtered privacy labels
print("--- ⭕ Filtered Privacy Labels [{}]: {}".format(len(filteredPrivacyLabels), filteredPrivacyLabels))

# Filter the android methods. The explicit copy makes the result an independent
# DataFrame, so later cells can add/drop columns without touching methodsDF.
filteredMethodsDF = methodsDF[methodsDF[clusterID].isin(filteredPrivacyLabels)].copy()
print("--- #️⃣ Filtered Methods Size: {}".format(filteredMethodsDF.shape[0]))

# # TEST
# #filteredMethodsDF = filteredMethodsDF.head(3)

#### 🖥️ 3B) Using LLM to refine final list

In [None]:
# Insert the user task in the prompt
promptPhase5b = PROMPT_PHASE5_B.format(USER_TASK)
print("\n--- ⭕ New Prompt Phase 5B:")
print(promptPhase5b)

# Process each row in the DataFrame
def processRow(row):
	"""Ask the LLM whether one Android method is a SOURCE for the user task.

	Args:
		row: one row of filteredMethodsDF; its 'sha256' and 'methodSignature'
			fields are read.

	Returns:
		The label selected by AnalysisUtils.getMostFrequentLabel from the
		replies gathered by AnalysisUtils.labelAndroidMethod (the allowed
		labels passed to it are 'SOURCE' and 'NOT_SOURCE').

	Raises:
		OSError: if the documentation file "<sha256>.txt" cannot be opened.
	"""
	# Get information from the row
	sha256          = row['sha256']
	methodSignature = row['methodSignature']

	print("\n--- 🔍 Android Method: {}".format(methodSignature))

	# Retrieve the documentation of the method
	documentationPath = os.path.join(DOCUMENTATION_PATH, "{}.txt".format(sha256))
	with open(documentationPath, 'r') as DocFile:
		documentation = DocFile.read()

	# Create an object representing the Android Method
	androidMethod = AnalysisUtils.AndroidMethod(sha256, methodSignature, documentation)

	# Create the prompt
	prompt = androidMethod.addAllToPrompt(promptPhase5b)

	# Testing purposes
	print("--- 🔍 Prompt: {}".format(prompt))

	labelFrequency = AnalysisUtils.labelAndroidMethod(llmInterface, prompt, ['SOURCE','NOT_SOURCE'], NUM_REPLIES, MAX_REQUESTS)
	llmLabel       = AnalysisUtils.getMostFrequentLabel(labelFrequency)
	print("--- 🔍 Label Frequency     :", labelFrequency)
	print("--- 🏷️ Most Frequent Label :", llmLabel)

	print("\n" + "---" * 20)

	# Hand the label back instead of writing into the DataFrame from inside the
	# iteration: mutating the frame that is being traversed is not supported by pandas
	return llmLabel

# Label every method first, then attach the labels as a new column. assign()
# returns a new DataFrame, and the column is created even when there are no rows.
finalLabels       = [processRow(row) for _, row in filteredMethodsDF.iterrows()]
filteredMethodsDF = filteredMethodsDF.assign(llmLabel_FINAL=finalLabels)



### 💾 4) Save Results

In [None]:
# Where to save the results
RESULTS_PATH = "./0_PipelineData/"

# Create folder if it does not exist
if not os.path.exists(RESULTS_PATH):
	os.makedirs(RESULTS_PATH)
	print("--- 📁🆕 Folder created       : {}\n".format(RESULTS_PATH))

# Save results to a CSV File, without the cluster-label column.
# The column is dropped on a separate frame rather than in place, so
# filteredMethodsDF is left untouched and this cell can be re-run safely.
resultsDF = filteredMethodsDF.drop(columns=clusterID)
resultsDF.to_csv(os.path.join(RESULTS_PATH, "sourceMethods.csv"), index=False)

# Keep only the signatures labelled "SOURCE" and write them to a text file, one per line
sourceMethods = resultsDF.loc[resultsDF['llmLabel_FINAL'] == 'SOURCE', 'methodSignature']
with open(os.path.join(RESULTS_PATH, "sourceMethods.txt"), 'w') as file:
	file.write("\n".join(sourceMethods))

print("--- 💾 Results saved successfully at: {}".format(RESULTS_PATH))

##### 🔚 End

In [None]:
# Timestamp of the end of the run
endTime = datetime.datetime.now()
print("\n🔚 --- END:  {} --- 🔚".format(endTime.strftime("%Y-%m-%d %H:%M:%S")))

# Break the elapsed time between initTime and endTime into components
totalTime      = endTime - initTime
elapsedSeconds = totalTime.total_seconds()
hours          = elapsedSeconds // 3600
minutes        = (elapsedSeconds % 3600) // 60
seconds        = elapsedSeconds % 60

# The bracketed figure is the total elapsed time in whole seconds,
# not the remainder stored in `seconds`
print("⏱️ --- Time: {:02d} hours and {:02d} minutes [{:02d} seconds] --- ⏱️".format(int(hours), int(minutes), int(elapsedSeconds)))