### Untappd Scraper

Because of the unusual requirements of scraping Untappd, Selenium (headless or otherwise) is our best choice:  

    1) Login required
    2) Must select 'Show More' to see more than a handful of both search results and reviews
    3) Odd page design that is surprisingly difficult to scrape with the requests library
    

In [13]:
import glob
import json
import os
import pickle
import time
from urllib.parse import quote_plus

from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

### Load config info

The config file is JSON with three keys: `username`, `password`, and `driver_path` (the path to the Selenium driver executable).

In [14]:
## JSON file providing the 'username', 'password' and 'driver_path' entries
## that the login cell reads.
config_path = 'untappd.cfg'

## Parse the whole file into a dict
with open(config_path) as config_file:
    config = json.load(config_file)

### Set Variables

In [15]:
LOGIN_URL = 'https://untappd.com/login'
CHUNK_SIZE = 25 ## URLs to scrape per session.

search_pkl = 'ipa_urls.pkl' ## Only perform the search once; its results are pickled to this file.  If this file exists, the search will not be performed

## Log in

    1) Create Browser Object
    2) Find login elements
    3) Fill them out, submit

In [16]:
## Start a Chrome session with the driver binary named in the config and open
## the login page.
## NOTE(review): passing the driver path positionally is the Selenium 3 / early
## Selenium 4 calling style -- confirm it matches the installed version.
browser = webdriver.Chrome(config['driver_path'])
browser.get(LOGIN_URL)

## Locate the credential fields.  The generic find_element(By.<kind>, value)
## form is used because the find_element_by_* shortcuts were removed in
## Selenium 4.3, while this form works on older releases too.
username = browser.find_element(By.ID, "username")
password = browser.find_element(By.ID, "password")

username.send_keys(config['username'])
password.send_keys(config['password'])

## Submit the login form
browser.find_element(By.XPATH, "//input[@type='submit']").click()


### Note: 

Sometimes a prompt to download the app appears along the bottom of the page.  For some reason it is hard to locate and click with Selenium.  Closing it once per session keeps it closed, so look for it now, before continuing.  If it is there, just click the 'x' by hand.



### Identify IPA URLs

If the pickle file exists, load it.  Otherwise, perform the search and pickle the results. 

In [17]:
def get_beers_from_search(search_term, browser):
    """Run an Untappd search and return the beer page URLs it lists.

    The search page is opened, 'Show More' is clicked up to 25 times to expand
    the result list, and every link inside a '.beer-item' element whose href
    starts with 'https://untappd.com/beer' is collected.

    Args:
        search_term: Free-text query, e.g. 'hazy ipa'.  Surrounding whitespace
            is stripped and the term is URL-encoded.
        browser: Selenium WebDriver to drive (the notebook passes its
            logged-in session).

    Returns:
        A list of href strings in page order.  Duplicates are not removed.
    """
    ## Create search URL and go.  quote_plus turns spaces into '+' (as before)
    ## and also escapes characters such as '&' or '#' that would break the query.
    browser.get('https://untappd.com/search?q={}'.format(quote_plus(search_term.strip())))

    ## Click the show more button
    for i in range(25):
        try:
            browser.find_element(By.XPATH, "//*[contains(text(), 'Show More')]").click()
            time.sleep(2)
            browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        except WebDriverException:
            ## Best effort: report and retry on the next iteration.  Only
            ## Selenium errors are caught so Ctrl-C still stops the scrape.
            print('Error clicking "Show More" on iteration', i)
            time.sleep(5)

    ## Find beer links on page
    beer_prefix = 'https://untappd.com/beer'
    urls = []
    for result in browser.find_elements(By.CSS_SELECTOR, '.beer-item'):
        for link in result.find_elements(By.TAG_NAME, 'a'):
            ## Anchors without an href report None; skip them instead of crashing
            href = link.get_attribute('href')
            if href and href.startswith(beer_prefix):
                urls.append(href)

    print(len(urls), 'beers found for search', search_term)

    return urls

def write_pkl(filename, data):
    """Pickle `data` into the file at `filename`, replacing any existing file."""
    with open(filename, mode='wb') as sink:
        pickle.dump(data, sink)
        
def read_pkl(filename):
    """Return the object stored in the pickle file at `filename`.

    Unpickling can execute arbitrary code, so only point this at files that
    were written locally (for example by write_pkl).
    """
    with open(filename, mode='rb') as source:
        loaded = pickle.load(source)
    return loaded

