# Fetch Data

In [1]:
import pandas as pd
from pymongo import MongoClient
import certifi

def fetchdata(collection_name:str) -> pd.DataFrame:
    try:
        client = MongoClient('mongodb+srv://evstation:evstation2024@atlascluster.hkkdu8u.mongodb.net/',  tlsCAFile = certifi.where())
        print("---Connection Successful---")
        database = client['CleanedEVstationData']
        collection = database[collection_name]
        data = collection.find()
        data = pd.DataFrame(list(data))
        data = data.replace('',None)
        data.drop(['_id'], axis=1, inplace=True)

        return data

    except:
        print("Connection Fail")
        
        return None

In [2]:
def convert_coordinate(coordinate):
    return tuple([float(i) for i in coordinate])

In [3]:
data = fetchdata('Allstation')
data['Coordinates'] = data['Coordinates'].apply(lambda x : x.replace("'",'').replace('(','').replace(')','').split(',')[1::-1])
data['Coordinates'] = data['Coordinates'].apply(convert_coordinate)
# exclude MG and etc
data = data[data['Provider'] != 'MG']
data = data[data['Provider'].apply(lambda x : 'etc' not in x.lower())]
data = data.reset_index(drop=True)
data.head()

---Connection Successful---


Unnamed: 0,Name,Description,styleUrl,NameEN,Power,ChargeCoreDescription,Address,Latitude,Longitude,Provider,...,Type2,Type2Count,CCS2,CCS2Count,CHAdeMO,CHAdeMOCount,Hours,Superchargers,SuperchargersCount,Dealership
0,อมารี สนามบินดอนเมือง กรุงเทพ,Name-EN: Amari Don Muang Airport Bangkok<br>Po...,#icon-1899-DB4436-labelson,Amari Don Muang Airport Bangkok,120 kW,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)",333 ถนน เชิดวุฒากาศ แขวง สีกัน เขตดอนเมือง กรุ...,13.920788,100.600708,PTT,...,True,1,True,2,False,,,False,,
1,โรบินสัน สไตล์ สมุทรปราการ,Name-EN: Robinson Lifestyle Samut Prakan<br>Po...,#icon-1899-DB4436-labelson,Robinson Lifestyle Samut Prakan,120 kW,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)",789 หมู่ 2 ถนนสุขุมวิท ตำบลท้ายบ้านใหม่ อำเภอเ...,13.58381,100.60954,PTT,...,True,1,True,2,False,,,False,,
2,ปตท หจก.อธิรุ่งโรจน์,Name-EN: PTT Athirungroaj<br>Power: 120 kW<br>...,#icon-1899-DB4436-labelson,PTT Athirungroaj,120 kW,ChargeCore NKR-ADC: 2x CCS2 (1000V 200A),343 ม.3 ต.แจระแม อ.เมืองอุบลราชธานี อุบลราชธาน...,15.27415,104.80487,PTT,...,False,0,True,2,False,,,False,,
3,ปตท บจ.3เอ็นพีพี,Name-EN: PTT 3 NPP<br>Power: 120 kW<br>Descrip...,#icon-1899-DB4436-labelson,PTT 3 NPP,120 kW,StarCharge Jupiter120: 2x CCS2 (1000V 200A),112/4 ถ.บรมราชชนนี แขวงตลิ่งชัน เขตตลิ่งชัน กร...,13.785001,100.458191,PTT,...,False,0,True,2,False,,,False,,
4,ปตท ตาก (ตากเคทีแอนด์ชันส์),Name-EN: PTT Tak (Tak KT & Song)<br>Power: 120...,#icon-1899-DB4436-labelson,PTT Tak (Tak KT & Song),120 kW,ChargeCore NKR-ADC: 2x CCS2 (1000V 200A),5 ถ.พหลโยธิน ต.เชียงเงิน อ.เมืองตาก ตาก 63000,16.861228,99.135005,PTT,...,False,0,True,2,False,,,False,,


# Plot Station

## Embeded iframe googlemap

In [4]:
from IPython.display import HTML

HTML("""
<iframe src="https://www.google.com/maps/d/u/0/embed?mid=1gpOy1eIchE0_DmaHs3FFhAR5skS3Nak&ehbc=2E312F&noprof=1" width="640" height="480"></iframe>
""")

## Google map (gmplot)

In [4]:
import os
from dotenv import load_dotenv
import webbrowser

# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("API_KEY")

In [6]:
# import gmplot

# # create map object and center it on our event
# center = (13.736717, 100.523186)
# gmap = gmplot.GoogleMapPlotter(center[0], center[1], 6, apikey=api_key)

# attractions = zip(*data['Coordinates'].to_list())

# gmap.scatter(
#     *attractions,
#     color='red',
#     s=60,
#     ew=2,
# )

# gmap.draw('google_map.html')
# webbrowser.open('google_map.html')

## Folium

In [21]:
import folium
import branca
from typing import Any, Union

def icons() -> dict:
    return {'PTT':'icons\\ptt_icon-modified.png',
            'PEA':'icons\\pea_icon-modified.png',
            'EleX':'icons\\elex_icon-modified.png',
            'Altervim':'icons\\altervim_icon-modified.png',
            'EA':'icons\\ea_icon-modified.png',}

