Skip to content

Commit

Permalink
adds amazon handling
Browse files Browse the repository at this point in the history
  • Loading branch information
WolfgangFahl committed Nov 16, 2023
1 parent d6bf62d commit 61a8843
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 26 deletions.
93 changes: 93 additions & 0 deletions scan/amazon.py
@@ -0,0 +1,93 @@
"""
Created on 2023-11-16
@author: wf
"""
import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import List

@dataclass
class Product:
    """
    Data class representing a product.

    Attributes:
        title (str): Title of the product.
        image_url (str): URL of the product image, "" if none was found.
        price (str): Price of the product as displayed, "" if none was found.
    """

    title: str
    # the optional fields default to "" - the same fallback the Amazon
    # extraction uses when a listing has no image or price
    image_url: str = ""
    price: str = ""

class Amazon:
    """
    Look up products on the Amazon web site.
    """

    @classmethod
    def extract_amazon_products(cls, soup: BeautifulSoup) -> List[Product]:
        """
        Extract product information from Amazon product listing HTML content.

        Args:
            soup (BeautifulSoup): Soup object of the HTML content of the
                Amazon product listing page.

        Returns:
            List[Product]: one Product per listing row that has a title link;
                image URL and price are "" when they cannot be found.
        """
        products = []
        # every product of the listing lives in a div with this class
        for div in soup.find_all("div", class_="puisg-row"):
            # rows without a title link are not products - skip them
            title_div = div.find("h2", class_="a-size-mini")
            if not (title_div and title_div.a):
                continue
            title = title_div.a.get_text(strip=True)

            # an <img> without a src attribute must not abort the whole
            # extraction, so fall back to "" instead of raising KeyError
            image_url = ""
            image_div = div.find("div", class_="s-product-image-container")
            if image_div and image_div.img:
                image_url = image_div.img.get("src", "")

            price = ""
            price_span = div.find("span", class_="a-price")
            offscreen = (
                price_span.find("span", class_="a-offscreen")
                if price_span
                else None
            )
            if offscreen:
                # replace the non-breaking space ('\xa0€' -> ' €')
                price = offscreen.get_text(strip=True).replace("\xa0", " ")

            products.append(
                Product(title=title, image_url=image_url, price=price)
            )
        return products

    @classmethod
    def lookup_products(
        cls, search_key: str, timeout: float = 10.0
    ) -> List[Product]:
        """
        Look up the given search key, e.g. an ISBN or EAN, on amazon.de.

        Args:
            search_key (str): the text to search for.
            timeout (float): seconds to wait for the server before giving up.

        Returns:
            List[Product]: the products extracted from the result page.

        Raises:
            Exception: if the response status code is not 200.
        """
        url = "https://www.amazon.de/s"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }
        # pass the key via params so that requests URL-encodes it, and use a
        # timeout so that an unresponsive server cannot block forever
        response = requests.get(
            url, params={"k": search_key}, headers=headers, timeout=timeout
        )
        if response.status_code != 200:
            msg = f"lookup for {search_key} failed with HTTP status code {response.status_code}"
            raise Exception(msg)
        soup = BeautifulSoup(response.content, "html.parser")
        return cls.extract_amazon_products(soup)
82 changes: 56 additions & 26 deletions scan/webcam.py
Expand Up @@ -9,82 +9,112 @@
from datetime import datetime
from ngwidgets.widgets import Link
from ngwidgets.background import BackgroundTaskHandler
from scan.barcode import Barcode


class WebcamForm:
"""
allow scanning pictures from a webcam
"""
def __init__(self, webserver, default_url: str):
    """
    Construct the webcam form.

    Args:
        webserver: the owning web server; its scandir attribute is used
            as the directory for saved webcam images.
        default_url (str): the base URL of the webcam.
    """
    self.task_handler = BackgroundTaskHandler()
    # @TODO refactor to link
    self.red_link = "color: red;text-decoration: underline;"
    self.blue_link = "color: blue;text-decoration: underline;"

    self.webserver = webserver
    self.scandir = webserver.scandir
    self.url = default_url
    self.shot_url = f"{self.url}/shot.jpg"
    # file name of the most recent webcam shot - set by run_scan
    self.image_path = None
    self.setup_form()


def notify(self, msg):
    """
    Show the given message as a UI notification and also push it to the
    web server's log view when there is one.

    Args:
        msg: the message to show
    """
    ui.notify(msg)
    log_view = self.webserver.log_view
    if not log_view:
        return
    log_view.push(msg)

