In [None]:
import os
import django
# Point Django at the project's settings module unless one is already configured.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
# Allow synchronous ORM calls from the notebook kernel, which Django's async-safety
# check would otherwise reject when an event loop is running.
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"

# Database connection values: left blank here, fill in before running.
# NOTE(review): presumably consumed by api.settings — confirm the variable names there.
os.environ["DATABASE_HOST"] = ""
os.environ["DATABASE_PORT"] = ""
os.environ["DATABASE_PASSWORD"] = ""

# Google Custom Search credentials, read back with os.getenv in the Google-matching cell.
# NOTE: "SEACH" (sic) is the key that cell reads too; rename both places together.
os.environ["GOOGLE_API_KEY"] = ""
os.environ["GOOGLE_CUSTOM_SEACH_ID"] = ""

# Must run after the environment is populated and before any model import below.
django.setup()

## Release Info

In [None]:
import datetime

# Vinmonopolet product ids in this release; read by the product-import, release and
# badge-scheduling cells below.
release_beers = []
# Name used to look up (or create) the Release row.
release_name = "Mai 2025"
# Stored as Release.release_date when the release is created.
# NOTE(review): this is a naive datetime — if the project uses USE_TZ=True Django will
# warn and interpret it in the default time zone; confirm that is intended.
release_date = datetime.datetime(year=2025, month=5,day=7, hour=6, minute=0)
# Text and type forwarded to the scheduled badge tasks.
badge_text = ""
badge_type = ""
# Number of days from now at which the badge-removal task is scheduled.
badge_days = 35

## Add new products

In [None]:
import cloudscraper
import xmltodict
from beers.models import Beer, ExternalAPI
from django.utils import timezone

def call_api(url):
    """Fetch ``url`` through a cloudscraper session and return its XML body parsed into a dict."""
    session = cloudscraper.create_scraper(interpreter="nodejs")
    xml_text = session.get(url).text
    return xmltodict.parse(xml_text)

# Base URL of the Vinmonopolet product API, stored in the ExternalAPI table.
baseurl = ExternalAPI.objects.get(name="vinmonopolet_v3").baseurl

# Create or refresh one Beer row per product id listed in `release_beers`.
for b in release_beers:
    url = baseurl + "products/" + str(b)
    try:
        response = call_api(url)["product"]

        # The API value is divided by 100 before storing (same conversion for both paths).
        volume = float(response["volume"]["value"]) / 100.0

        # Field values shared by the "update existing" and "create new" paths.
        fields = {
            "vmp_name": response["name"],
            "main_category": response["main_category"]["name"],
            "country": response["main_country"]["name"],
            "volume": volume,
            "product_selection": response["product_selection"],
            "vmp_url": "https://www.vinmonopolet.no" + response["url"],
            "vmp_updated": timezone.now(),
        }
        # Optional parts of the payload: only written when the API returned them.
        if "main_sub_category" in response:
            fields["sub_category"] = response["main_sub_category"]["name"]
        if "price" in response:
            fields["price"] = response["price"]["value"]
            fields["price_per_volume"] = float(response["price"]["value"]) / volume

        vmp_id = int(response["code"])
        try:
            beer = Beer.objects.get(vmp_id=vmp_id)
        except Beer.DoesNotExist:
            # New product: a single INSERT carrying every known field.
            Beer.objects.create(vmp_id=vmp_id, **fields)
        else:
            for field_name, value in fields.items():
                setattr(beer, field_name, value)
            # A product that shows up in a release is on sale again.
            beer.active = True
            beer.save()
    except Exception as exc:
        # Best effort: one bad product must not stop the import, but the failure has to
        # be visible. `Exception` (not a bare except) keeps Ctrl-C / kernel interrupt working.
        print(f"Skipping product {b}: {exc!r}")
        continue

## Match Beers

In [None]:
from bs4 import BeautifulSoup
from fuzzywuzzy import process, fuzz