def popup_html_marker(data:pd.DataFrame, index:int) -> str:
    return f"""
                    <h3>{data['Name'].iloc[index]}</h3>
                    <p>Provider : {data['Provider'].iloc[index]}</p>
                    <p>Power : {data['Power'].iloc[index]}</p>
                    <p>Type1 : {data['Type1'].iloc[index]}</p>
                    <p>Type1Count : {data['Type1Count'].iloc[index]}</p>
                    <p>Type2 : {data['Type2'].iloc[index]}</p>
                    <p>Type2Count : {data['Type2Count'].iloc[index]}</p>
                    <p>CCS2 : {data['CCS2'].iloc[index]}</p>
                    <p>CCS2Count : {data['CCS2Count'].iloc[index]}</p>
                    <p>CHAdeMO : {data['CHAdeMO'].iloc[index]}</p>
                    <p>CHAdeMOCount : {data['CHAdeMOCount'].iloc[index]}</p>
                    <p>Superchargers : {data['Superchargers'].iloc[index]}</p>
                    <p>SuperchargersCount : {data['SuperchargersCount'].iloc[index]}</p>
                """
def popup_html_polyline(data:pd.DataFrame, index:int) -> str:
    return f"""
                <p>Distance : {data['Distance'].iloc[index]}</p>
                <p>Duration : {data['Duration'].iloc[index]}</p>
            """
def popup_iframe(html: str, iframe_width: int=400, iframe_height: int=200, 
                 iframe_ratio: str='60%', iframe_figsize:Any=None, popup_max_width: Union[int,str]='100%', 
                 popup_show: bool=False, popup_sticky: bool=False, popup_lazy: bool=False) -> folium.Popup:
    iframe = branca.element.IFrame(html=html, width=iframe_width, height=iframe_height, ratio=iframe_ratio, figsize=iframe_figsize)
    popup = folium.Popup(iframe, max_width=popup_max_width, show=popup_show, sticky=popup_sticky, lazy=popup_lazy)
    return popup

In [8]:
import folium
from folium.plugins import Fullscreen, Geocoder, LocateControl, MiniMap, MousePosition
import webbrowser
import branca

# create map of all EV Charger Stations in Thailand
center = (13.736717, 100.523186)

icon_url = icons()
            
fmap1 = folium.Map(location=center, tiles=None, zoom_start=13)

# add Openstreetmap layer
folium.TileLayer('openstreetmap', name='Provider').add_to(fmap1)

for provider in data['Provider'].unique():
    feature_group = folium.FeatureGroup(name=provider).add_to(fmap1)
    temp = data[data['Provider'] == provider].reset_index(drop=True)

    for i in range(len(temp)):
        icon = folium.CustomIcon(
                                    icon_url[provider],
                                    icon_size=(38, 50),
                                    icon_anchor=(3, 50),
                                )
        
        html = popup_html_marker(temp, i)
    
        iframe = branca.element.IFrame(html=html, width=400, height=200)
        popup = folium.Popup(iframe, max_width=500)
        
        folium_marker = folium.Marker(location=temp['Coordinates'].iloc[i], tooltip=temp['Provider'].iloc[i], icon=icon, popup=popup)
        folium_marker.add_to(feature_group)

fmap1.add_child(Geocoder())
fmap1.add_child(folium.LayerControl(position='topright', collapsed=False, autoZIndex=True))
fmap1.add_child(Fullscreen())
fmap1.add_child(MiniMap(position='bottomleft'))
fmap1.add_child(MousePosition())

# save and open map
fmap1.save('root_map\\folium_map.html')
webbrowser.open('root_map\\folium_map.html')

True

In [9]:
import folium
from folium.plugins import GroupedLayerControl, Fullscreen, Geocoder, LocateControl, MiniMap, MousePosition
import webbrowser
import branca

icon_url = icons()

# create charge type feature groups
type_list = ['Type1', 'Type2', 'CCS2', 'CHAdeMO', 'Superchargers']
typefeature_group = {}
for feature in type_list:
    typefeature_group[str(feature)] = folium.FeatureGroup(name=feature, show=False)

 # create map of all EV Charger Stations in Thailand
center = (13.736717, 100.523186)
fmap2 = folium.Map(location=center, tiles=None, zoom_start=13)
# add Openstreetmap layer
folium.TileLayer('openstreetmap', name='Charge Type').add_to(fmap2)

for provider in data['Provider'].unique():
    temp = data[data['Provider'] == provider].reset_index(drop=True)
    chrgetype_temp = temp[['Type1', 'Type2', 'CCS2', 'CHAdeMO', 'Superchargers']].copy()
    types_list = []

    for index, row in chrgetype_temp.iterrows():
        types_found = [col for col in chrgetype_temp.columns if row[col]]
        types_list.append(', '.join(types_found))

    temp['Types'] = types_list
    type_explode = temp.assign(Types=temp['Types'].str.split(',')).explode('Types').reset_index(drop=True)
    
    for i in range(len(type_explode)):
        icon = folium.CustomIcon(
                            icon_url[provider],
                            icon_size=(38, 50),
                            icon_anchor=(3, 50),
                        )
        
        html = popup_html_marker(type_explode, i)
        iframe = branca.element.IFrame(html=html, width=400, height=200)
        popup = folium.Popup(iframe, max_width=500)

        for chargetype in type_list:
            if chargetype in type_explode['Types'].iloc[i]:
                folium_marker = folium.Marker(location=type_explode['Coordinates'].iloc[i], tooltip=type_explode['Provider'].iloc[i],  
                                              icon=icon, popup=popup)
                folium_marker.add_to(typefeature_group[chargetype])

