-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
PaloAltoNetworks_PrismaCloudCompute.py
2783 lines (2335 loc) · 109 KB
/
PaloAltoNetworks_PrismaCloudCompute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import json
import demistomock as demisto # noqa: F401
from CommonServerPython import * # noqa: F401
import urllib.parse
from collections import defaultdict
''' IMPORTS '''
import urllib3
import ipaddress
import dateparser
import tempfile
import urllib
# Disable insecure warnings
urllib3.disable_warnings()
''' CONSTANTS '''
# Prefix of every incident name built by fetch_incidents.
ALERT_TITLE = 'Prisma Cloud Compute Alert - '
# Values of an alert's 'kind' field that fetch_incidents treats specially when
# naming the incident and computing its severity.
ALERT_TYPE_VULNERABILITY = 'vulnerability'
ALERT_TYPE_COMPLIANCE = 'compliance'
ALERT_TYPE_AUDIT = 'audit'
# this is a list of known headers arranged in the order to be displayed in the markdown table
# (keyed by the name of the alert field that holds the list; see get_headers)
HEADERS_BY_NAME = {
'vulnerabilities': ['severity', 'cve', 'status', 'packages', 'sourcePackage', 'packageVersion', 'link'],
'entities': ['name', 'containerGroup', 'resourceGroup', 'nodesCount', 'image', 'status', 'runningTasksCount',
'activeServicesCount', 'version', 'createdAt', 'runtime', 'arn', 'lastModified', 'protected'],
'compliance': ['type', 'id', 'description']
}
# Largest 'limit' accepted by parse_limit_and_offset_values and the page size used by
# PrismaCloudComputeClient._get_all_results.
MAX_API_LIMIT = 50
INTEGRATION_NAME = 'PaloAltoNetworks_PrismaCloudCompute'
''' COMMANDS + REQUESTS FUNCTIONS '''
class PrismaCloudComputeClient(BaseClient):
    """
    Client for the Prisma Cloud Compute API.
    Extends BaseClient with an optional project scope (added as a query parameter to every request)
    and one helper method per endpoint used by the integration commands.
    """

    def __init__(self, base_url, verify, project, proxy=False, ok_codes=(), headers=None, auth=None):
        """
        Extends the init method of BaseClient by adding the arguments below,
        verify: A 'True' or 'False' string, in which case it controls whether we verify
        the server's TLS certificate, or a string that represents a path to a CA bundle to use.
        project: A projectID string, set in the integration parameters.
        the projectID is saved under self._project
        """
        self._project = project
        if verify in ['True', 'False']:
            super().__init__(base_url, str_to_bool(verify), proxy, ok_codes, headers, auth)
        else:
            # verify is a path to a certificate: initialise with verification on,
            # then replace the flag with the path itself
            super().__init__(base_url, True, proxy, ok_codes, headers, auth)
            self._verify = verify

    def _http_request(self, method, url_suffix, full_url=None, headers=None,
                      auth=None, json_data=None, params=None, data=None, files=None,
                      timeout=10, resp_type='json', ok_codes=None, **kwargs):
        """
        Extends the _http_request method of BaseClient.
        If self._project is available, a 'project=projectID' query param is automatically added to all requests
        (overriding any 'project' key already present in params).
        """
        if self._project:
            # build a new dict so the caller's params object is never mutated
            params = {**(params or {}), 'project': self._project}
        return super()._http_request(method=method, url_suffix=url_suffix, full_url=full_url, headers=headers,
                                     auth=auth, json_data=json_data, params=params, data=data, files=files,
                                     timeout=timeout, resp_type=resp_type, ok_codes=ok_codes, **kwargs)

    def test(self):
        """
        Calls the fetch alerts endpoint with to=epoch_time to check connectivity, authentication and authorization
        """
        return self.list_incidents(to_=time.strftime('%Y-%m-%d', time.gmtime(0)))

    def list_incidents(self, to_=None, from_=None):
        """
        Sends a request to fetch available alerts from last call
        No need to pass here TO/FROM query params, the API returns new alerts from the last request
        Can be used with TO/FROM query params to get alerts in a specific time period
        REMARK: alerts are deleted from the endpoint once were successfully fetched
        """
        params = {}
        if to_:
            params['to'] = to_
        if from_:
            params['from'] = from_
        # If the endpoint not found, fallback to the previous demisto-alerts endpoint (backward compatibility)
        try:
            return self._http_request(
                method='GET',
                url_suffix='xsoar-alerts',
                params=params
            )
        except Exception as e:
            if '[404]' in str(e):
                return self._http_request(
                    method='GET',
                    url_suffix='demisto-alerts',
                    params=params
                )
            # bare raise keeps the original traceback
            raise

    def get_host_profiles(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get all the host profiles.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: host profiles api response.
        """
        return self._http_request(method="GET", url_suffix="/profiles/host", params=params)

    def get_container_profiles(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get all the container profiles.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: container profiles api response.
        """
        return self._http_request(method="GET", url_suffix="/profiles/container", params=params)

    def get_containers_hosts(self, container_id: str) -> List[str]:
        """
        Sends a request to get the hosts that host a specific container.
        Args:
            container_id (str): the container ID.
        Returns:
            list[str]: hosts IDs that host the container.
        """
        return self._http_request(method="GET", url_suffix=f"profiles/container/{container_id}/hosts")

    def get_container_forensics(self, container_id: str, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get a specific container forensics.
        Args:
            container_id (str): the container ID.
            params (dict): query parameters.
        Returns:
            list[dict]: container forensics.
        """
        return self._http_request(method="GET", url_suffix=f"profiles/container/{container_id}/forensic", params=params)

    def get_host_forensics(self, host_id, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get a specific host forensics.
        Args:
            host_id (str): the host ID.
            params (dict): query parameters.
        Returns:
            list[dict]: host forensics.
        """
        return self._http_request(method="GET", url_suffix=f"/profiles/host/{host_id}/forensic", params=params)

    def get_console_version(self) -> str:
        """
        Sends a request to get the prisma cloud compute console version.
        Returns:
            str: console version.
        """
        return self._http_request(method="GET", url_suffix="/version")

    def get_custom_ip_feeds(self) -> dict:
        """
        Sends a request to get the custom IP feeds.
        Returns:
            dict: existing IP feeds.
        """
        return self._http_request(method="GET", url_suffix="/feeds/custom/ips")

    def add_custom_ip_feeds(self, feeds: List[str]):
        """
        Sends a request to add custom IP feeds.
        Args:
            feeds (list[str]): IP feeds to add.
        """
        self._http_request(method="PUT", url_suffix="/feeds/custom/ips", resp_type="text", json_data={"feed": feeds})

    def get_custom_md5_malware(self) -> dict:
        """
        Sends a request to get the list of all custom uploaded md5 malware records
        Returns:
            dict: custom md5 malware records
        """
        return self._http_request(method="GET", url_suffix="/feeds/custom/malware")

    def add_custom_md5_malware(self, feeds: List[str]) -> None:
        """
        Sends a request to add md5 malware hashes.
        Args:
            feeds: md5 malware feeds to add (sent as the "feed" field of the request body).
        """
        self._http_request(
            method="PUT", url_suffix="/feeds/custom/malware", json_data={"feed": feeds}, resp_type="text"
        )

    def get_cve_info(self, cve_id: str) -> List[dict]:
        """
        Sends a request to get information about a cve.
        Args:
            cve_id (str): the cve to get information about.
        Returns:
            list[dict]: cves information.
        """
        return self._http_request(method="GET", url_suffix="/cves", params={"id": cve_id})

    def get_defenders(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get defenders information.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: defenders information.
        """
        return self._http_request(method="GET", url_suffix="/defenders", params=params)

    def get_collections(self) -> List[dict]:
        """
        Sends a request to get the collections information.
        Returns:
            list[dict]: collections information.
        """
        return self._http_request(method="GET", url_suffix="/collections")

    def get_namespaces(self, params: Optional[dict] = None) -> List[str]:
        """
        Sends a request to get the namespaces.
        Args:
            params (dict): query parameters.
        Returns:
            list[str]: available namespaces
        """
        return self._http_request(method="GET", url_suffix="/radar/container/namespaces", params=params)

    def _get_all_results(self, url_suffix: str, params: Optional[dict] = None):
        """
        Gets all results by calling the API until the Total-Count of results is reached.
        Args:
            url_suffix (str): endpoint to page through.
            params (dict): query parameters; copied, so the caller's dict is left untouched.
        Returns:
            the accumulated results of all pages (whatever the first page returned if nothing else is needed).
        Raises:
            DemistoException: for any request failure other than an HTTP 429 rate-limit response.
        """
        # work on a copy so 'limit'/'offset' never leak into the caller's dict
        params = dict(params or {})
        params.update({"limit": MAX_API_LIMIT, "offset": 0})
        first_page = self._http_request(method="GET", url_suffix=url_suffix, params=params, resp_type='response')
        # a missing header yields -1, which skips the paging loop entirely
        total_count = int(first_page.headers.get("Total-Count", -1))
        results = first_page.json()
        current_count = len(results) if results else 0
        while current_count < total_count:
            params["offset"] = current_count
            try:
                page = self._http_request(method="GET", url_suffix=url_suffix, params=params)
            except DemistoException as de:
                status_code = getattr(getattr(de, "res", None), "status_code", None)
                if status_code != 429:
                    # anything but a rate-limit error is a real failure; retrying it would loop forever
                    raise
                # The API rate limit of 30 requests per minute was exceeded for this endpoint
                demisto.info(f"Rate limit exceeded, waiting 2 seconds before continuing.\n"
                             f"Current count: {current_count}, total count: {total_count}.")
                time.sleep(2)  # pylint: disable=E9003
                continue
            if not page:
                # the server reported more results than it returns; stop instead of spinning
                break
            results = results or []
            results.extend(page)
            current_count = len(results)
        return results

    def get_images_scan_info(self, all_results: bool = False, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get information about images scans.
        Args:
            all_results (bool): whether to return all results or just the first page.
            params (dict): query parameters.
        Returns:
            list[dict]: images scan information.
        """
        if all_results:
            return self._get_all_results(url_suffix="/images", params=params)
        return self._http_request(method="GET", url_suffix="/images", params=params)

    def get_hosts_scan_info(self, all_results: bool = False, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get information about hosts scans.
        Args:
            all_results (bool): whether to return all results or just the first page.
            params (dict): query parameters.
        Returns:
            list[dict]: hosts scan information.
        """
        if all_results:
            return self._get_all_results(url_suffix="/hosts", params=params)
        return self._http_request(method="GET", url_suffix="/hosts", params=params)

    def get_impacted_resources(self, cve: str, resource_type: str) -> dict:
        """
        Get the impacted resources that are based on a specific CVE.
        Args:
            cve (str): The CVE from which impacted resources will be retrieved.
            resource_type (str): ResourceType is the single resource type to return vulnerability data for.
        Returns:
            dict: the impacted resources from the CVE.
        """
        params = {"cve": cve}
        # When there is no specific resource then images and hosts will be returned if they exist
        if resource_type:
            params["resourceType"] = resource_type
        return self._http_request(
            method="GET", url_suffix="/stats/vulnerabilities/impacted-resources",
            params=params
        )

    def get_waas_policies(self) -> dict:
        """
        Get the current WAAS policy
        Returns:
            dict: the current policy.
        """
        return self._http_request(
            method="GET", url_suffix="policies/firewall/app/container"
        )

    def update_waas_policies(self, policy: dict) -> dict:
        """
        Update the waas policy.
        Args:
            policy (dict): the waas policy to send as the request body.
        Returns:
            the result of the PUT request, requested as the raw response (resp_type="response").
        """
        # ok_codes must be an iterable of status codes; "(200)" is just the int 200
        # NOTE(review): the error handler only builds a string and does not raise - confirm
        # that BaseClient still raises for non-OK responses after calling it.
        return self._http_request(
            method="PUT", url_suffix="policies/firewall/app/container", json_data=policy, resp_type="response",
            ok_codes=(200,),
            error_handler=lambda res: f"Error: {res.status_code} - {res.text}"
        )

    def get_firewall_audit_container_alerts(self, image_name: str, from_time: str, to_time: str, limit: int, audit_type: str):
        """
        Get the container audit alerts for a specific image.
        Args:
            image_name (str): The container image name.
            from_time (str): The start time of the query for alerts.
            to_time (str): The end time to query alerts.
            limit (num): the limit of alerts returned.
            audit_type (str): the alert audit type.
        Returns:
            dict: the container alerts.
        """
        params = {
            "type": audit_type,
            "imageName": image_name,
            "from": from_time,
            "to": to_time,
            "limit": limit
        }
        return self._http_request(
            method="GET", url_suffix="audits/firewall/app/container", params=params
        )

    def get_alert_profiles_request(self, project):
        """
        Get the alert profiles.
        Args:
            project (str): The project name
        Returns:
            dict: the alert profiles
        """
        params = assign_params(project=project)
        headers = self._headers
        return self._http_request('get', 'alert-profiles', headers=headers, params=params)

    def get_settings_defender_request(self, hostname):
        """
        Get the defender settings.
        Args:
            hostname (str): sent as the 'hostname' query parameter.
        Returns:
            dict: the defender settings
        """
        headers = self._headers
        params = assign_params(hostname=hostname)
        return self._http_request('get', 'settings/defender', headers=headers, params=params)

    def get_logs_defender_request(self, hostname, lines):
        """
        Get the defender logs.
        Args:
            hostname (str): sent as the 'hostname' query parameter.
            lines (int): sent as the 'lines' query parameter.
        Returns:
            list: the defender logs
        """
        params = assign_params(hostname=hostname, lines=lines)
        headers = self._headers
        return self._http_request('get', 'logs/defender', params=params, headers=headers)

    def get_backups_request(self, project):
        """
        Get the defender backups.
        Args:
            project (str): The project name
        Returns:
            list: the defender backups
        """
        params = assign_params(project=project)
        headers = self._headers
        return self._http_request('get', 'backups', headers=headers, params=params)

    def get_logs_defender_download_request(self, hostname, lines):
        """
        Download all logs for a certain defender
        Args:
            hostname (str): The hostname to get the logs for
            lines (int): The number of logs to return
        Returns:
            the logs to download, requested as raw content (resp_type="content").
        """
        params = assign_params(hostname=hostname, lines=lines)
        headers = self._headers
        return self._http_request('get', 'logs/defender/download', params=params, headers=headers, resp_type="content")

    def get_file_integrity_events(self, limit, sort, hostname=None, event_id=None, from_date=None,
                                  to_date=None, search_term=None):
        """
        Get runtime file integrity audit events
        Args:
            limit: sent as the 'limit' query parameter.
            sort (str): "desc" requests reversed order; results are always sorted by "time".
            hostname (str): The hostname for which to get runtime file integrity events
            event_id: sent as the 'id' query parameter.
            from_date: sent as the 'from' query parameter.
            to_date: sent as the 'to' query parameter.
            search_term: sent as the 'search' query parameter.
        Returns:
            HTTP response
        """
        endpoint = "audits/runtime/file-integrity"
        headers = self._headers
        params = {
            "hostname": hostname,
            "id": event_id,
            "limit": limit,
            "from": from_date,
            "to": to_date,
            "search": search_term,
            "sort": "time",
            "reverse": sort == "desc"
        }
        return self._http_request('get', endpoint, params=params, headers=headers)

    def get_ci_scan_results(self, all_results: bool = False, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get CI scan results information.
        Args:
            all_results (bool): whether to return all results or just the first page.
            params (dict): query parameters.
        Returns:
            list[dict]: CI scan results information.
        """
        if all_results:
            return self._get_all_results(url_suffix="/scans", params=params)
        return self._http_request(method="GET", url_suffix="scans", params=params)

    def get_trusted_images(self) -> dict:
        """
        Sends a request to get trusted images information.
        Returns:
            dict: trusted images information.
        """
        return self._http_request(method="GET", url_suffix="trust/data")

    def update_trusted_images(self, data: dict):
        """
        Sends a request to update trusted images information.
        Args:
            data (dict): the trusted images data to send as the request body.
        Returns:
            the result of the PUT request, requested as the raw response (resp_type="response").
        """
        return self._http_request(method="PUT", url_suffix="trust/data", json_data=data, resp_type="response", ok_codes=(200,))

    def get_container_scan_results(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get container scan results information.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: container scan results information.
        """
        return self._http_request(method="GET", url_suffix="containers", params=params)

    def get_hosts_info(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get hosts information.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: host information.
        """
        return self._http_request(method="GET", url_suffix="hosts/info", params=params)

    def get_runtime_container_audit_events(self, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get runtime container audit events.
        Args:
            params (dict): query parameters.
        Returns:
            list[dict]: runtime container audit events information.
        """
        return self._http_request(method="GET", url_suffix="audits/runtime/container", params=params)

    def archive_audit_incident(self, incident_id: str, data: Optional[str] = None) -> dict:
        """
        Sends a request to archive audit incident.
        Args:
            incident_id (str): the incident to acknowledge; appended to the endpoint path.
            data (str): request body to send with the PATCH request.
        Returns:
            the result of the PATCH request, requested as the raw response (resp_type="response").
        """
        suffix = f'audits/incidents/acknowledge/{incident_id}'
        return self._http_request(method="patch", url_suffix=suffix, data=data, resp_type="response")

    def get_runtime_host_audit_events(self, all_results: bool = False, params: Optional[dict] = None) -> List[dict]:
        """
        Sends a request to get runtime host audit events.
        Args:
            all_results (bool): whether to return all results or just the first page.
            params (dict): query parameters.
        Returns:
            list[dict]: runtime host audit events information.
        """
        if all_results:
            return self._get_all_results(url_suffix="audits/runtime/host", params=params)
        return self._http_request(method="GET", url_suffix="audits/runtime/host", params=params)

    def get_runtime_container_policy(self) -> dict:
        """
        Sends a request to get the runtime container policy.
        Returns:
            dict: the runtime container policy.
        """
        return self._http_request(method="GET", url_suffix="policies/runtime/container")
def format_context(context):
    """
    Recursively normalise the keys of a context structure.
    Every dict key is passed through pascalToSpace and then stripped of spaces; lists are
    processed item by item. Empty values and anything that is not a dict or list are returned unchanged.
    """
    if not context:
        return context
    if isinstance(context, dict):
        return {
            pascalToSpace(raw_key).replace(" ", ""): format_context(inner)
            for raw_key, inner in context.items()
        }
    if isinstance(context, list):
        return [format_context(element) for element in context]
    return context
def str_to_bool(s):
    """
    Translates string representing boolean value into boolean value
    Args:
        s: the exact string 'True' or 'False'.
    Returns:
        bool: True for 'True', False for 'False'.
    Raises:
        ValueError: for any other value (the message names the offending value).
    """
    if s == 'True':
        return True
    if s == 'False':
        return False
    raise ValueError(f"expected 'True' or 'False', got {s!r}")
def translate_severity(sev):
    """
    Translates Prisma Cloud Compute alert severity into Demisto's severity score
    Args:
        sev: severity name; matching is case-insensitive.
    Returns:
        int: 4 for critical, 3 for high/important, 2 for medium, 1 for low and 0 for anything else,
        including a missing (None) or non-string severity.
    """
    # an alert may carry no severity at all; treat that as "unknown" instead of crashing
    if not isinstance(sev, str):
        return 0
    scores = {
        'Critical': 4,
        'High': 3,
        'Important': 3,
        'Medium': 2,
        'Low': 1,
    }
    return scores.get(sev.capitalize(), 0)
def camel_case_transformer(s):
    """
    Turn a camel case string into space separated, title-cased words.
    E.g. input: 'camelCase' output: 'Camel Case'
    REMARK: the exact values 'id', 'cve' and 'arn' are returned fully uppercased instead, e.g. "cve" => "CVE"
    """
    upper_cased_exceptions = ('id', 'cve', 'arn')
    # insert a space at every lowercase -> uppercase boundary
    spaced = re.sub(r'(?<=[a-z])(?=[A-Z])', ' ', str(s))
    return spaced.upper() if spaced in upper_cased_exceptions else spaced.title()
def get_headers(name: str, data: list) -> list:
    """
    Build the ordered list of table headers for a list of objects.
    Starts from the headers registered for the list name in HEADERS_BY_NAME (none if the name is unknown),
    then appends every key found in the given dicts that is not already listed, in order of first appearance.
    Args:
        name: name of the list (e.g. vulnerabilities)
        data: list of dicts
    Returns: list of headers
    """
    # copy, so the shared HEADERS_BY_NAME entry is never modified
    headers = list(HEADERS_BY_NAME.get(name) or [])
    if not isinstance(data, list):
        return headers
    for entry in data:
        if not isinstance(entry, dict):
            continue
        headers += [field for field in entry if field not in headers]
    return headers
def test_module(client):
    """
    Verify connectivity, authentication and user authorization by running the client's own test call.
    Args:
        client: Requests client
    Returns:
        'ok' when the client's test call completes; any error it raises propagates to the caller
    """
    client.test()
    result = 'ok'
    return result
@logger
def is_command_is_fetch():
    """
    Decides whether the executed command is fetch_incidents or classifier
    - If Last Run is set, or the "unstuck" flag in the integration context is truthy, it is a fetch_incidents command.
    - Otherwise the answer depends on the fetched_incidents_list entry of the integration context:
      it is a fetch_incidents command only when that list is empty or missing.
    :return: True if this is a fetch_incidents command, otherwise return false.
    :rtype: ``bool``
    """
    integration_context = demisto.getIntegrationContext()
    if demisto.getLastRun():
        return True
    if integration_context.get("unstuck", False):
        return True
    previously_fetched = integration_context.get('fetched_incidents_list', [])
    return not previously_fetched
def fetch_incidents(client):
    """
    Fetches new alerts from Prisma Cloud Compute and returns them as a list of Demisto incidents
    - A markdown table will be added for alerts with a list object,
      If the alert has a list under field "tableField", another field will be added to the
      incident "tableFieldMarkdownTable" representing the markdown table
    - When is_command_is_fetch() is false, no request is made and the incidents stored in the
      integration context under 'fetched_incidents_list' are returned instead.
    Args:
        client: Prisma Compute client
    Returns:
        list of incidents
    """
    if not is_command_is_fetch():
        ctx = demisto.getIntegrationContext().get('fetched_incidents_list', [])
        demisto.debug(f"Integration Context (is_command_is_fetch = false) = {ctx}")
        return ctx

    demisto.debug("is_command_is_fetch = true, calling list_incidents")
    alerts = client.list_incidents()
    incidents = []
    for a in alerts or []:
        alert_type = a.get('kind')
        name = ALERT_TITLE
        severity = 0
        # fix the audit category from camel case to display properly
        if alert_type == ALERT_TYPE_AUDIT:
            a['category'] = camel_case_transformer(a.get('category'))
        # always save the raw JSON data under this argument (used in scripts)
        a['rawJSONAlert'] = json.dumps(a)
        # parse any list into a markdown table, since tableToMarkdown takes the headers from the first object in
        # the list check headers manually since some entries might have omit empty fields
        tables = {}
        for key, value in a.items():
            # check only if we got a non empty list of dict
            if isinstance(value, list) and value and isinstance(value[0], dict):
                tables[key + 'MarkdownTable'] = tableToMarkdown(camel_case_transformer(key + ' table'),
                                                                value,
                                                                headers=get_headers(key, value),
                                                                headerTransform=camel_case_transformer,
                                                                removeNull=True)
        # merged only after the loop so the alert is not modified while it is being iterated
        a.update(tables)
        if alert_type == ALERT_TYPE_VULNERABILITY:
            # E.g. "Prisma Cloud Compute Alert - imageName Vulnerabilities"
            image_name = a.get('imageName')
            # an alert without an image name must not abort the whole fetch
            name += f'{image_name} Vulnerabilities' if image_name else 'Vulnerabilities'
            # Set the severity to the highest vulnerability, take the first from the list
            vulnerabilities = a.get('vulnerabilities')
            if isinstance(vulnerabilities, list) and vulnerabilities and isinstance(vulnerabilities[0], dict):
                top_severity = vulnerabilities[0].get('severity')
                if isinstance(top_severity, str):
                    severity = translate_severity(top_severity)
        elif alert_type in (ALERT_TYPE_COMPLIANCE, ALERT_TYPE_AUDIT):
            # E.g. "Prisma Cloud Compute Alert - Incident"
            name += camel_case_transformer(a.get('type'))
            # E.g. "Prisma Cloud Compute Alert - Image Compliance" \ "Prisma Compute Alert - Host Runtime Audit"
            if a.get('type') != "incident":
                name += ' ' + camel_case_transformer(alert_type)
        else:
            # E.g. "Prisma Cloud Compute Alert - Cloud Discovery"
            name += camel_case_transformer(alert_type)
        incidents.append({
            'name': name,
            'occurred': a.get('time'),
            'severity': severity,
            'rawJSON': json.dumps(a)
        })

    demisto.debug("Setting last run to 'id': 'a'")
    demisto.setLastRun({"id": "a"})
    ctx = demisto.getIntegrationContext()
    demisto.debug(f"Integration Context before update = {ctx}")
    # keep the previously stored incidents when this run fetched nothing
    incidents_to_update = incidents or ctx.get('fetched_incidents_list')
    ctx.update({'fetched_incidents_list': incidents_to_update})
    ctx["unstuck"] = False
    demisto.setIntegrationContext(ctx)
    demisto.debug(f"Integration Context after update = {ctx}")
    return incidents
def parse_limit_and_offset_values(limit: str, offset: str = "0") -> tuple[int, int]:
    """
    Parse the offset and limit parameters to integers and verify that the offset/limit are valid.
    Args:
        limit (str): limit argument.
        offset (str): offset argument.
    Returns:
        Tuple[int, int]: parsed limit and parsed offset (in that order)
    Raises:
        AssertionError: if either value is missing, the offset is negative, or the limit is outside
        1..MAX_API_LIMIT.
    """
    parsed_limit = arg_to_number(arg=limit, arg_name="limit")
    parsed_offset = arg_to_number(arg=offset, arg_name="offset")
    # Raised explicitly (instead of `assert`) so the validation still runs under `python -O`;
    # the exception type is kept so existing callers see the same error.
    if parsed_offset is None:
        raise AssertionError("offset is missing")
    if parsed_offset < 0:
        raise AssertionError(f"offset {parsed_offset} is invalid, scope >= 0")
    if parsed_limit is None:
        raise AssertionError("limit is missing")
    if not 0 < parsed_limit <= MAX_API_LIMIT:
        raise AssertionError(f"limit {parsed_limit} is invalid, scope = 1-{MAX_API_LIMIT}")
    return parsed_limit, parsed_offset
def parse_date_string_format(date_string: str, new_format: str = "%B %d, %Y %H:%M:%S %p") -> str:
    """
    Parses a date string format to a different date string format.
    Args:
        date_string (str): the date in string representation.
        new_format (str): the new requested format for the date string.
    Returns:
        str: date as a new format, in case of a failure returns the original date string
        (an empty or missing value is returned as-is without being parsed).
    """
    if not date_string:
        # nothing to parse; also keeps None from reaching dateparser
        return date_string
    try:
        parsed_date = dateparser.parse(date_string=date_string)  # type: ignore
        return parsed_date.strftime(new_format)  # type: ignore
    except (AttributeError, TypeError):
        # AttributeError: the value could not be parsed (parse returned None)
        # TypeError: the value is not a string at all
        return date_string
def epochs_to_timestamp(epochs: int, date_format: str = "%B %d, %Y %H:%M:%S %p") -> str:
    """
    Converts epochs time representation to a new string date format.
    The timestamp is interpreted as UTC.
    Args:
        epochs (int): time in epochs (seconds)
        date_format (str): the desired format that the timestamp will be.
    Returns:
        str: timestamp in the new format, empty string in case of a failure
        (non-numeric input or a value outside the range the platform can represent)
    """
    # local import: datetime.utcfromtimestamp is deprecated, and the timezone-aware
    # replacement needs `timezone`, which this module does not import explicitly
    from datetime import datetime, timezone

    try:
        return datetime.fromtimestamp(epochs, tz=timezone.utc).strftime(date_format)
    except (TypeError, ValueError, OverflowError, OSError):
        return ""
def filter_api_response(api_response: Optional[list], limit: int, offset: int = 0) -> Optional[list]:
    """
    Return the window of the api response selected by offset/limit.
    Args:
        api_response (list): api response from an endpoint.
        offset (int): the offset from which to begin listing the response.
        limit (int): the maximum limit of records in the response to fetch.
    Returns:
        list: the records from offset up to offset + limit; an empty or missing response is returned unchanged
    """
    if api_response:
        # slicing clamps both bounds to the list length on its own
        return api_response[offset:offset + limit]
    return api_response
def get_hostname_description_info(host_info: dict) -> dict:
    """
    Build the description row of a single host.
    Args:
        host_info (dict): host's information from the api.
    Returns:
        dict: "Hostname" (the host's "_id"), "Distribution" and "Collections". The distribution is
        composed from the two labels only when the host has exactly two of them; otherwise it is empty.
    """
    distribution = ""
    labels = host_info.get("labels")
    if labels and len(labels) == 2:
        os_distro = labels[0].replace("osDistro:", "")
        os_version = labels[1].replace("osVersion:", "")
        distribution = f"{os_distro} {os_version}"
    description = {"Hostname": host_info.get("_id")}
    description["Distribution"] = distribution
    description["Collections"] = host_info.get("collections")
    return description
def get_profile_host_list(client: PrismaCloudComputeClient, args: dict) -> CommandResults:
    """
    Get information about the hosts and their profile events.
    Implement the command 'prisma-cloud-compute-profile-host-list'
    A single matching host is rendered as three tables (description, apps, ssh events);
    several hosts are rendered as one description table.
    Args:
        client (PrismaCloudComputeClient): prisma-cloud-compute client.
        args (dict): prisma-cloud-compute-profile-host-list command arguments.
    Returns:
        CommandResults: command-results object.
    """
    # the API expects the query parameter spelled "hostName"
    if "hostname" in args:
        args["hostName"] = args.pop("hostname")
    args["limit"], args["offset"] = parse_limit_and_offset_values(
        limit=args.get("limit", "15"), offset=args.get("offset", "0")
    )
    if hosts_profile_info := client.get_host_profiles(params=assign_params(**args)):
        for host_profile in hosts_profile_info:
            for event in host_profile.get("sshEvents", []):
                if event.get("ip") is not None:
                    # transforms ip as integer representation to ip as string representation
                    event["ip"] = str(ipaddress.IPv4Address(event.get("ip")))
                if event.get("time"):
                    # reformat the event's own timestamp (not the host profile's)
                    event["time"] = parse_date_string_format(date_string=event.get("time"))
                # loginTime is in unix format
                if "loginTime" in event:
                    event["loginTime"] = epochs_to_timestamp(epochs=event.get("loginTime"))
        if len(hosts_profile_info) == 1:  # means we have only one host
            host_info = hosts_profile_info[0]
            host_description_table = tableToMarkdown(
                name="Host Description",
                t=get_hostname_description_info(host_info=host_info),
                headers=["Hostname", "Distribution", "Collections"],
                removeNull=True
            )
            app_rows = []
            for app in host_info.get("apps", []):
                # an app without a startup process still gets a row with its name
                startup_process = app.get("startupProcess") or {}
                launch_time = startup_process.get("time")
                app_rows.append({
                    "AppName": app.get("name"),
                    "StartupProcess": startup_process.get("path"),
                    "User": startup_process.get("user"),
                    "LaunchTime": parse_date_string_format(date_string=launch_time) if launch_time else None
                })
            apps_table = tableToMarkdown(
                name="Apps",
                t=app_rows,
                headers=["AppName", "StartupProcess", "User", "LaunchTime"],
                removeNull=True
            )
            ssh_events_table = tableToMarkdown(
                name="SSH Events",
                t=[
                    {
                        "User": event.get("user"),
                        "Ip": event.get("ip"),
                        "ProcessPath": event.get("path"),
                        "Command": event.get("command"),
                        "Time": event.get("time")
                    } for event in host_info.get("sshEvents", [])
                ],
                headers=["User", "Ip", "ProcessPath", "Command", "Time"],
                removeNull=True
            )
            table = host_description_table + apps_table + ssh_events_table
        else:
            table = tableToMarkdown(
                name="Host Description",
                t=[get_hostname_description_info(host_info=host_info) for host_info in hosts_profile_info],
                headers=["Hostname", "Distribution", "Collections"],
                removeNull=True
            )
    else:
        table = "No results found."
    return CommandResults(
        outputs_prefix="PrismaCloudCompute.ProfileHost",
        outputs_key_field="_id",
        outputs=hosts_profile_info,
        readable_output=table,
        raw_response=hosts_profile_info
    )
def get_container_description_info(container_info: dict) -> dict:
    """
    Build the description row of a single container.
    Args:
        container_info (dict): container information from the api.
    Returns:
        dict: container description information; "Created" is the container's "created" value
        passed through parse_date_string_format (an empty string is used when it is missing).
    """
    created_at = parse_date_string_format(date_string=container_info.get("created", ""))
    description = {
        "ContainerID": container_info.get("_id"),
        "Image": container_info.get("image"),
        "Os": container_info.get("os"),
        "State": container_info.get("state"),
    }
    description["Created"] = created_at
    description["EntryPoint"] = container_info.get("entrypoint")
    return description
def get_container_profile_list(client: PrismaCloudComputeClient, args: dict) -> CommandResults:
"""
Get information about the containers and their profile events.
Implement the command 'prisma-cloud-compute-profile-container-list'
Args:
client (PrismaCloudComputeClient): prisma-cloud-compute client.
args (dict): prisma-cloud-compute-profile-container-list command arguments.
Returns:
CommandResults: command-results object.
"""
if "image_id" in args:
args["imageID"] = args.pop("image_id")
args["limit"], args["offset"] = parse_limit_and_offset_values(
limit=args.get("limit", "15"), offset=args.get("offset", "0")
)
if containers_info := client.get_container_profiles(params=assign_params(**args)):
container_description_headers = ["ContainerID", "Image", "Os", "State", "Created", "EntryPoint"]
if len(containers_info) == 1: # means we have only one container
container_info = containers_info[0]
container_description_table = tableToMarkdown(
name="Container Description",
t=get_container_description_info(container_info=container_info),
headers=container_description_headers,
removeNull=True
)
processes_table = tableToMarkdown(
name="Processes",
t=[
{
"Type": process_type,
"Md5": process.get("md5"),
"Path": process.get("path"),