# Imports

In [30]:
import firebase_admin
from firebase_admin import firestore
from firebase_admin import credentials

import logging
import traceback
from pprint import pformat
from typing import Dict, Optional, Union, List, Tuple

from geopy.distance import geodesic

import numpy as np
from collections import Counter
from sklearn.cluster import DBSCAN

## Parameters

In [2]:
# Path to the service-account key file handed to credentials.Certificate
# in the authentication cell.
creds_path: str = "./keys/serviceAccountKey.json"
# Distance limit in kilometres.
# NOTE(review): the helpers below take their own max_distance_km argument and
# the class is constructed with a literal 5.0, so this module-level value looks
# unused in the visible code — confirm before relying on it.
max_distance_km: float = 5.0

## Setup

In [3]:
# Configure the root logger: INFO and above, prefixed with level and timestamp.
# DEBUG records (e.g. the traceback dumps emitted by the helpers) are therefore
# not shown unless the level is lowered.
logging.basicConfig(
    level=logging.INFO, format="%(levelname)s: %(asctime)s - %(message)s"
)

# Module-level logger shared by every cell below.
logger = logging.getLogger(__name__)

## Authentication

In [4]:
# Authenticate against Firebase and create the Firestore client `db` used by
# every later cell.
try:
    cred = credentials.Certificate(creds_path)
    logger.info("Firebase centantial read successfully")
    # initialize_app raises ValueError when the default app already exists
    # (e.g. when this cell is re-run), so reuse the existing app in that case.
    try:
        firebase_admin.get_app()
    except ValueError:
        firebase_admin.initialize_app(cred)
    logger.info("Firebase initialized successfully")
    db = firestore.client()
    logger.info("Firestore client created")

except Exception as e:
    # logger.exception records the traceback at ERROR level, so it is visible
    # with the INFO-level configuration used in this notebook.
    logger.exception("An error occurred: %s", e)
    # Without a client nothing below can work; fail here instead of letting a
    # later cell die with an unrelated NameError on `db`.
    raise

INFO: 2024-01-20 21:18:26,800 - Firebase centantial read successfully


INFO: 2024-01-20 21:18:26,802 - Firebase initialized successfully
INFO: 2024-01-20 21:18:26,803 - Firestore client created


## Custom Error

In [5]:
class MarkerNotFoundError(Exception):
    """Raised when a marker ID cannot be found in a list of markers."""

## Utils

In [6]:
def getAllMarkers(firebase_clent):
    """
    Fetch every document of the "Markers" collection as a list of marker dicts.

    Parameters:
    - firebase_clent: Firestore client; only `collection("Markers").get()` is called on it.

    Returns:
    - List of dictionaries with 'marker-id' (the document id as a string) and
      'marker_cord' (a tuple of the document's "lat" and "long" fields).
      An empty list is returned when the fetch itself fails (the error is logged).

    Raises:
    - KeyError: if a fetched document has no "lat" or "long" field.
    """
    try:
        markers_get = firebase_clent.collection("Markers").get()
    except Exception as e:
        # Previously execution fell through to the loop below with
        # `markers_get` unbound; log (with traceback) and return nothing instead.
        logger.exception("An error occurred: %s", e)
        return []
    logger.info("Successfully retrived all Markers")

    markers = []
    for m in markers_get:
        data = m.to_dict()  # convert the snapshot once instead of per field
        markers.append(
            {
                "marker-id": str(m.id),
                "marker_cord": (data["lat"], data["long"]),
            }
        )

    logger.info("Successfully got all Markers into python dict")
    return markers

In [7]:
def find_markers_within_distance(
    markers: list, target_coord: tuple, max_distance_km: float = 5.0
):
    """
    Collect the markers lying within `max_distance_km` of `target_coord`.

    Parameters:
    - markers: List of dictionaries, each with a 'marker_cord' entry.
    - target_coord: Reference point, passed to `geodesic` as its first argument.
    - max_distance_km: Inclusive distance limit in kilometres.

    Returns:
    - The matching marker dictionaries in input order. If an error occurs
      part-way through, it is logged and the markers collected so far are returned.
    """
    within_range = []

    try:
        for candidate in markers:
            separation_km = geodesic(
                target_coord, candidate["marker_cord"]
            ).kilometers
            if separation_km <= max_distance_km:
                within_range.append(candidate)
        logger.debug(
            "Successfully looped over all markers and selected markers within distance"
        )
    except Exception as err:
        formatted_tb = traceback.format_exc()
        logger.error("An error occurred: %s", str(err))
        logger.debug(f"Traceback: {formatted_tb}")

    match_count = len(within_range)
    logger.info(f"Successfully got nearby markers: {match_count}")
    return within_range

