In [1]:
import requests
import yaml
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import (
    ForeignKey,
    create_engine,
    Column,
    Integer,
    Float,
    String,
    DateTime,
    Boolean, 
    Table,
)
# from eralchemy2 import render_er
import re
from datetime import datetime
from supermarktconnector.ah import AHConnector

In [7]:
# Try out the product search offered by supermarktconnector's AHConnector.
connector = AHConnector()
# NOTE(review): the return value is discarded — presumably left over from exploration; confirm the call is still needed.
connector.get_categories()
# Last expression of the cell, so the notebook displays the search response (one result, first page).
connector.search_products(query='HALFV MELK', size=1, page=0)

{'page': {'size': 1, 'totalElements': 59, 'totalPages': 59, 'number': 0},
 'products': [{'webshopId': 546644,
   'hqId': 0,
   'title': "Kellogg's Kellogg's cornflakes ontbijt pakket",
   'salesUnitSize': 'per pakket',
   'images': [],
   'priceBeforeBonus': 4.94,
   'orderAvailabilityStatus': 'IN_ASSORTMENT',
   'mainCategory': 'Zuivel, plantaardig en eieren',
   'subCategory': 'Halfvolle melk',
   'brand': "Kellogg's",
   'shopType': 'AH',
   'availableOnline': True,
   'isPreviouslyBought': False,
   'descriptionHighlights': "<p>Kellogg's corn flakes ontbijtgranen zijn heerlijke goudbruin gebakken maisvlokken. De iconosche combinatie Kellogg's Cornflakes met melk is sinds 1906 het favoriete ontbijt van miljoenen mensen.</p><p><ul><li>De Zaanse Hoeve Halfvolle Melk 2L 1x</li><li>Kellogg's Cornflakes 1x</li></ul></p>",
   'propertyIcons': [],
   'nix18': False,
   'isStapelBonus': False,
   'extraDescriptions': [],
   'isBonus': False,
   'descriptionFull': 'Dit pakket wordt als losse

In [81]:
# Endpoints on api.ah.nl that the functions in this notebook call.
# Posted to by fetch_new_tokens() with the stored refresh token.
REFRESH_TOKEN_URL = "https://api.ah.nl/mobile-auth/v1/auth/token/refresh"
# Posted to by login() with the `code` value from the config.
TOKEN_URL = "https://api.ah.nl/mobile-auth/v1/auth/token"
# Requested by fetch_receipts() to obtain the list of receipts.
RECEIPTS_URL = "https://api.ah.nl/mobile-services/v1/receipts/"
# Template for a single receipt; "{transaction_id}" is filled in with str.format().
RECEIPT_DETAILS_URL = "https://api.ah.nl/mobile-services/v2/receipts/{transaction_id}"

In [82]:
# Load the notebook settings (the "api" and "database" sections) from config.yml.
with open("config.yml") as config_file:
    config = yaml.safe_load(config_file)

In [83]:
def login():
    """Exchange the configured code for API tokens and persist them.

    Posts the ``code`` from the config to ``TOKEN_URL``, copies the returned
    access and refresh token into the in-memory config and rewrites
    ``config.yml`` with the updated config.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    payload = {"code": config["api"]["code"], "clientId": "appie"}
    reply = requests.post(
        TOKEN_URL,
        json=payload,
        headers={"Content-Type": "application/json"},
    )
    reply.raise_for_status()
    tokens = reply.json()

    # Same order as before: access token first, then refresh token.
    for key in ("access_token", "refresh_token"):
        config["api"][key] = tokens[key]

    with open("config.yml", "w") as config_file:
        yaml.dump(config, config_file)
    return tokens


def update_tokens():
    """Refresh both tokens and write them back to ``config.yml``.

    Calls ``fetch_new_tokens()``, stores the returned access and refresh
    token in the in-memory config and dumps the whole config to disk.
    """
    refreshed = fetch_new_tokens()
    api_settings = config["api"]
    api_settings["access_token"] = refreshed["access_token"]
    api_settings["refresh_token"] = refreshed["refresh_token"]
    with open("config.yml", "w") as config_file:
        yaml.dump(config, config_file)

def fetch_new_tokens():
    """Request a new token pair with the stored refresh token.

    Returns:
        The decoded JSON body of the refresh response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    payload = {
        "refreshToken": config["api"]["refresh_token"],
        "clientId": "appie",
    }
    json_headers = {"Content-Type": "application/json"}
    reply = requests.post(REFRESH_TOKEN_URL, json=payload, headers=json_headers)
    reply.raise_for_status()
    return reply.json()


def fetch_receipts():
    """Fetch all receipts and wrap each one in a ``Receipt``.

    When the first request is answered with 401 the tokens are refreshed via
    ``update_tokens()`` and the request is repeated once.

    Returns:
        list: One ``Receipt`` per entry of the JSON response.

    Raises:
        requests.HTTPError: If the (possibly retried) request fails.
    """
    def request_receipts():
        # The header is rebuilt on every call so a refreshed token is picked up.
        bearer = {"Authorization": f"Bearer {config['api']['access_token']}"}
        return requests.get(RECEIPTS_URL, headers=bearer)

    reply = request_receipts()
    if reply.status_code == 401:
        update_tokens()
        reply = request_receipts()
    reply.raise_for_status()

    return [Receipt(entry) for entry in reply.json()]



def parse_quantity(quantity: str) -> tuple[float, str]:
    """Parse the quantity and unit from the quantity string.

    A decimal comma is accepted ("0,538KG"). The number may be an integer or a
    decimal and may be followed by an alphabetic unit, optionally separated by
    whitespace.

    Args:
        quantity (str): The quantity string, e.g. "2", "1,5" or "0,538KG".

    Returns:
        tuple[float, str]: The quantity and the unit; the unit is ``None``
        when the string contains only a number.

    Raises:
        ValueError: If the string does not start with a number.
    """
    normalised = quantity.strip().replace(",", ".")
    # The dot is escaped and the fraction and unit are optional, so "12KG",
    # "0.5" and "2" are all recognised instead of falling through to None.
    match = re.match(r"(\d+(?:\.\d+)?)\s*([a-zA-Z]+)?", normalised)
    if match is None:
        raise ValueError(f"Cannot parse quantity: {quantity!r}")
    return float(match.group(1)), match.group(2)


def string_to_float(string: str) -> float:
    """Turn a number written with a decimal comma into a float.

    Args:
        string (str): The number string, e.g. "1,99".

    Returns:
        float: The parsed number.
    """
    # Every comma becomes a dot before the text is handed to float().
    dotted = ".".join(string.split(","))
    return float(dotted)

In [84]:
class Product:
    """A purchased line item of a receipt."""

    def __init__(
        self,
        quantity: float=None,
        unit: str=None,
        name: str=None,
        price: float=None,
        total_price: float=None,
        indicator: str=None,
    ):
        """Store the given values unchanged on the instance."""
        self.quantity, self.unit, self.name = quantity, unit, name
        self.price, self.total_price = price, total_price
        self.indicator = indicator

    def __repr__(self):
        """Return a readable representation; the name is labelled ``description``."""
        labelled = (
            ("quantity", self.quantity),
            ("unit", self.unit),
            ("description", self.name),
            ("price", self.price),
            ("total_price", self.total_price),
            ("indicator", self.indicator),
        )
        inner = ", ".join(f"{label}={value}" for label, value in labelled)
        return f"Product({inner})"


class Discount:
    """A discount row of a receipt."""

    def __init__(self, type: str=None, description: str=None, amount: float=None):
        """Store the given values unchanged on the instance."""
        self.type, self.description, self.amount = type, description, amount

    def __repr__(self):
        """Return a readable representation (the type is not included)."""
        inner = ", ".join((f"description={self.description}", f"amount={self.amount}"))
        return f"Discount({inner})"


class Location:
    """The store a receipt belongs to."""

    def __init__(
        self,
        name: str=None,
        address: str=None,
        house_number: str=None,
        postal_code: str=None,
        city: str=None,
    ):
        """Store the given values unchanged on the instance."""
        self.name, self.address = name, address
        self.house_number, self.postal_code, self.city = house_number, postal_code, city

    def __repr__(self):
        """Return a readable representation listing every attribute."""
        labelled = (
            ("name", self.name),
            ("address", self.address),
            ("house_number", self.house_number),
            ("postal_code", self.postal_code),
            ("city", self.city),
        )
        inner = ", ".join(f"{label}={value}" for label, value in labelled)
        return f"Location({inner})"


class Receipt:
    """A receipt from the receipts API together with its parsed line items.

    Creating an instance performs an HTTP request for the receipt details and
    derives the store location, the purchased products and the applied
    discounts from the returned rows.

    Attributes:
        transaction_id: Value of ``transactionId`` in the receipt summary.
        datetime (datetime): Parsed ``transactionMoment`` of the summary.
        receipt_details (list): The ``receiptUiItems`` rows of the receipt.
        location (Location): Store the receipt belongs to.
        total: Value of ``total.amount.amount`` in the receipt summary.
        products (list): ``Product`` objects for the purchased rows.
        discounts (dict): ``{"discounts": [Discount, ...], "total_discount": float}``.
    """

    def __init__(self, receipt):
        """Build the receipt from one entry of the receipts list response.

        Args:
            receipt (dict): Entry with the keys ``transactionId``,
                ``transactionMoment``, ``storeAddress`` and ``total``.
        """
        self.transaction_id = receipt["transactionId"]
        self.datetime = datetime.strptime(receipt["transactionMoment"], "%Y-%m-%dT%H:%M:%SZ")
        # The details must be loaded first: location, products and discounts read them.
        self.receipt_details = self._get_receipt_details()
        self.location = self._get_location(receipt["storeAddress"])
        self.total = receipt["total"]["amount"]["amount"]
        self.products = self._get_products()
        self.discounts = self._get_discounts()

    def _get_receipt_details(self) -> list:
        """Fetch the details of a receipt from the API.

        A 401 answer triggers ``update_tokens()`` and a single retry.

        Returns:
            list: The ``receiptUiItems`` of the response.

        Raises:
            requests.HTTPError: If the (possibly retried) request fails.
        """
        url = RECEIPT_DETAILS_URL.format(transaction_id=self.transaction_id)
        response = requests.get(url, headers={"Authorization": f"Bearer {config['api']['access_token']}"})
        if response.status_code == 401:
            update_tokens()
            response = requests.get(url, headers={"Authorization": f"Bearer {config['api']['access_token']}"})
        response.raise_for_status()
        return response.json()["receiptUiItems"]

    def _get_location(self, store):
        """Build the store location of the receipt.

        Args:
            store (dict): Address with the keys ``street``, ``houseNumber``,
                ``postalCode`` and ``city``.

        Returns:
            Location: The store location.
        """
        return Location(
            # NOTE(review): assumes the second detail row holds the store name — confirm.
            name=self.receipt_details[1]["value"],
            address=store["street"],
            house_number=store["houseNumber"],
            postal_code=store["postalCode"],
            city=store["city"],
        )

    @staticmethod
    def _find_row_index(rows, row_type, key, value):
        """Return the index of the first row of ``row_type`` whose ``key`` equals ``value``.

        Both comparisons ignore case. ``key`` is only read on rows of the
        requested type. Returns ``None`` when no row matches.
        """
        for index, row in enumerate(rows):
            if row["type"].lower() == row_type and row[key].lower() == value:
                return index
        return None

    def _get_products(self) -> list:
        """Return the purchased products of the receipt.

        Only "product" rows after the "bonuskaart" row and before the
        "subtotaal" row are considered, which leaves out the discount rows.

        Returns:
            list: A list of Product objects.
        """
        receipt_rows = self.receipt_details
        bonus_index = self._find_row_index(receipt_rows, "product", "description", "bonuskaart")
        subtotal_index = self._find_row_index(receipt_rows, "subtotal", "text", "subtotaal")

        # Without a "bonuskaart" row start at the first row instead of failing on None + 1.
        start = 0 if bonus_index is None else bonus_index + 1
        product_rows = [
            item for item in receipt_rows[start:subtotal_index]
            if item["type"].lower() == "product"
        ]
        return self._parse_products(product_rows)

    def _get_discounts(self) -> dict:
        """Return the discounts that were applied to the receipt.

        Discounts are the "product" rows after the "subtotaal" row and before
        the "uw voordeel" total row.

        Returns:
            dict: ``{"discounts": [Discount, ...], "total_discount": float}``;
            empty when the receipt has no "subtotaal" row.
        """
        receipt_rows = self.receipt_details
        subtotal_index = self._find_row_index(receipt_rows, "subtotal", "text", "subtotaal")
        savings_index = self._find_row_index(receipt_rows, "total", "label", "uw voordeel")

        discounts = {"discounts": [], "total_discount": 0.0}
        if subtotal_index is None:
            # No subtotal row means the discount section cannot be located.
            return discounts

        for row in receipt_rows[subtotal_index + 1:savings_index]:
            if row["type"].lower() != "product":
                continue
            # The absolute value is taken, so the amount is stored without its sign.
            discount_amount = abs(string_to_float(row["amount"]))
            # The row's "quantity" field is passed as the discount type.
            discounts["discounts"].append(Discount(row["quantity"], row["description"], discount_amount))
            discounts["total_discount"] += discount_amount
        return discounts

    def _parse_products(self, items: list) -> list:
        """Parse the products from the API response.

        The rows are only read; they are not modified.

        Args:
            items (list): The product rows from the API response.

        Returns:
            list: A list of Product objects.
        """
        products = []
        for row in items:
            if "statiegeld" in row["description"].lower():
                # Deposit rows are stored as one unit with only a total; quantity and price are not read.
                product = Product(1.0, None, row["description"], None, string_to_float(row["amount"]), None)
            else:
                quantity, unit = parse_quantity(row["quantity"])
                price = string_to_float(row["price"]) if "price" in row else None
                # A missing or empty indicator is stored as None.
                indicator = row.get("indicator")
                if indicator == "":
                    indicator = None
                amount = string_to_float(row["amount"])
                product = Product(quantity, unit, row["description"], price, amount, indicator)
            products.append(product)
        return products

    def __repr__(self):
        """Return a readable representation of the receipt and its contents."""
        return f"Receipt(transaction_id={self.transaction_id}, datetime={self.datetime}, location={self.location}, total={self.total}, products={self.products}, discounts={self.discounts})"


In [85]:
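# Uncomment to call login(), which exchanges the `code` from config.yml for an access and refresh token.
# login()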

In [86]:
# Pick the SQLAlchemy connection URL for the configured backend: a local SQLite file,
# a remote MySQL server or a remote PostgreSQL server.
# NOTE(review): username and password are interpolated into the URL as-is, so credentials
# containing characters such as "@" or "/" presumably need URL-encoding — confirm.
db_type = config["database"]["type"]
if db_type == "sqlite":
    db_url = "sqlite:///" + config["database"]["path"]
elif db_type == "mysql":
    db_url = f"mysql+pymysql://{config['database']['username']}:{config['database']['password']}@{config['database']['host']}/{config['database']['database']}"
elif db_type in ("postgresql", "postgres"):
    db_url = f"postgresql://{config['database']['username']}:{config['database']['password']}@{config['database']['host']}/{config['database']['database']}"
else:
    raise ValueError("Database type not supported.")
engine = create_engine(db_url)

# Declarative base class that the ORM models in this cell derive from.
Base = declarative_base()


class DbReceipt(Base):
    """ORM model for the "receipts" table: one row per stored receipt."""
    __tablename__ = "receipts" 

    id = Column(Integer, primary_key=True)
    # Transaction id of the receipt; the import loop filters on this column to skip receipts that are already stored.
    transaction_id = Column(String(255), nullable=False)
    datetime = Column(DateTime)
    # References locations.id (the store of the receipt).
    location = Column(Integer, ForeignKey("locations.id"))
    total_price = Column(Float)
    # Filled from the receipt's "total_discount" value by the import loop.
    total_discount = Column(Float)


class DbProduct(Base):
    """ORM model for the "products" table: one purchased line item of a receipt.

    String columns carry an explicit length because some backends (MySQL, which
    this notebook supports) refuse to create a VARCHAR column without one.
    """
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    # References receipts.id (the receipt the product was bought on).
    receipt = Column(Integer, ForeignKey("receipts.id"))
    # Float rather than Integer: parse_quantity() yields fractional quantities such as 0.538.
    quantity = Column(Float)
    unit = Column(String(255))
    price = Column(Float)
    total_price = Column(Float)
    # References categories.id; the import loop in this notebook leaves it unset.
    category = Column(Integer, ForeignKey("categories.id"))
    

class DbDiscount(Base):
    """ORM model for the "discounts" table: one discount applied to a receipt.

    String columns carry an explicit length because some backends (MySQL, which
    this notebook supports) refuse to create a VARCHAR column without one.
    """
    __tablename__ = "discounts"
    id = Column(Integer, primary_key=True)
    # References receipts.id (the receipt the discount was applied to).
    receipt = Column(Integer, ForeignKey("receipts.id"))
    type = Column(String(255))
    description = Column(String(255))
    amount = Column(Float)


class DbCategory(Base):
    """ORM model for the "categories" table, referenced by products.category.

    The name column carries an explicit length because some backends (MySQL,
    which this notebook supports) refuse to create a VARCHAR column without one.
    """
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)


class DbLocation(Base):
    """ORM model for the "locations" table: a store, referenced by receipts.location.

    String columns carry an explicit length because some backends (MySQL, which
    this notebook supports) refuse to create a VARCHAR column without one.
    """
    __tablename__ = "locations"
    id = Column(Integer, primary_key=True)
    # The import loop looks up existing locations by this name.
    name = Column(String(255))
    address = Column(String(255))
    house_number = Column(String(255))
    city = Column(String(255))
    postal_code = Column(String(255))


class DbTag(Base):
    """ORM model for the "tags" table; tags are linked to products through "tags_products".

    The name column carries an explicit length because some backends (MySQL,
    which this notebook supports) refuse to create a VARCHAR column without one.
    """
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)


# Association table for the many-to-many relation between tags and products:
# each row pairs one tags.id with one products.id.
tags_items = Table(
    "tags_products",
    Base.metadata,
    Column("tag_id", Integer, ForeignKey("tags.id")),
    Column("product_id", Integer, ForeignKey("products.id")),
)

# Create the tables registered on Base.metadata in the configured database.
Base.metadata.create_all(engine)

# filename = 'mymodel.png'
# render_er(Base.metadata, filename)
# imgplot = plt.imshow(mpimg.imread(filename))
# plt.show()


In [87]:
# Fetch the receipt list; each entry becomes a Receipt, whose constructor also requests that receipt's details.
receipts = fetch_receipts()

In [88]:
# Store every fetched receipt that is not in the database yet, together with its
# location, products and discounts.
# NOTE(review): the previous cell already assigns `receipts`; fetching again keeps this
# cell self-contained but repeats all requests — confirm which of the two is wanted.
receipts = fetch_receipts()

Session = sessionmaker(bind=engine)
session = Session()
try:
    for receipt in receipts:
        # If the receipt is already in the database, skip it.
        if session.query(DbReceipt).filter_by(transaction_id=receipt.transaction_id).first():
            continue

        # Reuse the stored location with the same name, or insert a new one.
        location = receipt.location
        dbLocation = session.query(DbLocation).filter_by(name=location.name).first()
        if dbLocation is None:
            dbLocation = DbLocation(
                name=location.name,
                address=location.address,
                house_number=location.house_number,
                city=location.city,
                postal_code=location.postal_code,
            )
            session.add(dbLocation)
            # flush() assigns dbLocation.id without ending the transaction.
            session.flush()

        dbReceipt = DbReceipt(
            transaction_id=receipt.transaction_id,
            datetime=receipt.datetime,
            location=dbLocation.id,
            total_price=receipt.total,
            total_discount=receipt.discounts["total_discount"],
        )
        session.add(dbReceipt)
        # flush() assigns dbReceipt.id, which the products and discounts reference.
        session.flush()

        for product in receipt.products:
            dbProduct = DbProduct(
                name=product.name,
                receipt=dbReceipt.id,
                quantity=product.quantity,
                unit=product.unit,
                price=product.price,
                total_price=product.total_price,
            )
            session.add(dbProduct)

        for discount in receipt.discounts["discounts"]:
            dbDiscount = DbDiscount(
                receipt=dbReceipt.id,
                type=discount.type,
                description=discount.description,
                amount=discount.amount,
            )
            session.add(dbDiscount)

        # One commit per receipt: a receipt is stored with all of its rows or not at all,
        # so a failed import is retried on the next run instead of being skipped as a duplicate.
        session.commit()
except Exception:
    # Discard the partially written receipt before the error propagates.
    session.rollback()
    raise
finally:
    # Close the session even when the import fails.
    session.close()