In [278]:
import pandas as pd
import os
import json

In [279]:
# Load the schema/field specification table, then collect each distinct value
# of its "schema" column (first-seen order) as the list of valid file names.
schema_field_df = pd.read_csv("specification/schema-field.csv")
valid_file_names = schema_field_df["schema"].unique().tolist()

In [280]:
# inputs
# Every answer is stripped of surrounding whitespace: later cells compare the
# y/n answers with `.lower() == "y"`, so an answer such as "y " would
# otherwise be silently treated as "no".
dir_name = input("Enter the name of the directory to check: ").strip()
# Later cells build report names as dir_name + "-files_*.json"; a trailing
# slash would place those reports inside the directory being checked.
# (`or dir_name` keeps a bare "/" usable.)
dir_name = dir_name.rstrip("/") or dir_name
save_results = input("Would you like to save lists of all files with extra or missing fields? (y/n): ").strip()
remove_changes = input("Would you like to remove extra fields? (y/n)").strip()
remove_only_empty = input("When removing fields would you like to remove only those with no values? (y/n)").strip()
add_changes = input("Would you like to add in missing fields? (y/n)").strip()

In [290]:
# functions
def get_schema_fields(file_name):
    """Return the field names listed for one schema in ``schema_field_df``.

    Args:
        file_name: Schema name, compared for exact equality against the
            "schema" column of the module-level ``schema_field_df``.

    Returns:
        list: The "field" values of the matching rows, in row order; an
        empty list when no row matches.
    """
    is_requested_schema = schema_field_df["schema"] == file_name
    return schema_field_df.loc[is_requested_schema, "field"].tolist()

def check_for_extra_fields(df, file_name):
    """List the columns of ``df`` that the named schema does not define.

    Args:
        df: DataFrame loaded from the CSV file being checked.
        file_name: Schema name, looked up with ``get_schema_fields``.

    Returns:
        list[dict]: One dict per extra column, in the column order of ``df``
        (so results are reproducible between runs). Each dict has the keys
        "field_name" (the column name) and "n_values" (number of non-null
        cells in that column, as a plain ``int`` so it can be dumped to JSON).

    Raises:
        ValueError: If no fields are listed for ``file_name``.
    """
    schema_fields = set(get_schema_fields(file_name))

    if not schema_fields:
        # The schema name is part of the exception so the failure is
        # identifiable from the traceback alone.
        raise ValueError(f"Error: schema for '{file_name}' not found in schema-field.csv")

    # Walk the frame's own columns rather than a set difference so the
    # output order is deterministic.
    return [
        {"field_name": field, "n_values": int(df[field].notna().sum())}
        for field in df.columns
        if field not in schema_fields
    ]


def check_for_missing_fields(df, file_name):
    """List the schema fields that are absent from the columns of ``df``.

    Args:
        df: DataFrame loaded from the CSV file being checked.
        file_name: Schema name, looked up with ``get_schema_fields``.

    Returns:
        list: Schema field names that are not columns of ``df``, without
        duplicates and in the order the schema lists them (so results, and
        any columns later added from them, are reproducible between runs).

    Raises:
        ValueError: If no fields are listed for ``file_name``.
    """
    schema_fields = get_schema_fields(file_name)

    if not schema_fields:
        # The schema name is part of the exception so the failure is
        # identifiable from the traceback alone.
        raise ValueError(f"Error: schema for '{file_name}' not found in schema-field.csv")

    present_fields = set(df.columns)

    # find schema fields not in the df; dict.fromkeys de-duplicates while
    # keeping the schema's own ordering
    return [field for field in dict.fromkeys(schema_fields) if field not in present_fields]
    
# test = pd.read_csv("pipeline_test/ancient-woodland/column.csv")
# check_for_extra_fields(test, "column")

In [282]:
# check for any files with extra fields

# Map lower-cased schema names to their spelling in schema-field.csv. File
# names are matched against this exactly (ignoring case) instead of by
# substring, so a file such as "col.csv" is skipped rather than reaching
# check_for_extra_fields and aborting the whole walk with a ValueError.
schema_name_lookup = {
    name.lower(): name for name in valid_file_names if isinstance(name, str)
}

results_extra_fields = []

for path, _subdirs, files in os.walk(dir_name):
    for file in files:
        file_stem, extension = os.path.splitext(file)
        schema_name = schema_name_lookup.get(file_stem.lower())

        # only CSV files named after a known schema are checked
        if extension.lower() != ".csv" or schema_name is None:
            continue

        file_path = os.path.join(path, file)

        try:
            df = pd.read_csv(file_path, dtype=str)
        except pd.errors.EmptyDataError:
            print(f"Empty CSV file: {file}")
            continue
        except pd.errors.ParserError:
            print(f"Error reading CSV file: {file}")
            continue

        extra_fields = check_for_extra_fields(df, schema_name)

        if extra_fields:
            results_extra_fields.append({
                "file_path": file_path,
                "extra_fields": extra_fields,
            })


# optionally save the findings next to the checked directory
if save_results.lower() == "y":
    with open(dir_name + "-files_extra_fields.json", "w") as f:
        json.dump(results_extra_fields, f, indent=2)

In [283]:
# remove all extra fields (or EMPTY fields only depending on remove_only_empty)

