In [1]:
import pandas as pd
import numpy as np
import glob
import os
import shutil
import sys

In [2]:
# Read the input and output directories from the environment
src = os.getenv('SRC_PATH')
dst = os.getenv('DST_PATH')

# Both paths are mandatory: stop when either is unset or empty
if not (src and dst):
    print("Source or destination path not provided.")
    sys.exit(1)

# The source location has to be present already
if not os.path.exists(src):
    print(f"Source path does not exist: {src}")
    sys.exit(1)

# Create the destination directory when nothing exists at that path yet
if not os.path.exists(dst):
    os.makedirs(dst, exist_ok=True)

print(f"Source: {src}")
print(f"Destination: {dst}")

Source: /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0
Destination: /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0


In [3]:
# Glob patterns for the datasets with DePlot tables: every CSV below the
# FigureQA and PlotQA sub-directories of `src`. The "**" segment only spans
# nested directories when glob is called with recursive=True.
figureqa_pattern = os.path.join(src, "FigureQA/**/*.csv")
plotqa_pattern = os.path.join(src, "PlotQA/**/*.csv")

# Function to filter out ChartQA CSVs
def valid_chartqa_file(file_path):
    """Tell whether a path names a ChartQA CSV that should be processed.

    The file name (directory and extension removed) must start with
    "chartQA_" and contain a "-", following the chartQA_{name}-{split}
    pattern. The file is accepted only when the {name} part is shorter
    than 9 characters.
    """
    stem, _extension = os.path.splitext(os.path.basename(file_path))

    # Reject anything that does not follow the naming pattern
    if not stem.startswith("chartQA_") or "-" not in stem:
        return False

    # Text between the "chartQA_" prefix and the first "-"
    after_prefix = stem.split("chartQA_")[1]
    name_part = after_prefix.split("-")[0]
    return len(name_part) < 9

chartqa_pattern = os.path.join(src, "ChartQA/**/*.csv")

# Expand each dataset pattern recursively; ChartQA results are additionally
# restricted to the file names accepted by valid_chartqa_file
figureqa_files = glob.glob(figureqa_pattern, recursive=True)
plotqa_files = glob.glob(plotqa_pattern, recursive=True)
chartqa_files = list(filter(valid_chartqa_file, glob.glob(chartqa_pattern, recursive=True)))

# Single list with every file to process, in dataset order
all_files = [*figureqa_files, *plotqa_files, *chartqa_files]

In [4]:
def _to_numeric_if_possible(column):
    """Return `column` converted with pd.to_numeric, or unchanged if that fails."""
    try:
        return pd.to_numeric(column)
    except (ValueError, TypeError):
        # Same outcome the deprecated errors='ignore' option used to give
        return column


def string_to_dataframe_and_title(data_string, filepath):
    """Convert a DePlot table string to a DataFrame and a title.

    Rows are separated by the literal token "<0x0A>" and cells by "|".
    When the first row contains "TITLE |" it is treated as the title row and
    removed; the next row is the header and all remaining rows are data.
    Cells that are empty after stripping are dropped before the column count
    of a row is compared with the header.

    Args:
        data_string: Raw table text.
        filepath: Path the text was read from; only used in the message
            printed when a row does not match the header width.

    Returns:
        A `(df, title)` tuple. `title` is the stripped title text, or None
        when there is no title row or it is empty / the placeholder "title".
        `df` is None when any data row has a different number of cells than
        the header, so the caller can fall back to another strategy.

    Raises:
        ValueError: If there is no header row or no data row.
    """
    # Split the data into rows
    rows = data_string.strip().split("<0x0A>")

    title = None

    # Check if the first row is a title and extract it
    if "TITLE |" in rows[0]:
        parts = rows[0].split(" | ", 1)
        if len(parts) == 2:
            potential_title = parts[1]
        else:
            # A bare "TITLE |" (no space after the bar) has no " | " separator;
            # the membership test above guarantees that a "|" exists.
            potential_title = rows[0].split("|", 1)[1]
        potential_title = potential_title.strip()
        if potential_title.lower() not in ('', 'title'):  # Skip placeholder titles
            title = potential_title
        rows.pop(0)  # Remove the title row from processing

    # The next row should be the header
    if not rows:
        raise ValueError("No header row found after the title row.")
    header = [h.strip() for h in rows.pop(0).split("|") if h.strip()]

    # Check if there are any data rows left after removing title and header
    if not rows:
        raise ValueError("No data rows found after the header row.")

    # Every data row must have exactly as many cells as the header
    records = []
    for row in rows:
        record = [r.strip() for r in row.strip().split("|") if r.strip()]
        if len(record) != len(header):
            print(f"File: {filepath}: Row has {len(record)} columns, expected {len(header)} - row data: {record}")
            return None, title  # to keep original CSV instead
        records.append(record)

    # Create df and convert each column to numeric where the whole column parses
    df = pd.DataFrame(records, columns=header).apply(_to_numeric_if_possible)

    return df, title

