In [None]:
# default_exp csv_header_restore

# csv_header_restore

> This is a short script that tries to restore the header in a file to the start of the file.

It is used for legacy files whose header line was moved out of place when the file was sorted.

It requires two files: (1) a headers file containing one `table_name:header` line per table, and (2) the data file in which the header is to be repositioned.






In [None]:
#hide
from nbdev.showdoc import *
from fastcore.test import *
from nbdev import *

In [None]:
#export
#! python
import re

def read_headers(header_file, encoding="latin-1"):
    """Load the per-table header lines from `header_file`.

    Each useful line has the form ``table_name:header``. Lines that
    `parse_header_line` cannot split (it returns ``None`` as the table name)
    are skipped. If a table name occurs more than once, the last one wins.

    The file is decoded with `encoding`. The default is "latin-1", the same
    default `restore_header` uses for the data file, so that a header containing
    non-ASCII characters compares equal to the matching data line and is
    written back with the same bytes.

    Returns a dictionary mapping each table name to its header line
    (without the trailing newline).
    """
    header_lines = {}
    with open(header_file, 'r', encoding=encoding) as f:
        for line in f:
            table_name, header = parse_header_line(line.rstrip("\n"))
            if table_name is not None:
                header_lines[table_name] = header
    return header_lines

def parse_header_line(line):
    """Split a ``table_name:header`` line at its first colon.

    Returns a ``(table_name, header)`` tuple, where `header` is everything
    after the first colon, unchanged. If the line has no colon, or nothing
    before the first colon, ``(None, None)`` is returned.
    """
    matched = re.match('^([^:]+):', line)
    if matched is None:
        return None, None
    # The match ends just past the colon, so the slice is the untouched remainder.
    remainder = line[matched.end():]
    return matched.group(1), remainder

def restore_header(filename, output_filename, header, encoding="latin-1"):
    """Copy `filename` to `output_filename` with `header` as the first line.

    `header` is written first. Every input line that equals `header` (compared
    after removing the trailing newline) is dropped, and all other lines are
    copied in their original order, each terminated by a newline. Both files
    are opened in text mode with `encoding`.

    Raises:
        ValueError: if `filename` and `output_filename` resolve to the same
            path. Opening the output for writing would truncate the input
            before it had been read.
    """
    import os  # local import: the module-level imports only provide `re`

    source_path = os.path.normcase(os.path.realpath(filename))
    target_path = os.path.normcase(os.path.realpath(output_filename))
    if source_path == target_path:
        raise ValueError(
            f"output file must differ from input file: {filename}")

    with open(filename, 'r', encoding=encoding) as f:
        with open(output_filename, 'w', encoding=encoding) as out:
            out.write(header + '\n')
            for line in f:
                line = line.rstrip('\n')
                if line == header:
                    # The displaced header: already written at the top.
                    continue
                out.write(line + '\n')

        

    

In [None]:
#export
try:
    from nbdev.imports import IN_NOTEBOOK
except Exception:
    # nbdev is missing or unusable: treat this as a plain script run.
    # `Exception` (not a bare except) lets Ctrl-C and SystemExit propagate.
    IN_NOTEBOOK = False

# Command-line entry point; skipped when the code runs inside a notebook.
if __name__ == "__main__" and not IN_NOTEBOOK:
    import argparse
    import os
    import sys

    ap = argparse.ArgumentParser()
    ap.add_argument("-n", "--header", required=True, help="headers file for restore")
    ap.add_argument("-f", "--file", required=True,
                help="file to be converted")
    ap.add_argument("-o", "--output", required=False,default='',
                help="output filename")
    args = vars(ap.parse_args())

    filename = args["file"]
    headerfile = args["header"]
    output_filename = args['output']

    # Strip the directory and the suffix of the filename to get the table_name
    base = os.path.basename(filename)
    table_name = os.path.splitext(base)[0]

    # Without -o the result is written to "<table_name>.hdr" (a relative path).
    if output_filename == '':
        output_filename = table_name + '.hdr'

    headers = read_headers(headerfile)
    if table_name in headers:
        restore_header(filename, output_filename, headers[table_name])
    else:
        # No header is known for this table: report it and write nothing.
        print(f"Skipping file:{filename}  - no associated header found", file=sys.stderr)

In [None]:
#hide
# parse_header_line: the name is the text before the first colon, the rest is untouched.
table_name, line = parse_header_line('table_name:anything else is untouched')
test_eq(table_name, 'table_name')
test_eq(line, 'anything else is untouched')

# read_headers: check the table count and one known entry of headers.txt.
headers = read_headers('headers.txt')
test_eq(len(headers), 67)
test_eq(headers['COMMAND_PROFILE'], 'COMMAND_PROFILE|COMMAND_NAME|PRIVILEGE|STATUS|')

In [None]:
#hide
# Run nbdev's notebook-to-script conversion for the project's notebooks.
from nbdev.export import notebook2script

notebook2script()

Converted 00_csv_header_restore.ipynb.
Converted index.ipynb.


In [None]:
# Copy config_tracker/csv_header_restore.py into scripts/ and mark the copy executable.
!cp config_tracker/csv_header_restore.py scripts/
!chmod +x scripts/csv_header_restore.py