async def run_scan(self):
    """
    Start the scan process in the background.

    The file name returned by save_webcam_shot is remembered in
    self.image_path and the preview is updated with it.
    """
    _, scan_coro = self.task_handler.execute_in_background(self.save_webcam_shot)
    # await the background task exactly once and keep the result so that
    # scan_barcode can work on the most recent shot
    self.image_path = await scan_coro()
    self.update_preview(self.image_path)

def save_webcam_shot(self) -> str:
    """
    Fetch an image from the webcam URL and save it with a timestamp in
    the scan directory.

    Returns:
        str: the file name of the saved webcam image, "" if the webcam
            answered with a status code other than 200, or None if an
            exception occurred (the exception is handed to
            webserver.handle_exception).
    """
    image_file_name = None
    try:
        shot_url = f"{self.url}/shot.jpg"
        # use a timeout so an unreachable webcam cannot block forever;
        # a timeout is reported via the except branch below
        response = requests.get(shot_url, timeout=10)
        if response.status_code == 200:
            # Ensure the scandir directory exists
            Path(self.scandir).mkdir(parents=True, exist_ok=True)
            image_data = response.content
            # Get current date and time without timezone information
            timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
            # Define the full path to save the image
            image_file_name = f"webcam_{timestamp}.jpg"
            image_file_path = Path(self.scandir) / image_file_name
            # Write the image data to the file system
            with open(image_file_path, "wb") as image_file:
                image_file.write(image_data)
            msg = f"Saved webcam image to {image_file_path}"
        else:
            msg = f"Failed to fetch the webcam image. Status code: {response.status_code}"
            image_file_name = ""

        self.notify(msg)
    except Exception as ex:
        self.webserver.handle_exception(ex)

    return image_file_name

def setup_form(self):
    """
    Set up the webcam form.
    """
    # buttons to take a webcam shot and to scan the latest shot for barcodes
    self.scan_button = ui.button("Scan", on_click=self.run_scan)
    self.barcode_button = ui.button("Barcode", on_click=self.scan_barcode)
    # input showing the webcam URL
    self.webcam_input = ui.input(value=self.url)
    # HTML elements for the image link, the barcode results and the preview
    self.image_link = ui.html().style(self.blue_link)
    self.barcode_results = ui.html("")
    self.preview = ui.html()

async def scan_barcode(self):
    """
    Scan for barcodes in the most recently saved webcam image.

    The result message is shown as a notification and displayed in the
    barcode_results element.
    """
    import html

    msg = "No image to scan for barcodes."
    if self.image_path:
        barcode_path = f"{self.scandir}/{self.image_path}"
        barcode_list = Barcode.decode(barcode_path)
        if barcode_list:
            results = "\n".join(
                f"Code: {barcode.code}, Type: {barcode.type}"
                for barcode in barcode_list
            )
            msg = f"Barcodes found:\n{results}"
        else:
            msg = "No barcodes found."
    self.notify(msg)
    # the barcode content is decoded from an image and therefore untrusted:
    # escape it so it cannot inject markup into the page
    html_markup = f"<pre>{html.escape(msg)}</pre>"
    self.barcode_results.content = html_markup

def update_preview(self, image_path: str = None):
"""
Update the preview with the current URL of the webcam.
Expand Down
40 changes: 40 additions & 0 deletions tests/test_amazon.py
@@ -0,0 +1,40 @@
"""
Created on 2023-11-16
@author: wf
"""
from ngwidgets.basetest import Basetest
from scan.amazon import Amazon, Product

class TestAmazon(Basetest):
    """
    test the amazon search
    """

    def test_amazon(self):
        """
        Test Amazon lookup to ensure it returns correct product details.

        This test checks if the Amazon product lookup returns the expected
        product details for a given search key. It queries the live web
        site and is therefore skipped in a public CI environment.
        """
        if self.inPublicCI():
            return
        # Test data setup
        searches = {
            "4020628887711": Product(
                title="The Beatles - A Hard Day's Night",
                image_url="https://m.media-amazon.com/images/I/81m01dZR2UL._AC_UY218_.jpg",
                price="4,99 €",  # Note the space instead of '\xa0'
            )
        }
        # honor the debug setting of the base test instead of forcing output
        debug = self.debug
        # Testing each search key
        for search_key, expected_product in searches.items():
            products = Amazon.lookup_products(search_key)
            self.assertTrue(
                products, f"No products found for search key: {search_key}"
            )
            product = products[0]
            if debug:
                print(product)
            self.assertEqual(product.title, expected_product.title)
            self.assertEqual(product.image_url, expected_product.image_url)
            self.assertEqual(product.price, expected_product.price)

0 comments on commit 61a8843

Please sign in to comment.