if remove_changes.lower() == "y":
    only_empty = remove_only_empty.lower() == "y"

    for entry in results_extra_fields:
        print("Reading file: " + entry["file_path"])
        # keep_default_na=False keeps literal cell text such as "NA", "None"
        # or "null" as-is; with default NA parsing those cells are read as
        # missing and would be written back blank when the file is saved.
        df = pd.read_csv(entry["file_path"], dtype=str, keep_default_na=False)

        fields_to_drop = [
            field["field_name"]
            for field in entry["extra_fields"]
            # n_values is the non-null count recorded when the file was scanned
            if (not only_empty or field["n_values"] == 0)
            # skip columns that are already gone, e.g. when this cell is re-run
            and field["field_name"] in df.columns
        ]

        # only rewrite the file when something actually changed
        if fields_to_drop:
            df = df.drop(columns=fields_to_drop)
            for field_name in fields_to_drop:
                print("Dropped field: " + field_name)
            df.to_csv(entry["file_path"], index=False)

        print("-" * 50)


Reading file: pipeline/local-nature-reserve/convert.csv
Dropped field: script
--------------------------------------------------
Reading file: pipeline/local-nature-reserve/transform.csv
Dropped field: pipeline
--------------------------------------------------
Reading file: pipeline/design-code/convert.csv
Dropped field: script
--------------------------------------------------
Reading file: pipeline/local-authority-district/convert.csv
Dropped field: script
Dropped field: pipeline
--------------------------------------------------
Reading file: pipeline/local-authority-district/patch.csv
Dropped field: pipeline
--------------------------------------------------
Reading file: pipeline/local-authority-district/concat.csv
Dropped field: pipeline
--------------------------------------------------
Reading file: pipeline/local-authority-district/transform.csv
Dropped field: pipeline
--------------------------------------------------
Reading file: pipeline/local-authority-district/skip.csv


In [284]:
# check for any files with missing fields

# Map lower-cased schema names to their spelling in schema-field.csv. File
# names are matched against this exactly (ignoring case) instead of by
# substring, so a file such as "col.csv" is skipped rather than reaching
# check_for_missing_fields and aborting the whole walk with a ValueError.
schema_name_lookup = {
    name.lower(): name for name in valid_file_names if isinstance(name, str)
}

results_missing_fields = []

for path, _subdirs, files in os.walk(dir_name):
    for file in files:
        file_stem, extension = os.path.splitext(file)
        schema_name = schema_name_lookup.get(file_stem.lower())

        # only CSV files named after a known schema are checked
        if extension.lower() != ".csv" or schema_name is None:
            continue

        file_path = os.path.join(path, file)

        try:
            df = pd.read_csv(file_path, dtype=str)
        except pd.errors.EmptyDataError:
            print(f"Empty CSV file: {file}")
            continue
        except pd.errors.ParserError:
            print(f"Error reading CSV file: {file}")
            continue

        missing_fields = check_for_missing_fields(df, schema_name)

        if missing_fields:
            results_missing_fields.append({
                "file_path": file_path,
                "missing_fields": missing_fields,
            })


# optionally save the findings next to the checked directory
if save_results.lower() == "y":
    with open(dir_name + "-files_missing_fields.json", "w") as f:
        json.dump(results_missing_fields, f, indent=2)

In [285]:
# add in all missing fields

if add_changes.lower() == "y":
    for entry in results_missing_fields:
        print("Reading file: " + entry["file_path"])
        # keep_default_na=False keeps literal cell text such as "NA", "None"
        # or "null" as-is; with default NA parsing those cells are read as
        # missing and would be written back blank when the file is saved.
        df = pd.read_csv(entry["file_path"], dtype=str, keep_default_na=False)

        # Only add columns that are still absent, so re-running this cell
        # (or a file edited since the scan) never blanks an existing column.
        fields_to_add = [
            field for field in entry["missing_fields"] if field not in df.columns
        ]

        # only rewrite the file when something actually changed
        if fields_to_add:
            # add empty fields for all missing
            for field in fields_to_add:
                df[field] = None
            print("Added fields: " + ", ".join(fields_to_add))

            df.to_csv(entry["file_path"], index=False)
            print("Overwritten file: " + entry["file_path"])

        print("-" * 50)


Reading file: pipeline/local-nature-reserve/default-value.csv
Added fields: entry-number, resource, start-date, end-date, entry-date
Overwritten file: pipeline/local-nature-reserve/default-value.csv
--------------------------------------------------
Reading file: pipeline/local-nature-reserve/combine.csv
Added fields: resource
Overwritten file: pipeline/local-nature-reserve/combine.csv
--------------------------------------------------
Reading file: pipeline/local-nature-reserve/column.csv
Added fields: start-date, end-date, entry-date, endpoint
Overwritten file: pipeline/local-nature-reserve/column.csv
--------------------------------------------------
Reading file: pipeline/local-nature-reserve/convert.csv
Added fields: plugin, start-date, end-date, entry-date, endpoint, parameters
Overwritten file: pipeline/local-nature-reserve/convert.csv
--------------------------------------------------
Reading file: pipeline/local-nature-reserve/default.csv
Added fields: entry-number, endpoint
O

In [289]:
# # generate table of all collections, files and fields for cross-comparison

# results = dict({"collection" : [],
#                 "file" : [],
#                 "field" : []})

# for path,subdir,files in os.walk("./pipeline"):
#    for file in files:    
       
#        collection_name = path.split("/")[-1]
#        file_name = file.split(".")[0]

#        if file.lower().endswith('.csv'):
#           try:
#             #  print(os.path.join(path, file))
#              df = pd.read_csv(os.path.join(path, file))
#              fields = df.columns.values

#              for field_name in fields:

#                 results["collection"].append(collection_name)
#                 results["file"].append(file_name)
#                 results["field"].append(field_name)
             

#           except pd.errors.EmptyDataError:
#              print(f"Empty CSV file: {file}")
#           except pd.errors.ParserError:
#              print(f"Error reading CSV file: {file}")

# pd.DataFrame.from_dict(results).groupby(["file", "field"]).size().reset_index(name = "count").to_csv("temp_file_field_check.csv", index=False)
