In [26]:
import pandas as pd
import json
import datetime
import os

In [27]:
# The following values can be loaded from a SQL DB parameters table

filename = "tech_assessment_transactions"

# Specify the path to your JSON file
json_file_path = "/drive/Snoop Tech Assessment/Raw/"

archive_file_path = f"/drive/Snoop Tech Assessment/Archive/{datetime.date.today()}/"

# Specify list of allowed currencies
list_allowed_currencies = ['USD', 'GBP', 'EUR']

### json to DataFrame

In [36]:
def json_to_dataframe(filename, json_file_path, archive_file_path):
    """
    Reads a JSON file, normalizes its data into a Pandas DataFrame, adds metadata columns, 
    and archives the resulting DataFrame as a CSV file.

    Parameters:
    - filename (str): The base name of the JSON file (excluding the extension).
    - json_file_path (str): The directory path where the JSON file is located.
    - archive_file_path (str): The directory path where the CSV archive file will be saved.

    Returns:
    pandas.DataFrame: A DataFrame containing normalized data from the JSON file, including
    a primaryKey combining 'customerId' and 'transactionId', and an 'auditDate' column 
    indicating the current timestamp.

    Example:
    ```python
    json_to_dataframe("example_data", "/path/to/json/files/", "/path/to/archive/")
    ```

    Note:
    - The 'transactions' key is assumed to be present in the JSON file structure.
    - The CSV archive file is saved with the same base name as the JSON file in the specified archive directory.
    """

    # Creating raw path including file name and extension
    raw_full_path = json_file_path + filename + ".json"
    
    # Read the JSON file directly into a dataframe
    json_data = pd.read_json(raw_full_path)

    df_normalized = pd.json_normalize(json_data['transactions'])

    # Create primaryKey to identify records and check for unique values
    df_normalized['primaryKey'] = df_normalized['customerId'] + '_' + df_normalized['transactionId']

    # Adding current timestamp column
    df_normalized['auditDate'] = pd.to_datetime(datetime.datetime.now())

    # Create new path if it does not exist
    if not os.path.exists(archive_file_path):
        os.mkdir(archive_file_path)

    # Creating archive path including file name and extension
    archive_full_path = archive_file_path + filename + ".csv"
    
    # Archive converted json data
    df_normalized.to_csv(archive_full_path)
    
    return df_normalized

### Duplicate Check

In [29]:
def duplicate_check(df_normalized):
    """
    Performs a duplicate check on a DataFrame based on the 'primaryKey' column.
    
    Parameters:
    - df_normalized (pandas.DataFrame): The DataFrame containing normalized data,
      typically generated by the json_to_dataframe function.

    Returns:
    Tuple[str, pandas.DataFrame]: A tuple containing a message indicating the result
    of the duplicate check and a DataFrame containing records with duplicate primary keys.
    The message can be "Duplicate check passed." if no duplicates are found, or
    "Duplicate check failed. Record count: <count>" if duplicates are detected.
    The DataFrame includes records with duplicate primary keys, selecting the
    ones with the lower 'sourceDate' value.

    Example:
    ```python
    message, duplicate_records = duplicate_check(df_normalized)
    print(message)
    print(duplicate_records)
    ```

    Note:
    - The function assumes the presence of a 'primaryKey' column in the DataFrame.
    - If duplicates are found, the returned DataFrame includes the selected records
      with the lower 'sourceDate' value and an additional 'quality_check' column.
    """
    
    # Check for duplicates based on the primary key
    duplicates = df_normalized[df_normalized.duplicated(subset='primaryKey', keep=False)]
    
    # If duplicates exist, select records with the lower sourceDate value
    if not duplicates.empty:
        # Convert SourceDate to datetime for proper comparison
        df_normalized['sourceDate'] = pd.to_datetime(df_normalized['sourceDate'])
        
        # Sort by SourceDate and keep the first record for each duplicate key
        df_duplicate_check = duplicates.sort_values(by=['primaryKey', 'sourceDate']).drop_duplicates(subset='primaryKey', keep='first')
        df_duplicate_check['quality_check'] = 'Duplicate check'
        
        return_message_duplicate_check = f"Duplicate check failed. Record count: {df_duplicate_check.count()}"
    else:
        return_message_duplicate_check = "Duplicate check passed."
        df_duplicate_check = pd.DataFrame()

    return return_message_duplicate_check, df_duplicate_check

### Currency Check

