In [16]:
import os
import re
import sys
import typing

from collections import namedtuple

import numpy as np
import pandas as pd

from nvdlib.nvd import NVD

In [17]:
sys.path.append('../src/')

!pushd nvdlib/ && pip install --upgrade . && python setup.py install && popd

In [18]:
from f8a_version_comparator.comparable_version import ComparableVersion

from toolkit.preprocessing import NVDFeedPreprocessor

In [19]:
feed = NVD.from_recent()
# feed.update()

data = list(feed.cves())

In [20]:
transformed = NVDFeedPreprocessor(attributes=['cve_id', 'configurations']).fit_transform(data, use_filter=False)

In [21]:
# Series.configurations contains List[ConfigurationNode]
transformed_filtered = list(filter(lambda e: any(e.configurations), transformed))

In [22]:
df = pd.DataFrame(transformed_filtered).drop('repository', axis=1)

df

Unnamed: 0,user,project,cve_id,configurations
0,,,CVE-2014-1457,[<nvdlib.model.ConfigurationNode object at 0x7...
1,,,CVE-2014-1665,[<nvdlib.model.ConfigurationNode object at 0x7...
2,,,CVE-2014-2550,[<nvdlib.model.ConfigurationNode object at 0x7...
3,,,CVE-2014-4612,[<nvdlib.model.ConfigurationNode object at 0x7...
4,,,CVE-2014-4613,[<nvdlib.model.ConfigurationNode object at 0x7...
5,,,CVE-2014-4928,[<nvdlib.model.ConfigurationNode object at 0x7...
6,,,CVE-2015-7440,[<nvdlib.model.ConfigurationNode object at 0x7...
7,,,CVE-2015-7449,[<nvdlib.model.ConfigurationNode object at 0x7...
8,,,CVE-2015-7453,[<nvdlib.model.ConfigurationNode object at 0x7...
9,,,CVE-2015-7458,[<nvdlib.model.ConfigurationNode object at 0x7...


In [23]:
class CustomObject:
    
    def __init__(self, stream: str):
        if stream is None:
            raise TypeError()
            
        self.stream = stream
    
    def __repr__(self):
        return "{cls!s}(stream={stream!r})".format(
            cls=self.__class__.__name__,
            stream=self.stream
        )

    def __str__(self):
        return "{stream!s}".format(
            stream=self.stream
        )
        
    def __lt__(self, other):
        if other is None:
            return False
        
        return self.stream < other.stream
        
    def __gt__(self, other):
        if other is None:
            return True
        
        return self.stream > other.stream
        
    def __eq__(self, other):
        if other is None:
            return False
        
        return self.stream == other.stream
    
#     def __hash__(self):
#         return super().__hash__()  # FIXME

In [24]:
from itertools import compress

CPE_VERSION_ATTRIBUTE_LIST = [
    'vendor',
    'product',
    'versionExact',
    'versionStartExcluding',
    'versionStartIncluding',
    'versionEndIncluding',
    'versionEndExcluding',
]


VICTIM_VERSION_OPERATOR_LIST = ['==', '>', '>=', '<=', '<']


class VersionNode(namedtuple('VersionNode', CPE_VERSION_ATTRIBUTE_LIST)):
    
    def __new__(cls, cpe=None):
        attr_dict = dict()
        for attr in CPE_VERSION_ATTRIBUTE_LIST:
            value = getattr(cpe, attr, None)
            
            if value is not None and re.match(r"^[-]?\s*$", value):
                value = None
                
            if attr.startswith('version') and value is not None:
                # attr_dict[attr] = ComparableVersion(value)  # FIXME: This doesn't work!!
                attr_dict[attr] = CustomObject(value)
            else:
                attr_dict[attr] = value
                
        return super(VersionNode, cls).__new__(
            cls,
            **attr_dict
        )
    
    def __eq__(self, other):
        if (self.vendor, self.product) != (other.vendor, other.product):
            return False
        
        return self[2:] == other[2:]
        
    @classmethod
    def from_cpe(cls, cpe_list: list) -> list:
        if not cpe_list:
            return None
        
        return [cls(cpe) for cpe in cpe_list]
    
    @property
    def victims_notation(self):
        return VersionNode.get_victims_notation(self)
    
    @staticmethod
    def get_victims_notation(other: typing.Union["VersionNode", tuple]):
        version_string =  ",".join([
            f"{op}{version}" for op, version in zip(VICTIM_VERSION_OPERATOR_LIST, other)
            if version is not None
        ])
        
        return version_string

import names

def feed():
    return type('Feed', (), {
        attr: names.get_first_name() if attr.startswith('substr') else names.get_last_name()
        for attr in CPE_VERSION_ATTRIBUTE_LIST
    })

feed = [feed() for i in range(100)]

version_nodes = [VersionNode(f) for f in feed]


In [25]:
version_nodes = list()
for config in df.configurations:
    for node in config:
        version_ranges = VersionNode.from_cpe(node.cpe)
        version_nodes.extend(version_ranges or [])

In [26]:
version_series = pd.Series(version_nodes)
version_series.dropna(inplace=True)
version_series.drop_duplicates(inplace=True)

version_series = version_series.reset_index(drop=True)

df_records = pd.DataFrame.from_records(version_series.values.tolist(), columns=CPE_VERSION_ATTRIBUTE_LIST)

for i in range(0, 80, 3):
    df_records.iat[i, 2] = None
    df_records.iat[i, 3] = None
    df_records.iat[i-1, 4] = None
    df_records.iat[i+1, 5] = None

In [28]:
# drop items whith missing version entry
df_records.drop(*df_records[CPE_VERSION_ATTRIBUTE_LIST[2:]].isna().all(axis=1).nonzero(), inplace=True)

In [29]:
df_records.head()

Unnamed: 0,vendor,product,versionExact,versionStartExcluding,versionStartIncluding,versionEndIncluding,versionEndExcluding
0,openwebanalytics,open_web_analytics,,,,,1.5.6
1,owncloud,owncloud,,,,,6.0.1
2,disable_comments,disable_comments_project,,,,,1.0.4
3,coppermine-gallery,coppermine_photo_gallery,,,,,1.5.28
4,coppermine-gallery,coppermine_photo_gallery,,,1.6.0,,1.6.01


In [30]:
grp = df_records.sort_values(by=CPE_VERSION_ATTRIBUTE_LIST[2:]).groupby(by=['product'])
max_count = grp.count().max().vendor  # count the greatest size of a group for new df

df_sorted = grp.head(max_count)
df_sorted.set_index(keys=['vendor', 'product'])

TypeError: unhashable type: 'CustomObject'