In [166]:
import mercantile
import requests
import re
from PIL import Image
import io
from io import BytesIO
import numpy as np
import pandas as pd
import xarray as xr
import rasterio
import easyocr
from shapely import Polygon, Point, box
from rasterio.transform import from_bounds
import dask

import locale
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
import hvplot.xarray

In [266]:
class GoogleMiner():
    """
    Fetches Google satellite tiles for an area and OCRs an imagery date from Google Earth Web.

    A headless Chrome session (Selenium) and an EasyOCR English reader are created once in
    ``__init__`` and reused by every ``fetch`` call. Call ``close()`` (or use the instance
    as a context manager) to shut the browser down when finished.
    """

    def __init__(self):
        # Set up headless Chrome options
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--no-sandbox")
        self.driver = webdriver.Chrome(options=chrome_options)
        try:
            self.reader = easyocr.Reader(['en'])
        except Exception:
            # Do not leave an orphaned Chrome process behind if the OCR reader fails to load.
            self.driver.quit()
            raise

    def close(self):
        """Quit the Chrome session started in ``__init__``."""
        self.driver.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def fetch(self,lat=None,lon=None,radius=None,bbox=None,resolution=1):
        """
        Download imagery for an area and attach the OCR'd imagery date as metadata.

        Parameters:
        lat, lon (float): Centre of the area; only used when ``bbox`` is None.
        radius (float): Buffer around the centre; converted to degrees as
            ``radius / 111 / 1000`` (i.e. treated as metres at ~111 km per degree).
        bbox (tuple): (west, south, east, north) in degrees. Takes precedence over
            ``lat``/``lon``/``radius`` when given.
        resolution (float): Desired resolution in metres per pixel.

        Returns:
        xarray.DataArray: The stitched imagery, with ``attrs['metadata']`` set to the
        dictionary returned by ``fetch_metadata``.

        Raises:
        TypeError: If ``bbox`` is None and any of ``lat``, ``lon``, ``radius`` is missing.
        """
        if bbox is None:
            if lat is None or lon is None or radius is None:
                raise TypeError("fetch() needs either bbox or all of lat, lon and radius")
            bbox = Point(lon,lat).buffer(radius/111/1000).bounds

        # Both tasks are dask.delayed; compute them together so they can run concurrently.
        ds,metadata = dask.compute(self.fetch_imagery(bbox,resolution),self.fetch_metadata(bbox,resolution))
        ds.attrs['metadata'] = metadata
        return ds

    @dask.delayed()
    def fetch_imagery(self,bbox,resolution):
        """Delayed task: download and stitch the basemap tiles covering ``bbox``."""
        # Use the class's own static method so the class does not depend on a
        # same-named module-level function having been defined in another cell.
        return self.download_google_basemap(bbox, resolution)

    @dask.delayed()
    def fetch_metadata(self,bbox,resolution):
        """
        Delayed task: open Google Earth Web at the centre of ``bbox`` and OCR a date.

        Takes up to 10 screenshots one second apart, cropping each to a strip near the
        bottom of the page and running EasyOCR on it, until a date can be parsed.
        The last screenshot is kept on ``self.image`` for inspection.
        ``resolution`` is accepted for signature symmetry with ``fetch_imagery`` and is unused.

        Returns:
        dict: ``{'date': {'value': <ISO date string or None>, 'confidence': <OCR score or None>}}``.
        """
        lon,lat = list(box(*bbox).centroid.coords)[0]

        self.driver.get(self.generate_google_earth_url(lat,lon,15))
        time.sleep(1)
        body = self.driver.find_element(By.TAG_NAME, "body")
        # NOTE(review): ESC presumably dismisses a start-up overlay - confirm.
        body.send_keys(Keys.ESCAPE)

        date = None
        confidence = None
        for _attempt in range(10):
            # Give the page another second to render before each screenshot.
            time.sleep(1)
            png = self.driver.get_screenshot_as_png()
            image = Image.open(io.BytesIO(png))
            self.image = image
            data = np.array(image)
            # Keep only the bottom 6% of rows and the 16%-48% column band of the screenshot.
            data = data[int(data.shape[0]*0.940):,int(data.shape[1]*0.16):int(data.shape[1]*0.48)]
            date, confidence = self._extract_date(self.reader.readtext(image=data))
            if date is not None:
                break

        metadata = {
            'date':{
                'value':date,
                'confidence':confidence
            }
        }
        return metadata

    @staticmethod
    def _extract_date(detections):
        """
        Return ``(iso_date, confidence)`` for the first OCR detection that parses as a date.

        Each detection is indexed as ``detection[1]`` (text) and ``detection[2]`` (confidence).
        Everything except digits and ``/`` is stripped from the text; candidates of 5
        characters or fewer are skipped. Returns ``(None, None)`` when nothing parses.
        """
        # Month-first for an en_US Python locale, day-first otherwise.
        date_format = '%m/%d/%Y' if locale.getlocale()[0] == 'en_US' else '%d/%m/%Y'
        for detection in detections:
            candidate = re.sub(r'[^0-9/]', '', detection[1])
            if len(candidate) <= 5:
                continue
            try:
                parsed = pd.to_datetime(candidate, format=date_format)
            except (ValueError, TypeError):
                # Not a date in the expected format; try the next detection.
                continue
            return str(parsed.date()), detection[2]
        return None, None

    @staticmethod
    def generate_google_earth_url(latitude, longitude, zoom_level):
        """
        Generates a Google Earth URL with the given latitude, longitude, and zoom level.

        Parameters:
        - latitude (float): Latitude of the location.
        - longitude (float): Longitude of the location.
        - zoom_level (float): Zoom level (used to approximate altitude).

        Returns:
        - str: The formatted Google Earth URL.
        """
        # Approximate the altitude based on zoom level (this is a simplified approximation)
        # Note: The exact relationship between zoom level and altitude in Google Earth is complex.
        # Here, altitude is just a rough estimate.
        altitude = 40000000 / (2 ** zoom_level)
        distance = altitude * 0.3  # Adjust distance based on altitude (simplified assumption)

        # Fixed values for the y, h, t and r URL terms for simplicity
        tilt = 0
        heading = 0
        pitch = 0
        roll = 0

        url = f"https://earth.google.com/web/@{latitude},{longitude},{altitude:.2f}a,{distance:.2f}d,{tilt}y,{heading}h,{pitch}t,{roll}r"
        return url

    @staticmethod
    def download_google_basemap(bbox, resolution):
        """
        Downloads and stitches Google basemap tiles into an xarray.DataArray.

        The mosaic covers every whole tile that intersects ``bbox``; it is not cropped
        to ``bbox`` itself.

        Parameters:
        bbox (tuple): (west, south, east, north) bounding box in WGS 84 coordinates.
        resolution (float): Desired resolution in meters per pixel (must be > 0).

        Returns:
        xarray.DataArray: Stitched basemap with dims (band, y, x). ``x``/``y`` hold
        pixel-centre coordinates in EPSG:3857, consistent with ``attrs['transform']``.

        Raises:
        ValueError: If ``resolution`` is not positive or no tile intersects ``bbox``.
        requests.HTTPError: If a tile request returns an error status.
        """
        if resolution <= 0:
            raise ValueError("resolution must be a positive number of meters per pixel")

        # 156543.03 m/px is used as the zoom-0 ground resolution; round up so the
        # chosen zoom is at least as fine as the requested resolution.
        zoom = int(np.ceil(np.log2(156543.03 / resolution)))

        # Calculate tile bounds using mercantile
        tiles = list(mercantile.tiles(bbox[0], bbox[1], bbox[2], bbox[3], zoom))
        if not tiles:
            raise ValueError(f"no tiles intersect bbox {bbox} at zoom {zoom}")

        # Download the tiles, reusing one HTTP session for all requests.
        tile_images = []
        with requests.Session() as session:
            for tile in tiles:
                url = f"https://mt1.google.com/vt/lyrs=s&x={tile.x}&y={tile.y}&z={zoom}"
                response = session.get(url, timeout=30)
                # Fail loudly instead of trying to decode an error page as an image.
                response.raise_for_status()
                img = Image.open(BytesIO(response.content))
                tile_images.append((img, tile))

        # Determine the size of the output image
        tile_width, tile_height = tile_images[0][0].size
        min_x = min(tile.x for tile in tiles)
        min_y = min(tile.y for tile in tiles)
        total_width = tile_width * (max(tile.x for tile in tiles) - min_x + 1)
        total_height = tile_height * (max(tile.y for tile in tiles) - min_y + 1)

        # Create an empty image to paste the tiles into
        mosaic = Image.new('RGB', (total_width, total_height))

        # Determine the overall bounding box (Web Mercator metres) and paste each tile
        # at its column/row offset from the top-left tile.
        west, south, east, north = mercantile.xy_bounds(tiles[0])
        for img, tile in tile_images:
            tile_west, tile_south, tile_east, tile_north = mercantile.xy_bounds(tile)
            west = min(west, tile_west)
            south = min(south, tile_south)
            east = max(east, tile_east)
            north = max(north, tile_north)
            mosaic.paste(img, ((tile.x - min_x) * tile_width, (tile.y - min_y) * tile_height))

        # Calculate the geotransform (bounds are the outer pixel edges)
        transform = from_bounds(west, south, east, north, total_width, total_height)

        # Convert the image to a NumPy array
        data = np.array(mosaic)

        # Coordinates are pixel centres: half a pixel in from each edge, one pixel apart,
        # so that they agree with the edge-based transform above.
        pixel_width = (east - west) / total_width
        pixel_height = (north - south) / total_height
        x_coords = west + (np.arange(total_width) + 0.5) * pixel_width
        y_coords = north - (np.arange(total_height) + 0.5) * pixel_height

        # Create an xarray.DataArray
        da = xr.DataArray(
            data.transpose(2, 0, 1),  # Transpose to (bands, y, x) format
            dims=["band", "y", "x"],
            coords={
                "band": [1, 2, 3],
                "y": y_coords,
                "x": x_coords
            },
            attrs={
                "transform": transform,
                "crs": "EPSG:3857"  # Web Mercator CRS
            }
        )
        return da

