# MPESA SMS Data Preparation Guide for Fine-tuning LLMs
This guide explains how to transform raw MPESA SMS messages into training data for fine-tuning LLMs. It covers both basic models and instruct/chat models, with steps for cleaning, anonymizing, and formatting the data.

We need to extract the key fields:
- **transaction_id**
- **amount**
- **transaction_type** (e.g. sent, received, withdrawn, failed)
- **counterparty**
- **date_time**
- **balance**


## Overview
We will be using raw MPESA SMS messages as input. The messages typically contain information about transactions such as money sent, received, withdrawn, or failed transactions.

We will follow these steps:
1. Data Collection
2. Data Anonymization
3. Data Parsing
4. Data Formatting
5. Data Upload (push to the Hugging Face Hub)

## Step 1: Data Collection
Gather all MPESA SMS messages. This can be done by exporting SMS data from your phone or using an SMS backup app.



In [None]:
import xml.etree.ElementTree as ET
import pandas as pd

def load_sms_from_xml(xml_file: str):
    """Read an SMS backup XML file into a DataFrame.

    Every ``<sms>`` element that is a direct child of the document root
    becomes one row.

    Args:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        A DataFrame with the columns ``sms_text`` (the ``body`` attribute)
        and ``date`` (the ``readable_date`` attribute). An attribute that
        is absent on an element yields a missing value in that row.
    """
    document_root = ET.parse(xml_file).getroot()
    rows = [
        {"sms_text": node.get("body"), "date": node.get("readable_date")}
        for node in document_root.findall("sms")
    ]
    return pd.DataFrame(rows)

# Example usage
# Load the exported SMS backup into a DataFrame with "sms_text" and "date"
# columns; `df` is reused by the following steps. Then preview the first rows.
df = load_sms_from_xml("../data/mpesa-sms.xml")
print(df.head())


## Step 2: Data Anonymization
To protect privacy, we will anonymize sensitive information such as phone numbers and names. Phone numbers are masked as `XXXXXXX`, names are replaced with placeholders of the form `CUSTOMER_<ID>`, and agent numbers are rewritten as `Agent_<number>`.

In [None]:
import re
import uuid

def anonymize_sms(sms: str):
    """Mask phone numbers, customer names and agent numbers in one SMS.

    The steps are applied in this order:

    1. Mobile numbers of the form ``7XXXXXXXX`` with an optional ``0``,
       ``254`` or ``+254`` prefix are replaced with ``XXXXXXX``.
    2. The run of alphabetic words following ``from`` / ``to`` is replaced
       with ``CUSTOMER_<8 hex chars>``. The same captured text maps to the
       same identifier within one call; identifiers are random, so they
       differ between calls.
    3. ``Agent <digits>`` is rewritten to ``Agent_<digits>``.

    Args:
        sms: Raw SMS text. Non-string values (for example ``None`` for a
            message without a body) are returned unchanged.

    Returns:
        The anonymized SMS text.
    """
    if not isinstance(sms, str):
        return sms

    # Mask phone numbers. Lookarounds replace the former leading/trailing
    # word-boundary anchors: a word boundary can never sit between whitespace
    # and "+", so "+2547..." numbers used to slip through unmasked. A bare
    # "254" country prefix is accepted as well.
    sms = re.sub(r"(?<!\w)(?:\+?254|0)?7\d{8}(?!\w)", "XXXXXXX", sms)

    # Replace names with CUSTOMER_<uuid> (same name in one SMS gets same UUID)
    name_uuid_map = {}

    def replace_name(match):
        name = match.group(1)
        if name not in name_uuid_map:
            name_uuid_map[name] = f"CUSTOMER_{uuid.uuid4().hex[:8].upper()}"
        # Keep the keyword exactly as it was written ("from", "To", ...).
        return f"{match.group(0).split()[0]} {name_uuid_map[name]}"

    # NOTE: the capture is greedy and case-insensitive, so every purely
    # alphabetic word after the keyword (including the "XXXXXXX" phone mask
    # or a following "on") becomes part of the replaced name.
    for keyword in ("from", "to"):
        sms = re.sub(
            rf"{keyword}\s+([A-Za-z]+(?:\s+[A-Za-z]+)*)",
            replace_name,
            sms,
            flags=re.IGNORECASE,
        )

    # Replace Agent numbers
    sms = re.sub(r"Agent\s+(\d+)", r"Agent_\1", sms, flags=re.IGNORECASE)

    return sms

