**Title:** NIST CSF Database Creation  
**Name:** Daniel Cavazos, Million Werede  
**Date of Creation (MM/DD/YYYY):** 02/07/2023  
**Summary:** Code to create the NIST CSF database `nist.db`.

![nist ERD](nist_ERD.png)

In [1]:
# necessary imports
# alias imports
import pandas as pd
import sqlite3 as sql
import numpy as np

# standard imports
import os
import regex

# Format NIST Mapping DataFrame

In [2]:
# Load the SP 800-53r4 -> CSF mapping sheet.
df = pd.read_excel('data/nist_data/nist_mapping.xlsx',
                   sheet_name='SP 800-53r4 to CSF')

# Any 'Function' / 'Category' cell that is not a string (presumably a blank
# left by merged cells in the sheet -- confirm against the workbook) inherits
# the closest text value above it.  Working column-wise avoids looking up
# "index - 1", which does not exist when the very first row is blank.
for column in ('Function', 'Category'):
    is_text = df[column].map(lambda value: isinstance(value, str))
    df[column] = df[column].where(is_text).ffill()


df.head(5)

Unnamed: 0,Function,Category,Subcategory,All SP 800-53 Controls
0,IDENTIFY (ID),"Asset Management (ID.AM): The data, personnel,...",ID.AM-1: Physical devices and systems within t...,"CM-8, PM-5"
1,IDENTIFY (ID),"Asset Management (ID.AM): The data, personnel,...",ID.AM-2: Software platforms and applications w...,"CM-8, PM-5"
2,IDENTIFY (ID),"Asset Management (ID.AM): The data, personnel,...",ID.AM-3: Organizational communication and data...,"AC-4, CA-3, CA-9, PL-8"
3,IDENTIFY (ID),"Asset Management (ID.AM): The data, personnel,...",ID.AM-4: External information systems are cata...,"AC-20, SA-9"
4,IDENTIFY (ID),"Asset Management (ID.AM): The data, personnel,...","ID.AM-5: Resources (e.g., hardware, devices, d...","CP-2, RA-2, SA-14, SC-6,"


# Create Database

In [3]:
def check_db(filename):
    """
    Reports whether something exists on disk at the given path
    :filename: path to test

    :returns: True if the path exists, False otherwise
    """
    found = os.path.exists(filename)
    return found

# Location of the SQLite database and of the file holding its table definitions.
db_path = 'data/nist.db'
schema_path = 'TABLE_FOR_CHATBOT.sql'

# Read the whole schema file into memory.
with open(schema_path, 'r') as rf:
    schema = rf.read()

# The text is cut at every literal 'GO'; each piece is trimmed of surrounding
# whitespace and later executed as its own script.
all_tables = list(map(str.strip, schema.split('GO')))

In [4]:
# Record whether the file was there before opening a connection: SQLite
# creates a missing database file, so the check is meaningless afterwards.
db_exists = check_db(db_path)

conn = sql.connect(db_path)
try:
    if db_exists:
        print('Database already created')
    else:
        # Run every schema fragment as its own script.
        for table in all_tables:
            conn.executescript(table)
        conn.commit()
        print('Created Database')
finally:
    # Using the connection as a context manager only commits or rolls back;
    # it never closes the connection, so close it explicitly.
    conn.close()

Created Database


In [5]:
# Check tables in database
conn = sql.connect(db_path)
try:
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    # One single-element tuple per table.
    sql_tables = cur.fetchall()
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

print('-'*5, 'Present Tables', '-'*5)
for (table,) in sql_tables:
    # Only names containing 'tbl' are shown; other tables are skipped.
    if 'tbl' in table:
        print(table)

----- Present Tables -----
tblFunction
tblCategory
tblSubcategory
tblControlFamily
tblControl
tblControlSubcategory
tblControlEnhancement


## Get Functions