In [18]:
## Build the set of beer URLs: search once, then reuse the pickled result.
if not os.path.isfile(search_pkl):
    print('Pickle file not found.  Performing search.')

    beer_urls = set()

    search_terms = ['ipa', 'dipa', 'double ipa', 'hazy ipa']

    for search_term in search_terms:
        ## A set drops beers that show up under more than one search term
        beer_urls.update(get_beers_from_search(search_term, browser))

        print('total url count:', len(beer_urls))

    ## Write out our pickle file, to the same path the check above tests
    write_pkl(search_pkl, beer_urls)

else:
    ## Read pickled data
    print("Reading pickled URLS")
    beer_urls = read_pkl(search_pkl)

Reading pickled URLS


### Create Function for identifying/naming files

In [19]:
def create_naming_funct(file_format):
    """Build a naming function for one family of numbered files.

    file_format is a path template such as folder/filename_{}.extension, where
    the {} is the slot for the file number.

    The returned function takes no arguments.  Each call globs for files that
    match the template (with * in the slot) and returns a tuple
    (count, next_path): how many such files exist, and the template filled in
    with that count zero-padded to five digits (00000, 00001, etc).
    """
    def identify_checkpoint():
        found = len(glob.glob(file_format.format('*')))
        number = str(found).zfill(5)
        return found, file_format.format(number)

    return identify_checkpoint

In [20]:
## One naming function per family of output files.  Calling one returns
## (number of existing files, path for the next file to write).
beer_checkpointer = create_naming_funct('data/beer_info_{}.json')
review_checkpointer = create_naming_funct('data/reviews_{}.json')
user_checkpointer = create_naming_funct('data/users_{}.json')

## Pickled lists of the URLs still left to scrape, written after each run
url_checkpointer = create_naming_funct('checkpoints/run_checkpoint_{}.pkl')

### Once we have a set of URLs to iterate over, we can begin scraping reviews.  

For each beer we have to load its page, click the 'Show More' button repeatedly, and then scrape the reviews. 

In [21]:
def get_beer_info(browser, url):
    """Load a beer page and collect its headline details.

    Args:
        browser: Selenium WebDriver to drive.
        url: Beer page URL.  Its last path segment (ignoring a trailing
            slash) is stored as the beer id.

    Returns:
        A dict with the keys 'id', 'description', 'name', 'brewery', 'style',
        'abv', 'ibu', 'rating', 'raters' and 'date'.  Apart from 'id', every
        value is the text of the matching page element, unparsed.

    Raises:
        WebDriverException: if the description element or any of the
            per-field elements cannot be found.
    """
    classnames = 'name,brewery,style,abv,ibu,rating,raters,date'.split(',')

    browser.get(url)

    ## rstrip so that '.../beer/123/' still yields '123' rather than ''
    beer_id = url.rstrip('/').split('/')[-1]

    ## Populate the beer info
    beer_info = {}
    beer_info['id'] = beer_id

    ## Probe for the description block; if the lookup fails, pause before
    ## carrying on.  find_element raises rather than returning a falsy value,
    ## so the exception is the only failure signal.
    try:
        browser.find_element(By.CLASS_NAME, 'beer-descrption-read-more')
    except WebDriverException:
        print('Exception caught. pausing')
        time.sleep(10)

    ## If there is no "Show More" button, catch that error
    try:
        browser.find_element(By.CLASS_NAME, 'beer-descrption-read-more').find_element(By.LINK_TEXT, 'Show More').click()
        time.sleep(0.5)
    except WebDriverException:
        ## Deliberate best effort: short descriptions have nothing to expand
        pass

    ## Drop the trailing 'Show Less' link text, but only when it is actually
    ## there, so a description without it is not truncated.
    description = browser.find_element(By.CLASS_NAME, 'beer-descrption-read-less').text
    if description.endswith('Show Less'):
        description = description[:-len('Show Less')].rstrip()
    beer_info['description'] = description

    for classname in classnames:
        element = browser.find_element(By.CLASS_NAME, classname)
        if classname == 'name':
            ## Only the <h1> inside the 'name' element is wanted
            element = element.find_element(By.TAG_NAME, 'h1')
        beer_info[classname] = element.text

    return beer_info

