# Google Places Data Setup Process

## Naming Conventions in BigQuery

All Google amenities data used in this framework are stored in the homevest-data.nbhd_similarity dataset with the following naming convention. Unlike Yelp places data, which the Yelp API pulls at the CBSA scope and which is therefore named by CBSA code, Google Places data is pulled per county and is named by state and county FIPS codes.

* “gmaps_{KEYWORD}_{STATE FIPS}{COUNTY FIPS}”
* ex:
    * “gmaps_arcade_13063”
    * “gmaps_bail_bonds_13063”

## Use the following code to pull new data from Google Places into BigQuery. 

Make sure to follow through and store the pulled data in BigQuery so we don't have to pull the data and be charged for it again. Data is first pulled from the API into GCS, and other functions are then used to store that GCS data in BigQuery.

*The final functions section has the functions you can run to do all the work of searching, calling, and storing for you, but you will need to run the cells above it for setup!*

**Be aware of Google Places API costs!**

### Set Up

In [2]:
# import packages
import json
import requests
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import plotly.express as px
import statsmodels.api as sm
import censusdata
from us import states
from google.cloud import bigquery, storage
import time
import logging
from google.cloud.exceptions import NotFound
from shapely.geometry import Point # Shapely for converting latitude/longtitude to geometry
import geopandas as gpd # To create GeodataFrame

#### Geodata pulling functions

In [3]:
# pull bigquery query as dataframe function
def bq_to_df(query):
    """Run a BigQuery SQL query and return its result as a pandas DataFrame.

    The query is submitted through the module-level ``client`` in the "US"
    location.
    """
    job = client.query(query, location="US")
    return job.to_dataframe()

def state_fips(state):
    """Return the FIPS code of ``state`` as resolved by ``us.states.lookup``."""
    matched_state = states.lookup(state)
    return matched_state.fips

def pull_county_zip_codes(state, county):
    """Return the zip codes that overlap a county, with their internal points.

    The county is identified by the state FIPS code (looked up from ``state``)
    concatenated with ``county``. A zip code is kept when it belongs to the
    same state and its geometry intersects the county geometry without merely
    touching it.

    Returns a DataFrame with columns ``zip_code``, ``latitude`` and
    ``longitude``.
    """
    fips = state_fips(state)
    county_geo_id = str(fips) + str(county)

    sql = f"""
    WITH county AS (SELECT * 
    FROM `bigquery-public-data.geo_us_boundaries.counties` cbsa
    WHERE geo_id = '{county_geo_id}')

    SELECT zc.zip_code,
    zc.internal_point_lat as latitude,
    zc.internal_point_lon as longitude,
    FROM `bigquery-public-data.geo_us_boundaries.zip_codes` zc
    CROSS JOIN county c 
    WHERE zc.state_fips_code = '{fips}' AND
    ST_INTERSECTS(zc.zip_code_geom, c.county_geom) AND 
    NOT ST_TOUCHES(zc.zip_code_geom, c.county_geom)
    """

    # A dedicated client is created for this query rather than reusing the
    # module-level one.
    bq_client = bigquery.Client(location="US")
    return bq_client.query(sql, location="US").to_dataframe()