In [6]:
def sprocGetFunctionID(conn, funcName):
    """
    Looks up the functionID of a row in tblFunction by function name
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :funcName: name of function

    :returns: functionID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The name is bound as a parameter so quotes inside it cannot break the query.
    cur.execute("SELECT functionID FROM tblFunction WHERE functionName = ?", (funcName,))

    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetCategoryID(conn, catName):
    """
    Looks up the categoryID of a row in tblCategory by category name
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :catName: name of category

    :returns: categoryID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The name is bound as a parameter so quotes inside it cannot break the query.
    cur.execute("SELECT categoryID FROM tblCategory WHERE categoryName = ?", (catName,))

    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetSubCategoryID(conn, subCatAbbrev):
    """
    Looks up the SubcategoryID of a row in tblSubcategory by abbreviation
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :subCatAbbrev: subcategory abbreviation

    :returns: SubcategoryID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The abbreviation is bound as a parameter so quotes cannot break the query.
    cur.execute("SELECT SubcategoryID FROM tblSubcategory WHERE subcategoryAbbrev = ?", (subCatAbbrev,))

    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetControlFamilyID(conn, contFamAbbrev):
    """
    Looks up the ControlFamilyID of a row in tblControlFamily by abbreviation
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contFamAbbrev: control family abbreviation

    :returns: ControlFamilyID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The abbreviation is bound as a parameter so quotes cannot break the query.
    cur.execute("SELECT ControlFamilyID FROM tblControlFamily WHERE ControlFamilyAbbrev = ?", (contFamAbbrev,))
    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetControlID(conn, contName):
    """
    Looks up the ControlID of a row in tblControl by control name
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contName: name of control

    :returns: ControlID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The name is bound as a parameter so quotes inside it cannot break the query.
    cur.execute("SELECT ControlID FROM tblControl WHERE ControlName = ?", (contName,))

    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetControlID_Abbrev(conn, contAbbrev):
    """
    Looks up the ControlID of a row in tblControl by control abbreviation
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contAbbrev: control abbreviation

    :returns: ControlID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The abbreviation is bound as a parameter so quotes cannot break the query.
    cur.execute("SELECT ControlID FROM tblControl WHERE ControlAbbrev = ?", (contAbbrev,))

    row = cur.fetchone()
    return row[0] if row is not None else None

def sprocGetControlEnhancementID(conn, contEnName):
    """
    Looks up the ControlEnhancementID of a row in tblControlEnhancement by name
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contEnName: name of control enhancement

    :returns: ControlEnhancementID of the first matching row, None if nothing matches
    """
    conn.isolation_level = None
    cur = conn.cursor()
    # The name is bound as a parameter so quotes inside it cannot break the query.
    cur.execute("SELECT ControlEnhancementID FROM tblControlEnhancement WHERE ControlEnhancementName = ?", (contEnName,))

    row = cur.fetchone()
    return row[0] if row is not None else None

## Insert Functions

