In [2]:
import pyarrow.parquet as pq
import pandas as pd
import polars as pl

# Read only the first 300,000 rows from the parquet file using pyarrow, avoiding loading the entire file into memory
parquet_file = r"C:\Users\ChristopherCato\OneDrive - clarity-dx.com\code\bph\workcomp-rates\core\data\commercial_rates_GA.parquet"
# Use the 'use_threads' and 'columns' arguments if needed, but to limit rows, use read_pandas with nrows
# table = pq.read_table(parquet_file)
# rates = table.to_pandas().head(300_000)


In [3]:
len(rates)

300000

In [4]:
rates.columns

Index(['rate', 'negotiated_type', 'billing_class', 'service_codes',
       'billing_code', 'billing_code_type', 'code_desc', 'name',
       'negotiation_arrangement', 'payer', 'payer_type', 'rate_updated_on',
       'prov_npi', 'tin_type', 'tin_value', 'GA_PROF_MAR', 'GA_OP_MAR',
       'GA_ASC_MAR', 'org_name', 'status', 'primary_taxonomy_code',
       'primary_taxonomy_desc', 'city', 'state', 'postal_code', 'prov_lat',
       'prov_lng', 'cbsa', 'medicare_prof', 'state_up', 'billing_code_norm',
       'state_wage_index_avg', 'opps_weight', 'opps_si', 'opps_short_desc',
       'asc_pi', 'asc_nat_rate', 'asc_short_desc',
       'medicare_opps_mar_national', 'medicare_asc_mar_national',
       'opps_adj_factor_stateavg', 'asc_adj_factor_stateavg',
       'medicare_opps_mar_stateavg', 'medicare_asc_mar_stateavg',
       'procedure_set', 'procedure_class', 'procedure_group'],
      dtype='object')

In [6]:
# Show the count of each taxonomy_description grouped by procedure_class, in descending order of count

if 'procedure_class' in rates.columns and 'primary_taxonomy_desc' in rates.columns:
    taxonomy_by_proc = (
        rates.groupby(['procedure_class', 'primary_taxonomy_desc'])
        .size()
        .reset_index(name='count')
        .sort_values(['procedure_class', 'count'], ascending=[True, False])
    )
    display(taxonomy_by_proc)
else:
    print("Columns 'procedure_class' and/or 'taxonomy_description' not found in rates DataFrame.")

taxonomy_by_proc.to_csv('taxonomy_by_proc.csv', index=False)

Unnamed: 0,procedure_class,primary_taxonomy_desc,count
1,Abdomen/GI Imaging,Family Medicine,159
2,Abdomen/GI Imaging,General Acute Care Hospital,159
0,Abdomen/GI Imaging,"Clinic/Center, Multi-Specialty",81
3,Abdomen/GI Imaging,Psychiatric Unit,60
4,Abdomen/GI Imaging,Skilled Nursing Facility,39
...,...,...,...
344,Urinary System,Family Medicine,200
345,Urinary System,General Acute Care Hospital,200
343,Urinary System,"Clinic/Center, Multi-Specialty",160
346,Urinary System,Psychiatric Unit,80


In [8]:
distinct_taxonomies = rates['primary_taxonomy_desc'].unique()
print(distinct_taxonomies)

['Family Medicine' 'Clinic/Center, Multi-Specialty'
 'General Acute Care Hospital' 'Skilled Nursing Facility'
 'Psychiatric Unit' 'Neurological Surgery' 'Specialist'
 'Nurse Practitioner, Family' 'Clinic/Center, Medical Specialty'
 'Pain Medicine, Interventional Pain Medicine' 'Clinic/Center, Endoscopy'
 'Clinic/Center, Ambulatory Surgical'
 'Obstetrics & Gynecology, Reproductive Endocrinology' 'Pediatrics'
 'Urology' 'Orthopaedic Surgery'
 'General Acute Care Hospital, Critical Access'
 'Durable Medical Equipment & Medical Supplies'
 'Radiology, Diagnostic Radiology' 'Registered Nurse' 'Anesthesiology'
 'Hospitalist' 'Pathology, Anatomic Pathology & Clinical Pathology'
 'Internal Medicine' 'Obstetrics & Gynecology' 'Long Term Care Hospital'
 'Dermatology' 'Internal Medicine, Nephrology'
 'Clinic/Center, Primary Care'
 'Clinic/Center, Federally Qualified Health Center (FQHC)'
 'Clinic/Center, Rural Health' 'Case Management'
 'Obstetrics & Gynecology, Obstetrics'
 'Dermatology, MOHS-Mic

In [3]:
def filter_parquet_by_blacklist_chunked(
    parquet_file,
    column,
    output_parquet_file,
    blacklist_txt_path="taxonomy_blacklist.txt",
    chunk_size=300_000,
):
    """
    Reads a large parquet file in chunks, drops rows where the specified column contains
    any of the blacklist substrings (case-insensitive), and writes the filtered data
    to a new parquet file chunk by chunk, using Polars for efficient processing.

    Args:
        parquet_file (str): Path to the input parquet file.
        column (str): The column to search for substrings.
        output_parquet_file (str): Path to the output parquet file.
        blacklist_txt_path (str): Path to the blacklist text file.
        chunk_size (int): Number of rows per chunk to process.

    Returns:
        None. Writes filtered data to output_parquet_file.
    """
    import polars as pl

    # Read blacklist substrings from file, stripping whitespace and skipping empty lines
    with open(blacklist_txt_path, "r", encoding="utf-8") as f:
        substrings = [line.strip() for line in f if line.strip()]

    # If no substrings, just copy the file in chunks
    if not substrings:
        # Just copy the file using polars scan and sink
        scan = pl.scan_parquet(parquet_file)
        scan.sink_parquet(output_parquet_file)
        return

    # Build a single regex pattern for all substrings, case-insensitive
    # Escape substrings to avoid regex injection
    import re
    pattern = "|".join([re.escape(s) for s in substrings])

    # Use Polars scan_parquet for lazy, chunked processing
    scan = pl.scan_parquet(parquet_file)

    # Filter out rows where the column contains any of the substrings (case-insensitive)
    filtered = scan.filter(
        ~pl.col(column).str.contains(pattern, case=False)
    )

    # Write out in chunks
    # Polars' sink_parquet will write the whole result, but we can collect in batches if needed
    # For very large files, use the streaming API
    filtered.sink_parquet(output_parquet_file, batch_size=chunk_size)

# Example usage:
# filter_parquet_by_blacklist_chunked(
#     parquet_file,
#     'primary_taxonomy_desc',
#     'filtered_commercial_rates_GA.parquet',
#     blacklist_txt_path="taxonomy_blacklist.txt",
#     chunk_size=300_000
# )


In [4]:
filter_parquet_by_blacklist_chunked(
    parquet_file,
    'primary_taxonomy_desc',
    'filtered_commercial_rates_GA.parquet',
    blacklist_txt_path="taxonomy_blacklist.txt",
    chunk_size=300_000
)