# Import and convert Recorded Future Sigma rules
Created by Jonah Feldman, Recorded Future. Heavily based on [this](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/c89c9156ab26092efe7365d37e9e97fe4b7571e1/tutorials-and-examples/example-notebooks/SigmaRuleImporter.ipynb) notebook by ianhelle@microsoft.com

**NOTE:** This is a beta version. We are constantly improving its features and functionality to provide you with the best user experience possible. Your feedback is valuable and helps us identify areas of improvement. Please send any suggestions or issues to support@recordedfuture.com.

This notebook fetches Sigma rules created by Recorded Future's Insikt team and converts them to KQL. After conversion, you have the option to interactively query your Log Analytics workspace with these rules or create a Sentinel Analytic rule to generate alerts/incidents based on its detections.


Known issues:
- Authentication to Sentinel can sometimes fail to detect msticpyconfig.yaml even when it's present. As a workaround, you can directly hardcode the subscription id, resource group name, and workspace name into the cell
- Techniques are not currently translated correctly to the Analytic Rule. This is being worked on
- Backslashes are not always escaped properly, and some rules with backslashes in the query will not always execute correctly
- In certain shells, `msticpy[azure]` in the first cell needs to be wrapped in quotes

By default, all Analytic rules are created disabled.

## Install dependencies

In [1]:
# Notebook bootstrap: show a banner, then run msticpy's notebook initialisation.
from pathlib import Path
from IPython.display import display, HTML

# NOTE(review): these two version constants are assigned but not referenced by
# any cell visible in this notebook -- confirm whether a version check was
# intended.
REQ_PYTHON_VER = "3.6"
REQ_MSTICPY_VER = "1.0.0"

display(HTML("<h3>Starting Notebook setup...</h3>"))

# If not using Azure Notebooks, install msticpy with
#%pip install msticpy
#%pip install msticpy[azure] #OR %pip install "msticpy[azure]"
#%pip install sigmatools
from msticpy.nbtools import nbinit
# globals() hands this notebook's namespace to init_notebook; the trailing
# semicolon keeps IPython from echoing the call's return value.
nbinit.init_notebook(namespace=globals());

