In [1]:
import polars as pl
import numpy as np
from dotenv import load_dotenv
import os
from datetime import datetime
import os_record_types as os_records
load_dotenv()

True

In [2]:
# load CCOD
ccod_zip_file_name = os.getenv('HMLR_CCOD_OUTPUT_FILE')
ccod_file_root = ccod_zip_file_name.replace('.zip', '')
df_ccod =  pl.read_csv(f'downloads/{ccod_file_root}/{ccod_file_root}.csv')

In [3]:
len(df_ccod)

4181223

In [4]:
# load OS
zip_file_name = os.getenv('OS_FILENAME')
zip_dir = zip_file_name.replace('.zip', '')
csv_file_names = sorted(os.listdir(zip_dir))

In [5]:
record_dfs = {}
schema = {}
for i in range(30):
    schema[str(i)] = pl.String

In [None]:
# read each file and assign to correct os record type
for file_name in csv_file_names[200:]:
    print(f'file_name: [{file_name}], time: [{datetime.now().strftime("%H:%M:%S")}]')
    # temp_df = pl.read_csv(f'{zip_dir}/{file_name}', header=None, names=range(30), dtype=object)
    temp_df = pl.read_csv(f'{zip_dir}/{file_name}', has_header=False, schema=schema)
    # build individual dfs
    for record_type in os_records.record_types.keys():
        records_to_add = temp_df.filter(pl.col('0') == str(record_type))
        count = len(records_to_add)
        print(f'record_type: [{record_type}], time: [{datetime.now().strftime("%H:%M:%S")}], count: {count}')
        if count == 0: continue
        if record_type in record_dfs:
            record_dfs[record_type] = pl.concat([record_dfs[record_type], records_to_add])
        else:
            record_dfs[record_type] = records_to_add

print('done')

file_name: [AddressBasePremium_FULL_2024-05-29_201.csv], time: [16:53:08]
record_type: [11], time: [16:53:08], count: 0
record_type: [15], time: [16:53:08], count: 0
record_type: [21], time: [16:53:08], count: 0
record_type: [23], time: [16:53:08], count: 1000000
record_type: [24], time: [16:53:08], count: 0
record_type: [28], time: [16:53:08], count: 0
record_type: [30], time: [16:53:08], count: 0
record_type: [31], time: [16:53:08], count: 0
record_type: [32], time: [16:53:08], count: 0
file_name: [AddressBasePremium_FULL_2024-05-29_202.csv], time: [16:53:08]
record_type: [11], time: [16:53:09], count: 0
record_type: [15], time: [16:53:09], count: 0
record_type: [21], time: [16:53:09], count: 0
record_type: [23], time: [16:53:09], count: 1000000
record_type: [24], time: [16:53:09], count: 0
record_type: [28], time: [16:53:09], count: 0
record_type: [30], time: [16:53:09], count: 0
record_type: [31], time: [16:53:09], count: 0
record_type: [32], time: [16:53:09], count: 0
file_name: [

In [None]:
# remove excess cols
for record_type in record_dfs.keys():
    if record_dfs[record_type].is_empty(): continue
    df_null_cols = record_dfs[record_type].select(pl.all().is_null().all())
    columns_to_drop = (df_null_cols
                       .unpivot()
                       .filter(pl.col('value')==True)['variable']
                       .to_list())
    print(f'type: [{record_type}], time: [{datetime.now().strftime("%H:%M:%S")}], cols_to_drop: [{columns_to_drop}]')
    record_dfs[record_type] = record_dfs[record_type].drop(columns_to_drop)

In [None]:
# add col names
for record_type in record_dfs.keys():
    print(f'type: [{record_type}], time: [{datetime.now().strftime("%H:%M:%S")}]')
    record_dfs[record_type].columns = os_records.record_types[record_type].ColNames

In [None]:
# check everything is ok
for record_type in record_dfs.keys():
    print(f'type: [{record_type}], count: [{len(record_dfs[record_type])}]')

In [None]:
os.makedirs('output', exist_ok=True)
for record_type in record_dfs.keys():
    print(f'type: [{record_type}], time: [{datetime.now().strftime("%H:%M:%S")}]')
    record_dfs[record_type].write_parquet(f'output/os_recordtype_{record_type}.parquet')