In [266]:
import re
import csv
from bs4 import BeautifulSoup
import requests
from datetime import datetime

def clean_timestamp(timestamp):
    """Reduce an ISO-8601 style timestamp to its date part, 'YYYY-MM-DD'.

    Args:
        timestamp: A string in the form ``%Y-%m-%dT%H:%M:%S%z``
            (e.g. ``"2023-05-17T10:30:00+08:00"``).

    Returns:
        The ``YYYY-MM-DD`` date string when ``timestamp`` matches the expected
        format; otherwise ``timestamp`` itself, unchanged.
    """
    try:
        parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S%z")
    except (ValueError, TypeError):
        # ValueError: a string in some other format.
        # TypeError: not a string at all (e.g. None from a JSON null).
        # Either way, hand the input back rather than failing the whole run.
        return timestamp
    return parsed.strftime("%Y-%m-%d")

def extract_coffee_details(body_text):
    """Pull labelled coffee attributes out of an HTML product description.

    The HTML is flattened to plain text and, for every known field, the first
    occurrence of ``"<field name>:"`` (matched case-insensitively) is located;
    the run of word characters, whitespace and ``. , & -`` that follows is
    taken as the value.

    Args:
        body_text: HTML description of a product. ``None`` or an empty string
            is accepted and yields no matches.

    Returns:
        A dict with one key per known field, in a fixed order. Fields that
        were not found map to ``None``; found values are whitespace-stripped.
    """
    field_names = (
        "Region",
        "Variety",
        "Elevation",
        "Processing",
        "Roast profile",
        "Flavour notes",
        "Acidity",
        "Body",
        "Tasting Experience",
        "Farm Information",
        "Moisture content of Green Coffee",
        "Packaging",
    )
    fields = dict.fromkeys(field_names)

    # A missing description (None / "") has nothing to parse; bail out before
    # handing a non-string to the HTML parser.
    if not body_text:
        return fields

    # Strip the markup so the patterns only have to deal with text.
    plain_text = BeautifulSoup(body_text, "html.parser").get_text()

    for name in field_names:
        # NOTE: the value character class includes whitespace, so a value can
        # run across line breaks until a character outside the class (such as
        # the ':' of the next label) is reached.
        match = re.search(
            rf"{re.escape(name)}:\s*([\w\s.,&-]+)",
            plain_text,
            re.IGNORECASE,
        )
        if match:
            fields[name] = match.group(1).strip()

    return fields

def fetch_products(page, timeout=10):
    """Fetch one page of products from the store's paginated JSON endpoint.

    Args:
        page: Page number placed in the ``page`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the scrape indefinitely.

    Returns:
        The list under the ``"products"`` key of the JSON response, or an
        empty list when the response status is not 200 or the key is
        missing/null.

    Raises:
        Exceptions raised by ``requests.get`` (connection errors, timeouts)
        and by ``response.json()`` (invalid JSON) are not caught here.
    """
    url = f"https://cowpressocoffee.sg/collections/frontpage/products.json?page={page}"
    response = requests.get(url, timeout=timeout)

    if response.status_code != 200:
        return []

    # `or []` also covers a payload of {"products": null}.
    return response.json().get("products") or []

def parse_product(product):
    """Flatten one product dictionary into one row per variant.

    Args:
        product: A product dictionary as returned by the products JSON
            endpoint. The keys read are ``id``, ``title``, ``vendor``,
            ``created_at``, ``updated_at``, ``body_html``, ``images`` and
            ``variants``; any of them may be absent or null.

    Returns:
        A list of dicts, one per variant. Each row carries the product-level
        columns, the variant's id/title/price, the product's first image URL
        (``""`` when there is no image) and the fields returned by
        ``extract_coffee_details``. A product without variants yields ``[]``.
    """
    # `or ""` / `or []` normalise JSON nulls as well as missing keys, which a
    # plain `.get(key, default)` does not.
    details = extract_coffee_details(product.get("body_html") or "")

    # The first image is shared by every variant; a product may have none.
    images = product.get("images") or []
    image_url = images[0].get("src", "") if images else ""

    # Columns that are identical for every variant of this product.
    product_columns = {
        "Product ID": product.get("id"),
        "Title": product.get("title"),
        "Vendor": product.get("vendor"),
        "Created At": clean_timestamp(product.get("created_at") or ""),
        "Updated At": clean_timestamp(product.get("updated_at") or ""),
    }

    return [
        {
            **product_columns,
            "Variant ID": variant.get("id"),
            "Variant Title": variant.get("title"),
            "Variant Price": variant.get("price"),
            "Image URL": image_url,
            **details,
        }
        for variant in product.get("variants") or []
    ]

