# H1 Stock Reconciliation Transformation Solution

## H2 Specifications
Andrew has provided the intended behaviour here:
[Andrew's Spec](https://mapofag.sharepoint.com/:w:/s/Project/project_MapofAg_PureFarming/Eb6Rmp1Nk4pBmKmShQm2_RoB4mbEvtc6XRdjKJGynzOESw?e=4UAoe1)


**TimeZone**
To start with, we need to obtain the timezone of the holding being transformed.
For Easy Dairy, this is passed when the holding uploads the file via Provider.
This timezone is then mapped to determine which hemisphere the holding belongs to.
We need this to determine the birthing period of an animal:

- **Northern Hemisphere**: 01/01/2025 to 31/12/2025 (calendar year)
- **Southern Hemisphere**: 01/07/2025 to 30/06/2026 (July–June season)

As for mapping: after discussing with Chris H and reviewing which clients/holdings will be using stock rec, we have decided to map only NZ and Australian timezones (to the Southern Hemisphere) for now. All other timezones are treated as Northern Hemisphere.

In [None]:
#Dependencies 
import datetime

In [None]:
# Example of the timezone being fetched. Here it is chosen interactively from
# a small numbered menu; any other input is kept verbatim as the timezone.
_TZ_MENU = {
    '1': 'Pacific/Auckland',
    '2': 'Australia/Sydney',
    '3': 'America/New_York',
}

print('Select a timezone from the list below: Just type in the number')
print('\n'.join(f'{number}. {zone}' for number, zone in _TZ_MENU.items()))
tz_choise = input('Enter the number corresponding to your timezone: ')

timezone = _TZ_MENU.get(tz_choise)
if timezone is None:
    # Not a menu number: report it, then fall back to the raw input.
    print('Invalid choice')
    timezone = tz_choise

print(f'You have selected timezone: {timezone}')

In [None]:
# Timezone -> hemisphere mapping

def hamisphere_mapper(timezone):
    """Map a holding's IANA timezone name to the hemisphere used for birth periods.

    Per the agreed scope, only Australian and New Zealand timezones are
    treated as Southern; every other value (including ``None`` or a
    non-string) falls back to Northern.

    Args:
        timezone: Timezone name, e.g. ``'Pacific/Auckland'``. Surrounding
            whitespace is ignored.

    Returns:
        ``'Southern'`` or ``'Northern'``.
    """
    # New Zealand zones, including their legacy aliases.
    nz_timezones = {'Pacific/Auckland', 'Pacific/Chatham', 'NZ', 'NZ-CHAT'}

    if not isinstance(timezone, str):
        return 'Northern'
    tz_name = timezone.strip()
    # Every zone under 'Australia/' (Darwin, Canberra, Lord_Howe, Eucla, ...)
    # is Australian, so match the prefix rather than listing a few cities.
    if tz_name.startswith('Australia/') or tz_name in nz_timezones:
        return 'Southern'
    return 'Northern'

# Resolve and report the hemisphere for the timezone chosen above.
tz = hamisphere_mapper(timezone=timezone)
print(f'The timezone {timezone} is in the {tz} Hemisphere.')

# H 2 Fetching the Data
Animal and event data are obtained from the Primary Data Store (PDS) DB.
Initial analysis on the data suggests that there are two ways of approaching it. The first would be to query and combine all tables and let the DB do most of the heavy lifting. The other way is to combine the data within the Lambda itself. 

I have tried both methods; each has pros and cons, as shown in the examples below. The choice of approach is left to the devs.

# H 3 Fetching via query 

In [None]:
-- SQL query: all death / birth / departure / arrival events for the
-- non-deleted animals of one entity, in chronological order.
WITH animals AS (
    SELECT entity_id, animal_id, specie, gender, status
    FROM data.livestock_animal
    WHERE entity_id = 'd574c19f-8a6f-4787-9561-19afaa5ce661'
      AND meta_is_deleted = FALSE
),
events AS (
    -- death
    SELECT anm.entity_id, anm.specie, anm.gender, anm.status, anm.animal_id,
           evt.event_date_time, 'death'::text AS event_type
    FROM animals AS anm
    INNER JOIN data.livestock_animal_death AS evt
            ON evt.entity_id = anm.entity_id
           AND evt.animal_id = anm.animal_id
    WHERE evt.meta_is_deleted = FALSE

    UNION ALL

    -- birth registration
    SELECT anm.entity_id, anm.specie, anm.gender, anm.status, anm.animal_id,
           evt.event_date_time, 'birth'::text AS event_type
    FROM animals AS anm
    INNER JOIN data.livestock_animal_birth_registration AS evt
            ON evt.entity_id = anm.entity_id
           AND evt.animal_id = anm.animal_id
    WHERE evt.meta_is_deleted = FALSE

    UNION ALL

    -- departure
    SELECT anm.entity_id, anm.specie, anm.gender, anm.status, anm.animal_id,
           evt.event_date_time, 'departure'::text AS event_type
    FROM animals AS anm
    INNER JOIN data.livestock_animal_departure AS evt
            ON evt.entity_id = anm.entity_id
           AND evt.animal_id = anm.animal_id
    WHERE evt.meta_is_deleted = FALSE

    UNION ALL

    -- arrival
    SELECT anm.entity_id, anm.specie, anm.gender, anm.status, anm.animal_id,
           evt.event_date_time, 'arrival'::text AS event_type
    FROM animals AS anm
    INNER JOIN data.livestock_animal_arrival AS evt
            ON evt.entity_id = anm.entity_id
           AND evt.animal_id = anm.animal_id
    WHERE evt.meta_is_deleted = FALSE
)
SELECT entity_id, specie, gender, status, animal_id, event_date_time, event_type
FROM events
ORDER BY event_date_time;


-- Part 2: exploratory queries for entity 9a66abe5-d071-4326-83e8-e7fd63a686bf.
-- Look up one animal by its identifier. Terminated with ';' so the next
-- statement is not parsed as part of this one when run as a script.
SELECT *
FROM data.livestock_animal AS la
WHERE la.identifier_id = '416075060'
  AND la.entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf';



-- Sample of up to 100 live animals for the entity, most recently born first.
-- NULLS LAST: PostgreSQL sorts NULLs first under DESC by default, which would
-- fill the sample with animals that have no birth_date.
CREATE TEMP TABLE animal_main AS
SELECT entity_id, animal_id, identifier_id, birth_date, status,
       location_identifier_id, location_identifier_scheme
FROM data.livestock_animal AS la
WHERE la.entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf'
  AND la.animal_id IS NOT NULL
  AND la.meta_is_deleted = FALSE
  -- Optional birth-date window:
  -- AND la.birth_date <= TIMESTAMPTZ '2023-11-01 00:00:00+00'
  -- AND la.birth_date >  TIMESTAMPTZ '2022-09-01 00:00:00+00'
ORDER BY birth_date DESC NULLS LAST
LIMIT 100;



-- Latest 50 arrival rows by identifier (entity filter currently disabled).
SELECT *
FROM data.livestock_animal_arrival
-- WHERE entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf'
ORDER BY animal_identifier_id DESC
LIMIT 50;

-- Death events for the animals sampled into animal_main.
SELECT *
FROM data.livestock_animal_death
WHERE entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf'
  AND animal_identifier_id IN (SELECT identifier_id FROM animal_main)
ORDER BY event_date_time DESC;

DROP TABLE IF EXISTS animal_main;

-- Animals of the entity joined to their death events (matched on identifier).
SELECT la.entity_id, la.identifier_id, la.birth_date, jt.event_date_time
FROM data.livestock_animal AS la
JOIN data.livestock_animal_death AS jt
  ON jt.animal_identifier_id = la.identifier_id
 AND jt.entity_id = la.entity_id
WHERE la.entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf'
ORDER BY la.birth_date DESC NULLS LAST
LIMIT 100;

-- Animals of the entity joined to their birth registrations.
SELECT la.entity_id, la.identifier_id, la.birth_date, jt.event_date_time
FROM data.livestock_animal AS la
JOIN data.livestock_animal_birth_registration AS jt
  ON jt.animal_identifier_id = la.identifier_id
 AND jt.entity_id = la.entity_id
WHERE la.entity_id = '9a66abe5-d071-4326-83e8-e7fd63a686bf'
ORDER BY la.birth_date DESC NULLS LAST;


# H 3 Combining via Python

In [None]:
# H 4 Animal data

In [None]:
import pandas as pd
import json

# Read the animal data. This example uses a local file path;
# when reading from the PDS this will need to be updated.
def load_data(file_path):
    """Load the livestock-animal CSV and drop soft-deleted rows.

    Args:
        file_path: Anything ``pd.read_csv`` accepts (path, URL, buffer).

    Returns:
        A new, independent DataFrame holding only rows whose
        ``meta_is_deleted`` is not a truthy value, with ``birth_date`` parsed
        as UTC timestamps (unparseable values become ``NaT``).
    """
    df = pd.read_csv(file_path)
    df['birth_date'] = pd.to_datetime(df['birth_date'], errors='coerce', utc=True)
    # Exports differ in how booleans are written (True/False, true/false,
    # t/f, 1/0, 1.0/0.0), so normalise through strings before filtering.
    # Only explicit truthy values count as deleted; missing values are kept,
    # matching the event-file reader.
    deleted = (
        df['meta_is_deleted'].astype(str).str.strip().str.lower()
        .isin({'true', 't', '1', '1.0', 'yes', 'y'})
    )
    # .copy() so callers can add columns without SettingWithCopyWarning.
    return df[~deleted].copy()

# From the data, separate the birth periods by hemisphere. The hemisphere is
# supplied by the caller; in production it comes from the holding's timezone.
def get_birth_period(date, hemisphere):
    """Return the birth-period label for a birth date.

    Args:
        date: Birth timestamp; ``NaT``/``None`` are allowed.
        hemisphere: ``'southern'`` selects July–June seasons; anything else
            is treated as northern. Matching ignores case and surrounding
            whitespace, so the ``'Southern'`` label from the timezone mapper
            works too.

    Returns:
        ``'Unknown'`` for a missing date, ``'YYYY-YYYY'`` for the southern
        hemisphere, ``'YYYY'`` (calendar year) otherwise.
    """
    if pd.isnull(date):
        return 'Unknown'
    year = date.year
    if str(hemisphere).strip().lower() != 'southern':
        return str(year)
    # NOTE(review): month/year are read in whatever timezone `date` carries
    # (load_data parses birth_date as UTC), so a birth shortly after local
    # midnight on 1 July in NZ/AU may land in the previous season - confirm
    # whether local time should be used.
    if date.month >= 7:
        return f"{year}-{year + 1}"
    return f"{year - 1}-{year}"

# Group the data by inventory classification (species, gender and birth
# period) and count the animals in each group.
def process_data(df, hemisphere):
    """Group animals into inventory classes.

    Args:
        df: Animal DataFrame with ``specie``, ``gender``, ``birth_date`` and
            ``animal_id`` columns (as returned by ``load_data``). It is not
            modified.
        hemisphere: Passed through to ``get_birth_period``.

    Returns:
        One dict per (specie, gender, birthPeriod) group, with keys
        ``specie``, ``gender``, ``birthPeriod``, ``count`` (number of
        non-null ``animal_id`` values) and ``animal_ids`` (tuple of the
        group's ids). Rows with a missing ``specie`` or ``gender`` are
        excluded by ``groupby``'s default ``dropna=True``.
    """
    # assign() returns a new frame, so the caller's DataFrame is untouched.
    with_period = df.assign(
        birthPeriod=df['birth_date'].apply(lambda d: get_birth_period(d, hemisphere))
    )
    # Named aggregation computes the count and the id tuple in one pass,
    # replacing the duplicate-key agg dict plus a second positional groupby.
    grouped = (
        with_period.groupby(['specie', 'gender', 'birthPeriod'])['animal_id']
        .agg(count='count', animal_ids=tuple)
        .reset_index()
    )
    return grouped.to_dict(orient='records')

# The output needs to be transformed into the target JSON format.
def transform_result(data, hemisphere):
    """Convert ``process_data`` records into the output JSON shape.

    Args:
        data: Records with keys ``specie``, ``gender``, ``count``,
            ``animal_ids`` and ``birthPeriod``.
        hemisphere: Kept for backward compatibility. The date range is
            derived from each ``birthPeriod`` label itself ('YYYY-YYYY' ->
            1 July to 30 June, 'YYYY' -> calendar year), so it cannot
            disagree with how the label was produced.

    Returns:
        A list of dicts with keys ``species``, ``sex``, ``count``,
        ``animal_id`` (tuple of ids) and ``birthPeriod``
        ('DD-MM-YYYY/DD-MM-YYYY' or 'Unknown'). NumPy scalars are converted
        to built-in Python values so the result is ``json.dumps``-able.

    Raises:
        ValueError: if a label is not 'Unknown', 'YYYY' or 'YYYY-YYYY'.
    """
    def to_builtin(value):
        # NumPy scalars (e.g. numpy.int64 from groupby) expose .item().
        return value.item() if hasattr(value, "item") else value

    transformed = []
    for item in data:
        new_item = {
            "species": item["specie"],
            "sex": item["gender"],
            # groupby counts are numpy.int64, which json cannot encode.
            "count": int(item["count"]),
            "animal_id": tuple(to_builtin(aid) for aid in item["animal_ids"]),
        }

        bp = str(item["birthPeriod"])
        if bp == "Unknown":
            new_item["birthPeriod"] = "Unknown"
        elif "-" in bp:
            start_year, end_year = map(int, bp.split("-"))
            new_item["birthPeriod"] = f"01-07-{start_year}/30-06-{end_year}"
        else:
            year = int(bp)
            new_item["birthPeriod"] = f"01-01-{year}/31-12-{year}"

        transformed.append(new_item)
    return transformed


def main():
    """Interactive demo: classify the example animal CSV and print it as JSON."""
    source_csv = "../example_files/livestock_animal_202509160441.csv"
    hemisphere = input("Enter hemisphere (northern/southern): ").strip().lower()

    classes = transform_result(
        process_data(load_data(source_csv), hemisphere),
        hemisphere,
    )
    print(json.dumps(classes, indent=2))


if __name__ == "__main__":
    main()


# H 4 Event files

In [None]:
#combining the event files
from pathlib import Path
from typing import List, Dict

# ----------------------------
# Configuration
# ----------------------------
_EXAMPLE_DIR = "../example_files"

# One CSV export per event type; read_and_tag infers the event type from
# each file name.
FILES = [
    f"{_EXAMPLE_DIR}/{file_name}"
    for file_name in (
        "livestock_animal_arrival_202509160444.csv",
        "livestock_animal_birth_registration_202509160442.csv",
        "livestock_animal_death_202509160442.csv",
        "livestock_animal_departure_202509160443.csv",
    )
]

# Columns every event file must provide.
REQUIRED_COLS = ["animal_id", "event_date_time", "entity_id", "meta_is_deleted"]


# ----------------------------
# Helpers
# ----------------------------
def infer_event_from_path(p: Path) -> str:
    """
    Derive the event type from a file name.

    The lower-cased stem is searched for keywords in a fixed priority order
    (departure, death, arrival, birth); the first keyword found is returned,
    so any birth-related name such as 'birth_registration' maps to 'birth'.
    Returns 'unknown' when no keyword matches.
    """
    stem = p.stem.lower()
    priority = ("departure", "death", "arrival", "birth")
    return next((keyword for keyword in priority if keyword in stem), "unknown")


def parse_bool_series(series: pd.Series) -> pd.Series:
    """
    Coerce common truthy/falsey strings to booleans.

    Each value is stringified, stripped and lower-cased. 'true', 't', '1',
    'yes' and 'y' become True. Everything else becomes False: the recognised
    falsey strings ('false', 'f', '0', 'no', 'n', ''), unrecognised values
    and missing values alike. The index is preserved and the dtype is bool.
    """
    truthy = {"true", "t", "1", "yes", "y"}
    # isin() yields a bool Series directly. The previous map()+fillna(False)
    # route produced an object Series when values were unmapped, and fillna
    # then had to downcast it (deprecated in pandas 2.2).
    return series.astype(str).str.strip().str.lower().isin(truthy)


def read_and_tag(file_path: str) -> pd.DataFrame:
    """
    Load one event CSV and tag every row with its event type.

    Only REQUIRED_COLS are kept. animal_id/entity_id use pandas' "string"
    dtype, meta_is_deleted is normalised with parse_bool_series, and
    event_date_time is parsed as UTC (unparseable values become NaT). The
    'event' column comes from infer_event_from_path on the file name.

    Raises:
        ValueError: if the file lacks any of REQUIRED_COLS.
    """
    path = Path(file_path)
    event_kind = infer_event_from_path(path)
    id_dtypes = {"animal_id": "string", "entity_id": "string"}

    try:
        frame = pd.read_csv(path, usecols=REQUIRED_COLS, dtype=id_dtypes, low_memory=False)
    except ValueError:
        # The narrow read failed; re-read everything so the error can name
        # exactly which required columns are absent.
        everything = pd.read_csv(path, low_memory=False)
        absent = [col for col in REQUIRED_COLS if col not in everything.columns]
        if absent:
            raise ValueError(f"{path.name} is missing required columns: {absent}")
        frame = everything[REQUIRED_COLS].copy()
        for id_col, id_dtype in id_dtypes.items():
            frame[id_col] = frame[id_col].astype(id_dtype)

    frame["meta_is_deleted"] = parse_bool_series(frame["meta_is_deleted"])
    frame["event_date_time"] = pd.to_datetime(frame["event_date_time"], errors="coerce", utc=True)
    frame["event"] = event_kind

    # Fixed output column order regardless of the file's own layout.
    return frame[["animal_id", "event_date_time", "entity_id", "meta_is_deleted", "event"]]


def get_combined_records(files: List[str]) -> List[Dict]:
    """
    Combine all event files and return their rows as a list of dicts.

    No files are written; everything happens in memory. Each record has the
    keys 'animal_id', 'event_date_time', 'entity_id', 'meta_is_deleted' and
    'event'. Rows are de-duplicated on (animal_id, event_date_time, event,
    entity_id) and sorted by animal_id, then event_date_time.
    event_date_time is rendered as naive ISO 8601 ('YYYY-MM-DDTHH:MM:SS', the
    UTC wall time produced by read_and_tag), and missing values become None.

    An empty ``files`` list returns an empty list; pd.concat would otherwise
    raise "No objects to concatenate".
    """
    if not files:
        return []

    frames = [read_and_tag(f) for f in files]
    combined = pd.concat(frames, ignore_index=True)

    # Dedupe + stable sort for deterministic output.
    combined = (
        combined.drop_duplicates(subset=["animal_id", "event_date_time", "event", "entity_id"])
                .sort_values(["animal_id", "event_date_time"], kind="stable")
                .reset_index(drop=True)
    )

    # Serialize datetime to ISO 8601 without timezone.
    # NaT becomes None so JSON shows null (not "NaT").
    iso = combined["event_date_time"].dt.strftime("%Y-%m-%dT%H:%M:%S")
    combined["event_date_time"] = iso.where(pd.notna(iso), None)

    records: List[Dict] = combined.to_dict(orient="records")
    return records


# Example usage (no files are written)
if __name__ == "__main__":
    from collections import Counter

    records = get_combined_records(FILES)
    print(f"Total records: {len(records)}")

    # Peek at the first three rows.
    for sample in records[:3]:
        print(sample)

    event_counts = Counter(rec["event"] for rec in records)
    print("Counts by event:", dict(event_counts))

    # Absolute paths to the same exports on the author's machine:
    # "/home/wanmusa/work/stock_recon/example_data/livestock_animal_arrival_202509160444.csv"
    # "/home/wanmusa/work/stock_recon/example_data/livestock_animal_birth_registration_202509160442.csv"
    # "/home/wanmusa/work/stock_recon/example_data/livestock_animal_death_202509160442.csv"
    # "/home/wanmusa/work/stock_recon/example_data/livestock_animal_departure_202509160443.csv"

# H 4 Aggregate both processes

In [None]:
# inventory_aggregator.py
from __future__ import annotations

import json
import calendar
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

# Keep your existing modules unmodified
import stock_recon_mapping
import event_file_comb


# -------------------------
# Helpers: IDs, dates, formatting
# -------------------------

def _to_str_id(x: Any) -> str:
    return str(x).strip()

def _parse_event_dt(s: str) -> Optional[datetime]:
    """
    event_file_comb emits naive 'YYYY-MM-DDTHH:MM:SS'. Treat as UTC.
    """
    if not s:
        return None
    try:
        return datetime.strptime(s.strip(), "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    except Exception:
        return None

def _month_bounds_utc(year: int, month: int) -> Tuple[datetime, datetime]:
    """
    Calendar month bounds as UTC midnights:
      - start: first day 00:00:00Z
      - end: last day 00:00:00Z (matches your example end stamp)
    """
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    last_day = calendar.monthrange(year, month)[1]
    end = datetime(year, month, last_day, tzinfo=timezone.utc)
    return start, end

def _format_z(dt: datetime) -> str:
    # 'YYYY-MM-DDTHH:MM:SS.000Z' at UTC midnight per your example
    return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")

def _month_iter_inclusive(start: datetime, end: datetime) -> List[Tuple[datetime, datetime]]:
    """
    List the _month_bounds_utc pair of every calendar month from start's
    month through end's month, inclusive.

    Only the year and month of the inputs are used. The result is empty
    when end's month precedes start's month.
    """
    # Count months on a single axis (year * 12 + zero-based month) so the
    # range is a plain integer range.
    first_ordinal = start.year * 12 + start.month - 1
    last_ordinal = end.year * 12 + end.month - 1
    return [
        _month_bounds_utc(ordinal // 12, ordinal % 12 + 1)
        for ordinal in range(first_ordinal, last_ordinal + 1)
    ]

def _normalize_birth_period(bp: Optional[str]) -> Optional[str]:
    """
    Best-effort normalize to 'YYYY-MM-DD/YYYY-MM-DD'. 'Unknown' stays 'Unknown'.
    Accepts 'DD-MM-YYYY/DD-MM-YYYY' from your transformer, or already ISO strings.
    """
    if not bp or str(bp).lower() == "unknown":
        return "Unknown"
    if "/" not in bp:
        return bp

    left, right = bp.split("/", 1)

    def parse_any(x: str) -> datetime:
        x = x.strip()
        for fmt in ("%d-%m-%Y", "%Y-%m-%d"):
            try:
                return datetime.strptime(x, fmt)
            except ValueError:
                continue
        raise ValueError

    try:
        ldt, rdt = parse_any(left), parse_any(right)
        return f"{ldt.strftime('%Y-%m-%d')}/{rdt.strftime('%Y-%m-%d')}"
    except Exception:
        return bp


# -------------------------
# Main aggregation
# -------------------------

def aggregate_inventory_rollup(
    *,
    inventory_csv_path: str,
    hemisphere: str = "southern",
    event_files: Optional[List[str]] = None,  # None -> event_file_comb.FILES
    duration: str = "M",
    open_value: int = 0,             # per your requirement
    force_close: Optional[int] = 0,   # default close = 0; set to None to compute from events
    output_json_path: str = "inventory_rollup.json",
    include_unclassified: bool = False
) -> List[Dict[str, Any]]:
    """
    Build monthly roll-ups per stock class across the full month range
    present in the data (first -> last month).

    Stock classes (species, sex, birthPeriod) come from stock_recon_mapping.
    Events from event_file_comb are attributed to their animal's class.
    Soft-deleted events and events without a parseable timestamp are
    skipped. One record is emitted for every (month, class) pair, including
    months with no events.

    Args:
        inventory_csv_path: Animal CSV passed to stock_recon_mapping.load_data.
        hemisphere: 'southern' or 'northern'. Case and surrounding whitespace
            are ignored, so the timezone mapper's 'Southern' also works.
        event_files: Event CSVs to read; None uses event_file_comb.FILES.
        duration: Copied into each record's 'duration'.
        open_value: Opening balance written into every record.
        force_close: Closing balance written into every record. None computes
            open_value + births + arrivals - deaths - departures per month.
        output_json_path: File the records are written to as JSON.
        include_unclassified: Add an all-'Unknown' class that collects events
            whose animal_id is in no class; otherwise those events are dropped.

    Returns:
        The list of records written to output_json_path.
    """
    # The mapping functions compare against the lowercase literal "southern".
    hemisphere = str(hemisphere).strip().lower()

    # ----- 1) Inventory classes from stock_recon_mapping -----
    df = stock_recon_mapping.load_data(inventory_csv_path)
    grouped = stock_recon_mapping.process_data(df, hemisphere)
    classes = stock_recon_mapping.transform_result(grouped, hemisphere)

    # Normalize classes for matching
    norm_classes = [
        {
            "species": str(ic.get("species", "")).strip(),
            "sex": str(ic.get("sex", "")).strip(),
            "birthPeriod": _normalize_birth_period(ic.get("birthPeriod")),
            "animal_ids": {_to_str_id(aid) for aid in (ic.get("animal_id") or [])},
        }
        for ic in classes
    ]

    # Index: animal_id -> class index (an id in several classes goes to the first).
    animal_to_class_idx: Dict[str, int] = {}
    for idx, ic in enumerate(norm_classes):
        for aid in ic["animal_ids"]:
            animal_to_class_idx.setdefault(aid, idx)

    # ----- 2) Events from event_file_comb -----
    files_to_read = event_files if event_files is not None else getattr(event_file_comb, "FILES", [])
    events = event_file_comb.get_combined_records(files_to_read)

    normalized_events = []
    for e in events:
        if e.get("meta_is_deleted", False):
            continue
        dt = _parse_event_dt(e.get("event_date_time") or "")
        if not dt:
            continue
        normalized_events.append({
            "animal_id": _to_str_id(e.get("animal_id", "")),
            "event": str(e.get("event", "")).strip().lower(),
            "dt": dt,
        })

    # Optionally include an "unclassified" bucket for ids not in classes
    unclassified_key = {"species": "Unknown", "sex": "Unknown", "birthPeriod": "Unknown"}
    unclassified_idx: Optional[int] = None
    if include_unclassified:
        norm_classes.append({**unclassified_key, "animal_ids": set()})
        unclassified_idx = len(norm_classes) - 1

    class_events: List[Tuple[int, datetime, str]] = []  # (class_idx, dt, event)
    for ev in normalized_events:
        idx = animal_to_class_idx.get(ev["animal_id"], unclassified_idx)
        if idx is None:
            continue
        class_events.append((idx, ev["dt"], ev["event"]))

    # ----- 3) Determine month range from data (events first; fallback to inventory) -----
    if class_events:
        min_dt = min(x[1] for x in class_events)
        max_dt = max(x[1] for x in class_events)
    elif "birth_date" in df and df["birth_date"].notna().any():
        min_dt = df["birth_date"].min().to_pydatetime()
        max_dt = df["birth_date"].max().to_pydatetime()
    else:
        min_dt = max_dt = datetime.now(timezone.utc)

    months = _month_iter_inclusive(min_dt, max_dt)
    # (year, month) -> position in `months`: O(1) per event instead of a
    # linear scan of every month.
    month_pos = {(ms.year, ms.month): i for i, (ms, _) in enumerate(months)}

    # Pre-bucket event counts per (class_idx, month_index)
    per_bucket: Dict[Tuple[int, int], Dict[str, int]] = {}
    for class_idx, dt, evt in class_events:
        mi = month_pos.get((dt.year, dt.month))
        if mi is None:
            continue
        counts = per_bucket.setdefault(
            (class_idx, mi), {"birth": 0, "death": 0, "arrival": 0, "departure": 0}
        )
        # Event types other than these four are ignored.
        if evt in counts:
            counts[evt] += 1

    # ----- 4) Emit monthly records for EVERY (class, month) -----
    out: List[Dict[str, Any]] = []

    for mi, (m_start, m_end) in enumerate(months):
        for class_idx, ic in enumerate(norm_classes):
            counts = per_bucket.get((class_idx, mi), {"birth": 0, "death": 0, "arrival": 0, "departure": 0})
            births = counts["birth"]
            deaths = counts["death"]
            arrivals = counts["arrival"]
            departures = counts["departure"]

            # NOTE(review): every month opens at open_value, so with
            # force_close=None the close is only this month's net movement
            # plus open_value. It is not carried into the next month's open;
            # confirm whether a running balance is intended.
            computed_close = open_value + births + arrivals - deaths - departures
            close_val = force_close if force_close is not None else computed_close

            record = {
                "startDate": _format_z(m_start),
                "endDate": _format_z(m_end),
                "duration": duration,
                "open": open_value,
                "births": births,
                "deaths": deaths,
                "close": close_val,
                "departuresTotal": departures,
                "arrivalsTotal": arrivals,
                "inventoryClassification": {
                    "sex": ic["sex"],
                    "species": ic["species"],
                    "name": f"{ic['sex']}-{ic['species']}-{m_start.year}",  # <== your requested format
                    "birthPeriod": ic["birthPeriod"],
                },
            }
            out.append(record)

    # ----- 5) Write JSON and return list of dicts -----
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)

    return out


# -------------------------
# CLI example
# -------------------------
if __name__ == "__main__":
    # Example run against the author's local export; adjust the path as needed.
    rollup_kwargs = dict(
        inventory_csv_path="/home/wanmusa/work/stock_recon/example_data/livestock_animal_202509160441.csv",
        hemisphere="southern",
        event_files=None,            # None -> use event_file_comb.FILES
        duration="M",
        open_value=0,                # required default
        force_close=0,               # required default (set to None to compute)
        output_json_path="inventory_rollup.json",
        include_unclassified=False,
    )
    results = aggregate_inventory_rollup(**rollup_kwargs)
    print(f"Wrote {len(results)} records to inventory_rollup.json")
    preview = results[:3]
    print(json.dumps(preview, indent=2))


These are just examples; it is left to the devs to work out the final solution.