-
Notifications
You must be signed in to change notification settings - Fork 0
/
evaluator.py
683 lines (542 loc) · 29.9 KB
/
evaluator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
import argparse
from datetime import datetime
from androidcrypto import constants as constants
from androidcrypto import helpers as helpers
import pandas as pd
import logging
import matplotlib.ticker as mtick
import seaborn as sns
import matplotlib.pyplot as plt
import os
from dataclasses import dataclass, field, InitVar
import yaml
import sys
import math
from typing import Any, Optional, Union, Dict
import json
class ExperimentEvaluationError(Exception):
    """Raised when the evaluation of experiment results fails."""
@dataclass
class ExperimentResults:
    """Results of a single experiment (typically one dataset year).

    Data can be supplied directly via ``config_init`` / ``json_data_init``,
    or loaded from ``folder``, which must contain ``records.json`` and
    ``config.yml``.
    """
    # Parsed records.json content: sample id -> record dict.
    json_data: Dict = field(init=False)
    # Parsed config.yml content.
    config: Dict = field(init=False)
    # Number of samples in json_data.
    n_samples: int = field(init=False)
    # dex year of the dataset taken from the first record; -1 when unknown.
    year: int = field(init=False)
    # folder is used as a fallback when config or json_init_data is None
    folder: InitVar[Optional[str]] = field(default=None)
    config_init: InitVar[Optional[Dict]] = field(default=None)
    json_data_init: InitVar[Optional[Dict]] = field(default=None)

    def __post_init__(self, folder: Optional[str], config_init: Optional[Dict], json_data_init: Optional[Dict]):
        # init json data
        if json_data_init is not None:
            self.json_data = json_data_init
        else:
            json_results_path = os.path.join(folder, 'records.json')
            with open(json_results_path, 'r') as json_handle:
                self.json_data = json.load(json_handle)
        # init config
        if config_init is not None:
            self.config = config_init
        else:
            config_path = os.path.join(folder, 'config.yml')
            with open(config_path, 'r') as handle:
                self.config = yaml.load(handle, Loader=yaml.FullLoader)
        # init metadata
        self.n_samples = int(len(self.json_data))
        # Default year; previously the attribute stayed unset for an empty
        # json_data, which made later access raise AttributeError.
        self.year = -1
        for val in self.json_data.values():
            # Records come in two layouts: flattened ('metadata_dex_year')
            # or nested under 'metadata'.
            if 'metadata_dex_year' in val:
                year = val['metadata_dex_year']
            else:
                year = val['metadata']['dex_year']
            if year is not None:
                self.year = int(year)
            # Only the first record determines the year.
            break
@dataclass
class EvaluatorInput:
    """In-memory input for the Evaluator: raw records plus an analysis config."""
    records: Dict[str, Any]
    # TODO specify default config
    config: Dict[str, Any] = field(default_factory=dict)
