From b347b90e4bbc33c2b51fb7afd54ca591163adc23 Mon Sep 17 00:00:00 2001
From: Max Burnette
Date: Mon, 9 Oct 2023 08:42:50 -0500
Subject: [PATCH 1/5] first draft of code

---
 csv-concatenator/Dockerfile          |   7 ++
 csv-concatenator/README.md           |   3 +
 csv-concatenator/concatenator.py     | 167 +++++++++++++++++++++++++++
 csv-concatenator/extractor_info.json |  27 +++++
 csv-concatenator/requirements.txt    |   1 +
 5 files changed, 205 insertions(+)
 create mode 100644 csv-concatenator/Dockerfile
 create mode 100644 csv-concatenator/README.md
 create mode 100644 csv-concatenator/concatenator.py
 create mode 100644 csv-concatenator/extractor_info.json
 create mode 100644 csv-concatenator/requirements.txt

diff --git a/csv-concatenator/Dockerfile b/csv-concatenator/Dockerfile
new file mode 100644
index 0000000..cef843f
--- /dev/null
+++ b/csv-concatenator/Dockerfile
@@ -0,0 +1,7 @@
+ARG PYCLOWDER_PYTHON=""
+FROM clowder/pyclowder${PYCLOWDER_PYTHON}:onbuild
+
+ENV MAIN_SCRIPT="concatenator.py" \
+    RABBITMQ_QUEUE=""
+
+ONBUILD COPY extractor_info.json /home/clowder/
diff --git a/csv-concatenator/README.md b/csv-concatenator/README.md
new file mode 100644
index 0000000..f630166
--- /dev/null
+++ b/csv-concatenator/README.md
@@ -0,0 +1,3 @@
+# CSV Concatenator for Clowder V2
+
+This is a CSV concatenator that works with Clowder V2.
diff --git a/csv-concatenator/concatenator.py b/csv-concatenator/concatenator.py
new file mode 100644
index 0000000..cde2d95
--- /dev/null
+++ b/csv-concatenator/concatenator.py
@@ -0,0 +1,167 @@
+#!/usr/bin/env python
+
+import logging
+import csv
+import io
+import pandas as pd
+
+from pyclowder.extractors import Extractor
+from pyclowder.utils import CheckMessage
+from pyclowder.datasets import get_file_list
+from pyclowder.files import download, upload_to_dataset
+
+
+class CSVConcatenator(Extractor):
+    """Concatenate tabular data files in a dataset into an ongoing merged file."""
+
+    def __init__(self):
+        Extractor.__init__(self)
+
+        # parse command line and load default logging configuration
+        self.setup()
+
+        self.columns_file = "column_mapping"
+        self.merged_file = "concatenated"
+
+        # setup logging for the extractor
+        logging.getLogger('pyclowder').setLevel(logging.DEBUG)
+        logging.getLogger('__main__').setLevel(logging.DEBUG)
+
+    def load_standard_columns(self, cols_file):
+        """Load a column mapping from a standard file"""
+        standard_cols = {}
+        with open(cols_file, 'r', encoding='utf-8-sig') as sc:
+            reader = csv.reader(sc)
+            headers = {}
+            for row in reader:
+                # For each row, map all columns to the most recent available occurring name
+                if len(headers) == 0:
+                    for i in range(len(row)):
+                        headers[i] = row[i]
+                    continue
+                for col_idx in headers:
+                    if not pd.isna(row[col_idx]) and len(row[col_idx]) > 0:
+                        latest_name = row[col_idx]
+                for col_idx in headers:
+                    standard_cols[row[col_idx]] = latest_name
+
+        return standard_cols
+
+    def load_tabular_data(self, data_file):
+        if data_file.endswith(".csv"):
+            return pd.read_csv(data_file)
+        elif data_file.endswith(".tsv"):
+            return pd.read_csv(data_file, sep='\t')
+        elif data_file.endswith(".xlsx"):
+            return pd.read_excel(data_file)
+
+    def check_message(self, connector, host, secret_key, resource, parameters):
+        # Don't download file if there isn't at least one other CSV to concatenate
+
+        dataset_id = resource["parent_dataset_id"]
+        file_ext = resource["file_ext"]
+
+        # Check whether the dataset includes another CSV
+        all_files = get_file_list(connector, host, secret_key, dataset_id)
+        for f in all_files:
+            fname = f['filename']
+            if fname.endswith(file_ext) and fname != resource['name']:
+                return CheckMessage.download
+        return CheckMessage.ignore
+
+    def process_message(self, connector, host, secret_key, resource, parameters):
+        # Process the file and upload the results
+
+        inputfile = resource["local_paths"][0]
+        dataset_id = resource["parent"]["id"]
+        file_ext = resource["file_ext"]
+        merged_output = f"{self.merged_file}{file_ext}"
+
+        # Determine which CSV to append to and whether there are column mappings
+        all_files = get_file_list(connector, host, secret_key, dataset_id)
+        cols_id = None
+        for f in all_files:
+            fname = f['filename']
+            if fname == f"{self.columns_file}{file_ext}":
+                cols_id = f['id']
+                break
+
+        target_ids = []
+        merge_exists = False
+        for f in all_files:
+            fname = f['filename']
+            if fname == merged_output:
+                target_ids = [f['id']]
+                merge_exists = True
+                break
+            elif fname.endswith(file_ext):
+                # If we don't find an existing merged file, we will merge all with this extension
+                target_ids.append(f['id'])
+
+        if cols_id is not None:
+            self.log_info(resource, f"Loading {self.columns_file}{file_ext}")
+            targ = download(connector, host, secret_key, cols_id, ext=file_ext)
+            standard_columns = self.load_standard_columns(targ)
+        else:
+            # Initialize the standard columns table
+            pass
+
+        merged = None
+        if len(target_ids) > 0:
+            if merge_exists:
+                # Download existing merged file and append new data to the end
+                self.log_info(resource, f"Downloading merged file {target_ids[0]}")
+                targ = download(connector, host, secret_key, target_ids[0], ext=file_ext)
+                source_data = self.load_tabular_data(targ)
+                new_data = self.load_tabular_data(inputfile)
+
+                self.log_info(resource, f"Appending new file")
+                source_data.rename(columns=standard_columns, inplace=True)
+                new_data.rename(columns=standard_columns, inplace=True)
+                merged = pd.concat([source_data, new_data])
+            else:
+                # Iterate through all files with this extension and merge them
+                column_set = 1
+                column_sets = {}
+
+                for targ_id in target_ids:
+                    self.log_info(resource, f"Downloading file {targ_id}")
+                    targ = download(connector, host, secret_key, targ_id, ext=file_ext)
+                    source_data = self.load_tabular_data(targ)
+
+                    if cols_id is None:
+                        # Stash the column names for initializing columns file later
+                        columns = sorted(source_data.columns)
+                        for i in column_sets:
+                            if column_sets[i] == columns:
+                                exists = i
+                        if exists is None:
+                            column_sets[column_set] = columns
+                            column_set += 1
+                    else:
+                        # Perform renaming of existing data columns otherwise
+                        source_data.rename(columns=standard_columns, inplace=True)
+                        if merged is not None:
+                            merged = pd.concat([merged, source_data])
+                        else:
+                            merged = source_data
+
+        if file_ext == ".tsv":
+            merged.to_csv(merged_output, sep="\t")
+        elif file_ext == ".xlsx":
+            merged.to_excel(merged_output)
+        else:
+            merged.to_csv(merged_output)
+
+        # Finally, upload the newly merged file
+        file_id = upload_to_dataset(connector, host, secret_key, dataset_id, merged_output, check_duplicate=False)
+        if merge_exists:
+            # v2 can update existing file, but v1 must delete existing file
+            if file_id != target_ids[0]:
+                self.log_info(resource, f"Deleting previous version of file: {target_ids[0]}")
+        self.log_info(resource, f"Uploaded {merged_output}: {file_id}")
+
+
+if __name__ == "__main__":
+    extractor = CSVConcatenator()
+    extractor.start()
diff --git a/csv-concatenator/extractor_info.json b/csv-concatenator/extractor_info.json
new file mode 100644
index 0000000..3483304
--- /dev/null
+++ b/csv-concatenator/extractor_info.json
@@ -0,0 +1,27 @@
+{
+  "@context": "https://clowderframework.org/contexts/extractors.jsonld",
"https://clowderframework.org/contexts/extractors.jsonld", + "name": "ncsa.csv.concatenator", + "version": "1.0.0", + "description": "Automatically concatenate tabular data into an ongoing merged file.", + "author": "Max Burnette ", + "contexts": [], + "repository": [], + "external_services": [], + "process": { + "file": [ + "text/*", + "application/excel", + "application/vnd_ms-excel", + "application/vnd_ms-excel_sheet_macroEnabled_12", + "application/vnd_ms-excel_addin_macroEnabled_12", + "application/vnd_ms-excel_sheet_binary_macroEnabled_12" + ] + }, + "dependencies": [], + "bibtex": [ + "" + ], + "labels": [ + "Type/Text" + ] +} diff --git a/csv-concatenator/requirements.txt b/csv-concatenator/requirements.txt new file mode 100644 index 0000000..d5f92cd --- /dev/null +++ b/csv-concatenator/requirements.txt @@ -0,0 +1 @@ +pyclowder==3.0.4 From fe393da728d7e7d51e138712fa3551158ad35287 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Tue, 10 Oct 2023 10:45:59 -0500 Subject: [PATCH 2/5] logic cleanup --- csv-concatenator/Dockerfile | 9 ++-- csv-concatenator/concatenator.py | 71 ++++++++++++++++++++----------- csv-concatenator/requirements.txt | 4 +- 3 files changed, 54 insertions(+), 30 deletions(-) diff --git a/csv-concatenator/Dockerfile b/csv-concatenator/Dockerfile index cef843f..24a3f65 100644 --- a/csv-concatenator/Dockerfile +++ b/csv-concatenator/Dockerfile @@ -1,7 +1,10 @@ -ARG PYCLOWDER_PYTHON="" -FROM clowder/pyclowder${PYCLOWDER_PYTHON}:onbuild +FROM python:3.8 ENV MAIN_SCRIPT="concatenator.py" \ RABBITMQ_QUEUE="" -ONBUILD COPY extractor_info.json /home/clowder/ +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +COPY concatenator.py extractor_info.json ./ +CMD python concatenator.py diff --git a/csv-concatenator/concatenator.py b/csv-concatenator/concatenator.py index cde2d95..1ac1975 100644 --- a/csv-concatenator/concatenator.py +++ b/csv-concatenator/concatenator.py @@ -2,13 +2,13 @@ import logging import csv -import io +import os import pandas as pd from pyclowder.extractors import Extractor from pyclowder.utils import CheckMessage from pyclowder.datasets import get_file_list -from pyclowder.files import download, upload_to_dataset +from pyclowder.files import download, upload_to_dataset, delete class CSVConcatenator(Extractor): @@ -57,67 +57,77 @@ def load_tabular_data(self, data_file): def check_message(self, connector, host, secret_key, resource, parameters): # Don't download file if there isn't at least one other CSV to concatenate + #host = "http://host.docker.internal:8000" # TODO: Remove - dataset_id = resource["parent_dataset_id"] + dataset_id = resource["parent"]["id"] file_ext = resource["file_ext"] + if resource['name'] == self.merged_file + file_ext: + connector.message_process(resource, "Filename matches concatenation output; ignoring file.") + return CheckMessage.ignore + # Check whether the dataset includes another CSV all_files = get_file_list(connector, host, secret_key, dataset_id) for f in all_files: - fname = f['filename'] - if fname.endswith(file_ext) and fname != resource['name']: + fname = f['name'] + if fname.endswith(file_ext) and f['id'] != resource['id']: return CheckMessage.download + connector.message_process(resource, "No concatenation targets found.") return CheckMessage.ignore def process_message(self, connector, host, secret_key, resource, parameters): # Process the file and upload the results + #host = "http://host.docker.internal:8000" # TODO: Remove inputfile = resource["local_paths"][0] dataset_id = 
resource["parent"]["id"] file_ext = resource["file_ext"] - merged_output = f"{self.merged_file}{file_ext}" + merged_output = self.merged_file + file_ext # Determine which CSV to append to and whether there are column mappings all_files = get_file_list(connector, host, secret_key, dataset_id) cols_id = None for f in all_files: - fname = f['filename'] - if fname == f"{self.columns_file}{file_ext}": + fname = f['name'] + if fname == self.columns_file + file_ext: cols_id = f['id'] break target_ids = [] merge_exists = False for f in all_files: - fname = f['filename'] + fname = f['name'] if fname == merged_output: target_ids = [f['id']] merge_exists = True break - elif fname.endswith(file_ext): + elif fname.endswith(file_ext) and f['id'] != resource['id']: # If we don't find an existing merged file, we will merge all with this extension target_ids.append(f['id']) if cols_id is not None: - self.log_info(resource, f"Loading {self.columns_file}{file_ext}") + connector.message_process(resource, "Loading " + self.columns_file + file_ext) targ = download(connector, host, secret_key, cols_id, ext=file_ext) standard_columns = self.load_standard_columns(targ) else: # Initialize the standard columns table - pass + standard_columns = {} merged = None if len(target_ids) > 0: + # Load the just-uploaded file data + new_data = self.load_tabular_data(inputfile) + new_data.rename(columns=standard_columns, inplace=True) + if merge_exists: # Download existing merged file and append new data to the end - self.log_info(resource, f"Downloading merged file {target_ids[0]}") + connector.message_process(resource, "Downloading merged file %s" % target_ids[0]) targ = download(connector, host, secret_key, target_ids[0], ext=file_ext) source_data = self.load_tabular_data(targ) new_data = self.load_tabular_data(inputfile) - self.log_info(resource, f"Appending new file") + connector.message_process(resource, "Appending new file") source_data.rename(columns=standard_columns, inplace=True) - new_data.rename(columns=standard_columns, inplace=True) merged = pd.concat([source_data, new_data]) else: # Iterate through all files with this extension and merge them @@ -125,13 +135,14 @@ def process_message(self, connector, host, secret_key, resource, parameters): column_sets = {} for targ_id in target_ids: - self.log_info(resource, f"Downloading file {targ_id}") + connector.message_process(resource, "Downloading file %s" % targ_id) targ = download(connector, host, secret_key, targ_id, ext=file_ext) source_data = self.load_tabular_data(targ) if cols_id is None: # Stash the column names for initializing columns file later columns = sorted(source_data.columns) + exists = None for i in column_sets: if column_sets[i] == columns: exists = i @@ -141,25 +152,33 @@ def process_message(self, connector, host, secret_key, resource, parameters): else: # Perform renaming of existing data columns otherwise source_data.rename(columns=standard_columns, inplace=True) - if merged is not None: - merged = pd.concat([merged, source_data]) - else: - merged = source_data + + if merged is not None: + merged = pd.concat([merged, source_data]) + else: + merged = source_data + + # Finally, merge the newly uploaded file + merged = pd.concat([merged, new_data]) if file_ext == ".tsv": - merged.to_csv(merged_output, sep="\t") + merged.to_csv(merged_output, sep="\t", index=False) elif file_ext == ".xlsx": - merged.to_excel(merged_output) + merged.to_excel(merged_output, index=False) else: - merged.to_csv(merged_output) + merged.to_csv(merged_output, index=False) # 
         # Finally, upload the newly merged file
         file_id = upload_to_dataset(connector, host, secret_key, dataset_id, merged_output, check_duplicate=False)
         if merge_exists:
-            # v2 can update existing file, but v1 must delete existing file
+            # TODO: v2 can update existing concatenated file instead of doing this delete & replace
             if file_id != target_ids[0]:
-                self.log_info(resource, f"Deleting previous version of file: {target_ids[0]}")
-                self.log_info(resource, f"Uploaded {merged_output}: {file_id}")
+                connector.message_process(resource, "Deleting previous version of file: %s" % target_ids[0])
+                delete(connector, host, secret_key, target_ids[0])
+        connector.message_process(resource, "Uploaded %s: %s" % (merged_output, file_id))
+
+        # Delete local copies of files in the container
+        os.remove(merged_output)
 
 
 if __name__ == "__main__":
diff --git a/csv-concatenator/requirements.txt b/csv-concatenator/requirements.txt
index d5f92cd..f8cb2f2 100644
--- a/csv-concatenator/requirements.txt
+++ b/csv-concatenator/requirements.txt
@@ -1 +1,3 @@
-pyclowder==3.0.4
+pyclowder==3.0.6
+pandas
+openpyxl

From 5d6ef7a0719964b950df0245206d8c6be63844d1 Mon Sep 17 00:00:00 2001
From: Max Burnette
Date: Wed, 11 Oct 2023 09:14:57 -0500
Subject: [PATCH 3/5] Clean up handling of columns file