### Enter your Recorded Future API key
All logic apps require API keys to communicate with the Recorded Future API. To obtain API keys, please visit [Recorded Future Requesting API Tokens](https://support.recordedfuture.com/hc/en-us/articles/4411077373587-Requesting-API-Tokens).

In [2]:
import getpass

# Read the API token without echoing it. The explicit prompt replaces getpass's
# generic "Password: " text, and strip() removes whitespace that is easily
# picked up when pasting and would otherwise be sent in the X-RFToken header.
rf_token = getpass.getpass(prompt="Recorded Future API token: ").strip()

### Fetch Rules from the Recorded Future API. This may take a while
Modify query here. E.g. time interval (after, before), number of retrieved Sigma rules (limit). For more inspiration check out the API specification [here](https://api.recordedfuture.com/openapi/detection-rule.html).

In [3]:
import requests
# Download Sigma rules
# Modify the query below as needed
# The "created" window and "limit" bound what is requested; widen the window
# or raise the limit to retrieve more rules.
query = {
  "filter": {
    "types": [
      "sigma"
    ],
    "created": {
      "after": "2019-02-10T12:00:00.000Z",
      "before": "2022-03-10T00:00:00.000Z"
    }
  },
  "tagged_entities": False,
  "limit": 5
}
rf_detection_api_url = 'https://api.recordedfuture.com/detection-rule/search'
# requests applies no timeout unless one is given, so an unresponsive endpoint
# would block this cell forever. (connect, read) in seconds; the read timeout
# is generous because the search can be slow.
r = requests.post(
    rf_detection_api_url,
    headers={'X-RFToken': rf_token},
    json=query,
    timeout=(10, 300),
)
# Fail loudly on HTTP errors (e.g. a bad token) before touching the body.
r.raise_for_status()
print(f'Retrieved {r.json()["count"]} Sigma Rules')

Retrieved 5 Sigma Rules


### Save the rules locally. 

In [4]:
import yaml
from ipywidgets import widgets, Layout
import os
import re
from pathlib import Path
import traceback

# Default output folder: ./rf-sigma under the current working directory.
def_path = Path.joinpath(Path(os.getcwd()), "rf-sigma")
path_wgt = widgets.Text(value=str(def_path), 
                        description='Path to extract to zipped repo files: ', 
                        layout=Layout(width='50%'),
                        style={'description_width': 'initial'})
# The widget's value is read straight away, so it always equals def_path here;
# edit def_path above to save the rules somewhere else.
rules_root = Path(path_wgt.value)
# Create the folder together with any missing parents. An existing folder is
# fine; an existing *file* at that path raises instead of being ignored.
rules_root.mkdir(parents=True, exist_ok=True)

# Path separators, characters that are invalid in file names on common
# filesystems, and control characters. Rule titles come from the API response,
# so they must not be able to redirect the write outside rules_root.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

for result in r.json()['result']:
    for rule in result['rules']:
        content = rule['content']
        # The file is named after the title of the first YAML document.
        title = list(yaml.safe_load_all(content))[0]['title']
        file_name = _UNSAFE_FILENAME_CHARS.sub('_', str(title)).strip() + '.yml'
        with open(rules_root / file_name, 'w') as file:
            file.write(content)


In [5]:
%ls {rules_root}

[0m[01;32mMAL_DarkOwl_Download_Payload.yml[0m*   [01;32mMAL_disable_modify_tools.yml[0m*
[01;32mMAL_PWNKIT_Exploit_File_Event.yml[0m*  [01;32mMal_Trochilus_Interaction.yml[0m*
[01;32mMAL_SharpHound_process.yml[0m*


### Check that we have the files
You should see one `.yml` file per downloaded Sigma rule in the listing above.

## Convert Sigma Files to Log Analytics KQL queries

In [6]:
# Sigma to Log Analytics Conversion
import yaml
import os
import sys
module_path = os.path.abspath(os.path.join('sigma/sigma-master/tools'))
if module_path not in sys.path:
    sys.path.append(module_path)
from sigma.parser.collection import SigmaCollectionParser
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
from sigma.filter import SigmaRuleFilter
import sigma.backends.discovery as backends
from sigma.backends.base import BackendOptions
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
from pathlib import Path

    
# Every .yml file directly under rules_root is a candidate for conversion.
sigma_list = list(Path(rules_root).resolve().glob("*.yml"))

# YAML text handed to SigmaConfiguration in sigma_to_la().
# NOTE(review): presumably each entry maps a Sigma field name (left) to the
# name used in the generated query (right), and NO_MAPPING is a placeholder
# for a field with no equivalent -- confirm against the sigma tools docs.
_LA_MAPPINGS = '''
fieldmappings:
  Image: NewProcessName
  ParentImage: ParentProcessName
  ParentCommandLine: NO_MAPPING
'''

# Marker stored in place of a query when sigma_to_la() cannot convert a rule.
NOT_CONVERTIBLE = 'Not convertible'

def sigma_to_la(file_path):
    """Convert one Sigma rule file into a Log Analytics (KQL) query.

    Returns a ``(sigma_txt, kql_result)`` tuple. ``sigma_txt`` is the raw text
    of the rule file. ``kql_result`` is the generated query prefixed with
    ``//`` header comments built from the rule's metadata, or
    ``NOT_CONVERTIBLE`` when the backend raised ``NotImplementedError``,
    ``NotSupportedError`` or ``TypeError``. If building the header fails, the
    error is printed and the query is returned without a header.
    """
    with open(file_path, 'r') as rule_file:
        try:
            config_chain = SigmaConfigurationChain()
            config_chain.append(SigmaConfiguration(_LA_MAPPINGS))
            options = BackendOptions(None, None)
            ala_backend = backends.getBackend('ala')(config_chain, options)
            rule_parser = SigmaCollectionParser(rule_file, config_chain, None)
            # All generated fragments are concatenated without a separator.
            kql_result = ''.join(rule_parser.generate(ala_backend))
        except (NotImplementedError, NotSupportedError, TypeError):
            kql_result = NOT_CONVERTIBLE

        # Go back to the start of the file so the full rule text can be read.
        rule_file.seek(0, 0)
        sigma_txt = rule_file.read()

        if kql_result == NOT_CONVERTIBLE:
            return sigma_txt, kql_result

        try:
            header = "\n".join(get_sigma_properties(sigma_txt))
            kql_result = header + "\n" + kql_result
        except Exception as e:
            print("exception reading sigma YAML: ", e)
            print(sigma_txt, kql_result, sep='\n')
        return sigma_txt, kql_result

# Rule metadata fields echoed, in this order, as header comments above a query.
sigma_keys = [
    'title',
    'description',
    'tags',
    'status',
    'author',
    'logsource',
    'falsepositives',
    'level',
]


def get_sigma_properties(sigma_rule):
    """Yield one ``// key: value`` comment line for each name in ``sigma_keys``.

    ``sigma_rule`` is YAML text; only its first document is consulted.
    """
    first_doc = next(yaml.safe_load_all(sigma_rule))
    yield from (get_property(key, first_doc) for key in sigma_keys)

def get_property(name, sigma_rule_dict):
    """Format one Sigma rule property as a single-line KQL comment.

    Args:
        name: Key to look up in ``sigma_rule_dict``.
        sigma_rule_dict: Parsed Sigma rule (a mapping).

    Returns:
        ``"// <name>: <value>"``. A missing key is rendered as ``na`` and a
        dict value is flattened to space-separated ``key: value`` pairs.
        Line breaks inside the value are collapsed to single spaces: a ``//``
        comment ends at the newline, so a continuation line (common for
        multi-line descriptions) would otherwise become part of the query.
    """
    sig_prop = sigma_rule_dict.get(name, 'na')
    if isinstance(sig_prop, dict):
        sig_prop = ' '.join(f"{k}: {v}" for k, v in sig_prop.items())
    pieces = (piece.strip() for piece in str(sig_prop).splitlines())
    single_line = ' '.join(piece for piece in pieces if piece)
    return f"// {name}: {single_line}"
        
   
# KQL clauses that add_filter_clauses() can splice into a converted query.
# {start} and {end} are filled by str.format() in execute_kql_query().
# NOTE(review): no code visible in this notebook supplies {host_name}; it
# appears the placeholder has to be replaced by hand in the query text box
# before running a query with the host filter enabled -- confirm.
_KQL_FILTERS = {
    'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ',
    'host': ' | where Computer has {host_name} '
}

def insert_at(source, insert, find_sub):
    """Return ``source`` with ``insert`` placed before the first ``find_sub``.

    When ``find_sub`` does not occur in ``source``, ``insert`` is appended to
    the end instead.
    """
    cut = source.find(find_sub)
    if cut == -1:
        # No match: splitting at the end turns the insertion into an append.
        cut = len(source)
    head, tail = source[:cut], source[cut:]
    return head + insert + tail
        
def add_filter_clauses(source, **kwargs):
    """Prepare a converted query for ``str.format`` and add optional filters.

    Any brace already present in ``source`` is doubled so it survives a later
    ``str.format`` call, and a warning comment is put on top of the query.
    When the ``host`` and/or ``date`` keyword is truthy, the matching clause
    from ``_KQL_FILTERS`` is inserted before the first ``|`` (host first, then
    date).
    """
    if any(brace in source for brace in "{}"):
        warned = ("// Warning: embedded braces in source. Please edit if necessary.\n"
                  + source)
        source = warned.replace('{', '{{').replace('}', '}}')
    for filter_name in ('host', 'date'):
        if kwargs.get(filter_name, False):
            source = insert_at(source, _KQL_FILTERS[filter_name], '|')
    return source


# Run the conversion
# kql_dict maps each rule file name to (sigma_text, kql_text); rules that could
# not be converted are kept too, with NOT_CONVERTIBLE as their query text.
print("Converting rules")
conv_counter = []  # unused; kept in case another cell still refers to it
kql_dict = {}
src_converted = 0  # number of rules that produced a real query
for file_path in sigma_list:
    file_name = os.path.basename(file_path)
    try:
        sigma, kql = sigma_to_la(file_path)
    except Exception as err:
        # Report the cause and carry on with the remaining rules.
        print(f"Error converting {file_name} ({file_path}): {err}")
        continue
    kql_dict[file_name] = (sigma, kql)
    if kql == NOT_CONVERTIBLE:
        print(f'Not convertible {file_name}')
        continue
    src_converted += 1
    print(f'Converted {file_name}')

print("\nConversion statistics")
print("-" * len("Conversion statistics"))
print(f'Rules: {len(sigma_list)}, Converted: {src_converted}')

Converting rules
Converted MAL_DarkOwl_Download_Payload.yml
Converted MAL_disable_modify_tools.yml
Converted MAL_PWNKIT_Exploit_File_Event.yml
Converted MAL_SharpHound_process.yml
Converted Mal_Trochilus_Interaction.yml

Conversion statistics
---------------------
Rules: 5, Converted: 5


### Authenticate to Log Analytics

In [7]:
from IPython.display import display
from msticpy.data import QueryProvider
from msticpy.common.wsconfig import WorkspaceConfig

# NOTE(review): WorkspaceConfig() is called without arguments; presumably it
# picks up the default workspace from msticpyconfig.yaml -- confirm.
ws_config = WorkspaceConfig()
# qry_prov is the provider execute_kql_query() uses to run queries.
qry_prov = QueryProvider("LogAnalytics")
qry_prov.connect(ws_config.code_connect_str)


Please wait. Loading Kqlmagic extension...

<IPython.core.display.Javascript object>

done
try_azcli_login=True;enable_add_items_to_help=False
{'try_azcli_login': 'True', 'enable_add_items_to_help': 'False'}
try_azcli_login=True;enable_add_items_to_help=False
Connecting... 

Copy code to clipboard and authenticate here: https://microsoft.com/devicelogin


connected


### Authenticate To Azure Sentinel

In [8]:
from msticpy.context.azure.sentinel_core import MicrosoftSentinel

azs = MicrosoftSentinel()
# The line above SHOULD work without modification. However, it often fails to
# detect local config files even when they exist. If that happens, uncomment
# the block below and replace the placeholders with your subscription id,
# resource group name, and Sentinel workspace name.

# subscription_id = '<your-subscription-id>'
# resource_group = '<your-resource-group>'
# workspace_name = '<your-workspace-name>'
# azs = MicrosoftSentinel(sub_id=subscription_id, res_grp=resource_group, ws_name=workspace_name)

azs.connect()


To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code E9YKQA6WE to authenticate.


## Display the results in an interactive browser



In [9]:
from ipywidgets import widgets, Layout
from IPython.display import display, HTML, clear_output
from msticpy.nbtools.nbwidgets import QueryTime


# Browser Functions

def on_query_value_change(change):
    """Refresh both text areas for the rule currently selected in ``queries_w``.

    Registered as a widget observer and also called directly with ``None``, so
    ``change`` is ignored and the current widget values are read instead.
    Does nothing while "View query" is unchecked, or when no rule is selected
    (for example because no rules were loaded) -- a ``None`` selection would
    otherwise fail with ``KeyError``.
    """
    if not view_qry_check.value:
        return
    selected = queries_w.value
    if selected is None or selected not in kql_dict:
        return
    sigma_text, qry_text = kql_dict[selected]
    if "Not convertible" not in qry_text:
        qry_text = add_filter_clauses(qry_text,
                                      date=add_date_filter_check.value,
                                      host=add_host_filter_check.value)
    # Start every pipe on a new line so the query is easier to read and edit.
    query_text_w.value = qry_text.replace('|', '\n|')
    orig_text_w.value = sigma_text
        
def clean_kql_comments(query_string):
    """Strip ``//`` comments from a KQL query and collapse it onto one line.

    Everything from ``//`` to the end of its line is removed, then all
    newlines are dropped and surrounding whitespace is trimmed.

    NOTE: every ``//`` is treated as the start of a comment, including one
    inside a string literal (e.g. a URL).
    """
    import re
    # flags has to be passed by keyword: the fourth positional parameter of
    # re.sub() is ``count``, so a positional re.MULTILINE would be taken as a
    # limit on the number of replacements rather than as a flag.
    without_comments = re.sub(r'(//[^\n]+)', '', query_string, flags=re.MULTILINE)
    return without_comments.replace('\n', '').strip()

def execute_kql_query(query_string):
    """Run a KQL query through ``qry_prov`` and return whatever it returns.

    Comments are stripped first, then ``{start}`` / ``{end}`` are filled from
    the ``qry_wgt`` time-range widget via ``str.format``.

    Returns ``None`` (after printing a message) when the query is empty or
    when it still contains a placeholder or brace that ``str.format`` cannot
    resolve, e.g. an unreplaced ``{host_name}``.
    """
    if not query_string or not query_string.strip():
        print('No query supplied')
        return None
    src_query = clean_kql_comments(query_string)
    try:
        src_query = src_query.format(start=qry_wgt.start, end=qry_wgt.end)
    except (KeyError, IndexError, ValueError) as err:
        # KeyError/IndexError: unknown placeholder; ValueError: unbalanced brace.
        print(f'Query contains a placeholder or brace that could not be filled ({err!r}). '
              'Replace placeholders such as {host_name} with a value and '
              'write literal braces as {{ and }}.')
        return None
    return qry_prov.exec_query(src_query)


def on_view_query_value_change(change):
    """Refresh the query text and show or hide both text areas.

    Observer for the three check boxes; ``change`` is ignored. The text areas
    are visible exactly when "View query" is checked.
    """
    show = view_qry_check.value
    on_query_value_change(None)
    for text_area in (query_text_w, orig_text_w):
        text_area.layout.visibility = 'visible' if show else 'hidden'
    
def display_results(qry_results):
    """Show ``qry_results.to_html()`` in the ``query_results_w`` widget."""
    rendered = qry_results.to_html()
    query_results_w.value = rendered
def parse_severity(string):
    """Translate a rule's ``level`` text into a severity name.

    * Text containing ``Recorded Future Priority Level`` is mapped by its last
      space-separated token: ``0``/``1`` -> ``High``, ``2`` -> ``Medium``,
      ``3`` -> ``Low``; any other token yields ``None``.
    * Otherwise text containing ``critical`` becomes ``High`` and anything
      else is returned capitalised.
    """
    if 'Recorded Future Priority Level' in string:
        priority = string.split(' ')[-1]
        by_priority = {'0': 'High', '1': 'High', '2': 'Medium', '3': 'Low'}
        return by_priority.get(priority)
    return 'High' if 'critical' in string else string.capitalize()
def parse_tactics(tags):
    """Return each tag with everything up to its first ``.`` removed.

    A tag without a ``.`` raises ``IndexError``.
    """
    tactics = []
    for tag in tags:
        tactics.append(tag.split('.', 1)[1])
    return tactics

def create_analytic_rule(eh):
    """Button handler: create a disabled analytic rule from the shown rule.

    Metadata (title, description, author, date, level) is taken from the
    first YAML document in the Sigma text area, and the query from the KQL
    text area as currently displayed. ``eh`` (the clicked button) is unused.
    """
    rule = list(yaml.safe_load_all(orig_text_w.value))[0]
    title = rule['title']
    summary = f'{rule["description"]}. Authored by {rule["author"]}. {rule["date"]}'

    # Tactics are not passed yet: tactics=parse_tactics(rule['tags'])
    azs.create_analytic_rule(
        name=title,
        description=summary,
        query=query_text_w.value,
        severity=parse_severity(rule['level']),
        enabled=False,
    )
    analytic_result.value = f'Created Analytic Rule {title}'
    
def exec_query(eh):
    """Button handler: run the query in the KQL text area and show the result.

    ``eh`` (the clicked button) is unused.
    """
    qry_results = execute_kql_query(query_text_w.value)
    # execute_kql_query() returns None when it had nothing to run; there is
    # nothing to render then, and calling .to_html() on None would raise.
    if qry_results is None:
        return
    display_results(qry_results)


# Browser widget setup

# Rule picker: one entry per file name in kql_dict.
queries_w = widgets.Select(options=list(kql_dict),
                           description='Query :    ',
                           layout=Layout(width='30%', height='120px'),
                           style={'description_width': 'initial'})

# Both text areas start hidden; on_view_query_value_change() below sets their
# visibility from the "View query" check box.
query_text_w = widgets.Textarea(
    value='',
    description='Kql Query:',
    layout=Layout(width='100%', height='300px', visibility='hidden'),
    disabled=False)
orig_text_w = widgets.Textarea(
    value='',
    description='Sigma Query:',
    layout=Layout(width='100%', height='250px', visibility='hidden'),
    disabled=False)

queries_w.observe(on_query_value_change, names='value')

view_qry_check = widgets.Checkbox(description="View query", value=True)
add_date_filter_check = widgets.Checkbox(description="Add date filter", value=False)
add_host_filter_check = widgets.Checkbox(description="Add host filter", value=False)

# Any check-box change re-renders the query text and updates visibility.
view_qry_check.observe(on_view_query_value_change, names='value')
add_date_filter_check.observe(on_view_query_value_change, names='value')
add_host_filter_check.observe(on_view_query_value_change, names='value')

vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check])
hbox = widgets.HBox([queries_w, vbox_opts])
vbox = widgets.VBox([hbox, orig_text_w, query_text_w])
# Populate the text areas for the initial selection before showing them.
on_view_query_value_change(None)
display(vbox)
query_results_w = widgets.HTML('')
# Time range used to fill {start}/{end} when a query is executed.
qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10)
display(qry_wgt)

exec_hqry_button = widgets.Button(description="Execute query")
exec_hqry_button.on_click(exec_query)

analytic_button = widgets.Button(description='Create Analytic Rule')
analytic_button.on_click(create_analytic_rule)
analytic_result = widgets.Label('')
query_vbox = widgets.VBox([exec_hqry_button, query_results_w])
analytic_vbox = widgets.VBox([analytic_button, analytic_result])
final_vbox = widgets.VBox([analytic_vbox, query_vbox])
display(final_vbox)


VBox(children=(HBox(children=(Select(description='Query :    ', layout=Layout(height='120px', width='30%'), op…

VBox(children=(HTML(value='<h4>Set query time boundaries</h4>'), HBox(children=(DatePicker(value=datetime.date…

VBox(children=(VBox(children=(Button(description='Create Analytic Rule', style=ButtonStyle()), Label(value='')…