## Reviews Scraper | Worker 4

**Step 1:** Set the offset, the batch size, and the paths of this worker's CSV files

In [1]:
################################### UPDATE ###################################
# Per-worker settings: how many businesses this worker handles (BATCH), where
# its slice of the pending-business list starts (OFFSET), and the CSV files
# it writes scraped reviews and failed business ids to.
BATCH = 765
OFFSET = BATCH * 3
worker_reviews_error_csv = 'reviews_worker_files/worker4/reviews_error_w4.csv'
worker_reviews_csv = 'reviews_worker_files/worker4/reviews_w4.csv'
##############################################################################

**Step 2:** Define the worker functions (a modified version of `reviews_etl.py`).

<font color='red'>**### ATTENTION: Only run the below cell ONCE! ###**</font>

Running it again resets the `GLOBAL_COUNTER` variable to 0, which causes the worker to select businesses beyond its batch and makes the "Remaining businesses to scrape" metric inaccurate.

In [2]:
import requests
import json
import pandas as pd
import time
import beepy
# import pyspark
# from pyspark.sql.functions import col, isnull
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType

############ INSERT API KEY HERE ############
# API key for api.scrapingdog.com; apiURL() embeds it in every request URL.
API_KEY = ''
#############################################
# Number of businesses processed (successes + errors) by main() since this
# cell was last executed. getIdsAndUrls() subtracts it from BATCH to size the
# remaining window, so re-running this cell (which resets it to 0) makes the
# window too large.
GLOBAL_COUNTER = 0


def apiURL(yelp_url):
    """Return the scraping-API request URL for ``yelp_url``.

    The module-level ``API_KEY`` is placed in the ``api_key`` query
    parameter, ``yelp_url`` is inserted verbatim (no URL-encoding) as the
    ``url`` parameter, and ``dynamic=false`` is appended.
    """
    template = "https://api.scrapingdog.com/scrape?api_key={key}&url={target}&dynamic=false"
    return template.format(key=API_KEY, target=yelp_url)

def apiCall(yelp_url):
    """Fetch ``yelp_url`` through the scraping API and return the body text.

    The request uses a 120-second timeout. If building the URL or performing
    the request raises, the error is printed and ``None`` is returned instead
    of the response text.
    """
    body = None
    try:
        reply = requests.get(apiURL(yelp_url), timeout=120)
        body = reply.text
    except Exception as e:
        print(f'API connection error detected: {e}')
    return body


def convertToJson(response):
    """Extract and parse the review array embedded in a scraped page.

    The page text is expected to contain a JSON array introduced by
    ``"review":[`` and closed by ``]`` directly before ``,"aggregateRating"``.
    That array (brackets included) is sliced out and parsed.

    Args:
        response: Page text as returned by ``apiCall`` (``None`` when the
            request failed).

    Returns:
        The parsed ``review`` array as a list.

    Raises:
        ValueError: If ``response`` is not a string, if either marker is
            missing, or if the sliced text is not valid JSON (the latter as
            ``json.JSONDecodeError``, a ``ValueError`` subclass).
    """
    start_marker = '"review":['
    end_marker = '],"aggregateRating"'

    # apiCall() returns None on connection errors; fail with a clear message
    # instead of an AttributeError on None.find.
    if not isinstance(response, str):
        raise ValueError(
            f'expected response text, got {type(response).__name__}'
        )

    marker_pos = response.find(start_marker)
    if marker_pos == -1:
        raise ValueError('review list start marker not found in response')

    # Index of the opening "[" (the last character of the start marker).
    array_start = marker_pos + len(start_marker) - 1

    # Search for the end marker only after the array starts, so an earlier
    # occurrence cannot produce an empty or reversed slice.
    array_end = response.find(end_marker, array_start)
    if array_end == -1:
        raise ValueError('review list end marker not found in response')

    # +1 keeps the closing "]" of the array.
    return json.loads(response[array_start:array_end + 1])

def buildDf(yelp_id, review_list):
    """Turn one business's parsed reviews into a DataFrame.

    Args:
        yelp_id: Business id stored in the ``id`` column of every row.
        review_list: List of review dicts; each must provide ``author``,
            ``reviewRating['ratingValue']`` and ``description``.

    Returns:
        A ``pandas.DataFrame`` with columns ``id``, ``author``, ``rating``
        and ``text``, one row per review (zero rows for an empty list).

    Raises:
        KeyError: If a review lacks one of the fields listed above.
    """
    # Only fields that end up in the output are read. ``datePublished`` is
    # not part of the output, so a review without it is no longer rejected.
    authors = [review['author'] for review in review_list]
    ratings = [review['reviewRating']['ratingValue'] for review in review_list]
    texts = [review['description'] for review in review_list]

    return pd.DataFrame(
        {
            'id': [yelp_id] * len(review_list),
            'author': authors,
            'rating': ratings,
            'text': texts,
        }
    )