# Example usage: anonymize every raw message, store the result next to the
# original text, and preview both columns side by side.
df["anonymized_sms"] = [anonymize_sms(raw_text) for raw_text in df["sms_text"]]
print(df.loc[:, ["sms_text", "anonymized_sms"]].head())

## Step 3: Data Parsing
Next, we will parse the anonymized SMS messages to extract key fields such as transaction ID, amount, transaction type, counterparty, date/time, and balance.

In [None]:
import re

# Amount pattern shared by the amount and balance lookups.
_MONEY_PATTERN = r"Ksh\s*([\d,]+(?:\.\d{2})?)"

# Ordered (regex, label) pairs: the first regex found in the SMS decides the
# transaction type, so more specific phrases must stay ahead of generic ones.
_TRANSACTION_TYPE_PATTERNS = [
    # The amount normally sits between the two words
    # ("received Ksh1,000.00 from ..."), so it is allowed here.
    (r"received\s+(?:Ksh\s*[\d,]+(?:\.\d{2})?\s+)?from", "received"),
    (r"sent to", "sent"),
    (r"withdrew at", "withdrawn"),
    (r"withdrawn at", "withdrawn"),
    (r"failed", "failed"),
    (r"buy goods and services at", "buy_goods"),
    (r"paid to", "paybill"),
    (r"airtime purchase", "airtime"),
    (r"bought .*? of airtime", "airtime"),
    (r"deposited to", "deposit"),
    (r"reversed", "reversal"),
    (r"bill payment to", "bill_payment"),
    (r"loan disbursed", "loan"),
    (r"fuliza", "fuliza"),
    (r"received for account", "received_account"),
    (r"transferred to", "transferred"),
    (r"transferred from", "transferred_in"),
]

# Transaction types whose counterparty is the free text right after a phrase.
_COUNTERPARTY_PHRASES = {
    "buy_goods": "buy goods and services at",
    "paybill": "paid to",
    "deposit": "deposited to",
    "bill_payment": "bill payment to",
    "transferred": "transferred to",
    "transferred_in": "transferred from",
}

# Date/time layouts tried in order when no external timestamp is supplied.
_DATE_PATTERNS = [
    # d/m/yy at h:mm AM/PM (single digits allowed; also covers "on d/m/yy at ...")
    r"(\d{1,2}/\d{1,2}/\d{2}\s+at\s+\d{1,2}:\d{2}\s+(?:AM|PM))",
    # Looser fallback: a date followed somewhere later by a time
    r"(\d{1,2}/\d{1,2}/\d{2}.*?\d{1,2}:\d{2}\s+(?:AM|PM))",
]


def _detect_transaction_type(sms):
    """Return the label of the first known phrase found in *sms*, else "unknown"."""
    for pattern, label in _TRANSACTION_TYPE_PATTERNS:
        if re.search(pattern, sms, re.IGNORECASE):
            return label
    return "unknown"


def _extract_counterparty(sms, transaction_type):
    """Return the other party of the transaction, or None if none is found."""
    counterparty = None
    if transaction_type in ("received", "sent"):
        match = re.search(r"(?:from|to)\s+(CUSTOMER_\w+|AGENT_\w+)", sms, re.IGNORECASE)
        if match:
            counterparty = match.group(1)
    elif transaction_type == "withdrawn":
        match = re.search(r"AGENT_(\w+)", sms, re.IGNORECASE)
        if match:
            # Normalise the prefix to upper case whatever the SMS used.
            counterparty = f"AGENT_{match.group(1)}"
    elif transaction_type == "airtime":
        counterparty = "self"
    elif transaction_type in _COUNTERPARTY_PHRASES:
        phrase = _COUNTERPARTY_PHRASES[transaction_type]
        match = re.search(phrase + r" ([A-Za-z0-9_\- ]+)", sms, re.IGNORECASE)
        if match:
            counterparty = match.group(1).strip()

    # Fallback: try to find any CUSTOMER_ or AGENT_ token if nothing was found.
    if not counterparty:
        match = re.search(r"(CUSTOMER_\w+|AGENT_\w+)", sms, re.IGNORECASE)
        if match:
            counterparty = match.group(1)
    return counterparty


def _extract_date_time(sms):
    """Return the first date/time substring matching a known layout, or None."""
    for pattern in _DATE_PATTERNS:
        match = re.search(pattern, sms)
        if match:
            return match.group(1).strip()
    return None


def parse_mpesa_sms(sms: str, date_time: str = None):
    """Extract structured transaction fields from one (anonymized) MPESA SMS.

    Args:
        sms: SMS text. ``None``, empty and whitespace-only input is treated
            as an empty message instead of raising.
        date_time: Optional timestamp from an external source (e.g. the XML
            export). When truthy it is returned as-is; otherwise the date is
            extracted from the SMS text.

    Returns:
        A dict with the keys ``transaction_id`` (first whitespace-separated
        token), ``amount`` and ``balance`` (strings with thousands separators
        removed), ``transaction_type``, ``counterparty`` and ``date_time``.
        Fields that cannot be found are ``None``; the type defaults to
        ``"unknown"``.
    """
    text = sms if isinstance(sms, str) else ""

    # split() yields no tokens for empty or whitespace-only text.
    tokens = text.split()
    transaction_id = tokens[0] if tokens else None

    # The first "Ksh" figure is taken as the amount; case-insensitive to
    # agree with the balance lookup below.
    amount_match = re.search(_MONEY_PATTERN, text, re.IGNORECASE)
    amount = amount_match.group(1).replace(",", "") if amount_match else None

    transaction_type = _detect_transaction_type(text)
    counterparty = _extract_counterparty(text, transaction_type)

    # Prefer the externally supplied timestamp over anything in the text.
    parsed_date_time = date_time if date_time else _extract_date_time(text)

    balance_match = re.search(r"balance.*?" + _MONEY_PATTERN, text, re.IGNORECASE)
    balance = balance_match.group(1).replace(",", "") if balance_match else None

    return {
        "transaction_id": transaction_id,
        "amount": amount,
        "transaction_type": transaction_type,
        "counterparty": counterparty,
        "date_time": parsed_date_time,
        "balance": balance
    }

# Example usage: parse each anonymized message together with the timestamp
# taken from the XML export, then collect the results in a DataFrame.
parsed_data = [
    parse_mpesa_sms(message, stamp)
    for message, stamp in zip(df["anonymized_sms"], df["date"])
]
parsed_df = pd.DataFrame(parsed_data)
print(parsed_df.head())

## Step 4: Data Formatting
We will format the parsed data as JSONL files: input/output pairs for basic models and chat-style `messages` for instruct/chat models.

In [None]:
from pathlib import Path
import json

def prepare_from_xml(xml_file: str, output_file: str, mode="basic"):
    """Build a JSONL fine-tuning file from an SMS XML export.

    Each message is loaded with ``load_sms_from_xml``, anonymized with
    ``anonymize_sms`` and parsed with ``parse_mpesa_sms``; one JSON object
    per message is written to *output_file*. The first record is printed as
    a sample.

    Args:
        xml_file: Path to the SMS backup XML file.
        output_file: Path of the JSONL file to write (overwritten if present).
        mode: ``"basic"`` writes ``{"input", "output"}`` pairs; any other
            value writes the chat-style ``{"messages": [...]}`` format.

    Returns:
        None. The result is the file written to *output_file*.
    """
    df = load_sms_from_xml(xml_file)
    records = []

    for _, row in df.iterrows():
        sms = row["sms_text"]
        if not isinstance(sms, str):
            # Message without a body (missing value): nothing to train on,
            # and the regex-based steps below only accept strings.
            continue

        anonymized_sms = anonymize_sms(sms)
        # No date_time is passed, so the parser extracts the date from the
        # SMS text itself and the target stays derivable from the input.
        parsed = parse_mpesa_sms(anonymized_sms)

        if mode == "basic":
            training_example = {
                "input": anonymized_sms,
                "output": json.dumps(parsed)
            }
        else:  # instruct mode
            training_example = {
                "messages": [
                    {
                        "role": "user",
                        "content": f'Extract transaction details from the following SMS:\n"{anonymized_sms}"'
                    },
                    {
                        "role": "assistant",
                        "content": json.dumps(parsed)
                    }
                ]
            }

        records.append(training_example)

    # Print a sample of the formatted data (guarded: an export without any
    # usable message used to raise IndexError here).
    if records:
        print(f"\nSample for mode='{mode}':\n", json.dumps(records[0], indent=2))
    else:
        print(f"\nNo SMS records found in '{xml_file}'; writing an empty file.")

    with open(output_file, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(rec) + "\n" for rec in records)

# Example usage
# Create the output directory if it does not exist yet, then write one JSONL
# file per format ("basic" and "instruct") from the same SMS export.
Path("../output").mkdir(exist_ok=True)
prepare_from_xml("../data/mpesa-sms.xml", "../output/mpesa_basic.jsonl", mode="basic")
prepare_from_xml("../data/mpesa-sms.xml", "../output/mpesa_instruct.jsonl", mode="instruct")

## Step 5: Push Anonymized Data to Hugging Face Hub
Now that the data is anonymized and formatted, you can upload it to the Hugging Face Hub for sharing or fine-tuning.


In [None]:
import os
from dotenv import load_dotenv
from huggingface_hub import HfApi, login

# Load environment variables from .env file
load_dotenv()
repo_id = os.getenv("REPO_ID")
hf_token = os.getenv("HF_TOKEN")

# 1. Login to Hugging Face (run this once per session)
if hf_token:
    login(token=hf_token)
else:
    raise ValueError("HF_TOKEN not set in .env file.")

# Fail early with a clear message instead of letting every API call below
# fail on a missing repo id.
if not repo_id:
    raise ValueError("REPO_ID not set in .env file.")

# 2. Paths of the files to upload: the same locations the formatting step
#    wrote them to ("../output/...").
files_to_upload = [
    "../output/mpesa_basic.jsonl",
    "../output/mpesa_instruct.jsonl"
]

api = HfApi()

# 3. Create repo if it doesn't exist
try:
    api.create_repo(repo_id, repo_type="dataset", private=False, exist_ok=True)
    print(f"Repo '{repo_id}' created or already exists.")
except Exception as e:
    print(f"Error creating repo: {e}")
    # If repo creation fails, try to continue with upload (repo might already exist)
    print("Continuing with file upload...")

# 4. Upload files (best effort: a failed upload is reported, the rest continue)
for file in files_to_upload:
    try:
        print(f"Uploading {file} to {repo_id}...")
        api.upload_file(
            path_or_fileobj=file,
            # Store each file at the repo root under its bare file name.
            path_in_repo=os.path.basename(file),
            repo_id=repo_id,
            repo_type="dataset"
        )
        print(f"Uploaded {file}.")
    except Exception as e:
        print(f"Error uploading {file}: {e}")

print(f"Process completed. Check your dataset at: https://huggingface.co/datasets/{repo_id}")
