In [1]:
import json
import logging
import os
import pandas as pd
import re
import sqlalchemy
import sys

In [2]:
# Logging init
os.remove("./build-db-from-osv.log") if os.path.exists("./build-db-from-osv.log") else None
logger = logging.getLogger()
fhandler = logging.FileHandler(filename='build-db-from-osv.log', mode='a')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fhandler.setFormatter(formatter)
logger.addHandler(fhandler)
logger.setLevel(logging.DEBUG)

- Read the json files for each ecosystem.
- Parse and make dataframe of these.
- Put the dataframe to database.

In [3]:
def get_severity_score_range(score):
    """Convert severity score to qualitative range"""
    if score is None:
        return 'None'
    try:
        score_float = float(score)
        if score_float == 0.0:
            return 'None'
        elif 0.1 <= score_float <= 3.9:
            return 'Low'
        elif 4.0 <= score_float <= 6.9:
            return 'Medium'
        elif 7.0 <= score_float <= 8.9:
            return 'High'
        elif 9.0 <= score_float <= 10.0:
            return 'Critical'
        else:
            return 'None'
    except (ValueError, TypeError):
        return 'None'


In [8]:
df = pd.DataFrame(columns = ['vul_id', 'system_name', 'package_name', 'vul_introduced', 'vul_fixed', 'severity_type', 'severity_score', 'severity_score_range'])
# df = pd.DataFrame()
data_dir = os.path.join(os.path.join(os.path.join(os.path.join(os.getcwd(), os.pardir), os.pardir), "data"), "osv-data-updated-2025-12-13")

for (root,dirs,files) in os.walk(data_dir, topdown=True):
    for file in files:
        if file.endswith(".json"):
            file_path = os.path.join(data_dir, root, file)
            # logging.debug(f"files: {root}/{file} -> start")
            with open(file_path, 'r', encoding='utf-8') as f:
                # try:
                    data = json.load(f)

                    vul_id = data["id"]
                    # logging.debug(f"{vul_id}")

                    package_name = ""
                    
                    if 'affected' in data:
                        for affected in data['affected']:
                            if 'package' in affected and 'ranges' in affected:
                                package_name = affected['package']['name']
                                # logging.debug(f"package_name: {package_name}")
                                system_name = affected['package']['ecosystem']
                                # logging.debug(f"system_name: {system_name}")
                                
                                # Extract severity information from affected
                                severity_type = None
                                severity_score = None
                                severity_score_range = 'None'
                                
                                if 'severity' in affected:
                                    for severity in affected['severity']:
                                        severity_type = severity.get('type')
                                        severity_score = severity.get('score')
                                        severity_score_range = get_severity_score_range(severity_score)
                                        break  # Take the first severity entry
                                
                                ranges = affected['ranges']
                                for range in ranges:
                                    events = range['events']
                                    for event in events:
                                        if 'introduced' in event:
                                            vul_introduced = event['introduced']
                                        elif 'fixed' in event:
                                            vul_fixed = event['fixed']
                                            # logging.debug(f"vul intro: {vul_introduced}, fixed: {vul_fixed}")
                                            df_temp = pd.DataFrame({
                                                'vul_id': vul_id,
                                                'system_name': system_name.upper(),
                                                'package_name': package_name,
                                                'vul_introduced': vul_introduced,
                                                'vul_fixed': vul_fixed,
                                                'severity_type': severity_type,
                                                'severity_score': severity_score,
                                                'severity_score_range': severity_score_range
                                            }, index=[0])
                                            # logging.debug(f"dataframe: {df_temp.to_string()}")
                                            df = pd.concat([df, df_temp], ignore_index=True)
                # except Exception:
                #     pass
df.head()

Unnamed: 0,vul_id,system_name,package_name,vul_introduced,vul_fixed,severity_type,severity_score,severity_score_range
0,GHSA-8c93-4hch-xgxp,NPM,wrangler,0,2.20.1,,,
1,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,3.0.0,3.1.1,,,
2,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,2.0.0,2.1.4,,,
3,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,0,1.16.6,,,
4,GHSA-w5pw-gmcw-rfc8,NPM,squirrelly,9.0.0,9.1.0,,,


#### Qualitative Security Scoring

https://nvd.nist.gov/vuln-metrics/cvss

