In [3]:
import polars as pl
from polars import col
pl.__version__

'0.13.43'

### Cleaning the hospitals dataset -- motivation

The hospitals dataset at DoltHub contains 300M rows of prices. Each price is given a couple of codes (a primary `code` and an `internal_revenue_code`) which refer to how the procedure is billed. Billing codes are the standard for comparing hospital prices. If you know that a hospital charges $2,000 for code `59410`, you can be pretty sure that you (or your insurer) are going to be paying about that price for a normal childbirth.

#### Where the codes are in our dataset

Under the CMS law, hospitals are required to post a chargemaster with a hospital-generic code. As they should be: if hospitals only used proprietary coding for their procedures, there would be no way to compare prices between hospitals. For an apples-to-apples comparison, we all need to use apples. Typically these apples are the CPT codes ("Current Procedural Terminology"), which follow a specific pattern.

We'll extract CPT codes (as well as DRG and NDC codes) from the chargemasters as far as we reasonably can. Remember that the CMS law requires that these chargemasters be machine readable. If we can't figure out what the code is supposed to be from the chargemaster, then it's not machine readable. It's junk.


#### How codes were entered

Participants typically put the most generic code in the `code` column. This would be where the CPT code went. If there was a second code, it went in the `internal_revenue_code` column. A third code would have gone in the `code_disambiguator` column, but this turns out to not have been necessary.

However, because the codes are mixed -- some contain pharmacy codes, random codes, etc. -- it is a bit difficult to work with the data. Plus, it's not clear how many of our rows even have valid codes.

So I wrote a short cleaning pipeline to figure out the coding for each row, and see what fraction of rows were coded in some machine readable way.

### Making the pipeline: custom pipes

Pipes are an easy way to chain data transformations together. In polars/pandas, a chain looks something like:

```python
out_df = in_df.pipe(transformation, **params).pipe(another_transformation, **params)
```
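
To make the chaining concrete, here is a minimal sketch of what `.pipe` does. The `Table` class and the two step functions are hypothetical stand-ins (not polars or pandas objects). The only point is that `x.pipe(f, **params)` is the same as `f(x, **params)`, so the steps read left to right.

```python
class Table:
    """Hypothetical stand-in for a DataFrame: wraps a list of row dicts."""

    def __init__(self, rows):
        self.rows = rows

    def pipe(self, func, *args, **kwargs):
        # Same contract as DataFrame.pipe: pass self as the first argument
        return func(self, *args, **kwargs)


def strip_codes(table):
    # Remove surrounding whitespace from every 'code' value
    return Table([{**row, 'code': row['code'].strip()} for row in table.rows])


def keep_min_length(table, n=5):
    # Keep rows whose 'code' has at least n characters
    return Table([row for row in table.rows if len(row['code']) >= n])


in_table = Table([{'code': ' 59410 '}, {'code': '123'}])
out_table = in_table.pipe(strip_codes).pipe(keep_min_length, n=5)
print(out_table.rows)
```

Each step takes a table and returns a new one, which is the shape the extraction functions in this notebook follow.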

The first transformation we'll need is one that extracts the CPT codes from the codes column.

In [935]:
def extract_cpt(df: pl.DataFrame) -> pl.DataFrame:
    '''
    Extract CPT singlet codes
    '''
    
    # do some transformations
    
    return df

### The power of regex

We'll hunt for the CPT codes in the codes column. We'll use a few heuristics for finding them. To get those heuristics, we'll need to know what kind of junk is present in the database already, and how to distinguish that junk from valid codes to the greatest possible extent.

#### Heuristic 1: There's only one code type in each cell

If the cell contains the string 'NDC', 'MS-DRG', or 'CDM', it is overwhelmingly likely to *not* contain a CPT code.
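
As a rough sketch, Heuristic 1 is a single "does any marker appear?" test. The example below uses Python's standard `re` module on plain strings rather than polars, and the helper name `may_contain_cpt` is made up for illustration.

```python
import re

# Markers named in the heuristic above
NON_CPT_MARKERS = re.compile(r'NDC|MS-DRG|CDM')


def may_contain_cpt(cell: str) -> bool:
    """True when none of the non-CPT markers appear in the cell."""
    return NON_CPT_MARKERS.search(cell) is None


cells = ['NDC 00123', 'MS-DRG 470', 'CDM2700208021', '59410']
flags = [may_contain_cpt(cell) for cell in cells]
print(flags)
```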

#### Heuristic 2: A specific set of 5-char codes

CPT codes come mainly in two flavors:

1. CPT I: 5-digit numerical code from 00100 to 99499
2. CPT II: 5-character alphanumerical code: one letter from `ACEGJKLMQTV` followed by 4 digits, or 4 digits followed by one letter from `ACEGJKLMQTV`

We will hunt for exactly these patterns.
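
The two flavors can be written down as strict checks on a single candidate string. This is only a sketch in plain Python (`re` from the standard library), and the function names `is_cpt_i` and `is_cpt_ii_shape` are hypothetical. It tests the shape of a code, not whether the code is actually assigned to a procedure.

```python
import re

LETTERS = 'ACEGJKLMQTV'  # letters listed above for the alphanumeric flavor


def is_cpt_i(code: str) -> bool:
    """Five ASCII digits whose numeric value lies in 00100..99499."""
    return re.fullmatch(r'[0-9]{5}', code) is not None and 100 <= int(code) <= 99499


def is_cpt_ii_shape(code: str) -> bool:
    """One listed letter + 4 digits, or 4 digits + one listed letter."""
    pattern = rf'[{LETTERS}][0-9]{{4}}|[0-9]{{4}}[{LETTERS}]'
    return re.fullmatch(pattern, code) is not None


for code in ['00100', '00099', '99499', '99500', 'A1921', '0001T']:
    print(code, is_cpt_i(code), is_cpt_ii_shape(code))
```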

In [958]:
def extract_cpt(df: pl.DataFrame) -> pl.DataFrame:
    '''
    Extract CPT singlet codes
    '''
    
    cpt_pats = [r'\d{5}',       # simplified CPT I
                r'[A-Z]\d{4}',  # simplified CPT II
                r'\d{4}[A-Z]']  # simplified CPT II
    
    # to "extract" a pattern, it needs to be in parentheses,
    # and to extract multiple patterns with OR, they need to
    # have a bar char between them
    
    extraction_str = '|'.join([f'({pat})' for pat in cpt_pats])
    
    return df.with_column(
        pl.when(
            # Heuristic 1
            ~col('code').str.contains('NDC|MS|CDM')
        ).then(
            # Heuristic 2
            col('code').str.extract(extraction_str)
        ).otherwise(None).alias('extracted_cpt')
    )

testdf = pl.DataFrame({'code': ['NDC 00123', '00456 00123', '1101112', '123', '12341-12345']})
testdf.pipe(extract_cpt)

code,extracted_cpt
str,str
"""NDC 00123""",
"""00456 00123""","""00456"""
"""1101112""","""11011"""
"""123""",
"""12341-12345""","""12341"""


Right away we see some flaws here.

1. Because there can be multiple CPT codes in a cell, we'll need a way to extract them all
2. Sometimes we're extracting codes that we shouldn't be (row 3 is not a valid CPT code)
3. The last row, which is a range of CPT codes, should probably be handled separately
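
Flaws 1 and 2 can be explored on plain strings before touching the DataFrame. The sketch below uses Python's `re.findall` as a stand-in for an "extract all matches" operation and wraps the simplified patterns in `\b` word boundaries, so that a 5-character match buried inside a longer run of digits is rejected. The variable names are illustrative only.

```python
import re

# Simplified patterns from the cell above, wrapped in word boundaries
CODE = re.compile(r'\b(?:\d{5}|[A-Z]\d{4}|\d{4}[A-Z])\b')

cells = ['00456 00123', '1101112', '123', '12341-12345']
found = {cell: CODE.findall(cell) for cell in cells}

for cell, codes in found.items():
    print(repr(cell), codes)
```

With boundaries in place, `'1101112'` yields nothing and `'00456 00123'` yields both codes. The range `'12341-12345'` still comes back as two separate codes, which is why ranges need their own handling (flaw 3).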

In [885]:
cpt_marks   = ['CPT', 'HCPCS?/CPT', 'CPT/HCPCS?', 'HCPCS?']
drg_marks   = ['DRG', 'MS-DRG', 'MSD', 'MS']
ndc_marks   = ['NDC']
other_marks = ['CDM', r'PH\d+']

def list_to_regexp(items, b = True): 
    '''Create a regexp that matches any items in the list
    b = True: word boundaries included
    b = False: word boundaries excluded'''
    if b: return '|'.join([fr'\b{item}\b' for item in items])
    return '|'.join(items)

def in_column(colname, items, b = True): 
    '''Does the column contain any of the items?'''
    return col(colname).str.contains(list_to_regexp(items, b))

def extract_from(colname, items):
    '''Convenience function'''
    return col(colname).str.extract_all('|'.join(items))

def extract_cpt(df):
    '''
    Extract CPT singlet codes (i.e. not ranges) from a single column (usually 'code')
    '''
    
    # regexp to capture procedural terminology codes from 00100-99499
    # https://3widgets.com/: used this to generate the numeric aspect of the code
    
    # We make a column 'code_' where we'll put the partially cleaned data
    # The lack of positive/negative lookaheads in the rust regex parser
    # forces us to work in two passes.
    
    cpt_i_codes  = ['0010[0-9]','001[1-9][0-9]','00[2-9][0-9]{2}',
                    '0[1-9][0-9]{3}','[1-8][0-9]{4}','9[0-8][0-9]{3}',
                    '99[0-4][0-9]{2}']

    cpt_ii_codes = [r'[ACEGJKLMQTV]\d{4}', r'\d{4}[ACEGJKLMQTV]']
    
    code_ranges = [r'([ACEGJKLMQTV]|\d)\d{3}([ACEGJKLMQTV]|\d)\s?-\s?([ACEGJKLMQTV]|\d)\d{3}([ACEGJKLMQTV]|\d)']
    
    cpt_found = in_column('code_', cpt_i_codes + cpt_ii_codes)
    other_marks_found = in_column('code_', drg_marks + ndc_marks + other_marks)
    
    df_ = df.with_columns([
        
        # Erase CPT code ranges (like 00100-00111, which confuse the regex) and create new column
        
        # (code_ranges is the list defined above; the name cpt_ranges was never defined)
        col('code').str.replace_all(list_to_regexp(code_ranges), '').alias('code_')
    
    ]).with_columns([
        
        # Extract CPT codes from column with ranges erased
        # This extracts any code like 00100 or A1921
        
        pl.when(cpt_found & ~other_marks_found).then(
            extract_from('code_', cpt_i_codes + cpt_ii_codes)
        ).otherwise(None).alias('cpts_extracted'),
        
        # Sometimes CPT/HCPCS is marked in the column. 
        # In this case, extract the code that follows the marker
        # This extracts codes like CPT1234 --> 1234
        
        pl.when(
            in_column('code_', cpt_marks, b = False) & 
            ~other_marks_found
        ).then(
            col('code').str.extract(r'\b(?:CPT|HCPCS?/CPT|CPT/HCPCS?|HCPCS?)\s?([A-Z]?\d{3,5}[A-Z]?)\b', 1)
        ).otherwise(None).apply(lambda x: [x]).alias('cpts_extracted_')
        
    ])
    
    # This is a workaround for what (I believe) is a polars bug
    if any(df_['cpts_extracted_'].is_not_null()): 
        df_ = df_.with_column(
            col('cpts_extracted').fill_null(col('cpts_extracted_'))
        )
        
    return df_.drop(['code_', 'cpts_extracted_'])
        
def extract_drg(df):
    '''
    Extract DRG singlet codes (i.e. not ranges) from multiple columns.
    Searches principally 'code' column, followed by 'internal_revenue_code'
    '''
    
    newcolname = 'glob'
    other_marks_found = in_column(newcolname, cpt_marks + ndc_marks + other_marks)
    
    df_ = df.with_column(
        
        pl
        .concat_str(['code', 'internal_revenue_code'], '~')
        .str.replace_all(r'\bFY\b|\b\d{4,}\b|\bV\d{2}\b|\(|\)', '')
        .alias('glob')
        
    ).with_columns([
        
        # Extract DRG codes from code column when the 
        # code column simply contains a 1-3 digit number
        # (optionally followed by a -N or -NN suffix, which is dropped)
        
        pl.when(
            col('code').str.contains(r'^\d{1,3}(-\d{1,2})?$')
        ).then(
            col('code').str.extract(r'^(\d{1,3})(-\d{1,2})?$', 1)
        ).otherwise(None).alias('drg_extracted'),
        
        # Sometimes DRG is marked in the column. 
        # In this case, extract the code that follows the marker
        
        pl.when(
            in_column('glob', drg_marks, b = False) & ~other_marks_found
        ).then(
            col('code').str.extract(r'\b(?:MS|MSD)?(?:-|\s)?(?:DRG)?(\d{1,3})\b', 1)
        ).otherwise(None).alias('drg_extracted_')

    ]).with_column(
        col('drg_extracted').fill_null(col('drg_extracted_'))
    ).drop(['drg_extracted_', 'glob'])
    
    return df_

def extract_ndc(df):
    '''
    Extract NDC singlet codes (i.e. not ranges) from multiple columns.
    Searches principally 'code' column, followed by 'internal_revenue_code'
    and 'description'
    
    See: https://en.wikipedia.org/wiki/National_drug_code
    '''
    
    newcolname = 'glob'
    ndc_codes = [r'0?\d{4}-\d{4}-\d{2}0?', r'0?\d{5}-\d{3}-\d{2}0?', r'0?\d{5}-\d{4}-\d{1}0?']
    
    df_ = df.with_column(
        
        pl
        .concat_str(['code', 'internal_revenue_code', 'description'], '~')
        .alias('glob')
        
    ).with_columns([
        
        # Extract NDC codes from code column when the 
        # code column contains an NDC string
        
        pl.when(
            in_column('code', ndc_codes)
        ).then(
            col('code').str.extract(list_to_regexp(ndc_codes))
        ).otherwise(None).alias('ndc_extracted'),
        
        # Sometimes NDC is marked in the column. 
        # In this case, extract the code that follows the marker
        # Can use more generous searching
        
        pl.when(
            in_column('glob', ['NDC'], b = False)
        ).then(
            col('glob').str.extract(r'([0-9-]{12})', 1)
        ).otherwise(None).alias('ndc_extracted_')

    ]).with_column(
        col('ndc_extracted').fill_null(col('ndc_extracted_'))
    ).drop(['ndc_extracted_', 'glob'])
    
    return df_
    
def extract_cpt_range(df):
    '''
    Extract CPT ranges from the 'code' column
    '''
    
    code_ranges = [r'([ACEGJKLMQTV]|\d)\d{3}([ACEGJKLMQTV]|\d)\s?-\s?([ACEGJKLMQTV]|\d)\d{3}([ACEGJKLMQTV]|\d)']
    
    df_ = df.with_columns([
        
        pl.when(
            in_column('code', code_ranges)
        ).then(
            col('code').str.extract_all(list_to_regexp(code_ranges))
        ).otherwise(None).alias('cpt_ranges_extracted'),
        
    ])
    
    return df_
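
The two-pass idea in `extract_cpt` (erase ranges first, then extract singlets from what is left) can be sanity-checked on plain strings. This sketch uses Python's `re` module as a stand-in for the polars expressions, and the helper name `split_ranges_and_singlets` is hypothetical. The patterns mirror `code_ranges` and the simplified singlet shapes, but they are not the exact expressions the pipeline runs.

```python
import re

LETTERS = 'ACEGJKLMQTV'
# One end of a range: (letter or digit) + 3 digits + (letter or digit)
UNIT = rf'(?:[{LETTERS}]|\d)\d{{3}}(?:[{LETTERS}]|\d)'
RANGE = re.compile(rf'\b{UNIT}\s?-\s?{UNIT}\b')
SINGLET = re.compile(rf'\b(?:\d{{5}}|[{LETTERS}]\d{{4}}|\d{{4}}[{LETTERS}])\b')


def split_ranges_and_singlets(cell: str):
    """Return (ranges, singlets) found in one cell."""
    ranges = RANGE.findall(cell)            # pass 1: collect the ranges
    remainder = RANGE.sub('', cell)         # erase them from the cell
    singlets = SINGLET.findall(remainder)   # pass 2: extract what is left
    return ranges, singlets


for cell in ['00100-00111 A1921', '12341-12345', '74300']:
    print(repr(cell), split_ranges_and_singlets(cell))
```

Erasing the range before the second pass keeps its two endpoints from being reported as standalone codes.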

In [927]:
lf = pl.scan_csv('../prices.csv', n_rows = 100_000_000, infer_schema_length = 0, encoding = 'utf8-lossy')

In [928]:
lf.fetch(2)

cms_certification_num,payer,code,internal_revenue_code,units,description,inpatient_outpatient,price,code_disambiguator
str,str,str,str,str,str,str,str,str
"""010001""","""BLUE ADVANTAGE...","""HCPCS 82441""","""3018244101""",,"""HC TEST FOR CH...","""UNSPECIFIED""","""279.02""","""NONE"""
"""010001""","""BLUE CROSS OF ...","""HCPCS 82441""","""3018244101""",,"""HC TEST FOR CH...","""UNSPECIFIED""","""279.02""","""NONE"""


In [929]:
df = (lf
      .collect()
      .pipe(extract_ndc)
      .pipe(extract_cpt)
      .pipe(extract_drg)
      .pipe(extract_cpt_range)
      .drop(['units', 'code_disambiguator'])
      .with_columns(
          pl.col(['cms_certification_num', 'inpatient_outpatient']).cast(pl.Categorical)
      )
     )

In [930]:
df.sample(5)

cms_certification_num,payer,code,internal_revenue_code,description,inpatient_outpatient,price,ndc_extracted,cpts_extracted,drg_extracted,cpt_ranges_extracted
cat,str,str,str,str,cat,str,str,list[str],str,list[str]
"""150044""","""AETNA COMMERCI...","""C1769""","""272""","""H965970002111 ...","""OUTPATIENT""","""22.97""",,"[""C1769""]",,
"""140007""","""MAX""","""CDM2700208021""","""272""","""DRILL, UCSS CA...","""UNSPECIFIED""","""1724.02""",,,,
"""140189""","""HEALTHLINK PPO...","""74300""","""0320""","""IR Cholangiogr...","""INPATIENT""","""1108.60""",,"[""74300""]",,
"""100127""","""GROSS CHARGE""","""84100""","""NONE""","""Phosphate leve...","""OUTPATIENT""","""80.19""",,"[""84100""]",,
"""100025""","""COMMERCIAL OTH...","""CDM950016""","""278""","""LEAD PACING CA...","""UNSPECIFIED""","""2587.05""",,,,


In [931]:
g = df.filter(
    col('ndc_extracted').is_null()  & 
    col('cpts_extracted').is_null() &
    col('drg_extracted').is_null()  &
    col('cpt_ranges_extracted').is_null()
)

In [932]:
g.sample(5)

cms_certification_num,payer,code,internal_revenue_code,description,inpatient_outpatient,price,ndc_extracted,cpts_extracted,drg_extracted,cpt_ranges_extracted
cat,str,str,str,str,cat,str,str,list[str],str,list[str]
"""100319""","""Aetna Internat...","""00904530909""","""NONE""","""IBU 100MG/5ML ...","""UNSPECIFIED""","""65.01""",,,,
"""100118""","""GMMI""","""52565001951""","""NONE""","""BETAMET DIP AU...","""UNSPECIFIED""","""96.12""",,,,
"""100289""","""CARESOURCE ME""","""27200005""","""0272""","""FILTER ADVANTA...","""INPATIENT""","""255.55""",,,,
"""121306""","""GROSS CHARGE""","""2500693""","""0636""","""prednisoLONE 1...","""UNSPECIFIED""","""3.00""",,,,
"""140155""","""CIGNA CIGGR501...","""CDM2701800017""","""270""","""SPRAY, EPI FOA...","""UNSPECIFIED""","""178.27""",,,,


In [933]:
len(g)/len(df)

0.51955281

In [934]:
df['cms_certification_num'].unique()

shape: (553,)
Series: 'cms_certification_num' [cat]
[
	"010001"
	"010011"
	"010018"
	"010005"
	"010033"
	"010022"
	"010036"
	"010040"
	"010049"
	"010055"
	"010056"
	"010083"
	...
	"150006"
	"150007"
	"150010"
	"150015"
	"150017"
	"150018"
	"150022"
	"150030"
	"150035"
	"150037"
	"150044"
	"150042"
]