def query_untappd(query):
    """Run an Untappd search for ``query`` and return the result page HTML.

    Args:
        query: Free-text search string (typically a beer name).

    Returns:
        The response body as text when the server answers 200, otherwise ``None``
        (a message is printed in that case).
    """
    scraper = cloudscraper.create_scraper()

    # Pass the query via `params` so it is URL-encoded; interpolating it into the URL
    # breaks on names containing '&', '#', '+' or similar characters.
    response = scraper.get("https://untappd.com/search", params={"q": query})

    if response.status_code == 200:
        return response.text

    print(f"Failed to fetch results for query: {query}")
    return None
    
def parse_search_results(html):
    """Extract beer hits from an Untappd search result page.

    Args:
        html: HTML text of the search page.

    Returns:
        A list of ``(beer_name, untappd_id, full_url)`` tuples, one per ``.beer-item``
        element that has both a ``.name`` element and a link with an ``href``.
        ``untappd_id`` is the last path segment of the link, as a string.
    """
    soup = BeautifulSoup(html, 'html.parser')
    results = []

    for beer_item in soup.select('.beer-item'):
        name_tag = beer_item.select_one('.name')
        link_tag = beer_item.select_one('a')
        beer_link = link_tag.get('href') if link_tag is not None else None

        # Skip malformed entries instead of letting one of them abort the whole search.
        if name_tag is None or not beer_link:
            continue

        beer_name = name_tag.text.strip()
        # rstrip so a trailing slash does not produce an empty id.
        untappd_id = beer_link.rstrip('/').split('/')[-1]
        full_url = f"https://untappd.com{beer_link}"

        results.append((beer_name, untappd_id, full_url))

    return results

def generate_query_variations(beer_name):
    """Build progressively looser search strings for ``beer_name``.

    The list starts with the name itself. If the name contains a ``' x '``
    collaboration marker, a variant with the first marker removed follows. Then
    come prefixes of that variant (or of the original name), dropping one trailing
    word at a time down to three words.
    """
    base_name = beer_name
    variations = [beer_name]

    head, marker, tail = beer_name.partition(' x ')
    if marker:
        base_name = head.strip() + ' ' + tail.strip()
        variations.append(base_name)

    tokens = base_name.split()
    variations.extend(
        ' '.join(tokens[:count]) for count in range(len(tokens) - 1, 2, -1)
    )

    return variations

def find_beer_match(beer_name):
    """Search Untappd for ``beer_name`` and return the closest hit.

    Each query variation is tried in order. The first variation whose result page
    yields a fuzzy best match wins, whatever its score.

    Returns:
        ``(result, score)`` where ``result`` is the ``(name, untappd_id, url)`` tuple
        from ``parse_search_results`` and ``score`` is the ``fuzz.ratio`` similarity
        against the full ``beer_name``; ``(None, None)`` when nothing matched.
    """
    for candidate_query in generate_query_variations(beer_name):
        print(f"Trying query: {candidate_query}")

        page = query_untappd(candidate_query)
        if not page:
            continue

        hits = parse_search_results(page)
        top = process.extractOne(
            beer_name, [hit[0] for hit in hits], scorer=fuzz.ratio
        )
        if not top:
            continue

        top_name, top_score = top[0], top[1]
        # Map the winning name back to the first hit carrying it.
        first_hit = next((hit for hit in hits if hit[0] == top_name), None)
        if first_hit is not None:
            return first_hit, top_score

    print("No match found.")
    return None, None

In [None]:
from beers.models import Beer
# Minimum fuzz.ratio score (0-100) for an automatic match to be accepted.
MATCH_SCORE_THRESHOLD = 40

# Active beers that have no Untappd id yet and are not already queued for manual matching.
beers = Beer.objects.filter(
    untpd_id__isnull=True, match_manually=False, active=True
)
for beer in beers:
    result, score = find_beer_match(beer.vmp_name)

    # Decide explicitly instead of relying on a bare `except` to catch `None > 40`;
    # real errors (e.g. database failures) are no longer hidden.
    untappd_id = None
    if result is not None and score is not None and score > MATCH_SCORE_THRESHOLD:
        try:
            untappd_id = int(result[1])
        except (TypeError, ValueError):
            # Link did not end in a numeric id: treat as unmatched.
            untappd_id = None

    if untappd_id is not None:
        beer.untpd_id = untappd_id
        beer.untpd_url = result[2]
        beer.save()
        print(f"Matched {beer} as {result[0]}")
        continue

    # No acceptable match: hand the beer over to the manual matching step.
    beer.description = "Missing on Untappd."
    beer.match_manually = True
    beer.save()
    if result is not None:
        print(f"Failed to match {beer}... Possible option: {result[0]}")
    else:
        print(f"Failed to match {beer}...")

