## Objective: Create a full database for one NPORT quarter (2024q3)

In [1]:
import os
import sqlite3
import csv

### Set up directory

In [2]:
# Get current directory (the working directory at the moment this cell runs)
curr_dir = os.getcwd()
print(curr_dir)

# Get parent directory by moving the process one level up.
# NOTE: os.chdir changes the working directory for every later cell, and
# re-running this cell moves up one more level each time.
os.chdir("..")
parent_dir = os.getcwd()
print(parent_dir)

/Users/ritawu/Desktop/BTTAI/PIMCO-Text2SQL/setup
/Users/ritawu/Desktop/BTTAI/PIMCO-Text2SQL


### Establish connection to db file

In [3]:
# Open (or create) the SQLite file under <parent_dir>/sqlite and keep a
# shared cursor for the table-loading cells below
conn = sqlite3.connect(f"{parent_dir}/sqlite/nport.db")
cursor = conn.cursor()

### Get raw data folder

In [4]:
# Folder holding the .tsv files for the quarter being loaded
raw_data_folder = f"{parent_dir}/data/2024q3_nport"
print(raw_data_folder)

/Users/ritawu/Desktop/BTTAI/PIMCO-Text2SQL/data/2024q3_nport


### Create Tables

In [5]:
def infer_sqlite_types(rows, column_count):
    """
    Infer a declared type for each column, categorizing only as NUMBER or TEXT.

    A column is NUMBER when every non-empty value seen parses with ``float``,
    TEXT as soon as a single non-empty value does not, and "" (no declared
    type) when the column holds no non-empty value at all.

    Args:
        rows (iterable): Data rows; each row is a sequence of strings.
        column_count (int): Number of columns to classify. Values beyond this
            index are ignored; rows shorter than this are treated as having
            empty trailing values.

    Returns:
        list: One of "NUMBER", "TEXT" or "" per column, in column order.
    """
    inferred_types = [""] * column_count
    # Columns that could still turn out to be TEXT. A column leaves this list
    # once it is TEXT, because that verdict can no longer change.
    open_columns = list(range(column_count))

    for row in rows:
        if not open_columns:
            break  # every column is already TEXT; nothing left to learn
        demoted = False
        for i in open_columns:
            if i >= len(row):
                continue
            value = row[i]
            # Skip null or empty values
            if value.strip() == "":
                continue
            try:
                # float() accepts both integer and decimal notation
                float(value)
            except ValueError:
                # One non-numeric value is enough to make the column TEXT,
                # even if earlier values looked numeric.
                inferred_types[i] = "TEXT"
                demoted = True
            else:
                inferred_types[i] = "NUMBER"
        if demoted:
            open_columns = [i for i in open_columns if inferred_types[i] != "TEXT"]

    # NOTE(review): all-digit identifier columns (e.g. codes with leading
    # zeros) still come out as NUMBER - confirm that is acceptable downstream.
    return inferred_types

def filter_columns(column_names, sample_rows, null_threshold=0.85):
    """
    Filters columns based on the null threshold. Removes columns where the
    share of empty values is greater than ``null_threshold``.

    A value counts as empty when it is blank after stripping whitespace, or
    when the row is too short to contain that column at all.

    Args:
        column_names (list): List of column names.
        sample_rows (list): List of data rows (sequences of strings).
        null_threshold (float): Threshold for null fraction (default 0.85).

    Returns:
        list: List of column names to keep.
        list: List of column indices to keep.
    """
    total_rows = len(sample_rows)
    if total_rows == 0:
        # Without data there is no null ratio to compute; keep every column
        # instead of dividing by zero.
        return list(column_names), list(range(len(column_names)))

    columns_to_keep = []
    indices_to_keep = []

    for i, col in enumerate(column_names):
        null_count = sum(
            1 for row in sample_rows
            if i >= len(row) or row[i].strip() == ""
        )
        if null_count / total_rows <= null_threshold:
            columns_to_keep.append(col)
            indices_to_keep.append(i)

    return columns_to_keep, indices_to_keep


