In [1]:
import requests
import random
import time
from tqdm import tqdm
import logging
from sentence_transformers import SentenceTransformer, util
import re

# Load the pre-trained model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

  from tqdm.autonotebook import tqdm, trange


In [2]:
# Define delay range for human-like behavior
min_delay = 0.5
max_delay = 1.5

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('sentence_transformers').setLevel(logging.ERROR) # Turn off most logging 

# Define user agents and accept languages
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/91.0.4472.80 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0"
]

accept_languages = [
    "en-US,en;q=0.9",
    "en-GB,en;q=0.9",
    "en-CA,en;q=0.9",
    "en-AU,en;q=0.9",
    "en-NZ,en;q=0.9",
    "en-ZA,en;q=0.9",
    "en-IE,en;q=0.9"
]

In [3]:
# Simulate a human-like delay between requests
def human_like_delay():
    time.sleep(random.uniform(min_delay, max_delay))

# Use origin or previous item as referrer
def generate_referrer(previous_item=None):
    if previous_item is None:
        return "https://www.tesco.com"
    else:
        return f"https://www.tesco.com/groceries/en-GB/search?query={previous_item}"

# Use origin or previous item as referrer
def query_tesco_api(search_item, referrer, count, session=None):
    url = "https://api.tesco.com/shoppingexperience"
    
    headers = {
        "Referer": referrer,
        "Origin": "https://www.tesco.com",
        "User-Agent": random.choice(user_agents),
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": random.choice(accept_languages),
        "x-apikey": "TvOSZJHlEk0pjniDGQFAc9Q59WGAR4dA",
        "Content-Type": "application/json"
    }
    
    query = """
    query Search($query: String!, $page: Int = 1, $count: Int = 2, $sortBy: String) {
      search(query: $query, page: $page, count: $count, sortBy: $sortBy) {
        pageInformation: info {
          totalCount: total
          pageNo: page
          count
          __typename
        }
        results {
          node {
            ... on ProductInterface {
              gtin
              title
              brandName
              superDepartmentName
              departmentName
              aisleName
              shelfName
              reviews {
                stats {
                  overallRating
                }
              }
            }
          }
        }
      }
    }
    """
    
    body = [{
        "operationName": "Search",
        "variables": {
            "query": search_item,
            "page": 1,
            "count": count,
            "sortBy": "relevance"
        },
        "extensions": {"mfeName": "unknown"},
        "query": query
    }]
    
    if session is None:
        session = requests.Session()
    
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # Collect the relevant section of the response
            response = session.post(url, headers=headers, json=body)
            json_response = response.json()
            data = json_response[0].get('data', {})
            search_results = data.get('search', {})
            return {
                "status": "Data Found",
                "page_information": search_results.get('pageInformation'),
                "results": search_results.get('results', [])
            }
        #If this didn't work, response isn't what we've been hoping for, so try again 
        except requests.exceptions.RequestException as e:
            logging.error(f"Error querying item {search_item}: {str(e)}")
            if attempt == max_retries - 1:
                return {"status": f"API Call Unsuccessful: {str(e)}"}
            #Exponential sleep if it fails
            time.sleep((2 ** attempt) + random.random())

In [4]:
def extract_matching_dict(results, target):
    for item in results['results']:
        if item['node']['title'].lower() == target.lower():
            return item['node']
    return None

In [25]:
# Function to rename the keys from the tesco api
def swap_dict_keys(input_dict):
    key_mappings = [
        ("title", "matched_name"),
        ("gtin", "barcode"),
        ("brandName", "brand"),
        ("superDepartmentName", "category_1"),
        ("departmentName", "category_2"),
        ("aisleName", "category_3"),
        ("shelfName", "category_4"),
    ]
    
    result = {}
    
    # Handle the rating separately
    if 'reviews' in input_dict and 'stats' in input_dict['reviews'] and 'overallRating' in input_dict['reviews']['stats']:
        result['rating'] = input_dict['reviews']['stats']['overallRating']
    
    # Process other mappings
    for new_key, old_key in key_mappings:
        if new_key in input_dict:
            result[old_key] = input_dict[new_key]
    
    return result

In [11]:
# Function to find the best 
def extract_best_match(target, candidates_dicts):

    result_dict = {}

    # Extract titles into a list
    titles = [item['node']['title'] for item in candidates_dicts['results']]

    if not candidates_dicts:
        result_dict["item_data"] = None
        result_dict["match_score"] = 0.0
        return result_dict

    # clean all wording
    cleaned_target = re.sub(r'\(.*?\)', '', target).strip().lower()
    cleaned_titles = [re.sub(r'\(.*?\)', '', c).strip().lower() for c in titles]

    # If there is a perfect name match, extract this one
    # Swap the keys for the names we want
    if cleaned_target in cleaned_titles:
        result_dict["item_data"] = swap_dict_keys(extract_matching_dict(candidates_dicts, target))
        result_dict["match_score"] = 100.0
        return result_dict

    # Encode items and target then calculate similarities
    target_embedding = model.encode(cleaned_target, convert_to_tensor=True)
    candidate_embeddings = model.encode(cleaned_titles, convert_to_tensor=True)
    cosine_scores = util.pytorch_cos_sim(target_embedding, candidate_embeddings)[0]

    # Collect the best match then find the product name
    best_match_index = cosine_scores.argmax().item()
    best_match_name = titles[best_match_index]

    # Extract the dict with the best matching name
    result_dict["item_data"] = swap_dict_keys(extract_matching_dict(candidates_dicts, best_match_name))
    result_dict["match_score"] = round(cosine_scores[best_match_index].item() * 100, 1)

    return result_dict

In [14]:
# Main function to run the Tesco API processing
def run_tesco_scraper(search_items):
    results = []
    session = requests.Session()
    previous_item = None
    
    for item in tqdm(search_items, desc="Processing search items"):
        print(item)
        human_like_delay()
        referrer = generate_referrer(previous_item)
        all_results = query_tesco_api(item, referrer, 100, session)

        #Find closest match from all items we have found & return its dict
        result = extract_best_match(item, all_results)
        
        results.append({
            'name': item,
            'match_score': result["match_score"],
            **{f'{k}': v for k, v in result["item_data"].items() if k != 'status'}
        })
        
        if result['item_data']:
            previous_item = item
            
    return results

In [61]:
item = ""
referrer = generate_referrer(None)
candidates_dicts = query_tesco_api(item, referrer, 100)

In [51]:
swap_dict_keys(extract_matching_dict(candidates_dicts, item))

TypeError: argument of type 'NoneType' is not iterable

In [62]:
candidates_dicts

{'status': 'Data Found', 'page_information': None, 'results': None}

In [33]:
titles = [item['node']['title'] for item in candidates_dicts['results']]

In [36]:
for item in titles:
    print(item)

Tesco Mini Gems Sweets 200G


In [37]:
p = [1, 2, 3]

p.index(2)

1

In [38]:
p[1]

2