for f in typefeature_group:
    fmap2.add_child(typefeature_group[f])

layer_control_groups = { '-----ChargeType-----': list(typefeature_group.values())}

fmap2.add_child(Geocoder())
fmap2.add_child(folium.LayerControl(position='topright', collapsed=False, autoZIndex=True))
fmap2.add_child(Fullscreen())
fmap2.add_child(MiniMap(position='bottomleft'))
fmap2.add_child(MousePosition())

# save and open map
fmap2.save('root_map\\folium_map.html')
webbrowser.open('root_map\\folium_map.html')

True

In [10]:
fig = branca.element.Figure()

subplot1 = fig.add_subplot(1, 2, 1)
subplot2 = fig.add_subplot(1, 2, 2)

subplot1.add_child(fmap1)
subplot2.add_child(fmap2)

# save and open map
fig.save('root_map\\folium_map.html')
webbrowser.open('root_map\\folium_map.html')

True

# Algorithm

## API load

In [4]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("API_KEY")

## Plot route path

In [12]:
import folium
import webbrowser
import branca

################### create map object ############################
# create map object and center it on our event
center = (13.736717, 100.523186)
origin_address = (19.229801521495578, 99.53510967653419)
destination_address = (18.803585753872834, 98.9536663008547)

icon_url = icons()
            
fmap = folium.Map(location=origin_address, tiles=None, zoom_start=13)

# add Openstreetmap layer
folium.TileLayer('openstreetmap', name='OpenStreet Map').add_to(fmap)

for provider in data['Provider'].unique():
    feature_group = folium.FeatureGroup(name=provider).add_to(fmap)

    for i in range(len(data)):
        if data['Provider'].iloc[i] == provider:

            icon = folium.CustomIcon(
                                        icon_url[provider],
                                        icon_size=(38, 50),
                                        icon_anchor=(3, 50),
                                    )
            
            html = popup_html_marker(data, i)
           
            iframe = branca.element.IFrame(html=html, width=400, height=200)
            popup = folium.Popup(iframe, max_width=500)
            
            folium_marker = folium.Marker(location=data['Coordinates'].iloc[i], tooltip=data['Provider'].iloc[i], icon=icon, popup=popup)
            folium_marker.add_to(feature_group)

# add legend in top right corner
fmap.add_child(folium.LayerControl(position='topright', collapsed=False, autoZIndex=True))


############################ plot route path ################################
import googlemaps
import polyline

gmaps = googlemaps.Client(key=api_key)

directions_result = gmaps.directions(origin_address, destination_address, mode='driving')
route_polyline = directions_result[0]['legs'][0]['steps']

for step in route_polyline:
    route_coordinates = polyline.decode(step['polyline']['points'])
    folium.PolyLine(locations=route_coordinates, color='blue').add_to(fmap)

# # Save the map to an HTML file
# fmap.save('route_map.html')
# webbrowser.open('route_map.html')

## Find all station near route

### utils

In [7]:
import math

def haversine(lat1:float, lon1:float, lat2:float, lon2:float) -> float:
    R = 6371  # Earth radius in kilometers

    # Convert latitude and longitude from degrees to radians
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    distance = R * c

    return distance

def calculate_circle_points(center_lat:float, center_lon:float, radius:float) -> list[tuple[float,float]]:
    num_points = 360
    circle_points = []

    for i in range(num_points):
        angle = math.radians(i)
        dx = radius * math.cos(angle)
        dy = radius * math.sin(angle)

        # Calculate new latitude and longitude
        new_lat = center_lat + (dy / 6371) * (180 / math.pi)
        new_lon = center_lon + (dx / (6371 * math.cos(math.radians(center_lat)))) * (180 / math.pi)

        circle_points.append((new_lat, new_lon))

    return circle_points

def points_in_circle(center_lat:float, center_lon:float, radius:float, points:list[tuple[float,float]]) -> tuple[list[int],list[tuple[float,float]]]:
    points_in_circle = []
    index_in_circle = []

    for index,point in enumerate(points):
        point_lat, point_lon = point
        distance = haversine(center_lat, center_lon, point_lat, point_lon)

        if distance <= radius:
            points_in_circle.append(point)
            index_in_circle.append(index)

    return index_in_circle, points_in_circle

### algo

In [14]:
import folium
import webbrowser
import branca
import googlemaps
import polyline
from collections import OrderedDict

################### create map object ############################
# create map object and center it on our event
center = (13.736717, 100.523186)
origin_address = (18.803589182080007, 98.95363466653255)
destination_address = (13.746025003904936, 100.53444461061416)
radius_km = 1

icon_url = icons()
            
fmap = folium.Map(location=origin_address, tiles=None, zoom_start=13)
folium.Marker(location=origin_address, icon=folium.Icon("red"), tooltip="origin_address").add_to(fmap)
folium.Marker(location=destination_address, icon=folium.Icon("red"),tooltip="destination_address").add_to(fmap)

# add Openstreetmap layer
folium.TileLayer('openstreetmap', name='OpenStreet Map').add_to(fmap)

gmaps = googlemaps.Client(key=api_key)

