In [1]:
# Read file into json rows

import json

def run(file_name:str, print_lines:bool=False) -> None:
    """
    Read a JSON-rows file (one JSON document per line) and report how many
    rows parsed cleanly.

    Every line is stripped and parsed with ``json.loads``. A line that fails
    to parse is printed to the console together with its raw text, counted
    as rejected, and processing continues with the next line. A blank line
    is not a valid JSON document, so it is counted as rejected as well.
    A summary (total, OK and rejected row counts) is printed at the end.

    Args:
        file_name (str): file path to read
        print_lines (bool): print each successfully parsed row to console.

    Raises:
        OSError: if the file cannot be opened.
    """
    ok_count = 0        # Rows that parsed without an error
    reject_count = 0    # Rows that failed to parse
    with open(file_name, "r") as json_file:
        for line_num, line in enumerate(json_file, start=1):
            raw = line.strip()
            try:
                # Keep the try body to the one call that is expected to fail
                row = json.loads(raw)
            except (ValueError, RecursionError) as err:
                # json.JSONDecodeError is a ValueError subclass; RecursionError
                # covers pathologically nested documents.
                # Report the raw text: no parsed row exists for a failed line,
                # so referring to `row` here would either be unbound or show
                # the previous line's data.
                print(f"[{line_num:04d}][ERROR:] {err}, [DATA]: {raw}")
                reject_count += 1
                continue
            if print_lines:
                print(f"[{line_num:04d}] [OK]: {row}")
            ok_count += 1
    # Every line read ends up in exactly one of the two counters
    print(f"Read {ok_count + reject_count} rows")
    print(f"OK rows: {ok_count:04d}, Rejected rows: {reject_count:04d}")

# Smoke test: run the reader over the sample profiles file, summary only
file_path = "./data/profiles2.json"
run(file_name=file_path, print_lines=False)

Read 10 rows
OK rows: 0010, Rejected rows: 0000


In [2]:
# Add metadata columns

from datetime import datetime
import shortuuid

# Unique ID for this ETL batch (one run of this script / notebook cell).
# It is generated once, when this statement executes, and add_metadata()
# stores it on each row under "batch_id".
BATCH_ID = shortuuid.uuid()

def add_metadata(row:dict) -> None:
    """
    Adds ETL metadata columns to a row. The row is modified in place and any
    existing values under these keys are overwritten. The following metadata
    columns are added:
        - modified_timestamp: current UTC datetime (naive, from datetime.utcnow())
        - batch_id: unique ETL batch_id (the module-level BATCH_ID)
        - tags: a key/value dict to store various data tags
        - tags.errors: a list of error messages encountered while processing this row

    NOTE(review): the statements shown here do not create a "tags.errors"
    entry; confirm it is added further down or by another step.

    Args:
        row (dict): data row to annotate
    """
    # BATCH_ID is only read here, so this declaration is not strictly required
    global BATCH_ID
    now_utc = datetime.utcnow()
    # add timestamp for when we have processed this row
    row["modified_timestamp"] = now_utc
    # add the ETL (script) batch_id
    row["batch_id"] = BATCH_ID
    # a series of additional processing tags (replaces any existing "tags" value)
    row["tags"] = {
        # other tags can go here, for example:
        "security_level": "high",
        "allow_user_groups": ["admin",]
    }