---
 csv-concatenator/concatenator.py | 64 ++++++++++++++++++++++++++------
 1 file changed, 53 insertions(+), 11 deletions(-)

diff --git a/csv-concatenator/concatenator.py b/csv-concatenator/concatenator.py
index 1ac1975..cad0f56 100644
--- a/csv-concatenator/concatenator.py
+++ b/csv-concatenator/concatenator.py
@@ -65,6 +65,9 @@ def check_message(self, connector, host, secret_key, resource, parameters):
         if resource['name'] == self.merged_file + file_ext:
             connector.message_process(resource, "Filename matches concatenation output; ignoring file.")
             return CheckMessage.ignore
+        if resource['name'] == self.columns_file + file_ext:
+            connector.message_process(resource, "Filename matches columns lookup; ignoring file.")
+            return CheckMessage.ignore
 
         # Check whether the dataset includes another CSV
         all_files = get_file_list(connector, host, secret_key, dataset_id)
@@ -83,13 +86,14 @@ def process_message(self, connector, host, secret_key, resource, parameters):
         dataset_id = resource["parent"]["id"]
         file_ext = resource["file_ext"]
         merged_output = self.merged_file + file_ext
+        columns_output = self.columns_file + file_ext
 
         # Determine which CSV to append to and whether there are column mappings
         all_files = get_file_list(connector, host, secret_key, dataset_id)
         cols_id = None
         for f in all_files:
             fname = f['name']