directions_result = gmaps.directions(origin_address, destination_address, mode='driving')
route_polyline = directions_result[0]['legs'][0]['steps']
station_index = OrderedDict()

######################## plot route path ###########################
for step in route_polyline:
    route_coordinates = polyline.decode(step['polyline']['points'])

    for center_corordinate in route_coordinates:
        #folium.Circle(location=center_corordinate, color='red', weight=2.5, opacity=1, radius=1000).add_to(fmap)
        points_within_circle = points_in_circle(*center_corordinate, radius_km, data['Coordinates'].to_list())

        for index, point in zip(*points_within_circle):
            station_index[index] = point

    folium.PolyLine(locations=route_coordinates, color='blue').add_to(fmap)

######################## plot marker ###########################
for provider in data['Provider'].unique():
    feature_group = folium.FeatureGroup(name=provider).add_to(fmap)

    for i in station_index.keys():
        if data['Provider'].iloc[i] == provider:

            icon = folium.CustomIcon(
                                        icon_url[provider],
                                        icon_size=(38, 50),
                                        icon_anchor=(3, 50),
                                    )
            
            html = popup_html_marker(data, i)
           
            iframe = branca.element.IFrame(html=html, width=400, height=200)
            popup = folium.Popup(iframe, max_width=500)
            
            folium_marker = folium.Marker(location=data['Coordinates'].iloc[i], tooltip=data['Provider'].iloc[i], icon=icon, popup=popup)
            folium_marker.add_to(feature_group)
# add legend in top right corner
fmap.add_child(folium.LayerControl(position='topright', collapsed=False, autoZIndex=True))

# fmap.save('เชียงใหม่-กรุงเทพ.html')

In [15]:
#export dataframe
data.iloc[list(station_index.keys())].reset_index(drop=True)#.to_excel('เชียงใหม่-กรุงเทพ.xlsx',index=False)

