# Part 2: Searching gas stations and fetching data about them and their peak hours


## Approach

As a first step, I use publicly available data about the capacity utilisation of gas stations shown on Google Maps (peak times). To do this, I fetch the place IDs (and further values) of the gas stations in all districts using the Google Places API. With these IDs, I visit the Google Maps website and read out the peak-hours data for each gas station. I fill a PostgreSQL database with all the data obtained. The evaluations can then be done later via the database.

![FlowChart](https://github.com/cbrennig/peak-hours-and-high-power-charging/blob/main/jupyter_notebooks/images/flowchart.png)


In [1]:
import re
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import requests
import time
import psycopg2
from credentials import Credentials

## Database class

As in part one, the connection to the database is established when a database instance is created. The necessary credentials are in the file ``credentials.py``. The database class is there to 

1. return district names (which have not yet been entered)

2. record data from the Place API call (place id, address, name, type, geolocation data etc)

3. map address data and types to the corresponding n:m tables

4. record previously unrecorded types into the appropriate table and process their IDs for subsequent records

5. record peak times into the database.


I will not use docstrings here, as the tasks of the individual functions are easy to see.

In [2]:
class GasStationDB(Credentials):
    
    def __init__(self):
        super().__init__()
        self.conn = psycopg2.connect(dbname=self.DBNAME, user=self.DBUSER, password=self.DBPASS)
        self.cur = self.conn.cursor() 
        
    def _get_station_id(self, place):
        # parameterised query: psycopg2 quotes the value safely
        query = "SELECT * FROM portfolio.stations WHERE place_id = %s;"
        self.cur.execute(query, (place['place_id'],))
        result = self.cur.fetchone()
        if result:
            return result[0]
        return None
    
    def _get_zip_id(self, zip_code):
        self.cur.execute("SELECT * FROM portfolio.zip_codes WHERE zip_code = %s;", (zip_code,))
        try:
            return self.cur.fetchone()[0]
        except Exception as e:
            return False
        
    def _get_types_id(self, name):
        self.cur.execute("SELECT * FROM portfolio.types WHERE name = %s;", (name,))
        result = self.cur.fetchone()
        try:
            return result[0]
        except Exception as e:
            return False
    
    def _populate_station_data(self, place):
        cmd = """INSERT INTO portfolio.stations
                        (name, address, place_id, global_plus_code, location_lat, location_lng,
                         location_viewport_northeast_lat, location_viewport_northeast_lng,
                         location_viewport_southwest_lat, location_viewport_southwest_lng,
                         business_status) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s );                              
              """
        attr_list = ['name', 'address', 'place_id', 'global_plus_code', 'location_lat', 'location_lng',
                     'location_viewport_northeast_lat', 'location_viewport_northeast_lng', 
                     'location_viewport_southwest_lat', 'location_viewport_southwest_lng',
                     'business_status']
        values_list = []
        for attr in attr_list:
            values_list.append(place[attr]) 

        self.cur.executemany(cmd, [values_list])
        self.conn.commit()
    
    def _populate_guest_quantity_data(self, data):             
        cmd = """INSERT INTO portfolio.guest_quantities
                        (station_id, weekday, hr00, hr01, hr02, hr03, hr04, hr05, hr06, hr07, hr08, hr09, 
                        hr10, hr11, hr12, hr13, hr14, hr15, hr16, hr17, hr18, hr19, hr20, hr21, hr22, hr23
                        ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s );              
              """
        self.cur.executemany(cmd, data)
        self.conn.commit()

    def _populate_station_zip_table(self, data):
        cmd = """INSERT INTO portfolio.station_zip
                        (zds_station_id, zds_zip_id)
                        VALUES ( %s, %s );
              """
        self.cur.execute(cmd, data)
        self.conn.commit()
        
    def _populate_types_table(self, data):
        cmd = """INSERT INTO portfolio.types
                        (name) VALUES ( %s );
              """
        self.cur.execute(cmd, [data])
        self.conn.commit()
        
    def _populate_station_type_table(self, data):
        cmd = """INSERT INTO portfolio.type_place
                        (ts_types_id, ts_stations_id) VALUES ( %s, %s );
              """
        self.cur.executemany(cmd, data)
        self.conn.commit()
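
Tasks 3 and 4 from the list above boil down to a "get or create" lookup followed by inserts into a link table. The following is a minimal sketch of that pattern, not the notebook's own code: it uses the standard-library `sqlite3` module with an in-memory database as a stand-in for PostgreSQL, and the column `types_id` and the helper `get_or_create_type_id` are assumptions made for illustration. Note that `sqlite3` uses `?` as placeholder, whereas psycopg2 uses `%s`.

```python
import sqlite3

def get_or_create_type_id(cur, name):
    """Return the id of a type name, inserting the name first if it is new."""
    cur.execute("SELECT types_id FROM types WHERE name = ?;", (name,))
    row = cur.fetchone()
    if row is not None:
        return row[0]
    cur.execute("INSERT INTO types (name) VALUES (?);", (name,))
    return cur.lastrowid

# In-memory stand-in for the types table and the n:m link table
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE types (types_id INTEGER PRIMARY KEY, name TEXT UNIQUE);")
cur.execute("CREATE TABLE type_place (ts_types_id INTEGER, ts_stations_id INTEGER);")

station_id = 1  # hypothetical id of a station that was just inserted
type_names = ["gas_station", "car_wash", "gas_station"]  # duplicate on purpose

type_ids = [get_or_create_type_id(cur, name) for name in type_names]

# one link row per distinct type id
links = [(type_id, station_id) for type_id in sorted(set(type_ids))]
cur.executemany("INSERT INTO type_place VALUES (?, ?);", links)
conn.commit()
```

Passing the values as parameters instead of formatting them into the SQL string keeps names that contain quotes from breaking the statement.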

## Fetching Maps class

This class contains the functions to grab the Google Maps web pages using the Selenium WebDriver.

In [3]:
class FetchGMaps:
    
    def __init__(self):
        """
        initialise the webdriver
        """
        self.html_string = ''
        self.base_url = 'https://www.google.com/maps/place/?q=place_id:'
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        self.driver = webdriver.Chrome(options=chrome_options)

    def accessing_google(self):
        """
        establishing a connection to google maps
        accept cookies if not already done
        :return: str message
        """
        any_place_id = 'ChIJvTkT0OWApEcRHkO3-fc99Ks'
        url = self.base_url + any_place_id
        self.driver.get(url)
        try:
            cookies_accept_button = '/html/body/c-wiz/div/div/div/div[2]/div[1]/div[4]/form/div/div/button'
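            # find_element(by, value) also works in newer Selenium 4 releases, where find_element_by_xpath was removed
            self.driver.find_element('xpath', cookies_accept_button).click()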
            time.sleep(1)
            return 'connected'
        except Exception as e:
            time.sleep(1)
            return 'already connected '+self.driver.title
        
    def browse_site_by_place_id(self, place_id):
        """
        getting the full website content
        :param place_id: previously obtained id for the location to be called up
        :return: True or False
        """
        url = self.base_url + place_id
        try:
            self.driver.get(url)
            self.content = self.driver.page_source
            return True
        except:
            return False
        
    def get_html_string(self):
        """
        making a single string of the website content in order to simplify
        matching the data string inside the js code
        :return: True or False
        """
        try:
            soup = BeautifulSoup(self.content, 'html.parser')
            html_content_list = str(soup).split('\n')
            self.html_string = ' '.join(html_content_list)
            return True
        except:
            return False

## Place class

This class contains the functions to fetch the place details. For this purpose, the Google Places API is used. A brief description of each function can be found in its docstring.

In [4]:
class PlacesAPI:
    
    def __init__(self, search_term):
        """
        load the API key and create the request string for one session 
        with up to two next page loads
        :param search_term: str
        """
        with open('/home/offboss/credentials/places_api_key.txt', 'r') as f:
            # strip the trailing newline, which would otherwise end up in the URL
            API_KEY = f.read().strip()
        self.API_KEY = '&key=' + API_KEY
        
        url = 'https://maps.googleapis.com/maps/api/place/'
        find_place_request = 'textsearch/'
        output_type = 'json?query='
        search_term = search_term.replace(' ', '%20')
        place_type = 'gas_station'
        self.location_request = url  + find_place_request + output_type + search_term + self.API_KEY
        self.next_page_token = ''
        
    def query_results(self):
        """
        request the search results via place api
        and load the next page token if one is delivered
        :return: result part of the api response
        """
        result = requests.get(self.location_request + self.next_page_token)
        if result.status_code != 200:
            return False
        try:
            self.next_page_token = "&pagetoken=" + result.json()['next_page_token']
        except Exception as e:
            self.next_page_token = False
            
        return result.json()['results']

    def get_places_data(self, result):
        """
        read all relevant data from the json response
        all exception handlers were added to the code while testing
        :param result: api response result (json)
        :return: places - list of dictionaries to populate the stations table
        """
        places = [] 
        for place in result:
            p = {'name': place['name']} 
            p['address'] = place['formatted_address'] 
            p['place_id'] = place['place_id'] 
            p['reference'] = place['reference'] 
            try:
                p['global_plus_code'] = place['plus_code']['global_code'] 
            except:
                p['global_plus_code'] = None
            p['location_lat'] = place['geometry']['location']['lat'] 
            p['location_lng'] = place['geometry']['location']['lng']
            p['location_viewport_northeast_lat'] = place['geometry']['viewport']['northeast']['lat']
            p['location_viewport_northeast_lng'] = place['geometry']['viewport']['northeast']['lng']
            p['location_viewport_southwest_lat'] = place['geometry']['viewport']['southwest']['lat']
            p['location_viewport_southwest_lng'] = place['geometry']['viewport']['southwest']['lng']
            p['types'] = place['types']
            try:
                p['business_status'] = place['business_status']
            except:
                p['business_status'] = None
            places.append(p)
        return places       
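
The request building and the paging over up to three result pages can be sketched independently of the class above. This is an illustrative sketch under assumptions, not the notebook's implementation: `build_textsearch_url`, `fetch_all_pages` and the injected `fetch_json` callable are hypothetical names, and the demonstration uses a fake response instead of a real API call. `urllib.parse.urlencode` is used so that spaces and umlauts in district names are escaped.

```python
from urllib.parse import urlencode

BASE_URL = "https://maps.googleapis.com/maps/api/place/textsearch/json"

def build_textsearch_url(search_term, api_key, page_token=None):
    """Build a Text Search request URL; urlencode escapes spaces and umlauts."""
    params = {"query": search_term, "key": api_key}
    if page_token:
        params["pagetoken"] = page_token
    return BASE_URL + "?" + urlencode(params)

def fetch_all_pages(fetch_json, search_term, api_key, max_pages=3):
    """Collect results from up to max_pages pages.

    fetch_json is any callable that takes a URL and returns the decoded
    JSON response as a dict (hypothetical, e.g. a thin wrapper around requests).
    """
    results, token = [], None
    for _ in range(max_pages):
        response = fetch_json(build_textsearch_url(search_term, api_key, token))
        results.extend(response.get("results", []))
        token = response.get("next_page_token")
        if not token:
            break  # no further page offered, so the loop always terminates
    return results

# Offline demonstration with a fake two-page response instead of a real API call
def fake_fetch(url):
    if "pagetoken=" in url:
        return {"results": [{"name": "Station C"}]}
    return {"results": [{"name": "Station A"}, {"name": "Station B"}],
            "next_page_token": "TOKEN123"}

stations = fetch_all_pages(fake_fetch, "Tankstelle in Köpenick", "DUMMY_KEY")
url = build_textsearch_url("Tankstelle in Köpenick", "DUMMY_KEY")
```

Because the loop is bounded by `max_pages` and stops when no token is returned, a failed or empty response cannot cause endless requests. A real client would probably also have to wait briefly before using a freshly issued page token.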

## Data processing class

The functions of this class extract the data from the Maps web pages. First, the page source is searched for the part that contains the peak-hours data. Then the data is prepared so that it can be loaded into the corresponding table. In addition to this primary job, some smaller data-cleaning tasks are also carried out.

In [5]:
class CustomiseData:
    
    def __init__(self):
        self.data_string = ''
        self.data = []
    
    def _no_data(self):
        """
        creates a dataset with None values for the websites 
        from which no peak hours were returned
        :return: no_data - list of 7 rows [weekday, None x 24]
        """
        no_data = [[None for i in range(24)] for x in range(7)]
        i=1
        for item in no_data:
            item.insert(0, i)
            i+=1
        return no_data 
    
    def _add_station_id(self, station_id, data):
        """
        add the stations id to the peak hours dataset
        :param station_id: int
        :param data: peak hours dataset
        :return: data
        """
        for item in data:
            item.insert(0, station_id)
        return data
    
    def _get_zip_code(self, place):
        """
        get the zip code out of the address from the place api response
        :param place: dict dataset of a station
        :return: str of 5 digits
        """
        return re.findall(r'[0-9]{5}', place['address'])[0]
    
    def get_data_string(self, html_string):
        """
        find data string 
        it is within the script part of the website
        :param html_string: single line str
        :return: True or False
        """
        self.data_string = ''
        start_pattern = r'\[+[0-9],\[+[0-9]'
        mid_pattern = r'(.*?)'
        end_pattern = r'\]+,[0-9]\]+,[0-9]'
        mask = start_pattern + mid_pattern + end_pattern
        try:
            self.data_string = '7,[[4' + re.search(mask, html_string).group(1) + ']],0]],6'
            return True
        except:
            return False
        
    def get_data(self):
        """
        extracting the data from the data string
        and prepare a dataset for populating the 
        guest_quantities table
        """
        self.data = []
        
        def _cleanup(values):
            value_set = []
            for value in values:
                val = re.sub(r'\[+', '', value).split(',')
                if len(val)== 3:
                    del val[2]
                value_set.append(val)
            return value_set
        
        def _change_dtype(dlist):
            return [[int(item[0]), int(item[1])] for item in dlist]
        
        def _append_missing_hours_data():
            new_list = []
            for day in self.data:
                # start every day with fresh zeros, so that hours missing on
                # this day do not inherit the values of the previous day
                hours = {hour: 0 for hour in range(24)}
                for pair in day[1]:
                    hours[pair[0]] = pair[1]
                sub_list = [[k, v] for k, v in hours.items()]
                new_day = [day[0]]
                new_day.append(sub_list)
                new_list.append(new_day)
            self.data = new_list
        
        def _sorted_data():
            sorted_data = []
            for item in self.data:
                item[1].sort()
                dataset = [x[1] for x in item[1]]
                dataset.insert(0, item[0])
                sorted_data.append(dataset)
            return sorted_data
                
        split_pattern = r']+,[0-9]],\['
        datasets = re.split(split_pattern, self.data_string)
        
        match_pattern = r'\[+[0-9]+,[0-9]+,'
        for dataset in datasets:
            values = _cleanup(re.findall(match_pattern, str(dataset[1:])))
            self.data.append([int(dataset[0]), _change_dtype(values)])
        _append_missing_hours_data()
        self.data = _sorted_data()
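
The last two steps of `get_data` (filling in missing hours and flattening each day into one table row) can be illustrated in isolation. This is a minimal sketch with made-up input values; the helper name `to_dense_row` is hypothetical and not part of the class above.

```python
def to_dense_row(weekday, pairs, fill=0):
    """Turn sparse (hour, value) pairs into [weekday, hr00, ..., hr23]."""
    hours = [fill] * 24          # fresh list per call, so days never share state
    for hour, value in pairs:
        hours[hour] = value
    return [weekday] + hours

# Hypothetical input: weekday number plus the hours for which a value was found
sparse = [
    [7, [[6, 10], [7, 35], [8, 60]]],
    [1, [[7, 20], [9, 45]]],
]
rows = [to_dense_row(day, pairs) for day, pairs in sparse]
```

Each resulting row has 25 entries, which matches the weekday column plus the 24 hour columns of the `guest_quantities` insert statement; the station id is added in front later.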
    

## Main class

The main class is responsible for maintaining the order of the individual tasks. As shown in the flowchart above, a search query is first executed with the given search term. The search results returned by the API are written to the database one after another, and the peak-time data is obtained for each of them. For each individual location, the Maps page is accessed and a check is made to see whether peak-time data is available. If it is, it is processed accordingly; if not, a no-data record is created for this location. As soon as all search results have been processed, the next page of the API is requested or a new search query is started. The Google Places API delivers a maximum of 20 search results per call. These can be extended by up to two additional "pages", for which a so-called next-page token is supplied. However, each page costs a request credit. 
Places that are already in the database are not recorded again. I have tried to avoid as many errors as possible; at the same time, I have tried to keep the number of caught exceptions low in order not to store too many no-data responses.

In [6]:
class MainPile:
    def __init__(self):
        """
        initialise further classes
        """
        self.FGM = FetchGMaps()
        print(self.FGM.accessing_google())
        self.GDB = GasStationDB()
        self.CD = CustomiseData()
        self.counter = 0 

    def fetch_places(self, search_term='Tankstelle in Berlin'):
        """
        main function to fetch places via the place API
        :param search_term: str
        """
        PA = PlacesAPI(search_term)
        while True:
            if PA.next_page_token is not False:
                results = PA.query_results()
                if results is not False:
                    places = PA.get_places_data(results)
                    
                    for place in places:
                        self.counter += 1
                        
                        # check whether the station is recorded or not in table 'stations'
                        if not self.GDB._get_station_id(place):
                        
                            # populate station data
                            self.GDB._populate_station_data(place)
                            
                            # populate n:m table with zip_station data
                            zip_code = self.CD._get_zip_code(place)
                            station_id = self.GDB._get_station_id(place)
                            zip_id = self.GDB._get_zip_id(zip_code)
                            self.GDB._populate_station_zip_table([station_id, zip_id])
                            
                            # populate types data
                            type_ids = []
                            for type_name in place['types']:
                                # add the type name if it does not exist yet
                                if not self.GDB._get_types_id(type_name):
                                    self.GDB._populate_types_table(type_name)
                                # get the type id and collect it in the list
                                type_ids.append(self.GDB._get_types_id(type_name))
                                
                            # populate station_type table with
                            # station_id and list of type ids
                            station_types_data = [[type_id, station_id] for type_id in type_ids]
                            self.GDB._populate_station_type_table(station_types_data)
                                
                            # start processing guest quantity data
                            self.fetch_quantities(place)
                        else:
                            print('station {} {} already populated'.format(place['name'], place['address']))
                else:
                    print('no more places found within this search')
                    # the request failed; leave the loop instead of retrying forever
                    break
            else:
                break
            
    def fetch_quantities(self, place):
        """
        main function to fetch peak hours of a place/station
        :param place: dict dataset of a station
        """
        print('')
        print('=========================== {} ============================'.format(self.counter))
        print(place['name'], place['address'])
        print('-----------------------------------------------------------')

        if self.FGM.browse_site_by_place_id(place['place_id']):
            if self.FGM.get_html_string():
                if self.CD.get_data_string(self.FGM.html_string): 
                    self.CD.get_data()
                else:
                    print('get_data_string failed')
                    self.CD.data = self.CD._no_data() 
                                        
                # add new station_id to data
                self.CD.data = self.CD._add_station_id(self.GDB._get_station_id(place), self.CD.data)
                
                # populate DB
                self.GDB._populate_guest_quantity_data(self.CD.data) 
            
            else:
                print('get_html_string failed')
        else:
            print('browse_site_by_place_id failed') 
        print()
        time.sleep(1)

#### Connect database

In [7]:
GDB = GasStationDB()

## Feed the search

### Identify already recorded districts

In [8]:
query = """SELECT dist_name FROM portfolio.districts 
                 WHERE district_id IN (
                      SELECT district_id FROM portfolio.zip_codes 
                            WHERE zip_id IN (
                                 SELECT zds_zip_id FROM portfolio.station_zip
                                  ));
        """
GDB.cur.execute(query)
results = GDB.cur.fetchall()
district_names = [district_name[0] for district_name in results]

### Get district names not yet recorded

In [9]:
GDB = GasStationDB()
if len(district_names) > 0:
    query = """SELECT dist_name FROM portfolio.districts WHERE dist_name NOT IN (
               SELECT dist_name FROM portfolio.districts 
                 WHERE district_id IN (
                      SELECT district_id FROM portfolio.zip_codes 
                            WHERE zip_id IN (
                                 SELECT zds_zip_id FROM portfolio.station_zip
                                  )));
        """
else:
    query = "SELECT dist_name FROM portfolio.districts;"
GDB.cur.execute(query)
table = GDB.cur.fetchall()
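
The selection of districts without any recorded station can also be written as a single anti-join, which returns all districts when nothing has been recorded yet. The sketch below is an assumption-laden illustration, not the notebook's query: it runs against an in-memory `sqlite3` database with a simplified copy of the three tables and made-up sample rows, and it uses `NOT EXISTS` instead of the nested `IN` subqueries.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# Simplified stand-ins for portfolio.districts, zip_codes and station_zip
cur.executescript("""
    CREATE TABLE districts (district_id INTEGER PRIMARY KEY, dist_name TEXT);
    CREATE TABLE zip_codes (zip_id INTEGER PRIMARY KEY, zip_code TEXT, district_id INTEGER);
    CREATE TABLE station_zip (zds_station_id INTEGER, zds_zip_id INTEGER);
    INSERT INTO districts VALUES (1, 'Adlershof'), (2, 'Mitte'), (3, 'Spandau');
    INSERT INTO zip_codes VALUES (1, '12489', 1), (2, '10115', 2), (3, '13581', 3);
    INSERT INTO station_zip VALUES (100, 1);
""")

# Districts for which no zip code is linked to any station yet
query = """
    SELECT d.dist_name
    FROM districts AS d
    WHERE NOT EXISTS (
        SELECT 1
        FROM zip_codes AS z
        JOIN station_zip AS sz ON sz.zds_zip_id = z.zip_id
        WHERE z.district_id = d.district_id
    )
    ORDER BY d.dist_name;
"""
cur.execute(query)
open_districts = [row[0] for row in cur.fetchall()]
```

With the sample rows, only Adlershof has a recorded station, so the two other districts remain to be searched.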

## Start fetching data and populating DB
### Creating search term and start searching by district

In [10]:
for district in table[:50]:
    search_term = "Tankstelle in {}".format(district[0])
    print('searching: {}'.format(search_term))
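    MP = MainPile()
    try:
        MP.fetch_places(search_term)
    finally:
        # quit() ends the chromedriver process; close() only closes the window
        MP.FGM.driver.quit()
        MP.GDB.conn.close()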

searching: Tankstelle in Adlershof
connected

HEM Tankstelle Adlergestell 305, 12489 Berlin, Germany
-----------------------------------------------------------


JET Tankstelle Glienicker Weg 105, 12489 Berlin, Germany
-----------------------------------------------------------


Elan-Tankstelle Adlergestell 179, 12489 Berlin, Germany
-----------------------------------------------------------


AGIP Adlergestell Adlergestell 289, 12489 Berlin, Germany
-----------------------------------------------------------


Autogas Adlershof Glienicker Weg 105-107, 12489 Berlin, Germany
-----------------------------------------------------------
get_data_string failed


Uwe Werkstatt Rudower Ch 44, 12489 Berlin, Germany
-----------------------------------------------------------
get_data_string failed


Ladesäule be-emobil 12489 Berlin, Germany
-----------------------------------------------------------
get_data_string failed


MietFirma - MietHänger | MietStation AGIP-Tankstelle Adlershof Adler

KeyboardInterrupt: 

#### Close DB connection

In [None]:
GDB.conn.close()

## Conclusion

Most of the data were automatically collected by the predefined algorithm, i.e. on the basis of the district names that had already been stored in the database. In addition to the search queries by district name, manually created lists (motorway numbers and cities with more than 90,000 inhabitants) were added to the search query and processed. I did this to ensure that the desired locations on motorways and near metropolitan areas were included in every case.

Apart from some exceptions caused by missing or incorrect values in the Places API responses and some Maps pages that could not be fetched, the data collection went pretty well. Unfortunately, the quota for the API calls was not large enough to achieve even tighter coverage. This is also the reason for the additional search lists mentioned earlier. 


### Some personal reflections
- always stop the chromedriver; otherwise it leads to a buffer overflow
- next time, use a pipeline model
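
One way to make sure the driver is always stopped, even when a run is interrupted, is a small context manager. This is only a sketch: `managed_driver` is a hypothetical helper, and `FakeDriver` is a stand-in so that the example runs without a browser. The only assumption about the real driver is that it offers a `quit()` method, as Selenium drivers do.

```python
from contextlib import contextmanager

@contextmanager
def managed_driver(factory):
    """Create a driver via factory() and always call quit() afterwards."""
    driver = factory()
    try:
        yield driver
    finally:
        driver.quit()   # runs on normal exit, on exceptions and on Ctrl-C

class FakeDriver:
    """Stand-in for a Selenium driver so the sketch runs without a browser."""
    def __init__(self):
        self.stopped = False

    def quit(self):
        self.stopped = True

try:
    with managed_driver(FakeDriver) as driver:
        raise KeyboardInterrupt  # simulate an interrupted run
except KeyboardInterrupt:
    pass
```

With Selenium, the factory could be something like `lambda: webdriver.Chrome(options=chrome_options)`, so that every search loop leaves no chromedriver process behind.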