# COVID-19 Indicator feed notebook

Author: @JohnLaTwC

This notebook uses the COVID-19 threat indicator feed from Microsoft. It emits a Yara rule that matches on the COVID-19 indicator feed from Microsoft. It uses the VT API to add this rule to your hunting rulesets. 

It also can search VirusTotal to find matches.

References:
* https://www.microsoft.com/security/blog/2020/05/14/open-sourcing-covid-threat-intelligence/
* https://aka.ms/msft-covid19-Indicators

In [None]:
!!pip install pandas
!!pip install vt-py
!!pip install nest_asyncio

In [None]:
# if you want to query VT, define your API KEY
VT_APIKEY = "<DEFINE APIKEY HERE>"

In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>")) 

In [None]:
feedurl = 'https://raw.githubusercontent.com/Azure/Azure-Sentinel/master/Sample%20Data/Feeds/Microsoft.Covid19.Indicators.csv'

In [None]:
def get_feed(url):
    import requests
    import pandas as pd
    from io import StringIO
    results = None
    r = requests.get(url)
    if r.status_code==404:
        print('feed does not exist')
    else:
        csv = StringIO(r.content.decode())
        results = pd.read_csv(csv, sep=',', names=["Timestamp", "sha256", "IndicatorType", "TLP", "Product", "ThreatType", "Description"])
    return results

## Fetch the indicators and list some of them

In [None]:
import pandas as pd

df = get_feed(feedurl)
i = 5
print(f"Feed contains {len(df)} indicators. Listing {i} example indicators:")

df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df.sort_values(by='Timestamp', ascending=True).tail(i)

## Search VirusTotal for any matching indicators (requires API key)

In [None]:
import vt 
import nest_asyncio
nest_asyncio.apply()
def get_vt_search_results(api_key, query):
    with vt.Client(api_key) as client:
        lst = []
        obj = client.get_json('/intelligence/search',
                             params = {'query':query, 'descriptors_only':'true'})
        if len(obj['data']) == 0:
            return lst
        for item in obj['data']:
            lst.append(item['id'])
        return lst

def query_for_indicators(lst):
    #chunk up the API requests to VT
    BATCH_SIZE = 40
    partitions = int(len(lst) / BATCH_SIZE)
    subs = [lst[i::partitions] for i in range(partitions)] 
    r = []
    for sub in subs:
        r = r + get_vt_search_results(VT_APIKEY, '\n'.join(sub))
    return r

In [None]:
lst = df['sha256'].tolist()
matches = query_for_indicators(lst)
print(len(matches))

In [None]:
matches

# Create a Yara rule for indicators

In [None]:
def create_rule(hsh):
    return f'hash.sha256(0, filesize) == "{hsh}"\n'

def print_vt_rule(df, feedurl):
    import datetime 
    timestamp = datetime.datetime.now()

    x = map(create_rule, df['sha256'].tolist())
    formatted_lst = '            or '.join(list(x))
    s = f'''
import "hash"
rule covid19_indicator_match
{{
    meta:
        feed = "{feedurl}"
        created_on = "{timestamp}"
        total_indicators = {len(df)}
    condition:
        filesize < 1MB and
        (
            {formatted_lst}
        )
}}

    ''' 
    return s

vt_covid_rule = print_vt_rule(df, feedurl)
print(vt_covid_rule)

# Save COVID to your VirusTotal account
API: https://github.com/VirusTotal/vt-py/

Docs: https://virustotal.github.io/vt-py/

In [None]:
import vt 
import nest_asyncio
nest_asyncio.apply()

RULE_NAME = 'MSCOVID19_FEED'

def get_ruleset_id(api_key, rule_name):
    with vt.Client(api_key) as client:
        obj = client.get_json('/intelligence/hunting_rulesets',
                             params = {'filter':'enabled:true name:%s ' % rule_name, 'limit':1})
        if len(obj['data']) == 0:
            return -1
        return obj['data'][0]['id']
    
      
def create_update_rule(api_key, rule_name, vt_covid_rule):
    with vt.Client(api_key) as client:
        
        id = get_ruleset_id(api_key, rule_name)

        rs = vt.Object("hunting_ruleset")
        rs.name = rule_name
        rs.enabled = True
        rs.rules = vt_covid_rule
        res = None
        if id == -1:
            res = client.post_object("/intelligence/hunting_rulesets", obj=rs)
        else:
            res = client.patch_object("/intelligence/hunting_rulesets/%s" % id, obj=rs)
        if 'id' in res.to_dict():
            print("Success.")
        else:
            print("Failed.")

In [None]:
create_update_rule(VT_APIKEY, RULE_NAME, vt_covid_rule)