In [None]:
from django.utils import timezone
from datetime import timedelta

# Cut-off timestamp: beers created within the last 30 days count as "recent".
thirty_days_ago = timezone.now() - timedelta(days=30)

recent_beers = Beer.objects.filter(created_at__gte=thirty_days_ago)

# Re-activate every recent beer and clear its manual-match flag, one save per row.
# NOTE(review): this also clears match_manually on beers the matching cell above has
# just flagged, so the Google-matching cell (which filters on match_manually=True)
# will not see them — confirm this reset is meant to run on every pass.
for beer in recent_beers:
    beer.active = True
    beer.match_manually = False
    beer.save()

## Update Beers

In [None]:
import re
import json
from urllib.request import Request, urlopen
from itertools import chain
from beers.models import Beer
from django.utils import timezone
from datetime import timedelta

# Work queue, highest priority first:
#   1. beers explicitly flagged for a recheck,
#   2. beers that have never received a rating,
#   3. low-traffic beers (<= 500 check-ins) not refreshed for a week,
#   4. everything else, stalest first.
beers1 = Beer.objects.filter(
    untpd_id__isnull=False, prioritize_recheck=True, active=True
)
beers2 = Beer.objects.filter(
    untpd_id__isnull=False, rating__isnull=True, active=True
)
time_threshold = timezone.now() - timedelta(days=7)
beers3 = Beer.objects.filter(
    untpd_updated__lte=time_threshold,
    untpd_id__isnull=False,
    active=True,
    checkins__lte=500,
)
beers4 = Beer.objects.filter(untpd_id__isnull=False, active=True).order_by(
    "untpd_updated"
)

# De-duplicate while keeping first-seen (priority) order. Django model instances hash
# and compare by primary key, so this matches the former `x not in beers` scan without
# its quadratic cost.
beers = list(dict.fromkeys(chain(beers1, beers2, beers3, beers4)))

