# MVP 3 Parallel Computing

Laure Briol aided by Generative AI (ChatGPT)

This MVP is designed to generate a heatmap of the Twin Cities area, showing 'trendy' locations. Trendy locations tend to have more people visiting the stores, as well as more people reviewing stores and shops in the area. So, a place with more recent reviews will be more 'trendy'. Google's Places API was used to find the locations of coffee shops and gift shops within the Twin Cities area. From there, a heatmap was created based on the average time of the 5 most recent reviews. This program used Dask to split up the API calls across the Twin Cities area, so we could 'talk' with Google many times at once and ensure the project map could be created in a timely manner.

In [None]:
#import necessary libraries for handling requests, data manipulation, mapping, and parallel processing
import requests
import pandas as pd
import folium
from datetime import datetime
import time
import numpy as np
from folium.plugins import HeatMap
import math
import json
#dask package for use in running multiple tasks at once
from dask import delayed, compute
from dask.distributed import Client, LocalCluster
from shapely.geometry import Polygon
#grid pattern creation
from shapely.ops import unary_union

In [1]:
#configuration and constants

#google places api key -- fill in your own key before running
#warning from the author: a run with a 3 km grid cost about $70, so please don't run this
#unless you are using a separate Google account that still has free trial credits
API_KEY = ''

#search terms sent to the Google Places API
KEYWORDS = ['coffee shop', 'gift shop']

#maximum number of place markers drawn per grid cell and keyword on the final map,
#so the map does not get overcrowded
TOP_N = 3

#bounding box of the Twin Cities study area, in degrees
MIN_LAT = 44.75
MAX_LAT = 45.15
MIN_LNG = -93.65
MAX_LNG = -92.9

#number of dask workers, i.e. how many chunks of API calls are processed at the same time
#first the grid searches are split into this many chunks, then the review lookups are
NUM_WORKERS = 8


#functions section
#these are all the pieces of the code that we define now, then call later to run the program

#function to convert meters to degrees, based on latitude
#when creating the 'grid', we ask what size in meters because that is an easy number to interpret, but our bounding box is in lat/lng coordinates
#so, we convert the meters to lat/lng coordinates so we can properly create a grid inside the bounding box in the correct unit size
def meters_to_degrees(meters, latitude):
    """Convert a ground distance in meters into degree offsets at a latitude.

    Args:
        meters: distance to convert, in meters.
        latitude: latitude, in degrees, at which the longitude scale is evaluated.

    Returns:
        A ``(delta_lat, delta_lng)`` tuple: the distance expressed in degrees
        of latitude and in degrees of longitude.
    """
    #a degree of latitude is treated as a constant 111,320 meters
    lat_scale = 111_320
    #a degree of longitude shrinks with the cosine of the latitude
    lng_scale = 111_320 * math.cos(math.radians(latitude))
    return meters / lat_scale, meters / lng_scale