In [30]:
def currency_check(df_normalized, list_allowed_currencies):
    """
    Performs a currency check on a DataFrame, filtering out records with currencies not
    present in the specified list of allowed currencies.

    Parameters:
    - df_normalized (pandas.DataFrame): The DataFrame containing normalized data,
      typically generated by the json_to_dataframe function.
    - list_allowed_currencies (list): A list of allowed currency codes.

    Returns:
    Tuple[str, pandas.DataFrame]: A tuple containing a message indicating the result
    of the currency check and a DataFrame containing records with excluded currencies.
    The message can be "Currency check passed." if no excluded currencies are found, or
    "Currency check failed. Record count: <count>" if records with excluded currencies exist.
    The DataFrame includes records with excluded currencies and an additional 'qualityCheck'
    column indicating the failed currency check.

    Example:
    ```python
    message, excluded_records = currency_check(df_normalized, ['USD', 'EUR', 'GBP'])
    print(message)
    print(excluded_records)
    ```

    Note:
    - The function assumes the presence of a 'currency' column in the DataFrame.
    - The returned DataFrame includes records with currencies not present in the list of allowed currencies.
    - The 'qualityCheck' column is added to indicate the failed currency check.
    """

    # filter normalized 
    df_excluded_currencies = df_normalized[~df_normalized['currency'].isin(list_allowed_currencies)]

    if not df_excluded_currencies.empty:
        return_message_currency_check = f"Currency check failed. Record count: {df_excluded_currencies.count()}"
        df_excluded_currencies['qualityCheck'] = 'Currency check' 
    else:
        return_message_currency_check = "Currency check passed."
        df_excluded_currencies = pd.DataFrame()
    return return_message_currency_check, df_excluded_currencies

### Date check

In [31]:
def date_check(df_normalized):
    """
    Performs a date check on a DataFrame, verifying if the 'transactionDate' column
    has values in the format 'yyyy-MM-dd'.

    Parameters:
    - df_normalized (pandas.DataFrame): The DataFrame containing normalized data,
      typically generated by the json_to_dataframe function.

    Returns:
    Tuple[str, pandas.DataFrame]: A tuple containing a message indicating the result
    of the date check and a DataFrame containing records with incorrectly formatted dates.
    The message can be "Date check passed." if all dates follow the correct format, or
    "Date check failed. Record count: <count>" if records with incorrect date formats exist.
    The DataFrame includes records with incorrectly formatted dates and an additional
    'qualityCheck' column indicating the failed date check.

    Example:
    ```python
    message, incorrect_date_records = date_check(df_normalized)
    print(message)
    print(incorrect_date_records)
    ```

    Note:
    - The function assumes the presence of a 'transactionDate' column in the DataFrame.
    - Dates that don't follow the correct format will be considered as Null.
    - The 'qualityCheck' column is added to indicate the failed date check.
    """
    
    # Check if the 'Date' column has values in the format 'yyyy-MM-dd'
    df_normalized['transactionDate_check'] = pd.to_datetime(df_normalized['transactionDate'], format='%Y-%m-%d', errors='coerce')

    # dates that don't follow the correct format will be Null
    df_date_check = df_normalized[df_normalized['transactionDate'].isnull()]
     
    if not df_date_check.empty:
        return_message_date_check = f"Date check failed. Record count: {df_date_check.count()}"
        df_date_check['qualityCheck'] = 'Date check'
    else:
        return_message_date_check = "Date check passed."
        df_date_check = pd.DataFrame()
        
        
    return return_message_date_check, df_date_check

### Upsert Datamart

In [47]:
def upsert_datamart(filename, datamart_filepath, df, primary_key):
    """
    Upserts a DataFrame into a datamart, performing an update or insert operation based on a primary key.

    Parameters:
    - filename (str): The base name of the datamart file (excluding the extension).
    - datamart_filepath (str): The directory path where the datamart file will be stored.
    - df (pandas.DataFrame): The DataFrame to be upserted into the datamart.
    - primary_key (str): The column in the DataFrame used as the primary key for the upsert operation.

    Returns:
    None

    Example:
    ```python
    upsert_datamart("example_datamart", "/path/to/datamart/files/", df_example, 'ID')
    ```

    Note:
    - If the specified datamart file does not exist, a new file is created.
    - The function uses the specified primary key column to identify and upsert records.
    - The DataFrame is upserted into the datamart, and the updated datamart is saved to the file.
    """
    
    # Create new path if it does not exist
    if not os.path.exists(datamart_filepath):
        os.mkdir(datamart_filepath)

    # Creating file path including file name and extension
    datamart_full_path = datamart_filepath + filename + ".csv"
    
    try:
        # Read existing datamart from file
        datamart = pd.read_csv(datamart_full_path, index_col=primary_key)
    except FileNotFoundError:
        # If the file doesn't exist, create an empty datamart
        datamart = pd.DataFrame()

    for _, row in df.iterrows():
        key_value = row[primary_key]

        if key_value in datamart.index:
            # Update existing row
            datamart.loc[key_value] = row
        else:
            # Insert new row
            datamart = pd.concat([datamart, row.to_frame().T])
    
    # Save the updated datamart to the file
    datamart.to_csv(datamart_full_path)
    
    print(f"DataFrame upserted into the mock datamart at {datamart_full_path}")

### Error logging

