In [7]:
import great_expectations as gx
import pandas as pd
from pathlib import Path
import warnings
import json
import os

# Suppress warnings whose message starts with the Validator-level
# `result_format` notice (the message argument is matched as a regex).
warnings.filterwarnings(
    action="ignore",
    message="`result_format` configured at the Validator-level*",
)

# --- Show the directory that relative paths in this cell resolve against ---
working_dir = Path().resolve()
print(f"Current working directory: {working_dir}")

# --- Utility to identify the transaction file ---
def identify_transaction_csv(data_dir: Path) -> Path:
    """Return the first CSV in *data_dir* whose header looks like a transaction file.

    A file qualifies when its header contains the columns ``transaction_id``,
    ``amount`` and ``sender_account`` (compared case-insensitively).
    Candidates are examined in sorted path order so the result is reproducible
    when several files qualify. Files that cannot be read are reported on
    stdout and skipped.

    Args:
        data_dir: Directory searched (non-recursively) for ``*.csv`` files.

    Returns:
        Path of the first matching CSV file.

    Raises:
        FileNotFoundError: If no CSV in *data_dir* has the required columns.
    """
    required_columns = {"transaction_id", "amount", "sender_account"}
    # Path.glob yields entries in arbitrary order; sort for a stable choice.
    for file in sorted(data_dir.glob("*.csv")):
        try:
            # Only the header row is needed to inspect the column names.
            header = pd.read_csv(file, nrows=0)
        except (OSError, ValueError) as e:
            # pandas parser/empty-data/decoding errors all derive from
            # ValueError; filesystem problems derive from OSError.
            print(f"Failed to read {file}: {e}")
            continue
        columns = {col.lower() for col in header.columns}
        if required_columns <= columns:
            return file
    raise FileNotFoundError(f"No transaction CSV file found in {data_dir}")

# --- Locate the transaction CSV under <cwd>/data and read it in full ---
DATA_DIR = Path().resolve().joinpath("data")
transaction_csv = identify_transaction_csv(DATA_DIR)
df_trans = pd.read_csv(transaction_csv)
row_count = len(df_trans)
print(f"Loaded transaction file: {transaction_csv.name} with {row_count} rows")

# --- Great Expectations wiring: context -> pandas source -> asset -> batch ---
context = gx.get_context()
datasource = context.data_sources.add_pandas(name="pandas_source")
data_asset = datasource.add_dataframe_asset(name="transactions_data")
batch_def = data_asset.add_batch_definition_whole_dataframe(name="batch_def")

# The whole in-memory transactions frame is handed over as a single batch.
batch_parameters = {"dataframe": df_trans}
batch = batch_def.get_batch(batch_parameters=batch_parameters)

# --- Empty suite plus a validator bound to the batch above ---
suite = gx.core.ExpectationSuite(name="transactions_suite")
validator = context.get_validator(batch=batch, expectation_suite=suite)

# --- Declare the expectations for the transactions batch ---
# Account layout accepted for both sender and receiver:
# "SE" + 4 digits + 4 capitals + 14 digits, or "GB" + 2 digits + 4 capitals + 14 digits.
ACCOUNT_PATTERN = r"^(?:SE\d{4}[A-Z]{4}\d{14}|GB\d{2}[A-Z]{4}\d{14})$"
ALLOWED_CURRENCIES = ["SEK", "USD", "EUR", "DKK", "JPY", "ZMW", "NOK", "ZAR", "RMB", "GBP"]
ALLOWED_TRANSACTION_TYPES = ["incoming", "outgoing"]

# Identifier must be present and unique.
validator.expect_column_values_to_not_be_null("transaction_id")
validator.expect_column_values_to_be_unique("transaction_id")

# Timestamp format and amount range.
validator.expect_column_values_to_match_strftime_format("timestamp", "%Y-%m-%d %H:%M:%S")
validator.expect_column_values_to_be_between("amount", min_value=0.01, max_value=100000)

# Closed value sets.
validator.expect_column_values_to_be_in_set("currency", ALLOWED_CURRENCIES)
validator.expect_column_values_to_be_in_set("transaction_type", ALLOWED_TRANSACTION_TYPES)

# Same account pattern on both sides of the transfer (receiver first, then sender).
for account_column in ("receiver_account", "sender_account"):
    validator.expect_column_values_to_match_regex(account_column, ACCOUNT_PATTERN)

# Location fields must be populated.
for required_column in (
    "sender_country",
    "receiver_country",
    "sender_municipality",
    "receiver_municipality",
):
    validator.expect_column_values_to_not_be_null(required_column)

# --- Preview rows whose receiver location fields are missing ---
missing_country = df_trans.loc[df_trans["receiver_country"].isna()]
missing_municipality = df_trans.loc[df_trans["receiver_municipality"].isna()]