# Create table from .tsv file
def create_table(filename):
    """
    Load one tab-separated file from ``raw_data_folder`` into a SQLite table.

    The table is named after the file (without extension). Columns that are
    empty in more than 85% of rows are left out, and the remaining columns get
    a declared type from ``infer_sqlite_types``. Statements run on the
    notebook-level ``cursor``; nothing is committed here. Because the table is
    created with IF NOT EXISTS and rows are then inserted, calling this twice
    for the same file appends the rows a second time.

    Args:
        filename (str): File name inside ``raw_data_folder``. Names that do
            not end in ``.tsv`` are ignored.
    """
    def quote(identifier):
        # Double-quote an identifier so names that are not bare SQL
        # identifiers (spaces, ':' ...) stay valid; embedded quotes are doubled.
        return '"' + identifier.replace('"', '""') + '"'

    if not filename.endswith('.tsv'):
        return

    table_name = os.path.splitext(filename)[0]
    file_path = raw_data_folder + '/' + filename

    with open(file_path, 'r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        column_names = next(reader, None)  # Get the first row as column names
        if not column_names:
            print(f"Skipping {filename}: no header row found")
            return
        width = len(column_names)
        # Drop blank lines and pad short rows so every row can be indexed by
        # any header position.
        sample_rows = [
            row if len(row) >= width else row + [""] * (width - len(row))
            for row in reader
            if row
        ]

    if sample_rows:
        filtered_column_names, filtered_indices = filter_columns(column_names, sample_rows, 0.85)
        filtered_rows = [[row[i] for i in filtered_indices] for row in sample_rows]
        inferred_types = infer_sqlite_types(filtered_rows, len(filtered_column_names))
    else:
        # Header-only file: there is nothing to measure, so keep every column
        # and leave the types undeclared.
        filtered_column_names = list(column_names)
        filtered_rows = []
        inferred_types = [""] * width

    if not filtered_column_names:
        # SQLite cannot create a table without columns.
        print(f"Skipping {filename}: every column exceeds the null threshold")
        return

    quoted_table = quote(table_name)
    quoted_columns = [quote(col) for col in filtered_column_names]
    column_definitions = ", ".join(
        f"{col} {dtype}".rstrip() for col, dtype in zip(quoted_columns, inferred_types)
    )
    cursor.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} ({column_definitions});")

    # Insert data into the table in a single batched call
    placeholders = ", ".join(["?"] * len(quoted_columns))
    insert_query = f"INSERT INTO {quoted_table} ({', '.join(quoted_columns)}) VALUES ({placeholders});"
    cursor.executemany(insert_query, filtered_rows)

    print(f"Finished creating table {table_name}")


In [6]:
# Hand every entry of the raw data folder to create_table, in whatever order
# os.listdir returns them; create_table ignores names that do not end in .tsv
for filename in os.listdir(raw_data_folder):
    create_table(filename)

Finished creating table INTEREST_RATE_RISK
Finished creating table DESC_REF_INDEX_COMPONENT
Finished creating table REPURCHASE_AGREEMENT
Finished creating table FLOATING_RATE_RESET_TENOR
Finished creating table SWAPTION_OPTION_WARNT_DERIV
Finished creating table FUT_FWD_NONFOREIGNCUR_CONTRACT
Finished creating table OTHER_DERIV_NOTIONAL_AMOUNT
Finished creating table DERIVATIVE_COUNTERPARTY
Finished creating table DEBT_SECURITY_REF_INSTRUMENT
Finished creating table REPURCHASE_COLLATERAL
Finished creating table OTHER_DERIV
Finished creating table SECURITIES_LENDING
Finished creating table BORROWER
Finished creating table DESC_REF_OTHER
Finished creating table DESC_REF_INDEX_BASKET
Finished creating table FUND_VAR_INFO
Finished creating table FWD_FOREIGNCUR_CONTRACT_SWAP
Finished creating table REPURCHASE_COUNTERPARTY
Finished creating table CONVERTIBLE_SECURITY_CURRENCY
Finished creating table MONTHLY_RETURN_CAT_INSTRUMENT
Finished creating table DEBT_SECURITY
Finished creating table R

In [7]:
# Collect the name of every table registered in the database catalogue
tables = [
    name
    for (name,) in cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
]
print("Number of Tables:", len(tables))
print("Tables:", tables)

