# IOC Analysis Notebook

## Objective
This Jupyter notebook is designed to perform Indicators of Compromise (IOC) analysis using various tools from the `msticpy` package. The notebook facilitates the identification and analysis of potential threats in a cybersecurity context.

## Requirements
- **Python 3.6+**
- **Jupyter Notebook**

## Modules Used
- **msticpy**: A package providing tools for threat intelligence, security monitoring, and incident investigation.
- **IPython.display**: For displaying rich content within the notebook.
- **pathlib**: Standard-library module used for file path manipulation.
- **pyvis**: For creating network visualizations.

## How to Run the Notebook
1. **Install the necessary modules**:  
    Run the following command to install required Python packages:
    ```bash
    pip install msticpy pyvis python-whois
    ```
2. **Open the notebook**:  
    Use Jupyter Notebook to open `IOC Analysis.ipynb`.

3. **Execute the cells**:  
    Run each cell sequentially to perform IOC analysis.

## Important Notes
- Ensure you have appropriate access to data environments like Azure Sentinel for querying threat intelligence data.
- Modify the data environment configuration as per your setup.

## Conclusion
This notebook is a powerful tool for analyzing and responding to security incidents by leveraging MSTICPy's robust feature set.
"""

 # Import necessary modules and tools for IOC analysis
 

In [None]:
# Import the nbinit module from msticpy.nbtools
from msticpy.nbtools import nbinit

# Import Path from pathlib for file path operations
from pathlib import Path

# Import display, HTML, and Image from IPython.display for displaying HTML content and images in the notebook
from IPython.display import display, HTML, Image

# Extra imports for the notebook, one "module, name" string per entry.
# NOTE(review): `extra_imports` is rebound to a different list further down in
# this cell before nbinit.init_notebook() is called, so the entries below are
# never passed to init_notebook as written - confirm whether the two lists
# were meant to be merged.
extra_imports = [
    "msticpy.vis.timeseries, display_timeseries_anomolies",  # Import display_timeseries_anomolies from msticpy.vis.timeseries
    "msticpy.analysis.timeseries, timeseries_anomalies_stl",  # Import timeseries_anomalies_stl from msticpy.analysis.timeseries
    "datetime, datetime",  # Import datetime from datetime module
    "msticpy.vis.nbdisplay, draw_alert_entity_graph",  # Import draw_alert_entity_graph from msticpy.vis.nbdisplay
    "msticpy.context.ip_utils, convert_to_ip_entities",  # Import convert_to_ip_entities from msticpy.context.ip_utils
    "msticpy.vis.ti_browser, browse_results",  # Import browse_results from msticpy.vis.ti_browser
    "IPython.display, Image",  # Import Image from IPython.display
    "msticpy.context.ip_utils, get_whois_info",  # Import get_whois_info from msticpy.context.ip_utils
    "msticpy.context.ip_utils, get_ip_type"  # Import get_ip_type from msticpy.context.ip_utils
]

# Required Python version, passed to check_versions() below
REQ_PYTHON_VER = (3, 6)

# Required MSTICPy version, passed to check_versions() below
REQ_MSTICPY_VER = (1, 0, 0)

# HTML warning shown after utils/nb_check.py has been re-downloaded.
# Inline CSS declarations must be written as "property: value"; a declaration
# written with "=" is not valid CSS.
update_nbcheck = (
    "<p style='color: orange; text-align: left'>"
    "<b>Warning: we needed to update '<i>utils/nb_check.py</i>'</b><br>"
    "Please restart the kernel and re-run this cell."
    "</p>"
)

# Announce that notebook setup has started
display(HTML("<h3>Starting Notebook setup...</h3>"))

# The version check only runs when a local utils/nb_check.py exists;
# if the file is absent this whole section is skipped.
if Path("./utils/nb_check.py").is_file():
    try:
        # check_versions is the only name needed from nb_check.py
        from utils.nb_check import check_versions
    except ImportError as err:
        # The import failed: switch to minimal tracebacks, overwrite the local
        # file with the copy from the Azure-Sentinel-Notebooks repository
        # (curl's stderr is discarded) and show the restart warning.
        %xmode Minimal
        !curl https://raw.githubusercontent.com/Azure/Azure-Sentinel-Notebooks/master/utils/nb_check.py > ./utils/nb_check.py 2>/dev/null
        # Tell the user to restart the kernel and re-run this cell
        display(HTML(update_nbcheck))
    # If the import above failed (and no earlier run defined the name),
    # check_versions is not a global and the cell stops here.
    if "check_versions" not in globals():
        # The "instructions" mentioned in the message are the HTML warning displayed above
        raise ImportError("Old version of nb_check.py detected - see instructions below.")
    # Import succeeded: use verbose tracebacks and validate the Python / MSTICPy versions
    %xmode Verbose
    check_versions(REQ_PYTHON_VER, REQ_MSTICPY_VER)

# If not using Azure Notebooks, install msticpy with pip
# !pip install msticpy

# "module, name" pairs that nbinit should import into the notebook namespace
extra_imports = [
    "msticpy.nbtools, observationlist",
    "msticpy.sectools, domain_utils",
    "pyvis.network, Network",
]

# Packages handed to nbinit as additional_packages
_additional_packages = ["pyvis", "python-whois"]

# Initialise the notebook: nbinit receives this notebook's global namespace
# together with the package and import lists defined above.
nbinit.init_notebook(
    namespace=globals(),
    additional_packages=_additional_packages,
    extra_imports=extra_imports,
)

# Initialize data environments and list them for selection

In [None]:
# Ask QueryProvider which data environments it knows about and show them
data_environments = QueryProvider.list_data_environments()
printme("Data provider")
print(data_environments)

# Query provider used by the rest of the notebook
qry_prov = QueryProvider(data_environment="AzureSentinel")

# Workspaces known to WorkspaceConfig
_customers = WorkspaceConfig.list_workspaces()

# SetQuery() result; its first three items are unpacked in the next cell
Set = SetQuery()

In [None]:
# Unpack the first three items of Set: the selection widget, the query times and the search origin
widget, search_q_times, search_origin = (Set[position] for position in range(3))

In [None]:
# Collect the description of every child of widget.children[1] whose value is truthy
selected_customer = []
for box in widget.children[1].children:
    if box.value:
        selected_customer.append(box.description)
printme(f'Selected Tenant : {selected_customer} Time : {search_origin}' )

def multi_customers(query, *args):
    """Run a query once per selected customer workspace and collect the results.

    For every entry in the global ``selected_customer`` list, the global
    ``qry_prov`` is connected to that workspace and the query is executed.

    Parameters
    ----------
    query
        The query; it is formatted to a string before being passed on.
    *args
        Optional. ``args[0]`` is the callable used to run the query and
        ``args[1]``, when present, is forwarded to it as a second positional
        argument. When no callable is supplied, ``qry_prov.exec_query`` is
        used (previously this case raised ``IndexError``).

    Returns
    -------
    list
        One result per selected customer, in the order of ``selected_customer``.
    """
    query_text = f"{query}"
    results = []
    for cus in selected_customer:
        # Point the provider at this customer's workspace before querying
        qry_prov.connect(WorkspaceConfig(workspace=cus))

        # Fall back to the provider's own exec_query when no callable was given
        run_query = args[0] if args else qry_prov.exec_query

        # The second argument is only forwarded when exec_query is not among
        # the arguments; exec_query is always called with the query text alone.
        if len(args) > 1 and qry_prov.exec_query not in args:
            result = run_query(query_text, args[1])
        else:
            result = run_query(query_text)

        results.append(result)

    return results


In [None]:
# Create the Threat Intelligence lookup client and reload its providers
tilookup = TILookup()
tilookup.reload_providers()

# Show the provider status. It is displayed explicitly because Jupyter only
# echoes a bare expression when it is the last statement of a cell, and more
# statements follow here.
display(tilookup.provider_status)

# Text box in which the analyst enters the domain or URL to investigate
domain_url = widgets.Text(description='Please enter the domain or URL to investigate:',
                          **WIDGET_DEFAULTS)

# Render the input box
display(domain_url)

In [None]:
import tldextract
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity

# Shared state used by the following cells
graph_items = []  # (source, target) pairs later turned into graph edges
dom_val = domain_utils.DomainValidator()
summary = observationlist.Observations()
dom_record = None
url = domain_url.value.strip().lower()

# Split the normalised input into its registered domain and suffix.
# The parts are read by attribute name rather than by tuple unpacking so the
# code works whether tldextract returns a namedtuple or a dataclass.
url_parts = tldextract.extract(url)
tld = url_parts.suffix
domain = url_parts.domain.lower() + "." + tld.lower()

# Warn (but carry on) when the domain's TLD does not validate
if dom_val.validate_tld(domain) is not True:
    md(f"{domain} is not a valid domain name", "bold")

# A URL is only tracked separately when the input is more than the bare domain
if url != domain:
    md(f"<strong>Domain</strong> : {domain}")
    md(f"<strong>URL</strong> : {url}")
    graph_items.append((domain, url))
else:
    md(f"<strong>Domain</strong> : {domain}")
    url = None

# Function to convert severity to TISeverity enum
def conv_severity(severity):
    """Coerce *severity* to a ``TISeverity`` member.

    ``TISeverity`` instances are returned unchanged, strings are looked up by
    member name and any other value is looked up by member value. A name or
    value that does not match falls back to ``TISeverity.information``.
    """
    if isinstance(severity, TISeverity):
        return severity
    try:
        return TISeverity[severity] if isinstance(severity, str) else TISeverity(severity)
    except (ValueError, KeyError):
        return TISeverity.information

# Function to check if severity meets threshold
def ti_check_sev(severity, threshold):
    """Return True when *severity* is at or above *threshold*.

    Both arguments are normalised with ``conv_severity`` and compared by
    their enum values.
    """
    return conv_severity(severity).value >= conv_severity(threshold).value

# Threat Intelligence for the domain is always looked up first
domain_lookup = tilookup.lookup_ioc(observable=domain, ioc_type='dns')
domain_ti = tilookup.result_to_df(domain_lookup)

# When a URL was supplied it gets its own lookup, summary entry and graph edges
if url is not None:
    url_lookup = tilookup.lookup_ioc(observable=url, ioc_type='url')
    url_ti = tilookup.result_to_df(url_lookup)
    md(f"Threat Intelligence Results for {url}", "bold")
    display(url_ti.T)
    summary.add_observation(caption="URL TI", description=f"Summary of TI for {url}", data=url_ti)
    # One edge per provider whose severity passes ti_check_sev(..., 1)
    url_edges = [
        (url, provider)
        for provider in url_ti.index
        if ti_check_sev(url_ti.loc[provider]['Severity'], 1)
    ]
    graph_items += url_edges

md(f"Threat Intelligence Results for {domain}", "bold")
display(domain_ti.T)
summary.add_observation(caption="Domain TI", description=f"Summary of TI for {domain}", data=domain_ti)
# Same severity filter for the domain results
domain_edges = [
    (domain, provider)
    for provider in domain_ti.index
    if ti_check_sev(domain_ti.loc[provider]['Severity'], 1)
]
graph_items += domain_edges

In [None]:
from whois import whois
from collections import Counter

# Function to calculate entropy
def Entropy(data):
    """Return the Shannon entropy, in bits, of the symbols in *data*.

    Parameters
    ----------
    data
        Any iterable with a length whose items are hashable, e.g. a string.

    Returns
    -------
    The entropy ``-sum(p * log2(p))`` over the relative frequency ``p`` of
    each distinct item. An empty input yields 0.
    """
    # Built-in float replaces the np.float alias, which newer NumPy releases no longer provide
    total = float(len(data))
    counts = Counter(data)
    return -sum(count / total * np.log2(count / total) for count in counts.values())

# Get a whois record for our domain
wis = whois(domain)

if wis.domain_name is not None:
    # Create domain record from whois data
    dom_record = pd.DataFrame({"Domain": [domain],
                               "Name": [wis['name']],
                               "Org": [wis['org']],
                               "DNSSec": [wis['dnssec']],
                               "City": [wis['city']],
                               "State": [wis['state']],
                               "Country": [wis['country']],
                               "Registrar": [wis['registrar']],
                               "Status": [wis['status']],
                               "Created": [wis['creation_date']],
                               "Expiration": [wis['expiration_date']],
                               "Last Updated": [wis['updated_date']],
                               "Name Servers": [wis['name_servers']]})

    # Normalise the name server field so it can always be iterated:
    # a missing value becomes an empty list and a single string a one-item list.
    name_servers = wis['name_servers'] or []
    if isinstance(name_servers, str):
        name_servers = [name_servers]

    # Build the de-duplicated list of registered domains of the name servers.
    # The membership test must use the name server's own domain (ns_dom).
    ns_domains = []
    for server in name_servers:
        # Read the parts by attribute so this works whether tldextract
        # returns a namedtuple or a dataclass.
        ns_parts = tldextract.extract(server)
        ns_sub_d, ns_domain, ns_tld = ns_parts.subdomain, ns_parts.domain, ns_parts.suffix
        ns_dom = ns_domain.lower() + "." + ns_tld.lower()
        if ns_dom not in ns_domains:
            ns_domains.append(ns_dom)

    # Identify the domain's popularity with Open Page Rank
    page_rank = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["OPR"]))
    opr_raw = page_rank['RawResult'].iloc[0]
    if opr_raw:
        page_rank_score = opr_raw['response'][0]['page_rank_integer']
    else:
        page_rank_score = 0
    dom_record["Page Rank"] = [page_rank_score]

    # Get a list of subdomains for the domain
    url_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["VirusTotal"]))
    vt_raw = url_ti['RawResult'].iloc[0]
    if vt_raw:
        sub_doms = vt_raw['subdomains']
    else:
        # Must stay iterable: the list is looped over just below
        sub_doms = []
    graph_items.append((domain, "Sub Domains"))
    graph_items += [(sub, "Sub Domains") for sub in sub_doms]
    dom_record['Sub Domains'] = [sub_doms]

    # Work out domain entropy to identify possible DGA
    dom_ent = Entropy(domain)
    dom_record['Domains Entropy'] = [dom_ent]

    # Add elements to graph for later plotting.
    # The creation date cell may hold a list of dates; use the first one then.
    created = dom_record['Created'][0]
    if isinstance(created, list) and created:
        created = created[0]
    graph_items.append((domain, created))
    graph_items.append((domain, "Name Servers"))
    graph_items += [("Name Servers", ns) for ns in name_servers]
    graph_items += [(domain, dom_record['Registrar'][0]),
                    (domain, dom_record['Country'][0]),
                    (domain, f"Page Rank : {dom_record['Page Rank'][0]}")]

    # Highlight domains with low PageRank score or if their entropy is more than 2 standard deviations from the average for the top 1 million domains
    def color_cells(val):
        """Return the CSS background for one cell: yellow when the value is outside the expected range."""
        if isinstance(val, int):
            # Integer cells are the Page Rank score
            color = 'yellow' if val < 3 else 'white'
        elif isinstance(val, float):
            # Float cells are the entropy
            color = 'yellow' if val > 4.30891 or val < 2.72120 else 'white'
        else:
            color = 'white'
        return 'background-color: %s' % color

    # Display whois details and highlight interesting values
    display(dom_record.T.style.applymap(color_cells, subset=pd.IndexSlice[['Page Rank', 'Domains Entropy'], 0]))
    summary.add_observation(caption="Domain Summary", description=f"Summary of public domain records for {domain}", data=dom_record)
    md("If Page Rank or Domain Entropy are highlighted, this indicates that their values are outside the expected values of a legitimate website")
    md("The average entropy for the 1M most popular domains is 3.2675")

else:
    # If there is no whois data, see what we can use from TI
    url_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["VirusTotal"]))
    md(f"No current whois record exists for {domain}. Below are historical records")
    print(url_ti['RawResult'].iloc[0]['whois'])

In [None]:
# Echo the current contents of url_ti (the only expression in this cell, so Jupyter renders it)
url_ti

In [None]:
# Check the URL when one was supplied, otherwise the domain
scope = url if url is not None else domain

# See if TLS cert is in abuse.ch malicious certs list and get cert details
result, x509 = dom_val.in_abuse_list(scope)

if x509 is None:
    md("No TLS certificate was found in abuse.ch lists.")
else:
    # Flatten the subject and issuer attributes to plain value lists
    cert_subject = [attribute.value for attribute in x509.subject]
    cert_issuer = [attribute.value for attribute in x509.issuer]

    # One-row frame describing the certificate
    cert_df = pd.DataFrame({
        "SN": [x509.serial_number],
        "Subject": [cert_subject],
        "Issuer": [cert_issuer],
        "Expired": [x509.not_valid_after],
        "InAbuseList": result,
    })

    # Show the details, record them in the summary and add the result to the graph
    display(cert_df.T)
    summary.add_observation(caption="TLS Summary", description=f"Summary of TLS certificate for {domain}", data=cert_df)
    md("If 'InAbuseList' is True this shows that the SSL certificate fingerprint appeared in the abuse.ch list")
    graph_items.append((domain, result))

In [None]:
from dns.resolver import NXDOMAIN
from ipwhois import IPWhois

import dns.resolver

# Names of the providers held in TILookup's provider mapping
# NOTE(review): this reads the private `_providers` attribute of TILookup.
primary_providers = list(tilookup._providers)

# Add "VirusTotal" as a primary provider if it is loaded
if "VirusTotal" in tilookup.loaded_providers and "VirusTotal" not in primary_providers:
    primary_providers.append("VirusTotal")

# Check if the domain is resolvable
if dom_val.is_resolvable(domain) is True:
    try:
        # Query the A record for the domain
        answer = dns.resolver.query(domain, 'A')
    except NXDOMAIN as err:
        raise ValueError("Could not resolve IP addresses from domain.") from err

    # Only the first returned address is analysed
    x = answer[0].to_text()

    # Perform IPWhois lookup for the IP address.
    # The client is named ip_whois so it does not overwrite the `whois`
    # function imported from the whois package in an earlier cell, which
    # would break that cell when it is re-run.
    ip_whois = IPWhois(x)
    ipwis = ip_whois.lookup_whois()

    # Create a dataframe to store the IP address details
    ip_rec = pd.DataFrame({"IP Address": [x],
                           "ASN": [ipwis['asn']],
                           "ASN Owner": [ipwis['asn_description']],
                           "Country": [ipwis['asn_country_code']],
                           "Date": [ipwis['asn_date']]})

    # Add IP address and ASN details to the graph items
    ip_addresses = ip_rec['IP Address'].to_list()
    graph_items += [
        (ip_rec["IP Address"][0], domain),
        (ip_rec["IP Address"][0], ip_rec["ASN"][0]),
        (ip_rec["ASN Owner"][0], ip_rec["ASN"][0]),
        (ip_rec["Country"][0], ip_rec["ASN"][0]),
    ]

    # Check if the IP address is a Tor node (only possible when the Tor provider is loaded)
    tor = None
    if "Tor" in tilookup.loaded_providers:
        tor = tilookup.result_to_df(tilookup.lookup_ioc(observable=ip_rec['IP Address'][0], providers=["Tor"]))
    if tor is None or tor['Details'][0] == "Not found.":
        ip_rec['Tor Node?'] = "No"
    else:
        ip_rec['Tor Node?'] = "Yes"
        graph_items.append((ip_rec["IP Address"][0], "Tor Node"))

    # Lookup Threat Intelligence for the IP address
    ip_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=ip_rec['IP Address'][0], providers=primary_providers))

    # Get the last 10 resolutions for the IP address from VirusTotal
    last_10 = []
    if "VirusTotal" in tilookup.loaded_providers:
        last_10 = ip_ti.T['VirusTotal']['RawResult']["resolutions"][0:10]

    # Add previous domains to the graph items
    prev_domains = []
    for record in last_10:
        prev_domains.append(record['hostname'])
        graph_items.append((record['hostname'], ip_rec["IP Address"][0]))

    # Add the last 10 resolutions to the IP record
    ip_rec["Last 10 resolutions"] = [prev_domains]

    # Display the IP address details
    display(ip_rec.T)
    summary.add_observation(caption="IP Summary", description=f"Summary of IP associated with {domain}", data=ip_rec)
else:
    # The domain did not resolve, so there is no DNS answer and no IP address
    # to enrich. Start with an empty address list so later cells that extend
    # ip_addresses still work.
    ip_addresses = []
    md(f"{domain} could not be resolved to an IP address, skipping IP analysis.")

In [None]:
# Screenshot the URL when one was supplied, otherwise the domain
screenshot_target = domain if url is None else url
image_data = domain_utils.screenshot(screenshot_target)

# Save the returned image bytes to disk, then render the saved file
Path('screenshot.png').write_bytes(image_data.content)

display(Image(filename='screenshot.png'))

In [None]:
# Create graph from items saved to graph_items
import networkx as nx
import matplotlib.pyplot as plt
# Undirected graph with one edge per pair collected in graph_items;
# the second element of each pair is always stored as a string node.
G = nx.Graph()
for edge in graph_items:
    source_node = edge[0]
    target_node = str(edge[1])
    G.add_edge(source_node, target_node)

In [None]:
# Plot Graph with pyvis
net = Network(height=900, width=900, notebook=True)
net.barnes_hut()
net.from_nx(G)
net.set_options("""
var options = {"nodes": {"color": {"highlight": {"border": "rgba(233,77,49,1)"},"hover": {"border": "rgba(233,77,49,1)"}},
    "scaling": {"min": 1},"size": 7},
    "edges": {"color": {"inherit": true}, "smooth": false},
    "interaction": {"hover": true,"multiselect": true},
    "manipulation": {"enabled": true},
    "physics": {"enabled": false,"barnesHut": {"gravitationalConstant": -80000,"springLength": 250,"springConstant": 0.001},"minVelocity": 0.75}
}""")

# Jupyter only echoes the value of the last expression in a cell. Because
# more statements follow, the object returned by show() is displayed
# explicitly; otherwise the interactive graph would never be rendered.
graph_frame = net.show("graph.html")
if graph_frame is not None:
    display(graph_frame)

# Non-interactive fallback drawn with matplotlib, useful if the interactive
# graph above does not display correctly.
import matplotlib.pyplot as plt
plt.figure(3, figsize=(12, 12))
nx.draw(G, with_labels=True, font_weight='bold')

In [None]:
# A confirmation checkbox is only needed when a domain record exists and its
# Page Rank score is 6 or higher.
low_page_rank = dom_record is None or int(dom_record["Page Rank"]) < 6
if low_page_rank:
    warning = None
    md(f"The Page Rank score for {domain} is low, querying for this domain should not present issues.")
else:
    md_warn(f"{domain} has a high Page Rank score, it is likely to be highly prevalent in the environment.")
    md("Please confirm below that you wish to proceed, note that some queries are likely to be slow due to large amounts of data", "bold")
    warning = widgets.Checkbox(value=False, description='Are you sure?', disabled=False)
    display(warning)

# Establish if we want to investigate just the URL or domain and URL.
# While the checkbox is shown but unticked, scope_selection is not assigned.
if warning is not None and not warning.value:
    md_warn("Please check the box above to confirm you wish to proceed")
elif url is not None:
    md("Do you wish to search on the URL alone or URL and Domain? For malicious URLs on known good domains you may wish to only search on the URL to get more granular results.")
    scope_selection = widgets.RadioButtons(options=['URL Only', 'URL and Domain'], disabled=False)
    display(scope_selection)
else:
    scope_selection = None
    md(f"Searching data for {domain}")

# Filled in later with the hosts that have matching logs
host_list = []

In [None]:
# Work out the search scope: the domain when no choice was offered, the URL
# alone for "URL Only", otherwise "domain|url".
if scope_selection is None:
    scope = domain
elif scope_selection.value == "URL Only":
    scope = url
else:
    scope = f"{domain}|{url}"

# Time range picker used by the queries in the following cells
query_times = nbwidgets.QueryTime(units='day', max_before=20, max_after=1, before=3)
query_times.display()

In [None]:
# Get any alerts in the selected time range
alerts = qry_prov.SecurityAlert.list_alerts(
    query_times)

# related_alerts is always defined, so the check further down cannot hit a
# NameError (or reuse a value left over from an earlier run) when the query
# returned nothing.
related_alerts = None

# Keep only the alerts whose Entities text matches the scope
if isinstance(alerts, pd.DataFrame) and not alerts.empty:
    # na=False treats rows with missing Entities as non-matching instead of
    # producing NaN in the boolean mask.
    related_alerts = alerts[alerts["Entities"].str.contains(scope, na=False)]
else:
    alerts = None
    display(HTML("No alerts found"))

# Check if related alerts exist and display them
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
    # Count the number of alerts for each alert type
    related_alerts_items = (related_alerts[['AlertName', 'TimeGenerated']]
                            .groupby('AlertName').TimeGenerated.agg('count').to_dict())

    def print_related_alerts(alertDict, entityType, entityName):
        """Display one Markdown line per alert type with its count.

        alertDict maps alert name to count; entityType and entityName are
        only used in the displayed text.
        """
        if len(alertDict) > 0:
            display(Markdown(
                f"### Found {len(alertDict)} different alert types related to this {entityType} (\'{entityName}\')"))
            for (k, v) in alertDict.items():
                display(Markdown(f"- {k}, Count of alerts: {v}"))
        else:
            display(
                Markdown(f"No alerts for {entityType} entity \'{entityName}\'"))

    # Display the related alerts
    print_related_alerts(related_alerts_items, 'domain', domain)

    # Display alerts on timeline to aid in visual grouping
    nbdisplay.display_timeline(
        data=related_alerts, source_columns=["AlertName"], title="Host alerts over time", height=300, color="red")

    # Score is half the number of related alerts
    score = len(related_alerts.index) / 2

    # Add the alerts to the summary
    summary.add_observation(caption="Alerts", description=f"Alerts linked to {scope}", data=related_alerts, score=score)
else:
    md("No related alerts found.")

In [None]:
host_log_query = f"""
 Syslog 
 | where TimeGenerated >= datetime({query_times.start}) 
 | where TimeGenerated <= datetime({query_times.end})
 | where SyslogMessage matches regex "{scope}"
 | union isfuzzy = true (
 SecurityEvent
 | where TimeGenerated >= datetime({query_times.start}) 
 | where TimeGenerated <= datetime({query_times.end})
 | where CommandLine matches regex "{scope}")
