This notebook combines all steps of the Workforce Almanac dataset preparation.

In [None]:
# Native libraries
import csv
import glob
import json
import os
import shutil
import tarfile
import time
import zipfile

# External libraries
import bigframes.pandas as bf
import duckdb
import gspread
import pandas as pd
import pandas_gbq
import requests
import spacy
import networkx as nx

from google.cloud import storage, bigquery
from google.oauth2 import service_account
from matplotlib import pyplot as plt
from xml.etree import ElementTree as ET

In [None]:
# GCP project, region and the two GCS buckets used throughout the notebook
project, location = "hks-prod-mwc-06a2", "us-central1"
original_bucket_name, standard_bucket_name = (
    "hks-almanac-storage-original",
    "hks-almanac-storage-standard",
)

In [None]:
# One storage client for the project, then look up each bucket in turn
storage_client = storage.Client(project=project)
original_bucket, standard_bucket = (
    storage_client.get_bucket(bucket_name)
    for bucket_name in (original_bucket_name, standard_bucket_name)
)

# Step 01 - Load original data sets to BigQuery

The original data sources for the second version of the Workforce Almanac are described here:

https://workforcealmanac.com/methodology

We will copy the data from the original sources to the v2 folder in the original bucket in GCS.
Then we will standardize the file formats to CSV and save the files in the standard bucket in GCS.
Finally, we will load the data from the standard bucket into BigQuery.


In [None]:
# Local working directory. When working locally, point it at any folder instead, e.g.
# base_dir = "/path/to/project/folder"
base_dir = "/content/"  # default for BigQuery Notebooks

# Folders holding the original and the standardized copies of every source
original_dir, standard_dir = (f"{base_dir}{subdir}/" for subdir in ("original", "standard"))

Create the v2 BigQuery datasets (aka schemas).

In [None]:
%%bigquery
-- Create the five v2 datasets; IF NOT EXISTS makes the cell safe to re-run.
-- The location literal matches the `location` variable set in the config cell.
CREATE SCHEMA IF NOT EXISTS deduped_v2 OPTIONS(location = 'us-central1');
CREATE SCHEMA IF NOT EXISTS novel_v2 OPTIONS(location = 'us-central1');
CREATE SCHEMA IF NOT EXISTS original_v2 OPTIONS(location = 'us-central1');
CREATE SCHEMA IF NOT EXISTS reference_v2 OPTIONS(location = 'us-central1');
CREATE SCHEMA IF NOT EXISTS standardized_v2 OPTIONS(location = 'us-central1');

## 1 - IPEDS

The IPEDS dataset consists of two zipped CSV files.

2022 files are hosted here:

https://nces.ed.gov/ipeds/datacenter/Default.aspx?gotoReportId=7&fromIpeds=true

Directory information (HD2022.zip):

https://nces.ed.gov/ipeds/datacenter/data/HD2022.zip

Educational offerings, organization, services and athletic associations (IC2022.zip):

https://nces.ed.gov/ipeds/datacenter/data/IC2022.zip

In [None]:
# IPEDS folders under the original and standard trees
ipeds_original_dir = f"{original_dir}ipeds/"
ipeds_standard_dir = f"{standard_dir}ipeds/"

# Create both folders; exist_ok=True keeps the cell re-runnable
for ipeds_dir in (ipeds_original_dir, ipeds_standard_dir):
    os.makedirs(ipeds_dir, exist_ok=True)

### Download original

In [None]:
# Download the IPEDS HD2022 data (directory information).
# The IC2022 URL and file name are defined here too; that archive is fetched in the next cell.
ipeds_hd_file_url = "https://nces.ed.gov/ipeds/datacenter/data/HD2022.zip"
ipeds_ic_file_url = "https://nces.ed.gov/ipeds/datacenter/data/IC2022.zip"
ipeds_hd_local_file_name = "HD2022.zip"
ipeds_ic_local_file_name = "IC2022.zip"

ipeds_hd_file_download_request = requests.get(ipeds_hd_file_url)
# Stop on HTTP errors so an error page is never saved as HD2022.zip
ipeds_hd_file_download_request.raise_for_status()
with open(ipeds_original_dir + ipeds_hd_local_file_name, "wb") as f:
    f.write(ipeds_hd_file_download_request.content)

In [None]:
# Repeat the same steps for IC2022 (URL and file name come from the previous cell)
ipeds_ic_file_download_request = requests.get(ipeds_ic_file_url)
# Stop on HTTP errors so an error page is never saved as IC2022.zip
ipeds_ic_file_download_request.raise_for_status()
with open(ipeds_original_dir + ipeds_ic_local_file_name, "wb") as f:
    f.write(ipeds_ic_file_download_request.content)

### Upload original

In [None]:
# Mirror both original IPEDS archives into v2/ipeds/ of the original bucket
ipeds_hd_blob, ipeds_ic_blob = (
    original_bucket.blob(f"v2/ipeds/{zip_name}")
    for zip_name in (ipeds_hd_local_file_name, ipeds_ic_local_file_name)
)

for target_blob, zip_name in (
    (ipeds_hd_blob, ipeds_hd_local_file_name),
    (ipeds_ic_blob, ipeds_ic_local_file_name),
):
    target_blob.upload_from_filename(ipeds_original_dir + zip_name)

### Standardize file format

The data files are two zipped CSV files, so we simply extract them.

In [None]:
# Both archives extract straight into the IPEDS standard folder
for archive_name in (ipeds_hd_local_file_name, ipeds_ic_local_file_name):
    with zipfile.ZipFile(ipeds_original_dir + archive_name, "r") as zip_ref:
        zip_ref.extractall(ipeds_standard_dir)

### Manual Fix

Before we can upload the standardized CSV files, we have to fix a file format issue in the `hd2022.csv` file. This file is formatted as CSV (comma-separated values), and the double quote (") character is used to enclose fields that contain the value separator, i.e. the comma. One of the URLs contains double quotes that are not properly escaped, which makes the file non-machine-readable. The solution is to remove the offending double quotes, which means manually changing the data.

Generally, we want to avoid modifying the original data like this, but it is absolutely necessary here. In addition, the change we are making to the URL has no impact: the quoted text is a URL parameter that highlights a string on the page, in this case the word "veteran". When tested after the fix, the URL still works without the quotes and the word is still highlighted.

In [None]:
# Read hd2022.csv (decoded as latin-1) into a list of lines, each keeping its newline
with open(f"{ipeds_standard_dir}hd2022.csv", encoding="latin-1") as file:
    lines = list(file)

In [None]:
# lines[1409] (the header row is lines[0]) holds the URL with the unescaped "veteran" quotes
line = lines[1409]
# Fail loudly if a new release of the file has moved the offending row
assert '"veteran"' in line, 'Expected the unescaped "veteran" URL at lines[1409]; the file layout may have changed'

Notice the text `"veteran"` in the line. The full URL is enclosed in double quotes, and the word inside it is also wrapped in unescaped double quotes. This breaks the CSV format and prevents BigQuery from loading the data.

In [None]:
# Display the raw line; note the quoted "veteran" nested inside the already-quoted URL field
line

In [None]:
# Preview the fix by removing the double quotes around veteran
# (the result is only displayed here, not stored)
line.replace('"veteran"', 'veteran')

In [None]:
# Fix is working as expected. So, we apply it here and keep the result in fixed_line.
fixed_line = line.replace('"veteran"', 'veteran')

In [None]:
# Replace the original line in the in-memory list (same index as the line inspected above)
lines[1409] = fixed_line

In [None]:
# Rewrite the standard file with the fixed line.
# Use ipeds_standard_dir (the folder the file was read from) instead of a hard-coded
# /content path, so the cell still works when base_dir points elsewhere.
# The file was decoded as latin-1; write it explicitly as UTF-8 (BigQuery's default
# CSV encoding) instead of relying on the platform's locale default.
with open(f"{ipeds_standard_dir}hd2022.csv", "w", encoding="utf-8") as file:
    # Write the modified lines back to the file
    file.writelines(lines)

### Upload standardized files

In [None]:
# Upload the extracted CSVs to v2/ipeds/ in the standard bucket
ipeds_hd_standard_blob = standard_bucket.blob("v2/ipeds/hd2022.csv")
ipeds_ic_standard_blob = standard_bucket.blob("v2/ipeds/ic2022.csv")

for standard_blob, csv_name in (
    (ipeds_hd_standard_blob, "hd2022.csv"),
    (ipeds_ic_standard_blob, "ic2022.csv"),
):
    standard_blob.upload_from_filename(f"{ipeds_standard_dir}{csv_name}")

### Load to BigQuery

Create the target tables

In [None]:
%%bigquery
-- Target table for IPEDS HD2022 (directory information). The column list is
-- identical to the one in the LOAD DATA cell that follows.
-- IF NOT EXISTS makes re-running this cell a no-op once the table exists.
CREATE TABLE IF NOT EXISTS original_v2.ipeds_hd_2022 (
    UNITID INT64,
    INSTNM STRING,
    IALIAS STRING,
    ADDR STRING,
    CITY STRING,
    STABBR STRING,
    ZIP STRING,
    FIPS INT64,
    OBEREG INT64,
    CHFNM STRING,
    CHFTITLE STRING,
    GENTELE INT64,
    EIN INT64,
    UEIS STRING,
    OPEID INT64,
    OPEFLAG INT64,
    WEBADDR STRING,
    ADMINURL STRING,
    FAIDURL STRING,
    APPLURL STRING,
    NPRICURL STRING,
    VETURL STRING,
    ATHURL STRING,
    DISAURL STRING,
    SECTOR INT64,
    ICLEVEL INT64,
    CONTROL INT64,
    HLOFFER INT64,
    UGOFFER INT64,
    GROFFER INT64,
    HDEGOFR1 INT64,
    DEGGRANT INT64,
    HBCU INT64,
    HOSPITAL INT64,
    MEDICAL INT64,
    TRIBAL INT64,
    LOCALE INT64,
    OPENPUBL INT64,
    ACT STRING,
    NEWID INT64,
    DEATHYR INT64,
    CLOSEDAT STRING,
    CYACTIVE INT64,
    POSTSEC INT64,
    PSEFLAG INT64,
    PSET4FLG INT64,
    RPTMTH INT64,
    INSTCAT INT64,
    C21BASIC INT64,
    C21IPUG INT64,
    C21IPGRD INT64,
    C21UGPRF INT64,
    C21ENPRF INT64,
    C21SZSET INT64,
    C18BASIC INT64,
    C15BASIC INT64,
    CCBASIC INT64,
    CARNEGIE INT64,
    LANDGRNT INT64,
    INSTSIZE INT64,
    F1SYSTYP INT64,
    F1SYSNAM STRING,
    F1SYSCOD INT64,
    CBSA INT64,
    CBSATYPE INT64,
    CSA INT64,
    COUNTYCD INT64,
    COUNTYNM STRING,
    CNGDSTCD INT64,
    LONGITUD NUMERIC,
    LATITUDE NUMERIC,
    DFRCGID INT64,
    DFRCUSCG INT64
);

Load data to the table, truncate first if already populated.

In [None]:
%%bigquery
-- Reload original_v2.ipeds_hd_2022 from the standardized hd2022.csv in GCS.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.ipeds_hd_2022;
LOAD DATA INTO original_v2.ipeds_hd_2022 (
  UNITID INT64,
  INSTNM STRING,
  IALIAS STRING,
  ADDR STRING,
  CITY STRING,
  STABBR STRING,
  ZIP STRING,
  FIPS INT64,
  OBEREG INT64,
  CHFNM STRING,
  CHFTITLE STRING,
  GENTELE INT64,
  EIN INT64,
  UEIS STRING,
  OPEID INT64,
  OPEFLAG INT64,
  WEBADDR STRING,
  ADMINURL STRING,
  FAIDURL STRING,
  APPLURL STRING,
  NPRICURL STRING,
  VETURL STRING,
  ATHURL STRING,
  DISAURL STRING,
  SECTOR INT64,
  ICLEVEL INT64,
  CONTROL INT64,
  HLOFFER INT64,
  UGOFFER INT64,
  GROFFER INT64,
  HDEGOFR1 INT64,
  DEGGRANT INT64,
  HBCU INT64,
  HOSPITAL INT64,
  MEDICAL INT64,
  TRIBAL INT64,
  LOCALE INT64,
  OPENPUBL INT64,
  ACT STRING,
  NEWID INT64,
  DEATHYR INT64,
  CLOSEDAT STRING,
  CYACTIVE INT64,
  POSTSEC INT64,
  PSEFLAG INT64,
  PSET4FLG INT64,
  RPTMTH INT64,
  INSTCAT INT64,
  C21BASIC INT64,
  C21IPUG INT64,
  C21IPGRD INT64,
  C21UGPRF INT64,
  C21ENPRF INT64,
  C21SZSET INT64,
  C18BASIC INT64,
  C15BASIC INT64,
  CCBASIC INT64,
  CARNEGIE INT64,
  LANDGRNT INT64,
  INSTSIZE INT64,
  F1SYSTYP INT64,
  F1SYSNAM STRING,
  F1SYSCOD INT64,
  CBSA INT64,
  CBSATYPE INT64,
  CSA INT64,
  COUNTYCD INT64,
  COUNTYNM STRING,
  CNGDSTCD INT64,
  LONGITUD NUMERIC,
  LATITUDE NUMERIC,
  DFRCGID INT64,
  DFRCUSCG INT64
)
FROM FILES(
  format = 'CSV',
  -- skip the header row
  skip_leading_rows = 1,
  -- allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/ipeds/hd2022.csv'
    ]
)
;

# 2 - RAPIDS

This file was provided by the DOL ETA team that manages RAPIDS.

I downloaded the file and saved it to the v2 folder of the original bucket in GCS.

In [None]:
# RAPIDS folders under the original and standard trees
rapids_original_dir = f"{original_dir}rapids/"
rapids_standard_dir = f"{standard_dir}rapids/"

# Create both folders; exist_ok=True keeps the cell re-runnable
for rapids_dir in (rapids_original_dir, rapids_standard_dir):
    os.makedirs(rapids_dir, exist_ok=True)

### Download original

In [None]:
# List the uploaded RAPIDS files in the original v2 bucket to see the names
# (shown as the cell output; the archive name is hard-coded in the next cell)
[blob.name for blob in original_bucket.list_blobs(prefix="v2/rapids/")]

In [None]:
# Fetch the manually uploaded RAPIDS archive from the original bucket into the local original folder
rapids_zip_name = "ProgramData FY24Q2.zip"
rapids_blob = original_bucket.blob(f"v2/rapids/{rapids_zip_name}")
rapids_blob.download_to_filename(rapids_original_dir + rapids_zip_name)

### Uncompress

In [None]:
# Extract the archive's contents straight into the RAPIDS standard folder
rapids_zip_path = rapids_original_dir + "ProgramData FY24Q2.zip"
with zipfile.ZipFile(rapids_zip_path, "r") as zip_ref:
    zip_ref.extractall(rapids_standard_dir)

### Upload standardized files
We upload the apprentice and program data to GCS

In [None]:
# Push the extracted programs.csv to v2/rapids/ in the standard bucket
rapids_program_standard_blob = standard_bucket.blob("v2/rapids/programs.csv")
rapids_program_standard_blob.upload_from_filename(rapids_standard_dir + "programs.csv")

In [None]:
# Push the extracted apprentices.csv to v2/rapids/ in the standard bucket
rapids_apprentice_standard_blob = standard_bucket.blob("v2/rapids/apprentices.csv")
rapids_apprentice_standard_blob.upload_from_filename(rapids_standard_dir + "apprentices.csv")

### Load to BigQuery

In [None]:
%%bigquery
-- Target table for the RAPIDS program extract (programs.csv). The column list is
-- identical to the one in the LOAD DATA cell that follows.
-- IF NOT EXISTS makes re-running this cell a no-op once the table exists.
CREATE TABLE IF NOT EXISTS original_v2.rapids_program (
    SEQ_ID INT64,
    PS_ID INT64,
    PS_NUM STRING,
    PROG_TYPE INT64,
    PROG_NAME STRING,
    PROG_STATUS STRING,
    PROVISIONAL_REVIEW BOOLEAN,
    REGISTERED_DT TIMESTAMP,
    RAPIDS_UPDATED_DT TIMESTAMP,
    EIN STRING,
    SIC_CD INT64,
    PROG_ADDR STRING,
    PROG_LOCATION_CD INT64,
    PROG_STATE STRING,
    PROG_ZIP5 STRING,
    ACTIVE_APPRCT INT64,
    LAST_APPR_UPDATE_DT DATE,
    ORG_TYPE STRING,
    OTHER_ORG_TYPE STRING,
    OTHER_PROG_TYPE STRING,
    YOUTH_AGE INT64,
    LOCAL_AREA STRING,
    JOURNEY_WORKER_WAGE_IND STRING,
    PROB_LENGTH STRING,
    RTI_LENGTH STRING,
    STATUS_CD STRING,
    SPONSOR_ID STRING,
    NAICS_CD STRING,
    UNION_Y_N INT64,
    END_DT DATE,
    SAA_PORTAL_UNIQUE_ID STRING,
    ACTIVE_FLG STRING,
    SPON_EMPLOYER INT64,
    SPON_UNIONLABOR INT64,
    SPON_BSNSSASSCTN INT64,
    SPON_INTERMEDIARY INT64,
    SPON_CMMNTYCLLGUNIV INT64,
    SPON_CMNTYBSDORG INT64,
    SPON_WRKFRCDVLPMNTBRD INT64,
    SPON_FOUNDATION INT64,
    SPON_FEDERALAGENCY INT64,
    SPON_STATEAGENCY INT64,
    SPON_CITYCOUNTYAGENCY INT64,
    SPON_OTHER INT64,
    SPON_ALL_SPONSOR_TYPE STRING,
    CREATED_DT TIMESTAMP,
    CREATED_BY STRING,
    UPDATED_DT TIMESTAMP,
    UPDATED_BY STRING,
    ROW_OPERATION STRING
);

In [None]:
%%bigquery
-- Reload original_v2.rapids_program from programs.csv in the standard bucket.
-- allow_quoted_newlines lets quoted fields span multiple lines.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.rapids_program;
LOAD DATA INTO original_v2.rapids_program (
    SEQ_ID INT64,
    PS_ID INT64,
    PS_NUM STRING,
    PROG_TYPE INT64,
    PROG_NAME STRING,
    PROG_STATUS STRING,
    PROVISIONAL_REVIEW BOOLEAN,
    REGISTERED_DT TIMESTAMP,
    RAPIDS_UPDATED_DT TIMESTAMP,
    EIN STRING,
    SIC_CD INT64,
    PROG_ADDR STRING,
    PROG_LOCATION_CD INT64,
    PROG_STATE STRING,
    PROG_ZIP5 STRING,
    ACTIVE_APPRCT INT64,
    LAST_APPR_UPDATE_DT DATE,
    ORG_TYPE STRING,
    OTHER_ORG_TYPE STRING,
    OTHER_PROG_TYPE STRING,
    YOUTH_AGE INT64,
    LOCAL_AREA STRING,
    JOURNEY_WORKER_WAGE_IND STRING,
    PROB_LENGTH STRING,
    RTI_LENGTH STRING,
    STATUS_CD STRING,
    SPONSOR_ID STRING,
    NAICS_CD STRING,
    UNION_Y_N INT64,
    END_DT DATE,
    SAA_PORTAL_UNIQUE_ID STRING,
    ACTIVE_FLG STRING,
    SPON_EMPLOYER INT64,
    SPON_UNIONLABOR INT64,
    SPON_BSNSSASSCTN INT64,
    SPON_INTERMEDIARY INT64,
    SPON_CMMNTYCLLGUNIV INT64,
    SPON_CMNTYBSDORG INT64,
    SPON_WRKFRCDVLPMNTBRD INT64,
    SPON_FOUNDATION INT64,
    SPON_FEDERALAGENCY INT64,
    SPON_STATEAGENCY INT64,
    SPON_CITYCOUNTYAGENCY INT64,
    SPON_OTHER INT64,
    SPON_ALL_SPONSOR_TYPE STRING,
    CREATED_DT TIMESTAMP,
    CREATED_BY STRING,
    UPDATED_DT TIMESTAMP,
    UPDATED_BY STRING,
    ROW_OPERATION STRING
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/rapids/programs.csv'
    ]
)
;

In [None]:
%%bigquery
-- Recreate original_v2.rapids_apprentice from scratch so its schema always matches
-- the column list below. IF EXISTS keeps the DROP from failing when the table has
-- never been created (e.g. a fresh project), matching rapids_add_prog_info.
DROP TABLE IF EXISTS original_v2.rapids_apprentice;
CREATE TABLE original_v2.rapids_apprentice (
  SEQ_ID INT64,
  APPR_ID INT64,
  APPR_NUMBER STRING,
  PS_ID INT64,
  TERM_LENGTH_MIN INT64,
  TERM_LENGTH_MAX INT64,
  RAPIDS_CD STRING,
  ONET_SOC_CD STRING,
  VERSION_NUM STRING,
  CAREER_LATTICE_LVL STRING,
  PROGRAM_STATUS STRING,
  APPR_STATUS_CD STRING,
  PENDING_ACTION STRING,
  IS_PENDING_EMP INT64,
  EMP_CD INT64,
  SCHEDULE_CD INT64,
  AGE_COHORT_CD INT64,
  REGISTER_DT STRING,
  START_DT STRING,
  STARTING_WAGE DECIMAL,
  EXIT_WAGE_DT STRING,
  EXIT_WAGE DECIMAL,
  EXPECT_COMPLETE_DT STRING,
  COMPLETION_EXT_DT STRING,
  SUSPENDED_DT STRING,
  AGE_AT_START INT64,
  STUDENT_STATUS STRING,
  OJT_CREDIT INT64,
  ETHNICITY_CD INT64,
  VET_STAT_IND INT64,
  GENDER_CD STRING,
  RACE_CD INT64,
  DISABLED_CD INT64,
  EDUCATION_CD INT64,
  COMMENTS STRING,
  RTI_CREDIT STRING,
  RAPIDS_PRG_EMP_JOIN_ID STRING,
  OCCUP_ID STRING,
  OCCUP_DTL_ID STRING,
  INMATE_IND STRING,
  PROBATIONARY_CANCEL STRING,
  CURRENT_WAGE_IND STRING,
  STARTING_WAGE_IND STRING,
  JOURNEYMAN_WAGE_IND STRING,
  PRE_APPR_WAGE_IND STRING,
  PRE_EXPER_CREDIT_IND STRING,
  SAA_PORTAL_UNIQUE_ID STRING,
  ASIAN_IND INT64,
  AFRICAAMER_IND INT64,
  NATHAWAI_IND INT64,
  NATINDIAN_IND INT64,
  W_IND INT64,
  OCCUP_TYPE STRING,
  APPR_STATUS_REASON STRING,
  CREATED_DT STRING,
  CREATED_BY STRING,
  UPDATED_DT STRING,
  UPDATED_BY STRING,
  ACTIVE_IND INT64,
  ACTIVE_FLG STRING,
  ROW_OPERATION STRING,
  STARTINGWAGEIND STRING,
  ZIP5 STRING,
  STATE STRING,
  NAICS_CD STRING,
  TERM_LENGTH STRING,
  OCCUPATION_TITLE STRING
);

In [None]:
%%bigquery
-- Reload original_v2.rapids_apprentice from apprentices.csv in the standard bucket.
-- allow_quoted_newlines lets quoted fields span multiple lines.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.rapids_apprentice;
LOAD DATA INTO original_v2.rapids_apprentice (
  SEQ_ID INT64,
  APPR_ID INT64,
  APPR_NUMBER STRING,
  PS_ID INT64,
  TERM_LENGTH_MIN INT64,
  TERM_LENGTH_MAX INT64,
  RAPIDS_CD STRING,
  ONET_SOC_CD STRING,
  VERSION_NUM STRING,
  CAREER_LATTICE_LVL STRING,
  PROGRAM_STATUS STRING,
  APPR_STATUS_CD STRING,
  PENDING_ACTION STRING,
  IS_PENDING_EMP INT64,
  EMP_CD INT64,
  SCHEDULE_CD INT64,
  AGE_COHORT_CD INT64,
  REGISTER_DT STRING,
  START_DT STRING,
  STARTING_WAGE DECIMAL,
  EXIT_WAGE_DT STRING,
  EXIT_WAGE DECIMAL,
  EXPECT_COMPLETE_DT STRING,
  COMPLETION_EXT_DT STRING,
  SUSPENDED_DT STRING,
  AGE_AT_START INT64,
  STUDENT_STATUS STRING,
  OJT_CREDIT INT64,
  ETHNICITY_CD INT64,
  VET_STAT_IND INT64,
  GENDER_CD STRING,
  RACE_CD INT64,
  DISABLED_CD INT64,
  EDUCATION_CD INT64,
  COMMENTS STRING,
  RTI_CREDIT STRING,
  RAPIDS_PRG_EMP_JOIN_ID STRING,
  OCCUP_ID STRING,
  OCCUP_DTL_ID STRING,
  INMATE_IND STRING,
  PROBATIONARY_CANCEL STRING,
  CURRENT_WAGE_IND STRING,
  STARTING_WAGE_IND STRING,
  JOURNEYMAN_WAGE_IND STRING,
  PRE_APPR_WAGE_IND STRING,
  PRE_EXPER_CREDIT_IND STRING,
  SAA_PORTAL_UNIQUE_ID STRING,
  ASIAN_IND INT64,
  AFRICAAMER_IND INT64,
  NATHAWAI_IND INT64,
  NATINDIAN_IND INT64,
  W_IND INT64,
  OCCUP_TYPE STRING,
  APPR_STATUS_REASON STRING,
  CREATED_DT STRING,
  CREATED_BY STRING,
  UPDATED_DT STRING,
  UPDATED_BY STRING,
  ACTIVE_IND INT64,
  ACTIVE_FLG STRING,
  ROW_OPERATION STRING,
  STARTINGWAGEIND STRING,
  ZIP5 STRING,
  STATE STRING,
  NAICS_CD STRING,
  TERM_LENGTH STRING,
  OCCUPATION_TITLE STRING
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/rapids/apprentices.csv'
    ]
)
;

### Update: 2024-08-01

The previous RAPIDS file didn't have the multi-employer/single-employer flag, which was one of the filters we used in the first version. This variable is provided in a new file, and we can use PS_ID to join the datasets.

In [None]:
%%bigquery
-- Recreate the table for the additional RAPIDS program info file, which adds the
-- single-/multi-employer flags; it joins to rapids_program on PS_ID.
DROP TABLE IF EXISTS original_v2.rapids_add_prog_info;
CREATE TABLE original_v2.rapids_add_prog_info (
  PS_ID INT64,
  IS_SINGLE_EMP BOOL,
  IS_MULTI_EMP BOOL,
  IS_NATIONAL BOOL,
  IS_LOCAL BOOL,
  IS_NGS BOOL,
  NGS_ID INT64,
  IS_CBA BOOL,
  IS_PREV_CRED_DOC_REQ BOOL,
  PREV_CRED_DOC_DESC STRING,
  IS_SEL_PROC BOOL,
  SEL_PROCS_3URI STRING,
  SEL_PROCS STRING,
  SEL_PROC_DATE TIMESTAMP,
  CREATED_DT TIMESTAMP,
  CREATED_BY STRING,
  UPDATED_DT TIMESTAMP,
  UPDATED_BY STRING,
  ROW_OPERATION STRING
)
;

In [None]:
%%bigquery
-- Reload original_v2.rapids_add_prog_info from the standard bucket.
-- NOTE(review): this notebook only uploads programs.csv and apprentices.csv to
-- v2/rapids/; confirm 'additional program info.csv' was uploaded separately.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.rapids_add_prog_info;
LOAD DATA INTO original_v2.rapids_add_prog_info (
  PS_ID INT64,
  IS_SINGLE_EMP BOOL,
  IS_MULTI_EMP BOOL,
  IS_NATIONAL BOOL,
  IS_LOCAL BOOL,
  IS_NGS BOOL,
  NGS_ID INT64,
  IS_CBA BOOL,
  IS_PREV_CRED_DOC_REQ BOOL,
  PREV_CRED_DOC_DESC STRING,
  IS_SEL_PROC BOOL,
  SEL_PROCS_3URI STRING,
  SEL_PROCS STRING,
  SEL_PROC_DATE TIMESTAMP,
  CREATED_DT TIMESTAMP,
  CREATED_BY STRING,
  UPDATED_DT TIMESTAMP,
  UPDATED_BY STRING,
  ROW_OPERATION STRING
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/rapids/additional program info.csv'
    ]
)
;

# 3 - TPR

The TPR data is an Excel file hosted on the TPR website.

In [None]:
# TPR folders under the original and standard trees
tpr_original_dir = f"{original_dir}tpr/"
tpr_standard_dir = f"{standard_dir}tpr/"

# Create both folders; exist_ok=True keeps the cell re-runnable
for tpr_dir in (tpr_original_dir, tpr_standard_dir):
    os.makedirs(tpr_dir, exist_ok=True)

### Download original

In [None]:
tpr_file_url = "https://www.trainingproviderresults.gov/data/DownloadPrograms.xlsx"
# The local copy keeps the server's file name (the last path segment of the URL)
tpr_local_file_name = tpr_file_url.rsplit("/", 1)[-1]

In [None]:
# Download the TPR workbook; this takes 1-5 minutes
tpr_file_download_request = requests.get(tpr_file_url)
# Stop on HTTP errors so the next cell never saves an error page as the .xlsx
tpr_file_download_request.raise_for_status()

In [None]:
# Persist the downloaded bytes as the original TPR workbook
tpr_original_path = tpr_original_dir + tpr_local_file_name
with open(tpr_original_path, "wb") as f:
    f.write(tpr_file_download_request.content)

### Upload original

In [None]:
# Keep a copy of the original workbook in v2/tpr/ of the original bucket
tpr_original_blob = original_bucket.blob(f"v2/tpr/{tpr_local_file_name}")
tpr_original_blob.upload_from_filename(f"{tpr_original_dir}{tpr_local_file_name}")

### Standardize file format

In [None]:
# Standardize: load the workbook (first sheet by default) and re-save it as CSV without the pandas index
tpr_xlsx_path = tpr_original_dir + tpr_local_file_name
tpr_df = pd.read_excel(tpr_xlsx_path)
tpr_df.to_csv(f"{tpr_standard_dir}DownloadPrograms.csv", index=False)

### Upload standardized files

In [None]:
# Push the standardized CSV to v2/tpr/ in the standard bucket
tpr_standard_blob = standard_bucket.blob("v2/tpr/DownloadPrograms.csv")
tpr_standard_blob.upload_from_filename(tpr_standard_dir + "DownloadPrograms.csv")

### Load to BigQuery

In [None]:
%%bigquery
-- Target table for the TPR DownloadPrograms extract. The column list is
-- identical to the one in the LOAD DATA cell that follows.
-- IF NOT EXISTS makes re-running this cell a no-op once the table exists.
CREATE TABLE IF NOT EXISTS original_v2.tpr (
    d101_eligible_training_provider STRING,
    d103_provider_address STRING,
    d104_entity_type STRING,
    d105_program_name STRING,
    d106_program_description STRING,
    d107_program_url STRING,
    d108_program INT64,
    d109_associated_credential STRING,
    d110_cip_code NUMERIC,
    d111_non_wioa_tuition_cost NUMERIC,
    d112_non_wioa_supplies_cost NUMERIC,
    d113_program_length_hours INT64,
    d114_program_length_weeks INT64,
    d115_program_prerequisites INT64,
    d116_program_format STRING,
    d117_program_soc_occupation_1 STRING,
    d118_program_soc_occupation_2 STRING,
    d119_program_soc_occupation_3 STRING,
    d120_total_served INT64,
    d121_total_exited INT64,
    d122_total_completed INT64,
    d123_total_employed_q2 INT64,
    d124_total_employed_q4 INT64,
    d125_median_earnings NUMERIC,
    d126_total_credential INT64,
    d133_total_wioa_served INT64,
    d134_total_wioa_exiters INT64,
    d135_total_wioa_served_with_ita NUMERIC,
    d136_total_wioa_exited_with_ita NUMERIC,
    d137_total_wioa_completed INT64,
    d138_cost_per_wioa_num NUMERIC,
    d139_total_wioa_exiters_employed_q2 INT64,
    d140_total_wioa_exiters_employed_q4 INT64,
    d142_total_wioa_credential INT64,
    c_wioa_completed_percent NUMERIC,
    c_total_employed_WIOA_q2_percent NUMERIC,
    c_total_employed_WIOA_q4_percent NUMERIC,
    c_completed_percent NUMERIC,
    c_total_emp_q2_perc_comp NUMERIC,
    c_wioa_earned_cred_percent NUMERIC,
    c_cost_per_wioa NUMERIC,
    c_q2_employment_percent NUMERIC,
    address STRING,
    city STRING,
    state STRING,
    zip STRING,
    lat FLOAT64,
    long FLOAT64,
    cip_formatted_4 NUMERIC,
    reportingstate STRING,
    CIP_Title STRING,
    provider_unique_id INT64,
    de129 INT64,
    de130 INT64,
    de170 INT64,
    de171 INT64,
    de172 STRING
);

In [None]:
%%bigquery
-- Reload original_v2.tpr from the CSV converted from the TPR Excel workbook.
-- allow_quoted_newlines lets quoted fields (e.g. descriptions) span multiple lines.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.tpr;
LOAD DATA INTO original_v2.tpr (
    d101_eligible_training_provider STRING,
    d103_provider_address STRING,
    d104_entity_type STRING,
    d105_program_name STRING,
    d106_program_description STRING,
    d107_program_url STRING,
    d108_program INT64,
    d109_associated_credential STRING,
    d110_cip_code NUMERIC,
    d111_non_wioa_tuition_cost NUMERIC,
    d112_non_wioa_supplies_cost NUMERIC,
    d113_program_length_hours INT64,
    d114_program_length_weeks INT64,
    d115_program_prerequisites INT64,
    d116_program_format STRING,
    d117_program_soc_occupation_1 STRING,
    d118_program_soc_occupation_2 STRING,
    d119_program_soc_occupation_3 STRING,
    d120_total_served INT64,
    d121_total_exited INT64,
    d122_total_completed INT64,
    d123_total_employed_q2 INT64,
    d124_total_employed_q4 INT64,
    d125_median_earnings NUMERIC,
    d126_total_credential INT64,
    d133_total_wioa_served INT64,
    d134_total_wioa_exiters INT64,
    d135_total_wioa_served_with_ita NUMERIC,
    d136_total_wioa_exited_with_ita NUMERIC,
    d137_total_wioa_completed INT64,
    d138_cost_per_wioa_num NUMERIC,
    d139_total_wioa_exiters_employed_q2 INT64,
    d140_total_wioa_exiters_employed_q4 INT64,
    d142_total_wioa_credential INT64,
    c_wioa_completed_percent NUMERIC,
    c_total_employed_WIOA_q2_percent NUMERIC,
    c_total_employed_WIOA_q4_percent NUMERIC,
    c_completed_percent NUMERIC,
    c_total_emp_q2_perc_comp NUMERIC,
    c_wioa_earned_cred_percent NUMERIC,
    c_cost_per_wioa NUMERIC,
    c_q2_employment_percent NUMERIC,
    address STRING,
    city STRING,
    state STRING,
    zip STRING,
    lat FLOAT64,
    long FLOAT64,
    cip_formatted_4 NUMERIC,
    reportingstate STRING,
    CIP_Title STRING,
    provider_unique_id INT64,
    de129 INT64,
    de130 INT64,
    de170 INT64,
    de171 INT64,
    de172 STRING
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/tpr/DownloadPrograms.csv'
    ]
)
;

# 4 - IRS

We start with processing the Business Master Files (BMF)

In [None]:
# IRS folders under the original and standard trees
irs_original_dir = f"{original_dir}irs/"
irs_standard_dir = f"{standard_dir}irs/"

# Create both folders; exist_ok=True keeps the cell re-runnable
for irs_dir in (irs_original_dir, irs_standard_dir):
    os.makedirs(irs_dir, exist_ok=True)

In [None]:
# IRS SOI Business Master File extracts, all published under the same folder
bmf_urls = [
    f"https://www.irs.gov/pub/irs-soi/{extract}.csv"
    for extract in ("eo1", "eo2", "eo3", "eo4", "eo_xx", "eo_pr")
]

In [None]:
# Download every BMF extract into original/irs/bmf/.
# The bmf/ subfolder is not created by the IRS paths cell, so create it here;
# otherwise open() raises FileNotFoundError on a fresh run.
bmf_original_dir = f"{irs_original_dir}bmf/"
os.makedirs(bmf_original_dir, exist_ok=True)

for url in bmf_urls:
    file_name = url.split("/")[-1]
    print(f"Downloading {file_name}...")
    response = requests.get(url)
    # Fail on HTTP errors instead of saving an error page as a .csv
    response.raise_for_status()
    with open(f"{bmf_original_dir}{file_name}", "wb") as f:
        f.write(response.content)

In [None]:
# Concatenate the BMF files into one standardized CSV.
# - sorted() makes the concatenation order reproducible; os.listdir order is arbitrary.
# - EIN is read as text. If an EIN column holds only digits, pandas would infer an
#   integer dtype and drop leading zeros, while the target column is EIN STRING.
#   NOTE(review): confirm downstream EIN joins expect the zero-padded form.
bmf_files = sorted(x for x in os.listdir(f"{irs_original_dir}bmf/") if x.endswith(".csv"))
dataframes = [pd.read_csv(f"{irs_original_dir}bmf/{f}", dtype={"EIN": str}) for f in bmf_files]
bmf_df = pd.concat(dataframes, ignore_index=True)
bmf_df.to_csv(f"{irs_standard_dir}bmf.csv", index=False)

In [None]:
# Read the combined BMF file back for ad-hoc inspection of the standardized output
df = pd.read_csv(irs_standard_dir + "bmf.csv")

In [None]:
# Push the combined BMF CSV to v2/irs/ in the standard bucket
irs_standard_blob = standard_bucket.blob("v2/irs/bmf.csv")
irs_standard_blob.upload_from_filename(irs_standard_dir + "bmf.csv")

In [None]:
%%bigquery
-- Target table for the combined IRS Business Master File extract. The column list
-- is identical to the one in the LOAD DATA cell that follows.
-- `GROUP` is backtick-quoted because GROUP is a reserved keyword in GoogleSQL.
CREATE TABLE IF NOT EXISTS original_v2.irs_bmf (
  EIN STRING,
  NAME STRING,
  ICO STRING,
  STREET STRING,
  CITY STRING,
  STATE STRING,
  ZIP STRING,
  `GROUP` INT64,
  SUBSECTION INT64,
  AFFILIATION INT64,
  CLASSIFICATION INT64,
  RULING INT64,
  DEDUCTIBILITY INT64,
  FOUNDATION INT64,
  ACTIVITY INT64,
  ORGANIZATION INT64,
  STATUS INT64,
  TAX_PERIOD STRING,
  ASSET_CD INT64,
  INCOME_CD INT64,
  FILING_REQ_CD INT64,
  PF_FILING_REQ_CD INT64,
  ACCT_PD INT64,
  ASSET_AMT FLOAT64,
  INCOME_AMT FLOAT64,
  REVENUE_AMT FLOAT64,
  NTEE_CD STRING,
  SORT_NAME STRING
);

In [None]:
%%bigquery
-- Reload original_v2.irs_bmf from the combined bmf.csv in the standard bucket.
-- NOTE(review): TRUNCATE and LOAD DATA run as separate script statements (no
-- transaction), so if the LOAD fails the table is left empty until re-run.
TRUNCATE TABLE original_v2.irs_bmf;
LOAD DATA INTO original_v2.irs_bmf (
  EIN STRING,
  NAME STRING,
  ICO STRING,
  STREET STRING,
  CITY STRING,
  STATE STRING,
  ZIP STRING,
  `GROUP` INT64,
  SUBSECTION INT64,
  AFFILIATION INT64,
  CLASSIFICATION INT64,
  RULING INT64,
  DEDUCTIBILITY INT64,
  FOUNDATION INT64,
  ACTIVITY INT64,
  ORGANIZATION INT64,
  STATUS INT64,
  TAX_PERIOD STRING,
  ASSET_CD INT64,
  INCOME_CD INT64,
  FILING_REQ_CD INT64,
  PF_FILING_REQ_CD INT64,
  ACCT_PD INT64,
  ASSET_AMT FLOAT64,
  INCOME_AMT FLOAT64,
  REVENUE_AMT FLOAT64,
  NTEE_CD STRING,
  SORT_NAME STRING
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  -- allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/irs/bmf.csv'
    ]
)
;

At this point, the latest copies of all of the original data sources are in the v2 folder of the original bucket (except the IRS Form 990 XML data, which is handled in the next steps).



The IRS XML data is hosted here:

https://www.irs.gov/charities-non-profits/form-990-series-downloads

We use the Form 990 (e-file) XML format.

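The data is available for the last 6 years. This notebook downloads 2023 and 2024; the archive lists for earlier years are kept in `skipped_years` but are not downloaded.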

In [None]:
# Form 990 e-file XML archives to download, keyed by filing year:
# one "<year>_TEOS_XML_<MM>A.zip" archive per month, 2024 first, then 2023.
irs_xml_urls = {
    year: [
        f"https://apps.irs.gov/pub/epostcard/990/xml/{year}/{year}_TEOS_XML_{month:02d}A.zip"
        for month in range(1, 13)
    ]
    for year in ("2024", "2023")
}
# Earlier years use a different archive naming scheme; their URL lists are kept
# here as plain text only and are not downloaded.
skipped_years = """
    "2022": [
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01A.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01B.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01C.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01D.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01E.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_01F.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_11A.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_11B.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2022/2022_TEOS_XML_11C.zip",
    ],
    "2021": [
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01A.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01B.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01C.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01D.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01E.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01F.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01G.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2021/2021_TEOS_XML_01H.zip",
    ],
    "2020": [
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/2020_TEOS_XML_CT1.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_1.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_2.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_3.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_4.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_5.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_6.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_7.zip",
        "https://apps.irs.gov/pub/epostcard/990/xml/2020/download990xml_2020_8.zip",
    ],
}
"""

In [None]:
# Download the IRS Form 990 XML archives into original/irs/<year>/.
# Each response is streamed to disk in 1 MiB chunks instead of being buffered whole
# in memory via .content (NOTE(review): these archives can be very large).
for year, urls in irs_xml_urls.items():
    # Create the year folder if it doesn't exist
    os.makedirs(irs_original_dir + year, exist_ok=True)
    for url in urls:
        file_name = url.split("/")[-1]
        print(f"Downloading {file_name} from {url}...")
        with requests.get(url, stream=True) as request:
            # Fail on HTTP errors instead of saving an error page as a .zip
            request.raise_for_status()
            with open(irs_original_dir + year + "/" + file_name, "wb") as f:
                for chunk in request.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)

In [None]:
# Mirror every downloaded archive to v2/irs/<year>/ in the original bucket
for year in irs_xml_urls:
    year_dir = f"{irs_original_dir}{year}/"
    for file_name in os.listdir(irs_original_dir + year):
        blob = original_bucket.blob(f"v2/irs/{year}/{file_name}")
        blob.upload_from_filename(year_dir + file_name)

### Uncompress files

In order to process the XML files, we need to extract them from the zip files. However, the source zip files mix several compression methods, one of which (`Deflate64`) is proprietary and not supported by Python's `zipfile` module.

We use 7z command line tool to extract the files.

Make sure the directory is clean before extracting files. Use caution with the next script!

In [None]:
# Empty the 2024 extraction folder. The glob must stay outside the quotes: inside
# them, "*" reaches rm as a literal file name and nothing is deleted.
!rm -rf "/content/standard/irs/2024/"*

In [None]:
# Use 7z command line tool to extract all files one by one in the /content/original/irs/2024 directory
# (needed because some members use Deflate64). -o sets the output folder, -y answers yes to prompts.
# NOTE(review): IPython substitutes $name in ! commands when a Python variable of that
# name exists; "$zip_file" relies on no Python variable called zip_file being defined.

!for zip_file in /content/original/irs/2024/*.zip; do 7z x "$zip_file" -o/content/standard/irs/2024/ -y; done

In [None]:
# Count the number of files in /content/standard/irs/2024 (sanity check after extraction)

!ls /content/standard/irs/2024 | wc -l

Repeat for 2023

In [None]:
# Empty the 2023 extraction folder. The glob must stay outside the quotes: inside
# them, "*" reaches rm as a literal file name and nothing is deleted.
!rm -rf "/content/standard/irs/2023/"*

In [None]:
# Extract every 2023 archive with 7z; grep keeps only the "Extracting archive" lines of the log
!for zip_file in /content/original/irs/2023/*.zip; do 7z x "$zip_file" -o/content/standard/irs/2023/ -y | grep -i "extracting archive"; done

In [None]:
# Output columns for the Form 990 XML extraction, in table order (one name per line)
columns = """
    return_timestamp
    tax_period_end_date
    ein
    business_name
    phone
    address
    city
    state
    zip
    website
    formation_year
    activity_or_mission
    voting_members_governing
    voting_members_independent
    total_employees
    total_volunteers
    contributions_and_grants_previous_year
    contributions_and_grants_current_year
    program_service_revenue_previous_year
    program_service_revenue_current_year
    investment_income_previous_year
    investment_income_current_year
    other_revenue_previous_year
    other_revenue_current_year
    total_revenue_previous_year
    total_revenue_current_year
    grants_and_similar_paid_previous_year
    grants_and_similar_paid_current_year
    benefits_paid_to_members_previous_year
    benefits_paid_to_members_current_year
    salaries_compensation_employee_benefits_previous_year
    salaries_compensation_employee_benefits_current_year
    total_professional_fundraising_expenses_previous_year
    total_professional_fundraising_expenses_current_year
    total_fundraising_expenses_current_year
    other_expenses_previous_year
    other_expenses_current_year
    total_expenses_previous_year
    total_expenses_current_year
    revenues_less_expenses_previous_year
    revenues_less_expenses_current_year
    total_assets_beginning_of_year
    total_assets_end_of_year
    total_liabilities_beginning_of_year
    total_liabilities_end_of_year
    net_assets_or_fund_balances_beginning_of_year
    net_assets_or_fund_balances_end_of_year
    mission_description
    primary_program_description
    secondary_program_description
    tertiary_program_description
    key_person_name
    key_person_title
    key_person_salary
    states_where_form_990_filed
    states_where_form_990_filed_count
""".split()

In [None]:
# Data extraction function
def _child(parent, tag):
    """Return the first direct child of `parent` named `tag`, or None if `parent` is None or has no such child."""
    return parent.find(tag) if parent is not None else None


def _child_text(parent, tag):
    """Return the text of the first direct child of `parent` named `tag`, or None if it is missing."""
    child = _child(parent, tag)
    return child.text if child is not None else None


# Output column -> tag of the direct child of the IRS990 element that holds its value.
_IRS990_DIRECT_FIELDS = {
    "website": "WebsiteAddressTxt",
    "formation_year": "FormationYr",
    "activity_or_mission": "ActivityOrMissionDesc",
    "voting_members_governing": "VotingMembersGoverningBodyCnt",
    "voting_members_independent": "VotingMembersIndependentCnt",
    "total_employees": "TotalEmployeeCnt",
    "total_volunteers": "TotalVolunteersCnt",
    "contributions_and_grants_previous_year": "PYContributionsGrantsAmt",
    "contributions_and_grants_current_year": "CYContributionsGrantsAmt",
    "program_service_revenue_previous_year": "PYProgramServiceRevenueAmt",
    "program_service_revenue_current_year": "CYProgramServiceRevenueAmt",
    "investment_income_previous_year": "PYInvestmentIncomeAmt",
    "investment_income_current_year": "CYInvestmentIncomeAmt",
    "other_revenue_previous_year": "PYOtherRevenueAmt",
    "other_revenue_current_year": "CYOtherRevenueAmt",
    "total_revenue_previous_year": "PYTotalRevenueAmt",
    "total_revenue_current_year": "CYTotalRevenueAmt",
    "grants_and_similar_paid_previous_year": "PYGrantsAndSimilarPaidAmt",
    "grants_and_similar_paid_current_year": "CYGrantsAndSimilarPaidAmt",
    "benefits_paid_to_members_previous_year": "PYBenefitsPaidToMembersAmt",
    "benefits_paid_to_members_current_year": "CYBenefitsPaidToMembersAmt",
    "salaries_compensation_employee_benefits_previous_year": "PYSalariesCompEmpBnftPaidAmt",
    "salaries_compensation_employee_benefits_current_year": "CYSalariesCompEmpBnftPaidAmt",
    "total_professional_fundraising_expenses_previous_year": "PYTotalProfFndrsngExpnsAmt",
    "total_professional_fundraising_expenses_current_year": "CYTotalProfFndrsngExpnsAmt",
    "total_fundraising_expenses_current_year": "CYTotalFundraisingExpenseAmt",
    "other_expenses_previous_year": "PYOtherExpensesAmt",
    "other_expenses_current_year": "CYOtherExpensesAmt",
    "total_expenses_previous_year": "PYTotalExpensesAmt",
    "total_expenses_current_year": "CYTotalExpensesAmt",
    "revenues_less_expenses_previous_year": "PYRevenuesLessExpensesAmt",
    "revenues_less_expenses_current_year": "CYRevenuesLessExpensesAmt",
    "total_assets_beginning_of_year": "TotalAssetsBOYAmt",
    "total_assets_end_of_year": "TotalAssetsEOYAmt",
    "total_liabilities_beginning_of_year": "TotalLiabilitiesBOYAmt",
    "total_liabilities_end_of_year": "TotalLiabilitiesEOYAmt",
    "net_assets_or_fund_balances_beginning_of_year": "NetAssetsOrFundBalancesBOYAmt",
    "net_assets_or_fund_balances_end_of_year": "NetAssetsOrFundBalancesEOYAmt",
    "mission_description": "MissionDesc",
    # The first program description is a direct IRS990 child; the 2nd/3rd live in their own groups.
    "primary_program_description": "Desc",
}

# Every output column populated from the IRS990 element (all None when a return has no IRS990 section).
_IRS990_COLUMNS = (
    *_IRS990_DIRECT_FIELDS,
    "secondary_program_description",
    "tertiary_program_description",
    "key_person_name",
    "key_person_title",
    "key_person_salary",
    "states_where_form_990_filed",
    "states_where_form_990_filed_count",
)


def process_xml(file):
    """Extract one flat record from an IRS e-file return XML.

    Args:
        file: path or file object accepted by ``xml.etree.ElementTree.parse``.

    Returns:
        dict keyed by output column name. Values are the raw element text (str),
        or None when the element is absent, except
        ``states_where_form_990_filed_count`` which is an int. When ReturnData has
        no IRS990 element, every IRS990-derived column is None. Missing elements
        never raise, so one sparse filing cannot abort a bulk extraction.
    """
    root = ET.parse(file).getroot()

    # Strip the XML namespace ("{uri}Tag" -> "Tag") so lookups can use bare tag names.
    for elem in root.iter():
        if "}" in elem.tag:
            elem.tag = elem.tag.split("}", 1)[1]

    # ReturnHeader / Filer columns
    return_header = _child(root, "ReturnHeader")
    filer = _child(return_header, "Filer")
    us_address = _child(filer, "USAddress")  # may be absent; address columns are then None
    relevant_data = {
        "return_timestamp": _child_text(return_header, "ReturnTs"),
        "tax_period_end_date": _child_text(return_header, "TaxPeriodEndDt"),
        "ein": _child_text(filer, "EIN"),
        "business_name": _child_text(_child(filer, "BusinessName"), "BusinessNameLine1Txt"),
        "address": _child_text(us_address, "AddressLine1Txt"),
        "city": _child_text(us_address, "CityNm"),
        "state": _child_text(us_address, "StateAbbreviationCd"),
        "zip": _child_text(us_address, "ZIPCd"),
        "phone": _child_text(filer, "PhoneNum"),
    }

    # Not all files have IRS990 data; assign the rest of the columns to None and return.
    irs990 = _child(_child(root, "ReturnData"), "IRS990")
    if irs990 is None:
        relevant_data.update(dict.fromkeys(_IRS990_COLUMNS))
        return relevant_data

    for column, tag in _IRS990_DIRECT_FIELDS.items():
        relevant_data[column] = _child_text(irs990, tag)

    relevant_data["secondary_program_description"] = _child_text(_child(irs990, "ProgSrvcAccomActy2Grp"), "Desc")
    relevant_data["tertiary_program_description"] = _child_text(_child(irs990, "ProgSrvcAccomActy3Grp"), "Desc")

    # Key person = the FIRST Form990PartVIISectionAGrp only. All three fields are read
    # from that same group so name/title/salary never mix different people.
    key_person_grp = _child(irs990, "Form990PartVIISectionAGrp")
    relevant_data["key_person_name"] = _child_text(key_person_grp, "PersonNm")
    relevant_data["key_person_title"] = _child_text(key_person_grp, "TitleTxt")
    relevant_data["key_person_salary"] = _child_text(key_person_grp, "ReportableCompFromOrgAmt")

    # Filing states as a sorted, comma-joined string; empty elements are skipped
    # because sorting/joining a None would raise TypeError.
    state_codes = sorted(
        elem.text
        for elem in irs990.findall("StatesWhereCopyOfReturnIsFldCd")
        if elem.text is not None
    )
    relevant_data["states_where_form_990_filed"] = ",".join(state_codes)
    relevant_data["states_where_form_990_filed_count"] = len(state_codes)

    return relevant_data

In [None]:
# Tax years to extract; kept as strings because they are spliced into file names and glob paths.
years = [str(tax_year) for tax_year in (2023, 2024)]

In [None]:
# Convert each year's 990 XML returns into one CSV extract and upload it to the standard bucket.
# Relies on `irs_standard_dir` (defined elsewhere in the notebook), `columns`, `process_xml`
# and `standard_bucket`.
for year in years:
    csv_file_name = f"irs-990-extract-{year}.csv"
    csv_file_path = f"{irs_standard_dir}{csv_file_name}"
    # glob() returns paths in filesystem order; sort so the CSV row order is reproducible across runs.
    file_list = sorted(glob.glob(f"{irs_standard_dir}{year}/*.xml"))
    print(f"Files to process: {len(file_list)}")
    with open(csv_file_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=columns)
        writer.writeheader()
        # `processed` counts files already written, so the progress log reports the true number.
        for processed, file in enumerate(file_list, start=1):
            writer.writerow(process_xml(file))
            if processed % 100_000 == 0:
                print(processed, "files processed from year", year, "at", time.strftime("%H:%M:%S", time.localtime()))
    print(len(file_list), "files processed from year", year, "at", time.strftime("%H:%M:%S", time.localtime()))
    # Upload the file to the standard bucket
    blob = standard_bucket.blob(f"v2/irs/{csv_file_name}")
    blob.upload_from_filename(csv_file_path)

In [None]:
%%bigquery
-- Target table for the 2023 IRS 990 CSV extract (gs://.../v2/irs/irs-990-extract-2023.csv).
-- The CSV is loaded by position, so this column order must match the Python `columns`
-- list that wrote the CSV header. Monetary amounts are FLOAT64; counts and years are INT64.
-- IF NOT EXISTS: re-running this cell never alters an existing table's schema.
CREATE TABLE IF NOT EXISTS standardized_v2.irs_990_2023 (
    return_timestamp TIMESTAMP,
    tax_period_end_date DATE,
    ein STRING,
    business_name STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip STRING,
    website STRING,
    formation_year INT64,
    activity_or_mission STRING,
    voting_members_governing INT64,
    voting_members_independent INT64,
    total_employees INT64,
    total_volunteers INT64,
    contributions_and_grants_previous_year FLOAT64,
    contributions_and_grants_current_year FLOAT64,
    program_service_revenue_previous_year FLOAT64,
    program_service_revenue_current_year FLOAT64,
    investment_income_previous_year FLOAT64,
    investment_income_current_year FLOAT64,
    other_revenue_previous_year FLOAT64,
    other_revenue_current_year FLOAT64,
    total_revenue_previous_year FLOAT64,
    total_revenue_current_year FLOAT64,
    grants_and_similar_paid_previous_year FLOAT64,
    grants_and_similar_paid_current_year FLOAT64,
    benefits_paid_to_members_previous_year FLOAT64,
    benefits_paid_to_members_current_year FLOAT64,
    salaries_compensation_employee_benefits_previous_year FLOAT64,
    salaries_compensation_employee_benefits_current_year FLOAT64,
    total_professional_fundraising_expenses_previous_year FLOAT64,
    total_professional_fundraising_expenses_current_year FLOAT64,
    total_fundraising_expenses_current_year FLOAT64,
    other_expenses_previous_year FLOAT64,
    other_expenses_current_year FLOAT64,
    total_expenses_previous_year FLOAT64,
    total_expenses_current_year FLOAT64,
    revenues_less_expenses_previous_year FLOAT64,
    revenues_less_expenses_current_year FLOAT64,
    total_assets_beginning_of_year FLOAT64,
    total_assets_end_of_year FLOAT64,
    total_liabilities_beginning_of_year FLOAT64,
    total_liabilities_end_of_year FLOAT64,
    net_assets_or_fund_balances_beginning_of_year FLOAT64,
    net_assets_or_fund_balances_end_of_year FLOAT64,
    mission_description STRING,
    primary_program_description STRING,
    secondary_program_description STRING,
    tertiary_program_description STRING,
    key_person_name STRING,
    key_person_title STRING,
    key_person_salary FLOAT64,
    states_where_form_990_filed STRING,
    states_where_form_990_filed_count INT64
)

In [None]:
%%bigquery
-- Reload standardized_v2.irs_990_2023 from the 2023 CSV extract.
-- CSV columns are matched by position, so this list must follow the CSV header order.
-- NOTE(review): irs-990-extract-2024.csv is also written and uploaded by the extraction
-- loop, but only the 2023 file is loaded here — confirm whether 2024 needs its own table/load.
TRUNCATE TABLE standardized_v2.irs_990_2023;
LOAD DATA INTO standardized_v2.irs_990_2023 (
    return_timestamp TIMESTAMP,
    tax_period_end_date DATE,
    ein STRING,
    business_name STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip STRING,
    website STRING,
    formation_year INT64,
    activity_or_mission STRING,
    voting_members_governing INT64,
    voting_members_independent INT64,
    total_employees INT64,
    total_volunteers INT64,
    contributions_and_grants_previous_year FLOAT64,
    contributions_and_grants_current_year FLOAT64,
    program_service_revenue_previous_year FLOAT64,
    program_service_revenue_current_year FLOAT64,
    investment_income_previous_year FLOAT64,
    investment_income_current_year FLOAT64,
    other_revenue_previous_year FLOAT64,
    other_revenue_current_year FLOAT64,
    total_revenue_previous_year FLOAT64,
    total_revenue_current_year FLOAT64,
    grants_and_similar_paid_previous_year FLOAT64,
    grants_and_similar_paid_current_year FLOAT64,
    benefits_paid_to_members_previous_year FLOAT64,
    benefits_paid_to_members_current_year FLOAT64,
    salaries_compensation_employee_benefits_previous_year FLOAT64,
    salaries_compensation_employee_benefits_current_year FLOAT64,
    total_professional_fundraising_expenses_previous_year FLOAT64,
    total_professional_fundraising_expenses_current_year FLOAT64,
    total_fundraising_expenses_current_year FLOAT64,
    other_expenses_previous_year FLOAT64,
    other_expenses_current_year FLOAT64,
    total_expenses_previous_year FLOAT64,
    total_expenses_current_year FLOAT64,
    revenues_less_expenses_previous_year FLOAT64,
    revenues_less_expenses_current_year FLOAT64,
    total_assets_beginning_of_year FLOAT64,
    total_assets_end_of_year FLOAT64,
    total_liabilities_beginning_of_year FLOAT64,
    total_liabilities_end_of_year FLOAT64,
    net_assets_or_fund_balances_beginning_of_year FLOAT64,
    net_assets_or_fund_balances_end_of_year FLOAT64,
    mission_description STRING,
    primary_program_description STRING,
    secondary_program_description STRING,
    tertiary_program_description STRING,
    key_person_name STRING,
    key_person_title STRING,
    key_person_salary FLOAT64,
    states_where_form_990_filed STRING,
    states_where_form_990_filed_count INT64
)
FROM FILES(
  format = 'CSV',
  skip_leading_rows = 1,
  -- Python's csv writer quotes any field containing a line break (the free-text
  -- description columns can contain them); BigQuery rejects such rows unless
  -- quoted newlines are explicitly allowed.
  allow_quoted_newlines = true,
  uris = [
    'gs://hks-almanac-storage-standard/v2/irs/irs-990-extract-2023.csv'
    ]
)
;

# Extract the standard set of columns
In this step, we extract the relevant set of columns from each original dataset to create a unified set of organizations. Some of these organizations will be duplicates because they are present in multiple datasets; the deduplication steps follow in the Deduplication section below.

## 1 - RAPIDS

In [None]:
%%bigquery
-- Standardize RAPIDS apprenticeship programs into the common org schema
-- (org_id, standardized address parts, geocoded lat/lon, in_<source> flags).
CREATE OR REPLACE TABLE standardized_v2.rapids AS
-- Keep only programs flagged IS_MULTI_EMP. Because the WHERE filters on the joined
-- table, this LEFT JOIN effectively behaves as an inner join.
WITH multi_employer_filter AS (
  SELECT rp.*
  FROM original_v2.rapids_program AS rp
  LEFT JOIN original_v2.rapids_add_prog_info AS api ON api.PS_ID = rp.PS_ID
  WHERE api.IS_MULTI_EMP
  ),

  -- Attach the program city. NOTE(review): this reads original.dol_program (v1 dataset),
  -- not original_v2 — confirm that is intentional.
  bring_city_data AS (
    SELECT DISTINCT m.*, d.ProgCity
    FROM multi_employer_filter AS m
    LEFT JOIN original.dol_program AS d ON m.PS_ID = d.PsID
  ),

-- Upper-case address parts with NULL -> '' so CONCAT below never yields NULL;
-- keep registered programs with at least one active apprentice and a name.
programs_with_standardized_address AS (
  SELECT
    UPPER(COALESCE(PROG_ADDR, "")) AS std_street,
    UPPER(COALESCE(CAST(ProgCity AS STRING), "")) AS std_city,
    UPPER(COALESCE(PROG_STATE, "")) AS std_state,
    UPPER(COALESCE(PROG_ZIP5, "")) AS std_zip,
    *
  FROM bring_city_data
  WHERE PROG_STATUS = 'Registered'
    -- AND ORG_TYPE = 'Multi-Employer' -- Applied above
    AND ACTIVE_APPRCT > 0
    AND PROG_NAME IS NOT NULL
),
-- One row per distinct program/attribute combination. The LEFT JOIN to apprentices
-- adds soc_code, so a program with several SOC codes yields several rows sharing one org_id.
standardized_rapids AS (
  SELECT DISTINCT
    reference.standardize_org_name(p.PROG_NAME) AS org_name,
    CONCAT(p.std_street, ' ', p.std_city, ' ', p.std_state, ' ', p.std_zip) AS org_address,
    p.PS_NUM,
    p.PS_ID,
    p.std_street,
    p.std_city,
    p.std_state,
    p.std_zip,
    p.ORG_TYPE,
    a.ONET_SOC_CD AS soc_code,
    p.PROG_NAME AS program_name,
    p.ACTIVE_APPRCT AS active_apprentice_count,
    p.PROG_STATUS AS program_status,
    p.PROVISIONAL_REVIEW AS provisional_review,
    p.PROG_TYPE,
    p.REGISTERED_DT,
    p.OTHER_ORG_TYPE,
    p.OTHER_PROG_TYPE,
    p.SIC_CD,
    p.EIN AS ein,
    FALSE AS in_ipeds,
    FALSE AS in_irs,
    TRUE AS in_rapids,
    FALSE AS in_tpr
  FROM programs_with_standardized_address AS p
  LEFT JOIN original_v2.rapids_apprentice AS a ON a.ps_Id = p.Ps_ID
)
-- org_id = MD5 of name + geocoded address (falls back to the raw address when the
-- census geocoder has no match). The geocoder join key is the MD5 of org_address with
-- double quotes removed. lat_lon is split as "lon,lat" (OFFSET(0) -> lon, OFFSET(1) -> lat),
-- and both come out as STRING.
SELECT
  TO_HEX(MD5(UPPER(CONCAT(org_name, COALESCE(c.output_address, d.org_address, ''))))) AS org_id,
  PS_ID AS rapids_unique_id,
  SPLIT(c.lat_lon, ',')[OFFSET(1)] AS geo_lat,
  SPLIT(c.lat_lon, ',')[OFFSET(0)] AS geo_lon,
  *
FROM standardized_rapids AS d
LEFT JOIN reference.census_gov_address_output AS c ON TO_HEX(MD5(REPLACE(d.org_address, '"', ''))) = c.address_id
;



## 2 - IPEDS


In [None]:
%%bigquery
-- Standardize IPEDS institutions (HD 2022 directory file) into the common org schema.
CREATE OR REPLACE TABLE standardized_v2.ipeds AS
-- Upper-case address parts with NULL -> '' so the org_address CONCAT never yields NULL.
-- NOTE(review): the INSTCAT filter is commented out, so every institution category is kept.
WITH hd_with_standardized_address AS (
    SELECT
      UPPER(COALESCE(ADDR, "")) AS std_street,
      UPPER(COALESCE(CITY, "")) AS std_city,
      UPPER(COALESCE(STABBR, "")) AS std_state,
      UPPER(COALESCE(ZIP, "")) AS std_zip,
      *
    FROM original_v2.ipeds_hd_2022
    -- WHERE INSTCAT IN (3, 4, 6)
),
-- Coordinates come from the IPEDS LATITUDE/LONGITUD columns rather than the
-- census geocoder; INSTCAT is carried as org_type.
standardized_ipeds AS (
  SELECT DISTINCT
    reference.standardize_org_name(INSTNM) AS org_name,
    CONCAT(std_street, ' ', std_city, ' ', std_state, ' ', std_zip) AS org_address,
    hd.UNITID,
    std_street,
    std_city,
    std_state,
    std_zip,
    LATITUDE AS geo_lat,
    LONGITUD AS geo_lon,
    hd.INSTCAT AS org_type,
    hd.ICLEVEL AS ic_level,
    hd.WEBADDR AS website,
    hd.ein AS ein,
    TRUE AS in_ipeds,
    FALSE AS in_irs,
    FALSE AS in_rapids,
    FALSE AS in_tpr
  FROM hd_with_standardized_address AS hd
  -- LEFT JOIN original_v2.ipeds_ic_2022 AS ic ON hd.UNITID = ic.UNITID
  -- LEFT JOIN original.ipeds_ef_2020  AS ef ON hd.UNITID = ef.UNITID
)
-- org_id = MD5 of name + geocoded address (raw address when the geocoder has no match);
-- the geocoder join key is the MD5 of org_address with double quotes removed.
SELECT
  TO_HEX(MD5(UPPER(CONCAT(org_name, COALESCE(c.output_address, d.org_address, ''))))) AS org_id,
  d.UNITID AS ipeds_unique_id,
  *
FROM standardized_ipeds AS d
LEFT JOIN reference.census_gov_address_output AS c ON TO_HEX(MD5(REPLACE(d.org_address, '"', ''))) = c.address_id
;



## 3 - TPR

In [None]:
%%bigquery
-- Standardize TPR (eligible training provider) programs into the common org schema.
CREATE OR REPLACE TABLE standardized_v2.tpr AS
-- Upper-case address parts with NULL -> '' so the org_address CONCAT never yields NULL.
WITH tpr_with_standardized_address AS (
  SELECT
    UPPER(COALESCE(address, "")) AS std_street,
    UPPER(COALESCE(city, "")) AS std_city,
    UPPER(COALESCE(state, "")) AS std_state,
    UPPER(COALESCE(zip, "")) AS std_zip,
    *
  FROM original_v2.tpr
),
-- Program-level columns (CIP, SOC, credential) are kept, so one provider can span
-- several rows sharing one org_id. Coordinates come from the source lat/long columns.
standardized_tpr AS (
  SELECT DISTINCT
    reference.standardize_org_name(d101_eligible_training_provider) AS org_name,
    CONCAT(std_street, ' ', std_city, ' ', std_state, ' ', std_zip) AS org_address,
    provider_unique_id,
    std_street,
    std_city,
    std_state,
    std_zip,
    lat AS geo_lat,
    long AS geo_lon,
    d104_entity_type AS org_type,
    d110_cip_code AS cip_code,
    CIP_Title AS cip_title,
    cip_formatted_4 AS cip_formatted,
    d117_program_soc_occupation_1,
    d118_program_soc_occupation_2,
    d119_program_soc_occupation_3,
    d105_program_name AS program_name,
    d106_program_description AS program_description,
    d107_program_url AS website,
    d109_associated_credential AS credential_name,
    d116_program_format,
    FALSE AS in_ipeds,
    FALSE AS in_irs,
    FALSE AS in_rapids,
    TRUE AS in_tpr
  FROM tpr_with_standardized_address
)
-- org_id = MD5 of name + geocoded address (raw address when the geocoder has no match).
SELECT
  TO_HEX(MD5(UPPER(CONCAT(org_name, COALESCE(c.output_address, d.org_address, ''))))) AS org_id,
  provider_unique_id AS tpr_unique_id,
  *
FROM standardized_tpr AS d
LEFT JOIN reference.census_gov_address_output AS c ON TO_HEX(MD5(REPLACE(d.org_address, '"', ''))) = c.address_id
;


## 4 - IRS

In [None]:
%%bigquery
-- Standardize IRS Business Master File orgs into the common org schema.
CREATE OR REPLACE TABLE standardized_v2.irs_bmf AS
-- Keep only orgs whose NTEE code starts with one of the listed prefixes; address
-- parts are upper-cased with NULL -> '' so the org_address CONCAT never yields NULL.
WITH bmf_with_standardized_address AS (
  SELECT
    UPPER(COALESCE(STREET, "")) AS std_street,
    UPPER(COALESCE(CITY, "")) AS std_city,
    UPPER(COALESCE(STATE, "")) AS std_state,
    UPPER(COALESCE(ZIP, "")) AS std_zip,
    *
  FROM original_v2.irs_bmf
  WHERE NTEE_CD LIKE ANY (
    'B30%', 'B41%', 'B60%',
    'J20%', 'J21%', 'J22%',
    'J30%', 'J32%', 'J33%'
    )
),
-- NTEE_CD becomes org_type; program-level fields the BMF lacks are NULL placeholders.
standardized_irs_bmf AS (
  SELECT DISTINCT
    reference.standardize_org_name(NAME) AS org_name,
    CONCAT(std_street, ' ', std_city, ' ', std_state, ' ', std_zip) AS org_address,
    EIN,
    std_street,
    std_city,
    std_state,
    std_zip,
    NTEE_CD AS org_type,
    NULL AS soc_code,
    NULL AS cip_code,
    NULL AS program_name,
    NULL AS credential_name,
    FALSE AS in_ipeds,
    TRUE AS in_irs,
    FALSE AS in_rapids,
    FALSE AS in_tpr
  FROM bmf_with_standardized_address
)
-- org_id = MD5 of name + geocoded address (raw address when the geocoder has no match).
-- lat_lon is split as "lon,lat" (OFFSET(0) -> lon, OFFSET(1) -> lat); both are STRING here.
SELECT
  TO_HEX(MD5(UPPER(CONCAT(org_name, COALESCE(c.output_address, d.org_address, ''))))) AS org_id,
  EIN AS irs_unique_id,
  SPLIT(c.lat_lon, ',')[OFFSET(1)] AS geo_lat,
  SPLIT(c.lat_lon, ',')[OFFSET(0)] AS geo_lon,
  *
FROM standardized_irs_bmf AS d
LEFT JOIN reference.census_gov_address_output AS c ON TO_HEX(MD5(REPLACE(d.org_address, '"', ''))) = c.address_id
;


Update: 2025-04-10

Exclude orgs whose revenue is not from 2022 or 2023 (their EINs are listed in `standardized_v2.irs_990_2023_invalid_years`).


In [None]:
%%bigquery
-- Drop BMF orgs whose EIN is listed in standardized_v2.irs_990_2023_invalid_years
-- (that table is built outside this section). irs_bmf is created with CREATE OR REPLACE,
-- so this DELETE must be re-run every time that table is rebuilt.
DELETE
FROM standardized_v2.irs_bmf
WHERE irs_unique_id IN (SELECT DISTINCT EIN FROM standardized_v2.irs_990_2023_invalid_years)

## Combine datasets

In [None]:
%%bigquery
-- This step merges (unions) all data sources into a single data set
-- This is the first step of deduplication (notice the DISTINCT keyword in each SELECT)
-- UNION ALL pairs columns by POSITION and takes names from the first (rapids) branch,
-- so every branch must list the same columns in the same order. NULL placeholders
-- and CASTs keep each position type-compatible across sources.
CREATE OR REPLACE TABLE deduped_v2.organizations_combined AS

-- RAPIDS: geo_lat/geo_lon are strings split from the geocoder's lat_lon, hence the NUMERIC casts.
SELECT DISTINCT
  'rapids' AS data_source,
  org_name,
  org_id,
  NULL AS ipeds_unique_id,
  CAST(NULL AS STRING) AS irs_unique_id,
  rapids_unique_id,
  NULL AS tpr_unique_id,
  COALESCE(output_address, org_address) AS org_address,
  std_street,
  std_city,
  std_state,
  std_zip,
  CAST(geo_lat AS NUMERIC) AS geo_lat,
  CAST(geo_lon AS NUMERIC) AS geo_lon,
  org_type,
  CAST(ein AS STRING) AS ein,
  in_ipeds,
  in_irs,
  in_rapids,
  in_tpr
FROM standardized_v2.rapids

UNION ALL

-- IPEDS: org_type is INSTCAT, cast to STRING to match the other branches.
SELECT DISTINCT
  'ipeds' AS data_source,
  org_name,
  org_id,
  ipeds_unique_id,
  CAST(NULL AS STRING) AS irs_unique_id,
  NULL AS rapids_unique_id,
  NULL AS tpr_unique_id,
  COALESCE(output_address, org_address) AS org_address,
  std_street,
  std_city,
  std_state,
  std_zip,
  geo_lat,
  geo_lon,
  CAST(org_type AS STRING),
  CAST(ein AS STRING) AS ein,
  in_ipeds,
  in_irs,
  in_rapids,
  in_tpr
FROM standardized_v2.ipeds

UNION ALL

-- IRS BMF: geo_lat/geo_lon are strings split from lat_lon, hence the NUMERIC casts.
SELECT DISTINCT
  'irs' AS data_source,
  org_name,
  org_id,
  NULL AS ipeds_unique_id,
  irs_unique_id,
  NULL AS rapids_unique_id,
  NULL AS tpr_unique_id,
  COALESCE(output_address, org_address) AS org_address,
  std_street,
  std_city,
  std_state,
  std_zip,
  CAST(geo_lat AS NUMERIC) AS geo_lat,
  CAST(geo_lon AS NUMERIC) AS geo_lon,
  org_type,
  ein,
  in_ipeds,
  in_irs,
  in_rapids,
  in_tpr
FROM standardized_v2.irs_bmf

UNION ALL

-- TPR: the standardized TPR table has no EIN column, so a typed NULL is used.
SELECT DISTINCT
  'tpr' AS data_source,
  org_name,
  org_id,
  NULL AS ipeds_unique_id,
  CAST(NULL AS STRING) AS irs_unique_id,
  NULL AS rapids_unique_id,
  tpr_unique_id,
  COALESCE(output_address, org_address) AS org_address,
  std_street,
  std_city,
  std_state,
  std_zip,
  geo_lat,
  geo_lon,
  org_type,
  CAST(NULL AS STRING) AS ein,
  in_ipeds,
  in_irs,
  in_rapids,
  in_tpr
FROM standardized_v2.tpr
;

# Deduplication

This lengthy step uses the logic provided by the Almanac team to dedupe organizations based on their original data source and org type.

https://workforcealmanac.com/methodology


The ANALYSIS and ACTION steps are repeated for all data sources

**ANALYSIS**: Let's start by checking for duplicates in each data source using the combined organizations data set.

In [None]:
%%bigquery
-- Row count vs. distinct org_id count per source; a gap means the source contains duplicate orgs.
SELECT
  data_source,
  COUNT(1) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.organizations_combined
GROUP BY 1
ORDER BY 1


## IPEDS

The first step is to select all orgs from IPEDS. The query results above show that there are no duplicate orgs in IPEDS.

**ACTION**: Persist the results

In [None]:
%%bigquery
-- Persist every IPEDS org with the top data-source priority (1).
-- chosen_org_type labels IPEDS INSTCAT codes 3/4/6; any other INSTCAT value gets NULL.
-- NOTE(review): the IPEDS standardization cell has its INSTCAT IN (3, 4, 6) filter
-- commented out, so other categories reach this table with a NULL label — confirm intended.
-- QUALIFY keeps one row per org_id: the one with the greatest org_type string
-- (ties are broken arbitrarily).
CREATE OR REPLACE TABLE deduped_v2.step_1_ipeds AS
SELECT
  CASE
    WHEN org_type = '3' THEN 'Degree-granting, not primarily baccalaureate or above'
    WHEN org_type = '4' THEN 'Degree-granting, Associates and certificates'
    WHEN org_type = '6' THEN 'Nondegree-granting, sub-baccalaureate'
  END AS chosen_org_type,
  1 AS data_source_priority,
  *
FROM deduped_v2.organizations_combined
WHERE data_source = 'ipeds'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY org_type DESC) = 1
;

**ACTION**: Create the unique orgs step by step

In [None]:
%%bigquery
-- Seed the step-by-step unique-orgs table with the IPEDS orgs; later cells rebuild it
-- as each dedup step adds its persisted orgs.
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
SELECT *
FROM deduped_v2.step_1_ipeds
;

**ANALYSIS**: How many orgs are left per data source? Note that the `WHERE` clause excludes the orgs that have already been added.

In [None]:
%%bigquery
-- Per-source counts of combined orgs whose org_id was not already persisted in step_1_ipeds.
SELECT
  o.data_source,
  COUNT(1) AS org_count,
  COUNT(DISTINCT o.org_id) AS unique_org_count
FROM deduped_v2.organizations_combined AS o
WHERE NOT EXISTS (
  SELECT 1
  FROM deduped_v2.step_1_ipeds AS i
  WHERE i.org_id = o.org_id
)
GROUP BY o.data_source
;

In [None]:
%%bigquery
-- Step 1.2: IRS and B41
-- ANALYSIS: Are the remaining IRS orgs whose NTEE code (org_type) starts with B41 all unique?
SELECT
  COUNT(1) AS number_of_orgs,
  COUNT(DISTINCT o.org_id) AS number_of_unique_orgs
FROM deduped_v2.organizations_combined AS o
WHERE o.data_source = 'irs'
  AND o.org_type LIKE 'B41%'
  AND NOT EXISTS (
    SELECT 1
    FROM deduped_v2.step_1_ipeds AS i
    WHERE i.org_id = o.org_id
  )
;

In [None]:
%%bigquery
-- Yes, all unique
-- COUNT: 330 orgs
-- ACTION: Persist orgs in Step 1.2 - IRS
CREATE OR REPLACE TABLE deduped_v2.step_1_irs AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.step_1_ipeds AS i ON o.org_id = i.org_id
  WHERE i.org_id IS NULL
)
SELECT 'Other higher education institution' AS chosen_org_type, 2 AS data_source_priority, *
FROM tmp1
WHERE data_source = 'irs' AND org_type LIKE 'B41%'
;

In [None]:
%%bigquery
-- ACTION: Regenerate the novel data set by combining the above two mutually exclusive sets (UNION ALL)
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
SELECT *
FROM deduped_v2.step_1_ipeds

UNION ALL

SELECT *
FROM deduped_v2.step_1_irs
;

In [None]:
%%bigquery

-- QUESTION: How many orgs are left per data source now?
-- Use this from now on: an anti-join against unique_orgs_sbs, the running set of accepted orgs

WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM remaining
GROUP BY data_source
;


In [None]:
%%bigquery
-- Step 1.3 - TPR - Higher Ed
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) LIKE '%higher ed:%'
;

In [None]:
%%bigquery
-- Yes 81 of them are duplicates
-- We can use the prioritization logic in Almanac Methodology to break the ties.
-- Every remaining TPR higher-ed row is ranked once, and the ranking is split into:
--   * deduped_v2.step_1_tpr   : the representative (rank 1) per org_id, used by the recreate cells below
--   * deduped_v2.sbs_excluded : all other rows (this (re)creates the table; later steps INSERT into it)
-- Building both tables from one ranked temp table keeps them exactly complementary.
-- Subtypes outside the three known ones get a NULL priority; `priority IS NULL` sorts them after the
-- known subtypes (BigQuery would otherwise sort NULLs first). tpr_unique_id breaks remaining ties.
CREATE TEMP TABLE tpr_higher_ed_ranked AS
WITH selection AS (
  SELECT o.*,
    CASE
      WHEN lower(o.org_type) = lower('Higher Ed: Certificate of Completion') THEN 1
      WHEN lower(o.org_type) = lower('Higher Ed: Baccalaureate or Higher') THEN 2
      WHEN lower(o.org_type) = lower('Higher Ed: Associate\'s Degree') THEN 3
    END AS priority
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL AND o.data_source = 'tpr' AND lower(o.org_type) LIKE '%higher ed:%'
)
SELECT 'Other higher education institution' AS chosen_org_type,
  3 AS data_source_priority,
  * EXCEPT (priority),
  ROW_NUMBER() OVER (
    PARTITION BY org_id
    ORDER BY priority IS NULL, priority, tpr_unique_id
  ) AS tie_rank
FROM selection
;

CREATE OR REPLACE TABLE deduped_v2.step_1_tpr AS
SELECT * EXCEPT (tie_rank)
FROM tpr_higher_ed_ranked
WHERE tie_rank = 1
;

CREATE OR REPLACE TABLE deduped_v2.sbs_excluded AS
SELECT * EXCEPT (tie_rank)
FROM tpr_higher_ed_ranked
WHERE tie_rank > 1
ORDER BY org_id
;

In [None]:
%%bigquery
-- Excluded (non-representative) rows per source so far
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.sbs_excluded
GROUP BY data_source


In [None]:
%%bigquery
-- ACTION: Add TPR to the novel dataset at this point before moving to the next stage
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
;

In [None]:
%%bigquery
-- Accepted orgs that have at least one excluded duplicate row, per source
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.unique_orgs_sbs AS u
WHERE EXISTS (
  SELECT 1
  FROM deduped_v2.sbs_excluded AS e
  WHERE e.org_id = u.org_id
)
GROUP BY data_source

In [None]:
%%bigquery
-- Step 1.4 - RAPIDS - Community College

-- Are there any duplicates?
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM tmp1
WHERE data_source = 'rapids' AND lower(org_type) LIKE lower('%Community College/University%')
;

In [None]:
%%bigquery
-- ANSWER: No duplicates.
-- ACTION: Select the one with the longest name as the representative in case there are dupes in future runs.
-- rapids_unique_id (presumably unique per RAPIDS row) breaks ties between equally long names so the
-- pick is deterministic. Non-representative rows are saved to sbs_excluded as in the other steps.
-- Both outputs come from one ranked temp table, so they are exactly complementary.
CREATE TEMP TABLE rapids_cc_ranked AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT 'Other higher education institution' AS chosen_org_type, 4 AS data_source_priority, *,
  ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(org_name) DESC, rapids_unique_id) AS tie_rank
FROM tmp1
WHERE data_source = 'rapids' AND lower(org_type) LIKE lower('%Community College/University%')
;

CREATE OR REPLACE TABLE deduped_v2.step_1_rapids AS
SELECT * EXCEPT (tie_rank)
FROM rapids_cc_ranked
WHERE tie_rank = 1
;

INSERT INTO deduped_v2.sbs_excluded
SELECT * EXCEPT (tie_rank)
FROM rapids_cc_ranked
WHERE tie_rank > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
SELECT *
FROM deduped_v2.step_1_ipeds

UNION ALL

SELECT *
FROM deduped_v2.step_1_irs

UNION ALL

SELECT *
FROM deduped_v2.step_1_tpr

UNION ALL
SELECT *
FROM deduped_v2.step_1_rapids
;

In [None]:
%%bigquery
-- Assert uniqueness: every org_id must appear exactly once in the accepted set
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery

-- Step 2.1 -- IRS non B41
-- Are there any duplicates?
-- NOTE(review): `org_type NOT LIKE 'B41%'` evaluates to NULL (not TRUE) for a NULL org_type, so IRS
-- orgs without an org_type are skipped here and can only reach the 'Unknown' bucket of step 5.2 —
-- confirm that is intended.
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM tmp1
WHERE data_source = 'irs' AND org_type NOT LIKE 'B41%'
;

In [None]:
%%bigquery
-- Yes, only 6 dupes
-- What do they look like and how can we break ties?
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT *
FROM tmp1
WHERE data_source = 'irs' AND org_type NOT LIKE 'B41%'
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id
;

In [None]:
%%bigquery
-- Pick the longer street address as the representative for the dupes because it contains more information.
-- irs_unique_id (presumably unique per IRS row) breaks ties between equally long addresses, so this
-- query and the "> 1" INSERT in the next cell rank rows identically (otherwise a row could land
-- in both tables or in neither).
CREATE OR REPLACE TABLE deduped_v2.step_2_irs AS
WITH tmp1 AS (
  SELECT 'Private non-profit' AS chosen_org_type,  -- this overrides all other org types in IRS
    5 AS data_source_priority,
    o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT *
FROM tmp1
WHERE data_source = 'irs' AND org_type NOT LIKE 'B41%'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(std_street) DESC, irs_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the non-representative rows, ranked exactly as in the previous cell
INSERT INTO deduped_v2.sbs_excluded
WITH tmp1 AS (
  SELECT 'Private non-profit' AS chosen_org_type,  -- this overrides all other org types in IRS
    5 AS data_source_priority,
    o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT *
FROM tmp1
WHERE data_source = 'irs' AND org_type NOT LIKE 'B41%'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(std_street) DESC, irs_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
;

In [None]:
%%bigquery
-- Every org_id must appear exactly once in the accepted set
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 2.2 -- TPR non-profit
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private non-profit'
;

In [None]:
%%bigquery
-- Yes, 1 dupe
-- What do they look like and how can we break ties?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private non-profit'
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id
;

In [None]:
%%bigquery
-- Keep the row with the smallest tpr_unique_id per org_id (data_source_priority = 6)
CREATE OR REPLACE TABLE deduped_v2.step_2_tpr AS
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Private non-profit' AS chosen_org_type,
  6 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private non-profit'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the dupe(s): every row that is not the smallest tpr_unique_id of its org_id
INSERT INTO deduped_v2.sbs_excluded
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Private non-profit' AS chosen_org_type,
  6 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private non-profit'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 2.3 -- DOL non-profit (RAPIDS community based organizations)
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'rapids'
  AND LOWER(org_type) = 'community based organization'
;

In [None]:
%%bigquery
-- Pick the longest name as the representative for the dupes. rapids_unique_id (presumably unique per
-- RAPIDS row) breaks ties deterministically, and the non-representative rows are saved to
-- sbs_excluded as in the other steps. Both outputs come from one ranked temp table, so they are
-- exactly complementary.
CREATE TEMP TABLE rapids_cbo_ranked AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT 'Private non-profit' AS chosen_org_type, 7 AS data_source_priority, *,
  ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(org_name) DESC, rapids_unique_id) AS tie_rank
FROM tmp1
WHERE data_source = 'rapids' AND lower(org_type) = 'community based organization'
;

CREATE OR REPLACE TABLE deduped_v2.step_2_rapids AS
SELECT * EXCEPT (tie_rank)
FROM rapids_cbo_ranked
WHERE tie_rank = 1
;

INSERT INTO deduped_v2.sbs_excluded
SELECT * EXCEPT (tie_rank)
FROM rapids_cbo_ranked
WHERE tie_rank > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset (steps 1.x and 2.x)
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery

-- Step 3.1 -- DOL - Known Org Types
-- Are there any duplicates?
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM tmp1
WHERE data_source = 'rapids' AND org_type IS NOT NULL AND lower(org_type) NOT IN ('other', 'none')
;

In [None]:
%%bigquery
-- Pick the longer name as the representative for the dupes.
-- rapids_unique_id (presumably unique per RAPIDS row) breaks ties between equally long names, so this
-- query and the "> 1" INSERT in the next cell rank rows identically.
CREATE OR REPLACE TABLE deduped_v2.step_3_rapids AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT CONCAT('Apprenticeship - ', org_type) AS chosen_org_type, 8 AS data_source_priority, *
FROM tmp1
WHERE data_source = 'rapids' AND org_type IS NOT NULL AND lower(org_type) NOT IN ('other', 'none')
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(org_name) DESC, rapids_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the dupe(s), ranked exactly as in the previous cell
INSERT INTO deduped_v2.sbs_excluded
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT CONCAT('Apprenticeship - ', org_type) AS chosen_org_type, 8 AS data_source_priority, *
FROM tmp1
WHERE data_source = 'rapids' AND org_type IS NOT NULL AND lower(org_type) NOT IN ('other', 'none')
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(org_name) DESC, rapids_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 3.2 -- DOL - Unknown Org Types
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'rapids'
  AND (org_type IS NULL OR LOWER(org_type) IN ('other', 'none'))
;


In [None]:
%%bigquery
-- Show the duplicated org_ids among the unknown-type RAPIDS orgs
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Apprenticeship - Unknown' AS chosen_org_type,
  9 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'rapids'
  AND (org_type IS NULL OR LOWER(org_type) IN ('other', 'none'))
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id

In [None]:
%%bigquery

-- Keep the row with the smallest rapids_unique_id per org_id
CREATE OR REPLACE TABLE deduped_v2.step_3_rapids_null_other AS
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Apprenticeship - Unknown' AS chosen_org_type,
  9 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'rapids'
  AND (org_type IS NULL OR LOWER(org_type) IN ('other', 'none'))
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY rapids_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the rest (every row ranked after the representative)
INSERT INTO deduped_v2.sbs_excluded
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Apprenticeship - Unknown' AS chosen_org_type,
  9 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'rapids'
  AND (org_type IS NULL OR LOWER(org_type) IN ('other', 'none'))
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY rapids_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids_null_other
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 3.3 -- TPR
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'national apprenticeship'
;

In [None]:
%%bigquery
-- 3 dupes
-- What do they look like and how can we break ties?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'national apprenticeship'
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id
;


In [None]:
%%bigquery
-- Keep the row with the smallest tpr_unique_id per org_id; its own org_type becomes chosen_org_type
CREATE OR REPLACE TABLE deduped_v2.step_3_tpr AS
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  org_type AS chosen_org_type,
  10 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'national apprenticeship'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the non-representative rows
INSERT INTO deduped_v2.sbs_excluded
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  org_type AS chosen_org_type,
  10 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'national apprenticeship'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids_null_other
UNION ALL SELECT * FROM deduped_v2.step_3_tpr
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 4.1 -- TPR
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private for-profit'
;

In [None]:
%%bigquery
-- Yes, 23 dupes
-- What do they look like and how can we break ties?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private for-profit'
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id
;

In [None]:
%%bigquery
-- Keep the row with the smallest tpr_unique_id per org_id
CREATE OR REPLACE TABLE deduped_v2.step_4_tpr AS
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Private for-profit' AS chosen_org_type,
  11 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private for-profit'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) = 1
;

In [None]:
%%bigquery
-- Save the non-representative rows
INSERT INTO deduped_v2.sbs_excluded
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT
  'Private for-profit' AS chosen_org_type,
  11 AS data_source_priority,
  *
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'private for-profit'
QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY tpr_unique_id) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids_null_other
UNION ALL SELECT * FROM deduped_v2.step_3_tpr
UNION ALL SELECT * FROM deduped_v2.step_4_tpr
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 5.1 -- TPR Public
-- Are there any duplicates?
WITH remaining AS (
  SELECT c.*
  FROM deduped_v2.organizations_combined AS c
  WHERE NOT EXISTS (
    SELECT 1 FROM deduped_v2.unique_orgs_sbs AS u WHERE u.org_id = c.org_id
  )
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM remaining
WHERE data_source = 'tpr'
  AND LOWER(org_type) = 'public'
;

In [None]:
%%bigquery
-- No dupes.
-- Keep the longest name as the representative, matching the other name-based tie-breaks.
-- tpr_unique_id breaks ties deterministically. Any non-representative rows go to sbs_excluded, as in
-- the other steps. Both outputs come from one ranked temp table, so they are exactly complementary.
CREATE TEMP TABLE tpr_public_ranked AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT 'Public' AS chosen_org_type, 12 AS data_source_priority, *,
  ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY LENGTH(org_name) DESC, tpr_unique_id) AS tie_rank
FROM tmp1
WHERE data_source = 'tpr' AND lower(org_type) = 'public'
;

CREATE OR REPLACE TABLE deduped_v2.step_5_tpr AS
SELECT * EXCEPT (tie_rank)
FROM tpr_public_ranked
WHERE tie_rank = 1
;

INSERT INTO deduped_v2.sbs_excluded
SELECT * EXCEPT (tie_rank)
FROM tpr_public_ranked
WHERE tie_rank > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset (steps 1.x through 5.1)
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids_null_other
UNION ALL SELECT * FROM deduped_v2.step_3_tpr
UNION ALL SELECT * FROM deduped_v2.step_4_tpr
UNION ALL SELECT * FROM deduped_v2.step_5_tpr
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;

In [None]:
%%bigquery
-- Step 5.2 -- NULL or TPR Other
-- Are there any duplicates?
-- Note: `org_type IS NULL` matches rows from any data source, not only TPR.
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT COUNT(*) AS row_count, COUNT(DISTINCT org_id) AS unique_count
FROM tmp1
WHERE org_type IS NULL OR (data_source = 'tpr' AND lower(org_type) = 'other')
;

In [None]:
%%bigquery
-- Yes, 7 dupes
-- What do they look like and how can we break ties?
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT *
FROM tmp1
WHERE org_type IS NULL OR (data_source = 'tpr' AND lower(org_type) = 'other')
QUALIFY COUNT(*) OVER (PARTITION BY org_id) > 1
ORDER BY org_id
;

In [None]:
%%bigquery
-- Pick the first tpr_unique_id.
-- The other source ids are appended as tie-breakers. Rows from non-TPR sources presumably have a NULL
-- tpr_unique_id, and tpr_unique_id alone would leave their ranking arbitrary. This query and the
-- "> 1" INSERT in the next cell must rank rows identically.
-- NOTE(review): BigQuery sorts NULLs first in ascending order, so a NULL tpr_unique_id row is
-- preferred over TPR rows of the same org_id — confirm that is intended.
CREATE OR REPLACE TABLE deduped_v2.step_5_null_tpr_other AS
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT 'Unknown' AS chosen_org_type, 13 AS data_source_priority, *
FROM tmp1
WHERE org_type IS NULL OR (data_source = 'tpr' AND lower(org_type) = 'other')
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY org_id
  ORDER BY tpr_unique_id, rapids_unique_id, irs_unique_id, ipeds_unique_id
) = 1
;

In [None]:
%%bigquery
-- Save the non-representative rows, ranked exactly as in the previous cell
INSERT INTO deduped_v2.sbs_excluded
WITH tmp1 AS (
  SELECT o.*
  FROM deduped_v2.organizations_combined AS o
  LEFT JOIN deduped_v2.unique_orgs_sbs AS n ON o.org_id = n.org_id
  WHERE n.org_id IS NULL
)
SELECT 'Unknown' AS chosen_org_type, 13 AS data_source_priority, *
FROM tmp1
WHERE org_type IS NULL OR (data_source = 'tpr' AND lower(org_type) = 'other')
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY org_id
  ORDER BY tpr_unique_id, rapids_unique_id, irs_unique_id, ipeds_unique_id
) > 1
;

In [None]:
%%bigquery
-- ACTION: Recreate novel dataset
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_sbs AS
          SELECT * FROM deduped_v2.step_1_ipeds
UNION ALL SELECT * FROM deduped_v2.step_1_irs
UNION ALL SELECT * FROM deduped_v2.step_1_tpr
UNION ALL SELECT * FROM deduped_v2.step_1_rapids
UNION ALL SELECT * FROM deduped_v2.step_2_irs
UNION ALL SELECT * FROM deduped_v2.step_2_tpr
UNION ALL SELECT * FROM deduped_v2.step_2_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids
UNION ALL SELECT * FROM deduped_v2.step_3_rapids_null_other
UNION ALL SELECT * FROM deduped_v2.step_3_tpr
UNION ALL SELECT * FROM deduped_v2.step_4_tpr
UNION ALL SELECT * FROM deduped_v2.step_5_tpr
UNION ALL SELECT * FROM deduped_v2.step_5_null_tpr_other
;

-- Every org_id must appear exactly once
ASSERT (
  SELECT COUNT(*) = COUNT(DISTINCT org_id) FROM deduped_v2.unique_orgs_sbs
)
;



Sanity checks: row counts per data source in the accepted (`unique_orgs_sbs`) and excluded (`sbs_excluded`) sets

In [None]:
%%bigquery

-- Accepted orgs per source (org_count and unique_org_count should match: the set was asserted unique)
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.unique_orgs_sbs
GROUP BY 1
ORDER BY 1

In [None]:
%%bigquery
-- Excluded duplicate rows per source
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.sbs_excluded
GROUP BY 1
ORDER BY 1

Are there cases where the excluded org is present in multiple sources?

In [None]:
%%bigquery
-- Excluded rows whose org_id was accepted from a different data source
-- (the inequality filter drops unmatched rows, so this is effectively an inner join)
SELECT *
FROM deduped_v2.sbs_excluded AS e
INNER JOIN deduped_v2.unique_orgs_sbs AS u
  ON e.org_id = u.org_id
WHERE e.data_source <> u.data_source


## Cross Dataset Deduplication

In [None]:
# BigQuery client for the Python cells that run queries directly.
bigquery_client = bigquery.Client(project=project, location=location)
# Later cells call the client as `client`; alias it so they also work on a fresh kernel.
client = bigquery_client

In [None]:
%%bigquery
-- These orgs have the same address and their names are "very" similar, therefore we treat them as the same.
-- Each pair is compared once (t1.org_id > t2.org_id); names shorter than 10 characters are skipped.
-- NOTE(review): lev_similarity normalizes the edit distance by t1's name length only, so the score
-- depends on which member of the pair has the larger org_id; consider normalizing by
-- GREATEST(LENGTH(t1.org_name), LENGTH(t2.org_name)).
CREATE OR REPLACE TABLE deduped_v2.orgs_with_same_address_and_similar_name AS
WITH address_match AS (
  SELECT DISTINCT
    a.org_id AS t1_org_id,
    a.org_name AS t1_org_name,
    b.org_name AS t2_org_name,
    reference.jaro_winkler_similarity(a.org_name, b.org_name) AS match_rate,
    reference.levenshtein_distance(a.org_name, b.org_name) AS lev_distance,
    CAST(100 * ROUND(1 - reference.levenshtein_distance(a.org_name, b.org_name) / LENGTH(a.org_name), 2) AS INT64) AS lev_similarity,
    b.org_id AS t2_org_id
  FROM deduped_v2.unique_orgs_sbs AS a
  JOIN deduped_v2.unique_orgs_sbs AS b
    ON a.org_address = b.org_address
  WHERE a.org_id > b.org_id
    AND LENGTH(a.org_name) >= 10
    AND LENGTH(b.org_name) >= 10
)
SELECT t1_org_id, t2_org_id
FROM address_match
WHERE match_rate >= 50
  AND lev_similarity >= 60
ORDER BY lev_similarity, match_rate
;


In [None]:
query = "SELECT DISTINCT org_id FROM deduped_v2.organizations_combined"
query_job = client.query(query)

In [None]:
deduped_orgs = query_job.to_dataframe()

In [None]:
nodes = set(deduped_orgs["org_id"])

In [None]:
query = "SELECT t1_org_id, t2_org_id FROM deduped_v2.orgs_with_same_address_and_similar_name"
query_job = client.query(query)
similar_orgs = query_job.to_dataframe()
edges = set(zip(similar_orgs["t1_org_id"], similar_orgs["t2_org_id"]))

In [None]:
graph = nx.Graph()

In [None]:
graph.add_nodes_from(nodes)
graph.add_edges_from(edges)

In [None]:
components = {i:n for i, n in enumerate(nx.connected_components(graph))}

In [None]:
matchset = [(k, w) for k, v in components.items() for w in v]

In [None]:
match_df = pd.DataFrame(matchset, columns=["dedupe_id", "org_id"])

In [None]:
match_df.head(10)

In [None]:
match_df.to_gbq("deduped_v2.unique_org_ids", project_id=project, if_exists='replace')

Create the novel dataset by keeping, for each cluster of duplicate orgs (`dedupe_id`), the org with the highest data source priority (the lowest `data_source_priority` value).

In [None]:
%%bigquery

-- One row per dedupe cluster: the member with the lowest data_source_priority value. Ties go to the
-- shorter org_name, then to org_id (unique in unique_orgs_sbs), so the pick is deterministic.
-- INNER JOIN: a cluster member without a row in unique_orgs_sbs would otherwise surface as an
-- all-NULL row (its NULL data_source_priority sorts first).
CREATE OR REPLACE TABLE novel_v2.unique_orgs AS
WITH unique_org AS (
  SELECT
    dedupe_id,
    d.*
  FROM deduped_v2.unique_org_ids AS u
  INNER JOIN deduped_v2.unique_orgs_sbs AS d ON u.org_id = d.org_id
  QUALIFY ROW_NUMBER() OVER(PARTITION BY dedupe_id ORDER BY data_source_priority, LENGTH(org_name), d.org_id) = 1
)
SELECT *,
  CASE WHEN data_source = 'ipeds' THEN org_type END AS org_subtype_ipeds,
  CASE WHEN data_source = 'irs' THEN org_type END AS org_subtype_irs,
  CASE WHEN data_source = 'rapids' THEN org_type END AS org_subtype_rapids,
  CASE WHEN data_source = 'tpr' THEN org_type END AS org_subtype_tpr
FROM unique_org
ORDER BY dedupe_id, data_source
;

In [None]:
%%bigquery
-- Accepted orgs per source in the step-by-step set
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM deduped_v2.unique_orgs_sbs
GROUP BY 1
ORDER BY 1

In [None]:
%%bigquery
-- Every cluster member except the representative, ranked with the same ordering used to pick the
-- representative: lowest data_source_priority, then shorter org_name, then org_id (unique in
-- unique_orgs_sbs), so the ranking is deterministic.
-- INNER JOIN: skip cluster members without a row in unique_orgs_sbs instead of emitting all-NULL rows.
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_excluded AS
WITH unique_org AS (
  SELECT
    dedupe_id,
    d.*
  FROM deduped_v2.unique_org_ids AS u
  INNER JOIN deduped_v2.unique_orgs_sbs AS d ON u.org_id = d.org_id
  QUALIFY ROW_NUMBER() OVER(PARTITION BY dedupe_id ORDER BY data_source_priority, LENGTH(org_name), d.org_id) > 1
)
SELECT *,
  -- CAST(in_ipeds AS INT64) + CAST(in_rapids AS INT64) + CAST(in_irs AS INT64) + CAST(in_tpr AS INT64) AS num_data_sources,
  CASE WHEN data_source = 'ipeds' THEN org_type END AS org_subtype_ipeds,
  CASE WHEN data_source = 'irs' THEN org_type END AS org_subtype_irs,
  CASE WHEN data_source = 'rapids' THEN org_type END AS org_subtype_rapids,
  CASE WHEN data_source = 'tpr' THEN org_type END AS org_subtype_tpr
FROM unique_org
ORDER BY dedupe_id, data_source
;

In [None]:
%%bigquery
-- Find orgs with the same dedupe_id (clusters with more than one excluded member)
SELECT *
FROM deduped_v2.unique_orgs_excluded
WHERE dedupe_id IS NOT NULL
QUALIFY COUNT(*) OVER (PARTITION BY dedupe_id) > 1
ORDER BY dedupe_id, data_source

In [None]:
%%bigquery
-- Reduce duplicate orgs to a single dedupe_id.
-- MAX() merges the source flags, subtypes and per-source unique ids of all excluded members of a cluster.
CREATE OR REPLACE TABLE deduped_v2.unique_orgs_excluded_deduped AS
SELECT
  x.dedupe_id,
  MAX(x.in_ipeds) AS in_ipeds,
  MAX(x.in_irs) AS in_irs,
  MAX(x.in_rapids) AS in_rapids,
  MAX(x.in_tpr) AS in_tpr,
  MAX(x.org_subtype_ipeds) AS org_subtype_ipeds,  -- questionable: what if the dupes have different subtypes? Data quality issue?
  MAX(x.org_subtype_irs) AS org_subtype_irs,
  MAX(x.org_subtype_rapids) AS org_subtype_rapids,
  MAX(x.org_subtype_tpr) AS org_subtype_tpr,
  MAX(x.ipeds_unique_id) AS ipeds_unique_id,
  MAX(x.irs_unique_id) AS irs_unique_id,
  MAX(x.rapids_unique_id) AS rapids_unique_id,
  MAX(x.tpr_unique_id) AS tpr_unique_id
FROM deduped_v2.unique_orgs_excluded AS x
GROUP BY x.dedupe_id

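TODO: Update the `in_<data_source>` flags of the novel dataset using the excluded orgs. The next cell does not do this; it only creates the empty `orgs_with_similar_names` table used by the city-level name-similarity pass.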

In [None]:
%%bigquery
-- Empty staging table for the city-level name-similarity pass (filled by the loop in the next cell).
-- That loop INSERTs positionally: two scores, the org1/org2 ids, names and data sources, then
-- t2.* EXCEPT(dedupe_id, org_name, data_source, data_source_priority) from novel_v2.unique_orgs.
-- The schema is derived from that same shape, so column names and order always line up with the
-- inserted values. A hand-written list had the subtype columns as ipeds/dol/irs/tpr, while
-- novel_v2.unique_orgs ends with org_subtype_ipeds, org_subtype_irs, org_subtype_rapids, org_subtype_tpr.
CREATE OR REPLACE TABLE deduped_v2.orgs_with_similar_names AS
SELECT
  CAST(NULL AS INT64) AS jws_score,
  CAST(NULL AS INT64) AS lev_score,
  t.dedupe_id AS org1_dedupe_id,
  t.dedupe_id AS org2_dedupe_id,
  t.org_name AS org1_name,
  t.org_name AS org2_name,
  t.data_source AS org1_data_source,
  t.data_source AS org2_data_source,
  t.* EXCEPT(dedupe_id, org_name, data_source, data_source_priority)
FROM novel_v2.unique_orgs AS t
WHERE FALSE
;

Apply name-similarity deduplication again. This time, instead of an exact address match, require the same state and city and use higher name-similarity thresholds as the filter.


In [None]:
%%bigquery
-- City-level name-similarity pass. Each batch of dedupe_ids (the t1 side) is compared with every org
-- that has a smaller dedupe_id in the same state and city, so each pair is evaluated once.
DECLARE max_dedupe_id INT64;
DECLARE batch_size INT64 DEFAULT 1000;
DECLARE i INT64 DEFAULT 0;
SET max_dedupe_id = (SELECT MAX(dedupe_id) FROM novel_v2.unique_orgs);

SELECT max_dedupe_id;

TRUNCATE TABLE deduped_v2.orgs_with_similar_names;

-- `<=` so the last batch is processed even when max_dedupe_id is an exact multiple of batch_size.
-- With `<`, the org(s) starting that batch were never compared on the t1 side.
WHILE i <= max_dedupe_id DO
  INSERT INTO deduped_v2.orgs_with_similar_names
  WITH selected AS (
    SELECT dedupe_id, org_name, std_state, std_city, data_source
    FROM novel_v2.unique_orgs
    WHERE dedupe_id BETWEEN i AND i + batch_size - 1
  ),
  calculations AS (
    SELECT reference.jaro_winkler_similarity(t1.org_name, t2.org_name) AS jws_score,
      -- SAFE_DIVIDE: an empty t1.org_name yields NULL (filtered out below) instead of aborting the script
      CAST(100*ROUND(1 - SAFE_DIVIDE(reference.levenshtein_distance(t1.org_name, t2.org_name), LENGTH(t1.org_name)), 2) AS INT64) AS lev_score,
      t1.dedupe_id AS org1_dedupe_id,
      t2.dedupe_id AS org2_dedupe_id,
      t1.org_name AS org1_name,
      t2.org_name AS org2_name,
      t1.data_source AS org1_data_source,
      t2.data_source AS org2_data_source,
      t2.* EXCEPT(dedupe_id, org_name, data_source, data_source_priority)
    FROM selected AS t1
    CROSS JOIN novel_v2.unique_orgs AS t2
    WHERE t1.dedupe_id > t2.dedupe_id AND t1.std_state = t2.std_state AND t1.std_city = t2.std_city
  )
  SELECT *
  FROM calculations
  -- Thresholds are higher than the exact address match version
  WHERE jws_score >= 80 AND lev_score >= 90
  ;
  SET i = i + batch_size;
END WHILE;


Find the duplicate org clusters and send for review

In [None]:
query = "SELECT DISTINCT dedupe_id FROM novel_v2.unique_orgs"
query_job = client.query(query)

In [None]:
deduped_orgs = query_job.to_dataframe()

In [None]:
nodes = set(deduped_orgs["dedupe_id"])

In [None]:
query = "SELECT org1_dedupe_id, org2_dedupe_id FROM deduped_v2.orgs_with_similar_names"
query_job = client.query(query)
similar_orgs = query_job.to_dataframe()
edges = set(zip(similar_orgs["org1_dedupe_id"], similar_orgs["org2_dedupe_id"]))

In [None]:
graph = nx.Graph()

In [None]:
graph.add_nodes_from(nodes)
graph.add_edges_from(edges)

In [None]:
components = {i:n for i, n in enumerate(nx.connected_components(graph))}

In [None]:
matchset = [(k, w) for k, v in components.items() for w in v]

In [None]:
match_df = pd.DataFrame(matchset, columns=["match_id", "dedupe_id"])

In [None]:
match_df.to_gbq("deduped_v2.orgs_with_similar_names_match_clusters", project_id=project, if_exists='replace')

In [None]:
%%bigquery
-- Review table: every org that belongs to a multi-member similarity cluster, joined to
-- its full unique_orgs record. Single-member clusters are left out.
CREATE OR REPLACE TABLE deduped_v2.orgs_with_similar_names_to_check AS
WITH multi_member AS (
  SELECT match_id, dedupe_id
  FROM deduped_v2.orgs_with_similar_names_match_clusters
  QUALIFY COUNT(dedupe_id) OVER (PARTITION BY match_id) > 1
)
SELECT m.match_id, orgs.*
FROM multi_member AS m
JOIN novel_v2.unique_orgs AS orgs
  ON orgs.dedupe_id = m.dedupe_id
ORDER BY match_id, dedupe_id
;

# Dedupe and QA Steps

In [None]:
%%bigquery
-- Dataset for the QA / audit tables written by the dedupe steps that follow.
CREATE SCHEMA IF NOT EXISTS qa_v2
  OPTIONS (location = 'us-central1');

Deduplicate by prioritizing IPEDS above the other data sources

In [None]:
%%bigquery
-- Decide, per similar-name cluster (match_id), which orgs to keep:
--   * IPEDS orgs are kept (IPEDS is the preferred source);
--   * otherwise only the top-ranked org of the cluster survives;
--   * every other org is marked 'delete'.
-- Ties in `preferred` are broken by dedupe_id so the survivor is deterministic across runs.
CREATE OR REPLACE TABLE qa_v2.novel_deleted_unique_orgs_bc_similar_name AS
WITH preferred_source AS (
  SELECT *,
    CASE
      WHEN data_source = 'ipeds'
      THEN 1
      ELSE 2
    END AS preferred
  FROM deduped_v2.orgs_with_similar_names_to_check
),
ranked AS (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY match_id ORDER BY preferred, dedupe_id) AS source_rank
  FROM preferred_source
),
-- IPEDS orgs in clusters that contain more than one IPEDS org; these are all kept.
-- WHERE runs before the window, so the count is over the cluster's IPEDS rows. Previously a
-- `source_rank = 1` filter limited that to a single row, so this CTE was always empty.
exceptions AS (
  SELECT COUNT(data_source) OVER(PARTITION BY match_id) AS counts, *
  FROM ranked
  WHERE data_source = 'ipeds'
  QUALIFY counts > 1
)
SELECT
  CASE
    WHEN e.dedupe_id IS NULL AND r.source_rank <> 1
    THEN 'delete'
    ELSE 'keep'
  END AS decision,
  CASE
    WHEN r.data_source = 'ipeds' THEN 'data source is ipeds'
    WHEN r.source_rank = 1 THEN 'survivor of the match group'
    ELSE 'marked as duplicate'
  END AS reason,
  r.*
FROM ranked AS r
LEFT JOIN exceptions AS e ON r.match_id = e.match_id AND r.dedupe_id = e.dedupe_id
ORDER BY match_id, dedupe_id
;

In [None]:
%%bigquery
-- Fold the source flags, subtypes and unique ids of every org marked 'delete' into the
-- survivor of its match group (source_rank = 1, always kept). This way, deleting the
-- duplicates later does not lose which data sources an org appears in.
-- Joining each row to itself (u.dedupe_id = d.dedupe_id) made this UPDATE a no-op.
-- Survivor values take precedence; the deleted orgs only fill gaps.
UPDATE novel_v2.unique_orgs AS u
SET
  in_ipeds = u.in_ipeds OR IFNULL(d.in_ipeds, FALSE),
  in_irs = u.in_irs OR IFNULL(d.in_irs, FALSE),
  in_rapids = u.in_rapids OR IFNULL(d.in_rapids, FALSE),
  in_tpr = u.in_tpr OR IFNULL(d.in_tpr, FALSE),
  org_subtype_ipeds = COALESCE(u.org_subtype_ipeds, d.org_subtype_ipeds),
  org_subtype_irs = COALESCE(u.org_subtype_irs, d.org_subtype_irs),
  org_subtype_rapids = COALESCE(u.org_subtype_rapids, d.org_subtype_rapids),
  org_subtype_tpr = COALESCE(u.org_subtype_tpr, d.org_subtype_tpr),
  ipeds_unique_id = COALESCE(u.ipeds_unique_id, d.ipeds_unique_id),
  irs_unique_id = COALESCE(u.irs_unique_id, d.irs_unique_id),
  rapids_unique_id = COALESCE(u.rapids_unique_id, d.rapids_unique_id),
  tpr_unique_id = COALESCE(u.tpr_unique_id, d.tpr_unique_id)
FROM (
  -- One row per survivor, aggregating all deleted orgs of its match group
  -- (UPDATE ... FROM allows at most one source row per target row).
  SELECT
    survivor.dedupe_id AS survivor_dedupe_id,
    LOGICAL_OR(dup.in_ipeds) AS in_ipeds,
    LOGICAL_OR(dup.in_irs) AS in_irs,
    LOGICAL_OR(dup.in_rapids) AS in_rapids,
    LOGICAL_OR(dup.in_tpr) AS in_tpr,
    MAX(dup.org_subtype_ipeds) AS org_subtype_ipeds,
    MAX(dup.org_subtype_irs) AS org_subtype_irs,
    MAX(dup.org_subtype_rapids) AS org_subtype_rapids,
    MAX(dup.org_subtype_tpr) AS org_subtype_tpr,
    MAX(dup.ipeds_unique_id) AS ipeds_unique_id,
    MAX(dup.irs_unique_id) AS irs_unique_id,
    MAX(dup.rapids_unique_id) AS rapids_unique_id,
    MAX(dup.tpr_unique_id) AS tpr_unique_id
  FROM qa_v2.novel_deleted_unique_orgs_bc_similar_name AS survivor
  INNER JOIN qa_v2.novel_deleted_unique_orgs_bc_similar_name AS dup
    ON survivor.match_id = dup.match_id
  WHERE survivor.source_rank = 1 AND dup.decision = 'delete'
  GROUP BY survivor.dedupe_id
) AS d
WHERE u.dedupe_id = d.survivor_dedupe_id

Remove IPEDS orgs in unwanted categories

In [None]:
%%bigquery
-- Remove IPEDS orgs whose IPEDS subtype code is one of the unwanted values.
-- The codes live in org_subtype_ipeds: later cells map its "3", "4" and "6" values to labels.
-- org_type_ipeds, used here before, is only created further down
-- (as 'higher ed institution'). NOTE(review): confirm against the unique_orgs schema.
DELETE FROM novel_v2.unique_orgs
WHERE data_source = 'ipeds' AND org_subtype_ipeds IN ('-2', '-1', '1', '2', '5')
;

Remove orgs with blank state

In [None]:
%%bigquery
-- Remove orgs with a blank state: NULL, empty, or whitespace-only std_state.
DELETE FROM novel_v2.unique_orgs
WHERE TRIM(COALESCE(std_state, '')) = '';


In [None]:
%%bigquery
-- Number of data sources each org appears in. IF(flag, 1, 0) counts a NULL flag as 0, so
-- the total is never NULL (a NULL count would slip past filters like num_data_sources = 1).
CREATE OR REPLACE TABLE novel_v2.unique_orgs AS
SELECT *,
  IF(in_ipeds, 1, 0) + IF(in_rapids, 1, 0) + IF(in_irs, 1, 0) + IF(in_tpr, 1, 0) AS num_data_sources
FROM novel_v2.unique_orgs
;

In [None]:
%%bigquery
-- Orgs failing a QA test, tagged with the test id. UNION ALL keeps one row per
-- (org, failed test), so an org can appear more than once.
CREATE OR REPLACE TABLE qa_v2.novel_deleted_orgs AS
WITH orgs AS (
  SELECT dedupe_id, LOWER(org_name) AS name_lc, data_source, org_type, num_data_sources
  FROM novel_v2.unique_orgs
)
-- qa_t2: "<high|charter|public|adult|grammar|grade> school" or "school district" in the name
SELECT dedupe_id, 'qa_t2' AS delete_reason
FROM orgs
WHERE REGEXP_CONTAINS(name_lc, "(high|charter|public|adult|grammar|grade) school|school district")
UNION ALL
-- qa_t3: foundation / fund / trust names, RAPIDS orgs exempt.
-- NOTE(review): " fund" / " trust" are substring matches (also hit " fundamentals", " trustee").
SELECT dedupe_id, 'qa_t3'
FROM orgs
WHERE REGEXP_CONTAINS(name_lc, " foundation| fund| trust|foundation |fundacion ")
  AND data_source <> 'rapids'
UNION ALL
-- qa_t4: career centers
SELECT dedupe_id, 'qa_t4'
FROM orgs
WHERE REGEXP_CONTAINS(name_lc, "career center")
UNION ALL
-- qa_t5: IRS-only orgs of type B60
SELECT dedupe_id, 'qa_t5'
FROM orgs
WHERE data_source = 'irs' AND org_type = 'B60' AND num_data_sources = 1
UNION ALL
-- qa_t6: TPR orgs of type 'Higher Ed: Baccalaureate or Higher'
SELECT dedupe_id, 'qa_t6'
FROM orgs
WHERE data_source = 'tpr' AND org_type = 'Higher Ed: Baccalaureate or Higher'
;

In [None]:
%%bigquery
-- Drop every org that failed at least one QA test.
DELETE FROM novel_v2.unique_orgs AS u
WHERE EXISTS (
  SELECT 1
  FROM qa_v2.novel_deleted_orgs AS q
  WHERE q.dedupe_id = u.dedupe_id
)
;

In [None]:
%%bigquery
-- Drop the orgs that the similar-name review marked 'delete'.
DELETE FROM novel_v2.unique_orgs AS u
WHERE EXISTS (
  SELECT 1
  FROM qa_v2.novel_deleted_unique_orgs_bc_similar_name AS s
  WHERE s.decision = 'delete'
    AND s.dedupe_id = u.dedupe_id
)

In [None]:
%%bigquery
-- Per-source row count vs. distinct org_id count (a gap means repeated or NULL org_ids).
SELECT
  data_source,
  COUNT(*) AS org_count,
  COUNT(DISTINCT org_id) AS unique_org_count
FROM novel_v2.unique_orgs
GROUP BY data_source
ORDER BY data_source

In [None]:
%%bigquery
-- Task 2.1: expose the per-source subtype code as org_subtype_in_data_source (was org_type).
ALTER TABLE novel_v2.unique_orgs
  RENAME COLUMN org_type TO org_subtype_in_data_source;

In [None]:
%%bigquery
-- One org-type label per data source; NULL when the org is not in that source.
CREATE OR REPLACE TABLE novel_v2.unique_orgs AS
SELECT *,
  IF(in_ipeds, 'higher ed institution', NULL) AS org_type_ipeds,
  IF(in_irs, 'non profit', NULL) AS org_type_irs,
  IF(in_rapids, 'federally registered apprenticeship', NULL) AS org_type_rapids,
  IF(in_tpr, 'WIOA funds recipient', NULL) AS org_type_tpr
FROM novel_v2.unique_orgs
;

In [None]:
%%bigquery
-- IF NOT EXISTS keeps this cell re-runnable.
ALTER TABLE novel_v2.unique_orgs ADD COLUMN IF NOT EXISTS last_updated DATE;

In [None]:
%%bigquery
-- Stamp every row with the fixed last_updated date (explicit DATE literal, no string coercion).
UPDATE novel_v2.unique_orgs
SET last_updated = DATE '2025-06-02'
WHERE TRUE
;

In [None]:
%%bigquery
-- Replace the NTEE code prefixes in org_subtype_irs with readable labels, in one UPDATE
-- instead of nine full-table DML statements. None of the labels starts with a B/J code,
-- so a single pass gives the same result as the sequential updates.
UPDATE novel_v2.unique_orgs
SET org_subtype_irs = CASE SUBSTR(org_subtype_irs, 1, 3)
    WHEN 'B30' THEN "Vocational, Technical Schools"
    WHEN 'B41' THEN "Community or Junior Colleges"
    WHEN 'B60' THEN "Adult, Continuing Education"
    WHEN 'J20' THEN "Employment Procurement Assistance, Job Training"
    WHEN 'J21' THEN "Vocational Counseling, Guidance and Testing"
    WHEN 'J22' THEN "Vocational Training"
    WHEN 'J30' THEN "Vocational Rehabilitation"
    WHEN 'J32' THEN "Goodwill Industries"
    WHEN 'J33' THEN "Sheltered Remunerative Employment, Work Activity Center N.E.C."
  END
WHERE SUBSTR(org_subtype_irs, 1, 3) IN ('B30', 'B41', 'B60', 'J20', 'J21', 'J22', 'J30', 'J32', 'J33');

In [None]:
%%bigquery
-- Left-pad ZIP codes shorter than 5 characters with zeros (e.g. '2134' -> '02134').
-- Empty ZIPs stay empty instead of turning into the placeholder '00000'.
UPDATE novel_v2.unique_orgs
SET std_zip = LPAD(std_zip, 5, '0')
WHERE LENGTH(std_zip) BETWEEN 1 AND 4
;

In [None]:
%%bigquery
-- data_source_priority is not kept in the final novel table.
ALTER TABLE novel_v2.unique_orgs
  DROP COLUMN data_source_priority;

These next steps are for novel dataset QA. The goal is to make sure there are no dupes left in the final novel dataset. It also lists orgs that were dropped during the deduplication steps to confirm that they were dropped for a good reason.

In [None]:
%%bigquery
-- QA: candidate duplicates still present in the novel dataset. Every pair of orgs in the
-- same state and city is compared once (org_id ordering); both names must be >= 10 chars.
CREATE OR REPLACE TABLE deduped_v2.potential_dupes_from_novel_dataset AS
WITH candidate_pairs AS (
  SELECT DISTINCT
    a.org_id AS t1_org_id,
    b.org_id AS t2_org_id,
    a.org_name AS t1_org_name,
    b.org_name AS t2_org_name,
    reference.jaro_winkler_similarity(a.org_name, b.org_name) AS jw_similarity,
    -- Levenshtein distance normalized by the first org's name length, scaled by 100.
    CAST(100*ROUND(1 - reference.levenshtein_distance(a.org_name, b.org_name) / LENGTH(a.org_name), 2) AS INT64) AS lev_similarity,
    a.std_state AS t1_std_state,
    b.std_state AS t2_std_state,
    a.std_city AS t1_std_city,
    b.std_city AS t2_std_city,
    a.data_source AS t1_data_source,
    b.data_source AS t2_data_source
  FROM novel_v2.unique_orgs AS a
  JOIN novel_v2.unique_orgs AS b
    ON a.std_state = b.std_state
   AND a.std_city = b.std_city
   AND a.org_id > b.org_id
  WHERE LENGTH(a.org_name) >= 10
    AND LENGTH(b.org_name) >= 10
)
SELECT *
FROM candidate_pairs
WHERE jw_similarity >= 50 AND lev_similarity >= 60
ORDER BY lev_similarity DESC, jw_similarity DESC

In [None]:
%%bigquery
-- Review all similar-name decisions (keep and delete), grouped by match group.
SELECT *
FROM qa_v2.novel_deleted_unique_orgs_bc_similar_name
ORDER BY match_id

### The outcome of the review was that we should remove these duplicates by
1. deciding on a similarity threshold
2. for any two orgs whose similarity scores meet the thresholds, deleting the org from the lower-priority data source (if both orgs come from the same source, the one with the smaller `org_id` is deleted)

Org priority:
IPEDS > IRS > RAPIDS > TPR

### Thresholds
jw_similarity >= 70
lev_similarity >= 85


In [None]:
%%bigquery
-- For each potential-duplicate pair above the review thresholds, delete the org from the
-- lower-priority source (IPEDS > IRS > RAPIDS > TPR; a lower number is a higher priority).
-- On a tie (same source) t2, the org with the smaller org_id, is deleted.
-- survivor_org_id is always "the other org of the pair", so it can never equal
-- org_id_to_delete, even for a source missing from the priority map (NULL priority).
CREATE OR REPLACE TABLE deduped_v2.orgs_to_delete_step08 AS
WITH prioritized AS (
  SELECT
    CASE t1_data_source
      WHEN 'ipeds' THEN 1
      WHEN 'irs' THEN 2
      WHEN 'rapids' THEN 3
      WHEN 'tpr' THEN 4
    END AS t1_data_source_priority, --lower is better
    CASE t2_data_source
      WHEN 'ipeds' THEN 1
      WHEN 'irs' THEN 2
      WHEN 'rapids' THEN 3
      WHEN 'tpr' THEN 4
    END AS t2_data_source_priority,
    *
  FROM deduped_v2.potential_dupes_from_novel_dataset
  -- to increase dedupe confidence increase the threshold
  WHERE jw_similarity >= 70 AND lev_similarity >= 85
)
SELECT
  CASE
    WHEN t1_data_source_priority > t2_data_source_priority
    THEN t1_org_id
    ELSE t2_org_id -- includes "same data source" scenarios
  END AS org_id_to_delete,
  CASE
    WHEN t1_data_source_priority > t2_data_source_priority
    THEN t2_org_id
    ELSE t1_org_id
  END AS survivor_org_id,
  *
FROM prioritized
ORDER BY lev_similarity DESC


In [None]:
# Pull the step-08 (org_id_to_delete, survivor_org_id) pairs and the current novel dataset into pandas.
read_kwargs = {"project_id": project, "dialect": "standard"}

sql = "SELECT * FROM deduped_v2.orgs_to_delete_step08"
df_to_delete = pandas_gbq.read_gbq(sql, **read_kwargs)

sql = "SELECT * FROM novel_v2.unique_orgs"
df_novel = pandas_gbq.read_gbq(sql, **read_kwargs)


# Before the step-08 duplicates are deleted, fold their data into each survivor in df_novel:
#   * in_<source> flags are OR-ed (a NULL/NA flag counts as False, instead of raising
#     or propagating through `or`);
#   * subtype and unique-id columns keep the survivor's own value and are filled from the
#     deleted org only when the survivor's value is missing. This is the same
#     COALESCE(survivor, deleted) precedence as the SQL similar-name merge. It also avoids
#     overwriting a survivor's id with another org's id when both come from the same source.
# The survivor row is re-read for every pair, so an org surviving several pairs
# accumulates all of their values.
# NOTE(review): a survivor may itself be an org_id_to_delete in another pair (A~B~C chains);
# values merged into it are then dropped with it.
bool_cols = ['in_ipeds', 'in_irs', 'in_rapids', 'in_tpr']
fill_cols = [
    'org_subtype_ipeds', 'org_subtype_irs', 'org_subtype_rapids', 'org_subtype_tpr',
    'ipeds_unique_id', 'irs_unique_id', 'rapids_unique_id', 'tpr_unique_id',
]


def _is_true(value) -> bool:
    """Return the truth value of a flag, treating NULL/NA/NaN as False."""
    return bool(pd.notna(value) and value)


for pair in df_to_delete.itertuples(index=False):
    delete_mask = df_novel['org_id'] == pair.org_id_to_delete
    survivor_mask = df_novel['org_id'] == pair.survivor_org_id
    if not delete_mask.any() or not survivor_mask.any():
        continue

    deleted = df_novel.loc[delete_mask].iloc[0]
    survivor = df_novel.loc[survivor_mask].iloc[0]

    for col in bool_cols:
        df_novel.loc[survivor_mask, col] = _is_true(survivor[col]) or _is_true(deleted[col])

    for col in fill_cols:
        if pd.isna(survivor[col]) and pd.notna(deleted[col]):
            df_novel.loc[survivor_mask, col] = deleted[col]


In [None]:
# Replace novel_v2.unique_orgs with the merged frame. pandas_gbq is called directly
# because DataFrame.to_gbq is deprecated (pandas >= 2.2).
pandas_gbq.to_gbq(df_novel, "novel_v2.unique_orgs", project_id=project, if_exists='replace')

In [None]:
%%bigquery
-- Drop the step-08 duplicates (their data was merged into the survivors beforehand).
DELETE FROM novel_v2.unique_orgs AS u
WHERE EXISTS (
  SELECT 1
  FROM deduped_v2.orgs_to_delete_step08 AS d
  WHERE d.org_id_to_delete = u.org_id
)

In [None]:
%%bigquery
-- Boolean org_subtype_* flags; CASE ... ELSE FALSE makes every flag TRUE/FALSE, never NULL.
-- org_subtype_irs holds readable labels at this point (an earlier cell maps the B41 code to
-- "Community or Junior Colleges"). The label is therefore matched in addition to the raw
-- "B41" prefix; the prefix test alone cannot match rows that went through that mapping.
CREATE OR REPLACE TABLE novel_v2.unique_orgs AS
SELECT *,
  CASE WHEN org_subtype_ipeds = "3" THEN TRUE ELSE FALSE END AS org_subtype_highest_degree_bachelors_plus,
  CASE WHEN org_subtype_ipeds = "4" THEN TRUE ELSE FALSE END AS org_subtype_highest_degree_associates,
  CASE WHEN org_subtype_ipeds = "6" THEN TRUE ELSE FALSE END AS org_subtype_highest_degree_certificate,
  -- Higher-ed orgs without an IPEDS subtype.
  CASE WHEN org_subtype_ipeds IS NULL AND (
      org_subtype_irs LIKE "B41%"
      OR org_subtype_irs = "Community or Junior Colleges"
      OR org_subtype_tpr LIKE "Higher Ed%"
      OR org_subtype_rapids = "Community College/University")
    THEN TRUE ELSE FALSE
  END AS org_subtype_other_higher_ed_institution,
  CASE WHEN org_subtype_tpr = "Private For-Profit" THEN TRUE ELSE FALSE END AS org_subtype_private_for_profit,
  CASE WHEN org_subtype_rapids = "Business Association" THEN TRUE ELSE FALSE END AS org_subtype_apprenticeship_business_association,
  CASE WHEN org_subtype_rapids = "Employer" THEN TRUE ELSE FALSE END AS org_subtype_apprenticeship_employer,
  CASE WHEN org_subtype_rapids = "Intermediary" THEN TRUE ELSE FALSE END AS org_subtype_apprenticeship_intermediary,
  CASE
    WHEN org_subtype_rapids IN (
      "Federal Agency",
      "City/County Agency",
      "State Agency")
    THEN TRUE ELSE FALSE
  END AS org_subtype_apprenticeship_gov,
  CASE WHEN org_subtype_rapids = "Workforce Investment Board" THEN TRUE ELSE FALSE END AS org_subtype_apprenticeship_wib,
  CASE WHEN org_subtype_rapids = 'Union/Labor'
    THEN TRUE
    ELSE FALSE
  END AS org_subtype_apprenticeship_union_labor,
  CASE WHEN org_subtype_rapids = "Foundation" THEN TRUE ELSE FALSE END AS org_subtype_apprenticeship_foundation,
  CASE
    WHEN org_subtype_rapids IN ("Other", "None", "Unknown")
    OR org_subtype_tpr = "National Apprenticeship"
    THEN TRUE ELSE FALSE
  END AS org_subtype_apprenticeship_other,
  CASE
    WHEN org_subtype_irs <> 'Community or Junior Colleges'
      OR org_subtype_tpr = "Private Non-Profit"
      OR org_subtype_rapids = "Community Based Organization"
    THEN TRUE ELSE FALSE
  END AS org_subtype_job_training_non_profits
FROM novel_v2.unique_orgs
;

In [None]:
%%bigquery
-- Replace the IPEDS subtype codes with readable labels in a single UPDATE (was three
-- full-table DML statements). The org_subtype_highest_degree_* flags derived from these
-- codes are computed in the previous cell, before the codes are overwritten.
UPDATE novel_v2.unique_orgs
SET org_subtype_ipeds = CASE org_subtype_ipeds
    WHEN "3" THEN "Degree-granting, not primarily baccalaureate or above"
    WHEN "4" THEN "Degree-granting, Associate’s and certificates"
    WHEN "6" THEN "Nondegree-granting, sub-baccalaureate"
  END
WHERE org_subtype_ipeds IN ("3", "4", "6")
;

In [None]:
%%bigquery
-- Switch in_<source> on for every org_id that appears in deduped_v2.organizations_combined
-- under that data source. This is one UPDATE (was four). Flags are only ever set to TRUE,
-- never cleared; rows without a match keep their current values.
UPDATE novel_v2.unique_orgs AS u
SET
  in_ipeds = IF(s.from_ipeds, TRUE, u.in_ipeds),
  in_rapids = IF(s.from_rapids, TRUE, u.in_rapids),
  in_irs = IF(s.from_irs, TRUE, u.in_irs),
  in_tpr = IF(s.from_tpr, TRUE, u.in_tpr)
FROM (
  -- One row per org_id (UPDATE ... FROM allows at most one source row per target row).
  SELECT
    org_id,
    COUNTIF(data_source = 'ipeds') > 0 AS from_ipeds,
    COUNTIF(data_source = 'rapids') > 0 AS from_rapids,
    COUNTIF(data_source = 'irs') > 0 AS from_irs,
    COUNTIF(data_source = 'tpr') > 0 AS from_tpr
  FROM deduped_v2.organizations_combined
  GROUP BY org_id
) AS s
WHERE u.org_id = s.org_id;

In [None]:
%%bigquery
-- Recount data sources after the flag updates. IF(flag, 1, 0) counts a NULL flag as 0,
-- so num_data_sources is never NULL.
UPDATE novel_v2.unique_orgs
SET num_data_sources = IF(in_ipeds, 1, 0) + IF(in_rapids, 1, 0) + IF(in_irs, 1, 0) + IF(in_tpr, 1, 0)
WHERE TRUE
;

In this part, we reuse the output of EIN matching to increase the number of v2 orgs that have an EIN.

In [None]:
bucket_name = "hks-notebook-files"
credentials_file = "hks-prod-mwc-06a2-d023d43b35e0.json"
scopes = [
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/bigquery",
]

In [None]:
ein_spreadsheet_id = "1XQQCarxY7sbcjR0JF2O1cF2pC9fXBghjyVBKHrDve-w"
f5500_sheet_name = "f5500_2022"
irs_sheet_name = "irs_990_2022"
tpr_sheet_name = "tpr_domains_deduped_no_original_urls"

In [None]:
bigquery_client = bigquery.Client(location=location, project=project)
storage_client = storage.Client(project=project)
bucket = storage_client.get_bucket(bucket_name)
credentials_blob = bucket.blob(credentials_file)

In [None]:
credentials_text = credentials_blob.download_as_text()
credentials_dict = json.loads(credentials_text)
credentials = service_account.Credentials.from_service_account_info(credentials_dict, scopes=scopes)

In [None]:
gc = gspread.authorize(credentials)

In [None]:
f5500_sheet = gc.open_by_key(ein_spreadsheet_id).worksheet(f5500_sheet_name)
irs_sheet = gc.open_by_key(ein_spreadsheet_id).worksheet(irs_sheet_name)
tpr_sheet = gc.open_by_key(ein_spreadsheet_id).worksheet(tpr_sheet_name)

In [None]:
f5500_df = pd.DataFrame(f5500_sheet.get_all_records())
irs_df = pd.DataFrame(irs_sheet.get_all_records())
tpr_df = pd.DataFrame(tpr_sheet.get_all_records())

In [None]:
f5500_ein = duckdb.sql("SELECT org_id, sf_spons_ein AS ein, 'f5500' AS source FROM f5500_df WHERE invalid = 0")
irs_ein = duckdb.sql("SELECT org_id, ein, 'irs' AS source, FROM irs_df WHERE invalid = 0")
tpr_ein = duckdb.sql("SELECT novel_org_id AS org_id, irs_ein AS ein, 'tpr' AS source FROM tpr_df WHERE invalid = 0")

In [None]:
union_query = """
SELECT * FROM f5500_ein
UNION ALL
SELECT * FROM irs_ein
UNION ALL
SELECT * FROM tpr_ein
"""

In [None]:
combined_df = duckdb.sql(union_query)

In [None]:
final_df = combined_df.to_df()

In [None]:
final_df.groupby("source").count()

In [None]:
# Persist the combined EIN matches to BigQuery, replacing any previous run.
ein_matches_table = f"{project}.analysis.ein_matching_combined"
pandas_gbq.to_gbq(
    final_df,
    ein_matches_table,
    project_id=project,
    if_exists="replace",
    credentials=credentials,
)

In [None]:
%%bigquery
-- QA: matched EINs that differ from the EIN on the novel org (including orgs with no EIN yet).
SELECT e.*, o.org_name, o.ein, data_source
FROM analysis.ein_matching_combined AS e
JOIN novel_v2.unique_orgs AS o
  ON o.org_id = e.org_id
WHERE CAST(e.ein AS STRING) != IFNULL(o.ein, '')
ORDER BY e.org_id

In [None]:
%%bigquery
-- Novel orgs that have at least one EIN match.
SELECT *
FROM novel_v2.unique_orgs
WHERE org_id IN (
  SELECT DISTINCT org_id
  FROM analysis.ein_matching_combined
)

In [None]:
%%bigquery
-- Write one matched EIN per org. Sources are preferred in alphabetical order
-- (f5500, irs, tpr); ties within a source are broken by EIN so the chosen value is
-- deterministic across runs.
UPDATE novel_v2.unique_orgs AS u
SET u.ein = CAST(e.ein AS STRING)
FROM (
  SELECT *
  FROM analysis.ein_matching_combined
  QUALIFY ROW_NUMBER() OVER (PARTITION BY org_id ORDER BY source, ein) = 1
) AS e
WHERE u.org_id = e.org_id

In this part, we use a Bag of Words (BoW) approach to increase the number of orgs from IRS.

In [None]:
bf.options.bigquery.location = location
bf.options.bigquery.project = project
bf.options.display.progress_bar = "notebook"

In [None]:
# The version parameter used to create different versions of the data in BigQuery tables
version = "20241015"

# Bag of Words spreadsheet details
bow_spreadsheet_id = "1diQYb7VWqykwgfrmbJsg1Deb8RCMareR1X_Xh9nJ6p8"
bow_sheet_name = "v10"

In [None]:
bigquery_client = bigquery.Client(location=location)

In [None]:
storage_client = storage.Client(project=project)
bucket = storage_client.get_bucket(bucket_name)
credentials_blob = bucket.blob(credentials_file)

In [None]:
credentials_text = credentials_blob.download_as_text()
credentials_dict = json.loads(credentials_text)
credentials = service_account.Credentials.from_service_account_info(credentials_dict, scopes=scopes)

In [None]:
gc = gspread.authorize(credentials)

In [None]:
bag_of_words_sheet = gc.open_by_key(bow_spreadsheet_id).worksheet(bow_sheet_name)

In [None]:
bow_df = pd.DataFrame(bag_of_words_sheet.get_all_records())

In [None]:
bag_a_list = bow_df[bow_df["Bag"] == "A"]["Word"].to_list()
bag_b_list = bow_df[bow_df["Bag"] == "B"]["Word"].to_list()
bag_c_list = bow_df[bow_df["Bag"] == "C"]["Word"].to_list()
bag_d_list = bow_df[bow_df["Bag"] == "D"]["Word"].to_list()
bag_e_list = bow_df[bow_df["Bag"] == "E"]["Word"].to_list()
bag_f_list = bow_df[bow_df["Bag"] == "F"]["Word"].to_list()

In [None]:
# spaCy's small English pipeline; find_lemma uses it to get token lemmas.
nlp = spacy.load("en_core_web_sm")

In [None]:
def find_lemma(text: str, model=None) -> str:
    """Lemmatize `text` and return its token lemmas joined by single spaces.

    Args:
        text: Text to lemmatize; ``None`` yields ``""``.
        model: spaCy-style callable pipeline whose tokens expose ``lemma_``.
            Defaults to the notebook-level ``nlp`` pipeline; pass another one
            to reuse the function (or to test it without loading spaCy).

    Returns:
        The space-joined lemmas with leading/trailing whitespace stripped.
    """
    if text is None:
        return ""
    pipeline = nlp if model is None else model
    return " ".join(token.lemma_ for token in pipeline(text)).strip()

In [None]:
def _lemmatize_bag(words: list) -> list:
    """Lowercase and lemmatize each bag word; return the unique lemmas in sorted order."""
    return sorted({find_lemma(word.lower()) for word in words})


bag_a_lemmatized = _lemmatize_bag(bag_a_list)
bag_b_lemmatized = _lemmatize_bag(bag_b_list)
bag_c_lemmatized = _lemmatize_bag(bag_c_list)
bag_d_lemmatized = _lemmatize_bag(bag_d_list)
bag_e_lemmatized = _lemmatize_bag(bag_e_list)
bag_f_lemmatized = _lemmatize_bag(bag_f_list)

In [None]:
query = """
SELECT
  row_id,
  lemmatized_mission_description,
  lemmatized_primary_program_description,
  lemmatized_business_name,
FROM standardized_v2.irs_990_2023_bow
;
"""

In [None]:
df = bf.read_gbq_query(query)

In [None]:
orgs = df.to_pandas()

In [None]:
def find_lemmas(text: str, lemmas) -> str:
    """Return the entries of `lemmas` that occur in `text`, comma-joined in `lemmas` order.

    Matching is plain substring containment: multi-word lemmas (e.g. "job training")
    match as phrases, but a short lemma also matches inside a longer word.
    """
    return ",".join(lemma for lemma in lemmas if lemma in text)


def find_bag_a_lemmas(text):
    """Comma-joined bag A lemmas found in `text`."""
    return find_lemmas(text, bag_a_lemmatized)


def find_bag_b_lemmas(text):
    """Comma-joined bag B lemmas found in `text`."""
    return find_lemmas(text, bag_b_lemmatized)


def find_bag_c_lemmas(text):
    """Comma-joined bag C lemmas found in `text`."""
    return find_lemmas(text, bag_c_lemmatized)

In [None]:
def find_bag_d_lemmas(text):
    """Comma-joined bag D lemmas occurring (as substrings) in `text`, in bag order."""
    return ",".join([lemma for lemma in bag_d_lemmatized if lemma in text])


def find_bag_e_lemmas(text):
    """Comma-joined bag E lemmas occurring (as substrings) in `text`, in bag order."""
    return ",".join([lemma for lemma in bag_e_lemmatized if lemma in text])


def find_bag_f_lemmas(text):
    """Comma-joined bag F lemmas occurring (as substrings) in `text`, in bag order."""
    return ",".join([lemma for lemma in bag_f_lemmatized if lemma in text])

In [None]:
# NULL -> "" and lowercase, so the substring matching below is case-insensitive and NULL-safe.
for text_col in (
    "lemmatized_mission_description",
    "lemmatized_primary_program_description",
    "lemmatized_business_name",
):
    orgs[text_col] = orgs[text_col].fillna("").str.lower()

In [None]:
# (new column, source text column, matcher), listed in the order the columns are created
# so the CSV written below keeps the same column layout.
bag_match_specs = [
    ("bag_a_lemmas_in_mission", "lemmatized_mission_description", find_bag_a_lemmas),
    ("bag_a_lemmas_in_primary_program", "lemmatized_primary_program_description", find_bag_a_lemmas),
    ("bag_b_lemmas_in_mission", "lemmatized_mission_description", find_bag_b_lemmas),
    ("bag_b_lemmas_in_primary_program", "lemmatized_primary_program_description", find_bag_b_lemmas),
    ("bag_c_lemmas_in_mission", "lemmatized_mission_description", find_bag_c_lemmas),
    ("bag_c_lemmas_in_primary_program", "lemmatized_primary_program_description", find_bag_c_lemmas),
    # New rules
    ("bag_d_lemmas_in_business_name", "lemmatized_business_name", find_bag_d_lemmas),
    ("bag_e_lemmas_in_mission", "lemmatized_mission_description", find_bag_e_lemmas),
    ("bag_e_lemmas_in_primary_program", "lemmatized_primary_program_description", find_bag_e_lemmas),
    ("bag_f_lemmas_in_mission", "lemmatized_mission_description", find_bag_f_lemmas),
    ("bag_f_lemmas_in_primary_program", "lemmatized_primary_program_description", find_bag_f_lemmas),
    ("bag_f_lemmas_in_business_name", "lemmatized_business_name", find_bag_f_lemmas),
]

for new_col, source_col, matcher in bag_match_specs:
    orgs[new_col] = orgs[source_col].apply(matcher)

In [None]:
file_name = f"irs_990_2023_bow_{version}.csv"

In [None]:
orgs.to_csv(file_name, index=False)

In [None]:
almanac_bucket = storage_client.get_bucket("hks-almanac-storage-standard")

irs_blob = almanac_bucket.blob(f"v2/irs/irs-990-extract-2023-bow-{version}.csv")
irs_blob.upload_from_filename(file_name)

In [None]:
# Load the uploaded CSV into the versioned BigQuery table (replace contents, autodetect schema).
uri = f"gs://hks-almanac-storage-standard/v2/irs/irs-990-extract-2023-bow-{version}.csv"
table_id = f"standardized_v2.irs_990_2023_bow_{version}"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition="WRITE_TRUNCATE",
)

load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # block until the load finishes (raises on failure)

destination_table = bigquery_client.get_table(table_id)
print(f"Loaded {destination_table.num_rows} rows.")

In [None]:
%%bigquery
-- Number of matched lemmas per bag and text field. A NULL or empty list counts as 0:
-- SPLIT('') returns [''], so ARRAY_LENGTH(SPLIT(x)) alone would report 1 for ''.
CREATE TEMP FUNCTION matched_count(lemmas STRING) AS (
  IF(lemmas IS NULL OR lemmas = '', 0, ARRAY_LENGTH(SPLIT(lemmas)))
);

CREATE OR REPLACE TABLE standardized_v2.irs_990_2023_bow_20241015 AS
SELECT *,
  matched_count(bag_a_lemmas_in_mission) AS bag_a_mission_matched_word_count,
  matched_count(bag_a_lemmas_in_primary_program) AS bag_a_primary_program_matched_word_count,
  matched_count(bag_b_lemmas_in_mission) AS bag_b_mission_matched_word_count,
  matched_count(bag_b_lemmas_in_primary_program) AS bag_b_primary_program_matched_word_count,
  matched_count(bag_c_lemmas_in_mission) AS bag_c_mission_matched_word_count,
  matched_count(bag_c_lemmas_in_primary_program) AS bag_c_primary_program_matched_word_count,
  matched_count(bag_d_lemmas_in_business_name) AS bag_d_business_name_matched_word_count,
  matched_count(bag_e_lemmas_in_mission) AS bag_e_mission_matched_word_count,
  matched_count(bag_e_lemmas_in_primary_program) AS bag_e_primary_program_matched_word_count,
  matched_count(bag_f_lemmas_in_mission) AS bag_f_mission_matched_word_count,
  matched_count(bag_f_lemmas_in_primary_program) AS bag_f_primary_program_matched_word_count,
  matched_count(bag_f_lemmas_in_business_name) AS bag_f_business_name_matched_word_count
FROM standardized_v2.irs_990_2023_bow_20241015
;

In [None]:
%%bigquery
-- Merge the original BoW columns (v1) with the new per-bag matches (v2), attach the most
-- recent NTEE code per EIN from the IRS BMF, and keep orgs selected by Rule 1 or Rule 2.
CREATE OR REPLACE TABLE analysis.irs_990_2023_bow_20241015_all_data AS
WITH merged_data AS (
  SELECT
    v1.* EXCEPT (
      mission_description_matched_lemmas,
      mission_description_matched_word_count,
      primary_program_description_matched_lemmas,
      primary_program_description_matched_word_count),
    v2.* EXCEPT (
      row_id,
      lemmatized_mission_description,
      lemmatized_primary_program_description,
      lemmatized_business_name)
  FROM standardized_v2.irs_990_2023_bow AS v1
  INNER JOIN standardized_v2.irs_990_2023_bow_20241015 AS v2
    ON v1.row_id = v2.row_id
),
-- EINs are compared numerically. irs.ein is presumably numeric (it needed a CAST to STRING),
-- while the BMF ein is text; comparing CAST(irs.ein AS STRING) with a zero-padded BMF EIN
-- misses every EIN with a leading zero ('12345678' vs '012345678').
-- NOTE(review): confirm the BMF ein format.
ntee_codes AS (
  SELECT SAFE_CAST(ein AS INT64) AS ein_num, NTEE_CD
  FROM original_v2.irs_bmf
  QUALIFY ROW_NUMBER() OVER(PARTITION BY SAFE_CAST(ein AS INT64) ORDER BY TAX_PERIOD DESC) = 1
),

all_merged AS (
  SELECT irs.*, NTEE_CD
  FROM merged_data AS irs
  LEFT JOIN ntee_codes AS ntee ON SAFE_CAST(irs.ein AS INT64) = ntee.ein_num
)
SELECT *
FROM all_merged
WHERE
    -- Rule 1: Select organizations with at least one word from both bag_a and bag_b in either mission or primary program
    ((bag_a_mission_matched_word_count > 0 OR bag_a_primary_program_matched_word_count > 0)
    AND
    (bag_b_mission_matched_word_count > 0 OR bag_b_primary_program_matched_word_count > 0))

    OR

    -- Rule 2: Select organizations with bag_c words in either mission or primary program
    (bag_c_mission_matched_word_count > 0 OR bag_c_primary_program_matched_word_count > 0)
;


In [None]:
# @title Full dataset
# Read the BoW all-data table into pandas, bypassing the bigframes query cache.
all_data_table = "analysis.irs_990_2023_bow_20241015_all_data"
df_full = bf.read_gbq(all_data_table, use_cache=False).to_pandas()

In [None]:
# @title Lemma counts
# Query: unnest the matched lemmas of bags A, B and C (mission and primary-program fields),
# count the distinct rows each lemma appears in per bag/field, then sum those counts per lemma.
# Bags D, E and F are not included.
# NOTE(review): UNNEST(SPLIT('')) yields one empty-string lemma; this relies on empty match
# lists being NULL in the source table — confirm.
lemma_counts_query = """
WITH unnested_lemmas AS (
  -- Bag C: Mission
  SELECT row_id, lemmas, "bag_c_mission" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_c_lemmas_in_mission)) AS lemmas

  UNION ALL

  -- Bag C: Primary Program
  SELECT row_id, lemmas, "bag_c_primary_program" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_c_lemmas_in_primary_program)) AS lemmas

  UNION ALL

  -- Bag A: Mission
  SELECT row_id, lemmas, "bag_a_mission" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_a_lemmas_in_mission)) AS lemmas

  UNION ALL

  -- Bag A: Primary Program
  SELECT row_id, lemmas, "bag_a_primary_program" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_a_lemmas_in_primary_program)) AS lemmas

  UNION ALL

  -- Bag B: Mission
  SELECT row_id, lemmas, "bag_b_mission" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_b_lemmas_in_mission)) AS lemmas

  UNION ALL

  -- Bag B: Primary Program
  SELECT row_id, lemmas, "bag_b_primary_program" AS bag
  FROM analysis.irs_990_2023_bow_20241015_all_data
  CROSS JOIN UNNEST(SPLIT(bag_b_lemmas_in_primary_program)) AS lemmas
),
lemma_count_per_bag AS (
  SELECT bag, lemmas, COUNT(DISTINCT row_id) AS lemma_count
  FROM unnested_lemmas
  GROUP BY bag, lemmas
)
SELECT lemmas, SUM(lemma_count) AS total_count
FROM lemma_count_per_bag
GROUP BY lemmas
"""

In [None]:
df_lemma_counts = bf.read_gbq(lemma_counts_query).to_pandas().sort_values(by='total_count', ascending=False)

In [None]:
df_lemma_counts

In [None]:
# Bar chart of lemma frequencies, each bar annotated with its count.
fig, ax = plt.subplots()
ax.bar(df_lemma_counts['lemmas'], df_lemma_counts['total_count'])
ax.tick_params(axis='x', labelrotation=90)
for i, v in enumerate(df_lemma_counts['total_count']):
    ax.text(i, v, str(v), ha='center', va='bottom')
# The title follows the BoW sheet version actually loaded (it was hardcoded as 'v5').
ax.set_title(f'BoW {bow_sheet_name} Word Frequency')
ax.set_xlabel('Lemma')
ax.set_ylabel('Matched rows (summed over bag/field)')
plt.show()  # previously `plt.show` without parentheses, which never called it

In [None]:
%%bigquery
-- Final BoW selection: exclusion rules applied on top of the Rule 1 / Rule 2 selection
-- made when building the all_data table.
--   * Exclude orgs whose business name contains a bag D word.
--   * Exclude orgs with bag F words in mission/primary program unless they also have bag E
--     words there, i.e. keep rows with no bag F words, or with at least one bag E word.
CREATE OR REPLACE TABLE analysis.irs_990_2023_bow_20241015_filtered_final AS
SELECT *
FROM analysis.irs_990_2023_bow_20241015_all_data
WHERE IFNULL(bag_d_business_name_matched_word_count, 0) = 0
  AND (
    (IFNULL(bag_f_mission_matched_word_count, 0) = 0
      AND IFNULL(bag_f_primary_program_matched_word_count, 0) = 0)
    OR IFNULL(bag_e_mission_matched_word_count, 0) > 0
    OR IFNULL(bag_e_primary_program_matched_word_count, 0) > 0
  );

In [None]:
%%bigquery
-- How many filtered BoW orgs already exist in the novel dataset, per novel data source
-- (matched on EIN).
-- NOTE(review): the string comparison misses EINs whose leading zeros differ between tables.
SELECT u.data_source, COUNT(DISTINCT b.row_id) AS match_count
FROM analysis.irs_990_2023_bow_20241015_filtered_final AS b
JOIN novel_v2.unique_orgs AS u
  ON u.ein = CAST(b.ein AS STRING)
GROUP BY u.data_source
ORDER BY match_count DESC

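Transform the newly found IRS organizations (those whose EIN is not yet in `novel_v2.unique_orgs`) and map their fields to the `unique_orgs` schema.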

In [None]:
%%bigquery
-- Build standardized_v2.irs_bow_new_orgs: the filtered IRS BoW organizations whose EIN
-- is not yet in novel_v2.unique_orgs, reshaped into the unique_orgs column layout.
-- NOTE(review): a later cell appends this table with `INSERT INTO novel_v2.unique_orgs
-- SELECT *` (no column list, i.e. by position), so the final SELECT's column order and
-- types presumably must line up with novel_v2.unique_orgs — keep them in sync.
CREATE OR REPLACE TABLE standardized_v2.irs_bow_new_orgs AS
-- new_orgs: anti-join (LEFT JOIN + IS NULL) keeps only EINs absent from unique_orgs,
-- and adds upper-cased, NULL-safe address parts.
WITH new_orgs AS (
  SELECT b.*,
    UPPER(COALESCE(address, "")) AS std_street,
    UPPER(COALESCE(city, "")) AS std_city,
    UPPER(COALESCE(state, "")) AS std_state,
    -- NOTE(review): if zip is stored as a number, leading zeros (e.g. 02138) are
    -- already lost before this CAST — confirm the source column type.
    UPPER(COALESCE(CAST(zip AS STRING), "")) AS std_zip
  FROM analysis.irs_990_2023_bow_20241015_filtered_final AS b
  LEFT JOIN novel_v2.unique_orgs AS u ON CAST(b.ein AS STRING) = u.ein
  WHERE u.ein IS NULL
),
-- extra_columns: constant and derived fields added to every new IRS row.
extra_columns AS (
  SELECT
    -- NOTE(review): 30704 is presumably the highest dedupe_id already used in
    -- unique_orgs — confirm. OVER() has no ORDER BY, so which org receives which id
    -- is arbitrary and may differ between runs.
    30704 + ROW_NUMBER() OVER() AS dedupe_id,
    'Private non-profit' AS chosen_org_type,
    'irs xml' AS data_source,
    reference.standardize_org_name(business_name) AS org_name,
    CONCAT(std_street, ' ', std_city, ' ', std_state, ' ', std_zip) AS org_address,
    CAST(ein AS STRING) AS original_unique_id,
    -- Untyped NULL literals. NOTE(review): confirm the resulting column type is
    -- compatible with geo_lat/geo_lon in unique_orgs for the positional INSERT.
    NULL AS geo_lat,
    NULL AS geo_lon,
    NTEE_CD AS org_subtype_in_data_source,
    -- Hard-coded snapshot date; cast to DATE in the final SELECT.
    '2025-06-02' AS last_updated,
    *
  FROM new_orgs
),
-- address_matching: attach the Census geocoder output (joined on the hex MD5 of the
-- address with double quotes removed) and derive org_id and the IRS subtype label.
address_matching AS (
  SELECT
    -- org_id = hex MD5 of upper-cased org name + geocoded address, falling back to the
    -- raw org_address. CONCAT returns NULL (so org_id is NULL) if org_name is NULL.
    TO_HEX(MD5(UPPER(CONCAT(org_name, COALESCE(c.output_address, d.org_address, ''))))) AS org_id,
    CAST(NULL AS STRING) AS org_subtype_ipeds,
    CAST(NULL AS STRING) AS org_subtype_rapids,
    -- NTEE code prefix -> human-readable label; any other code yields NULL (no ELSE).
    CASE
      WHEN org_subtype_in_data_source LIKE "B41%" THEN "Community or Junior Colleges"
      WHEN org_subtype_in_data_source LIKE "B60%" THEN "Adult, Continuing Education"
      WHEN org_subtype_in_data_source LIKE "J20%" THEN "Employment Procurement Assistance, Job Training"
      WHEN org_subtype_in_data_source LIKE "J21%" THEN "Vocational Counseling, Guidance and Testing"
      WHEN org_subtype_in_data_source LIKE "J22%" THEN "Vocational Training"
      WHEN org_subtype_in_data_source LIKE "J30%" THEN "Vocational Rehabilitation"
      WHEN org_subtype_in_data_source LIKE "J32%" THEN "Goodwill Industries"
      WHEN org_subtype_in_data_source LIKE "J33%" THEN "Sheltered Remunerative Employment, Work Activity Center N.E.C."
    END AS org_subtype_irs,
    CAST(NULL AS STRING) AS org_subtype_tpr,
    *
  FROM extra_columns AS d
  LEFT JOIN reference.census_gov_address_output AS c ON TO_HEX(MD5(REPLACE(d.org_address, '"', ''))) = c.address_id
),
-- flag_columns: mark which other sources already contain the same org_id.
-- NOTE(review): `org_id IN (subquery)` is NULL, not FALSE, when org_id is NULL or when
-- there is no match and the subquery returns a NULL org_id; num_data_sources below then
-- becomes NULL as well. Consider wrapping each flag in IFNULL(..., FALSE).
flag_columns AS (
SELECT
  org_id IN (SELECT DISTINCT org_id FROM deduped_v2.organizations_combined WHERE data_source = 'ipeds') AS in_ipeds,
  org_id IN (SELECT DISTINCT org_id FROM deduped_v2.organizations_combined WHERE data_source = 'rapids') AS in_rapids,
  TRUE AS in_irs,
  org_id IN (SELECT DISTINCT org_id FROM deduped_v2.organizations_combined WHERE data_source = 'tpr') AS in_tpr,
 *
FROM address_matching
)
-- Final projection in the unique_orgs column order.
SELECT dedupe_id,
  chosen_org_type,
  data_source,
  org_name,
  org_id,
  NULL AS ipeds_unique_id,
  original_unique_id AS irs_unique_id,
  NULL AS rapids_unique_id,
  NULL AS tpr_unique_id,
  org_address,
  std_street,
  std_city,
  std_state,
  std_zip,
  geo_lat,
  geo_lon,
  org_subtype_in_data_source,
  original_unique_id AS ein,
  in_ipeds,
  in_irs,
  in_rapids,
  in_tpr,
  org_subtype_ipeds,
  org_subtype_irs,
  org_subtype_rapids,
  org_subtype_tpr,
  -- Number of sources that contain this org (TRUE -> 1, FALSE -> 0).
  CAST(in_ipeds AS INT64) + CAST(in_rapids AS INT64) + CAST(in_irs AS INT64) + CAST(in_tpr AS INT64) AS num_data_sources,
  CAST(NULL AS STRING) AS org_type_ipeds,
  org_subtype_in_data_source AS org_type_irs,
  CAST(NULL AS STRING) AS org_type_rapids,
  CAST(NULL AS STRING) AS org_type_tpr,
  CAST(last_updated AS DATE) AS last_updated,
  FALSE AS org_subtype_highest_degree_bachelors_plus,
  FALSE AS org_subtype_highest_degree_associates,
  FALSE AS org_subtype_highest_degree_certificate,
  FALSE AS org_subtype_other_higher_ed_institution,
  FALSE AS org_subtype_private_for_profit,
  FALSE AS org_subtype_apprenticeship_business_association,
  FALSE AS org_subtype_apprenticeship_employer,
  FALSE AS org_subtype_apprenticeship_intermediary,
  FALSE AS org_subtype_apprenticeship_gov,
  FALSE AS org_subtype_apprenticeship_wib,
  FALSE AS org_subtype_apprenticeship_union_labor,
  FALSE AS org_subtype_apprenticeship_foundation,
  FALSE AS org_subtype_apprenticeship_other,
  -- NOTE(review): "B30%" is counted as job training here but has no label in
  -- org_subtype_irs, while "B41%"/"B60%" are labelled there but not counted here —
  -- confirm this asymmetry is intentional.
  CASE
    WHEN org_subtype_in_data_source
    LIKE ANY ("B30%","J20%", "J21%", "J22%", "J30%", "J32%", "J33%")
    THEN TRUE
    ELSE FALSE
  END AS org_subtype_job_training_non_profits
FROM flag_columns


In [None]:
%%bigquery
-- Sanity check before inserting: list every new IRS org whose org_id already exists
-- in novel_v2.unique_orgs. An empty result means there are no duplicate org_ids.
SELECT new_orgs.*
FROM standardized_v2.irs_bow_new_orgs AS new_orgs
WHERE EXISTS (
  SELECT 1
  FROM novel_v2.unique_orgs AS existing
  WHERE existing.org_id = new_orgs.org_id
)

In [None]:
%%bigquery
-- Remove new IRS orgs whose org_id is already present in novel_v2.unique_orgs,
-- so that appending this table does not introduce duplicate org_ids.
DELETE FROM standardized_v2.irs_bow_new_orgs AS new_orgs
WHERE EXISTS (
  SELECT 1
  FROM novel_v2.unique_orgs AS existing
  WHERE existing.org_id = new_orgs.org_id
)

In [None]:
%%bigquery
-- Append the new IRS orgs to novel_v2.unique_orgs.
-- INSERT without a column list maps columns by position, so the column order of
-- standardized_v2.irs_bow_new_orgs must match novel_v2.unique_orgs.
-- The NOT EXISTS guard makes the cell safe to re-run: orgs appended by a previous
-- execution are skipped instead of being inserted a second time.
INSERT INTO novel_v2.unique_orgs
SELECT new_orgs.*
FROM standardized_v2.irs_bow_new_orgs AS new_orgs
WHERE NOT EXISTS (
  SELECT 1
  FROM novel_v2.unique_orgs AS existing
  WHERE existing.org_id = new_orgs.org_id
)

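Create a new table, `novel_v2.unique_orgs_with_irs_data`, that adds extra data points from the IRS XML extracts (website, formation year, employees, revenue, expenses, and mission/program descriptions) to each organization.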

In [None]:
%%bigquery
-- Enrich every org in novel_v2.unique_orgs with fields from its IRS 990 filing.
-- Only filings whose tax period ended in 2022 or 2023 are considered; for each EIN the
-- return with the latest return_timestamp wins. Orgs with no matching filing keep all
-- their columns, with the IRS fields left NULL (LEFT JOIN).
CREATE OR REPLACE TABLE novel_v2.unique_orgs_with_irs_data AS
WITH one_ein_per_org AS (
  SELECT *
  FROM standardized_v2.irs_990_2023_bow
  WHERE EXTRACT(YEAR FROM tax_period_end_date) IN (2022, 2023)
  QUALIFY ROW_NUMBER() OVER (PARTITION BY ein ORDER BY return_timestamp DESC) = 1
)
SELECT
  u.*,
  i.website,
  i.formation_year,
  i.total_employees,
  i.total_revenue_previous_year,
  -- Current-year amounts from filings with a 2022 tax period are multiplied by 1.041.
  -- NOTE(review): presumably an inflation adjustment from 2022 to 2023 dollars — confirm.
  IF(
    EXTRACT(YEAR FROM i.tax_period_end_date) = 2022,
    i.total_revenue_current_year * 1.041,
    i.total_revenue_current_year
  ) AS total_revenue_current_year,
  i.total_expenses_previous_year,
  IF(
    EXTRACT(YEAR FROM i.tax_period_end_date) = 2022,
    i.total_expenses_current_year * 1.041,
    i.total_expenses_current_year
  ) AS total_expenses_current_year,
  i.mission_description,
  i.primary_program_description,
  i.states_where_form_990_filed,
  i.states_where_form_990_filed_count
FROM novel_v2.unique_orgs AS u
LEFT JOIN one_ein_per_org AS i
  ON CAST(i.ein AS STRING) = u.ein
;


In these steps, we add IPEDS revenue and expense data to Novel v2 (`novel_v2.unique_orgs_with_irs_data`).

The source files are uploaded to the `v2/ipeds` folder of the `hks-almanac-storage-standard` bucket.


In [None]:
%%bigquery
-- Are the 3 datasets mutually exclusive?
-- Chaining A INTERSECT B INTERSECT C only returns unitids present in ALL three tables,
-- so a unitid shared by just two of them would go unnoticed. Instead, tag each distinct
-- unitid with the table it came from and report every unitid found in more than one
-- table. An empty result means the tables share no unitid.
SELECT
  unitid,
  STRING_AGG(DISTINCT source_table, ', ') AS found_in
FROM (
  SELECT DISTINCT unitid, 'f1a' AS source_table FROM standardized_v2.ipeds_f2223_f1a_data
  UNION ALL
  SELECT DISTINCT unitid, 'f2' AS source_table FROM standardized_v2.ipeds_f2223_f2_data
  UNION ALL
  SELECT DISTINCT unitid, 'f3' AS source_table FROM standardized_v2.ipeds_f2223_f3_data
)
GROUP BY unitid
HAVING COUNT(DISTINCT source_table) > 1

Great. No results mean they are indeed mutually exclusive.

Combine the revenue and expense columns of the 3 tables into a single table with `UNION ALL`, tagging each row with the file it came from.

In [None]:
%%bigquery

-- Stack the revenue/expense columns of the three IPEDS finance files into one table,
-- recording which file each row came from in source_file.
-- NOTE(review): f1d01/f1d02, f2b01/f2b02 and f3b01/f3b02 are taken to be each form's
-- total revenue / total expense fields — confirm against the IPEDS data dictionary.
-- The later LEFT JOIN on unitid assumes each unitid appears at most once here.
CREATE OR REPLACE TABLE standardized_v2.ipeds_f2223_revenue_and_expense AS
WITH
  f1a AS (
    SELECT unitid, f1d01 AS ipeds_revenue, f1d02 AS ipeds_expense, 'f1a' AS source_file
    FROM standardized_v2.ipeds_f2223_f1a_data
  ),
  f2 AS (
    SELECT unitid, f2b01 AS ipeds_revenue, f2b02 AS ipeds_expense, 'f2' AS source_file
    FROM standardized_v2.ipeds_f2223_f2_data
  ),
  f3 AS (
    SELECT unitid, f3b01 AS ipeds_revenue, f3b02 AS ipeds_expense, 'f3' AS source_file
    FROM standardized_v2.ipeds_f2223_f3_data
  )
SELECT * FROM f1a
UNION ALL
SELECT * FROM f2
UNION ALL
SELECT * FROM f3

In [None]:
%%bigquery
-- Add IPEDS revenue and expense to every org, matched on its IPEDS unitid; orgs
-- without an IPEDS id or match get NULLs.
-- NOTE(review): this reads from and replaces the same table. Re-running this cell on
-- its own (without first rebuilding unique_orgs_with_irs_data) would select
-- ipeds_revenue/ipeds_expense twice, which BigQuery rejects as duplicate column names.
CREATE OR REPLACE TABLE novel_v2.unique_orgs_with_irs_data AS
SELECT
  orgs.*,
  fin.ipeds_revenue,
  fin.ipeds_expense
FROM novel_v2.unique_orgs_with_irs_data AS orgs
LEFT JOIN standardized_v2.ipeds_f2223_revenue_and_expense AS fin
  ON fin.unitid = CAST(orgs.ipeds_unique_id AS STRING)


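Organizations with the state code `ZA` had their state fixed manually in the spreadsheet below. The following cells load those corrections into BigQuery and apply them to `std_state` in both `novel_v2.unique_orgs` and `novel_v2.unique_orgs_with_irs_data`.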

In [None]:
# Google Sheets key of the manually maintained std_state corrections.
spreadsheet_id = "1emb9dLyViwDf6D20uzGSQ9F04NhrfOR8Wif-v0sHlXU"

In [None]:
# Open the corrections workbook by key, then select its "Sheet1" tab.
# NOTE(review): `gc` is not defined in this section; presumably an authorized gspread
# client created earlier in the notebook — confirm it exists before this cell runs.
corrections_book = gc.open_by_key(spreadsheet_id)
sheet = corrections_book.worksheet("Sheet1")

In [None]:
# Read every data row of the sheet as a record (header row -> keys) and turn the
# records into a DataFrame for upload to BigQuery.
state_fix_records = sheet.get_all_records()
df = pd.DataFrame(state_fix_records)

In [None]:
# Upload the corrections to BigQuery, replacing reference_v2.std_state_fix if it
# already exists, so the UPDATE cells can join on org_id.
pandas_gbq.to_gbq(
    df,
    destination_table="reference_v2.std_state_fix",
    project_id=project,
    if_exists='replace',
)

In [None]:
%%bigquery
-- Overwrite std_state in novel_v2.unique_orgs with the manual correction for every
-- org_id listed in reference_v2.std_state_fix; other rows are left untouched.
MERGE novel_v2.unique_orgs AS u
USING reference_v2.std_state_fix AS r
ON u.org_id = r.org_id
WHEN MATCHED THEN
  UPDATE SET std_state = r.std_state

In [None]:
%%bigquery
-- Apply the same manual std_state corrections to novel_v2.unique_orgs_with_irs_data,
-- matched on org_id; rows without a correction keep their current std_state.
MERGE novel_v2.unique_orgs_with_irs_data AS u
USING reference_v2.std_state_fix AS r
ON u.org_id = r.org_id
WHEN MATCHED THEN
  UPDATE SET std_state = r.std_state

This concludes the processing steps.