In [7]:
def sprocInsertFunction(conn, funcName, funcAbbrev, funcSumm='Null'):
    """
    Inserts new observation in to tblFunction
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :funcName: name of function
    :funcAbbrev: function abbreviation
    :funcSumm: function summary, if available (default is the text 'Null')

    :returns: True if trans successfully committed, False if an integrity
              constraint rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()
    cur.execute("BEGIN TRANSACTION TInsertFunction")
    try:
        # Values are bound instead of being pasted into the SQL text, so quotes
        # inside them are stored verbatim.  str() renders non-string input
        # (e.g. a pandas NaN) as text, so the columns always receive text.
        cur.execute(
            """
            INSERT INTO tblFunction (functionName, functionAbbrev, functionSummary)
            VALUES (?, ?, ?)
            """,
            (str(funcName), str(funcAbbrev), str(funcSumm)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertFunction, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertCategory(conn, funcName, catName, catAbbrev, catSumm='Null'):
    """
    Inserts new observation in to tblCategory
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :funcName: name of the function the category belongs to
    :catName: name of category
    :catAbbrev: category abbreviation
    :catSumm: category summary, if available (default is the text 'Null')

    :returns: True if trans successfully committed, False if the function
              could not be found or an integrity constraint rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    # Treat a lookup that raises IndexError on a miss the same as one that
    # returns a falsy value.
    try:
        funcID = sprocGetFunctionID(conn, funcName)
    except IndexError:
        funcID = None
    if not funcID:
        print('Error occurred when fetching FunctionID, check function name.')
        return False

    cur.execute("BEGIN TRANSACTION TInsertCategory")
    try:
        # Values are bound instead of being pasted into the SQL text, so quotes
        # inside them are stored verbatim.  str() renders non-string input
        # (e.g. a pandas NaN) as text, so the columns always receive text.
        cur.execute(
            """
            INSERT INTO tblCategory (functionID, categoryName, categorySummary, categoryAbbrev)
            VALUES (?, ?, ?, ?)
            """,
            (funcID, str(catName), str(catSumm), str(catAbbrev)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertCategory, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertSubcategory(conn, catName, subcatAbbrev, subcatSumm='Null'):
    """
    Inserts new observation in to tblSubcategory
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :catName: name of the category the subcategory belongs to
    :subcatAbbrev: subcategory abbreviation
    :subcatSumm: subcategory summary, if available (default is the text 'Null')

    :returns: True if trans successfully committed, False if the category
              could not be found or an integrity constraint rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    # Treat a lookup that raises IndexError on a miss the same as one that
    # returns a falsy value.
    try:
        catID = sprocGetCategoryID(conn, catName)
    except IndexError:
        catID = None
    if not catID:
        print('Error occurred when fetching CategoryID, check category name.')
        return False

    cur.execute("BEGIN TRANSACTION TInsertSubcategory")
    try:
        # Values are bound instead of being pasted into the SQL text, so quotes
        # inside them are stored verbatim.  str() renders non-string input
        # (e.g. a pandas NaN) as text, so the columns always receive text.
        cur.execute(
            """
            INSERT INTO tblSubcategory (categoryID, subcategorySummary, subcategoryAbbrev)
            VALUES (?, ?, ?)
            """,
            (catID, str(subcatSumm), str(subcatAbbrev)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertSubcategory, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertControlFamily(conn, contFamName, contFamAbbrev):
    """
    Inserts new observation in to tblControlFamily
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contFamName: name of control family
    :contFamAbbrev: control family abbreviation

    :returns: True if trans successfully committed, False if an integrity
              constraint rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    cur.execute("BEGIN TRANSACTION TInsertControlfamily")
    try:
        # Values are bound instead of being pasted into the SQL text, so quotes
        # inside them are stored verbatim.  str() renders non-string input
        # (e.g. a pandas NaN) as text, so the columns always receive text.
        cur.execute(
            """
            INSERT INTO tblControlFamily (ControlFamilyName, ControlFamilyAbbrev)
            VALUES (?, ?)
            """,
            (str(contFamName), str(contFamAbbrev)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertControlfamily, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertControl(conn, contFamAbbrev, contName, contAbbrev, contStatement, contGuidance):
    """
    Inserts new observation in to tblControl
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contFamAbbrev: abbreviation of the control family the control belongs to
    :contName: name of control
    :contAbbrev: control abbreviation
    :contStatement: control statement
    :contGuidance: control guidance

    :returns: True if trans successfully committed, False if the control
              family could not be found or an integrity constraint rejected
              the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    # Treat a lookup that raises IndexError on a miss the same as one that
    # returns a falsy value.
    try:
        contFamID = sprocGetControlFamilyID(conn, contFamAbbrev=contFamAbbrev)
    except IndexError:
        contFamID = None
    if not contFamID:
        print('Error occurred when fetching ControlFamilyID, check control family name.')
        return False

    cur.execute("BEGIN TRANSACTION TInsertControl")
    try:
        # Values are bound instead of being pasted into the SQL text, so quotes
        # inside them are stored verbatim.  str() renders non-string input
        # (e.g. a pandas NaN) as text, so the columns always receive text.
        cur.execute(
            """
            INSERT INTO tblControl (ControlFamilyID, ControlName, ControlAbbrev, ControlStatement, ControlGuidance)
            VALUES (?, ?, ?, ?, ?)
            """,
            (contFamID, str(contName), str(contAbbrev), str(contStatement), str(contGuidance)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertControl, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertControlSubcategory(conn, subcatAbbrev, contAbbrev):
    """
    Inserts new observation in to tblControlSubcategory, linking one control
    to one subcategory
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :subcatAbbrev: subcategory abbreviation
    :contAbbrev: control abbreviation

    :returns: True if trans successfully committed, False if the control or
              the subcategory could not be found or an integrity constraint
              rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    # Treat a lookup that raises IndexError on a miss the same as one that
    # returns a falsy value.
    try:
        contID = sprocGetControlID_Abbrev(conn, contAbbrev=contAbbrev)
    except IndexError:
        contID = None
    if not contID:
        print('Error occurred when fetching ControlID, check control abbrev.')
        return False

    try:
        subcatID = sprocGetSubCategoryID(conn, subCatAbbrev=subcatAbbrev)
    except IndexError:
        subcatID = None
    if not subcatID:
        print('Error occurred when fetching SubcategroyID, check subcategory name.')
        return False

    cur.execute("BEGIN TRANSACTION TControlSubcategory")
    try:
        # Both IDs are bound as parameters rather than formatted into the SQL.
        cur.execute(
            """
            INSERT INTO tblControlSubcategory (ControlID, SubcategoryID)
            VALUES (?, ?)
            """,
            (contID, subcatID),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TControlSubcategory, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

def sprocInsertControlEnhancement(conn, contAbbrev, contEnName, contEnAbbrev, contEnStatement, contEnGuidance):
    """
    Inserts new observation in to tblControlEnhancement
    :conn: sqlite3 connection to database (its isolation_level is set to None)
    :contAbbrev: abbreviation of the control the enhancement belongs to
    :contEnName: name of control enhancement
    :contEnAbbrev: control enhancement abbreviation
    :contEnStatement: control enhancement statement; single quotes must
                      already be doubled by the caller (see note below)
    :contEnGuidance: control enhancement guidance; single quotes must
                     already be doubled by the caller (see note below)

    :returns: True if trans successfully committed, False if the control
              could not be found or an integrity constraint rejected the row
    :raises: any other sqlite3 error, after the transaction was rolled back
    """
    # Autocommit mode: the transaction below is opened and ended explicitly.
    conn.isolation_level = None
    cur = conn.cursor()

    # Treat a lookup that raises IndexError on a miss the same as one that
    # returns a falsy value.
    try:
        contID = sprocGetControlID_Abbrev(conn, contAbbrev=contAbbrev)
    except IndexError:
        contID = None
    if not contID:
        print('Error occurred when fetching ControlID, check control name.')
        return False

    cur.execute("BEGIN TRANSACTION TInsertControlEnhancement")
    try:
        # NOTE: statement and guidance are still placed inside quoted SQL
        # literals because callers hand them over with single quotes already
        # doubled; binding them would store the doubled quotes.  The remaining
        # values are bound, so quotes in the name or abbreviation are safe.
        # NOTE(review): migrate both texts to bound parameters once every
        # caller stops pre-escaping.
        cur.execute(
            f"""
            INSERT INTO tblControlEnhancement (controlID, controlEnhancementName, controlEnhancementAbbrev, controlEnhancementStatement, controlEnhancementGuidence)
            VALUES (?, ?, ?, '{contEnStatement}', '{contEnGuidance}')
            """,
            (contID, str(contEnName), str(contEnAbbrev)),
        )
        conn.commit()
    except sql.IntegrityError:
        print('Error occured during transaction TInsertControlEnhancement, rolling back Tran')
        conn.rollback()
        return False
    except sql.Error:
        # Do not leave the transaction open: the next BEGIN would fail.
        conn.rollback()
        raise

    return True

## Insert Data

In [8]:
# Insert Functions
df_func = pd.read_csv('data/nist_data/functions.csv')
conn = sql.connect(db_path)
try:
    # Every row must unpack into exactly (name, abbreviation, summary).
    for funcName, funcAbbrev, funcSumm in df_func.itertuples(index=False, name=None):
        if not sprocInsertFunction(conn,
                                   funcName=funcName,
                                   funcAbbrev=funcAbbrev,
                                   funcSumm=funcSumm):
            print(f'Failed to insert Function {funcName} with abbrev {funcAbbrev}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [9]:
# Insert Categories
# Remove duplicate categories
df_cats = df[['Function', 'Category']].copy().drop_duplicates(inplace=False)

# isolate the function name
df_cats['Function'] = df_cats['Function'].map(lambda x: x.split(' ')[0])

# create a temporary df to extract the category information and the CatSum
# (n=1 cuts at the first ':' only, so a colon inside the summary text cannot
# produce extra columns)
cat_temp_df = pd.DataFrame(df_cats['Category'].str.split(':', n=1).to_list(), columns=['Category', 'CatSum'])
cat_temp_df['cats_index'] = df_cats.index
df_cats = pd.merge(df_cats, cat_temp_df, how='inner', left_index = True, right_on='cats_index')[['Function', 'Category_y', 'CatSum']]

# create CatName and CatAbbrev; the pieces get df_cats' own index so the
# column assignment lines up row for row
df_cats[['CatName', 'CatAbbrev']] = pd.DataFrame(df_cats['Category_y'].str.split(r' \(').to_list(),
                                                 index=df_cats.index)
# Remedy CatAbbrev
df_cats['CatAbbrev'] = df_cats['CatAbbrev'].map(lambda x: x.replace(')', ''))
# Remove unnecessary cols
df_cats = df_cats[['Function', 'CatName', 'CatAbbrev', 'CatSum']]

# Insert categories
conn = sql.connect(db_path)
try:
    for funcName, catName, catAbbrev, catSum in df_cats.itertuples(index=False, name=None):
        if not sprocInsertCategory(conn, funcName = funcName, catName = catName,
                                   catAbbrev = catAbbrev.strip(),
                                   catSumm = catSum.strip()):
            print(f'Failed to insert Category {catName} with Function {funcName}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [10]:
# Insert Subcategories
# Select relevant cols and drop dupes
df_subcat = df[['Category', 'Subcategory']].copy()
df_subcat.drop_duplicates(inplace=True)

# Trim category names
df_subcat['Category'] = df_subcat['Category'].map(lambda x: x.split(' (')[0])

# create a temporary df to extract the Abbrev information and the subcatsum
# (n=1 cuts at the first ':' only, so a colon inside the summary text cannot
# produce extra columns)
subcat_temp_df = pd.DataFrame(df_subcat['Subcategory'].str.split(':', n=1).to_list(), columns=['SubcatAbbrev', 'SubcatSumm'])
subcat_temp_df['subcat_index'] = df_subcat.index
df_subcat = pd.merge(df_subcat, subcat_temp_df, how='inner', left_index = True, right_on='subcat_index')[['Category', 'SubcatAbbrev', 'SubcatSumm']]

# Insert subcategories
conn = sql.connect(db_path)
try:
    for catName, subcatAbbrev, subcatSumm in df_subcat.itertuples(index=False, name=None):
        if not sprocInsertSubcategory(conn, catName=catName,
                                      subcatAbbrev=subcatAbbrev.strip(),
                                      subcatSumm=subcatSumm.strip()):
            print(f'Failed to insert Subcategory {subcatAbbrev} with Category {catName}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [11]:
# Insert Control Families
control_fam_df = pd.read_csv('data/nist_data/control_families.csv')

control_fam_df.head()

conn = sql.connect(db_path)
try:
    # Every row must unpack into exactly (abbreviation, name), in that order.
    for contFamAc, contFamName in control_fam_df.itertuples(index=False, name=None):
        if not sprocInsertControlFamily(conn, contFamName=contFamName,
                                        contFamAbbrev=contFamAc):
            print(f'Failed to insert ControlFamily {contFamName}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [12]:
# Insert controls
cont_enchance_df = pd.read_csv('data/nist_data/control_and_enhancements.csv')
cols = ['identifier', 'name', 'control_text', 'discussion']

# Identifiers containing '(' are treated as enhancements, all others as
# base controls.  The pattern is a raw string: '\(' is not a valid escape
# in a normal string literal.
is_enhancement = cont_enchance_df['identifier'].str.contains(r'\(')
control_df = cont_enchance_df[~is_enhancement].copy()[cols]
enhance_df = cont_enchance_df[is_enhancement].copy()[cols]

conn = sql.connect(db_path)
try:
    for contAbbrev, contName, contStatement, contGuidance in \
            control_df.itertuples(index=False, name=None):
        # The family abbreviation is whatever precedes the first '-'.
        contFamAbbrev = contAbbrev.split('-')[0].strip()

        if not (sprocInsertControl(conn,
                          contFamAbbrev=contFamAbbrev,
                          contName=contName,
                          contAbbrev=contAbbrev,
                          contStatement=contStatement,
                          contGuidance=contGuidance
        )):
            print(f'Failed to insert control {contName} with Control Family {contFamAbbrev}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [13]:
# Insert control subcategories
cont_sub_df = df[['Subcategory', 'All SP 800-53 Controls']].copy()
# Keep only the abbreviation in front of ': '.
cont_sub_df['Subcategory'] = cont_sub_df['Subcategory'].map(lambda x: x.split(': ')[0])
# Free-text cells that stand for a whole list of controls, with their expansion.
bad_controls = {
    '-1 controls from all families': 'AC-1, AT-1, AU-1, CA-1, CM-1, CP-1, IA-1, IR-1, MA-1, MP-1, PE-1, PL-1, PM-1, PS-1, PT-1, RA-1, SA-1, SC-1, SI-1, SR-1',
    '-1 controls from all families (except PM-1)': 'AC-1, AT-1, AU-1, CA-1, CM-1, CP-1, IA-1, IR-1, MA-1, MP-1, PE-1, PL-1, PS-1, PT-1, RA-1, SA-1, SC-1, SI-1, SR-1',
    'AU Family': 'AU-1, AU-2, AU-3, AU-4, AU-5, AU-6, AU-7, AU-8, AU-9, AU-10, AU-11, AU-12, AU-13, AU-14, AU-15, AU-16',
    np.nan: ''
    }

conn = sql.connect(db_path)
try:
    for subcat, all_controls in cont_sub_df.itertuples(index=False, name=None):
        if pd.isna(all_controls):
            # A NaN never equals itself, so finding it through the dict key
            # only works when it is the very same object; test it explicitly.
            all_controls = ''
        else:
            all_controls = bad_controls.get(all_controls, all_controls)
        for control in regex.split(r'\.|, ', all_controls):
            if control != '':
                control = control.replace(',', '').strip().replace(' ', '')
                # Replace any run of Unicode dash characters with one plain hyphen.
                control = regex.sub(r'\p{Pd}+', '-', control)
                if not sprocInsertControlSubcategory(conn,
                                                     subcatAbbrev=subcat,
                                                     contAbbrev=control):
                    print(f'Failed to insert control {control} with subcategory {subcat}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()

In [14]:
# Insert enhancements

conn = sql.connect(db_path)
try:
    for contEnAbbrev, contEnName, contEnStatement, contEnGuidance in \
            enhance_df.itertuples(index=False, name=None):
        # Replace any run of Unicode dash characters with one plain hyphen.
        contEnAbbrev = regex.sub(r'\p{Pd}+', '-', contEnAbbrev)
        # The parent control's abbreviation is whatever precedes the first '('.
        contAbbrev = contEnAbbrev.split('(')[0]

        # The insert places both texts inside single-quoted SQL literals, so
        # single quotes are doubled here.  Each text is escaped from its own
        # value, and non-string values (e.g. NaN) are passed through untouched.
        if isinstance(contEnStatement, str):
            contEnStatement = contEnStatement.replace("'", "''")
        if isinstance(contEnGuidance, str):
            contEnGuidance = contEnGuidance.replace("'", "''")

        if not sprocInsertControlEnhancement(conn,
                                             contAbbrev=contAbbrev,
                                             contEnName=contEnName,
                                             contEnAbbrev=contEnAbbrev,
                                             contEnStatement=contEnStatement,
                                             contEnGuidance=contEnGuidance):
            print(f'Failed to insert control enhancement {contEnAbbrev} with control {contAbbrev}')
finally:
    # The connection context manager does not close the connection; do it here.
    conn.close()