Number of Tables: 30
Tables: ['INTEREST_RATE_RISK', 'DESC_REF_INDEX_COMPONENT', 'REPURCHASE_AGREEMENT', 'FLOATING_RATE_RESET_TENOR', 'SWAPTION_OPTION_WARNT_DERIV', 'FUT_FWD_NONFOREIGNCUR_CONTRACT', 'OTHER_DERIV_NOTIONAL_AMOUNT', 'DERIVATIVE_COUNTERPARTY', 'DEBT_SECURITY_REF_INSTRUMENT', 'REPURCHASE_COLLATERAL', 'OTHER_DERIV', 'SECURITIES_LENDING', 'BORROWER', 'DESC_REF_OTHER', 'DESC_REF_INDEX_BASKET', 'FUND_VAR_INFO', 'FWD_FOREIGNCUR_CONTRACT_SWAP', 'REPURCHASE_COUNTERPARTY', 'CONVERTIBLE_SECURITY_CURRENCY', 'MONTHLY_RETURN_CAT_INSTRUMENT', 'DEBT_SECURITY', 'REGISTRANT', 'MONTHLY_TOTAL_RETURN', 'SUBMISSION', 'BORROW_AGGREGATE', 'EXPLANATORY_NOTE', 'NONFOREIGN_EXCHANGE_SWAP', 'FUND_REPORTED_HOLDING', 'IDENTIFIERS', 'FUND_REPORTED_INFO']


In [8]:
# Persist the inserts issued through `cursor`, then release the connection
conn.commit()
conn.close()

In [9]:
# Re-open the database on a fresh connection to verify what was written
temp_conn = sqlite3.connect(f"{parent_dir}/sqlite/nport.db")
temp_cursor = temp_conn.cursor()

# Row count per table
for table in tables:
    row_count = temp_cursor.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"Number of Rows in {table}:", row_count)

print('=='*40)

# Column name -> declared type per table
for table_name in tables:
    columns = temp_cursor.execute(f"PRAGMA table_info({table_name});").fetchall()
    # table_info rows carry the column name at index 1 and its declared type at index 2
    schema = {column[1]: column[2] for column in columns}
    print(f"{table_name}: "+str(schema))

temp_cursor.close()
temp_conn.close()

Number of Rows in INTEREST_RATE_RISK: 10128
Number of Rows in DESC_REF_INDEX_COMPONENT: 55996
Number of Rows in REPURCHASE_AGREEMENT: 9354
Number of Rows in FLOATING_RATE_RESET_TENOR: 60630
Number of Rows in SWAPTION_OPTION_WARNT_DERIV: 28203
Number of Rows in FUT_FWD_NONFOREIGNCUR_CONTRACT: 28549
Number of Rows in OTHER_DERIV_NOTIONAL_AMOUNT: 616
Number of Rows in DERIVATIVE_COUNTERPARTY: 218037
Number of Rows in DEBT_SECURITY_REF_INSTRUMENT: 25200
Number of Rows in REPURCHASE_COLLATERAL: 9955
Number of Rows in OTHER_DERIV: 616
Number of Rows in SECURITIES_LENDING: 6097928
Number of Rows in BORROWER: 32135
Number of Rows in DESC_REF_OTHER: 123379
Number of Rows in DESC_REF_INDEX_BASKET: 23317
Number of Rows in FUND_VAR_INFO: 4731
Number of Rows in FWD_FOREIGNCUR_CONTRACT_SWAP: 96073
Number of Rows in REPURCHASE_COUNTERPARTY: 8510
Number of Rows in CONVERTIBLE_SECURITY_CURRENCY: 14568
Number of Rows in MONTHLY_RETURN_CAT_INSTRUMENT: 265708
Number of Rows in DEBT_SECURITY: 3980194
Numbe

### Deprecated: Drop Columns Missing more than 85% of Values 