In [273]:
# Sanity check: .bounds of a buffered point comes back as (minx, miny, maxx, maxy).
Point(1,2).buffer(10).bounds

(-9.0, -8.0, 11.0, 12.0)

In [267]:
%%time
# Starts headless Chrome and loads the EasyOCR reader (see GoogleMiner.__init__).
miner = GoogleMiner()

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


CPU times: user 3.52 s, sys: 1.45 s, total: 4.97 s
Wall time: 10.2 s


In [268]:
%%time
# Pass the centre by keyword so latitude and longitude cannot be swapped by accident.
ds = miner.fetch(lat=-13.25049643, lon=35.24904667, radius=500, resolution=0.5)

CPU times: user 14.7 s, sys: 6 s, total: 20.7 s
Wall time: 27.3 s


In [269]:
# Display the DataArray returned by miner.fetch (OCR result is under attrs['metadata']).
ds

In [271]:
# Rasterized 900x900 hvplot view of ds, plotted over its x/y coordinates.
ds.hvplot(bands='band',x='x',y='y',height=900,width=900,rasterize=True)

In [237]:
# Second display of ds. NOTE(review): this cell's execution count (237) is lower than
# the fetch cell's (268), so any saved output here may be stale.
ds

In [149]:
def download_google_basemap(bbox, resolution):
    """
    Downloads and stitches Google basemap tiles into an xarray.DataArray.

    The mosaic covers every whole tile that intersects ``bbox``; it is not cropped
    to ``bbox`` itself.

    Parameters:
    bbox (tuple): (west, south, east, north) bounding box in WGS 84 coordinates.
    resolution (float): Desired resolution in meters per pixel (must be > 0).

    Returns:
    xarray.DataArray: Stitched basemap with dims (band, y, x). ``x``/``y`` hold
    pixel-centre coordinates in EPSG:3857, consistent with ``attrs['transform']``.

    Raises:
    ValueError: If ``resolution`` is not positive or no tile intersects ``bbox``.
    requests.HTTPError: If a tile request returns an error status.
    """
    if resolution <= 0:
        raise ValueError("resolution must be a positive number of meters per pixel")

    # 156543.03 m/px is used as the zoom-0 ground resolution; round up so the
    # chosen zoom is at least as fine as the requested resolution.
    zoom = int(np.ceil(np.log2(156543.03 / resolution)))

    # Calculate tile bounds using mercantile
    tiles = list(mercantile.tiles(bbox[0], bbox[1], bbox[2], bbox[3], zoom))
    if not tiles:
        raise ValueError(f"no tiles intersect bbox {bbox} at zoom {zoom}")

    # Download the tiles, reusing one HTTP session for all requests.
    tile_images = []
    with requests.Session() as session:
        for tile in tiles:
            url = f"https://mt1.google.com/vt/lyrs=s&x={tile.x}&y={tile.y}&z={zoom}"
            response = session.get(url, timeout=30)
            # Fail loudly instead of trying to decode an error page as an image.
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            tile_images.append((img, tile))

    # Determine the size of the output image
    tile_width, tile_height = tile_images[0][0].size
    min_x = min(tile.x for tile in tiles)
    min_y = min(tile.y for tile in tiles)
    total_width = tile_width * (max(tile.x for tile in tiles) - min_x + 1)
    total_height = tile_height * (max(tile.y for tile in tiles) - min_y + 1)

    # Create an empty image to paste the tiles into
    mosaic = Image.new('RGB', (total_width, total_height))

    # Determine the overall bounding box (Web Mercator metres) and paste each tile
    # at its column/row offset from the top-left tile.
    west, south, east, north = mercantile.xy_bounds(tiles[0])
    for img, tile in tile_images:
        tile_west, tile_south, tile_east, tile_north = mercantile.xy_bounds(tile)
        west = min(west, tile_west)
        south = min(south, tile_south)
        east = max(east, tile_east)
        north = max(north, tile_north)
        mosaic.paste(img, ((tile.x - min_x) * tile_width, (tile.y - min_y) * tile_height))

    # Calculate the geotransform (bounds are the outer pixel edges)
    transform = from_bounds(west, south, east, north, total_width, total_height)

    # Convert the image to a NumPy array
    data = np.array(mosaic)

    # Coordinates are pixel centres: half a pixel in from each edge, one pixel apart,
    # so that they agree with the edge-based transform above.
    pixel_width = (east - west) / total_width
    pixel_height = (north - south) / total_height
    x_coords = west + (np.arange(total_width) + 0.5) * pixel_width
    y_coords = north - (np.arange(total_height) + 0.5) * pixel_height

    # Create an xarray.DataArray
    da = xr.DataArray(
        data.transpose(2, 0, 1),  # Transpose to (bands, y, x) format
        dims=["band", "y", "x"],
        coords={
            "band": [1, 2, 3],
            "y": y_coords,
            "x": x_coords
        },
        attrs={
            "transform": transform,
            "crs": "EPSG:3857"  # Web Mercator CRS
        }
    )

    return da