class Evaluator:
    """Evaluates results of the cryptographic-API experiments.

    Builds per-sample, per-import and per-record dataframes from the raw
    experiment records, prints/saves summary statistics and renders plots
    (temporal evolution, boxplots, euphony label distributions) under
    ``report_path``.
    """

    # Column layout of the main per-sample dataframe (indexed by sha256).
    cols = ['sha256', 'year', 'euphony_name', 'euphony_type', 'java_crypto_libs', 'native_crypto_libs', 'third_party_packages',
            'crypto_api_imports', 'crypto_api_records']

    def __init__(self,
                 input_data: Union[str, EvaluatorInput],  # [experiment_folder|folder_of_experiment_folders|EvaluatorInput]
                 # NOTE: `experiment_folder` must contain `records.json` and `config.yml`
                 # NOTE: EvaluatorInput contains dict for records and config
                 report_path: Optional[str] = None,  # optional path to store report
                 api_definition: Optional[Union[Dict, str]] = None,  # path or dict or default is used if None
                 new_triggers: Optional[Union[Dict, str]] = None,  # path or dict or None
                 show=True,
                 save_files=True,
                 eval_euphony=False):
        # maybe move elsewhere
        def load_optional_yaml(to_load: Optional[Union[Dict, str]], default_if_None: Optional[Dict] = None) -> Optional[Union[Dict, str]]:
            # Path -> load YAML file; dict -> pass through; None -> default.
            if to_load is None:
                return default_if_None
            elif isinstance(to_load, str):
                with open(to_load, 'r') as yaml_handle:
                    return yaml.load(yaml_handle, Loader=yaml.FullLoader)
            # when dictionary
            return to_load

        sns.set_style("whitegrid")
        self.input_data = input_data
        self.show = show
        self.save_files = save_files
        self.report_path = report_path
        self.experiments = None
        # BUGFIX: attribute was previously misspelled as `eval_euphone`.
        self.eval_euphony = eval_euphony
        self.api_definition = load_optional_yaml(
            api_definition,
            # TODO here define default api definition to be used
            default_if_None={
            }
        )
        self.new_triggers = load_optional_yaml(
            new_triggers,
            default_if_None=None
        )
        self.df = None
        self.df_records = None
        self.df_imports = None
        self.size_comparison = None
        self.java_libs = None
        self.native_libs = None
        self.dataset_sizes = None
        self.dset_size = 0
        self.norm_factor = None
        self.summary_handle = None
        self.exp_config = None
        self.prepare_report_dir()
        if self.save_files is True and self.report_path is not None:
            self.summary_handle = open(os.path.join(self.report_path, 'summary.md'), 'w')

    def __repr__(self):
        return 'Evaluator=' + str(self.__dict__)

    def evaluate(self):
        """Run the whole evaluation pipeline and close the summary file."""
        self.get_exp_results()
        self.init_dataframes()
        self.analyze_third_party_packages()
        self.df, self.size_comparison = self.analyze_empty_samples()
        if self.eval_euphony is True:
            self.evaluate_euphony()
        self.analyze_third_party_crypto_libs()
        self.get_imports_dataframe()
        self.get_records_dataframe()
        self.analyze_crypto_api_imports()
        # We should not need this any more...
        self.df = self.df.drop(columns='crypto_api_imports')
        self.df = self.df.drop(columns='crypto_api_records')
        self.evaluate_categories()
        # BUGFIX: summary_handle is None when save_files is False or no
        # report_path was given; closing unconditionally raised AttributeError.
        if self.summary_handle is not None:
            self.summary_handle.close()

    def evaluate_euphony(self):
        """Plot euphony label distributions into `<report_path>/euphony`."""
        if self.report_path is None:
            return
        euph_folder = os.path.join(self.report_path, 'euphony')
        if not os.path.exists(euph_folder):
            os.mkdir(euph_folder)
        self.plot_euphony(self.df, euph_folder)

    def prepare_report_dir(self):
        """Create the report directory when a path was given and is missing."""
        if self.report_path is not None and not os.path.exists(self.report_path):
            os.mkdir(self.report_path)

    def get_exp_results(self):
        """Populate ``self.experiments`` from a folder path or EvaluatorInput."""
        if isinstance(self.input_data, str):
            logging.info('Results of the experiment supplied as a folder.')
            try:
                if os.path.isfile(os.path.join(self.input_data, 'records.json')):
                    # Single experiment folder.
                    self.experiments = [ExperimentResults(folder=self.input_data)]
                else:
                    # Folder of experiment folders, one per experiment.
                    self.experiments = [ExperimentResults(folder=os.path.join(self.input_data, x)) for x in os.listdir(self.input_data)]
            except Exception as e:
                logging.error(f'Failed to parse folder with results: {e}')
        else:
            logging.info('Results of the experiment passed as EvaluatorInput (two dictionaries).')
            # cast to dataframe
            records_df = pd.DataFrame.from_dict(self.input_data.records, orient="index")
            # group by year; records come in a flattened or a nested layout
            if "metadata_dex_year" in records_df.columns:
                year = "metadata_dex_year"
            else:
                year = records_df["metadata"].apply(lambda m: m["dex_year"])
            # get dict with year as key and dataframe as value
            records_df_per_year = dict(tuple(records_df.groupby(year)))
            # cast dataframes into dictionaries
            year_dict_records = [r.to_dict(orient="index") for r in records_df_per_year.values()]
            # create experiment result for each year
            self.experiments = [ExperimentResults(config_init=self.input_data.config, json_data_init=r) for r in year_dict_records]
        logging.info('Successfully parsed results of the experiment.')

    def init_dataframes(self):
        """Build the main per-sample dataframe and the normalization factors."""
        def safe_load(record: Dict, attr_name: str, nested_attr_name: str):
            # try [{attr_name}_{nested_attr_name}] and then [attr_name][nested_attr_name]
            joined_key = f"{attr_name}_{nested_attr_name}"
            if joined_key in record:
                return record[joined_key]
            return record[attr_name][nested_attr_name]

        json_data = helpers.merge_jsons(*[exp.json_data for exp in self.experiments])
        self.dataset_sizes = {exp.year: exp.n_samples for exp in self.experiments}
        self.dset_size = sum(self.dataset_sizes.values())
        # Normalization maps per-year counts to "per 10k samples".
        self.norm_factor = {key: val / 10000 for key, val in self.dataset_sizes.items()}
        data = [
            (sample_id,
             safe_load(rec, "metadata", "dex_year"),
             safe_load(rec, "metadata", "euphony_name"),
             safe_load(rec, "metadata", "euphony_type"),
             rec['third_party_crypto_libs'],
             rec['native_imports'],
             rec['third_party_packages'],
             rec['crypto_imports'],
             rec['crypto_api_records'])
            for sample_id, rec in json_data.items()
        ]
        self.df = pd.DataFrame(data, columns=self.cols).set_index('sha256')
        self.df = helpers.introduce_missing_vals(self.df, ['euphony_name', 'euphony_type', 'java_crypto_libs', 'native_crypto_libs', 'third_party_packages', 'crypto_api_imports', 'crypto_api_records'])
        logging.info('Successfully initialized pandas dataframes.')

    def analyze_third_party_packages(self):
        """Report how many apks carry third-party packages and how many each has."""
        def print_report(stream):
            print('Statistics about third-party packages', file=stream)
            print(f'{n_full} out of {third_party.shape[0]} ({(n_full / third_party.shape[0] * 100):.2f}%) apks contain at least one third party package', file=stream)
            # BUGFIX: this line was missing `file=stream` and always leaked to stdout.
            print(f'In total, the dataset contains {third_party.n_third_party.sum()} unique third-party package names.', file=stream)
            print(f'On average, each apk contains: {third_party.n_third_party.mean()} third-party packages.', file=stream)
            helpers.print_delim(stream)

        def count_packages(x):
            # Missing values were introduced as NaN (float) earlier.
            if isinstance(x, float):
                return 0
            else:
                return len(x)

        self.df['n_third_party'] = self.df.third_party_packages.apply(count_packages)
        third_party = self.df.drop(
            columns=['euphony_name', 'euphony_type', 'java_crypto_libs', 'native_crypto_libs', 'crypto_api_imports',
                     'crypto_api_records'])
        n_empty = third_party.loc[third_party.n_third_party == 0].shape[0]
        n_full = third_party.shape[0] - n_empty
        if self.show is True:
            print_report(sys.stdout)
        if self.save_files is True:
            print_report(self.summary_handle)
        logging.info('Successfully analyzed third-party packages.')

    def analyze_empty_samples(self):
        """Return (df without crypto-less samples, per-year size comparison)."""
        def print_report(stream):
            print('General statistics about dataset', file=stream)
            print(f'Number of analyzed samples: {size_comparison.all_samples.sum():,}', file=stream)
            print('How many of analyzed samples use some crypto...', file=stream)
            print(size_comparison.to_markdown(), file=stream)
            helpers.print_delim(stream)

        # A sample "contains crypto" when any of these columns is non-null.
        df_clean = self.df.dropna(how='all', subset=['java_crypto_libs', 'native_crypto_libs', 'crypto_api_imports', 'crypto_api_records'])
        all_samples_size = self.df.groupby('year').size().to_frame(name='all_samples')
        containing_crypto_size = df_clean.groupby('year').size().to_frame(name='containing_crypto')
        size_comparison = pd.concat([all_samples_size, containing_crypto_size], axis=1)
        size_comparison['empty_samples'] = size_comparison.all_samples - size_comparison.containing_crypto
        size_comparison['percentage_containing_crypto'] = (size_comparison.containing_crypto / size_comparison.all_samples) * 100
        if self.show is True:
            print_report(sys.stdout)
        if self.save_files is True:
            print_report(self.summary_handle)
        logging.info('Successfully mapped the ratio of samples that contain cryptographic API.')
        return df_clean, size_comparison

    def analyze_third_party_crypto_libs(self):
        """Report usage of java and native third-party crypto libraries."""
        def print_report(stream):
            print('Usage of third party cryptographic libraries', file=stream)
            print('Java libraries:', file=stream)
            if self.java_libs.empty:
                print('No java cryptographic libraries were found', file=stream)
            else:
                print(self.java_libs.groupby('year').lib_name.value_counts(), file=stream)
                helpers.print_delim(stream)
                print('Prevalence of java libraries per year:', file=stream)
                print(self.java_libs.groupby('year').sha256.nunique(), file=stream)
                helpers.print_delim(stream)
                print('Overall popularity of java libraries:', file=stream)
                print(self.java_libs.lib_name.value_counts(), file=stream)
            print('Native libraries:', file=stream)
            if self.native_libs.empty:
                print('No native cryptographic libraries were found', file=stream)
            else:
                print(self.native_libs.groupby('year').lib_name.value_counts(), file=stream)
            helpers.print_delim(stream)

        self.java_libs = helpers.get_third_party_libs_df(self.df, 'java_crypto_libs')
        self.native_libs = helpers.get_third_party_libs_df(self.df, 'native_crypto_libs')
        if self.show is True:
            print_report(sys.stdout)
        if self.save_files is True:
            print_report(self.summary_handle)
        logging.info('Successfully analyzed third-party cryptographic libraries in samples.')

    def analyze_crypto_api_imports(self):
        """Report how many of the analyzed JCA classes are actually imported."""
        def print_report(stream):
            print('Number of crypto API imports per sample:', file=stream)
            print(self.df.crypto_imports_sum.describe().to_markdown(), file=stream)
            print(f'Out of {n_theoretical_imports} analyzed classes, the following number is actually imported:', file=stream)
            print(practical_imports, file=stream)
            helpers.print_delim(stream)

        def get_sum_of_crypto_imports_per_sample(sample_imports):
            if isinstance(sample_imports, dict):
                return sum([x for x in sample_imports.values()])
            else:
                return 0  # should be always np.nan

        self.df['crypto_imports_sum'] = self.df.crypto_api_imports.apply(get_sum_of_crypto_imports_per_sample)
        # The config of the first experiment defines the set of analyzed imports.
        filestream = self.experiments[0].config
        n_theoretical_imports = len(filestream['evaluate']['imports'])
        practical_imports = self.df_imports.groupby('year').api_class.nunique()
        if self.show is True:
            print_report(sys.stdout)
        if self.save_files is True:
            print_report(self.summary_handle)
        logging.info('Successfully analyzed what JCA classes are being imported.')

    def get_imports_dataframe(self):
        """Explode per-sample import dicts into a flat (sha256, year, class, count) frame."""
        lst = []
        for row in self.df.itertuples():
            if isinstance(row.crypto_api_imports, dict):
                lst.extend([(row.Index, row.year, api_class, n_occurrences) for api_class, n_occurrences in
                            row.crypto_api_imports.items()])
        self.df_imports = pd.DataFrame(lst, columns=['sha256', 'year', 'api_class', 'class_count'])
        logging.info('Successfully constructed the imports dataframe.')

    def get_records_dataframe(self):
        """Explode per-sample record dicts into a flat call-site frame."""
        lst = []
        for row in self.df.itertuples():
            if isinstance(row.crypto_api_records, dict):
                for key, records in row.crypto_api_records.items():
                    # Each record is a (trigger, line, line_number) triple.
                    lst.extend([(row.Index, row.year, key, r[0], r[1], r[2]) for r in records])
        self.df_records = pd.DataFrame(lst, columns=['sha256', 'year', 'class_name', 'trigger', 'line', 'line_number'])
        self.deduplicate_and_fix_missing_triggers()
        logging.info('Successfully constructed the records dataframe.')

    def deduplicate_and_fix_missing_triggers(self):
        """Deduplicate call sites and remap default triggers via `new_triggers`."""
        # Quite time consuming, taking up to 1 minute
        def apply_new_triggers(row, new_triggers):
            # Only triggers that appear as keys in `new_triggers` get remapped.
            if not row.trigger.lower() in default_triggers:
                return row
            # NOTE(review): `new_triggers[row.trigger]` assumes the trigger's
            # case matches the dict key exactly — confirm against trigger data.
            for t in new_triggers[row.trigger]:
                if t.lower() in row.line.lower():
                    row.trigger = t
                    return row
            return row

        if self.new_triggers is None:
            logging.info('No triggers were specified. Aborting deduplication.')
            return  # there are no triggers loaded, do nothing
        default_triggers = [x.lower() for x in list(self.new_triggers.keys())]
        self.df_records = self.df_records.loc[self.df_records.trigger != 'Cipher']  # Manual fix of broken trigger, not needed
        self.df_records = self.df_records.drop_duplicates(subset=['sha256', 'class_name', 'line_number'],
                                                          keep='last')
        self.df_records = self.df_records.apply(lambda row: apply_new_triggers(row, self.new_triggers), axis=1)
        self.fix_broken_triggers()
        logging.info('Successfully fixed missing and broken triggers, deduplicated some triggers.')

    def fix_broken_triggers(self):
        """Normalize truncated SHA trigger strings to their canonical form."""
        def map_sha(x):
            if x == 'MessageDigest.getInstance("SHA1':
                return 'MessageDigest.getInstance("SHA-1'
            elif x == 'MessageDigest.getInstance("SHA256':
                return 'MessageDigest.getInstance("SHA-256'
            else:
                return x

        self.df_records.trigger = self.df_records.trigger.map(map_sha)

    def analyze_memory_usage(self):
        """Report the in-memory footprint of the three dataframes in megabytes."""
        def print_report(stream):
            print(f'Total df size in megabytes: {df_usage}', file=stream)
            print(f'Total df_imports size in megabytes: {df_imports_usage}', file=stream)
            print(f'Total df_records size in megabytes: {df_records_usage}', file=stream)
            helpers.print_delim(stream)

        df_usage = self.df.memory_usage(deep=True).sum() / 1000000
        df_records_usage = self.df_records.memory_usage(deep=True).sum() / 1000000
        df_imports_usage = self.df_imports.memory_usage(deep=True).sum() / 1000000
        if self.show is True:
            print_report(sys.stdout)
        if self.save_files is True:
            print_report(self.summary_handle)

    @staticmethod
    def plot_euphony(df, outpath):
        """Plot top-N euphony names/types per year (normalized per 10k samples)."""
        def plot_euphony_single(col_name, title_prefix, xlabel):
            euphony_series = df.dropna(subset=[col_name]).groupby('year')[col_name].value_counts()
            years = list(euphony_series.index.levels[0])
            norm_factor = {year: (euphony_series[year].sum() / 10000) for year in years}
            subset_sizes = {key: int(val * 10000) for key, val in norm_factor.items()}
            euphony_df = pd.DataFrame(euphony_series).rename(columns={col_name: 'count'}).reset_index()
            euphony_df['count'] = euphony_df['count'].astype('float64')
            euphony_df['normalized_count'] = euphony_df['count'] / euphony_df['year'].map(norm_factor)
            euphony_df['normalized_count'] = euphony_df['normalized_count'].fillna(0).astype('int64')
            euphony_df = euphony_df.sort_values(by=['normalized_count'], ascending=False).groupby('year').head(top_n_to_take)
            with open(os.path.join(outpath, col_name + 's.html'), 'w') as f:
                f.write(constants.HTML_STRING.format(table=euphony_df.to_html(classes='mystyle')))
            ax = sns.catplot(x=col_name, y='normalized_count', hue='year', data=euphony_df, kind='bar', aspect=3)
            ax.set_xticklabels(rotation=90)
            ax.set(xlabel=xlabel, ylabel='Number of samples per 10k',
                   title=f'Top {top_n_to_take} {title_prefix} for each year. Labeled subset sizes: {subset_sizes}')
            filepath = os.path.join(outpath, title_prefix + '.png')
            plt.savefig(filepath, dpi=300, format='png', bbox_inches='tight', pad_inches=0.1)
            plt.close()

        top_n_to_take = 10
        plot_euphony_single('euphony_name', 'euphony names', 'Euphony name')
        plot_euphony_single('euphony_type', 'euphony types', 'Euphony type')
        logging.info('Successfully plotted distribution of euphony labels.')

    def evaluate_categories(self):
        """Evaluate every crypto-API category comparison defined in `api_definition`."""
        def map_norm_factor_multiindex(x):
            # The first level of the (year, category) multi-index is the year.
            return self.norm_factor[x[0]]

        def get_df_temp(category_name, category_triggers, df_eval, sum_temp_n_call_sites):
            # Per-year call-site and apk counts for one category.
            df = df_eval.loc[df_eval.trigger.isin(category_triggers)].groupby('year')
            temp_n_call_sites = df.trigger.count()
            temp_n_call_sites.name = 'n_call_sites'
            temp_norm_n_call_sites = temp_n_call_sites / temp_n_call_sites.index.map(self.norm_factor)
            temp_norm_n_call_sites = temp_norm_n_call_sites.map(math.ceil)
            temp_norm_n_call_sites.name = 'norm_n_call_sites'
            temp_n_apks = df.sha256.nunique()
            temp_n_apks.name = 'n_apks'
            df_to_return = pd.concat([temp_n_call_sites, temp_norm_n_call_sites, temp_n_apks], axis=1)
            df_to_return['category'] = category_name
            df_to_return = df_to_return.set_index('category', append=True)
            df_to_return['norm_n_apks'] = df_to_return.n_apks / df_to_return.index.map(map_norm_factor_multiindex)
            df_to_return.norm_n_apks = df_to_return.norm_n_apks.astype('int64')
            df_to_return['norm_n_apks_ratio'] = 100 * (df_to_return.norm_n_apks / 10000)
            df_to_return['n_call_sites_ratio'] = 100 * (df_to_return.n_call_sites / sum_temp_n_call_sites)
            return df_to_return

        def get_df_summary(df_eval, df_temp, dataset_size, all_triggers):
            # BUGFIX: `.sum(level=...)` and `DataFrame.append` were removed in
            # pandas 2.x; use groupby(level=...) and pd.concat instead.
            df = df_temp.drop(
                columns=['norm_n_call_sites', 'norm_n_apks', 'norm_n_apks_ratio', 'n_call_sites_ratio']).groupby(
                level='category').sum()
            sum_n_call_sites = df_eval.loc[df_eval.trigger.isin(all_triggers)].trigger.count()
            sum_n_apks = df_eval.loc[df_eval.trigger.isin(all_triggers)].sha256.nunique()
            df['call_sites_ratio'] = 100 * df.n_call_sites / sum_n_call_sites
            df['n_apks_ratio'] = 100 * df.n_apks / dataset_size
            sum_row = pd.Series([sum_n_call_sites, sum_n_apks, 1, 100 * sum_n_apks / dataset_size],
                                index=['n_call_sites', 'n_apks', 'call_sites_ratio', 'n_apks_ratio'], name='Total')
            df = pd.concat([df, sum_row.to_frame().T])
            return df

        def get_all_triggers(obj):
            # Flatten the (possibly nested) api definition into a trigger list,
            # keeping 'obfuscated' entries as plain values.
            triggers = []
            if isinstance(obj, dict):
                if len(obj.keys()) == 2:
                    for key in obj.keys():
                        if key != 'obfuscated':
                            triggers.extend(get_all_triggers(obj[key]))
                        else:
                            triggers.append(obj[key])
                elif len(obj.keys()) > 2:
                    for key in obj.keys():
                        triggers.extend(get_all_triggers(obj[key]))
            elif isinstance(obj, list):
                triggers = obj
            return triggers

        @dataclass
        class Comparison:
            # One category comparison: name, category->triggers mapping,
            # output path and the optional obfuscated trigger.
            name: str
            categories: Any
            path: str
            obfuscated_trigger: str

            def __repr__(self):
                return f'name: {self.name}, path: {self.path}, obf: {self.obfuscated_trigger}'

        def get_all_comps(obj, path, name, obfuscated_trigger):
            # Recursively derive Comparison objects from the api definition tree.
            comps = []
            if isinstance(obj, list):
                cats = {x.split('"')[1]: [x] for x in obj}
                comps.append(Comparison(name, cats, path, obfuscated_trigger))
            if isinstance(obj, dict):
                if len(obj.keys()) > 2:
                    comps.append(
                        Comparison(name, {key: get_all_triggers(val) for key, val in obj.items()}, os.path.join(path),
                                   obfuscated_trigger))
                    for key, val in obj.items():
                        comps.extend(get_all_comps(val, os.path.join(path, key), key, None))
                if len(obj.keys()) == 2:
                    comps.extend(get_all_comps(obj['values'], path, name, obj['obfuscated']))
            return comps

        def plot_temporal_evolution(df_temp, path, title, drop_obfuscated=True):
            df = df_temp.reset_index()
            if drop_obfuscated is True:
                df = df.loc[df.category != 'obfuscated']
            df = df.sort_values(by='norm_n_apks_ratio', ascending=False)
            ax = sns.lineplot(x='year', y='norm_n_apks_ratio', hue='category', data=df, marker='o')
            # ax.yaxis.set_major_locator(mtick.MultipleLocator(5))  # density of grid lines, 5% now
            ax.xaxis.set_major_locator(mtick.MultipleLocator(1))
            ax.yaxis.set_major_formatter(mtick.PercentFormatter())
            ax.set(xlabel='Year', ylabel='% of all samples',
                   title=title)
            ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
            ax.figure.savefig(path, dpi=300, format='png', bbox_inches='tight', pad_inches=0.1)
            plt.close()

        def plot_boxplot(df_temp, path, title, drop_obfuscated=True):
            df = df_temp.reset_index()
            if drop_obfuscated is True:
                df = df.loc[df.category != 'obfuscated']
            # BUGFIX: a redundant first sort by norm_n_apks_ratio was dead code
            # (immediately overridden by this sort).
            df = df.sort_values(by=['norm_n_apks'], ascending=False)
            ax = sns.boxplot(x='category', y='norm_n_apks', data=df)
            plt.xticks(rotation=90)
            ax.set(xlabel='category', ylabel='Number of APKs per 10k APKs', title=f'{title} frequency')
            ax.figure.savefig(path, dpi=300, format='png',
                              bbox_inches='tight', pad_inches=0.1)
            plt.close()

        def evaluate_comparison(comparison):
            if not os.path.exists(comparison.path):
                os.makedirs(comparison.path)
            if comparison.obfuscated_trigger is not None:
                comparison.categories['obfuscated'] = [comparison.obfuscated_trigger]
            all_triggers = get_all_triggers(comparison.categories)
            sum_temp_n_call_sites = self.df_records.loc[self.df_records.trigger.isin(all_triggers)].groupby(
                'year').trigger.count()
            temps = []
            for cat_name, cat_triggers in comparison.categories.items():
                temps.append(get_df_temp(cat_name, cat_triggers, self.df_records, sum_temp_n_call_sites))
            df_temp = pd.concat(temps)
            if comparison.name == 'all':
                # Add an aggregate 'all' category row set for the root comparison.
                all_n_apks = self.df_records.loc[self.df_records.trigger.isin(all_triggers)].groupby(
                    'year').sha256.nunique()
                all_apks_df = all_n_apks.to_frame(name='n_apks')
                all_apks_df['norm_n_apks'] = all_apks_df.n_apks / all_apks_df.index.map(self.norm_factor)
                all_apks_df['norm_n_apks'] = all_apks_df['norm_n_apks'].map(math.ceil)
                all_apks_df['norm_n_apks_ratio'] = 100 * all_apks_df.norm_n_apks / 10000
                all_apks_df['category'] = 'all'
                all_apks_df.set_index('category', append=True, inplace=True)
                # BUGFIX: DataFrame.append was removed in pandas 2.x.
                df_temp = pd.concat([df_temp, all_apks_df])
            df_temp.to_html(os.path.join(comparison.path, 'temporal_evolution.html'))
            if not df_temp.empty:
                plot_temporal_evolution(df_temp, os.path.join(comparison.path, 'temporal_evolution.png'),
                                        comparison.name)
                plot_boxplot(df_temp, os.path.join(comparison.path, 'boxplot.png'), comparison.name)
                df_sum = get_df_summary(self.df_records, df_temp, self.dset_size, all_triggers)
                df_sum.to_html(os.path.join(comparison.path, 'summary.html'))
                appendix_csv = df_sum.drop(columns=['n_apks', 'n_apks_ratio', 'call_sites_ratio'])
                appendix_csv.n_call_sites = appendix_csv.n_call_sites.astype('int64')
                appendix_csv.to_csv(os.path.join(comparison.path, 'summary.csv'), sep='&')
            return df_temp

        comparisons = get_all_comps(self.api_definition, self.report_path, 'all', None)
        for cmp in comparisons:
            df_temp = evaluate_comparison(cmp)
            logging.info(f'Successfully analyzed crypto API category: {cmp.name}.')