In [22]:
def _parse_rating(digits):
    """Convert the digits taken from a rating CSS class into a float.

    The first digit is the whole part and the rest the fraction, so '375'
    becomes 3.75 and '5' becomes 5.0.  Returns None when `digits` is empty,
    None, or not numeric.
    """
    if not digits:
        return None
    if len(digits) > 1:
        digits = digits[0] + '.' + digits[1:]
    try:
        return float(digits)
    except ValueError:
        return None


def scrape_reviews(browser, beer_id):
    """Expand and collect the check-ins on the beer page currently loaded.

    Args:
        browser: Selenium WebDriver already showing the beer page.
        beer_id: Identifier copied into every returned review.

    Returns:
        A list of dicts with the keys 'beer_id', 'user_id' (the href of the
        check-in's 'user' element), 'comment' (text or None) and 'rating'
        (float or None).  Check-ins without a 'rating-serving' element are
        left out.

    Raises:
        WebDriverException: if the 'main-stream' element, or the 'user'
            element of a check-in, cannot be found.
    """
    ## Show more reviews!
    ## Click the show more button.  Index [1] picks the second 'Show More'
    ## match on the page; give up after three failed attempts.
    fail_count = 0

    for _ in range(50):
        try:
            browser.find_elements(By.XPATH, "//*[contains(text(), 'Show More')]")[1].click()
            time.sleep(2)
        except (IndexError, WebDriverException):
            ## IndexError: fewer than two matches, i.e. no button to click
            fail_count += 1
            if fail_count > 2:
                break
            time.sleep(4)

    ## Get reviews
    user_reviews = []

    checkins = browser.find_element(By.ID, 'main-stream').find_elements(By.CLASS_NAME, 'checkin')
    for checkin in checkins:
        rating_dict = {}
        rating_dict['beer_id'] = beer_id
        rating_dict['user_id'] = checkin.find_element(By.CLASS_NAME, 'user').get_attribute('href')

        ## The rating is encoded in a span class such as 'rating small r375'
        digits = None
        try:
            spans = checkin.find_element(By.CLASS_NAME, 'rating-serving').find_elements(By.TAG_NAME, 'span')
            for span in spans:
                css_class = span.get_attribute('class') or ''
                if css_class.startswith('rating small'):
                    ## Last class token minus its leading letter
                    digits = css_class.split(' ')[-1][1:]
        except WebDriverException:
            ## No rating block on this check-in: skip it
            continue

        try:
            rating_dict['comment'] = checkin.find_element(By.CLASS_NAME, 'comment-text').text
        except WebDriverException:
            ## Comments are optional
            rating_dict['comment'] = None

        rating_dict['rating'] = _parse_rating(digits)

        user_reviews.append(rating_dict)

    return user_reviews

In [23]:
## Identify checkpoints
## Look the checkpoint state up once: how many checkpoint files exist and
## what the next one would be called.
checkpoint_count, checkpoint_path = url_checkpointer()

if checkpoint_count > 0:
    ## Resume: the newest existing file is numbered one below the next slot
    next_number = str(checkpoint_count).zfill(5)
    last_number = str(checkpoint_count - 1).zfill(5)
    previous_checkpoint = checkpoint_path.replace(next_number, last_number)
    print('Reading checkpoint file', previous_checkpoint)
    url_list = read_pkl(previous_checkpoint)
else:
    ## Fresh start: seed the work list from the search results
    print("Creating first checkpoint")
    url_list = list(beer_urls)
    write_pkl(checkpoint_path, url_list)
    

Reading checkpoint file checkpoints/run_checkpoint_00026.pkl


In [24]:

## Scrape one chunk of beers per pass, then write the results and a new
## checkpoint holding the URLs that are still left.
for x in range(1):

    reviews = []
    beers = []

    ## Nothing left to scrape: stop here rather than failing on an empty
    ## pop() and writing empty output files.
    if not url_list:
        print('No URLs left to scrape.')
        break

    start = time.time()
    ## The final chunk may hold fewer than CHUNK_SIZE URLs
    for i in range(min(CHUNK_SIZE, len(url_list))):
        url = url_list.pop()
        print(str(i) + ':', url, end=' ')

        beer_info = get_beer_info(browser, url)
        beer_reviews = scrape_reviews(browser, beer_info['id'])

        reviews.extend(beer_reviews)
        beers.append(beer_info)

        ## Pause between beer pages (presumably to avoid hammering the site)
        time.sleep(20)
        ## Second number is the size of this beer's reviews as JSON text
        print('Reviews found:', len(beer_reviews), len(json.dumps(beer_reviews)), 'Total Review Count:', len(reviews))

    with open(beer_checkpointer()[1], 'w') as wtr:
        json.dump(beers, wtr)

    with open(review_checkpointer()[1], 'w') as wtr:
        json.dump(reviews, wtr)

    ## Save the remaining URLs so the next session resumes from here
    write_pkl(url_checkpointer()[1], url_list)

    print('Run', x, 'took', time.time()-start, 'seconds. pausing.')
    time.sleep(240) # 4 minute pause