In [150]:
def generate_google_earth_url(latitude, longitude, zoom_level):
    """
    Build a Google Earth Web URL centred on a coordinate.

    Parameters:
    - latitude (float): Latitude placed first after the ``@`` in the URL.
    - longitude (float): Longitude placed second after the ``@`` in the URL.
    - zoom_level (float): Zoom level; halves the altitude term for every +1.

    Returns:
    - str: URL of the form
      ``https://earth.google.com/web/@<lat>,<lon>,<alt>a,<dist>d,0y,0h,0t,0r``
      with altitude and distance formatted to two decimals.
    """
    # Rough zoom -> altitude mapping: 40,000,000 at zoom 0, halved per zoom step.
    eye_altitude = 40000000 / (2 ** zoom_level)
    # The distance term is taken as 30% of the altitude (simplified assumption).
    eye_distance = eye_altitude * 0.3

    # The y, h, t and r terms are all fixed at zero.
    orientation = ",".join(f"{0}{suffix}" for suffix in ("y", "h", "t", "r"))
    position = f"{latitude},{longitude},{eye_altitude:.2f}a,{eye_distance:.2f}d"

    return "https://earth.google.com/web/@" + position + "," + orientation

In [151]:
# Centre of the area of interest.
lat,lon = 27.58036507,75.80475609
# Buffer the point by 500/111/1000 degrees (500 m at roughly 111 km per degree) and
# take its bounds; shapely takes x (lon) first, so bounds are (west, south, east, north).
bbox = Point(lon,lat).buffer(500/111/1000).bounds
resolution = 1 # resolution in meters
# Download and stitch the tiles covering bbox into a DataArray.
ds = download_google_basemap(bbox, resolution)

