# **Data Acquisition: Fetching Product Data from Kroger API**

## **Introduction**
This notebook demonstrates the data acquisition pipeline for retrieving product information from the Kroger API. The process involves:

- **Authenticating with the Kroger API** using OAuth2.
- **Fetching product data** from Kroger locations.
- **Filtering and saving the data** for analysis.
- **Tracking API calls** to ensure efficient data retrieval.

We leverage Python scripts developed in this project:

- `main.py`: Orchestrates the data pipeline.
- `kroger_api.py`: Handles API authentication and requests.
- `fetch_product.py`: Manages data fetching and storage.
- `data_processing.py`: Filters and processes product data.
- `tracking.py`: Logs API requests and manages location tracking.

## **Kroger API Integration**

### **Overview**
The `kroger_api.py` module handles authentication and data retrieval from the **Kroger Product Compact API**. This script enables access to product details, pricing, and availability for various store locations.

### **Key Features**
- **OAuth2 Authentication**: Uses client credentials to generate an access token.
- **Token Management**: Automatically refreshes the access token when expired.
- **Product Search**: Fetches product data using keyword-based search queries.
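- **Error Handling & Rate Limiting**: Reports failed requests (network errors, non-200 responses) and stops paging that search term instead of retrying, and pauses between result pages to reduce the risk of API throttling.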

### **How It Works**
1. **Authenticate** with Kroger API using client ID and secret.
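2. **Obtain an access token**, caching it and its expiration time in the running process's environment variables (`os.environ`) so later calls reuse it until it expires.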
3. **Query product data** using location IDs and search terms.
4. **Return structured product information** including pricing, category, and stock availability.
5. **Handle API errors** such as timeouts, authentication failures, and invalid responses.

This module is a core component of the data pipeline, facilitating seamless interaction with Kroger’s API for product data acquisition.


In [None]:
import os
from dotenv import get_key, load_dotenv
import requests
from requests.exceptions import RequestException
import base64
import time
from datetime import datetime, timedelta

# Set up directory paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # Get `src/acquisition/`
DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "data"))  # Navigate to `src/data/`

# Local credentials file (optional when credentials are supplied via the environment)
ENV_FILE = os.path.join(DATA_DIR, "kroger_client_info.env")

# Load any default `.env` into os.environ (python-dotenv's default lookup)
load_dotenv()

# Prefer values from the local credentials file; fall back to process
# environment variables (e.g. CI secrets) when the file or a key is missing.
CLIENT_ID = get_key(ENV_FILE, "KROGER_CLIENT_ID") or os.getenv("KROGER_CLIENT_ID")
CLIENT_SECRET = get_key(ENV_FILE, "KROGER_CLIENT_SECRET") or os.getenv("KROGER_CLIENT_SECRET")

if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError(
        "Missing Kroger API credentials: set KROGER_CLIENT_ID and KROGER_CLIENT_SECRET "
        f"in {ENV_FILE} or in the environment!"
    )

# HTTP Basic auth value for the OAuth2 client-credentials flow: base64("id:secret")
encoded_auth = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()

TOKEN_URL = "https://api-ce.kroger.com/v1/connect/oauth2/token"
HEADERS = {
    "Authorization": f"Basic {encoded_auth}",
    "Content-Type": "application/x-www-form-urlencoded",
}

# Function to get a new token if expired
def get_kroger_product_compact_token(timeout=30):
    """Return a valid Kroger Product Compact API access token, refreshing if needed.

    The token and its ISO-format expiration time are cached in this process's
    ``os.environ`` (``PRODUCT_COMPACT_ACCESS_TOKEN`` and
    ``PRODUCT_COMPACT_ACCESS_TOKEN_EXPIRATION``). A cached token is reused
    until it expires. A new one is requested when the token is missing, has no
    expiration, or has an unparseable expiration.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The access token string.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status or
            the response carries no ``access_token``. Network errors raised by
            ``requests.post`` propagate unchanged.
    """
    access_token = os.getenv("PRODUCT_COMPACT_ACCESS_TOKEN")
    expiration = os.getenv("PRODUCT_COMPACT_ACCESS_TOKEN_EXPIRATION")

    needs_refresh = not access_token or not expiration
    if not needs_refresh:
        try:
            needs_refresh = datetime.now() >= datetime.fromisoformat(expiration)
        except ValueError:
            needs_refresh = True  # corrupt timestamp: treat the cached token as expired
    if not needs_refresh:
        return access_token

    print("🔄 Token expired or missing, requesting a new one...")
    response = requests.post(
        TOKEN_URL,
        headers=HEADERS,
        data="grant_type=client_credentials&scope=product.compact",
        timeout=timeout,
    )
    if response.status_code != 200:
        # Use the raw text: error bodies are not guaranteed to be JSON
        raise RuntimeError(f"Failed to retrieve token: {response.status_code} {response.text}")

    response_data = response.json()
    access_token = response_data.get("access_token")
    if not access_token:
        raise RuntimeError(f"Failed to retrieve token: no access_token in response {response_data}")

    expires_in = response_data.get("expires_in", 1800)
    expiration_time = datetime.now() + timedelta(seconds=expires_in)

    # Cache for later calls in this process only (os.environ is not persisted)
    os.environ["PRODUCT_COMPACT_ACCESS_TOKEN"] = access_token
    os.environ["PRODUCT_COMPACT_ACCESS_TOKEN_EXPIRATION"] = expiration_time.isoformat()

    print("New Token Retrieved!")
    return access_token

# Ensure API base URL is set
PRODUCTS_API_URL = "https://api-ce.kroger.com/v1/products"


def search_kroger_products(location_id, search_terms=("Eggs", "Bread"), limit=50,
                           max_pages=5, timeout=30):
    """Search for products at a Kroger location, paginating each term separately.

    Args:
        location_id: Store location ID sent as ``filter.locationId``.
        search_terms: Iterable of keyword queries. Each one is paged independently.
        limit: Page size sent as ``filter.limit``.
        max_pages: Maximum number of pages requested per search term.
        timeout: Seconds before an individual HTTP request is abandoned.

    Returns:
        A list of product dicts, each tagged with ``locationId``, or ``None``
        when no access token was obtained. A failure on one term is printed,
        and the remaining terms are still queried. Exceptions raised by
        ``get_kroger_product_compact_token`` propagate.
    """
    token = get_kroger_product_compact_token()
    if not token:
        print("Failed to retrieve API token. Skipping product search.")
        return None

    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    all_products = []
    for term in search_terms:
        all_products.extend(
            _fetch_term_products(term, location_id, headers, limit, max_pages, timeout)
        )
    return all_products


def _fetch_term_products(term, location_id, headers, limit, max_pages, timeout):
    """Page through results for one search term and return the products collected."""
    term_products = []
    start = 1
    for _ in range(max_pages):
        params = {
            "filter.term": term,
            "filter.locationId": location_id,
            "filter.limit": limit,
            "filter.start": start,
        }

        try:
            response = requests.get(PRODUCTS_API_URL, headers=headers, params=params, timeout=timeout)
            if response.status_code != 200:
                # Raw text: error bodies are not guaranteed to be JSON
                print(f"API Error ({response.status_code}) fetching '{term}': {response.text}")
                break
            data = response.json()
        except RequestException as e:
            print(f"Network error while fetching '{term}' at location {location_id}: {e}")
            break  # Exit loop on request failure
        except ValueError as e:
            # Response.json() raises a ValueError subclass on a malformed body
            print(f"Invalid JSON while fetching '{term}' at location {location_id}: {e}")
            break

        page = data.get("data")
        if not page:
            print(f"No more products found for '{term}' at location {location_id}.")
            break

        for product in page:
            product["locationId"] = location_id  # Ensure location ID is stored
        term_products.extend(page)

        # The API's total is per query, so compare it with this term's count only
        pagination = (data.get("meta") or {}).get("pagination") or {}
        total_results = pagination.get("total") or 0
        if len(page) < limit or (total_results and len(term_products) >= total_results):
            break  # Stop pagination once this term's results are exhausted

        # Move to next batch
        start += limit
        time.sleep(1)  # Prevent rate-limiting

    return term_products




## **Product Data Retrieval**

### **Overview**
The `fetch_product.py` module is responsible for **fetching, filtering, and storing** product data from the Kroger API. It acts as the intermediary between the API and the local dataset, ensuring only relevant products are retained.

### **Key Features**
- **Batch Processing**: Iterates through multiple store locations, retrieving product data efficiently.
- **Product Filtering**: Applies category and keyword-based filtering to retain only relevant products.
- **Data Storage**: Saves retrieved and filtered data into structured CSV files.
- **Error Handling & Logging**: Captures API failures, logs progress, and prevents redundant API calls.

### **How It Works**
1. **Fetch Products**: Calls the `search_kroger_products()` function from `kroger_api.py` for a given store location.
2. **Extract Relevant Data**: Retrieves product details, including brand, description, price, and stock levels.
3. **Apply Filtering**: Uses predefined criteria (categories and keywords) to retain essential products.
4. **Save Data**: Appends the filtered data to `kroger_product_data.csv`.
5. **Report Processed Locations**: Returns the IDs of the locations it processed; `main.py` then marks them in the tracker to prevent unnecessary repeat calls.

### **Dependencies**
- `kroger_api.py` for making API requests.
- `data_processing.py` for product filtering.
- `tracking.py` for updating the API call log.

This module ensures that the data pipeline retrieves and processes only **relevant grocery product data** while efficiently managing API resources.


In [None]:
import os
import sys
import time
import pandas as pd
import datetime
from acquisition.kroger_api import search_kroger_products
from acquisition.data_processing import filter_products, save_to_csv
from acquisition.tracking import update_tracker, update_log

# Resolve paths relative to this module: src/acquisition/ -> src/data/
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, os.pardir, "data"))

# Alternative layout rooted at the GitHub Actions workspace, kept for reference:
# BASE_DIR = os.getenv("GITHUB_WORKSPACE", os.path.dirname(os.path.abspath(__file__)))
# DATA_DIR = os.path.join(BASE_DIR, "kroger-data-pipeline", "src", "data")

# Data files shared across the acquisition pipeline
PRODUCTS_FILE, LOCATION_FILE, PRODUCT_API_LOG = (
    os.path.join(DATA_DIR, file_name)
    for file_name in ("kroger_product_data.csv", "kroger_locations.csv", "product_api_log.csv")
)

# Fresh runners may not have the data folder yet; create it before any file access
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR, exist_ok=True)
    print(f"Created missing data directory: {DATA_DIR}")

def fetch_and_filter_products(location_id):
    """Fetch products for one store, flatten them to rows, filter, and append to CSV.

    Args:
        location_id: Kroger store location ID to query.

    Returns:
        The filtered ``pandas.DataFrame`` handed to ``save_to_csv``, or ``None``
        when the API returned no products.
    """
    # Call API (uses the default search terms of `search_kroger_products`)
    product_results = search_kroger_products(location_id=location_id)

    if not product_results:
        print(f"⚠️ No results found for Location ID {location_id}")
        return None

    # One retrieval date for the whole batch
    date_retrieved = datetime.date.today().strftime('%Y-%m-%d')
    rows = [_product_to_row(product, location_id, date_retrieved) for product in product_results]

    # Convert to DataFrame
    products_df = pd.DataFrame(rows)

    # Keep identifiers as text (e.g. leading zeros in UPCs) before saving
    products_df = products_df.astype({
        "Product ID": str,
        "UPC": str,
        "Location ID": str
    }, errors="ignore")  # Prevent errors if some values can't be converted

    # Apply Filtering Step (Using `filter_products`)
    filtered_df = filter_products(products_df)

    # Save to CSV with correct pathing
    save_to_csv(filtered_df, PRODUCTS_FILE)

    return filtered_df


def _product_to_row(product, default_location_id, date_retrieved):
    """Flatten one Kroger product dict into a CSV row, tolerating missing or null fields."""
    # `items` may be absent, null, or an empty list; fall back to an empty item
    items = product.get("items") or [{}]
    item = items[0] or {}
    price = item.get("price") or {}
    inventory = item.get("inventory") or {}

    return {
        "Product ID": product.get("productId", "Unknown"),
        "UPC": product.get("upc", "Unknown"),
        "Brand": product.get("brand", "Unknown"),
        "Description": product.get("description", "Unknown"),
        "Category": ", ".join(product.get("categories") or []),  # Convert list to string
        # Fall back to the queried store rather than a placeholder
        "Location ID": product.get("locationId", default_location_id),
        "Regular Price": price.get("regular", 0),
        "Promo Price": price.get("promo", 0),
        "Stock Level": inventory.get("stockLevel", "Unknown"),
        "Size": item.get("size", "Unknown"),
        "Sold By": item.get("soldBy", "Unknown"),
        "Date Retrieved": date_retrieved,
    }

def fetch_products_in_batches(batch_size=10):
    """Fetch products for up to ``batch_size`` locations flagged as needing data.

    Refreshes the tracker's ``Needs Data`` flags with ``update_log()``. It then
    calls ``fetch_and_filter_products`` for each flagged location until
    ``batch_size`` locations have been processed without raising. A location
    that raises is reported and skipped, and does not count toward the batch.

    Args:
        batch_size: Maximum number of locations to process in this run.

    Returns:
        List of location IDs processed without an exception.
    """
    if batch_size <= 0:
        print("batch_size must be positive; nothing to process.")
        return []

    # Refresh needs_data status before fetching
    update_log()

    tracker_df = pd.read_csv(PRODUCT_API_LOG, dtype={"Location ID": str})

    # Select only locations that need data
    locations_to_process = tracker_df.loc[tracker_df["Needs Data"].eq(True), "Location ID"].tolist()

    total_locations = len(locations_to_process)
    target = min(batch_size, total_locations)
    print(f"{total_locations} require updates, this run will process {target} when complete.")
    processed_locations = []

    for location_id in locations_to_process:
        try:
            fetch_and_filter_products(location_id)  # Calls the API and saves data
        except Exception as e:  # one bad location should not abort the whole batch
            print(f"\n❌ Error processing {location_id}: {str(e)}")
            continue

        processed_locations.append(location_id)

        # Progress counts successful locations against this run's actual target
        sys.stdout.write(f"\r🔄 Progress: {len(processed_locations)}/{target} locations processed")
        sys.stdout.flush()

        # Stop after reaching batch limit
        if len(processed_locations) >= batch_size:
            break

        time.sleep(2)  # Prevent rate limiting between locations

    print(f"\nCompleted {len(processed_locations)} API calls in this run.")

    return processed_locations


if __name__ == "__main__":
    fetch_products_in_batches(batch_size=10)


## **Data Processing & Filtering**

### **Overview**
The `data_processing.py` module is responsible for **structuring, filtering, and managing** the product data retrieved from the Kroger API. It ensures that only relevant data is retained and optimally stored for further analysis.

### **Key Features**
- **File Management**: Ensures data directories and CSV files exist before writing data.
- **Product Filtering**: Selects products based on predefined **categories** and **keywords** (e.g., "Dairy", "Bakery").
- **Tracker Initialization**: Maintains and updates API request logs to monitor data freshness.
- **Data Cleaning**: Removes duplicate records and updates tracking logs for stale data.

### **How It Works**
1. **Initialize Files**: Ensures that required CSV files exist (`kroger_product_data.csv`, `product_api_log.csv`).
2. **Filter Products**: Uses category and keyword-based filtering to retain only necessary items.
3. **Update API Tracker**: Refreshes `product_api_log.csv` to flag locations requiring updates.
4. **Remove Duplicates**: Cleans up redundant data entries in the tracking log.
5. **Save Processed Data**: Stores filtered results in `kroger_product_data.csv` for downstream analysis.

### **Dependencies**
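- No project modules: it relies only on `pandas` and the standard library. It defines its own versions of `update_tracker` and `update_log`, but the pipeline imports those helpers from `tracking.py`.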

This module ensures **efficient data storage and retrieval**, keeping the dataset clean, relevant, and ready for analysis.


In [None]:
import pandas as pd
from datetime import datetime, timedelta
import os

# Resolve paths relative to this module: src/acquisition/ -> src/data/
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, os.pardir, "data"))

# Alternative layout rooted at the GitHub Actions workspace, kept for reference:
# BASE_DIR = os.getenv("GITHUB_WORKSPACE", os.path.dirname(os.path.abspath(__file__)))
# DATA_DIR = os.path.join(BASE_DIR, "kroger-data-pipeline", "src", "data")

# Data files shared across the acquisition pipeline
PRODUCTS_FILE, LOCATION_FILE, PRODUCT_API_LOG = (
    os.path.join(DATA_DIR, file_name)
    for file_name in ("kroger_product_data.csv", "kroger_locations.csv", "product_api_log.csv")
)

# Fresh runners may not have the data folder yet; create it before any file access
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR, exist_ok=True)
    print(f"Created missing data directory: {DATA_DIR}")

# Seed a header-only product CSV so later appends always have a file to extend
if not os.path.exists(PRODUCTS_FILE):
    pd.DataFrame(
        columns=[
            "Product ID",
            "UPC",
            "Brand",
            "Description",
            "Category",
            "Location ID",
            "Regular Price",
            "Promo Price",
            "Stock Level",
            "Size",
            "Sold By",
            "Date Retrieved",
        ]
    ).to_csv(PRODUCTS_FILE, index=False)
    print(f"Created new CSV file: {PRODUCTS_FILE}")


def initialize_tracker():
    """Return the product API tracker, creating and saving a fresh one if absent.

    A fresh tracker has one row per ``Location ID`` in ``LOCATION_FILE``, with
    an empty ``Last Retrieved Date``, zero ``Successful Calls`` and
    ``Needs Data`` set to True. The locations file is read on every call, even
    when an existing tracker is simply loaded.
    """
    locations_df = pd.read_csv(LOCATION_FILE, dtype={"Location ID": str})

    if not os.path.exists(PRODUCT_API_LOG):
        print("🆕 Creating new product tracker file...")
        fresh_tracker = pd.DataFrame({
            "Location ID": locations_df["Location ID"].astype(str),
            "Last Retrieved Date": "",
            "Successful Calls": 0,
            "Needs Data": True,
        })
        fresh_tracker.to_csv(PRODUCT_API_LOG, index=False)
        print(f"Created new tracker file: {PRODUCT_API_LOG}")
        return fresh_tracker

    print("🔄 Loading existing product API tracker...")
    return pd.read_csv(PRODUCT_API_LOG, dtype={"Location ID": str})


def update_tracker(location_id):
    """Record a successful API call for ``location_id`` in the tracker CSV.

    Sets ``Last Retrieved Date`` to today (``YYYY-MM-DD``) and increments
    ``Successful Calls`` for the matching row, then rewrites ``PRODUCT_API_LOG``.
    Missing call counts are treated as zero. An unknown location is reported
    and the file is left unchanged.

    Args:
        location_id: Location to update; compared as a string so integer IDs match.
    """
    # Read the date column as text so assigning a date string never clashes
    # with an all-blank (float NaN) column.
    tracker_df = pd.read_csv(
        PRODUCT_API_LOG, dtype={"Location ID": str, "Last Retrieved Date": str}
    )
    location_id = str(location_id)
    mask = tracker_df["Location ID"] == location_id
    if not mask.any():
        print(f"⚠️ Location ID {location_id} not found in tracker; nothing updated.")
        return

    # Update Last Retrieved Date & Increment Successful Calls (NaN counts -> 0)
    today_str = datetime.today().strftime("%Y-%m-%d")
    tracker_df["Successful Calls"] = tracker_df["Successful Calls"].fillna(0).astype(int)
    tracker_df.loc[mask, "Last Retrieved Date"] = today_str
    tracker_df.loc[mask, "Successful Calls"] += 1

    tracker_df.to_csv(PRODUCT_API_LOG, index=False)
    print(f"Tracker updated for Location ID {location_id}")


def update_log():
    """Refresh the tracker's ``Needs Data`` flags and drop duplicate locations.

    Keeps the last row per ``Location ID``. A location needs data when its
    ``Last Retrieved Date`` is missing or unparseable, or is 7 or more whole
    days old. The parsed dates replace the original column when the tracker
    is written back to ``PRODUCT_API_LOG``.
    """
    log = pd.read_csv(PRODUCT_API_LOG, dtype={"Location ID": str})
    log = log.drop_duplicates(subset=["Location ID"], keep="last")

    # Vectorized date handling: unparseable values become NaT
    retrieved = pd.to_datetime(log["Last Retrieved Date"], errors="coerce")
    age_days = (datetime.today() - retrieved).dt.days

    log["Last Retrieved Date"] = retrieved
    log["Needs Data"] = retrieved.isna() | (age_days >= 7)

    log.to_csv(PRODUCT_API_LOG, index=False)
    print("'Needs Data' column refreshed and duplicates removed.")


def filter_products(df, valid_categories=None, valid_keywords=None):
    """Keep rows whose Category and Description both match the allow-lists.

    A row is kept when its ``Category`` contains any of ``valid_categories``
    (case-sensitive substring) AND its ``Description`` contains any of
    ``valid_keywords`` (case-insensitive substring). Missing values
    (NaN/None, e.g. blank CSV cells) never match.

    Args:
        df: DataFrame with ``Category`` and ``Description`` columns.
        valid_categories: Category substrings; defaults to ``["Dairy", "Bakery"]``.
        valid_keywords: Description keywords; defaults to ``["egg", "bread"]``.

    Returns:
        The matching subset of ``df``, with its original index preserved.
    """
    if valid_categories is None:
        valid_categories = ["Dairy", "Bakery"]
    if valid_keywords is None:
        valid_keywords = ["egg", "bread"]
    keywords = [kw.lower() for kw in valid_keywords]

    # NaN cells become "" so the `in` checks never see a float
    categories = df["Category"].fillna("").astype(str)
    descriptions = df["Description"].fillna("").astype(str).str.lower()

    in_category = categories.apply(lambda text: any(cat in text for cat in valid_categories))
    has_keyword = descriptions.apply(lambda text: any(kw in text for kw in keywords))

    # astype(bool) keeps an empty mask boolean, so indexing selects rows, not columns
    return df[(in_category & has_keyword).astype(bool)]


def save_to_csv(df, filename):
    """Append ``df`` to the CSV at ``filename``, creating the file if needed.

    Existing rows are read back and concatenated with ``df`` before the file
    is rewritten. Identifier columns (``Product ID``, ``UPC``, ``Location ID``)
    are read as text so leading zeros survive the round trip. An empty ``df``
    is a no-op, and a zero-byte existing file is treated as absent.
    """
    if df.empty:
        print("⚠️ No relevant products found, skipping file update.")
        return

    # Ensure the directory exists (a bare filename has no directory part)
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        header = pd.read_csv(filename, nrows=0).columns
        id_dtypes = {col: str for col in ("Product ID", "UPC", "Location ID") if col in header}
        existing_df = pd.read_csv(filename, dtype=id_dtypes)
        df = pd.concat([existing_df, df], ignore_index=True)

    df.to_csv(filename, index=False)
    print(f"Filtered data saved to {filename}")


## **API Call Tracking & Management**

### **Overview**
The `tracking.py` module is responsible for **logging, monitoring, and managing** API requests to the Kroger API. It ensures that data retrieval is efficient by keeping track of which store locations have been queried and when they need updates.

### **Key Features**
- **API Call Logging**: Maintains a record of successful API calls for each store location.
- **Data Freshness Tracking**: Flags locations needing updates based on a **7-day** refresh cycle.
- **Duplicate Removal**: Cleans up redundant entries in tracking logs.
- **Automatic File Initialization**: Ensures tracking files exist before execution.

### **How It Works**
1. **Initialize Tracking File**: If `product_api_log.csv` is missing, it creates a new file with default values.
2. **Load Existing Logs**: Reads the current API call history from `product_api_log.csv`.
3. **Update Tracker**: Marks successful API calls and timestamps the last retrieval date.
4. **Check Data Freshness**: Flags locations as needing updates if the last retrieval was **7+ days ago**.
5. **Save Updates**: Writes the refreshed tracker back to `product_api_log.csv`.

This module ensures that API requests are **efficiently managed** and prevents redundant data retrieval while maintaining data freshness.


In [None]:
import pandas as pd
from datetime import datetime
import os

# Set up directory paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # Get `src/acquisition/`
DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "data"))  # Navigate to `src/data/`

# # Set base directory dynamically
# BASE_DIR = os.getenv("GITHUB_WORKSPACE", os.path.dirname(os.path.abspath(__file__)))
# DATA_DIR = os.path.join(BASE_DIR, "kroger-data-pipeline", "src", "data")

# Update file paths
PRODUCTS_FILE = os.path.join(DATA_DIR, "kroger_product_data.csv")
LOCATION_FILE = os.path.join(DATA_DIR, "kroger_locations.csv")
PRODUCT_API_LOG = os.path.join(DATA_DIR, "product_api_log.csv")


# Ensure data folder exists in GitHub Actions runner
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR, exist_ok=True)
    print(f"Created missing data directory: {DATA_DIR}")

print(f"📂 Looking for location file in: {LOCATION_FILE}")
print(f"📂 Looking for product log in: {PRODUCT_API_LOG}")

# The location list is required input: fail fast at import time without it
if not os.path.exists(LOCATION_FILE):
    raise FileNotFoundError(f"❌ Missing location file: {LOCATION_FILE}")

# Seed the tracker with every known location so the first run has work to do.
# A header-only tracker would leave no location flagged as needing data.
if not os.path.exists(PRODUCT_API_LOG):
    print("⚠️ Tracker file missing! Creating new file...")
    pd.DataFrame({
        "Location ID": pd.read_csv(LOCATION_FILE, dtype={"Location ID": str})["Location ID"],
        "Last Retrieved Date": "",
        "Successful Calls": 0,
        "Needs Data": True,
    }).to_csv(PRODUCT_API_LOG, index=False)

def load_location_tracker():
    """Load the product API tracker, adding rows for any locations it lacks.

    If no tracker file exists, one is created from ``LOCATION_FILE``.
    Otherwise every ``Location ID`` in ``LOCATION_FILE`` that is missing from
    ``PRODUCT_API_LOG`` is appended with an empty ``Last Retrieved Date``,
    zero ``Successful Calls`` and ``Needs Data=True``. The file is saved only
    when rows were added.

    Returns:
        The tracker DataFrame with normalized column types.

    Raises:
        FileNotFoundError: If ``LOCATION_FILE`` does not exist.
    """
    if not os.path.exists(LOCATION_FILE):
        raise FileNotFoundError(f"❌ Missing location file: {LOCATION_FILE}")

    locations_df = pd.read_csv(LOCATION_FILE, dtype={"Location ID": str})
    # dict.fromkeys de-duplicates while keeping file order
    location_ids = list(dict.fromkeys(locations_df["Location ID"].astype(str)))

    if not os.path.exists(PRODUCT_API_LOG):
        print("🆕 Creating new product tracker file...")
        tracker_df = _new_tracker_rows(location_ids)
        tracker_df.to_csv(PRODUCT_API_LOG, index=False)
        return tracker_df

    print("🔄 Loading existing product API tracker...")
    tracker_df = pd.read_csv(PRODUCT_API_LOG, dtype={"Location ID": str})

    known_ids = set(tracker_df["Location ID"].dropna())
    missing_ids = [loc for loc in location_ids if loc not in known_ids]
    if missing_ids:
        print(f"➕ Adding {len(missing_ids)} new location(s) to the tracker...")
        new_rows = _new_tracker_rows(missing_ids)
        tracker_df = new_rows if tracker_df.empty else pd.concat([tracker_df, new_rows], ignore_index=True)
        tracker_df.to_csv(PRODUCT_API_LOG, index=False)

    # Ensure column types are correct; blank CSV cells arrive as NaN
    tracker_df["Last Retrieved Date"] = tracker_df["Last Retrieved Date"].fillna("").astype(str)
    tracker_df["Successful Calls"] = tracker_df["Successful Calls"].fillna(0).astype(int)
    tracker_df["Needs Data"] = tracker_df["Needs Data"].fillna(True).astype(bool)

    return tracker_df


def _new_tracker_rows(location_ids):
    """Build default tracker rows (never retrieved, needs data) for ``location_ids``."""
    return pd.DataFrame({
        "Location ID": location_ids,
        "Last Retrieved Date": "",
        "Successful Calls": 0,
        "Needs Data": True,
    })

def update_tracker(location_id):
    """Record a successful API call for ``location_id`` in the tracker CSV.

    Sets ``Last Retrieved Date`` to today (``YYYY-MM-DD``) and increments
    ``Successful Calls`` for the matching row, then rewrites ``PRODUCT_API_LOG``.
    Missing call counts are treated as zero. An unknown location is reported
    and the file is left unchanged.

    Args:
        location_id: Location to update; compared as a string so integer IDs match.
    """
    # Read the date column as text so assigning a date string never clashes
    # with an all-blank (float NaN) column.
    tracker_df = pd.read_csv(
        PRODUCT_API_LOG, dtype={"Location ID": str, "Last Retrieved Date": str}
    )
    location_id = str(location_id)
    mask = tracker_df["Location ID"] == location_id
    if not mask.any():
        print(f"⚠️ Location ID {location_id} not found in tracker; nothing updated.")
        return

    today_str = datetime.today().strftime("%Y-%m-%d")

    # Missing call counts become 0 so the increment never leaves NaN behind
    tracker_df["Successful Calls"] = tracker_df["Successful Calls"].fillna(0).astype(int)
    tracker_df.loc[mask, "Last Retrieved Date"] = today_str
    tracker_df.loc[mask, "Successful Calls"] += 1

    print(f"📝 Updating tracker: {location_id} - {today_str}")

    tracker_df.to_csv(PRODUCT_API_LOG, index=False)  # Ensure changes are saved

def update_log():
    """Refresh 'Needs Data' flags in the tracker and drop duplicate locations.

    Keeps the last row for each ``Location ID``. A location needs data when its
    ``Last Retrieved Date`` is blank, is not a ``YYYY-MM-DD`` date, or is 7 or
    more days old. The date text is written back unchanged. A header-only
    tracker (no rows) is handled.
    """
    # Dates read as text so blank cells stay NaN in an object column
    tracker_df = pd.read_csv(
        PRODUCT_API_LOG, dtype={"Location ID": str, "Last Retrieved Date": str}
    )

    # Remove Duplicate Entries Based on 'Location ID'
    tracker_df = tracker_df.drop_duplicates(subset=["Location ID"], keep="last")

    # Vectorized parse: blank or invalid values become NaT. (A row-wise `apply`
    # on an empty frame returns a DataFrame, which cannot fill a single column.)
    last_dates = pd.to_datetime(
        tracker_df["Last Retrieved Date"], format="%Y-%m-%d", errors="coerce"
    )
    days_since = (datetime.today() - last_dates).dt.days
    tracker_df["Needs Data"] = last_dates.isna() | (days_since >= 7)

    # Save the updated tracker, ensuring no duplicates
    tracker_df.to_csv(PRODUCT_API_LOG, index=False)
    print("'Needs Data' column refreshed and duplicates removed.")


## **Kroger Data Pipeline Orchestration**

### **Overview**
The `main.py` script serves as the **central orchestrator** of the Kroger data pipeline. It coordinates the execution of various modules to ensure **efficient data acquisition, processing, and tracking**.

### **Key Features**
- **Pipeline Execution**: Automates the end-to-end process of fetching and processing product data.
- **API Token Management**: Ensures a valid authentication token is available before querying the API.
- **Batch Processing**: Fetches data in controlled batches to optimize API usage.
- **Tracking & Logging**: Updates logs to prevent redundant API calls.
- **Data Filtering & Storage**: Cleans and stores retrieved product data in structured files.

### **How It Works**
1. **Authenticate**: Retrieves a valid Kroger API access token.
2. **Initialize Tracker**: Loads and updates API call logs to determine which locations need data.
3. **Fetch Product Data**: Calls `fetch_product.py` to retrieve and filter grocery product details.
4. **Update API Logs**: Marks successful API calls to prevent redundant queries.
5. **Filter & Save Data**: Applies `data_processing.py` logic to retain only relevant products.
6. **Complete Execution**: Saves all updates and logs completion.

### **Dependencies**
- `fetch_product.py` for retrieving and filtering data.
- `kroger_api.py` for API authentication and product queries.
- `data_processing.py` for cleaning and storing data.
- `tracking.py` for managing API request logs.

This script automates the **entire data pipeline**, ensuring that product data retrieval from Kroger is **structured, optimized, and repeatable**.


In [None]:
import os
import sys
import pandas as pd

# Set up directory paths relative to this file, independent of the working directory
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # Get `src/acquisition/`
DATA_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "data"))  # Navigate to `src/data/`
# Parent of BASE_DIR (`src/` under the layout noted above). It is appended to
# sys.path so the `acquisition.*` imports below can resolve; this must run
# before those imports, so keep the statement order as-is.
PROJECT_ROOT = os.path.dirname(BASE_DIR)
sys.path.append(PROJECT_ROOT)

# Pipeline stages (project modules)
from acquisition.tracking import load_location_tracker, update_log, update_tracker
from acquisition.fetch_product import fetch_products_in_batches
from acquisition.kroger_api import get_kroger_product_compact_token
from acquisition.data_processing import filter_products, save_to_csv

# Alternative layout (data folder directly under `src/`), kept for reference:
# BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # Get `src/` directory
# DATA_DIR = os.path.join(BASE_DIR, "data")

# Data files read and written by the pipeline
PRODUCTS_FILE = os.path.join(DATA_DIR, "kroger_product_data.csv")
PRODUCT_API_LOG = os.path.join(DATA_DIR, "product_api_log.csv")

# Echo the resolved paths at import time for debugging
print(f"PRODUCTS_FILE: {PRODUCTS_FILE}")
print(f"PRODUCT_API_LOG: {PRODUCT_API_LOG}")

def main(batch_size=10):
    """Run the Kroger data pipeline end to end.

    Steps:
        1. Obtain an API token.
        2. Load and refresh the location tracker.
        3. Fetch products for up to ``batch_size`` locations.
        4. Record those locations in the tracker.
        5. Re-filter the product CSV in place.

    Args:
        batch_size: Maximum number of locations to fetch in this run.
    """
    print("\n**Starting Kroger Data Pipeline**\n")

    # Step 1: Ensure API Token is Valid
    token = get_kroger_product_compact_token()
    if not token:
        print("Failed to retrieve API token. Exiting...")
        return

    # Step 2: Initialize & Update Logs
    print("Loading tracker & checking logs...")
    load_location_tracker()
    update_log()  # Mark locations that need updates

    # Step 3: Fetch & Process Product Data
    print("Fetching product data...")
    updated_locations = fetch_products_in_batches(batch_size=batch_size)

    # Step 4: Apply Updates After Fetching
    print("Updating tracker after batch fetch...")
    for location_id in updated_locations:
        update_tracker(location_id)  # Ensure updates are applied after processing

    # Step 5: Re-filter the stored data and OVERWRITE the file. `save_to_csv`
    # appends, so using it here would re-add every existing row each run.
    print("Processing and filtering product data...")
    if os.path.exists(PRODUCTS_FILE):
        # Read identifiers as text so leading zeros survive the rewrite
        df = pd.read_csv(PRODUCTS_FILE, dtype={"Product ID": str, "UPC": str, "Location ID": str})
        filtered_df = filter_products(df)
        if filtered_df.empty:
            print("⚠️ No relevant products found, skipping file update.")
        else:
            filtered_df.to_csv(PRODUCTS_FILE, index=False)
            print(f"Processed and saved filtered product data to {PRODUCTS_FILE}")
    else:
        print("No product data file found! Skipping filtering step.")

    print("\n**Pipeline Execution Complete!**")


if __name__ == "__main__":
    main(batch_size=1)  # Adjust batch size as needed