In [8]:
def cluster_markers_fn(
    markers,
    epsilon=1.0,
    min_samples=2,
):
    """
    Cluster markers by coordinates using DBSCAN.

    The 'marker_cord' tuples are passed to DBSCAN as-is, so `epsilon` is
    expressed in the same units as the stored coordinates (not kilometres).

    Parameters:
    - markers: List of dictionaries, each containing 'marker-id' and 'marker_cord'.
    - epsilon: Maximum distance between two samples for one to be considered in the neighborhood of the other.
    - min_samples: The number of samples in a neighborhood for a point to be considered a core point.

    Returns:
    - The same list, modified in place, with an additional integer
      'cluster_label' field on every marker. An empty list is returned unchanged.
    """
    # DBSCAN needs a 2-D array; an empty list would produce a 1-D array and
    # make the fit fail, so there is nothing to do in that case.
    if not markers:
        return markers

    # Extract coordinates from markers
    coordinates = np.array([marker["marker_cord"] for marker in markers])

    # Perform DBSCAN clustering
    labels = DBSCAN(eps=epsilon, min_samples=min_samples).fit_predict(coordinates)

    # Add cluster labels to the markers as plain Python ints rather than numpy
    # scalars, so they print and serialise like ordinary integers.
    for marker, label in zip(markers, labels):
        marker["cluster_label"] = int(label)

    return markers

In [9]:
def get_cluster_label(marker_data, target_marker_id):
    """
    Look up the cluster label of the first marker whose id matches.

    Parameters:
    - marker_data: List of dictionaries containing 'marker-id' and 'cluster_label'.
    - target_marker_id: Marker ID to search for.

    Returns:
    - The 'cluster_label' value of the first matching marker.

    Raises:
    - MarkerNotFoundError: if no marker carries the requested id.
    """
    match = next(
        (entry for entry in marker_data if entry['marker-id'] == target_marker_id),
        None,
    )
    if match is None:
        raise MarkerNotFoundError(f"Marker ID {target_marker_id} not found in the list.")
    return match['cluster_label']

In [10]:
def upadteClusterIDfirestore(firebaseClient, clusters: list[dict]):
    """
    Write each marker's cluster label to its document in the "Markers" collection.

    Parameters:
    - firebaseClient: Firestore client; `collection("Markers").document(id).update(...)` is called per marker.
    - clusters: List of dictionaries containing 'marker-id' and 'cluster_label'.

    Returns:
    - None. Any error is logged and stops the remaining updates; it is not re-raised.
    """
    try:
        # Each marker already carries its own label, so read it directly
        # instead of searching the whole list once per marker id.
        for marker in clusters:
            ref = firebaseClient.collection("Markers").document(marker["marker-id"])
            ref.update(
                {
                    # int() turns numpy integer labels into plain ints.
                    "cluster": int(marker["cluster_label"]),
                }
            )
        logger.info("Added updated cluster ids")
    except Exception as e:
        # logger.exception includes the traceback at ERROR level, so it is not
        # lost when the logger is configured at INFO.
        logger.exception("An error occurred: %s", e)

In [None]:
def find_nearest_marker(markers, target_lat, target_lon, max_distance_km=5.0):
    """
    Return the marker closest to the target point, limited to a maximum distance.

    Parameters:
    - markers: List of dictionaries, each with a 'marker_cord' entry.
    - target_lat: Target latitude.
    - target_lon: Target longitude.
    - max_distance_km: Inclusive distance limit in kilometres.

    Returns:
    - The closest marker dictionary within the limit (the first one on ties),
      or None when no marker qualifies.
    """
    origin = (target_lat, target_lon)
    best_match, best_km = None, float('inf')

    for candidate in markers:
        km = geodesic(origin, candidate['marker_cord']).kilometers
        in_range = km <= max_distance_km
        if in_range and km < best_km:
            best_match, best_km = candidate, km

    return best_match