#this function is used to split our list of API calls (like all the locations we want to search or all the places we want to get reviews from)
#into n or 8 'chunks' that we want the computer to process
def split_list(lst, n):
    """Split ``lst`` into ``n`` contiguous chunks of near-equal size.

    The first ``len(lst) % n`` chunks receive one extra element. When ``n`` is
    larger than ``len(lst)`` the trailing chunks are empty lists, so the result
    always contains exactly ``n`` chunks.

    Args:
        lst: the sequence to split (anything that supports ``len`` and slicing).
        n: number of chunks to produce; must be at least 1.

    Returns:
        A list of ``n`` slices of ``lst``, in the original order.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    #a non-positive n would either divide by zero or silently drop every item
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n!r}")
    #base size of every chunk, plus how many chunks need one extra item
    chunk_size, remainder = divmod(len(lst), n)
    chunks = []
    start = 0
    for i in range(n):
        #the first `remainder` chunks absorb the leftover items, one each
        end = start + chunk_size + (1 if i < remainder else 0)
        chunks.append(lst[start:end])
        start = end
    return chunks

#this function is designed to call Google's API and search for all the 'places' that are inside our grid cell size
def get_places(keyword, location, radius, api_key, timeout=10):
    """Run one Google Places 'nearby search' and return the places it found.

    Only the ``results`` list of this single response is read; no further
    result pages are requested.

    Args:
        keyword: search term, e.g. ``'coffee shop'``.
        location: search center as a ``"lat,lng"`` string.
        radius: search radius in meters.
        api_key: Google API key used for authentication.
        timeout: seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block a worker forever.

    Returns:
        The list stored under ``'results'`` in the JSON response, or an empty
        list when that key is missing.

    Raises:
        requests.exceptions.RequestException: on network failure or timeout.
    """
    url = 'https://maps.googleapis.com/maps/api/place/nearbysearch/json'
    params = {
        'keyword': keyword,
        'location': location,
        'radius': radius,
        'key': api_key
    }
    response = requests.get(url, params=params, timeout=timeout)
    res_json = response.json()
    #the API reports request-level problems (bad key, quota, ...) in the 'status' field
    #surface them instead of silently treating the failure as "no places here"
    status = res_json.get('status')
    if status not in ('OK', 'ZERO_RESULTS'):
        print(f"Warning: Places search for '{keyword}' at {location} returned status {status!r}")
    #the response carries other metadata too, but only the list of places is needed
    return res_json.get('results', [])

#this function is designed to call Google's API and search for the reviews of each 'place' that we found from the get_places function above
def get_place_details(place_id, api_key, timeout=10):
    """Fetch the details (including reviews) of one place from Google Places.

    The request asks for the name, geometry, reviews, formatted address and
    website of the place, with reviews sorted newest first.

    Args:
        place_id: Google's identifier of the place to look up.
        api_key: Google API key used for authentication.
        timeout: seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block a worker forever.

    Returns:
        The dict stored under ``'result'`` in the JSON response, or an empty
        dict when that key is missing.

    Raises:
        requests.exceptions.RequestException: on network failure or timeout.
    """
    url = 'https://maps.googleapis.com/maps/api/place/details/json'
    params = {
        'place_id': place_id,
        'fields': 'name,geometry,review,formatted_address,website',
        'reviews_sort': 'newest',
        'key': api_key
    }
    response = requests.get(url, params=params, timeout=timeout)
    res_json = response.json()
    #surface request-level failures instead of silently returning an empty dict
    status = res_json.get('status')
    if status != 'OK':
        print(f"Warning: Place details for '{place_id}' returned status {status!r}")
    return res_json.get('result', {})

#this is the function that performs all the repetitive API calls to google to search each individual grid cell.
#it takes in a 'chunk' of API calls we want to make, and then goes through that list one at a time calling the Google API for each grid cell location
def process_chunk(chunk, api_key):
    """Run every place search in ``chunk`` sequentially and collect the hits.

    Args:
        chunk: iterable of ``(keyword, location, radius)`` search tasks.
        api_key: Google API key passed on to ``get_places``.

    Returns:
        One flat list with the places returned by all searches in the chunk.
        Each place dict is tagged with the ``'keyword'`` that found it.
    """
    found = []
    for search_term, center, search_radius in chunk:
        hits = get_places(search_term, center, search_radius, api_key)
        #remember which search term produced each place
        for hit in hits:
            hit['keyword'] = search_term
        found += hits
        #short pause between calls so we do not hammer the API
        time.sleep(0.1)
    return found

#now that we have a list of locations, this is calling Google's API again to get the reviews at each location
#this takes in a 'chunk' list of business locations, and it then gets all the reviews for that location
def process_chunk_response(place_chunk, api_key):
    """Look up the details of every place in ``place_chunk`` and summarise them.

    Places without any reviews are skipped. For the others, the timestamps of
    up to the first five reviews returned are averaged.

    Args:
        place_chunk: iterable of place dicts; each must contain ``'place_id'``
            and may contain ``'keyword'``.
        api_key: Google API key passed on to ``get_place_details``.

    Returns:
        A list of dicts with the keys ``name``, ``lat``, ``lng``,
        ``avg_review_time`` (a ``datetime``), ``address``, ``website`` and
        ``keyword``.
    """
    rows = []
    for place in place_chunk:
        details = get_place_details(place['place_id'], api_key)
        reviews = details.get('reviews', [])
        if reviews:
            #the request asked for newest-first ordering, so the first five are the most recent
            newest = reviews[:5]
            mean_timestamp = sum(review['time'] for review in newest) / len(newest)
            coords = details['geometry']['location']
            rows.append({
                'name': details.get('name', ''),
                'lat': coords['lat'],
                'lng': coords['lng'],
                'avg_review_time': datetime.fromtimestamp(mean_timestamp),
                'address': details.get('formatted_address', ''),
                'website': details.get('website', ''),
                'keyword': place.get('keyword', '')
            })
        #pause after every lookup, with or without reviews, to respect rate limits
        time.sleep(0.1)
    return rows

#this function is used to create the final heatmap of the data based on the average time of the 5 most recent reviews
#the more recent the reviews, the 'hotter' a place is on the map.
def prepare_heatmap_data(data):
    """Turn place records into ``[lat, lng, intensity]`` heatmap points.

    The intensity is the inverse of the number of seconds between now and the
    place's average review time, so more recent reviews give a larger value.

    Args:
        data: iterable of dicts with the keys ``'lat'``, ``'lng'`` and
            ``'avg_review_time'`` (an object with a ``timestamp()`` method).

    Returns:
        A list of ``[lat, lng, intensity]`` lists. Records whose average
        review time is not strictly in the past are left out.
    """
    now = time.time()
    heat_data = []
    for item in data:
        age_seconds = now - item['avg_review_time'].timestamp()
        #a strictly positive age also guarantees the division below is safe
        if age_seconds > 0:
            heat_data.append([item['lat'], item['lng'], 1 / age_seconds])
    return heat_data

#this function is designed to create the final map that is saved to 'trendiness_heatmap.html'
#it combines the information that we collected (place points), details about each point (click on a point it gives details), the grid shape overlay, and the heatmap
#this is a long function, but it creates a folium map featuring all those items listed. Most of the code is defining the colors, shapes, legend features for the folium map
def create_map(heat_data, data, grid_polygons, filename='trendiness_heatmap.html'):
    """Build the folium map (heatmap, grid overlay, markers) and save it as HTML.

    Besides its arguments, this function reads the module-level constants
    ``MIN_LAT``, ``MAX_LAT``, ``MIN_LNG``, ``MAX_LNG``, ``KEYWORDS`` and
    ``TOP_N``, and the module-level ``delta_lat`` / ``delta_lng`` grid cell
    size in degrees, which must be defined before the call.

    Args:
        heat_data: list of ``[lat, lng, intensity]`` points for the heat layer.
        data: list of place dicts with the keys ``name``, ``lat``, ``lng``,
            ``avg_review_time``, ``address``, ``website`` and ``keyword``.
        grid_polygons: polygons with ``exterior.coords`` in ``(lng, lat)``
            order, one per grid cell, drawn as the grid overlay.
        filename: path of the HTML file to write.

    Returns:
        None. The map is written to ``filename`` and a confirmation is printed.
    """
    from collections import defaultdict
    from html import escape

    #center the map on the midpoint of the bounding box
    center_lat = (MIN_LAT + MAX_LAT) / 2
    center_lng = (MIN_LNG + MAX_LNG) / 2
    m = folium.Map(location=[center_lat, center_lng], zoom_start=11)
    HeatMap(heat_data, radius=15, name='Heatmap').add_to(m)

    #grid overlay: all cell outlines live in one toggleable layer
    grid_group = folium.FeatureGroup(name='Grid', show=True)
    for polygon in grid_polygons:
        #polygons store (lng, lat); folium wants (lat, lng)
        coords = [(lat, lng) for lng, lat in polygon.exterior.coords]
        folium.Polygon(
            locations=coords,
            color='grey',
            weight=1,
            fill=False,
            opacity=0.9
        ).add_to(grid_group)
    m.add_child(grid_group)

    #one marker color and one toggleable marker layer per keyword
    colors = ['red', 'blue', 'green', 'purple', 'orange', 'darkred', 'lightred', 'beige', 'darkblue', 'darkgreen']
    keyword_colors = {keyword: colors[i % len(colors)] for i, keyword in enumerate(KEYWORDS)}
    keyword_groups = {}
    for keyword in KEYWORDS:
        fg = folium.FeatureGroup(name=keyword, show=True)
        keyword_groups[keyword] = fg
        m.add_child(fg)

    #bucket the places by grid cell index and keyword
    grid_data = defaultdict(lambda: defaultdict(list))
    for item in data:
        #floor, not int(): int() truncates toward zero, which would merge places lying
        #just south/west of the bounding box into row/column 0
        lat_idx = math.floor((item['lat'] - MIN_LAT) / delta_lat)
        lng_idx = math.floor((item['lng'] - MIN_LNG) / delta_lng)
        grid_data[(lat_idx, lng_idx)][item['keyword']].append(item)

    #per cell and keyword, draw markers only for the TOP_N places with the newest reviews
    for cell_places in grid_data.values():
        for keyword, candidates in cell_places.items():
            places = sorted(candidates, key=lambda x: x['avg_review_time'], reverse=True)
            for place in places[:TOP_N]:
                #the popup is rendered as raw HTML and these strings come from API responses,
                #so escape them before embedding (also protects the quoted href attribute)
                popup_html = f"<b>{escape(str(place['name']))}</b><br>"
                popup_html += f"Keyword: {escape(str(place['keyword']))}<br>"
                popup_html += f"Address: {escape(str(place['address']))}<br>"
                if place['website']:
                    popup_html += f"<a href='{escape(str(place['website']))}' target='_blank'>Website</a>"
                popup = folium.Popup(popup_html, max_width=300)
                folium.Marker(
                    location=[place['lat'], place['lng']],
                    popup=popup,
                    icon=folium.Icon(color=keyword_colors[keyword], icon='info-sign'),
                ).add_to(keyword_groups[keyword])

    #layer control lets the viewer toggle the heatmap, grid and marker layers
    folium.LayerControl().add_to(m)
    m.save(filename)
    print(f"Map has been saved to '{filename}'")



#main file running
#this section is where we call all the functions that were written above and perform the process of actually creating the map

#ask the user how large each grid cell should be, in meters
grid_cell_size = float(input("Enter grid cell size in meters (e.g., 1000): "))

#express that cell size in degrees, evaluated at the middle latitude of the bounding box
avg_latitude = (MIN_LAT + MAX_LAT) / 2
delta_lat, delta_lng = meters_to_degrees(grid_cell_size, avg_latitude)

#number of cells needed to span the bounding box in each direction, rounded up
lat_steps = int(math.ceil((MAX_LAT - MIN_LAT) / delta_lat))
lng_steps = int(math.ceil((MAX_LNG - MIN_LNG) / delta_lng))

#positions of the grid lines: one more line than there are cells in each direction
lat_points = [MIN_LAT + i * delta_lat for i in range(lat_steps + 1)]
lng_points = [MIN_LNG + i * delta_lng for i in range(lng_steps + 1)]

#every (lat, lng) intersection of the grid lines, latitude-major
#these are the points that are searched with the google API later
grid_points = [(lat, lng) for lat in lat_points for lng in lng_points]

#one closed rectangle per grid cell, with corners in (lng, lat) order
#these polygons are used for the grid overlay on the map later
grid_polygons = [
    Polygon([
        (west, south),
        (east, south),
        (east, north),
        (west, north),
        (west, south)
    ])
    for south, north in zip(lat_points, lat_points[1:])
    for west, east in zip(lng_points, lng_points[1:])
]


#before any API call is made, build the task list and let the user check the expected volume
#each search covers a circle whose radius is half the grid cell size
radius = grid_cell_size / 2

#one search task per (keyword, grid point); the Google API wants the location as "lat,lng"
task_inputs = [
    (keyword, f"{lat},{lng}", radius)
    for keyword in KEYWORDS
    for lat, lng in grid_points
]

#rough figure shown to the user: 20 requests budgeted for every search task
#(the worst case assumed here is 20 businesses per search, each needing a review lookup)
total_requests = 20 * len(task_inputs)
print(f"Approximate total API requests to be made: {total_requests}")

#only continue when the user explicitly answers 'yes'; anything else stops the script
confirm = input("Do you want to proceed? (yes/no): ").strip().lower()
if confirm != 'yes':
    print("Operation cancelled by the user.")
    exit()

#split the search tasks into one chunk per worker so dask can run the chunks at the same time
chunks = split_list(task_inputs, NUM_WORKERS)

#start a local dask cluster (one single-threaded process per worker) and connect a client to it
#the context managers close the client and the cluster even if an API call fails part-way,
#so no worker processes are left running in the background
with LocalCluster(n_workers=NUM_WORKERS, threads_per_worker=1) as cluster:
    with Client(cluster) as client:
        #stage 1: search every grid point for places, one delayed task per chunk
        chunk_tasks = [delayed(process_chunk)(chunk, API_KEY) for chunk in chunks]
        all_places_lists = compute(*chunk_tasks)

        #flatten the per-chunk results into one big list
        all_places = [place for places in all_places_lists for place in places]

        #keep the raw search results on disk for record-keeping and potential future use
        with open('all_places.json', 'w') as f:
            json.dump(all_places, f)

        #overlapping searches can return the same place more than once; keep one entry per place_id
        unique_places = {place['place_id']: place for place in all_places}
        all_places = list(unique_places.values())
        print(f"Total unique places found: {len(all_places)}")

        #stage 2: fetch the reviews of every unique place, again one delayed task per chunk
        place_chunks = split_list(all_places, NUM_WORKERS)
        detail_chunk_tasks = [delayed(process_chunk_response)(chunk, API_KEY) for chunk in place_chunks]
        results = compute(*detail_chunk_tasks)

#combine the detailed records from all chunks into a single list
data = [record for res in results for record in res]

#turn review recency into heatmap intensities
heat_data = prepare_heatmap_data(data)

#create and save the map with heatmap, markers, and grid
create_map(heat_data, data, grid_polygons)

#datetime objects cannot be written by json.dump, so store the average review time as an ISO string
#this must happen after the map is built, because the map code sorts on the datetime values
for item in data:
    item['avg_review_time'] = item['avg_review_time'].isoformat()

#save detailed data to a JSON file for future reference
with open('detailed_places_data.json', 'w') as f:
    json.dump(data, f)

Enter grid cell size in meters (e.g., 1000):  3000


Approximate total API requests to be made: 13440


Do you want to proceed? (yes/no):  yes


Total unique places found: 1787
Map has been saved to 'trendiness_heatmap.html'