for beer in beers:
    url = beer.untpd_url
    if not url:
        # An id without a URL cannot be scraped; skip instead of crashing the loop.
        print(f"{beer.vmp_name} has no Untappd URL, skipping")
        continue
    print(f"{beer.vmp_name} {url}")

    try:
        req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # Context manager closes the HTTP response even if reading fails.
        with urlopen(req) as page:
            html_page = page.read()
        soup = BeautifulSoup(html_page, "lxml")
        # Structured data (JSON-LD) is preferred; HTML scraping is the fallback.
        data = [
            json.loads(x.string)
            for x in soup.find_all("script", type="application/ld+json")
        ]

    except Exception as e:
        # Stop the whole run on the first fetch/parse failure rather than continuing
        # to send requests.
        print(e)
        break

    beer.untpd_id = (
        data[0]["sku"]
        if data
        else int(
            soup.find("meta", {"property": "og:url"})["content"].split("/")[-1]
        )
    )
    beer.untpd_name = (
        data[0]["name"]
        if data
        else soup.find("p", {"class": "brewery"}).find("a").text
        + " "
        + soup.find("div", {"class": "name"}).find("h1").text
    )
    beer.brewery = (
        data[0]["brand"]["name"]
        if data
        else soup.find("p", {"class": "brewery"}).find("a").text
    )
    beer.rating = (
        data[0]["aggregateRating"]["ratingValue"]
        if data
        else float(soup.find("div", {"class": "caps"})["data-rating"])
    )
    beer.checkins = (
        data[0]["aggregateRating"]["reviewCount"]
        if data
        else int(
            # Drop thousands separators first: "1,234 Ratings" must read as 1234, not 1.
            re.findall(
                r"\b\d+\b",
                soup.find("p", {"class": "raters"}).text.replace(",", ""),
            )[0]
        )
    )
    beer.style = soup.find("p", {"class": "style"}).text
    beer.description = (
        data[0]["description"]
        if data
        else soup.find(
            "div", {"class": "beer-descrption-read-less"}
        ).text.strip()
    )
    try:
        # Keep the decimals: the former r"\b\d+\b" turned "5.5% ABV" into 5.0.
        beer.abv = float(
            re.search(
                r"\d+(?:\.\d+)?", soup.find("p", {"class": "abv"}).text
            ).group()
        )
    except (AttributeError, ValueError):
        # Element missing or no number in it (e.g. "N/A ABV").
        beer.abv = 0
    try:
        beer.ibu = int(
            re.search(
                r"\b\d+\b", soup.find("p", {"class": "ibu"}).text
            ).group()
        )
    except (AttributeError, ValueError):
        # Element missing or no number in it (e.g. "N/A IBU").
        beer.ibu = None
    beer.label_hd_url = soup.find("a", {"class": "label image-big"})[
        "data-image"
    ]
    beer.label_sm_url = soup.find("a", {"class": "label image-big"}).find(
        "img"
    )["src"]
    # Store the canonical URL reported by the page itself.
    beer.untpd_url = soup.find("meta", {"property": "og:url"})["content"]
    beer.untpd_updated = timezone.now()
    beer.prioritize_recheck = False
    # volume * 1000 * abv/100 * 0.8, divided by 12.
    # NOTE(review): presumably litres -> ml of ethanol -> grams -> 12 g units; confirm.
    beer.alcohol_units = (beer.volume * 1000 * beer.abv / 100 * 0.8) / 12
    beer.save()

## Add release model

In [None]:
from beers.models import Release
# Fetch the release by name, creating it with the configured date on first run.
release, created = Release.objects.get_or_create(
    name=release_name, defaults={"release_date": release_date}
)

# Resolve every configured product id to a Beer row; report the ones that are missing
# instead of skipping them silently.
release_products = []
for b in release_beers:
    try:
        release_products.append(Beer.objects.get(vmp_id=b))
    except Beer.DoesNotExist:
        print(f"Product {b} is not in the database, skipping")

if release_products:
    # Pass model instances (not raw vmp_id values) so the relation is correct whatever
    # Beer's primary key is, and attach them in a single call.
    release.beer.add(*release_products)
    release.save()

print("Release added")

## Check Beers

In [None]:
from beers.models import Release
from django.utils import timezone
from datetime import timedelta

release = Release.objects.get(name=release_name)
thirty_days_ago = timezone.now() - timedelta(days=30)

# Beers of this release created in the last 30 days that carry an Untappd id and name
# but have not been confirmed by a human yet.
unverified = release.beer.filter(
    verified_match=False,
    untpd_id__isnull=False,
    created_at__gte=thirty_days_ago,
).exclude(untpd_name="")
beers = list(unverified)

for beer in beers:
    # Show both names so the operator can compare them.
    print(f"{beer.vmp_name}")

    print(f"{beer.untpd_name}")

    answer = input("Is this a verified match? (y/n): ").strip().lower()

    if answer not in ('y', 'n'):
        print("Invalid input. Skipping this beer.\n")
        continue

    if answer == 'y':
        beer.verified_match = True
        beer.save()
        print("Verified match set to True.\n")
        continue

    # Rejected: wipe everything that was derived from the wrong Untappd entry.
    for field_name in (
        "untpd_id",
        "untpd_name",
        "untpd_url",
        "brewery",
        "rating",
        "checkins",
        "style",
        "description",
        "abv",
        "ibu",
        "label_hd_url",
        "label_sm_url",
        "alcohol_units",
        "untpd_updated",
    ):
        setattr(beer, field_name, None)
    beer.verified_match = False
    beer.prioritize_recheck = False
    beer.save()
    print("Fields reset and saved.\n")


## Match rest from Google