def pull_census_tract_geodata(state, county=False):
    """Return census-tract rows for one county, or for the entire state.

    Args:
        state: State identifier accepted by ``state_fips``.
        county: County FIPS code to filter on (presumably a 3-character
            string such as '063' -- confirm against callers). When falsy
            (the default), no county filter is applied and every tract in
            the state is returned.

    Returns:
        DataFrame of tract rows without the geography columns, with
        ``tract_ce`` renamed to ``tract``, ``county_fips`` to ``county`` and
        the internal point columns renamed to ``latitude`` / ``longitude``.
        Any '+' characters are stripped from the coordinate strings.
    """
    conditions = [f"state_fips_code = '{state_fips(state)}'"]
    # Only filter by county when one was given; the previous unconditional
    # filter turned the default into `county_fips_code = 'False'`.
    if county:
        conditions.append(f"county_fips_code = '{county}'")
    where_clause = " AND\n    ".join(conditions)

    q = f"""SELECT *
      EXCEPT (internal_point_geo, tract_geom)
    FROM `bigquery-public-data.geo_census_tracts.us_census_tracts_national`
    WHERE {where_clause}
    """
    df = bq_to_df(q).rename(columns={
        'tract_ce': 'tract',
        'county_fips': 'county',
        'internal_point_lat': 'latitude',
        'internal_point_lon': 'longitude',
    })
    # regex=False: '+' is a regex quantifier, so request a literal replacement
    # explicitly instead of relying on the pandas-version-dependent default.
    for coord in ('latitude', 'longitude'):
        df[coord] = df[coord].str.replace('+', '', regex=False)
    return df

#### Setting Global Variables

In [1]:
# Google Places API key. `{ENTER_HERE}` is a placeholder: replace it with your
# own key before running this cell (left as-is it raises NameError).
API_KEY = {ENTER_HERE}
# Endpoint used for the Places "nearby search" requests.
BASE_URL = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
# GCS bucket and per-county directory prefix where raw API responses are saved.
GCS_BUCKET = 'homevest-data'
GCS_DATA_DIR = 'nbhd_similarity/45045/' #change county here everytime

# Shared clients used by the functions in this notebook.
storage_client = storage.Client()
bucket = storage_client.get_bucket(GCS_BUCKET)
client = bigquery.Client(location="US")

NameError: name 'ENTER_HERE' is not defined

#### Pulling and Storing Results in GCS Functions

In [None]:
# check to see if the GCS file exists already
def is_gcs_file_existing(file_name, directory, bucket):
    """Return whether the blob ``{directory}{file_name}`` exists in ``bucket``."""
    blob_path = f'{directory}{file_name}'
    return bucket.blob(blob_path).exists()
    
# upload file to gcs
def upload_dict_to_gcs(dict_to_upload, file_name, directory, bucket):
    """Serialize ``dict_to_upload`` to JSON and upload it to ``bucket``.

    The object is written to the blob named ``{directory}{file_name}``.
    """
    payload = json.dumps(dict_to_upload)
    target = bucket.blob(f'{directory}{file_name}')
    target.upload_from_string(payload)

# gets next page of results
def get_next_page(json_file):
    """Fetch the results page that follows ``json_file``.

    ``json_file`` is a parsed API response containing ``next_page_token``.
    Raises ``KeyError`` if the token is missing and an HTTP error if the
    request fails; otherwise returns the parsed JSON of the next page.
    """
    query = {'pagetoken': json_file['next_page_token'], 'key': API_KEY}
    response = requests.get(BASE_URL, params=query)
    response.raise_for_status()
    return response.json()