def write_to_csv(data, file_name="products_with_details.csv"):
    """Dump flattened product rows to a CSV file.

    Args:
        data: Iterable of dicts; every dict must contain all of the columns
            listed below (a missing key raises ``KeyError``).
        file_name: Destination path; the file is overwritten.
    """
    # Single source of truth for both the header and the cell order.
    columns = (
        "Product ID", "Title",
        "Vendor", "Created At", "Updated At", "Variant ID",
        "Variant Title", "Variant Price", "Image URL",
        "Region", "Variety", "Elevation", "Processing",
        "Roast profile", "Flavour notes", "Acidity", "Body",
        "Tasting Experience", "Farm Information",
        "Moisture content of Green Coffee", "Packaging",
    )

    with open(file_name, mode="w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle, quoting=csv.QUOTE_MINIMAL, escapechar="\\")
        out.writerow(columns)
        out.writerows([record[name] for name in columns] for record in data)


def scrape_and_export_to_csv():
    """Walk every product page, flatten the products and save them as CSV."""
    rows = []
    page_number = 1

    # Keep requesting pages until one comes back empty.
    batch = fetch_products(page_number)
    while batch:
        for item in batch:
            rows.extend(parse_product(item))
        page_number += 1
        batch = fetch_products(page_number)

    # Persist everything gathered, using write_to_csv's default file name.
    write_to_csv(rows)
    print("Data exported to 'products_with_details.csv'.")

# Run the scraper and export: fetches product pages over the network and
# writes 'products_with_details.csv' (a relative path, so it lands in the
# current working directory).
scrape_and_export_to_csv()

Data exported to 'products_with_details.csv'.


In [268]:
# Columns removed entirely from the cleaned output.
_COLUMNS_TO_DROP = frozenset({
    "Tasting Experience", "Farm Information",
    "Moisture content of Green Coffee", "Packaging", "Body",
})

# Label text that is stripped out of cell values. Order matters: the rules
# are applied one after another to the same value.
_MISPLACED_HEADERS = (
    "Elevation", "Variety", "Processing", "Roast profile",
    "Flavour notes", "Acidity", "Body", "Tasting notes",
    "District", "Farm", "Packaging", "Type of Soil", "Soil Type", "MASL",
    "Varietal", "Flavour", "Average Annual Rainfall", "notes", "this crop",
    "Roast Level", "Process",
)

# Per label: (pattern for the label glued to preceding text, pattern for the
# label as a standalone word, replacement that re-inserts it after a space).
# Compiled once here instead of once per cell.
_HEADER_RULES = tuple(
    (
        re.compile(rf"(?<!\s){re.escape(header)}", re.IGNORECASE),
        re.compile(rf"\b{re.escape(header)}\b", re.IGNORECASE),
        f" {header}",
    )
    for header in _MISPLACED_HEADERS
)


def _clean_value(column, value):
    """Return ``value`` with stray labels and surplus whitespace removed."""
    for glued, standalone, spaced in _HEADER_RULES:
        # First detach a label that runs straight on from the previous text
        # ("NyeriVariety" -> "Nyeri Variety") so that it has word boundaries...
        value = glued.sub(spaced, value)
        # ...then delete the label wherever it stands as a whole word.
        value = standalone.sub("", value)

    if column == "Elevation":
        # Keep only digits, whitespace, commas and hyphens (drops unit text).
        value = re.sub(r"[^\d\s,-]", "", value).strip()

    # Collapse the whitespace runs left behind by the removals.
    return re.sub(r"\s{2,}", " ", value).strip()


def clean_csv(input_file, output_file):
    """Write a cleaned copy of ``input_file`` to ``output_file``.

    Cleaning rules:
      * the columns in ``_COLUMNS_TO_DROP`` are removed;
      * in every remaining non-empty cell, the label texts listed in
        ``_MISPLACED_HEADERS`` are removed (case-insensitively);
      * cells of the ``Elevation`` column are reduced to digits, whitespace,
        commas and hyphens;
      * runs of whitespace are collapsed and the cell is stripped.

    Args:
        input_file: Path of the CSV to read (UTF-8, first row is the header).
        output_file: Path of the CSV to write; overwritten if it exists.

    An input file with no header row at all produces an empty output file.
    Only the header's columns are written; cells beyond the header width in a
    row are ignored.
    """
    with open(input_file, mode="r", encoding="utf-8") as infile, \
            open(output_file, mode="w", newline="", encoding="utf-8") as outfile:
        reader = csv.DictReader(infile)

        # DictReader reports None when the file has no first row; there is
        # nothing to clean in that case.
        if reader.fieldnames is None:
            return

        fieldnames = [
            col for col in reader.fieldnames if col not in _COLUMNS_TO_DROP
        ]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            cleaned = {}
            for column in fieldnames:
                value = row.get(column)
                # Empty or missing cells pass through untouched.
                cleaned[column] = _clean_value(column, value) if value else value
            writer.writerow(cleaned)

# Example usage: clean the file written by write_to_csv's default file name
# and store the result alongside it as 'cowpresso.csv'.
input_file = "products_with_details.csv"
output_file = "cowpresso.csv"
clean_csv(input_file, output_file)
print(f"Cleaned CSV written to {output_file}")

Cleaned CSV written to cowpresso.csv