In [None]:
import requests
import re
from beers.models import Beer

# Google Custom Search credentials taken from the environment (None when unset).
# NOTE: "SEACH" (sic) matches the key written by the setup cell; rename both together.
API_KEY = os.getenv("GOOGLE_API_KEY")
CSE_ID = os.getenv("GOOGLE_CUSTOM_SEACH_ID")


def query_google(beer_name, max_results=5, timeout=10):
    """
    Query the Google Custom Search API for a beer name and return Untappd beer links.

    Args:
        beer_name: Search text sent as the ``q`` parameter.
        max_results: Maximum number of matching results to return (default 5).
        timeout: Seconds to wait for the HTTP request before giving up (default 10).

    Returns:
        A list of ``(title, link)`` tuples, in API order, keeping only links of the form
        ``https://untappd.com/b/<slug>/<numeric id>``. An empty list is returned (and a
        message printed) when the request fails or the API answers with a non-200 status.
    """
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "key": API_KEY,
        "cx": CSE_ID,
        "q": beer_name,
    }

    try:
        # Without a timeout a stalled connection would hang the notebook indefinitely.
        response = requests.get(url, params=params, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to fetch results for {beer_name}. Error: {exc}")
        return []

    if response.status_code != 200:
        print(f"Failed to fetch results for {beer_name}. Status code: {response.status_code}")
        return []

    data = response.json()
    results = []

    valid_url_pattern = re.compile(r"^https://untappd\.com/b/[^/]+/\d+$")

    for item in data.get("items", []):
        if len(results) >= max_results:
            break

        link = item.get("link")
        # Items without a link would make re.match raise TypeError; skip them.
        if not isinstance(link, str) or not valid_url_pattern.match(link):
            continue

        results.append((item.get("title"), link))

    return results


# Active beers that the automatic matcher gave up on.
beers = Beer.objects.filter(match_manually=True, active=True)

for beer in beers:
    print(f"\nProcessing beer: {beer.vmp_name}")

    results = query_google(beer.vmp_name)

    if not results:
        print("No results found. Skipping this beer.")
        continue

    print("\nTop 5 results:")
    for i, (title, link) in enumerate(results, start=1):
        print(f"{i}. {title} - {link}")

    choice = input("\nEnter the number of the correct match (or press Enter to skip): ").strip()

    # isdecimal (not isdigit): every string it accepts is also accepted by int().
    if choice.isdecimal() and 1 <= int(choice) <= len(results):
        selected_title, selected_link = results[int(choice) - 1]

        beer.untpd_url = selected_link
        # query_google only returns links ending in a numeric id; store it as int, the
        # same type the automatic matcher writes.
        beer.untpd_id = int(selected_link.split("/")[-1])
        beer.verified_match = True
        beer.match_manually = False
        beer.save()

        print(f"Matched '{beer.vmp_name}' to '{selected_title}' ({selected_link}).")
    else:
        print("No match selected. Skipping this beer.")


## Schedule badges

In [None]:
from datetime import timedelta
from django.utils import timezone
from django_q.models import Schedule

# Comma-separated product ids handed to the badge task as a single string.
products = ",".join(str(product_id) for product_id in release_beers)

# Schedule adding badges, ten minutes from now.
# The kwargs text is built with !r so each value is a correctly quoted Python literal:
# identical to hand-written '...' for plain text, but still valid when a value contains
# a quote or a backslash.
Schedule.objects.create(
    name="Release: " + badge_text + " - Add badges",
    func="beers.tasks.create_badges_custom",
    kwargs=f"products={products!r}, badge_text={badge_text!r}, badge_type={badge_type!r}",
    schedule_type=Schedule.ONCE,
    next_run=timezone.now() + timedelta(minutes=10),
)

# Schedule removing badges, `badge_days` days from now.
Schedule.objects.create(
    name="Release: " + badge_text + " - Remove badges",
    func="beers.tasks.remove_badges",
    kwargs=f"badge_type={badge_type!r}",
    schedule_type=Schedule.ONCE,
    next_run=timezone.now() + timedelta(days=badge_days),
)