# pulls results given a df, a keyword, and a scope
def pull_keyword_results(input_df, keyword, scope='zip_code', replace=False, verbose=True, max_locs=1500):
    """Query the Places API around every location in ``input_df`` and save raw pages to GCS.

    For each row, a nearby search ranked by distance is issued at the row's
    ``latitude``/``longitude``. The first page is stored at
    ``{GCS_DATA_DIR}{keyword}/{loc}.json`` and each following page at
    ``{GCS_DATA_DIR}{keyword}/{loc}_{i}.json`` (keyword lower-cased, spaces
    replaced by underscores).

    Args:
        input_df: DataFrame with ``latitude``, ``longitude`` and a ``scope`` column.
        keyword: Search keyword sent to the API.
        scope: Name of the column identifying each location (e.g. 'zip_code' or 'tract').
        replace: When False, locations whose first-page file already exists in
            GCS are skipped (no API call is made).
        verbose: Print progress whenever the row index is a multiple of 30.
        max_locs: Upper bound on the number of rows; guards against accidental
            large (and costly) pulls.

    Returns:
        List of location identifiers for which the request or upload failed.

    Raises:
        ValueError: If ``input_df`` has more than ``max_locs`` rows.
    """
    # Validate first so an oversized request fails before any setup or API call.
    total_locs = len(input_df)
    if total_locs > max_locs:
        raise ValueError(f'# of {scope} locations ({total_locs}) exceeds maximum of {max_locs}!')

    keyword_dir = keyword.lower().replace(" ", "_")
    params = {
        'location': None,
        'keyword': keyword,
        'key': API_KEY,
        'rankby': 'distance',
    }
    errors = []

    for idx, row in input_df.iterrows():
        loc = row[scope]  # tract name or zip code, depending on scope
        file_path = f'{keyword_dir}/{loc}.json'

        # Skip locations that were already pulled unless a re-pull was requested.
        if not replace and is_gcs_file_existing(file_path, GCS_DATA_DIR, bucket):
            continue

        params['location'] = f"{row['latitude']}, {row['longitude']}"

        try:
            r = requests.get(BASE_URL, params=params)
            r.raise_for_status()
            page = r.json()
            upload_dict_to_gcs(page, file_path, GCS_DATA_DIR, bucket)

            # Follow next_page_token until the API stops returning one.
            page_number = 0
            while 'next_page_token' in page:
                # Pause before requesting the next page -- presumably so the
                # token has time to become valid; confirm against the API docs.
                time.sleep(2.4)
                page_number += 1
                page = get_next_page(page)
                upload_dict_to_gcs(
                    page,
                    f'{keyword_dir}/{loc}_{page_number}.json',  # suffix gives page order
                    GCS_DATA_DIR,
                    bucket,
                )
        # Exception (not BaseException) so Ctrl-C / SystemExit still stop the pull.
        except Exception:
            logging.exception("An exception was thrown!")
            print(f'Error encountered for {scope} {loc}')
            errors.append(loc)

        if verbose and idx % 30 == 0:
            print(f'{idx} of {total_locs} {scope} processed')

    return errors

#### Conjoin GCS Files and Upload Final File to BigQuery

In [6]:
# Fields kept from each Places API result; also used as the column list of the
# DataFrame that gets loaded into BigQuery.
# 'lat' and 'lng' are not top-level result keys: generate_places() fills them
# from the result's geometry.location entry.
COLUMNS=[
   'place_id',
    'name',
    'rating',
    'user_ratings_total',
    'lat',
    'lng',
    'icon',
    'types',
    'business_status'
]

# pulls file paths from gcs directory 
def get_file_paths_from_gcs_directory(keyword, bucket=bucket):
    """List the GCS blob names stored for ``keyword`` under ``GCS_DATA_DIR``.

    Args:
        keyword: Search keyword; lower-cased with spaces replaced by
            underscores to form the directory name.
        bucket: Bucket to list. The default is the module-level ``bucket`` as
            it was when this function was defined.

    Returns:
        DataFrame with a single ``file_path`` column, one row per blob. The
        frame is empty (rather than raising) when no blobs match.
    """
    prefix = f'{GCS_DATA_DIR}{keyword.lower().replace(" ", "_")}/'
    file_path_list = [blob.name for blob in bucket.list_blobs(prefix=prefix)]
    # Build the frame once, after listing, so an empty listing still returns a
    # DataFrame and the frame is not rebuilt for every blob.
    return pd.DataFrame({'file_path': file_path_list})

#pulls jsons of given file path from gcs
def get_json_per_file_path(file_path, keyword, bucket=bucket):
    """Download the blob at ``file_path`` from ``bucket`` and parse it as JSON.

    ``keyword`` is accepted for call-site symmetry but is not used here.
    """
    raw = bucket.blob(f'{file_path}').download_as_string()
    return json.loads(raw)