0: https://untappd.com/beer/2943312 Reviews found: 15 1889 Total Review Count: 15
1: https://untappd.com/beer/2783468 Reviews found: 15 1704 Total Review Count: 30
2: https://untappd.com/beer/2710568 Reviews found: 15 1965 Total Review Count: 45
3: https://untappd.com/beer/1016833 Reviews found: 277 31364 Total Review Count: 322
4: https://untappd.com/beer/2461404 Reviews found: 15 1780 Total Review Count: 337
5: https://untappd.com/beer/2476735 Reviews found: 256 31228 Total Review Count: 593
6: https://untappd.com/beer/2146934 Reviews found: 279 31894 Total Review Count: 872
7: https://untappd.com/beer/2075737 Reviews found: 14 1482 Total Review Count: 886
8: https://untappd.com/beer/43833 Reviews found: 281 30579 Total Review Count: 1167
9: https://untappd.com/beer/215645 Reviews found: 237 27897 Total Review Count: 1404
10: https://untappd.com/beer/2089569 Reviews found: 15 1825 Total Review Count: 1419
11: https://untappd.com/beer/2526177 Reviews found: 274 30939 Total Review Coun

### Get user information!

Identify all user urls, and figure out what information we can scrape

In [None]:
## Distinct 'user_id' values (profile links) across the collected reviews
user_urls = {review['user_id'] for review in reviews}
len(user_urls)

In [None]:
## Visit user profiles (at most 100 of them) and collect what the
## 'user-info' block exposes: name, username, location, social links, stats.
user_data = []
for user_url in list(user_urls)[:100]:

    browser.get(user_url)

    user_info = browser.find_element(By.CLASS_NAME, 'user-info')

    user_dict = {}
    user_dict['name'] = user_info.find_element(By.CLASS_NAME, 'info').find_element(By.TAG_NAME, 'h1').text

    user_dict['username'] = user_info.find_element(By.CLASS_NAME, 'username').text

    ## The location used to be read from the 'username' element, which just
    ## duplicated the username.
    ## NOTE(review): assumes the profile markup carries a 'location' class --
    ## confirm against a live page.  find_elements is used so that a profile
    ## without one yields None instead of raising.
    location_elems = user_info.find_elements(By.CLASS_NAME, 'location')
    location = location_elems[0].text if location_elems else ''
    user_dict['location'] = location if location else None

    ## Social links, keyed by the link text
    user_dict['social'] = {}
    social_list = user_info.find_element(By.CLASS_NAME, 'social').find_elements(By.TAG_NAME, 'a')
    for social in social_list:
        user_dict['social'][social.text] = social.get_attribute('href')

    ## Profile counters, keyed by their title; thousands separators are
    ## removed before converting to int
    user_dict['stats'] = {}
    stats_list = user_info.find_element(By.CLASS_NAME, 'stats').find_elements(By.TAG_NAME, 'a')
    for stat in stats_list:
        title = stat.find_element(By.CLASS_NAME, 'title').text
        count_text = stat.find_element(By.CLASS_NAME, 'stat').text
        user_dict['stats'][title] = int(count_text.replace(',', ''))

    user_data.append(user_dict)

In [None]:
## Write the collected profiles to the next free users_<number>.json file
users_path = user_checkpointer()[1]
with open(users_path, 'w') as out_file:
    json.dump(user_data, out_file)

In [None]:
## Display how many user profiles were collected
len(user_data)

In [None]:
## NOTE(review): in the code shown, user_review_elem is only assigned inside
## scrape_reviews, so this cell relies on a variable left over in the
## notebook session -- likely a debugging remnant; confirm before rerunning.
user_review_elem.text