In [5]:
def save_title_to_file(title, filepath):
    """Save the title to a separate .txt file next to `filepath`.

    Nothing is written when `title` is empty or None.

    Args:
        title: Title text to store.
        filepath: Path of the table file; the title file uses the same path
            with the extension replaced by ".txt".
    """
    if not title:
        return

    # splitext only looks at the last path component, so a dot in a directory
    # name is never mistaken for the file extension
    title_filepath = f"{os.path.splitext(filepath)[0]}.txt"

    # Explicit UTF-8 so non-ASCII titles do not depend on the platform locale
    with open(title_filepath, 'w', encoding='utf-8') as f:
        f.write(title)

In [6]:
def process_non_formatted_csv(file_path):
    """Rewrite a malformed DePlot table in place as a rectangular CSV.

    The file is read as text, split into rows on the literal token "<0x0A>"
    and into cells on "|". When the first row contains "TITLE |" it is
    removed and, unless it is empty or the placeholder "title", saved to a
    separate file with save_title_to_file. Rows shorter than the widest row
    are padded with NaN (written as empty cells), and the result overwrites
    `file_path` without header or index.

    Args:
        file_path: Path of the table file to fix.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data_string = file.read()

    # Split rows based on <0x0A> delimiter
    rows = data_string.split('<0x0A>')

    title = None

    # Check if first row is a title and extract it
    if "TITLE |" in rows[0]:
        parts = rows[0].split(" | ", 1)
        if len(parts) == 2:
            potential_title = parts[1]
        else:
            # A bare "TITLE |" has no " | " separator; the membership test
            # above guarantees that a "|" exists.
            potential_title = rows[0].split("|", 1)[1]
        potential_title = potential_title.strip()
        if potential_title.lower() not in ('', 'title'):  # Skip placeholder titles
            title = potential_title
        rows.pop(0)  # Remove the title row from processing

    # Split each row by | delimiter and drop the padding spaces around cells
    split_rows = [[cell.strip() for cell in row.split('|')] for row in rows]

    # Widest row decides the column count; default covers a title-only file
    max_columns = max((len(row) for row in split_rows), default=0)

    # Pad rows with fewer columns with NaN so the table is rectangular
    formatted_rows = [row + [np.nan] * (max_columns - len(row)) for row in split_rows]

    df = pd.DataFrame(formatted_rows)

    # Save corrected df back to same file
    df.to_csv(file_path, index=False, header=False)

    # Save title to a separate file if it exists
    if title:
        save_title_to_file(title, file_path)

    print(f"Reformatted table saved to: {file_path}")

In [8]:
# Main Processing Logic
def process_csv_files(src, dst, files=None):
    """Convert DePlot table files to regular CSVs, overwriting them in place.

    Each file is parsed with string_to_dataframe_and_title. On success the
    DataFrame replaces the original file and the title, if any, is saved
    with save_title_to_file. Files whose rows do not match the header width
    are collected and afterwards rewritten with process_non_formatted_csv.

    Args:
        src: Not read by this function; kept for the existing call signature.
        dst: Directory that is created when missing.
            NOTE(review): nothing is written to `dst` here, outputs replace
            the input files; confirm this is intended.
        files: Optional iterable of file paths to process. Defaults to the
            module-level `all_files` list.
    """
    if not os.path.exists(dst):
        os.makedirs(dst, exist_ok=True)

    if files is None:
        files = all_files

    csvs_not_converted_properly = []

    for filepath in files:
        with open(filepath, 'r', encoding='utf-8') as file:
            data_string = file.read()

        try:
            df, title = string_to_dataframe_and_title(data_string, filepath)

            if df is not None:
                # Save the DataFrame (overwrite original file)
                df.to_csv(filepath, index=False)

                # Save the title to a separate file
                save_title_to_file(title, filepath)
                print(f"Preprocessing successful for {filepath}")
            else:
                # Row/header width mismatch: handled after the loop
                csvs_not_converted_properly.append(filepath)
        except (ValueError, IndexError) as e:
            # One unparsable file (e.g. a malformed title row) must not abort
            # the whole batch
            print(f"Error processing {filepath}: {e}")

    if csvs_not_converted_properly:
        print(f"CSV files not converted properly: {csvs_not_converted_properly}")
        print(f"Number of CSVs not converted properly: {len(csvs_not_converted_properly)}")

        for file_path in csvs_not_converted_properly:
            print(f"Fixing formatting for: {file_path}")
            process_non_formatted_csv(file_path)
    else:
        print("No CSV files need reformatting.")

In [9]:
# Run the preprocessing over the collected files; converted tables overwrite
# the original CSVs in place
process_csv_files(src, dst)

Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_76797-train.csv
Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_9596-train.csv
File: /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_71753-train.csv: Row has 6 columns, expected 7 - row data: ['x2x2', '73', '71', '78', '73', '18.6']
Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_88613-train.csv
Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_35384-train.csv
Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQA/train/tables/figureQA_61985-train.csv
Preprocessing successful for /Users/evie/Documents/GitHub/ChartFact/sd_0/3_translated_data_0/FigureQ