In [33]:
def error_logging(df1, df2, df3, error_log_path):
    """
    Logs error information from three DataFrames into an error log file.

    Parameters:
    - df1, df2, df3: Pandas DataFrames representing error information.
    - error_log_path (str): The file path to the error log file.

    Returns:
    None

    Example:
    ```python
    error_logging(df_errors1, df_errors2, df_errors3, "/path/to/error/log.csv")
    ```

    Note:
    - The function performs a union operation on the provided DataFrames and the existing error log.
    - If the error log file doesn't exist, a new file is created.
    - The resulting DataFrame is written to the error log file in CSV format.
    - The 'ignore_index=True' parameter ensures a continuous index in the unioned DataFrame.
    - The function does not handle or capture specific error information; it's designed for logging general error data.
    """
    
    # Read existing logs
    try:
        df_existing_log = pd.read_csv(error_log_path)
    except FileNotFoundError:
        # If the file doesn't exist, create an empty datamart
        df_existing_log = pd.DataFrame()
        
    # Perform union operation using concat
    unioned_df = pd.concat([df_existing_log, df1, df2, df3], ignore_index=True)

    # Write the unioned DataFrame to CSV
    unioned_df.to_csv(error_log_path, index=False)

    print(f"Unioned DataFrame written to {error_log_path}")

### Data Quality Checks

In [52]:
# Call the function to convert JSON to dataframe
df_normalized = json_to_dataframe(filename, json_file_path, archive_file_path)

# Call the function for duplicate check
return_message_duplicate_check, df_duplicate_check = duplicate_check(df_normalized)
print(return_message_duplicate_check)

# Call the function for currency check
return_message_currency_check, df_excluded_currencies = currency_check(df_normalized, list_allowed_currencies)
print(return_message_currency_check)

# Call the function for date check
return_message_date_check, df_date_check = date_check(df_normalized)
print(return_message_date_check)

Duplicate check failed. Record count: customerId         5
customerName       5
transactionId      5
transactionDate    5
sourceDate         5
merchantId         5
categoryId         5
currency           5
amount             5
description        5
primaryKey         5
auditDate          5
quality_check      5
dtype: int64


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_excluded_currencies['qualityCheck'] = 'Currency check'


Currency check failed. Record count: customerId         11
customerName       11
transactionId      11
transactionDate    11
sourceDate         11
merchantId         11
categoryId         11
currency           11
amount             11
description        11
primaryKey         11
auditDate          11
dtype: int64
Date check passed.


### Removing failed records

In [40]:
# Create list of df from data quality checks
subset_dfs = [df_duplicate_check, df_excluded_currencies, df_date_check]

# Create new df to make changes
df_main = df_normalized

# Remove values from main DataFrame based on the 'ID' column
for subset_df in subset_dfs:
    if 'primaryKey' in subset_df:
        df_main = df_main[~df_main['primaryKey'].isin(subset_df['primaryKey'])]

print(df_main.count())

customerId               6848
customerName             6848
transactionId            6848
transactionDate          6848
sourceDate               6848
merchantId               6848
categoryId               6848
currency                 6848
amount                   6848
description              6848
primaryKey               6848
auditDate                6848
transactionDate_check    6842
dtype: int64


### Creating df_transactions

In [41]:
# Creating a df of transactions
df_transactions = df_main[['primaryKey', 'customerId', 'transactionId', 'transactionDate', 'sourceDate', 'merchantId', 'categoryId', 'currency', 'amount', 'description', 'auditDate']]

print(df_transactions.count())

primaryKey         6848
customerId         6848
transactionId      6848
transactionDate    6848
sourceDate         6848
merchantId         6848
categoryId         6848
currency           6848
amount             6848
description        6848
auditDate          6848
dtype: int64


### Creating df_customers

In [42]:
# Creating a df of customerId based on latest transactionDate
df_customers = df_normalized[['customerId', 'transactionDate', 'auditDate']] \
                            .sort_values(by='transactionDate', ascending=False) \
                            .drop_duplicates(subset='customerId')

print(df_customers.count())

customerId         29
transactionDate    29
auditDate          29
dtype: int64


### Upsert df_transactions

In [48]:
# Set table name
tableName = "fact_transactions"

# Mock datamart
datamart_transactions_filepath = "/drive/Snoop Tech Assessment/Datamart/fact_transactions/"

# Specify the primary key column
transaction_primary_key = 'primaryKey'

# Upsert the df_transactions into the mock datamart
upsert_datamart(tableName, datamart_transactions_filepath, df_transactions, transaction_primary_key)

DataFrame upserted into the mock datamart at /drive/Snoop Tech Assessment/Datamart/fact_transactions/fact_transactions.csv


### Upsert df_transactions

In [50]:
# Set table name
tableName = "dim_customers"

# Mock datamart
datamart_transactions_filepath = "/drive/Snoop Tech Assessment/Datamart/dim_customers/"

# Specify the primary key column
customers_primary_key = 'customerId'

# Upsert the df_transactions into the mock datamart
upsert_datamart(tableName, datamart_transactions_filepath, df_customers, customers_primary_key)

DataFrame upserted into the mock datamart at /drive/Snoop Tech Assessment/Datamart/dim_customers/dim_customers.csv


### Error logging

In [51]:
error_log_path = "/drive/Snoop Tech Assessment/Datamart/error_logs/error_logs.csv"

error_logging(df_duplicate_check, df_excluded_currencies, df_date_check, error_log_path)

Unioned DataFrame written to /drive/Snoop Tech Assessment/Datamart/error_logs/error_logs.csv