# Generator function that combines jsons from folder and combines them..
def generate_places(keyword):
    """Yield one cleaned record per unique place stored in GCS for ``keyword``.

    Every JSON file under the keyword's GCS directory is read; each entry of
    its ``results`` list is reduced to the fields in ``COLUMNS``, with ``lat``
    and ``lng`` taken from ``geometry.location``. A place is yielded only the
    first time its ``place_id`` is seen. Progress is printed whenever the file
    index is a multiple of 50.
    """
    paths = get_file_paths_from_gcs_directory(keyword, bucket)
    n_files = len(paths)
    seen_ids = set()

    for idx, row in paths.iterrows():
        if idx % 50 == 0:
            print(f'{idx} of {n_files} json files processed')

        payload = get_json_per_file_path(row['file_path'], keyword, bucket)
        for place in payload.get('results', []):
            place_id = place['place_id']
            if place_id in seen_ids:
                continue  # already emitted from an earlier file
            seen_ids.add(place_id)

            record = {col: place.get(col) for col in COLUMNS}
            # Coordinates are nested under geometry.location in each result.
            location = place.get('geometry', {}).get('location', {})
            for coord in ['lat', 'lng']:
                record[coord] = location.get(coord)

            yield record

            
def load_to_BQ(df, client, dataset, table, schema, cluster_fields=None):
    """Load ``df`` into the BigQuery table ``{dataset}.{table}``, replacing its contents.

    The load job uses the given ``schema`` with WRITE_TRUNCATE and autodetect
    enabled; ``cluster_fields`` is applied as clustering when provided. Blocks
    until the job finishes, then prints the resulting row count.
    """
    destination = f'{dataset}.{table}'

    config = bigquery.LoadJobConfig(schema=schema)
    config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    config.autodetect = True
    if cluster_fields is not None:
        config.clustering_fields = cluster_fields

    load_job = client.load_table_from_dataframe(df, destination, job_config=config)
    load_job.result()  # wait for the load to complete

    loaded_table = client.get_table(destination)
    print(f"Loaded {loaded_table.num_rows} rows into {destination}.")

def get_schema(df):
    """Build a list of ``bigquery.SchemaField`` from the dtypes of ``df``.

    Each column's dtype name is mapped to a BigQuery type; a dtype that is not
    in the mapping raises ``KeyError``.
    """
    bq_type_for = {
        'object': 'STRING',
        'float64': 'FLOAT64',
        'int64': 'INT64',
        'bool': 'BOOL',
        'datetime64[ns]': 'TIMESTAMP',
    }
    schema = []
    for column, dtype in zip(df.columns, df.dtypes):
        schema.append(bigquery.SchemaField(column, bq_type_for[str(dtype)]))
    return schema

def correct_df_dtypes(df, inplace=False):
    """Return a copy of ``df`` with every column cast to the Python type of its dtype.

    Columns are cast with ``astype`` to ``str``, ``float``, ``int`` or ``bool``
    according to their dtype name; any other dtype raises ``KeyError``.
    ``inplace`` only selects a shallow (True) or deep (False) copy -- a new
    DataFrame object is returned either way.
    """
    python_type_for = {'object': str, 'float64': float, 'int64': int, 'bool': bool}
    result = df.copy(deep=not inplace)
    for col, dtype in zip(df.columns, df.dtypes):
        target_type = python_type_for[str(dtype)]
        result[col] = df[col].astype(target_type)
    return result

### Final Functions

You can use the below functions to streamline everything to pull and store the data necessary. All you need to provide is the state, a county fips code (these functions conduct the search & pull per county), some keywords to search, and a scope. 

The scope might depend on what you are looking for. Generally, running scope as 'tract' will be more expensive than running as 'zip_code'. The Google Places API has a results limit of 60 places. If you believe there might be more than 60 of a certain amenity (your keyword) in a zipcode, you might consider running with scope as 'tract'. 

#### Pulling multiple keywords at once