def main():
    """CLI entry point: parse arguments, run the Evaluator, log timing."""
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description='Evaluation of the cryptographic experiments')
    # BUGFIX: the help strings of exp_path and report_path were swapped
    # relative to how the arguments are actually used below.
    parser.add_argument('exp_path', type=str, help='path to the folder with experiment folders')
    parser.add_argument('report_path', type=str, help='path to the folder where report should be placed')
    parser.add_argument('api_definition', type=str, help='path to the definition of analyzed API')
    parser.add_argument('-s', '--show', type=bool, const=True, nargs='?', help='show output into stdout', default=False)
    parser.add_argument('-f', '--file', type=bool, const=True, nargs='?', help='Save files into output folders', default=False)
    parser.add_argument('-e', '--euphony', type=bool, const=True, nargs='?', help='Evaluate also euphony? The dataset must have some euphony labels', default=False)
    args = parser.parse_args()
    start = datetime.now()
    logging.info('Starting evaluation.')
    # BUGFIX: the original positional call passed args.show into the
    # `new_triggers` slot and args.file into `show`, and dropped --euphony
    # entirely; keyword arguments route every flag to its parameter.
    evaluator = Evaluator(args.exp_path,
                          report_path=args.report_path,
                          api_definition=args.api_definition,
                          show=args.show,
                          save_files=args.file,
                          eval_euphony=args.euphony)
    evaluator.evaluate()
    end = datetime.now()
    logging.info(f'Finished experiment. Computation took {end - start}.')
# Run the CLI entry point only when executed as a script.
if __name__ == '__main__':
    main()