"""
# Run the query, then summarise the matching logs per host and pull network
# indicators out of them.
host_logs_df = qry_prov.exec_query(host_log_query)
if host_logs_df.empty:
    md(f"No host logs found containing {domain} or {url}")
else:
    md(f"Summary of logs containing {scope} by host:", "bold")
    logs_per_host = host_logs_df.groupby(['Computer']).count()['TimeGenerated']
    host_log_sum = pd.DataFrame({'Log Count' : logs_per_host}).reset_index()
    display(host_log_sum.style.hide_index())
    # Keep the per-host counts in the summary for later use
    summary.add_observation(caption="Host Log Summary", description=f"Summary of logs containing {scope} by host", data=host_log_sum)

    # Extract IPs, domains and URLs from the message and command line columns
    ioc_extractor = iocextract.IoCExtract()
    print('Extracting IPs, Domains and URLs from logs.......')
    ioc_df = ioc_extractor.extract(
        data=host_logs_df,
        columns=['SyslogMessage', 'CommandLine'],
        os_family='Linux',
        ioc_types=['ipv4', 'ipv6', 'dns', 'url'],
    )
    md("Network artifacts found in logs:", "bold")
    display(ioc_df.drop('SourceIndex', axis=1).style.hide_index())

    # Add the IPv4 observables that are not already in ip_addresses
    ipv4_observables = ioc_df[ioc_df['IoCType'] == "ipv4"]['Observable']
    ip_addresses += [ip for ip in ipv4_observables if ip not in ip_addresses]

In [None]:
#Display the logs associated with the domain or URL for each host
def view_logs(host):
    """Display the rows of host_logs_df whose Computer column equals *host*."""
    matching_logs = host_logs_df.query('Computer == @host')
    display(matching_logs)

if host_logs_df.empty:
    md(f"No host logs found containing {domain} or {url}")
else:
    # Distinct, non-null computer names from the per-host summary
    host_list = items = host_log_sum['Computer'].dropna().unique().tolist()
    md(f"<h3>View all host logs that contains {scope}</h3>")
    # Dropdown wired to view_logs so that picking a host shows its raw logs
    log_view = widgets.Dropdown(
        options=items,
        description='Select Computer to view raw logs',
        disabled=False,
        **WIDGET_DEFAULTS,
    )
    display(widgets.interactive(view_logs, host=log_view))