-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
ServiceNowv2.py
3080 lines (2535 loc) · 121 KB
/
ServiceNowv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import re
import demistomock as demisto # noqa: F401
from CommonServerPython import * # noqa: F401
import shutil
from typing import Callable, Dict, Iterable, List, Tuple
import mimetypes
# disable insecure warnings
import urllib3
urllib3.disable_warnings()
INCIDENT = 'incident'
SIR_INCIDENT = 'sn_si_incident'
COMMAND_NOT_IMPLEMENTED_MSG = 'Command not implemented'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_FORMAT_OPTIONS = {
'MM-dd-yyyy': '%m-%d-%Y %H:%M:%S',
'dd/MM/yyyy': '%d/%m/%Y %H:%M:%S',
'dd-MM-yyyy': '%d-%m-%Y %H:%M:%S',
'dd.MM.yyyy': '%d.%m.%Y %H:%M:%S',
'yyyy-MM-dd': '%Y-%m-%d %H:%M:%S'
}
TICKET_STATES = {
'incident': {
'1': '1 - New',
'2': '2 - In Progress',
'3': '3 - On Hold',
'4': '4 - Awaiting Caller',
'5': '5 - Awaiting Evidence',
'6': '6 - Resolved',
'7': '7 - Closed',
'8': '8 - Canceled'
},
'problem': {
'1': '1 - Open',
'2': '2 - Known Error',
'3': '3 - Pending Change',
'4': '4 - Closed/Resolved'
},
'change_request': {
'-5': '-5 - New',
'-4': '-4 - Assess',
'-3': '-3 - Authorize',
'-2': '-2 - Scheduled',
'-1': '-1 - Implement',
'0': '0 - Review',
'3': '3 - Closed',
'4': '4 - Canceled'
},
'sc_task': {
'-5': '-5 - Pending',
'1': '1 - Open',
'2': '2 - Work In Progress',
'3': '3 - Closed Complete',
'4': '4 - Closed Incomplete',
'7': '7 - Closed Skipped'
},
'sc_request': {
'1': '1 - Approved',
'3': '3 - Closed',
'4': '4 - Rejected'
},
SIR_INCIDENT: {
'3': 'Closed',
'7': 'Cancelled',
'10': 'Draft',
'16': 'Analysis',
'18': 'Contain',
'19': 'Eradicate'
}
}
TICKET_TYPE_TO_CLOSED_STATE = {INCIDENT: '7',
'problem': '4',
'change_request': '3',
'sc_task': '3',
'sc_request': '3',
SIR_INCIDENT: '3'}
TICKET_APPROVAL = {
'sc_req_item': {
'waiting_for_approval': 'Waiting for approval',
'approved': 'Approved',
'requested': 'Requested',
'rejected': 'Rejected',
'not requested': 'Not Yet Requested'
}
}
TICKET_PRIORITY = {
'1': '1 - Critical',
'2': '2 - High',
'3': '3 - Moderate',
'4': '4 - Low',
'5': '5 - Planning'
}
TICKET_IMPACT = {
'1': '1 - Enterprise',
'2': '2 - Region / Market',
'3': '3 - Ministry',
'4': '4 - Department / Function',
'5': '5 - Caregiver'
}
BUSINESS_IMPACT = {
'1': '1 - Critical',
'2': '2 - High',
'3': '3 - Non-Critical'
}
SNOW_ARGS = ['active', 'activity_due', 'opened_at', 'short_description', 'additional_assignee_list', 'approval_history',
'approval', 'approval_set', 'assigned_to', 'assignment_group',
'business_duration', 'business_service', 'business_stc', 'change_type', 'category', 'caller',
'calendar_duration', 'calendar_stc', 'caller_id', 'caused_by', 'close_code', 'close_notes',
'closed_at', 'closed_by', 'cmdb_ci', 'comments', 'comments_and_work_notes', 'company', 'contact_type',
'correlation_display', 'correlation_id', 'delivery_plan', 'delivery_task', 'description', 'due_date',
'expected_start', 'follow_up', 'group_list', 'hold_reason', 'impact', 'incident_state',
'knowledge', 'location', 'made_sla', 'notify', 'order', 'parent', 'parent_incident', 'priority',
'problem_id', 'reassignment_count', 'reopen_count', 'resolved_at', 'resolved_by', 'rfc',
'severity', 'sla_due', 'state', 'subcategory', 'sys_tags', 'sys_updated_by', 'sys_updated_on',
'time_worked', 'title', 'type', 'urgency', 'user_input', 'watch_list', 'work_end', 'work_notes',
'work_notes_list', 'work_start', 'business_criticality', 'risk_score']
SIR_OUT_FIELDS = ['attack_vector', 'affected_user', 'change_request', 'incident', 'parent_security_incident',
'substate']
# Every table in ServiceNow should have those fields
DEFAULT_RECORD_FIELDS = {
'sys_id': 'ID',
'sys_updated_by': 'UpdatedBy',
'sys_updated_on': 'UpdatedAt',
'sys_created_by': 'CreatedBy',
'sys_created_on': 'CreatedAt'
}
MIRROR_DIRECTION = {
'None': None,
'Incoming': 'In',
'Outgoing': 'Out',
'Incoming And Outgoing': 'Both'
}
def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> int:
"""
Converts an XSOAR argument to a timestamp (seconds from epoch).
This function is used to quickly validate an argument provided to XSOAR
via ``demisto.args()`` into an ``int`` containing a timestamp (seconds
since epoch). It will throw a ValueError if the input is invalid.
If the input is None, it will throw a ValueError if required is ``True``,
or ``None`` if required is ``False``.
Args:
arg: argument to convert
arg_name: argument name.
required: throws exception if ``True`` and argument provided is None
Returns:
returns an ``int`` containing a timestamp (seconds from epoch) if conversion works
returns ``None`` if arg is ``None`` and required is set to ``False``
otherwise throws an Exception
"""
if arg is None:
if required is True:
raise ValueError(f'Missing "{arg_name}"')
if isinstance(arg, str) and arg.isdigit():
# timestamp is a str containing digits - we just convert it to int
return int(arg)
if isinstance(arg, str):
# we use dateparser to handle strings either in ISO8601 format, or
# relative time stamps.
# For example: format 2019-10-23T00:00:00 or "3 days", etc
date = dateparser.parse(arg, settings={'TIMEZONE': 'UTC'})
if date is None:
# if d is None it means dateparser failed to parse it
raise ValueError(f'Invalid date: {arg_name}')
return int(date.timestamp())
if isinstance(arg, (int, float)):
# Convert to int if the input is a float
return int(arg)
raise ValueError(f'Invalid date: "{arg_name}"')
def get_server_url(server_url: str) -> str:
url = server_url
url = re.sub('/[/]+$/', '', url)
url = re.sub('/$', '', url)
return url
def get_item_human_readable(data: dict) -> dict:
"""Get item human readable.
Args:
data: item data.
Returns:
item human readable.
"""
item = {
'ID': data.get('sys_id', ''),
'Name': data.get('name', ''),
'Description': data.get('short_description', ''),
'Price': data.get('price', ''),
'Variables': []
}
variables = data.get('variables')
if variables and isinstance(variables, list):
for var in variables:
if var:
pretty_variables = {
'Question': var.get('label', ''),
'Type': var.get('display_type', ''),
'Name': var.get('name', ''),
'Mandatory': var.get('mandatory', '')
}
item['Variables'].append(pretty_variables)
return item
def create_ticket_context(data: dict, additional_fields: list | None = None) -> Any:
"""Create ticket context.
Args:
data: ticket data.
additional_fields: additional fields to extract from the ticket
Returns:
ticket context.
"""
context = {
'ID': data.get('sys_id'),
'Summary': data.get('short_description'),
'Number': data.get('number'),
'CreatedOn': data.get('sys_created_on'),
'Active': data.get('active'),
'AdditionalComments': data.get('comments'),
'CloseCode': data.get('close_code'),
'OpenedAt': data.get('opened_at')
}
if additional_fields:
for additional_field in additional_fields:
if camelize_string(additional_field) not in context.keys():
# in case of a nested additional field (in the form of field1.field2)
nested_additional_field_list = additional_field.split('.')
if value := dict_safe_get(data, nested_additional_field_list):
context[additional_field] = value
# These fields refer to records in the database, the value is their system ID.
closed_by = data.get('closed_by')
if closed_by:
if isinstance(closed_by, dict):
context['ResolvedBy'] = closed_by.get('value', '')
else:
context['ResolvedBy'] = closed_by
opened_by = data.get('opened_by')
if opened_by:
if isinstance(opened_by, dict):
context['OpenedBy'] = opened_by.get('value', '')
context['Creator'] = opened_by.get('value', '')
else:
context['OpenedBy'] = opened_by
context['Creator'] = opened_by
assigned_to = data.get('assigned_to')
if assigned_to:
if isinstance(assigned_to, dict):
context['Assignee'] = assigned_to.get('value', '')
else:
context['Assignee'] = assigned_to
# Try to map fields
priority = data.get('priority')
if priority:
if isinstance(priority, dict):
context['Priority'] = TICKET_PRIORITY.get(str(int(priority.get('value', ''))),
str(int(priority.get('value', '')))),
else:
context['Priority'] = TICKET_PRIORITY.get(priority, priority)
state = data.get('state')
if state:
context['State'] = state
return createContext(context, removeNull=True)
def get_ticket_context(data: Any, additional_fields: list | None = None) -> Any:
"""Manager of ticket context creation.
Args:
data: ticket data. in the form of a dict or a list of dict.
additional_fields: additional fields to extract from the ticket
Returns:
ticket context. in the form of a dict or a list of dict.
"""
if not isinstance(data, list):
return create_ticket_context(data, additional_fields)
tickets = []
for d in data:
tickets.append(create_ticket_context(d, additional_fields))
return tickets
def get_ticket_human_readable(tickets, ticket_type: str, additional_fields: list | None = None) -> list:
"""Get ticket human readable.
Args:
tickets: tickets data. in the form of a dict or a list of dict.
ticket_type: ticket type.
additional_fields: additional fields to extract from the ticket
Returns:
ticket human readable.
"""
if not isinstance(tickets, list):
tickets = [tickets]
ticket_severity = {
'1': '1 - High',
'2': '2 - Medium',
'3': '3 - Low'
}
result = []
for ticket in tickets:
hr = {
'Number': ticket.get('number'),
'System ID': ticket.get('sys_id'),
'Created On': ticket.get('sys_created_on'),
'Created By': ticket.get('sys_created_by'),
'Active': ticket.get('active'),
'Close Notes': ticket.get('close_notes'),
'Close Code': ticket.get('close_code'),
'Description': ticket.get('description'),
'Opened At': ticket.get('opened_at'),
'Due Date': ticket.get('due_date'),
# This field refers to a record in the database, the value is its system ID.
'Resolved By': ticket.get('closed_by', {}).get('value') if isinstance(ticket.get('closed_by'), dict)
else ticket.get('closed_by'),
'Resolved At': ticket.get('resolved_at'),
'SLA Due': ticket.get('sla_due'),
'Short Description': ticket.get('short_description'),
'Additional Comments': ticket.get('comments')
}
# Try to map the fields
impact = ticket.get('impact', '')
if impact:
hr['Impact'] = ticket_severity.get(impact, impact)
urgency = ticket.get('urgency', '')
if urgency:
hr['Urgency'] = ticket_severity.get(urgency, urgency)
severity = ticket.get('severity', '')
if severity:
hr['Severity'] = ticket_severity.get(severity, severity)
priority = ticket.get('priority', '')
if priority:
hr['Priority'] = TICKET_PRIORITY.get(priority, priority)
state = ticket.get('state', '')
if state:
mapped_state = state
if ticket_type in TICKET_STATES:
mapped_state = TICKET_STATES[ticket_type].get(state, mapped_state)
hr['State'] = mapped_state
approval = ticket.get('approval', '')
if approval:
mapped_approval = approval
if ticket_type in TICKET_APPROVAL:
mapped_approval = TICKET_APPROVAL[ticket_type].get(ticket.get('approval'), mapped_approval)
# Approval will be added to the markdown only in the necessary ticket types
hr['Approval'] = mapped_approval
if additional_fields:
for additional_field in additional_fields:
# in case of a nested additional field (in the form of field1.field2)
nested_additional_field_list = additional_field.split('.')
hr[additional_field] = dict_safe_get(ticket, nested_additional_field_list)
result.append(hr)
return result
def get_ticket_fields(args: dict, template_name: dict = {}, ticket_type: str = '') -> dict:
"""Inverse the keys and values of those dictionaries
to map the arguments to their corresponding values in ServiceNow.
Args:
args: Demisto args
template_name: ticket template name
ticket_type: ticket type
Returns:
ticket fields.
"""
ticket_severity = {
'1': '1 - High',
'2': '2 - Medium',
'3': '3 - Low'
}
inv_severity = {v: k for k, v in ticket_severity.items()}
inv_priority = {v: k for k, v in TICKET_PRIORITY.items()}
inv_business_impact = {v: k for k, v in BUSINESS_IMPACT.items()}
states = TICKET_STATES.get(ticket_type)
inv_states = {v: k for k, v in states.items()} if states else {}
approval = TICKET_APPROVAL.get(ticket_type)
inv_approval = {v: k for k, v in approval.items()} if approval else {}
fields_to_clear = argToList(
args.get('clear_fields', [])) # This argument will contain fields to allow their value empty
# This is for updating null fields for update_remote_system function for example: assigned_to.
for arg in args.keys():
if not args[arg]:
fields_to_clear.append(arg)
demisto.debug(f'Fields to clear {fields_to_clear}')
ticket_fields = {}
for arg in SNOW_ARGS:
input_arg = args.get(arg)
if arg in fields_to_clear:
if input_arg:
raise DemistoException(f"Could not set a value for the argument '{arg}' and add it to the clear_fields. \
You can either set or clear the field value.")
ticket_fields[arg] = ""
elif input_arg:
if arg in ['impact', 'urgency', 'severity']:
ticket_fields[arg] = inv_severity.get(input_arg, input_arg)
elif arg == 'priority':
ticket_fields[arg] = inv_priority.get(input_arg, input_arg)
elif arg == 'state':
ticket_fields[arg] = inv_states.get(input_arg, input_arg)
elif arg == 'approval':
ticket_fields[arg] = inv_approval.get(input_arg, input_arg)
elif arg == 'change_type':
# this change is required in order to use type 'Standard' as well.
ticket_fields['type'] = input_arg
elif arg == 'business_criticality':
ticket_fields[arg] = inv_business_impact.get(input_arg, input_arg)
else:
ticket_fields[arg] = input_arg
elif template_name and arg in template_name:
ticket_fields[arg] = template_name[arg]
return ticket_fields
def generate_body(fields: dict = {}, custom_fields: dict = {}) -> dict:
"""Generates a body from fields and custom fields.
Args:
fields: fields data.
custom_fields: custom fields data.
Returns:
body object for SNOW requests.
"""
body = {}
if fields:
for field in fields:
body[field] = fields[field]
if custom_fields:
for field in custom_fields:
# custom fields begin with "u_"
if field.startswith('u_'):
body[field] = custom_fields[field]
else:
body['u_' + field] = custom_fields[field]
return body
def split_fields(fields: str = '', delimiter: str = ';') -> dict:
"""Split str fields of Demisto arguments to SNOW request fields by the char ';'.
Args:
fields: fields in a string representation.
delimiter: the delimiter to use to separate the fields.
Returns:
dic_fields object for SNOW requests.
"""
dic_fields = {}
if fields:
if '=' not in fields:
raise Exception(
f"The argument: {fields}.\nmust contain a '=' to specify the keys and values. e.g: key=val.")
arr_fields = fields.split(delimiter)
for f in arr_fields:
field = f.split('=', 1) # a field might include a '=' sign in the value. thus, splitting only once.
if len(field) > 1:
dic_fields[field[0]] = field[1]
return dic_fields
def split_notes(raw_notes, note_type, time_info):
notes: List = []
# The notes should be in this form:
# '16/05/2023 15:49:56 - John Doe (Additional comments)\nsecond note first line\n\nsecond line\n\nthird
# line\n\n2023-05-10 15:41:38 - פלוני אלמוני (Additional comments)\nfirst note first line\n\nsecond line\n\n
delimiter = '([0-9]{1,4}(?:\/|-)[0-9]{1,2}(?:\/|-)[0-9]{1,4}.*\((?:Additional comments|Work notes)\))'
notes_split = list(filter(None, re.split(delimiter, raw_notes)))
for note_info, note_value in zip(notes_split[::2], notes_split[1::2]):
created_on, _, created_by = note_info.partition(" - ")
created_by = created_by.split(' (')[0]
if not created_on or not created_by:
raise Exception(f'Failed to extract the required information from the following note: {note_info} - {note_value}')
# convert note creation time to UTC
try:
display_date_format = time_info.get('display_date_format')
created_on_UTC = datetime.strptime(created_on, display_date_format) + time_info.get('timezone_offset')
except ValueError as e:
raise Exception(f'Failed to convert {created_on} to a datetime object. Error: {e}')
if time_info.get('filter') and created_on_UTC < time_info.get('filter'):
# If a time_filter was passed and the note was created before this time, do not return it.
demisto.debug(f'Using time filter: {time_info.get("filter")}. Not including note: {note_info} - {note_value}.')
continue
note_dict = {
"sys_created_on": created_on_UTC.strftime(DATE_FORMAT),
"value": note_value.strip(),
"sys_created_by": created_by,
"element": note_type
}
notes.append(note_dict)
return notes
def convert_to_notes_result(full_response, time_info):
"""
Converts the response of a ticket to the response format when making a query for notes only.
"""
if not full_response or 'result' not in full_response or not full_response.get('result'):
return []
timezone_offset = get_timezone_offset(full_response, time_info.get('display_date_format'))
time_info['timezone_offset'] = timezone_offset
all_notes = []
raw_comments = full_response.get('result', {}).get('comments', {}).get('display_value', '')
if raw_comments:
comments = split_notes(raw_comments, 'comments', time_info=time_info)
all_notes.extend(comments)
raw_work_notes = full_response.get('result', {}).get('work_notes', {}).get('display_value', '')
if raw_work_notes:
work_notes = split_notes(raw_work_notes, 'work_notes', time_info=time_info)
all_notes.extend(work_notes)
return {'result': all_notes}
class Client(BaseClient):
"""
Client to use in the ServiceNow integration. Overrides BaseClient.
"""
def __init__(self, server_url: str, sc_server_url: str, cr_server_url: str, username: str,
password: str, verify: bool, fetch_time: str, sysparm_query: str,
sysparm_limit: int, timestamp_field: str, ticket_type: str, get_attachments: bool,
incident_name: str, oauth_params: dict | None = None, version: str | None = None, look_back: int = 0,
use_display_value: bool = False, display_date_format: str = ''):
"""
Args:
server_url: SNOW server url
sc_server_url: SNOW Service Catalog url
cr_server_url: SNOW Change Management url
username: SNOW username
password: SNOW password
oauth_params: (optional) the parameters for the ServiceNowClient that should be used to create an
access token when using OAuth2 authentication.
verify: whether to verify the request
fetch_time: first time fetch for fetch_incidents
sysparm_query: system query
sysparm_limit: system limit
timestamp_field: timestamp field for fetch_incidents
ticket_type: default ticket type
get_attachments: whether to get ticket attachments by default
incident_name: the ServiceNow ticket field to be set as the incident name
look_back: defines how much backwards (minutes) should we go back to try to fetch incidents.
"""
oauth_params = oauth_params if oauth_params else {}
self._base_url = server_url
self._sc_server_url = sc_server_url
self._cr_server_url = cr_server_url
self._version = version
self._verify = verify
self._username = username
self._password = password
self._proxies = handle_proxy(proxy_param_name='proxy', checkbox_default_value=False)
self.use_oauth = True if oauth_params else False
self.fetch_time = fetch_time
self.timestamp_field = timestamp_field
self.ticket_type = ticket_type
self.get_attachments = get_attachments
self.incident_name = incident_name
self.sys_param_query = sysparm_query
self.sys_param_limit = sysparm_limit
self.sys_param_offset = 0
self.look_back = look_back
self.use_display_value = use_display_value
self.display_date_format = DATE_FORMAT_OPTIONS.get(display_date_format)
if self.use_display_value:
assert self.display_date_format, 'A display date format must be selected in the instance configuration when ' \
'using the `Use Display Value` option.'
if self.use_oauth: # if user selected the `Use OAuth` checkbox, OAuth2 authentication should be used
self.snow_client: ServiceNowClient = ServiceNowClient(credentials=oauth_params.get('credentials', {}),
use_oauth=self.use_oauth,
client_id=oauth_params.get('client_id', ''),
client_secret=oauth_params.get('client_secret', ''),
url=oauth_params.get('url', ''),
verify=oauth_params.get('verify', False),
proxy=oauth_params.get('proxy', False),
headers=oauth_params.get('headers', ''))
else:
self._auth = (self._username, self._password)
def generic_request(self, method: str, path: str, body: Optional[Dict] = None, headers: Optional[Dict] = None,
sc_api: bool = False, cr_api: bool = False):
"""Generic request to ServiceNow api.
Args:
(Required Arguments)
method (str) required: The HTTP method, for example, GET, POST, and so on.
path (str) required: The API endpoint.
(Optional Arguments)
body (dict): The body to send in a 'POST' request. Default is None.
header (dict): requests headers. Default is None.
sc_api: Whether to send the request to the Service Catalog API
cr_api: Whether to send the request to the Change Request REST API
Returns:
Resposne object or Exception
"""
return self.send_request(path, method, body, headers=headers, sc_api=sc_api, cr_api=cr_api)
def send_request(self, path: str, method: str = 'GET', body: dict | None = None, params: dict | None = None,
headers: dict | None = None, file=None, sc_api: bool = False, cr_api: bool = False,
no_record_found_res: dict = {'result': []}):
"""Generic request to ServiceNow.
Args:
path: API path
method: request method
body: request body
params: request params
headers: request headers
file: request file
sc_api: Whether to send the request to the Service Catalog API
cr_api: Whether to send the request to the Change Request REST API
Returns:
response from API
"""
body = body if body is not None else {}
params = params if params is not None else {}
if sc_api:
url = f'{self._sc_server_url}{path}'
elif cr_api:
url = f'{self._cr_server_url}{path}'
else:
url = f'{self._base_url}{path}'
if not headers:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
max_retries = 3
num_of_tries = 0
while num_of_tries < max_retries:
if file:
# Not supported in v2
url = url.replace('/v2', '/v1')
try:
file_entry = file['id']
file_name = file['name']
shutil.copy(demisto.getFilePath(file_entry)['path'], file_name)
with open(file_name, 'rb') as f:
file_info = (file_name, f, self.get_content_type(file_name))
if self.use_oauth:
access_token = self.snow_client.get_access_token()
headers.update({
'Authorization': f'Bearer {access_token}'
})
res = requests.request(method, url, headers=headers, data=body, params=params,
files={'file': file_info}, verify=self._verify, proxies=self._proxies)
else:
res = requests.request(method, url, headers=headers, data=body, params=params,
files={'file': file_info}, auth=self._auth,
verify=self._verify, proxies=self._proxies)
shutil.rmtree(demisto.getFilePath(file_entry)['name'], ignore_errors=True)
except Exception as err:
raise Exception('Failed to upload file - ' + str(err))
else:
if self.use_oauth:
access_token = self.snow_client.get_access_token()
headers.update({
'Authorization': f'Bearer {access_token}'
})
res = requests.request(method, url, headers=headers, data=json.dumps(body) if body else {},
params=params, verify=self._verify, proxies=self._proxies)
else:
res = requests.request(method, url, headers=headers, data=json.dumps(body) if body else {},
params=params, auth=self._auth, verify=self._verify, proxies=self._proxies)
if "Instance Hibernating page" in res.text:
raise DemistoException(
"A connection was established but the instance is in hibernate mode.\n"
"Please wake your instance and try again.")
try:
json_res = res.json()
except Exception as err:
if res.status_code == 201:
return "The ticket was successfully created."
if not res.content:
return ''
raise Exception(f'Error parsing reply - {str(res.content)} - {str(err)}')
if 'error' in json_res:
error = json_res.get('error', {})
if res.status_code == 401:
demisto.debug(f'Got status code 401 - {json_res}. Retrying ...')
else:
if isinstance(error, dict):
message = json_res.get('error', {}).get('message')
details = json_res.get('error', {}).get('detail')
if message == 'No Record found':
return no_record_found_res
else:
raise Exception(f'ServiceNow Error: {message}, details: {details}')
else:
raise Exception(f'ServiceNow Error: {error}')
if res.status_code < 200 or res.status_code >= 300:
if res.status_code != 401 or num_of_tries == (max_retries - 1):
raise Exception(
f'Got status code {str(res.status_code)} with url {url} with body {str(res.content)}'
f' with headers {str(res.headers)}')
else:
break
num_of_tries += 1
return json_res
def get_content_type(self, file_name):
"""Get the correct content type for the POST request.
Args:
file_name: file name
Returns:
the content type - image with right type for images , and general for other types..
"""
file_type = None
if not file_name:
demisto.debug("file name was not supllied, uploading with general type")
else:
file_type, _ = mimetypes.guess_type(file_name)
return file_type or '*/*'
def get_table_name(self, ticket_type: str = '') -> str:
"""Get the relevant table name from th client.
Args:
ticket_type: ticket type
Returns:
the ticket_type if given or the client ticket type
"""
if ticket_type:
return ticket_type
return self.ticket_type
def get_template(self, template_name: str) -> dict:
"""Get a ticket by sending a GET request.
Args:
template_name: ticket template name
Returns:
the ticket template
"""
query_params = {'sysparm_limit': 1, 'sysparm_query': f'name={template_name}'}
result = self.send_request('table/sys_template', 'GET', params=query_params)
if len(result['result']) == 0:
raise ValueError("Incorrect template name.")
template = result['result'][0].get('template', '').split('^')
dic_template = {}
for i in range(len(template) - 1):
template_value = template[i].split('=')
if len(template_value) > 1:
dic_template[template_value[0]] = template_value[1]
return dic_template
def get_ticket_attachments(self, ticket_id: str, sys_created_on: Optional[str] = None) -> dict:
"""Get ticket attachments by sending a GET request.
Args:
ticket_id: ticket id
sys_created_on: string, when the attachment was created
Returns:
Response from API.
"""
query = f'table_sys_id={ticket_id}'
if sys_created_on:
query += f'^sys_created_on>{sys_created_on}'
return self.send_request('attachment', 'GET', params={'sysparm_query': query})
def get_ticket_attachment_entries(self, ticket_id: str, sys_created_on: Optional[str] = None) -> list:
"""Get ticket attachments, including file attachments
by sending a GET request and using the get_ticket_attachments class function.
Args:
ticket_id: ticket id
sys_created_on: string, when the attachment was created
Returns:
Array of attachments entries.
"""
entries = []
links = [] # type: List[Tuple[str, str]]
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
attachments_res = self.get_ticket_attachments(ticket_id, sys_created_on)
if 'result' in attachments_res and len(attachments_res['result']) > 0:
attachments = attachments_res['result']
links = [(attachment.get('download_link', ''), attachment.get('file_name', ''))
for attachment in attachments]
for link in links:
if self.use_oauth:
access_token = self.snow_client.get_access_token()
headers.update({'Authorization': f'Bearer {access_token}'})
file_res = requests.get(link[0], headers=headers, verify=self._verify, proxies=self._proxies)
else:
file_res = requests.get(link[0], auth=(self._username, self._password), verify=self._verify,
proxies=self._proxies)
if file_res is not None:
entries.append(fileResult(link[1], file_res.content))
return entries
def get(self, table_name: str, record_id: str, custom_fields: dict = {}, number: str | None = None,
no_record_found_res: dict = {'result': []}) -> dict:
"""Get a ticket by sending a GET request.
Args:
table_name: the table name
record_id: the record ID
custom_fields: custom fields of the record to query
number: record number
Returns:
Response from API.
"""
query_params = {} # type: Dict
if record_id:
path = f'table/{table_name}/{record_id}'
elif number:
path = f'table/{table_name}'
query_params = {
'number': number
}
elif custom_fields:
path = f'table/{table_name}'
query_params = custom_fields
else:
# Only in cases where the table is of type ticket
raise ValueError('servicenow-get-ticket requires either ticket ID (sys_id) or ticket number.')
return self.send_request(path, 'GET', params=query_params, no_record_found_res=no_record_found_res)
def update(self, table_name: str, record_id: str, fields: dict = {}, custom_fields: dict = {},
input_display_value: bool = False) -> dict:
"""Updates a ticket or a record by sending a PATCH request.
Args:
table_name: table name
record_id: record id
fields: fields to update
custom_fields: custom_fields to update
input_display_value: whether to set field values using the display value or the actual value.
Returns:
Response from API.
"""
body = generate_body(fields, custom_fields)
query_params = {'sysparm_input_display_value': input_display_value}
return self.send_request(f'table/{table_name}/{record_id}', 'PATCH', params=query_params, body=body)
def create(self, table_name: str, fields: dict = {}, custom_fields: dict = {},
input_display_value: bool = False):
"""Creates a ticket or a record by sending a POST request.
Args:
table_name: table name
record_id: record id
fields: fields to update
custom_fields: custom_fields to update
input_display_value: whether to set field values using the display value or the actual value.
Returns:
Response from API.
"""
body = generate_body(fields, custom_fields)
query_params = {'sysparm_input_display_value': input_display_value}
return self.send_request(f'table/{table_name}', 'POST', params=query_params, body=body)
def delete(self, table_name: str, record_id: str) -> dict:
"""Deletes a ticket or a record by sending a DELETE request.
Args:
table_name: table name
record_id: record id
Returns:
Response from API.
"""
return self.send_request(f'table/{table_name}/{record_id}', 'DELETE')
def add_link(self, ticket_id: str, ticket_type: str, key: str, link: str) -> dict:
"""Adds a link to a ticket by sending a PATCH request.
Args:
ticket_id: ticket ID
ticket_type: ticket type
key: link key
link: link str
Returns:
Response from API.
"""
return self.send_request(f'table/{ticket_type}/{ticket_id}', 'PATCH', body={key: link})
def add_comment(self, ticket_id: str, ticket_type: str, key: str, text: str) -> dict:
"""Adds a comment to a ticket by sending a PATCH request.
Args:
ticket_id: ticket ID
ticket_type: ticket type
key: link key
link: link str
Returns:
Response from API.
"""
return self.send_request(f'table/{ticket_type}/{ticket_id}', 'PATCH', body={key: text})
def upload_file(self, ticket_id: str, file_id: str, file_name: str, ticket_type: str) -> dict:
"""Adds a file to a ticket by sending a POST request.
Args:
ticket_id: ticket ID
file_id: file ID
file_name: file name
ticket_type: ticket type
Returns:
Response from API.
"""
body = {
'table_name': ticket_type,
'table_sys_id': ticket_id,
'file_name': file_name
}
return self.send_request('attachment/upload', 'POST', headers={'Accept': 'application/json'},
body=body, file={'id': file_id, 'name': file_name})
def add_tag(self, ticket_id: str, tag_id: str, title: str, ticket_type: str) -> dict: