In [None]:

import pandas as pd
import re
import os
from pathlib import Path
from datetime import datetime
from glob import glob
from typing import Optional, Tuple, List

# =========================
# Date parsing helpers
# =========================
# Separator between the two halves of a range: hyphen, en dash or em dash,
# together with any whitespace around it.
DASH_PATTERN = re.compile(r"\s*[\-–—]\s*")  # hyphen, en dash, em dash

# One date token, in any of three shapes:
#   1) numeric day/month/year separated by ".", "/" or "-"  (01.02.2024, 1/2/24, 01-02-2024)
#   2) day + month name + apostrophe year                   (5 Jan '24)
#   3) day + month name + year                              (5 January 2024)
# "-" is accepted as a separator because POSSIBLE_FMTS below lists dash-separated
# formats. The digit look-arounds keep the numeric form from matching inside a
# longer digit run (e.g. the "24-02-01" tail of an ISO date "2024-02-01").
DATE_TOKEN   = re.compile(
    r"((?<!\d)\d{1,2}[./\-]\d{1,2}[./\-]\d{2,4}(?!\d)"
    r"|\d{1,2}\s*[A-Za-z]{3,9}\s*'\d{2,4}"
    r"|\d{1,2}\s*[A-Za-z]{3,9}\s*\d{2,4})"
)

# strptime formats tried in order; all are day-first.
POSSIBLE_FMTS = [
    "%d.%m.%Y", "%d.%m.%y", "%d/%m/%Y", "%d/%m/%y", "%d-%m-%Y", "%d-%m-%y",
    "%d %b %Y", "%d %b '%y", "%d %B %Y", "%d %B '%y"
]

def parse_single_date(token: str):
    """Parse one date token into a pandas Timestamp.

    Each format in POSSIBLE_FMTS is tried in order; the first one that matches
    wins. If none match, the token is handed to ``pd.to_datetime`` with
    ``dayfirst=True`` and ``errors="coerce"``, so an unparseable token yields
    ``NaT`` instead of raising.

    Args:
        token: The text to parse. It is converted with ``str()``, stripped, and
            thin spaces (U+2009) are replaced by ordinary spaces first.

    Returns:
        A ``pd.Timestamp``, or ``pd.NaT`` when nothing could be parsed.
    """
    text = str(token).strip().replace("\u2009", " ")
    for fmt in POSSIBLE_FMTS:
        try:
            parsed = datetime.strptime(text, fmt)
            if parsed.year < 100:  # normalize 2-digit years to 2000s
                parsed = parsed.replace(year=2000 + parsed.year)
            return pd.Timestamp(parsed)
        except ValueError:
            # A format mismatch (or an out-of-range date) is expected here;
            # anything else is a programming error and should surface.
            continue
    return pd.to_datetime(text, dayfirst=True, errors="coerce")

def split_range(cell):
    """Return ``(start, finish)`` parsed from a range string or a single date.

    Args:
        cell: A spreadsheet cell value. May be missing (NaN/None/NaT), a
            ``datetime``/``pd.Timestamp`` (a cell holding one real date), or
            text such as ``"01.02.2024 - 14.02.2024"``.

    Returns:
        A 2-tuple of ``pd.Timestamp``/``pd.NaT``. A single date gives
        ``(date, NaT)``; an empty or unrecognisable cell gives ``(NaT, NaT)``.
    """
    if pd.isna(cell):
        return (pd.NaT, pd.NaT)

    # A cell that already holds a date object must not go through the text
    # path: its string form ("2024-02-01 00:00:00") would be split on the
    # dashes and discarded.
    if isinstance(cell, datetime):
        return (pd.Timestamp(cell), pd.NaT)

    s = str(cell).strip()
    if not s:
        return (pd.NaT, pd.NaT)

    # 1) Try splitting by any dash
    parts = DASH_PATTERN.split(s)
    if len(parts) == 2:
        return (parse_single_date(parts[0]), parse_single_date(parts[1]))

    # 2) Otherwise, take the first 1–2 recognizable date tokens
    tokens = DATE_TOKEN.findall(s)
    if len(tokens) >= 2:
        return (parse_single_date(tokens[0]), parse_single_date(tokens[1]))
    if len(tokens) == 1:
        return (parse_single_date(tokens[0]), pd.NaT)
    return (pd.NaT, pd.NaT)

# =========================
# File/folder utilities
# =========================
def pick_latest_file(
    folder_path: str,
    name_contains: Optional[str] = None,
    allowed_exts: Tuple[str, ...] = (".xlsx", ".csv"),
) -> str:
    """Return the full path of the most recently modified matching file.

    Args:
        folder_path: Directory to scan (non-recursive).
        name_contains: Optional case-insensitive substring the file name must
            contain.
        allowed_exts: File extensions to accept; compared case-insensitively.
            A single string is accepted as well as a tuple/list.

    Returns:
        Path of the matching file with the greatest modification time.

    Raises:
        NotADirectoryError: ``folder_path`` is not an existing directory.
        FileNotFoundError: No file in the folder passes the filters.
    """
    if not os.path.isdir(folder_path):
        raise NotADirectoryError(f"Folder not found: {folder_path}")

    if isinstance(allowed_exts, str):
        exts = (allowed_exts.lower(),)
    else:
        exts = tuple(ext.lower() for ext in allowed_exts)
    needle = name_contains.lower() if name_contains else None

    newest_path: Optional[str] = None
    newest_mtime = float("-inf")
    for fname in os.listdir(folder_path):
        # Office writes a "~$<name>" owner/lock file next to an open workbook.
        # It always has a fresh mtime but is not a readable planner, so skip it.
        if fname.startswith("~$"):
            continue
        lower = fname.lower()
        if not lower.endswith(exts):
            continue
        if needle and needle not in lower:
            continue
        fpath = os.path.join(folder_path, fname)
        if not os.path.isfile(fpath):
            continue
        mtime = os.path.getmtime(fpath)
        if mtime > newest_mtime:  # strict: the first file wins a tie
            newest_path, newest_mtime = fpath, mtime

    if newest_path is None:
        raise FileNotFoundError(
            f"No matching files in {folder_path} "
            f"(filters: name_contains={name_contains}, exts={allowed_exts})"
        )

    return newest_path

# =========================
# Header detection
# =========================
def detect_header_row(df0: pd.DataFrame, must_have=("Banner Group","Brand","Variant")) -> int:
    """Find the position of the row that holds the column headers.

    Only the first 100 rows are inspected. A row qualifies when every name in
    ``must_have`` equals one of its cells after both sides are stripped and
    lower-cased. If no row qualifies, position 0 is returned.
    """
    wanted = {name.lower() for name in must_have}
    limit = min(100, len(df0))
    for pos in range(limit):
        cells = df0.iloc[pos].astype(str).str.strip().str.lower()
        if wanted.issubset(cells.tolist()):
            return pos
    # Nothing matched: treat the first row as the header.
    return 0

# =========================
# Planner readers (CSV/XLSX)
# =========================
def _read_csv_with_fallbacks(path: str) -> pd.DataFrame:
    """Read a CSV, retrying with progressively more permissive encodings.

    Encodings are tried in this order: the pandas default, ``utf-8-sig``, then
    ``latin-1``.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame from the first encoding that succeeds.

    Raises:
        ValueError: Every attempt failed. The exception from the last attempt
            is attached as ``__cause__`` so the real reason (missing file,
            malformed rows, ...) is not lost.
    """
    last_error: Optional[Exception] = None
    for enc in (None, "utf-8-sig", "latin-1"):
        try:
            return pd.read_csv(path, encoding=enc)
        except Exception as err:  # best-effort retry; the cause is re-raised below
            last_error = err
    raise ValueError(f"Failed to read CSV: {path}") from last_error

def _finalize_planner_frame(raw: pd.DataFrame) -> pd.DataFrame:
    """Apply the normalisation shared by the CSV and Excel readers (mutates ``raw``)."""
    # Merged cells only carry a value on their first row; repeat it downwards.
    for col in ("Banner Group", "Brand"):
        if col in raw.columns:
            raw[col] = raw[col].ffill()

    # Make sure Variant exists (some tabs omit it)
    if "Variant" not in raw.columns:
        raw["Variant"] = pd.NA

    if "Brand" not in raw.columns:
        raise KeyError(
            f"'Brand' column not found in planner; columns seen: {list(raw.columns)}"
        )

    # Keep rows that have at least Brand or Variant. Because Brand has already
    # been forward-filled, this only removes rows above the first Brand value
    # that also lack a Variant.
    keep = ~raw[["Brand","Variant"]].isna().all(axis=1)
    return raw.loc[keep].reset_index(drop=True)


def read_planner(path: str, sheet_name: str = "PFM Promo Planner") -> pd.DataFrame:
    """
    Read the planner and return a normalised DataFrame.

    For ``.csv`` paths the file is read as-is (first line is the header).
    For anything else the workbook sheet ``sheet_name`` is read without a
    header, the header row is located with ``detect_header_row``, and its
    stripped cell text becomes the column names.

    In both cases the result then has:
      - `Banner Group` and `Brand` forward-filled (merged cells / blank repeats),
      - a `Variant` column (added as all-NA when absent),
      - rows with neither `Brand` nor `Variant` removed, and a fresh 0..n-1 index.

    Raises:
        KeyError: the planner has no `Brand` column.
    """
    if path.lower().endswith(".csv"):
        return _finalize_planner_frame(_read_csv_with_fallbacks(path))

    # Excel workbook: headers are not necessarily on the first row.
    df0 = pd.read_excel(path, sheet_name=sheet_name, header=None, engine="openpyxl")
    hdr = detect_header_row(df0)
    raw = df0.iloc[hdr + 1 :].copy()
    raw.columns = df0.iloc[hdr].astype(str).str.strip().tolist()
    return _finalize_planner_frame(raw.reset_index(drop=True))

# =========================
# Period extraction
# =========================
def _coalesce(df: pd.DataFrame, candidates: List[str]) -> Optional[pd.Series]:
    """Return the column for the first name in ``candidates`` that ``df`` has.

    Args:
        df: Frame to look in.
        candidates: Column names in priority order.

    Returns:
        The matching column as a Series, or ``None`` when no candidate exists.
        If the matching label occurs more than once in ``df`` (duplicate
        headers), the first occurrence is returned so callers always receive
        a Series rather than a DataFrame.
    """
    for name in candidates:
        if name in df.columns:
            column = df[name]
            if isinstance(column, pd.DataFrame):
                # Duplicate header labels: df[name] is a sub-frame, not a Series.
                column = column.iloc[:, 0]
            return column
    return None

def extract_periods(raw: pd.DataFrame):
    """Extract buy-in and promotional start/finish dates from the planner.

    Each period may be given either as two separate Start/Finish columns or as
    one combined range column. Separate columns take precedence: if either a
    start or a finish column is found, the combined column is not consulted.

    Args:
        raw: Planner frame whose column names are matched against the
            candidate lists below.

    Returns:
        ``(buy_start, buy_finish, promo_start, promo_finish)`` — four Series
        indexed like ``raw``. Values that are absent or unparseable are ``NaT``.
    """
    buy_combined  = ["Buy In Period", "Buy-in Period", "Buyin Period"]
    buy_start     = ["Buy In Start", "Buy-in Start", "Start (Buy In)",
                     "Buy In - Start", "Buy In: Start", "Start - Buy In", "Start"]
    buy_finish    = ["Buy In Finish", "Buy-in Finish", "Finish (Buy In)",
                     "Buy In - Finish", "Buy In: Finish", "Finish - Buy In", "Finish"]

    promo_combined = ["Promotional Period", "Promo Period", "Promotion Period"]
    promo_start    = ["Promotional Start", "Promo Start", "Promotion Start",
                      "Start (Promo)", "Start - Promo", "Start (Promotional Period)", "Start"]
    promo_finish   = ["Promotional Finish", "Promo Finish", "Promotion Finish",
                      "Finish (Promo)", "Finish - Promo", "Finish (Promotional Period)", "Finish"]

    # NOTE(review): the bare "Start"/"Finish" fallbacks appear in both the
    # buy-in and promo lists, so a sheet with only those columns yields the
    # same dates for both periods — confirm this is intended.

    def extract(df, combined_cols, start_cols, finish_cols):
        def all_nat():
            return pd.Series(pd.NaT, index=df.index)

        s_col, f_col = _coalesce(df, start_cols), _coalesce(df, finish_cols)
        if s_col is not None or f_col is not None:
            s = pd.to_datetime(s_col, errors="coerce", dayfirst=True) if s_col is not None else all_nat()
            f = pd.to_datetime(f_col, errors="coerce", dayfirst=True) if f_col is not None else all_nat()
            return s, f

        comb = _coalesce(df, combined_cols)
        # An empty column has nothing to unzip below (the 2-way unpack would
        # raise), so treat it like a missing one.
        if comb is None or comb.empty:
            return all_nat(), all_nat()

        s_vals, f_vals = zip(*comb.apply(split_range))
        return pd.Series(list(s_vals), index=df.index), pd.Series(list(f_vals), index=df.index)

    buy_s, buy_f = extract(raw, buy_combined,  buy_start,  buy_finish)
    pro_s, pro_f = extract(raw, promo_combined, promo_start, promo_finish)
    return buy_s, buy_f, pro_s, pro_f

# =========================
# Build final result
# =========================
def build_result(raw: pd.DataFrame) -> pd.DataFrame:
    """Assemble the tidy promo table from a normalised planner frame.

    Args:
        raw: Planner frame. `Banner Group`, `Brand` and `Variant` are used when
            present; a missing one becomes an all-empty column.

    Returns:
        A DataFrame with columns `Banner Group`, `Brand`, `Variant`,
        `Buy In Start`, `Buy In Finish`, `Promo Start`, `Promo Finish`.
        Rows with no Variant and no date at all are dropped; the three text
        columns are stripped strings with missing values rendered as ``""``.
    """
    buy_s, buy_f, pro_s, pro_f = extract_periods(raw)

    def text_column(name: str) -> pd.Series:
        if name in raw.columns:
            return raw[name]
        # Index the placeholder like `raw` so it aligns with the date Series.
        return pd.Series(pd.NA, index=raw.index, dtype="object")

    res = pd.DataFrame({
        "Banner Group": text_column("Banner Group"),
        "Brand":        text_column("Brand"),
        "Variant":      text_column("Variant"),
        "Buy In Start": buy_s,
        "Buy In Finish":buy_f,
        "Promo Start":  pro_s,
        "Promo Finish": pro_f
    })

    # Keep rows that have a Variant OR at least one date (avoid spacer rows)
    date_cols = ["Buy In Start","Buy In Finish","Promo Start","Promo Finish"]
    keep = (~res["Variant"].isna()) | (~pd.isna(res[date_cols]).all(axis=1))
    res = res.loc[keep].reset_index(drop=True)

    # Clean text fields. Missing values are blanked *before* the string cast:
    # otherwise pd.NA would be turned into the literal text "<NA>".
    for c in ["Banner Group","Brand","Variant"]:
        col = res[c]
        res[c] = (
            col.astype(object)
            .where(col.notna(), "")
            .astype(str)
            .str.strip()
            .replace({"nan": "", "None": ""})
        )

    return res

# =========================
# Transformations
# =========================
def promo_configurator(df: pd.DataFrame) -> pd.DataFrame:
    """Reshape the tidy promo table into promo-configuration rows.

    Reads `Banner Group`, `Brand`, `Promo Start` and `Promo Finish` from ``df``
    and returns a frame with the columns name, start_date, end_date,
    manufacturer_id, buyer_group_id, user, date_added (in that order).

    ``buyer_group_id`` is looked up from the stripped banner group: an exact
    key match first, otherwise the first mapping key contained in the banner
    text. Banners that match nothing get an empty string.
    """
    # Banner label -> buyer group id
    # NOTE(review): "BP Express" and "Pick 'n Pay Express" share the same id —
    # confirm this is intended.
    buyer_group_by_banner = {
        "Sasol": 177268,
        "TotalEnergies": 178524,
        "BP Express": 1864328,
        "Engen": 1865071,
        "Shell": 1867297,
        "Freshstop /Astron / Caltex": 172785,
        "Pick 'n Pay Express": 1864328
    }

    def resolve_buyer_group(banner):
        # Exact label wins; otherwise fall back to the first contained label.
        if banner in buyer_group_by_banner:
            return buyer_group_by_banner[banner]
        contained = (
            group_id
            for label, group_id in buyer_group_by_banner.items()
            if label in banner
        )
        return next(contained, "")

    banners = df['Banner Group'].astype(str).str.strip()
    brands = df['Brand'].astype(str).str.strip()

    # Pre-declaring the columns fixes their order in the output.
    out = pd.DataFrame(columns=[
        'name',
        'start_date',
        'end_date',
        'manufacturer_id',
        'buyer_group_id',
        'user',
        'date_added'
    ])

    # The first Series assigned gives the (initially empty) frame its index,
    # so `name` must be set before the constant columns.
    out['name'] = banners + " - " + brands
    out['start_date'] = pd.to_datetime(df['Promo Start'], dayfirst=True, errors='coerce')
    out['end_date'] = pd.to_datetime(df['Promo Finish'], dayfirst=True, errors='coerce')
    out['manufacturer_id'] = 206575077
    out['user'] = 'bevco_master'
    out['date_added'] = datetime.now().strftime('%Y-%m-%d')
    out['buyer_group_id'] = [resolve_buyer_group(banner) for banner in banners]

    return out

# =========================
# Main convenience entrypoint
# =========================
def extract_promos_from_sharepoint_folder(
    folder_path: str,
    name_contains: Optional[str] = None,
    sheet_name: str = "PFM Promo Planner",
    github_data_folder: str = r"http://github.com/insight-base/promo-loading-automation/data"
):
    """
    End-to-end run: newest planner file in -> promo configuration CSV out.

    Steps:
      1) pick the newest matching file in ``folder_path``,
      2) read and normalise the planner tab ``sheet_name``,
      3) build the tidy promo table and convert it to configuration rows,
      4) write the rows to ``bevco_promo_<YYYYMMDD>.csv`` inside
         ``github_data_folder`` (created if it does not exist).

    Returns:
        ``(result, output_path)`` — the configuration DataFrame and the path
        of the CSV that was written.
    """
    # NOTE(review): the default for github_data_folder is a URL, but it is used
    # below as a local directory path — callers should pass a real folder.
    source_file = pick_latest_file(folder_path, name_contains=name_contains)
    planner = read_planner(source_file, sheet_name=sheet_name)
    result = promo_configurator(build_result(planner))

    # Output goes to the local data folder, one file per calendar day.
    os.makedirs(github_data_folder, exist_ok=True)
    out_name = f"bevco_promo_{datetime.now().strftime('%Y%m%d')}.csv"
    output_path = os.path.join(github_data_folder, out_name)
    result.to_csv(output_path, index=False)

    return result, output_path


['.env', '.git', '.mypy_cache', '.vscode', 'data', 'er2m_agents', 'img', 'installation.ipynb', 'LICENSE', 'README.md', 'requirements.txt', 'setup.py']


In [None]:
# ---- config ----
# Notebook driver cell: runs the full extraction against the locally synced
# planner folder, prints where the CSV was written and displays the result.
# The folder below is a machine-specific absolute path; adjust it per user.
sharepoint_folder =  r"C:\Users\Eddie\OneDrive - eRoute2Market\eRoute2Market\eR2m Team Sharepoint - Eddwin\Loading Promos" # OneDrive-synced path

# `df` is the promo configuration table; `out_csv` is the path of the saved CSV.
df, out_csv = extract_promos_from_sharepoint_folder(
    folder_path=sharepoint_folder,
    name_contains="PFM Promotional Planner",   # optional filter
    sheet_name="PFM Promo Planner",            # default; change if the tab name differs
    github_data_folder=r".\data"  # relative path, resolved against the current working directory
)
print(out_csv)
df


In [None]:
import mysql.connector
from mysql.connector import errorcode
from datetime import date

In [None]:
def load_promos_from_df(df: pd.DataFrame, mysql_cfg: dict) -> pd.DataFrame:
    """
    Insert every row of `df` by calling the stored procedure
    `supply_chain.insert_config_promo`, all inside one transaction.

    The procedure is called with nine arguments: the seven row values below
    followed by two output slots (new id, message). The new id is read from
    the sequence returned by ``cursor.callproc``.

    Required df columns:
      name, start_date, end_date, manufacturer_id, buyer_group_id, user, date_added

    Args:
        df: Rows to insert. Missing start/end dates are sent as NULL; a
            missing ``date_added`` falls back to today's date.
        mysql_cfg: Keyword arguments for ``mysql.connector.connect``.

    Returns:
        A copy of `df` with a new column `config_promo_id` holding the id
        reported by the procedure for each row.

    Raises:
        RuntimeError: the procedure returned no id for a row.
        Exception: any error raised while inserting is re-raised after the
            transaction has been rolled back (nothing is committed).
    """
    cnx = mysql.connector.connect(**mysql_cfg)
    inserted_ids = []
    try:
        cnx.autocommit = False  # we control commits
        cur = cnx.cursor()
        try:
            total = len(df)
            # Count rows by position: the index label need not be 0..n-1.
            for pos, (_, row) in enumerate(df.iterrows(), start=1):
                # Ensure date types are correct (MySQL connector likes python date)
                start_date = pd.to_datetime(row["start_date"]).date() if pd.notna(row["start_date"]) else None
                end_date   = pd.to_datetime(row["end_date"]).date()   if pd.notna(row["end_date"]) else None
                date_added = pd.to_datetime(row["date_added"]).date() if "date_added" in df.columns and pd.notna(row["date_added"]) else date.today()

                name = str(row["name"])
                print(f"Running the stored procedure for row {pos}/{total}: {name}")

                # callproc returns a copy of the arguments in which the output
                # slots have been replaced by the values the procedure set.
                # Passing the *name* of a session variable as a string would
                # only send that text as a value; it is never written to.
                result_args = cur.callproc("supply_chain.insert_config_promo", (
                    name,
                    start_date,
                    end_date,
                    int(row["manufacturer_id"]),
                    int(row["buyer_group_id"]),
                    str(row["user"]),
                    date_added,
                    0,   # output slot: new config promo id
                    "",  # output slot: message
                ))

                new_id, message = result_args[7], result_args[8]
                if new_id is None:
                    raise RuntimeError(
                        f"insert_config_promo returned no id for row {pos} ({name}): {message}"
                    )
                inserted_ids.append(int(new_id))

            cnx.commit()
        except Exception:
            cnx.rollback()
            raise
        finally:
            cur.close()
    finally:
        cnx.close()
        print("MySQL connection closed.")

    out_df = df.copy()
    out_df["config_promo_id"] = inserted_ids
    return out_df

In [None]:
import os
# Connection settings are read from environment variables.
MYSQL_HOST = os.getenv("MYSQL_HOST")
# The port must be an integer; fall back to MySQL's standard port when unset/blank.
MYSQL_PORT = int(os.getenv("MYSQL_PORT") or 3306)
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASS = os.getenv("MYSQL_PASS")
# Read the MYSQL_DB variable (previously the lookup used "front_end" as the
# variable *name*). It may stay unset: the procedure call in
# load_promos_from_df is schema-qualified.
MYSQL_DB = os.getenv("MYSQL_DB")

# One hand-written sample row, used to exercise the loader.
df = pd.DataFrame([{
    "name": "Engen - Pepsi 500ml - 2026/01/22 - 2026/03/11",
    "start_date": "2026-03-01",
    "end_date": "2026-04-30",
    "manufacturer_id": 206575077,
    "buyer_group_id": 1865071,
    "user": "bevco_master",
    "date_added": "2026-01-26"
}])

mysql_cfg = {
    "host": MYSQL_HOST,
    "user": MYSQL_USER,
    "password": MYSQL_PASS,
    "database": MYSQL_DB,
    "port": MYSQL_PORT,
}

df_with_ids = load_promos_from_df(df, mysql_cfg)
df_with_ids