- None 0.0
- Low 0.1-3.9
- Medium 4.0-6.9
- High 7.0-8.9
- Critical 9.0-10.0

In [9]:
print (df.system_name.unique())

['NPM' 'NUGET' 'PACKAGIST' 'CRATES.IO' 'MAVEN' 'GO' 'SWIFTURL' 'RUBYGEMS'
 'PUB' 'PYPI' 'HEX']


In [10]:
df.shape


(22469, 8)

In [11]:
df.dropna(inplace=True, subset=['system_name', 'package_name', 'vul_introduced', 'vul_fixed'])
df.shape

(22469, 8)

In [12]:
def transformation_semver(x):
    if x == '0':
        return '0.0.0'
    elif x.count('.') == 0:
        return x + '.0.0'
    elif re.match(r'(\d+(\.\d*))', x) and x.count('.') == 1:
        return x + '.0'
    else:
        return x

In [13]:
def transformation_system_name(x):
    if x == 'CRATES.IO':
        return 'CARGO'
    else:
        return x

In [14]:
df['system_name'] = df['system_name'].apply(transformation_system_name)

In [15]:
df['vul_introduced'] = df['vul_introduced'].apply(transformation_semver)

In [16]:
def filter_rows_by_values(df, col, values):
    return df[~df[col].isin(values)]

In [17]:
df = filter_rows_by_values(df, 'system_name', ['MAVEN', 'NUGET', 'PACKAGIST', 'GO', 'RUBYGEMS',
 'SWIFTURL', 'PUB', 'HEX'])

In [18]:
df.head()

Unnamed: 0,vul_id,system_name,package_name,vul_introduced,vul_fixed,severity_type,severity_score,severity_score_range
0,GHSA-8c93-4hch-xgxp,NPM,wrangler,0.0.0,2.20.1,,,
1,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,3.0.0,3.1.1,,,
2,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,2.0.0,2.1.4,,,
3,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,0.0.0,1.16.6,,,
4,GHSA-w5pw-gmcw-rfc8,NPM,squirrelly,9.0.0,9.1.0,,,


In [19]:
df


Unnamed: 0,vul_id,system_name,package_name,vul_introduced,vul_fixed,severity_type,severity_score,severity_score_range
0,GHSA-8c93-4hch-xgxp,NPM,wrangler,0.0.0,2.20.1,,,
1,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,3.0.0,3.1.1,,,
2,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,2.0.0,2.1.4,,,
3,GHSA-vj76-c3g6-qr5v,NPM,tar-fs,0.0.0,1.16.6,,,
4,GHSA-w5pw-gmcw-rfc8,NPM,squirrelly,9.0.0,9.1.0,,,
...,...,...,...,...,...,...,...,...
22464,PYSEC-2023-45,PYPI,redis,4.2.0,4.3.6,,,
22465,PYSEC-2023-45,PYPI,redis,4.4.0,4.4.3,,,
22466,PYSEC-2023-45,PYPI,redis,4.5.0,4.5.3,,,
22467,PYSEC-2023-283,PYPI,modoboa,0.0.0,7f0573e917227686d2cc127be1364e2908740807,,,


In [20]:
print (df.system_name.unique())

['NPM' 'CARGO' 'PYPI']


# Send it to POSTGRES

In [21]:
connection_str = "postgresql{dbapi}://{user}:{pw}@{host}:{port}/{db}".format(
        dbapi='',
        # dbapi="+pg8000",
        # dbapi="+psycopg",
        user="metricsuser",
        pw="metricspassword",
        host="localhost",
        port="5432",
        db="metrics")
# print (connection_str)
engine = sqlalchemy.create_engine(connection_str)
with engine.begin() as connection:
        df.to_sql(con=connection,
                name='osv',
                if_exists='replace',
                index=False,
                dtype={
                        'vul_id': sqlalchemy.types.VARCHAR,
                        'system_name': sqlalchemy.types.VARCHAR,
                        'package_name': sqlalchemy.types.VARCHAR,
                        'vul_introduced': sqlalchemy.types.VARCHAR,
                        'vul_fixed': sqlalchemy.types.VARCHAR,
                        'severity_type': sqlalchemy.types.VARCHAR,
                        'severity_score': sqlalchemy.types.VARCHAR,
                        'severity_score_range': sqlalchemy.types.VARCHAR
                }
        )