def drop_columns(table_name):
    """
    Rebuild ``table_name`` without the columns that are empty in more than 85% of rows.

    A value counts as empty when it is NULL or blank after trimming spaces.
    Columns whose name ends with ':1' are not evaluated and are always kept.
    When at least one column has to go, the table is rebuilt: a
    ``<table_name>_new`` table is created with the kept columns and their
    declared types, the rows are copied over, the old table is dropped and the
    new one is renamed back. Uses the notebook-level ``conn``; nothing is
    committed here.

    Args:
        table_name (str): Name of the table to clean up.

    Returns:
        list: Names of the columns classified as too empty.
        list: Names of the columns that were kept.
        Both lists are empty when the table does not exist. When every column
        is too empty the table is left untouched (SQLite cannot hold a table
        without columns) and the second list is empty.
    """
    def quote(identifier):
        # Double-quote identifiers so names such as 'X:1' are valid SQL.
        return '"' + identifier.replace('"', '""') + '"'

    temp_cursor = conn.cursor()
    try:
        temp_cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
        if temp_cursor.fetchone() is None:
            # Return the same two-list shape as the normal path so callers
            # that unpack the result keep working.
            print(f"Table {table_name} does not exist in the database.")
            return [], []

        quoted_table = quote(table_name)

        temp_cursor.execute(f"SELECT COUNT(*) FROM {quoted_table}")
        total_rows = temp_cursor.fetchone()[0]

        temp_cursor.execute(f"PRAGMA table_info({quoted_table})")
        columns = temp_cursor.fetchall()

        columns_to_drop = []
        columns_to_keep = []
        declared_types = {}

        for column in columns:
            column_name = column[1]
            declared_types[column_name] = column[2]

            # Skip columns that end with ':1' - they are not evaluated, but
            # they must stay in the keep list or a rebuild would lose them.
            if column_name.endswith(':1'):
                print(f"Skipping column {column_name} in table {table_name}")
                columns_to_keep.append(column_name)
                continue

            quoted_column = quote(column_name)
            temp_cursor.execute(
                f"SELECT COUNT(*) FROM {quoted_table} "
                f"WHERE {quoted_column} IS NULL OR TRIM({quoted_column}) = ''"
            )
            null_count = temp_cursor.fetchone()[0]

            if null_count > 0.85 * total_rows:
                columns_to_drop.append(column_name)
            else:
                columns_to_keep.append(column_name)

        if columns_to_drop and not columns_to_keep:
            print(f"Every column of table {table_name} exceeds the threshold; table left unchanged")
        elif columns_to_drop:
            # Create new table with remaining columns, preserving declared types
            quoted_new_table = quote(f"{table_name}_new")
            columns_definition = ', '.join(
                f"{quote(name)} {declared_types[name]}".rstrip() for name in columns_to_keep
            )
            column_list = ', '.join(quote(name) for name in columns_to_keep)
            temp_cursor.execute(f"CREATE TABLE {quoted_new_table} ({columns_definition})")

            # Copy columns to new table
            temp_cursor.execute(
                f"INSERT INTO {quoted_new_table} ({column_list}) SELECT {column_list} FROM {quoted_table}"
            )

            # Drop old table
            temp_cursor.execute(f"DROP TABLE {quoted_table}")

            # Rename new table to original table name
            temp_cursor.execute(f"ALTER TABLE {quoted_new_table} RENAME TO {quoted_table}")

            print(f"Dropped columns {', '.join(columns_to_drop)} from table {table_name}")
        else:
            print(f"No columns to drop from table {table_name}")

        return columns_to_drop, columns_to_keep
    finally:
        # Release the cursor on every exit path, including errors
        temp_cursor.close()

# Run the clean-up on every table and remember, per table, which columns
# were dropped and which were kept
columns_to_drop = {}
columns_to_keep = {}
for table in tables:
    dropped, kept = drop_columns(table)
    columns_to_drop[table] = dropped
    columns_to_keep[table] = kept
print("Kept columns: ", columns_to_keep)
print("Dropped columns: ", columns_to_drop)

# Report, table by table, whether any column survived the clean-up, and
# count the tables for which none did
empty_count = 0

for table, kept_columns in columns_to_keep.items():
    if kept_columns:
        print(f"Table '{table}' has kept columns: {kept_columns}")
        continue
    # An empty keep-list means no column survived for this table
    print(f"Table '{table}' is empty.")
    empty_count += 1

print("Empty Tables:", empty_count)

# Open a separate connection to inspect the database after the clean-up
temp_conn = sqlite3.connect(f"{parent_dir}/sqlite/nport.db")
temp_cursor = temp_conn.cursor()

# Row count per table
for table in tables:
    row_count = temp_cursor.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"Number of Rows in {table}:", row_count)

print('=='*40)

# Column name -> declared type per table
for table_name in tables:
    columns = temp_cursor.execute(f"PRAGMA table_info({table_name});").fetchall()
    # table_info rows carry the column name at index 1 and its declared type at index 2
    schema = {column[1]: column[2] for column in columns}
    print(f"{table_name}: "+str(schema))

temp_cursor.close()
temp_conn.close()