## Program

In [11]:
# Fetch every marker through the Firestore client created in the authentication cell.
markers_data = getAllMarkers(db)
logger.info(f"Total markers retrived {len(markers_data)}")
# The full dump is logged at DEBUG, so it is hidden at the INFO level configured earlier.
logger.debug(pformat(markers_data))

INFO: 2024-01-20 21:18:36,807 - Successfully retrived all Markers
INFO: 2024-01-20 21:18:36,810 - Successfully got all Markers into python dict
INFO: 2024-01-20 21:18:36,811 - Total markers retrived 33


In [12]:
# Cluster the fetched markers; cluster_markers_fn adds 'cluster_label' to each
# dict in place and returns the same list.
clustered_markers = cluster_markers_fn(markers_data, epsilon=0.1, min_samples=1)

In [13]:
# Show the labelled markers as the cell output.
clustered_markers

[{'marker-id': '0YNsm8Rsgrh0c4tptwjX',
  'marker_cord': (26.1422046, 91.6618141),
  'cluster_label': 0},
 {'marker-id': '1a8kOzDShApAvsyuMa7O',
  'marker_cord': (26.523327432461524, 93.88401668518782),
  'cluster_label': 1},
 {'marker-id': '1cUwJ8PU8iWF0MHxMz1x',
  'marker_cord': (26.1362523933542, 91.72798164188862),
  'cluster_label': 0},
 {'marker-id': '4L3HDVw5oOtNDXYa0fwy',
  'marker_cord': (26.1060139519425, 91.59353524446487),
  'cluster_label': 0},
 {'marker-id': '4ZKfLBFpkEkaTA177C0p',
  'marker_cord': (19.111409, 72.8714556),
  'cluster_label': 2},
 {'marker-id': '4xt5JP6o13EVSNOSJOhM',
  'marker_cord': (19.1300361, 72.9150783),
  'cluster_label': 2},
 {'marker-id': '5EHzNh3wxnSrG7FK5AMR',
  'marker_cord': (19.0920725, 72.8566487),
  'cluster_label': 2},
 {'marker-id': '7ywZrL4ZYTwX6neMh4b9',
  'marker_cord': (26.140288349728852, 91.76433496177197),
  'cluster_label': 0},
 {'marker-id': 'ESWCt8LykurahdgDz7Nh',
  'marker_cord': (24.83848107736667, 92.83123590052128),
  'cluste

In [14]:
# Persist each marker's cluster label to its document in the "Markers" collection.
upadteClusterIDfirestore(firebaseClient=db, clusters=clustered_markers)

INFO: 2024-01-20 21:19:26,576 - Added updated cluster ids


## testing

In [29]:
import folium
from folium.plugins import MarkerCluster

# Create a folium map centered around India
india_map = folium.Map(location=[20.5937, 78.9629], zoom_start=5)

# Create a MarkerCluster layer
marker_cluster = MarkerCluster().add_to(india_map)

# Add markers to the map.
# The labelled list produced earlier is `clustered_markers`; no module-level
# `clusters` name is defined in this notebook.
for marker in clustered_markers:
    folium.Marker(
        location=marker["marker_cord"],
        popup=f"Marker ID: {marker['marker-id']}, Cluster: {marker['cluster_label']}",
    ).add_to(marker_cluster)

# Display the map as the cell output
india_map

## class

In [140]:
# Module-level DBSCAN estimator used by statsNearYou.cluster_markers_fn.
dbscan = DBSCAN(eps=1.0, min_samples=2)

class statsNearYou:
    """
    Load markers from Firestore, cluster them, and serve per-cluster ratings.

    On construction the markers of the "Markers" collection are fetched and
    labelled with the module-level `dbscan` estimator. `rate_clusters` turns
    cluster sizes into 1-5 ratings and stores them in the "Stats" collection;
    `statsByCoord` reads the stored entry for the cluster nearest to a point.
    """

    def __init__(
            self,
            firebase_clent,
            max_distance_km: float,
        ) -> None:
        """
        Initialize the StatsNearYou instance.

        Parameters:
        - firebase_client : Firebase client instance.
        - max_distance_km (float): Maximum distance in kilometers to consider for finding the nearest marker.
        """
        self.firebaseClient = firebase_clent
        self.max_distance_km = max_distance_km
        self.getAllMarkers()
        self.cluster_markers_fn()
        logger.info("Creadted statsNearYou object.")

    def get_cluster_label(
        self,
        target_marker_id: str
    ) -> int:
        """
        Get the cluster label for a given marker ID.

        Parameters:
        - target_marker_id (str): Marker ID.

        Returns:
        - int: Cluster label.

        Raises:
        - MarkerNotFoundError: if no stored marker has this ID.
        """
        for marker in self.markers:
            if marker['marker-id'] == target_marker_id:
                logger.info("Successful got cluster label for given marker.")
                return marker['cluster_label']
        raise MarkerNotFoundError(f"Marker ID {target_marker_id} not found in the list.")

    def getAllMarkers(
            self,
        ) -> None:
        """
        Retrieve all markers from Firebase and store them in the instance.

        `self.markers` becomes a list of dicts with 'marker-id' and
        'marker_cord' (lat, long). If the fetch fails the error is logged and
        `self.markers` is set to an empty list.
        """
        try:
            markers_get = self.firebaseClient.collection("Markers").get()
        except Exception as e:
            # Previously the loop below ran with `markers_get` unbound.
            logger.exception("An error occurred: %s", e)
            self.markers = []
            return
        logger.info("Successfully retrived all Markers")

        markers = []
        for m in markers_get:
            data = m.to_dict()  # convert the snapshot once instead of per field
            markers.append(
                {
                    # NOTE(review): the id comes from the document's "id" field,
                    # not the document id; upadteClusterIDfirestore uses it as a
                    # document key — confirm both always match.
                    "marker-id": data["id"],
                    "marker_cord": (data["lat"], data["long"]),
                }
            )
        logger.info("Successfully got all Markers into python dict")
        self.markers = markers

    def cluster_markers_fn(
            self
        ) -> None:
        """
        Cluster markers by coordinates using DBSCAN.

        Adds an integer 'cluster_label' to every dict in `self.markers`.
        Does nothing when there are no markers.
        """
        # An empty list would become a 1-D array that DBSCAN cannot fit.
        if not self.markers:
            return

        coordinates = np.array([marker["marker_cord"] for marker in self.markers])
        # Perform DBSCAN clustering
        labels = dbscan.fit_predict(coordinates)
        logger.info("Clustered makeres successfully.")

        # Add cluster labels to the markers as plain ints (not numpy scalars)
        for marker, label in zip(self.markers, labels):
            marker["cluster_label"] = int(label)
        logger.info("Updated Clusters with cluster ids.")

    def getClusteredMarkers(
        self
    ) -> list[dict]:
        """Return the stored list of marker dictionaries."""
        return self.markers

    def upadteClusterIDfirestore(
        self
        ) -> None:
        """
        Update cluster IDs in Firestore for all markers.

        Any error is logged and stops the remaining updates; it is not re-raised.
        """
        try:
            # Each marker carries its own label; read it directly instead of
            # searching the list once per marker id.
            for marker in self.markers:
                ref = self.firebaseClient.collection("Markers").document(marker["marker-id"])
                ref.update(
                    {
                        "cluster": int(marker["cluster_label"]),
                    }
                )
            logger.info("Added updated cluster ids")
        except Exception as e:
            logger.exception("An error occurred: %s", e)

    def find_nearest_marker(
        self,
        target_lat: float,
        target_lon: float
        ) -> Optional[Dict[str, Union[str, Tuple[float, float], int]]]:
        """
        Find the nearest marker within the specified maximum distance.

        Parameters:
        - target_lat (float): Target latitude.
        - target_lon (float): Target longitude.

        Returns:
        - Optional[Dict[str, Union[str, Tuple[float, float], int]]]: Nearest marker information or None if not found.
        """
        target_location = (target_lat, target_lon)
        nearest_marker = None
        min_distance = float('inf')

        for marker in self.markers:
            marker_location = marker['marker_cord']
            distance = geodesic(target_location, marker_location).kilometers

            if distance < min_distance and distance <= self.max_distance_km:
                min_distance = distance
                nearest_marker = marker
        logger.info("Returned nearest markers successfully.")
        return nearest_marker

    def get_total_markers_in_each_cluster(self):
        """Return a dict mapping each cluster label to its number of markers."""
        cluster_counts = Counter(marker['cluster_label'] for marker in self.markers)
        return dict(cluster_counts)

    def calculate_percentile(self, dictionary, key):
        """
        Return the percentage of values in `dictionary` strictly below `dictionary[key]`.

        Raises:
        - KeyError: if `key` is not in `dictionary`.
        """
        reference = dictionary[key]
        values_below = sum(value < reference for value in dictionary.values())
        # Multiply before dividing so exact fractions stay exact
        # (e.g. 57 of 100 gives 57.0, not 56.99999999999999).
        return values_below * 100 / len(dictionary)

    def rate_clusters(self):
        """
        Rate every cluster from 1 to 5 by its size percentile and store the result.

        Clusters in the top size bucket (percentile >= 80) get rating 1 and the
        rating rises by one per 20-point bucket down to 5 for percentile < 20.
        The ratings are kept in `self.cluster_ratings` and written to the
        "Stats" collection as {"marker_star": rating}, keyed by cluster label.
        Firestore errors are logged, not raised.
        """
        cluster_counts = self.get_total_markers_in_each_cluster()
        # Calculate percentile for each cluster
        cluster_percentiles = {key: self.calculate_percentile(cluster_counts, key) for key in cluster_counts}
        logger.info("Successfully calullated percentiles")
        # Assign star ratings based on percentiles
        cluster_ratings = {}
        for cluster_label, percentile in cluster_percentiles.items():
            if percentile >= 80:
                rating = 1
            elif percentile >= 60:
                rating = 2
            elif percentile >= 40:
                rating = 3
            elif percentile >= 20:
                rating = 4
            else:
                # Lowest bucket; it used to repeat rating 4, so 5 was never assigned.
                rating = 5
            cluster_ratings[cluster_label] = rating

        self.cluster_ratings = cluster_ratings
        logger.info("Successfully Assigned ratings")

        try:
            doc_ref = self.firebaseClient.collection("Stats")
            for key, value in cluster_ratings.items():
                doc_ref.document(f'{key}').set({"marker_star": value})
            logger.info("Successfully Updated to FireStoere")
        except Exception as e:
            logger.exception("An error occurred: %s", e)

    def statsByCoord(
        self,
        lat: float,
        lon: float
    ) -> Optional[Dict[str, Union[str, Tuple[float, float], int]]]:
        """
        Retrieve statistics for a given set of coordinates.

        Parameters:
        - lat (float): Latitude.
        - lon (float): Longitude.

        Returns:
        - Optional[Dict[str, Union[str, Tuple[float, float], int]]]: Statistics for the given coordinates or None if not found.
        """
        marker = self.find_nearest_marker(target_lat=lat, target_lon=lon)
        if marker is None:
            # No marker in range: say so explicitly instead of relying on a
            # caught TypeError from subscripting None.
            logger.warning("No marker found within %s km of the given coord", self.max_distance_km)
            return None
        try:
            data = self.firebaseClient.collection('Stats').document(f"{marker['cluster_label']}").get()
            logger.info("successfully got Statf for given coord")
            return data.to_dict()
        except Exception as e:
            logger.exception("An error occurred: %s", e)
            return None

In [141]:
# Build the stats helper; its constructor fetches all markers and clusters them.
stats = statsNearYou(
    firebase_clent=db,
    max_distance_km=5.0
    )

INFO: 2024-01-20 23:23:55,997 - Successfully retrived all Markers
INFO: 2024-01-20 23:23:56,004 - Successfully got all Markers into python dict
INFO: 2024-01-20 23:23:56,009 - Clustered makeres successfully.
INFO: 2024-01-20 23:23:56,010 - Updated Clusters with cluster ids.
INFO: 2024-01-20 23:23:56,010 - Creadted statsNearYou object.


In [142]:
# Look up the stored "Stats" entry for the cluster of the marker nearest to this point.
stats.statsByCoord(lat=26.120600, lon=91.652300)

INFO: 2024-01-20 23:23:56,237 - Returned nearest markers successfully.
INFO: 2024-01-20 23:23:56,378 - successfully got Statf for given coord


{'marker_star': 2}