-            if fname == self.columns_file + file_ext:
+            if fname == columns_output:
                 cols_id = f['id']
                 break
 
@@ -101,12 +105,12 @@ def process_message(self, connector, host, secret_key, resource, parameters):
                 target_ids = [f['id']]
                 merge_exists = True
                 break
-            elif fname.endswith(file_ext) and f['id'] != resource['id']:
+            elif fname.endswith(file_ext) and f['id'] != resource['id'] and fname != columns_output:
                 # If we don't find an existing merged file, we will merge all with this extension
                 target_ids.append(f['id'])
 
         if cols_id is not None:
-            connector.message_process(resource, "Loading " + self.columns_file + file_ext)
+            connector.message_process(resource, "Loading " + columns_output)
             targ = download(connector, host, secret_key, cols_id, ext=file_ext)
             standard_columns = self.load_standard_columns(targ)
         else:
             # Initialize the standard columns table
@@ -124,10 +128,11 @@ def process_message(self, connector, host, secret_key, resource, parameters):
merged file %s" % target_ids[0]) targ = download(connector, host, secret_key, target_ids[0], ext=file_ext) source_data = self.load_tabular_data(targ) - new_data = self.load_tabular_data(inputfile) + source_data.rename(columns=standard_columns, inplace=True) + # Combine any same-named columns after the rename + source_data = source_data.groupby(source_data.columns, axis=1).sum() connector.message_process(resource, "Appending new file") - source_data.rename(columns=standard_columns, inplace=True) merged = pd.concat([source_data, new_data]) else: # Iterate through all files with this extension and merge them @@ -138,6 +143,7 @@ def process_message(self, connector, host, secret_key, resource, parameters): connector.message_process(resource, "Downloading file %s" % targ_id) targ = download(connector, host, secret_key, targ_id, ext=file_ext) source_data = self.load_tabular_data(targ) + source_data.rename(columns=standard_columns, inplace=True) if cols_id is None: # Stash the column names for initializing columns file later @@ -149,15 +155,24 @@ def process_message(self, connector, host, secret_key, resource, parameters): if exists is None: column_sets[column_set] = columns column_set += 1 - else: - # Perform renaming of existing data columns otherwise - source_data.rename(columns=standard_columns, inplace=True) if merged is not None: merged = pd.concat([merged, source_data]) else: merged = source_data + # Store newly uploaded file columns last, so they get choice preference + if cols_id is None: + # Stash the column names for initializing columns file later + columns = sorted(new_data.columns) + exists = None + for i in column_sets: + if column_sets[i] == columns: + exists = i + if exists is None: + column_sets[column_set] = columns + column_set += 1 + # Finally, merge the newly uploaded file merged = pd.concat([merged, new_data]) @@ -168,8 +183,38 @@ def process_message(self, connector, host, secret_key, resource, parameters): else: merged.to_csv(merged_output, index=False) + if cols_id is None: + # Restructure data for building CSV + unique_cols = [] + col_csv_rows = [] + for i in column_sets: + for col_name in column_sets[i]: + if col_name not in unique_cols: + curr_row = {i: col_name} + for j in column_sets: + if i != j: + curr_row[j] = col_name if col_name in column_sets[j] else "" + unique_cols.append(col_name) + col_csv_rows.append(curr_row) + + # Initialize the columns file for future runs + with open(columns_output, 'w') as out: + # Header + col_strs = [str(x) for x in list(column_sets.keys())] + out.write(",".join(col_strs)+'\n') + for r in col_csv_rows: + row_vals = [] + for i in column_sets: + row_vals.append(r[i]) + out.write(",".join(row_vals)+'\n') + + # Upload the columns file + upload_to_dataset(connector, host, secret_key, dataset_id, columns_output, check_duplicate=False) + os.remove(columns_output) + # Finally, upload the newly merged file file_id = upload_to_dataset(connector, host, secret_key, dataset_id, merged_output, check_duplicate=False) + os.remove(merged_output) if merge_exists: # TODO: v2 can update existing concatenated file instead of doing this delete & replace if file_id != target_ids[0]: @@ -177,9 +222,6 @@ def process_message(self, connector, host, secret_key, resource, parameters): delete(connector, host, secret_key, target_ids[0]) connector.message_process(resource, "Uploaded %s: %s" % (merged_output, file_id)) - # Delete local copies of files in the container - os.remove(merged_output) - if __name__ == "__main__": extractor = CSVConcatenator() From 
From 936a39d8b0f9ce6fb7d13df8f68a6afd0bbcda85 Mon Sep 17 00:00:00 2001
From: Chen Wang
Date: Mon, 6 Nov 2023 15:16:55 -0600
Subject: [PATCH 4/5] add to github action