Unnamed: 0,Name,Description,styleUrl,NameEN,Power,ChargeCoreDescription,Address,Latitude,Longitude,Provider,...,Type2,Type2Count,CCS2,CCS2Count,CHAdeMO,CHAdeMOCount,Hours,Superchargers,SuperchargersCount,Dealership
0,One Nimman,Power: 125 kW<br>Description: 2x CCS2 125 kW (...,#icon-1899-DB4436-labelson,,125 kW,"2x CCS2 125 kW (300A), Type2 22 kW","Suthep, Mueang Chiang Mai District, Chiang Mai...",18.800440,98.968963,EleX,...,True,2,True,2,False,0.0,,False,,
1,โลตัส เชียงใหม่คำเที่ยง,Name-EN: Lotus’s Chiang Mai Kham Thiang<br>Pow...,#icon-1899-DB4436-labelson,Lotus’s Chiang Mai Kham Thiang,150 kW,"2x CCS2: 150 kW (1000V 200A), Type2: 11 kW",19 ถ.ตลาดคำเที่ยง ต.ป่าตัน อ.เมือง จ.เชียงใหม่...,18.808397,98.996940,Altervim,...,True,1,True,2,False,,,False,,
2,กรีนปาร์ค คอมมูนิตี้ มอลล์,Name-EN: Green Park Community Mall<br>Power: 1...,#icon-1899-DB4436-labelson,Green Park Community Mall,120 kW,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)","94, ตำบล ฟ้าฮ่าม อำเภอเมืองเชียงใหม่ เชียงใหม่...",18.806840,99.021735,PTT,...,True,1,True,2,False,,,False,,
3,มายฮิป คอนโด 2 (เชียงใหม่),Hours: Peak<br>Power: 100 kW<br>Description: C...,#icon-1899-DB4436-labelson,,100 kW,"CCS2: 2x 100 kW, 200A\nType2: CLOSED\nParking","หนองป่าครั่ง, เมืองเชียงใหม่, เชียงใหม่",18.800042,99.024041,EA,...,True,1,True,2,False,,Peak,False,,
4,บางจาก ซุปเปอร์ไฮเวย์เชียงใหม่,Power: 50 kW<br>Description: 99 ม.1 ถ.ซุปเปอร์...,#icon-1899-DB4436-labelson,,50 kW,,99 ม.1 ถ.ซุปเปอร์ไฮเวย์ ต.หนองป่าครั่ง อ.เมือง...,18.791737,99.024200,PEA,...,False,0,False,0,False,,,False,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,บี ออโต้เฮาส์ (ถ.วิภาวดีรังสิต),Hours: Closed<br>Power: 90 kW<br>Description: ...,#icon-1899-DB4436-labelson,,90 kW,"Type2: 22 kW, CLOSED\nCCS2: 90 kW, 200A, CLOSE...","จอมพล, จตุจักร, กรุงเทพมหานคร",13.807589,100.560448,EA,...,True,1,True,1,False,,Closed,False,,
96,ซัสโก้ สาขา วิภาวดี 1,Hours: 24 Hr<br>Power: 110 kW<br>Description: ...,#icon-1899-DB4436-labelson,,110 kW,"Type2: 5 kW\nCCS2: 2x 110 kW, 200A\nParking","จอมพล, จตุจักร, กรุงเทพมหานคร",13.802918,100.561303,EA,...,True,1,True,2,False,,24 Hr,False,,
97,E-BUS NEX Express Voice TV (สุทธิสาร),Hours: 24 Hr<br>Power: 152 kW<br>Description: ...,#icon-1899-DB4436-labelson,,152 kW,"CCS2: 2x 152 kW, 200A\nParking","สามเสนใน, พญาไท, กรุงเทพมหานคร",13.785779,100.559438,EA,...,False,0,True,2,False,,24 Hr,False,,
98,ปตท วิภาวดี (เดิม-สวัสดิการ ร.1 รอ.),Name-EN: PTT Vibhavadi<br>Power: 120 kW<br>Des...,#icon-1899-DB4436-labelson,PTT Vibhavadi,120 kW,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)",181 ถ.วิภาวดีรังสิต แขวงสามเสนใน เขตพญาไท กรุง...,13.779390,100.556130,PTT,...,True,1,True,2,False,,,False,,


## Optimization

In [20]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("API_KEY")

In [21]:
import numpy as np
optim_df = data.copy()
optim_df['Power'] = optim_df['Power'].apply(lambda x : float(x.replace(' kW', '')) if x is not None else 60)
optim_df = optim_df.fillna(0)
# optim_df = optim_df.applymap(lambda x: np.where(isinstance(x, bool), 1 if x else 0, x))
optim_df.head()

Unnamed: 0,Name,Description,styleUrl,NameEN,Power,ChargeCoreDescription,Address,Latitude,Longitude,Provider,...,Type2,Type2Count,CCS2,CCS2Count,CHAdeMO,CHAdeMOCount,Hours,Superchargers,SuperchargersCount,Dealership
0,อมารี สนามบินดอนเมือง กรุงเทพ,Name-EN: Amari Don Muang Airport Bangkok<br>Po...,#icon-1899-DB4436-labelson,Amari Don Muang Airport Bangkok,120.0,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)",333 ถนน เชิดวุฒากาศ แขวง สีกัน เขตดอนเมือง กรุ...,13.920788,100.600708,PTT,...,True,1,True,2,False,0.0,0,False,0.0,0
1,โรบินสัน สไตล์ สมุทรปราการ,Name-EN: Robinson Lifestyle Samut Prakan<br>Po...,#icon-1899-DB4436-labelson,Robinson Lifestyle Samut Prakan,120.0,"ChargeCore NKR: 2x CCS2, Type 2 (1000V 200A)",789 หมู่ 2 ถนนสุขุมวิท ตำบลท้ายบ้านใหม่ อำเภอเ...,13.58381,100.60954,PTT,...,True,1,True,2,False,0.0,0,False,0.0,0
2,ปตท หจก.อธิรุ่งโรจน์,Name-EN: PTT Athirungroaj<br>Power: 120 kW<br>...,#icon-1899-DB4436-labelson,PTT Athirungroaj,120.0,ChargeCore NKR-ADC: 2x CCS2 (1000V 200A),343 ม.3 ต.แจระแม อ.เมืองอุบลราชธานี อุบลราชธาน...,15.27415,104.80487,PTT,...,False,0,True,2,False,0.0,0,False,0.0,0
3,ปตท บจ.3เอ็นพีพี,Name-EN: PTT 3 NPP<br>Power: 120 kW<br>Descrip...,#icon-1899-DB4436-labelson,PTT 3 NPP,120.0,StarCharge Jupiter120: 2x CCS2 (1000V 200A),112/4 ถ.บรมราชชนนี แขวงตลิ่งชัน เขตตลิ่งชัน กร...,13.785001,100.458191,PTT,...,False,0,True,2,False,0.0,0,False,0.0,0
4,ปตท ตาก (ตากเคทีแอนด์ชันส์),Name-EN: PTT Tak (Tak KT & Song)<br>Power: 120...,#icon-1899-DB4436-labelson,PTT Tak (Tak KT & Song),120.0,ChargeCore NKR-ADC: 2x CCS2 (1000V 200A),5 ถ.พหลโยธิน ต.เชียงเงิน อ.เมืองตาก ตาก 63000,16.861228,99.135005,PTT,...,False,0,True,2,False,0.0,0,False,0.0,0


In [22]:
import googlemaps
import polyline
from collections import OrderedDict
import math
import networkx as nx
import itertools
from geopy import distance
import matplotlib.pyplot as plt

######################### Util function ########################
def haversine(lat1:float, lon1:float, lat2:float, lon2:float) -> float:
    R = 6371  # Earth radius in kilometers

    # Convert latitude and longitude from degrees to radians
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    distance = R * c

    return distance

def points_in_circle(center_lat:float, center_lon:float, radius:float, points:list[tuple[float,float]]) -> tuple[list[int],list[tuple[float,float]]]:
    points_in_circle = []
    index_in_circle = []

    for index,point in enumerate(points):
        point_lat, point_lon = point
        distance = haversine(center_lat, center_lon, point_lat, point_lon)

        if distance <= radius:
            points_in_circle.append(point)
            index_in_circle.append(index)

    return index_in_circle, points_in_circle

def geocode_place(place_name:str, gmaps = googlemaps.Client(key=api_key)) -> tuple[float,float]:
    try:
        # Geocode the place name to get its latitude and longitude
        geocode_result = gmaps.geocode(place_name)

        if geocode_result:
            location = geocode_result[0]['geometry']['location']
            lat, lng = location['lat'], location['lng']
            return tuple([lat, lng])
        else:
            raise ValueError()
        
    except:
        raise ValueError(f"'{place_name}' Location Not Found. Please check your place name or try to use Coordinate option instead.")

In [23]:
######################### Variables ########################
#user input
origin_address = geocode_place('มหาวิทยาลัยเชียงใหม่')
destination_address = geocode_place('วัดร่องขุ่น')
usable_battype  = ['CCS2','Type2']
battery_capacity = 80 #(kWh)
battery_initial = 10 #(% of battery_capacity)

#predefine
epcr = 0.2 #Electric power consumption rate (kWh/km)
eer = 1/epcr #Electric efficiency rate (km/kWh)
avg_speed = 80 #(km/h)
radius_km = 3 #(km)
reserve_batterry = 10 #% of battery_capacity
cluster_threshold = 20 #cluster threshold
rest_time = 1/3 #rest time
def calculate_charging_time(battery_added:float, station_power:float) -> float:
    return battery_added/(station_power+1) #avoid divide by 0


In [24]:
import folium
######################### Find stations ########################
#find stations within radius
gmaps = googlemaps.Client(key=api_key)
directions_result = gmaps.directions(origin_address, destination_address, mode='driving')
route_polyline = directions_result[0]['legs'][0]['steps']
all_path_point = []
station_index = OrderedDict()

fmap = folium.Map(location=origin_address, tiles=None, zoom_start=13)
# add Openstreetmap layer
folium.TileLayer('openstreetmap', name='OpenStreet Map').add_to(fmap)
folium.Marker(location=origin_address, icon=folium.Icon("red"), tooltip="origin_address").add_to(fmap)
folium.Marker(location=destination_address, icon=folium.Icon("red"),tooltip="destination_address").add_to(fmap)

for step in route_polyline:
    route_coordinates = polyline.decode(step['polyline']['points'])
    all_path_point += route_coordinates

    for center_corordinate in route_coordinates:
        points_within_circle = points_in_circle(*center_corordinate, radius_km, data['Coordinates'].to_list())

        for index, point in zip(*points_within_circle):
            station_index[index] = point

    #plot polyline
    folium.PolyLine(locations=route_coordinates, color='blue').add_to(fmap)
    
#statioun found and filter by usable_battype
station_found = optim_df.iloc[list(station_index.keys())].copy()
#filter by usable battery type
station_found =  station_found[station_found[usable_battype].any(axis=1)]

In [25]:
import numpy as np
from sklearn.cluster import KMeans

if len(station_found) >= cluster_threshold:
    X = np.array(station_found['Coordinates'].to_list())

    inertias = []
    k_options = range(1, cluster_threshold+1)
    for k in k_options:
        # Set n_init explicitly to 10 to suppress the FutureWarning
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(X)
        inertias.append(kmeans.inertia_)

    # Calculate the gradient (difference in inertia) to find the elbow
    differences = np.diff(inertias)  # First derivative (difference)
    second_differences = np.diff(differences)  # Second derivative

    # The "elbow" is where the second derivative is maximum (in absolute value)
    elbow_k = np.argmin(second_differences) + 2  # +2 due to index shift from diff()

    print(f'Optimal number of clusters based on the Elbow Method: {elbow_k}')

    # Perform K-means clustering with the optimal number of clusters
    kmeans_final = KMeans(n_clusters=elbow_k, random_state=42, n_init=10)
    kmeans_final.fit(X)

    # Predict the clusters
    clusters = kmeans_final.predict(X)

    # Add the cluster labels to your original dataframe
    station_found['Cluster'] = clusters

    # Now you can retrieve the indices of the dataframe for each cluster
    cluster_indices = {i: station_found[station_found['Cluster'] == i].index.tolist() for i in range(elbow_k)}

    # Assign weights based on your priorities
    weights = {
        'Power': 0.5,  
        'Type1Count': 0.1,
        'Type2Count': 0.1,
        'CCS2Count': 0.1,
        'CHAdeMOCount': 0.1,
        'SuperchargersCount': 0.1,
    }

    # Compute a score for each station
    station_scores = []
    for _, row in station_found.iterrows():
        score = 0
        for key, weight in weights.items():
            # Normalize or standardize the data if necessary
            score += weight * row[key]
        station_scores.append(score)

    # Add scores to your dataframe
    station_found['Score'] = station_scores

    # Now, you can select the station with the highest score within each cluster
    best_stations = station_found.loc[station_found.groupby('Cluster')['Score'].idxmax()]

else:
    best_stations = station_found

In [26]:
######################### Graph construct ########################
#implement by DiGraph
G = nx.DiGraph()

# add origin_address and destination_address node
G.add_node('START', index='START', coordinate=origin_address)
G.add_node('STOP', index='STOP', coordinate=destination_address)

#add  sstation nodes
for index in best_stations.index:
    G.add_node(index, index=index, coordinate=best_stations.loc[index]['Coordinates'], power=best_stations.loc[index]['Power'])

#add edges
allnodes = ['START'] + list(best_stations.index) + ['STOP']
for i in range(len(allnodes)):
    for j in range(i + 1, len(allnodes)):
        G.add_edge(allnodes[i], allnodes[j])

# plt.figure(figsize=(6, 4))
# nx.draw_circular(G, with_labels=True, node_size=2000, node_color="lightblue", font_size=10, arrows=True)
# plt.title("Directed Graph in Circular Layout")
# plt.show()

In [27]:
# def calculate_edge_weight(from_node, to_node, G):
#     global avg_speed, epcr

#     from_node = G.nodes[from_node]
#     to_node = G.nodes[to_node]

#     distance_btw2nodes = distance.distance(from_node['coordinate'], to_node['coordinate']).km

#     # calculate driving time
#     driving_time = distance_btw2nodes/avg_speed

#     # avoid start location where is not station. 
#     if from_node != G.nodes['START']:
#         # calculate charging time
#         required_battery = distance_btw2nodes*epcr
#         required_battery = required_battery + 0.05*required_battery # +5% of error
#         charging_time = required_battery/(from_node['power'] + 1)

#         # total time
#         total_time = driving_time + charging_time
#         return total_time
    
#     else:
#         return driving_time

# # Update weights for all edges
# for from_node, to_node in itertools.permutations(G.nodes, 2):
#     # Check if the edge exists, though in your case, you've added all permutations already
#     if G.has_edge(from_node, to_node):
#         weight = calculate_edge_weight(from_node, to_node, G)
#         G[from_node][to_node]['weight'] = weight

In [28]:
# import random

# def reservoir_sampling(iterator, k):
#     """
#     Perform reservoir sampling from an iterator and store results in a set.
    
#     Parameters:
#     - iterator: an iterable object (like a generator).
#     - k: number of items to sample.
    
#     Returns:
#     - A set of k sampled items, with each item being a tuple instead of a list.
#     """
#     reservoir = set()
#     for i, item in enumerate(iterator):
#         if len(reservoir) != k:
#             item_tuple = tuple(item)  # Convert list to tuple
#             if len(reservoir) < k:
#                 reservoir.add(item_tuple)
#             else:
#                 j = random.randint(0, i)
#                 if j < k:
#                     # Since we're using tuples, we cannot just "replace" an item.
#                     # Instead, we ensure the set has exactly k items by adding new items as needed.
#                     reservoir_list = list(reservoir)
#                     removed_item = reservoir_list[j % len(reservoir)]
#                     reservoir.remove(removed_item)
#                     reservoir.add(item_tuple)
#         else:
#             break
#     return reservoir

In [29]:
def get_consecutive_groups(lst, n):
    for i in range(len(lst) - n + 1):
        yield list(itertools.islice(lst, i, i + n))

history = {'path' : [], 'total time history' : [], 'arrival battery' : [], 'target battery' : [], 'driving time' : [], 'charging time' : []}
min_total_time = float('inf')
solution_path = None
solution_data = None

for path in nx.all_simple_paths(G, source='START', target='STOP'):
    total_time = 0
    arrival_battery_list = []
    target_battery_list = []
    driving_time_list = []
    charging_time_list = []

    if len(path) != 2:
        for start_node, current_node, stop_node in get_consecutive_groups(path, 3):
            # find distance between start_node and stop_node
            start_node = G.nodes[start_node]
            current_node = G.nodes[current_node]
            stop_node = G.nodes[stop_node]
            distance_start_current = distance.distance(start_node['coordinate'], current_node['coordinate']).km
            distance_current_stop = distance.distance(current_node['coordinate'], stop_node['coordinate']).km

            # calculate driving time
            driving_time = distance_start_current/avg_speed

            # avoid start location where is not station.
            if start_node == G.nodes['START']:
                arrival_battery = battery_initial - distance_start_current*epcr
            else:
                arrival_battery = target_battery_list[-1] - distance_start_current*epcr
            
            # avoid start node can't reach next node
            if round(arrival_battery) < round(((reserve_batterry/100)*battery_capacity)):
                total_time = float('inf')
                break

            # if arrival battery current node can't reach the next destination(stop node) by considering distance from current to next node, then charge. otherwise no charge.
            if (arrival_battery-((reserve_batterry/100)*battery_capacity))*eer < distance_current_stop: #charge
                battery_added = (distance_current_stop-((arrival_battery-((reserve_batterry/100)*battery_capacity))*eer))*epcr
                target_battery = arrival_battery + battery_added

                #avoid current node can't reach next 
                if target_battery >= battery_capacity:
                    total_time = float('inf')
                    break
                
                #calculate charging time
                charging_time = calculate_charging_time(battery_added, current_node['power'])
            else: #not charge
                target_battery = arrival_battery

                #calculate charging time = time to rest if visit the station
                charging_time = rest_time #rest time

            driving_time_list.append(driving_time)
            arrival_battery_list.append(arrival_battery)
            target_battery_list.append(target_battery)
            charging_time_list.append(charging_time)

            #accumulating time
            total_time = total_time + driving_time + charging_time
        
        # driving time for last station to destination
        start_node = G.nodes[path[-2]]
        stop_node = G.nodes[path[-1]]
        last_distance = distance.distance(start_node['coordinate'], stop_node['coordinate']).km
        driving_time = last_distance/avg_speed
        total_time += driving_time

    else: # for only start to stop
        start_node = G.nodes[path[0]]
        stop_node = G.nodes[path[-1]]
        distance_start_stop = distance.distance(start_node['coordinate'], stop_node['coordinate']).km

        driving_time = distance_start_current/avg_speed

        arrival_battery = battery_initial - distance_start_stop*epcr

        # avoid start node can't reach next node
        if arrival_battery <= ((reserve_batterry/100)*battery_capacity):
            total_time = float('inf')
            break
        else:
            total_time = driving_time
        
        arrival_battery_list = []
        target_battery_list = []
        driving_time_list = []
        charging_time_list = []

    if total_time < min_total_time:
        min_total_time = total_time
        solution_path = path
        solution_data = {'arrival battery':arrival_battery_list,
                        'target battery':target_battery_list,
                        'driving time':driving_time_list,
                        'charging time':charging_time_list}

    history['path'].append(path)
    history['total time history'].append(total_time)
    history['driving time'].append(driving_time_list)
    history['charging time'].append(charging_time_list)
    history['arrival battery'].append(arrival_battery_list)
    history['target battery'].append(target_battery_list)

In [30]:
min_total_time

1.9587440629777952

In [31]:
solution_path

['START', 1059, 'STOP']

In [32]:
solution_data

{'arrival battery': [9.109364897736178],
 'target battery': [35.638261042373024],
 'driving time': [0.05566469389148891],
 'charging time': [0.17568805393799236]}

In [33]:
import folium
import branca
from typing import Any, Union

def icons() -> dict:
    return {'PTT':'icons\\ptt_icon-modified.png',
            'PEA':'icons\\pea_icon-modified.png',
            'EleX':'icons\\elex_icon-modified.png',
            'Altervim':'icons\\altervim_icon-modified.png',
            'EA':'icons\\ea_icon-modified.png',}

def popup_html_marker(data:pd.DataFrame, index:int) -> str:
    return f"""
                    <h3>{data['Name'].iloc[index]}</h3>
                    <p>Provider : {data['Provider'].iloc[index]}</p>
                    <p>Power : {data['Power'].iloc[index]}</p>
                    <p>Type1 : {data['Type1'].iloc[index]}</p>
                    <p>Type1Count : {data['Type1Count'].iloc[index]}</p>
                    <p>Type2 : {data['Type2'].iloc[index]}</p>
                    <p>Type2Count : {data['Type2Count'].iloc[index]}</p>
                    <p>CCS2 : {data['CCS2'].iloc[index]}</p>
                    <p>CCS2Count : {data['CCS2Count'].iloc[index]}</p>
                    <p>CHAdeMO : {data['CHAdeMO'].iloc[index]}</p>
                    <p>CHAdeMOCount : {data['CHAdeMOCount'].iloc[index]}</p>
                    <p>Superchargers : {data['Superchargers'].iloc[index]}</p>
                    <p>SuperchargersCount : {data['SuperchargersCount'].iloc[index]}</p>
                """
def popup_html_polyline(data:pd.DataFrame, index:int) -> str:
    return f"""
                <p>Distance : {data['Distance'].iloc[index]}</p>
                <p>Duration : {data['Duration'].iloc[index]}</p>
            """
def popup_iframe(html: str, iframe_width: int=400, iframe_height: int=200, 
                 iframe_ratio: str='60%', iframe_figsize:Any=None, popup_max_width: Union[int,str]='100%', 
                 popup_show: bool=False, popup_sticky: bool=False, popup_lazy: bool=False) -> folium.Popup:
    iframe = branca.element.IFrame(html=html, width=iframe_width, height=iframe_height, ratio=iframe_ratio, figsize=iframe_figsize)
    popup = folium.Popup(iframe, max_width=popup_max_width, show=popup_show, sticky=popup_sticky, lazy=popup_lazy)
    return popup

In [34]:
from folium.plugins import Fullscreen, Geocoder, LocateControl, MiniMap, MousePosition
icon_url = icons()
######################## plot marker of stations ###########################
if solution_path:
    for provider in data['Provider'].unique():
        feature_group = folium.FeatureGroup(name=provider).add_to(fmap)

        for i in solution_path[1:-1]:
            if data['Provider'].iloc[i] == provider:

                icon = folium.CustomIcon(
                                            icon_url[provider],
                                            icon_size=(38, 50),
                                            icon_anchor=(3, 50),
                                        )
                html = popup_html_marker(data, i)
                popup = popup_iframe(html)
                    
                folium_marker = folium.Marker(location=data['Coordinates'].iloc[i], tooltip=data['Provider'].iloc[i], icon=icon, popup=popup)
                folium_marker.add_to(feature_group)
# add legend in top right corner
fmap.add_child(Geocoder())
fmap.add_child(folium.LayerControl(position='topright', collapsed=False, autoZIndex=True))
fmap.add_child(Fullscreen())
fmap.add_child(MiniMap(position='bottomleft'))
fmap.add_child(MousePosition())

#convert to branca.element.Figure()  
fig = branca.element.Figure()    
subplot1 = fig.add_subplot(1, 1, 1)
subplot1.add_child(fmap)

In [35]:
# #https://developers.google.com/maps/documentation/places/web-service/text-search#search-for-ev-chargers-with-a-minimum-charging-rate

# import requests

# url = 'https://places.googleapis.com/v1/places:searchText'

# headers = {
#     'Content-Type': 'application/json',
#     'X-Goog-Api-Key': api_key,
#     'X-Goog-FieldMask': '*',
# }

# data = {
#     "textQuery": "ev charging station ไกล้ 17.248190864597724, 99.15470381517895",
#     "maxResultCount": 4,
#     "evOptions": {
#       "minimumChargingRateKw": 10,
#       "connectorTypes": []
#     }
# }

# response = requests.post(url, json=data, headers=headers)
# response.json()
