# In [8]:
import pathlib

def list_arg(s):
    """Split the comma-separated string *s* into a list of its fields."""
    fields = s.split(',')
    return fields

def check_csvfile(filepath):
    """Verify that *filepath* exists and holds at least one CSV record.

    Args:
        filepath: Location of the CSV file, as a ``pathlib.Path`` or a string.

    Returns:
        The fields of the first record, as parsed by ``csv.reader``.

    Raises:
        FileNotFoundError: If the path does not exist.
        ValueError: If the file contains no records at all.
    """
    import csv

    filepath = pathlib.Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f'File does not exist: {filepath}')

    # newline='' leaves line-ending handling to the csv module, so line
    # breaks embedded in quoted fields reach the parser untranslated.
    with open(filepath, 'r', newline='') as f:
        # A None sentinel distinguishes "no record" from an empty record ([]).
        first_line = next(csv.reader(f), None)

    if first_line is None:
        raise ValueError(f'File is empty: {filepath}')
    return first_line

def construct_line(line_data, max_lengths, in_between="|"):
    """Render one table row with every value centred in its column.

    Each value from *line_data* is padded with spaces to the matching width
    in *max_lengths*; when the padding is odd, the extra space goes on the
    right. Cells are delimited, and the row is opened and closed, by
    *in_between*. Values longer than their width are emitted unpadded.
    """
    cells = []
    for value, width in zip(line_data, max_lengths):
        slack = width - len(value)
        left_pad = " " * (slack // 2)
        right_pad = " " * (slack - len(left_pad))
        cells.append(f"{left_pad}{value}{right_pad}{in_between}")
    return in_between + "".join(cells)

def adjust_max_lengths(lst, max_lengths):
    """Widen *max_lengths* in place so each entry fits the matching item of *lst*.

    Every width becomes the larger of its current value and the length of
    the item at the same position in *lst*. Nothing is returned.
    """
    for position, current_width in enumerate(max_lengths):
        max_lengths[position] = max(current_width, len(lst[position]))

def _refine_column_type(current, value):
    """Return a column's type after one more *value* has been observed."""
    if current == 'char':
        # Once a column is textual it stays textual.
        return 'char'
    try:
        float(value)
    except ValueError:
        return 'char'
    if current == 'double':
        # A column that already needed a float never narrows back to int.
        return 'double'
    try:
        int(value)
    except ValueError:
        return 'double'
    return 'int'


def convert_csv_to_ipac(csv_path, header=None, units=None, null=None):
    """Convert a comma-separated file into an IPAC-style fixed-width table.

    The result is written next to the input, with the suffix replaced by
    ``.ipac``, using ASCII encoding. Header lines are delimited by ``|``;
    data rows use spaces instead, and the last data row is written without
    a trailing newline.

    Column types are inferred from the data: ``int`` when every value parses
    as an integer, ``double`` when every value parses as a float, and
    ``char`` otherwise (also used for columns with no data rows).

    Args:
        csv_path: Path of the CSV file to convert.
        header: Column names. When omitted, they are taken from the first
            line of the file; when given, no header line is consumed and
            every line of the file is treated as data.
        units: Optional unit strings, one per column, written as an extra
            header line.
        null: Optional null-value strings, one per column, written as an
            extra header line.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If the file is empty, or a data line does not have the
            same number of columns as the header.
    """
    csv_path = pathlib.Path(csv_path)
    check_csvfile(csv_path)

    with open(csv_path, 'r') as fin:
        if header is None:
            header = next(fin).strip().split(',')
            first_data_line = 2  # line 1 was consumed as the header
        else:
            first_data_line = 1  # caller supplied the header; line 1 is data
        max_lengths = list(map(len, header))
        types = [None] * len(header)

        # Rows are buffered because the final column widths are only known
        # after the whole file has been seen.
        lines = []
        for line_num, line in enumerate(fin, start=first_data_line):
            vals = line.strip().split(',')
            if len(vals) != len(header):
                raise ValueError(f'Line {line_num} contains {len(vals)} columns!')
            lines.append(vals)

            adjust_max_lengths(vals, max_lengths)
            for i, val in enumerate(vals):
                types[i] = _refine_column_type(types[i], val)

    # Columns that never saw a value have no inferred type; use 'char'.
    types = ['char' if t is None else t for t in types]

    # Settle every column width before writing anything, so that all header
    # lines and data rows share the same alignment.
    adjust_max_lengths(types, max_lengths)
    if units:
        adjust_max_lengths(units, max_lengths)
    if null:
        adjust_max_lengths(null, max_lengths)

    output_path = csv_path.with_suffix('.ipac')
    with open(output_path, 'w', encoding='ascii') as fout:
        fout.write(construct_line(header, max_lengths, in_between="|") + "\n")
        fout.write(construct_line(types, max_lengths, in_between="|") + "\n")

        if units:
            fout.write(construct_line(units, max_lengths, in_between="|") + "\n")

        if null:
            fout.write(construct_line(null, max_lengths, in_between="|") + "\n")

        # Data rows are separated by spaces rather than "|"; joining with
        # "\n" leaves the final row without a trailing newline.
        fout.write("\n".join(
            construct_line(line_data, max_lengths, in_between=" ")
            for line_data in lines
        ))

    print(f"IPAC table saved to {output_path}")

# Example usage: convert the catalog CSV below into an IPAC table.
# Point the argument at a different CSV file to convert other data.
convert_csv_to_ipac('Catalogs/SDSS_DR16Q_ugriz_JHKs.csv')


# Output: IPAC table saved to Catalogs\SDSS_DR16Q_ugriz_JHKs.ipac