---
 .github/workflows/docker.yml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index c73021b..63b99be 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -36,6 +36,9 @@ jobs:
           - name: word-cloud-extractor
             FOLDER: word-cloud-extractor
             PLATFORM: "linux/amd64,linux/arm64"
+          - name: ncsa.csv.concatenator
+            FOLDER: csv-concatenator
+            PLATFORM: "linux/amd64,linux/arm64"
 
     steps:
       - uses: actions/checkout@v2

From f4bbbd748f1fc55666e85e8073bf32a6aa6a749c Mon Sep 17 00:00:00 2001
From: Max Burnette
Date: Tue, 7 Nov 2023 08:00:30 -0600
Subject: [PATCH 5/5] small updates

---
 csv-concatenator/concatenator.py     | 5 +++++
 csv-concatenator/extractor_info.json | 2 +-
 csv-concatenator/requirements.txt    | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/csv-concatenator/concatenator.py b/csv-concatenator/concatenator.py
index cad0f56..67111ea 100644
--- a/csv-concatenator/concatenator.py
+++ b/csv-concatenator/concatenator.py
@@ -11,6 +11,11 @@
 from pyclowder.files import download, upload_to_dataset, delete
 
 
+# TODO: Provide regex rules for which outputs go where
+# TODO: concatenation_rules.csv that has the regex mappings
+# TODO: Dataset-level extractor, manually submit extractions only (by default)
+# TODO: Add a ColumnName column that lets them explicitly change canonical name
+
 class CSVConcatenator(Extractor):
     """Concatenate tabular data files in a dataset into an ongoing merged file."""
 
     def __init__(self):
diff --git a/csv-concatenator/extractor_info.json b/csv-concatenator/extractor_info.json
index 3483304..15756b6 100644
--- a/csv-concatenator/extractor_info.json
+++ b/csv-concatenator/extractor_info.json
@@ -5,7 +5,7 @@
   "description": "Automatically concatenate tabular data into an ongoing merged file.",
   "author": "Max Burnette",
   "contexts": [],
-  "repository": [],
+  "repository": ["https://github.com/clowder-framework/clowder2-demos"],
   "external_services": [],
   "process": {
     "file": [
diff --git a/csv-concatenator/requirements.txt b/csv-concatenator/requirements.txt
index f8cb2f2..cba7398 100644
--- a/csv-concatenator/requirements.txt
+++ b/csv-concatenator/requirements.txt
@@ -1,3 +1,3 @@
-pyclowder==3.0.6
+pyclowder>=3.0.7
 pandas
 openpyxl