In [211]:
# Set up headless Chrome options
# (stand-alone copy of the setup performed in GoogleMiner.__init__)
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
#chrome_options.add_argument("--window-size=1520x3080")
chrome_options.add_argument("--no-sandbox")
# Module-level browser session and English OCR reader used by the scratch cells below.
driver = webdriver.Chrome(options=chrome_options)
reader = easyocr.Reader(['en'])

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


In [172]:
# Recover the centre of bbox as an (x, y) = (lon, lat) tuple.
list(box(*bbox).centroid.coords)[0]

(75.80475609, 27.58036507)

In [164]:

# Open Google Earth Web at the point of interest.
driver.get(generate_google_earth_url(lat,lon,14))
time.sleep(1)
body = driver.find_element(By.TAG_NAME, "body")
# NOTE(review): ESC presumably dismisses a start-up overlay - confirm.
body.send_keys(Keys.ESCAPE)

# Month-first for an en_US Python locale, day-first otherwise.
date_format = '%m/%d/%Y' if locale.getlocale()[0] == 'en_US' else '%d/%m/%Y'
date = None
confidence = None
max_tries=5
while max_tries>0:
    max_tries-=1
    # Give the page another second to render before each screenshot.
    time.sleep(1)
    png = driver.get_screenshot_as_png()
    image = Image.open(io.BytesIO(png))
    data = np.array(image)
    # Full screenshot as a DataArray so it can be inspected/plotted afterwards.
    ds = xr.DataArray(data=data,dims=['y','x','band'],coords={'band':[0,1,2],'y':range(data.shape[0]),'x':range(data.shape[1])})

    # Keep only the bottom 6% of rows and the 16%-48% column band for OCR.
    data = data[int(data.shape[0]*0.940):,int(data.shape[1]*0.16):int(data.shape[1]*0.48)]

    text = reader.readtext(image=data)
    for detection in text:
        # Strip everything except digits and '/' from this detection's own text.
        candidate = re.sub(r'[^0-9/]', '', detection[1])
        if len(candidate) <= 5:
            # Too short to be a date; leave `date` as None so the retry loop continues.
            continue
        try:
            date = str(pd.to_datetime(candidate, format=date_format).date())
        except (ValueError, TypeError):
            # Not a date in the expected format; try the next detection.
            continue
        confidence = detection[2]
        break
    if date is not None:
        break

In [38]:
# Rasterized 900x900 hvplot view of whatever `ds` currently holds.
# NOTE(review): execution count 38 predates the cells above, so saved output may be stale.
ds.hvplot(bands='band',x='x',y='y',height=900,width=900,rasterize=True)