# Country preview first, then municipality; nothing is printed for an empty selection.
for field_name, missing_rows in (
    ("receiver_country", missing_country),
    ("receiver_municipality", missing_municipality),
):
    if missing_rows.empty:
        continue
    print(f"Rows with missing {field_name}:")
    print(missing_rows.head())

# --- Fill null values with placeholder for output copy ---
# DataFrame.fillna with a per-column mapping returns a new frame, so df_trans
# itself is left untouched. This replaces `df[col].fillna(..., inplace=True)`,
# a chained in-place assignment that is deprecated in pandas 2.x and does not
# update the parent frame under Copy-on-Write.
df_trans_copy = df_trans.fillna(
    {
        "receiver_country": "Unknown",
        "receiver_municipality": "Unknown",
    }
)

# --- Mark transactions that need a closer look ---
# Rule 1: amount strictly above 100000.
exceeds_amount_limit = df_trans["amount"] > 100000
# Rule 2: lower-cased note matches gift/cash/transfer; missing notes never match.
lowercase_notes = df_trans["notes"].str.lower()
has_keyword_note = lowercase_notes.str.contains("gift|cash|transfer", na=False)

df_trans["flagged"] = exceeds_amount_limit | has_keyword_note

flagged_transactions = df_trans.loc[df_trans["flagged"]]
flagged_count = len(flagged_transactions)
print(f"Number of flagged transactions: {flagged_count}")
print(flagged_transactions.head())

# --- Prepare report directory ---
# Attempt the creation and react to the outcome instead of checking exists()
# first; this closes the gap between the check and mkdir().
report_dir = Path("report")
try:
    report_dir.mkdir()
except FileExistsError:
    print(f"Report directory already exists at {report_dir.resolve()}")
else:
    print(f"Created report directory at {report_dir.resolve()}")

# --- Write the placeholder-filled transactions copy into the report folder ---
# NOTE(review): the file name says "customers_and_accounts" although the frame
# holds transactions — confirm the name is intended and does not collide with
# another report written to the same folder.
csv_path = report_dir.joinpath("customers_and_accounts_cleaned.csv")
df_trans_copy.to_csv(csv_path, index=False)
csv_location = csv_path.resolve()
print(f"Saved validation CSV to: {csv_location}")

# --- Evaluate the suite against the batch and show the outcome ---
results = validator.validate()
print("Validation results summary:")
print(results)

# --- Persist the validation outcome as an indented JSON document ---
json_path = report_dir.joinpath("validation_results_customers_and_accounts.json")
with json_path.open("w") as report_file:
    report_file.write(json.dumps(results.to_json_dict(), indent=4))
json_location = json_path.resolve()
print(f"Saved validation JSON report to: {json_location}")


Current working directory: /Users/thomasrosen/Documents/Dev/DATA24STO/Datakvalitet/Bank_Projekt
Loaded transaction file: transactions-500000.csv with 500000 rows


Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 305.44it/s] 
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 41.07it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:02<00:00,  3.30it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 66.90it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 52.37it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 54.02it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 21.92it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 21.88it/s]  
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 348.68it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 373.02it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 376.01it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 252.15it/s] 


Rows with missing receiver_country:
                           transaction_id            timestamp    amount  \
108  e9eef6a8-d699-4cda-ac9e-31ef9f7a857c  2025-06-02 13:04:36   1358.64   
151  3b0006c5-604a-42d1-a1fa-55df6564b048  2025-06-02 05:28:16  34390.87   
292  f9517b4b-c57e-4ec3-a986-b0bcd4317828  2025-06-03 21:58:16  18051.68   
339  d157c83f-0172-4e1c-808d-3313f2e591bf  2025-06-02 15:56:25   4329.85   
461  53c69e26-4b43-4ad1-a850-a8b4f6ac2de7  2025-06-03 03:07:01  49034.31   

    currency            sender_account          receiver_account  \
108      SEK  SE8902PIOV56292630694895  SE8902RIUL46979876193296   
151      SEK  SE8902RIEM96034989833215  SE8902AMLF07213242417141   
292      SEK  SE8902KSAW62643702417245  SE8902DFTD05468859251048   
339      SEK  SE8902RPCP06050518500359  SE8902AJAX56710674154489   
461      SEK  SE8902PORG15469074162310  SE8902QIQW30606734486239   

    sender_country sender_municipality receiver_country receiver_municipality  \
108         Swede

Calculating Metrics: 100%|██████████| 54/54 [00:03<00:00, 14.16it/s] 

Validation results summary:
{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "pandas_source-transactions_data",
          "column": "transaction_id"
        },
        "meta": {}
      },
      "result": {
        "element_count": 500000,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_unique",
        "kwargs": {
          "batch_id": "pandas_source-transactions_data",
          "column": "transaction_id"
        },
        "meta": {}
      },
      "result": {
        "element_count": 500000,
        "unexpected_cou