def getIdsAndUrls():
    """Return the ``[id, url]`` pairs this worker still has to scrape.

    Reads ``csv_files/business_10plus.csv``, strips the query string from
    each URL, drops rows whose id is ``#NAME?``, and removes every business
    whose id already appears in the worker's reviews CSV or error CSV. The
    remaining rows are sorted by id, and the slice
    ``[OFFSET, OFFSET + BATCH - GLOBAL_COUNTER)`` is returned.

    Returns:
        A list of ``[id, url]`` lists. It is shorter than the window (or
        empty) when fewer pending businesses remain.
    """
    businesses = pd.read_csv('csv_files/business_10plus.csv', usecols=['id', 'url'])

    # Drop the query string. partition() leaves a URL without "?" untouched,
    # whereas slicing up to find('?') == -1 would cut off its last character.
    businesses['url'] = businesses['url'].apply(lambda u: u.partition('?')[0])
    businesses = businesses[businesses['id'] != '#NAME?']

    # Ids this worker already handled, successfully or not.
    scraped_ids = pd.read_csv(worker_reviews_csv, usecols=['id'])['id']
    failed_ids = pd.read_csv(worker_reviews_error_csv, usecols=['id'])['id']
    handled = businesses['id'].isin(scraped_ids) | businesses['id'].isin(failed_ids)

    # Membership filtering (rather than merging) cannot duplicate rows when
    # an id occurs more than once in the worker files.
    pending = (
        businesses.loc[~handled, ['id', 'url']]
        .sort_values('id')
        .reset_index(drop=True)
    )

    # Positional slicing tolerates a window that extends past the last row;
    # the clamp keeps the stop from falling below the start.
    start = OFFSET
    stop = max(start, OFFSET + BATCH - GLOBAL_COUNTER)
    return pending.iloc[start:stop].values.tolist()


def main(business_count=10):
    """Scrape reviews for up to ``business_count`` pending businesses.

    For each business, the page is fetched, the reviews are parsed, and the
    rows are appended to the worker's reviews CSV. If any of those steps
    raises, the business id is appended to the worker's error CSV instead.
    Both CSVs are rewritten after every business, so progress survives an
    interruption. A summary is printed and a beep is played at the end.

    Args:
        business_count: Maximum number of businesses to process in this call
            (default 10).

    Side effects:
        Adds the number of processed businesses (successes + errors) to the
        module-level ``GLOBAL_COUNTER``, even if the run is interrupted.
    """
    global GLOBAL_COUNTER

    # Counters for final report out
    success_counter = 0
    error_counter = 0

    # Pending [id, url] pairs, capped at what is left of this worker's batch.
    # The clamp avoids a negative slice bound, which would trim from the end.
    id_url_list = getIdsAndUrls()[0:max(0, BATCH - GLOBAL_COUNTER)]
    planned = min(business_count, len(id_url_list))

    # Current contents of the worker files; new rows are appended in memory
    # and the files are rewritten after each business.
    reviews_df = pd.read_csv(worker_reviews_csv)
    reviews_error_df = pd.read_csv(worker_reviews_error_csv)

    try:
        for b in id_url_list[0:planned]:
            yelp_id, yelp_url = b[0], b[1]
            print(f'START: Collect review data for id {yelp_id} ({success_counter+error_counter+1}/{planned})')
            try:
                response = apiCall(yelp_url=yelp_url)
                review_list = convertToJson(response=response)
                response_df = buildDf(yelp_id=yelp_id, review_list=review_list)

                reviews_df = pd.concat([reviews_df, response_df])
                reviews_df.to_csv(worker_reviews_csv, index=False)
                print(f'SUCCESS: Review data for id {yelp_id} successfully written to CSV')

                success_counter += 1
            except Exception as e:
                # Catch Exception, not everything: a KeyboardInterrupt must
                # stop the run rather than be recorded as a scraping error.
                print(f'*** ERROR: Review data not collected for id {yelp_id} ***')
                print(f'    Cause: {type(e).__name__}: {e}')
                reviews_error_df = pd.concat([reviews_error_df, pd.DataFrame([yelp_id], columns=['id'])])
                reviews_error_df.to_csv(worker_reviews_error_csv, index=False)

                error_counter += 1
    finally:
        # Businesses already written to the CSVs are excluded from the next
        # getIdsAndUrls() call, so the counter must reflect them even when
        # the loop is interrupted part-way.
        GLOBAL_COUNTER = GLOBAL_COUNTER + success_counter + error_counter

    print('################# COMPLETE #################')
    print(f'API call for {planned} businesses complete')
    print(f'- Successes: {success_counter}  - Errors: {error_counter}')
    print(f'- Remaining businesses to scrape: {len(id_url_list) - planned}')
    print('############################################')
    beepy.beep(4)

**Step 3:** Run `main()` to begin scraping. The `business_count` argument specifies how many businesses to scrape before `main()` returns (default: 10). `main()` can be run multiple times.

In [10]:
# Print wall-clock timestamps around one scraping run of up to 250 businesses.
print('start:', time.ctime())
main(250)
print('end:', time.ctime())

start: Fri Oct 27 20:30:05 2023
################# COMPLETE #################
API call for 0 businesses complete
- Successes: 0  - Errors: 0
- Remaining businesses to scrape: 0
############################################
end: Fri Oct 27 20:30:10 2023
