# SEC Data Scraping 
Data is scraped from https://www.sec.gov/data-research/sec-markets-data/insider-transactions-data-sets

This Jupyter Notebook scrapes, downloads, extracts, processes, and consolidates SEC insider transactions data from the SEC website. The final dataset is stored as a ZIP file.

In [None]:
import os
import re
import requests
import zipfile
import shutil
import glob
import pandas as pd
from bs4 import BeautifulSoup
import json

## Scrape and Download Zip Files

In [None]:
from urllib.parse import urljoin

# Folder that receives the raw zip files exactly as downloaded.
raw_data_folder = 'data/raw'
# Inclusive (first, last) year range to keep; chosen to match the LittleSis network data.
YEARS_THRESHOLD = (2005, 2021)
# Seconds to wait on a request before giving up; requests waits indefinitely by default.
REQUEST_TIMEOUT = 60

# URL and headers for SEC data
url = "https://www.sec.gov/data-research/sec-markets-data/insider-transactions-data-sets"
headers = {"User-Agent": "DSA4263 (dsa4263@gmail.com)"}

# open(..., "wb") below does not create missing directories, so make sure the
# download folder exists before anything is written into it.
os.makedirs(raw_data_folder, exist_ok=True)

response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
if response.status_code != 200:
    # Raise rather than call exit(): exit() is an interactive-shell helper and
    # does not surface a usable error to whoever is running the notebook.
    raise RuntimeError(f"Error fetching page: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
zip_links = []
first_year, last_year = YEARS_THRESHOLD

# Collect every link that ends with '.zip' and whose year is inside YEARS_THRESHOLD
for a in soup.find_all("a", href=True):
    href = a["href"]
    if not href.lower().endswith(".zip"):
        continue

    # Resolve relative links against the page URL; absolute links pass through unchanged.
    href = urljoin(url, href)

    # Look for the 4-digit year in the file name first so that digits elsewhere
    # in the path cannot be mistaken for it; fall back to the whole URL.
    filename = href.rsplit("/", 1)[-1]
    year_match = re.search(r"\d{4}", filename) or re.search(r"\d{4}", href)
    if year_match is None:
        print(f"No year found in URL: {href}")
        continue

    year = int(year_match.group())
    # Skip links already collected so the same archive is not downloaded twice.
    if first_year <= year <= last_year and href not in zip_links:
        zip_links.append(href)
print(f"Found {len(zip_links)} zip file links")

# Download each zip file; a failed download is reported and skipped
for link in zip_links:
    try:
        r = requests.get(link, headers=headers, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
    except requests.RequestException as e:
        print(f"Error downloading {link}: {e}")
        continue

    zip_filename = os.path.join(raw_data_folder, link.split("/")[-1])
    with open(zip_filename, "wb") as f:
        f.write(r.content)
    print(f"Downloaded: {zip_filename}")

## Extract and Merge TSV Files

In [None]:
# Scratch folder each archive is unpacked into; it is wiped before and after every archive.
temp_extracted = "temp_extracted"
# Final folder for merged output
final_folder = "data/interim"
os.makedirs(final_folder, exist_ok=True)

# Merged DataFrames keyed by the TSV filename (e.g., "DERIV_HOLDING.tsv")
merged_data = {}
# Per-archive DataFrames grouped by TSV filename; concatenated once at the end
# instead of re-copying the growing frame for every archive.
frames_by_name = {}
# Metadata/readme files are identical across archives, so each is copied only once.
metadata_filenames = ("insider_transactions_metadata.json", "insider_transactions_readme.htm")
# Names already copied. Tracking names (not just a count) guarantees that both
# files are copied even if one of them is missing from the first archives.
copied_metadata = set()
metadata_copied = 0
# Large files to exclude (compared against the lower-cased filename)
excluded_files = ['owner_signature.tsv', 'footnotes.tsv']

# Process each downloaded zip file from the raw data folder, in sorted order so
# the row order of the merged output is reproducible between runs.
zip_files = sorted(glob.glob(os.path.join(raw_data_folder, "*.zip")))

for zip_path in zip_files:

    # Start from an empty scratch folder so files from a previous archive cannot leak in
    if os.path.exists(temp_extracted):
        shutil.rmtree(temp_extracted)
    os.makedirs(temp_extracted, exist_ok=True)

    # Extract the zip contents to the scratch folder; skip archives that are not valid zips
    try:
        with zipfile.ZipFile(zip_path, "r") as z:
            z.extractall(temp_extracted)
    except zipfile.BadZipFile as e:
        print(f"Error extracting {zip_path}: {e}")
        shutil.rmtree(temp_extracted, ignore_errors=True)
        continue

    # Process extracted files: collect TSVs and copy metadata/readme
    for root, _dirs, files in os.walk(temp_extracted):
        for filename in files:
            filepath = os.path.join(root, filename)

            if filename.lower().endswith(".tsv"):
                if filename.lower() in excluded_files:
                    continue
                try:
                    # Read every column as text so values are carried over verbatim;
                    # type conversion is done later from the metadata schema.
                    df = pd.read_csv(filepath, sep="\t", dtype=str, low_memory=False)
                except Exception as e:
                    print(f"Error reading {filepath}: {e}")
                    continue
                frames_by_name.setdefault(filename, []).append(df)

            elif filename in metadata_filenames and filename not in copied_metadata:
                dest_path = os.path.join(final_folder, filename)
                shutil.copy2(filepath, dest_path)
                print(f"Copied metadata/readme: {filename}")
                copied_metadata.add(filename)
                metadata_copied = len(copied_metadata)

    # Remove the temporary extraction folder
    shutil.rmtree(temp_extracted, ignore_errors=True)

# Concatenate the per-archive frames of each TSV in one step; popping releases
# the individual frames as soon as their merged copy exists.
for tsv_name in list(frames_by_name):
    merged_data[tsv_name] = pd.concat(frames_by_name.pop(tsv_name), ignore_index=True)

# Write out the merged TSV files into the final folder
for tsv_name, df in merged_data.items():
    output_path = os.path.join(final_folder, tsv_name)
    df.to_csv(output_path, sep="\t", index=False)
    print(f"Merged TSV saved: {output_path}")

print("All TSV files have been merged.")


Copied metadata/readme: insider_transactions_metadata.json
Copied metadata/readme: insider_transactions_readme.htm
Merged TSV saved: data/interim/NONDERIV_TRANS.tsv
Merged TSV saved: data/interim/NONDERIV_HOLDING.tsv
Merged TSV saved: data/interim/REPORTINGOWNER.tsv
Merged TSV saved: data/interim/DERIV_HOLDING.tsv
Merged TSV saved: data/interim/DERIV_TRANS.tsv
Merged TSV saved: data/interim/SUBMISSION.tsv
All TSV files have been merged.


## Load metadata and build a mapping from each TSV filename to `{col_name -> datatype}`

In [None]:
# Read the metadata file copied out of the SEC archives. The encoding is given
# explicitly so the result does not depend on the platform's default encoding.
with open("data/interim/insider_transactions_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Maps each TSV filename to a {column name -> datatype entry} dict taken from
# the table's schema; excluded tables get no entry.
conversion_mapping = {}
for table in metadata["tables"]:
    tsv_filename = table["url"]  # e.g. "OWNER_SIGNATURE.tsv"
    if tsv_filename.lower() in excluded_files:
        continue
    # e.g. col["name"] might be "ACCESSION_NUMBER"
    conversion_mapping[tsv_filename] = {
        col["name"]: col["datatype"] for col in table["tableSchema"]["columns"]
    }

# -----------------------------------------------------------------------------
# Helper function for converting datatypes 
# -----------------------------------------------------------------------------
def convert_value(series, datatype):
    """Convert one column to the type named by its metadata datatype entry.

    Args:
        series: pandas Series holding the column's values.
        datatype: the column's datatype entry, either a dict with a "base"
            key or the base type name itself as a string. Matching on the
            base name is case-insensitive.

    Returns:
        A new Series:
        - base contains "number": numeric values, unparseable entries become NaN;
        - base contains "date (dd-mon-yyyy)": datetimes parsed with "%d-%b-%Y",
          unparseable entries become NaT;
        - anything else: values converted to str, missing values left missing.
    """
    base = (datatype if isinstance(datatype, str) else datatype["base"]).lower()
    if "number" in base:
        # Convert to numeric; non-convertible values become NaN
        return pd.to_numeric(series, errors="coerce")
    if "date (dd-mon-yyyy)" in base:
        # Convert to datetime using format "DD-MON-YYYY" (e.g. "01-JAN-2020")
        return pd.to_datetime(series, format="%d-%b-%Y", errors="coerce")
    # astype(str) alone would turn missing values into literal "nan"/"None"
    # text; put the original missing values back so they stay missing.
    return series.astype(str).where(series.notna(), series)

## Process each TSV file: read, convert, then save as CSV & delete TSV

In [None]:
# Folder holding the merged TSV files; each one is converted to CSV in place.
data_folder = "data/interim"
tsv_files = glob.glob(os.path.join(data_folder, "*.tsv"))
# Messages about metadata columns that are absent from the data; printed at the end.
problems = []

for tsv_file in tsv_files:
    filename = os.path.basename(tsv_file)
    # Check the exclusion list before reading so excluded (large) files are never parsed
    if filename.lower() in excluded_files:
        continue

    # Read everything as text; typed columns are produced by convert_value below
    df = pd.read_csv(tsv_file, sep="\t", dtype=str, low_memory=False)
    print(f"\nProcessing {filename}...")

    # Conversion of datatypes, driven by the metadata schema for this file
    if filename in conversion_mapping:
        meta_for_file = conversion_mapping[filename]
        for col_name, datatype_info in meta_for_file.items():
            if col_name in df.columns:
                df[col_name] = convert_value(df[col_name], datatype_info)
            else:
                problems.append(f"Column '{col_name}' not found in {filename}.")
    else:
        print(f"  No metadata mapping found for {filename}.")

    # Store as csv files (same path, ".csv" extension)
    csv_filename = os.path.splitext(tsv_file)[0] + ".csv"
    df.to_csv(csv_filename, index=False)
    print(f"  Saved converted data to {csv_filename}")

    # Delete original tsv only after the CSV has been written
    os.remove(tsv_file)
    print(f"  Deleted original file: {tsv_file}")

# Surface the schema mismatches collected above instead of silently dropping them
if problems:
    print(f"\n{len(problems)} column(s) listed in the metadata were not found:")
    for problem in problems:
        print(f"  {problem}")



Processing NONDERIV_TRANS.tsv...
  Saved converted data to data/interim/NONDERIV_TRANS.csv
  Deleted original file: data/interim/NONDERIV_TRANS.tsv

Processing NONDERIV_HOLDING.tsv...
  Saved converted data to data/interim/NONDERIV_HOLDING.csv
  Deleted original file: data/interim/NONDERIV_HOLDING.tsv

Processing REPORTINGOWNER.tsv...
  Saved converted data to data/interim/REPORTINGOWNER.csv
  Deleted original file: data/interim/REPORTINGOWNER.tsv

Processing DERIV_HOLDING.tsv...
  Saved converted data to data/interim/DERIV_HOLDING.csv
  Deleted original file: data/interim/DERIV_HOLDING.tsv

Processing DERIV_TRANS.tsv...
  Saved converted data to data/interim/DERIV_TRANS.csv
  Deleted original file: data/interim/DERIV_TRANS.tsv

Processing SUBMISSION.tsv...
  Saved converted data to data/interim/SUBMISSION.csv
  Deleted original file: data/interim/SUBMISSION.tsv


## Zip the folder and remove the CSV files

In [None]:
# -------------------------------
# Zip the Final Data Folder
# -------------------------------
# The archive lives inside the folder it archives, so it must be skipped while
# walking that folder; otherwise the half-written zip is added to itself.
zip_filename = os.path.join(final_folder, "FINAL_RAW_DATA.zip")
archive_path = os.path.abspath(zip_filename)

with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for root, _dirs, files in os.walk(final_folder):
        for file in files:
            filepath = os.path.join(root, file)
            if os.path.abspath(filepath) == archive_path:
                continue
            # Store paths relative to the data folder so the archive has no leading directories
            arcname = os.path.relpath(filepath, final_folder)
            zipf.write(filepath, arcname)
print(f"Created zip file: {zip_filename}")

# -------------------------------
# Clean Up: Remove all files from data/interim
# -------------------------------
# Everything except the archive itself and the .gitkeep placeholder is deleted.
files_to_keep = {".gitkeep", os.path.basename(zip_filename)}
for root, _dirs, files in os.walk(final_folder):
    for file in files:
        if file in files_to_keep:
            continue
        file_path = os.path.join(root, file)
        os.remove(file_path)
        print(f"Removed file: {file_path}")

Created zip file: data/interim/FINAL_RAW_DATA.zip
Removed file: data/interim/SUBMISSION.csv
Removed file: data/interim/DERIV_TRANS.csv
Removed file: data/interim/insider_transactions_metadata.json
Removed file: data/interim/insider_transactions_readme.htm
Removed file: data/interim/DERIV_HOLDING.csv
Removed file: data/interim/REPORTINGOWNER.csv
Removed file: data/interim/NONDERIV_TRANS.csv
Removed file: data/interim/NONDERIV_HOLDING.csv