**Steps:**

1. Create a list of amenities you would like to pull for an area
* ex. `google_amenities=['library','park','lake','art gallery','retail store','police station','parking']`

2. Set GCS_DATA_DIR following below convention:
* `GCS_DATA_DIR = 'nbhd_similarity/{STATE FIPS}{COUNTY FIPS}/'`

3. Set state and county variables.
* ex: `state = 'Georgia'` and `county = '063'`
        
4. Run gp_api_all_in_list() function with assigned parameter variables. 
* ex:`gp_api_all_in_list(google_amenities,state,county,scope='zip_code')`

In [11]:
#pulls all search results for given county for a certain keyword and stores the data in BQ
def pull_store_data(state, county, keyword='restaurant', replace=False, scope='tract'):
    """Pull all Places results for one keyword in a county and store them in BigQuery.

    Args:
        state: State identifier accepted by ``us.states.lookup``.
        county: County FIPS code appended to the state FIPS in the table name.
        keyword: Search keyword; also used (lower-cased, spaces replaced by
            underscores) in the table name.
        replace: When True, rebuild the table even if it already exists; raw
            GCS files are re-pulled as well. When False, nothing is done if
            the table is already in BigQuery.
        scope: 'zip_code' to search around zip codes; any other value searches
            around census tracts.

    Returns:
        A confirmation string naming the table that was written, or None when
        the table already existed and ``replace`` is False.
    """
    # Skip only when the caller did not ask for a rebuild AND the table exists.
    # (The previous `not replace and not exists` test made replace=True a no-op.)
    if not replace and is_bq_file_existing(state, county, keyword):
        return None

    print('Building Table..')
    if scope == 'zip_code':
        locs = pull_county_zip_codes(state, county)
    else:
        locs = pull_census_tract_geodata(state, county)

    pull_keyword_results(locs, keyword=keyword, replace=replace, scope=scope)
    results = pd.DataFrame(generate_places(keyword=keyword), columns=COLUMNS)

    # Compute the table name and the dtype-corrected frame once and reuse them.
    table_name = f'gmaps_{keyword.lower().replace(" ", "_")}_{str(states.lookup(state).fips) + str(county)}'
    typed_results = correct_df_dtypes(results)
    load_to_BQ(
        typed_results,
        client,
        'nbhd_similarity',
        table_name,
        get_schema(typed_results),
    )
    return f'data stored in nbhd_similarity.{table_name}'

#checks if file already exists in BigQuery    
def is_bq_file_existing(state, county, keyword='restaurant'):
    """Return whether the gmaps table for this keyword and county exists in BigQuery.

    Looks up ``homevest-data.nbhd_similarity.gmaps_{keyword}_{state fips}{county}``
    with the module-level ``client`` and prints which case was found.
    """
    keyword_slug = keyword.lower().replace(" ", "_")
    geo_code = str(states.lookup(state).fips) + str(county)
    table_id = f'homevest-data.nbhd_similarity.gmaps_{keyword_slug}_{geo_code}'

    try:
        client.get_table(table_id)  # API request; raises NotFound if absent
    except NotFound:
        print('Table needs to be built..')
        return False
    else:
        print(f'Table already built as {table_id}')
        return True

def gp_api_all_in_list(amenities, state, county, scope='tract'):
    """Run ``pull_store_data`` for every keyword in ``amenities`` for one county."""
    for amenity in amenities:
        pull_store_data(state, county, keyword=amenity, scope=scope)
    print('All Tables Built')

In [10]:
# FULL EXAMPLE OF ABOVE. DO NOT RUN THIS BECAUSE WE ALREADY HAVE THIS DATA.
# marion county: 213 places 
# GCS_DATA_DIR = 'nbhd_similarity/18097/' #change county here everytime

# state = 'Indiana'
# county = '097'
# gp_api_all_in_list(google_amenities,state,county,scope='zip_code')