# ***Map Foundation***

***Install Package***

In [None]:
!pip install leafmap

In [None]:
!pip install segment-geospatial groundingdino-py leafmap localtileserver
!pip install contextily

***Create an Interactive Map***

In [None]:
import leafmap
from samgeo import tms_to_geotiff
from samgeo.text_sam import LangSAM
import leafmap.leafmap as leafmap

# create map
# Interactive leafmap widget centred on the given [lat, lon], 800px tall.
m = leafmap.Map(center=[ 42.374443, -71.116943], zoom=18, height = "800px")
# Add the "SATELLITE" basemap layer to the widget.
m.add_basemap("SATELLITE")
# Bare last expression: the notebook renders the map as the cell output.
m

***Get bounding coords***

In [None]:

#Zoom and move the map to select the area of interest. Use the draw tools to draw a polygon or rectangle on the map

# Bounds of the region drawn on the map; may be None (handled just below).
bbox = m.user_roi_bounds()
if bbox is None:
    # Fallback box used when no region is available.
    # NOTE(review): these coordinates are far from the map centre set when the map was created — confirm the intended default.
    bbox = [-122.4611, 37.7636, -122.4488, 37.7713]

# Second reference to the same bounds; later cells read `bbbox` and index it as
# [0]/[2] = longitudes and [1]/[3] = latitudes.
bbbox = bbox
print(bbox)



***Download and show imagery***

In [None]:
# Download satellite image
# Output path of the GeoTIFF; a later cell opens it again as "/content/Image.tif".
image = "Image.tif"
# Fetch "Satellite" tiles for `bbox` at zoom 19 and write them to `image`, overwriting an existing file.
tms_to_geotiff(output=image, bbox=bbox, zoom=19, source="Satellite", overwrite=True)

# Display the downloaded satellite image on the map
# Hide the most recently added layer so the downloaded raster is what is seen.
m.layers[-1].visible = False
m.add_raster(image, layer_name="Image")
# Bare last expression: re-render the map with the new raster layer.
m

# ***StreetView Collection***

***Download Street Views***

In [None]:
import requests
import numpy as np
import os
from io import BytesIO
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
def generate_random_points(lat_min, lat_max, lon_min, lon_max, num_points):
    """Draw `num_points` uniformly distributed coordinates inside a bounding box.

    Returns a `(latitudes, longitudes)` pair of NumPy arrays, each of length
    `num_points`. Latitudes are sampled before longitudes, both from NumPy's
    global random state.
    """
    lat_samples = np.random.uniform(lat_min, lat_max, num_points)
    lon_samples = np.random.uniform(lon_min, lon_max, num_points)
    return lat_samples, lon_samples

def is_street_view_available(lat, lon, api_key):
    """Ask the Street View Metadata endpoint whether imagery exists at a point.

    Sends `lat,lon` and the API key as query parameters and returns True only
    when the JSON reply carries `status == 'OK'`.
    """
    query = dict(location=f'{lat},{lon}', key=api_key)
    metadata = requests.get(
        "https://maps.googleapis.com/maps/api/streetview/metadata",
        params=query,
    ).json()
    return metadata.get('status') == 'OK'

def download_street_view_images(latitudes, longitudes, api_key, num_images, save_path='images'):
    """Download up to `num_images` Google Street View images for the given points.

    Args:
        latitudes, longitudes: parallel iterables of candidate coordinates.
        api_key: Google Maps API key sent with every request.
        num_images: stop after this many successful downloads.
        save_path: output directory. Regular files already in it are deleted
            at the start of the run; sub-directories are left untouched.

    Returns:
        The number of images actually written, which can be lower than
        `num_images` when too few candidates have imagery.
    """
    base_url = "https://maps.googleapis.com/maps/api/streetview"

    # Clear out the existing images in the save path at the start of each run.
    # Only regular files are removed: os.remove() raises on a directory.
    if os.path.exists(save_path):
        for entry in os.listdir(save_path):
            entry_path = os.path.join(save_path, entry)
            if os.path.isfile(entry_path):
                os.remove(entry_path)
    else:
        os.makedirs(save_path)

    downloaded_count = 0
    for lat, lon in zip(latitudes, longitudes):
        if downloaded_count >= num_images:
            break  # Ensure no more downloads once the target is reached

        if not is_street_view_available(lat, lon, api_key):
            print(f"No Street View available at {lat}, {lon}")
            continue

        params = {
            'size': '640x640',
            'location': f'{lat},{lon}',
            'key': api_key
        }
        response = requests.get(base_url, params=params)
        if response.status_code != 200:
            print(f"Failed to download image at {lat}, {lon}, Status Code: {response.status_code}")
            continue

        # The file name encodes the coordinates as "<lat>_<lon>.jpg".
        file_path = os.path.join(save_path, f"{lat}_{lon}.jpg")
        with open(file_path, 'wb') as f:
            f.write(response.content)
        downloaded_count += 1  # Increment only after a successful download
        print(f"Downloaded {downloaded_count} of {num_images}: {file_path}")

    return downloaded_count



def display_images(folder_path='images'):
    """Show every `.jpg` in `folder_path` as a grid, five images per row.

    Files are shown in sorted name order. When the folder holds no `.jpg`
    files a message is printed and nothing is plotted, which avoids building
    a figure with zero rows.
    """
    image_files = sorted(
        os.path.join(folder_path, name)
        for name in os.listdir(folder_path)
        if name.endswith('.jpg')
    )
    if not image_files:
        print(f"No .jpg images found in {folder_path}")
        return

    cols = 5
    rows = -(-len(image_files) // cols)  # ceiling division
    plt.figure(figsize=(15, 3 * rows))
    for i, image_file in enumerate(image_files):
        # Context manager closes the file handle once the pixels are drawn.
        with Image.open(image_file) as img:
            plt.subplot(rows, cols, i + 1)
            plt.imshow(img)
            plt.axis('off')
    plt.tight_layout()
    plt.show()

def main(api_key, lat_min, lat_max, lon_min, lon_max, num_images):
    """Sample random points in the box, download Street View images, show them.

    Five candidate points are generated per requested image because some
    points will have no Street View coverage.
    """
    candidate_count = num_images * 5
    lats, lons = generate_random_points(lat_min, lat_max, lon_min, lon_max, candidate_count)
    download_street_view_images(lats, lons, api_key, num_images=num_images)
    display_images()

# User input for the number of images
num_images = int(input("type in how many images you need"))

# `bbbox` is indexed as [lon_min, lat_min, lon_max, lat_max]; main() expects
# lat_min, lat_max, lon_min, lon_max. A dedicated name is used so the global
# `bbox`, which other cells index in its original order, is not overwritten.
streetview_bounds = [bbbox[1], bbbox[3], bbbox[0], bbbox[2]]

# API key: never hard-code credentials in the notebook. Read the key from the
# GOOGLE_MAPS_API_KEY environment variable, or prompt for it without echoing.
import os
from getpass import getpass

api_key = os.environ.get("GOOGLE_MAPS_API_KEY") or getpass("Google Maps API key: ")

# Execute the main function with the bounding box and number of images
main(api_key, *streetview_bounds, num_images)


***Download image folder***

In [None]:
!zip -r output.zip images/

In [None]:
# `google.colab` is imported here, so this cell is meant for a Google Colab runtime.
from google.colab import files
# Offer output.zip (created by the zip cell above) as a browser download.
files.download('output.zip')

# ***SNS Collection***

***Download SNS data by location***

***Get search area from bbox***

In [None]:
def calculate_center_and_radius(north, south, east, west):
    """Derive a search centre and radius from a bounding box.

    Args:
        north, south: latitudes of the box edges, in degrees.
        east, west: longitudes of the box edges, in degrees.

    Returns:
        A `(location, radius)` pair of strings: `location` is
        `"<center_lat>,<center_lon>"` and `radius` is the truncated integer
        number of metres covering half of the larger box side.
    """
    # Calculate the center latitude and longitude
    center_lat = (north + south) / 2
    center_lon = (east + west) / 2

    # Approximate the radius (assuming Earth's curvature is negligible in small areas)
    # 111000 is the approximate number of metres per degree used for the conversion.
    initial_radius = max(abs(north - south), abs(east - west)) / 2 * 111000

    return f"{center_lat},{center_lon}", str(int(initial_radius))

***Check if comments are valid***

In [None]:
import requests

def get_place_details(api_key, place_id, max_comments):
    """Fetch the review texts of one place from the Place Details endpoint.

    Only the `reviews` field is requested. Returns a list with the `text` of
    at most `max_comments` reviews, or the string "No comments found" when the
    reply contains no reviews.
    """
    query = {
        "key": api_key,
        "place_id": place_id,
        "fields": "reviews",  # ask only for reviews
    }
    payload = requests.get(
        "https://maps.googleapis.com/maps/api/place/details/json",
        params=query,
    ).json()

    review_list = payload.get('result', {}).get('reviews', [])
    if not review_list:
        return "No comments found"

    # Keep only the first `max_comments` reviews and return their text.
    return [entry['text'] for entry in review_list[:max_comments]]

# Adjust the main function or the loop where you call get_place_details

***Collect comments and adjust the size of commentset***

In [None]:
import requests

def search_places(api_key, location, initial_radius, max_results, max_radius=10000):
    """Collect nearby places by widening the search radius step by step.

    NOTE(review): a later cell defines another `search_places` with a
    different signature; once that cell runs it replaces this function.

    Args:
        api_key: Google Maps API key.
        location: "<lat>,<lon>" string for the search centre.
        initial_radius: starting radius in metres; an int or a numeric string
            (as returned by `calculate_center_and_radius`) is accepted.
        max_results: stop once this many unique places have been collected.
        max_radius: largest radius, in metres, that will be queried.

    Returns:
        A list of at most `max_results` place dicts, de-duplicated by their
        `place_id`.
    """
    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    # Coerce so that a string radius does not break the `<=` comparison below.
    radius = int(initial_radius)
    places = []
    unique_place_ids = set()

    while len(places) < max_results and radius <= max_radius:
        params = {
            "key": api_key,
            "location": location,
            "radius": radius
        }

        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"Failed to fetch places: {response.status_code} {response.text}")
            break

        results = response.json()
        new_places = results.get('results', [])
        print(f"Search radius: {radius} meters, found {len(new_places)} new places.")

        # Add unique places. `place_id` is the identifier the rest of this
        # notebook uses; the legacy `id` key is only a fallback.
        for place in new_places:
            place_key = place.get('place_id', place.get('id'))
            if place_key is not None and place_key in unique_place_ids:
                continue
            unique_place_ids.add(place_key)
            places.append(place)

        # Increase radius for next iteration
        radius += 1000

    print(f"Total unique places found: {len(places)}")
    return places[:max_results]


In [None]:
import requests

def search_places(api_key, location, max_results):
    """Return up to `max_results` places nearest to `location`.

    Queries the Nearby Search endpoint ranked by distance and follows
    `next_page_token` until enough places are collected or no further page
    is offered.

    Args:
        api_key: Google Maps API key.
        location: "<lat>,<lon>" string for the search centre.
        max_results: maximum number of place dicts to return.

    Returns:
        A list of place dicts as returned by the API, possibly empty.
    """
    import time  # local import, hoisted out of the paging loop

    url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
    params = {
        "key": api_key,
        "location": location,  # latitude and longitude as a string
        "rankby": "distance"  # Sort places strictly by distance
    }

    places = []
    while len(places) < max_results:
        # Make the API request
        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"Failed to fetch places: {response.status_code} {response.text}")
            break

        results = response.json()

        # API-level failures arrive with HTTP 200 and a non-OK `status` field;
        # report them instead of silently returning an empty list.
        status = results.get('status')
        if status not in (None, 'OK', 'ZERO_RESULTS'):
            print(f"Places API returned status {status}: {results.get('error_message', '')}")
            break

        places.extend(results.get('results', []))

        # Check for a next page token
        next_page_token = results.get('next_page_token')
        if not next_page_token or len(places) >= max_results:
            break

        # Delay required by Google before next page token can be used
        time.sleep(2)
        params['pagetoken'] = next_page_token

    return places[:max_results]

In [None]:


def main(api_key, north, south, east, west, max_results, max_comments):
    """Collect review texts for places around the centre of a bounding box.

    Args:
        api_key: Google Maps API key.
        north, south, east, west: edges of the bounding box, in degrees.
        max_results: maximum number of places to query.
        max_comments: maximum number of reviews to keep per place.

    Returns:
        A list of `(place_name, review_text)` tuples. The list is empty when
        no places are found, so callers can always iterate over the result.
    """
    # Only the centre string is needed here; the radius half of the result is unused.
    location, _ = calculate_center_and_radius(north, south, east, west)
    places = search_places(api_key, location, max_results)

    all_reviews = []  # Initialize a list to store all reviews

    if not places:
        print("No places found within the specified area.")
        return all_reviews

    for place in places[:max_results]:  # Ensures you don't process more places than necessary
        reviews = get_place_details(api_key, place['place_id'], max_comments)
        if reviews == "No comments found":
            print("No comments found for", place['name'])
            continue

        print(f"Comments for {place['name']}:")
        for review in reviews:
            print(review)
            all_reviews.append((place['name'], review))  # Save reviews into the list

    return all_reviews  # Return the list of all reviews


# Example use case
# API key: never hard-code credentials in the notebook. Read the key from the
# GOOGLE_MAPS_API_KEY environment variable, or prompt for it without echoing.
import os
from getpass import getpass

api_key = os.environ.get("GOOGLE_MAPS_API_KEY") or getpass("Google Maps API key: ")

# Read the edges from `bbbox`, which keeps the map order
# [lon_min, lat_min, lon_max, lat_max]. The global `bbox` is re-ordered by the
# Street View cell, so indexing it here would mix latitudes and longitudes.
north = bbbox[3]
south = bbbox[1]
east = bbbox[2]
west = bbbox[0]

initial_radius = max(abs(north - south), abs(east - west)) / 2 * 111000

# NOTE(review): the Nearby Search API pages results and may cap the total below 120 — confirm.
# `or []` keeps the loop below safe if main() returns nothing.
all_reviews = main(api_key, north, south, east, west, max_results=120, max_comments=10) or []

#Print the reviews
for review in all_reviews:
    print( review)


# ***NLTK Processing***

***Generate synonyms and fine-tune the LLM***

In [None]:
!pip install nltk gensim

Tokenize Prompts

In [None]:
import nltk
from nltk.tokenize import word_tokenize

nltk.download('punkt')  # Download necessary datasets

def tokenize_text(text):
    """Split `text` into word and punctuation tokens with NLTK's `word_tokenize`."""
    token_list = word_tokenize(text)
    return token_list

# Free-text description of the place being looked for; it is tokenized here and
# a later cell strips stopwords/punctuation from `tokens`.
input_text = "find a indoor place that has a lot of windows, or it can be outdoor too. It should be close to the the park and university, relaxing and conforting vibe, people can rest, and it should also be clean. "
tokens = tokenize_text(input_text)
print(tokens)


***Build Synonyms***

***Use Word2Vec***

***If using external comment data***

In [None]:
import nltk
from nltk.tokenize import word_tokenize

# Sample comments in a list of tuples
# Each entry is unpacked below as (ignored_first_item, comment_text); fill this
# list with externally sourced comments before running the cell.
comments = []

def normalize_text(text):
    """Replace typographic quotes and dashes in `text` with ASCII equivalents.

    Curly single quotes become `'`, curly double quotes become `"`, and
    en/em dashes become `-`. All other characters are left unchanged.
    """
    replacements = {
        "’": "'",
        "‘": "'",
        "“": '"',
        "”": '"',
        "\u2013": "-",  # en-dash
        "\u2014": "-",  # em-dash
    }
    # Every key is a single character, so one translate() pass does all swaps.
    return text.translate(str.maketrans(replacements))

# Convert each tuple to a single string, normalize, and tokenize
# The first element of every tuple is ignored; only the comment text is tokenized.
tokenized_sentences = [word_tokenize(normalize_text(comment)) for _, comment in comments]


# Print the tokenized sentences
print(tokenized_sentences)


In [None]:
# Convert each (place_name, review_text) tuple to one lower-cased string and tokenize it.
# The parts are joined with a space so the place name does not fuse with the
# first word of the review into a single bogus token.
tokenized_sentences = [word_tokenize(" ".join(sentence).lower()) for sentence in all_reviews]

print(tokenized_sentences)


Train Word2Vec

In [None]:
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import string
import nltk

nltk.download('punkt')  # For tokenization
nltk.download('stopwords')  # For stopwords

# Clean data (punctuation and stopwords)
def clean_tokens(sentences):
    """Drop English stopwords and punctuation characters from tokenized sentences.

    `sentences` is an iterable of token lists; a new list of filtered token
    lists is returned in the same order. Matching is exact and case-sensitive.
    """
    # English stopwords plus every single punctuation character.
    blocked = set(stopwords.words('english')) | set(string.punctuation)

    kept_sentences = []
    for token_list in sentences:
        kept_sentences.append([token for token in token_list if token not in blocked])
    return kept_sentences

# Convert each (place_name, review_text) tuple to lower-cased tokens, then clean it
tokenized_sentences = [word_tokenize(" ".join(sentence).lower()) for sentence in all_reviews]
cleaned_sentences = clean_tokens(tokenized_sentences)

print(cleaned_sentences)

from gensim.models import Word2Vec

# Training the model
# min_count is a word-frequency threshold and should be an integer; 1 keeps
# every word, which is what the previous fractional value effectively did.
model = Word2Vec(cleaned_sentences, vector_size=100, window=10, min_count=1, workers=10)

# Save the model for later use
model.save("word2vec_model.model")


***Find closely-related words***

Find top synonyms of the sentence

In [None]:
def find_top_related_words(tokens, model):
    """Map each known token to its three nearest neighbours in the model.

    Tokens that make `model.wv.most_similar` raise KeyError are skipped. The
    result is a dict of `token -> most_similar(token, topn=3)` output.
    """
    neighbours = {}
    for word in tokens:
        try:
            nearest = model.wv.most_similar(word, topn=3)
        except KeyError:
            # Word is not in the model's vocabulary: leave it out.
            continue
        neighbours[word] = nearest
    return neighbours

def remove_stopwords_and_punctuation(tokenized_sentence):
    """Return the tokens of one sentence without English stopwords or punctuation.

    Matching is exact and case-sensitive; token order is preserved.
    """
    # English stopwords plus every single punctuation character.
    excluded = set(stopwords.words('english'))
    excluded |= set(string.punctuation)

    return list(filter(lambda token: token not in excluded, tokenized_sentence))

# Clean the sentence
# `tokens` is the tokenized prompt produced in the "Tokenize Prompts" cell.
cleaned_input = remove_stopwords_and_punctuation(tokens)
print(cleaned_input)


# Load the trained model
# Reloads the file written by the training cell; `Word2Vec` itself is imported in that cell.
model = Word2Vec.load("word2vec_model.model")

# Dict of prompt word -> top-3 similar words; words unknown to the model are absent.
related_words = find_top_related_words(cleaned_input, model)
print(related_words)



***Gather all useful words into a list***

***Remove prompt if not in the comments***

In [None]:
selected_words = []

flattened_valid_words = [word for sublist in cleaned_sentences for word in sublist]
# Set copy for O(1) membership tests; the list is kept for printing.
valid_word_set = set(flattened_valid_words)

print(flattened_valid_words)
for key, values in related_words.items():
    # Keep the prompt word itself only if it occurs in the cleaned comments.
    if key in valid_word_set:
        selected_words.append(key)
        print(key)
    # Each entry of `values` is indexed as (word, ...); the related word is
    # always kept, whether or not the prompt word was.
    selected_words.extend(value[0] for value in values)

print(selected_words)


***General wordnet result***

In [None]:
from nltk.corpus import wordnet
import nltk

# Download necessary WordNet data
nltk.download('wordnet')

def find_synonyms(words):
    """Look up WordNet lemma names for each word.

    Returns a dict mapping every input word to a de-duplicated list of lemma
    names across all of its synsets, with underscores replaced by spaces.
    The order inside each list is not defined. Words without synsets map to
    an empty list.
    """
    return {
        word: list({
            lemma.name().replace('_', ' ')  # multi-word lemmas use underscores
            for synset in wordnet.synsets(word)
            for lemma in synset.lemmas()
        })
        for word in words
    }

# Example usage with multiple words
# `cleaned_input` is the stopword-free prompt token list from the related-words cell.
words = cleaned_input
synonyms = find_synonyms(words)
# Print one line per prompt word with its WordNet synonym list.
for word, syn_list in synonyms.items():
    print(f"Synonyms for {word}: {syn_list}")



# ***Street View Segmentation***

***Download GroundingDino***

In [None]:
import os
# Remember the working directory at this point; later cells build the
# GroundingDINO config and weights paths from it.
HOME = os.getcwd()
print(HOME)

In [None]:
%cd {HOME}
!git clone https://github.com/IDEA-Research/GroundingDINO.git
%cd {HOME}/GroundingDINO
!pip install -q -e .
!pip install -q roboflow

In [None]:
import os

# Model config file inside the cloned GroundingDINO repository.
CONFIG_PATH = os.path.join(HOME, "GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py")
# Sanity check: prints the path and whether the file exists.
print(CONFIG_PATH, "; exist:", os.path.isfile(CONFIG_PATH))

***Download weights***

In [None]:
%cd {HOME}
!mkdir {HOME}/weights
%cd {HOME}/weights

!wget -q https://github.com/IDEA-Research/GroundingDINO/releases/download/v0.1.0-alpha/groundingdino_swint_ogc.pth

In [None]:
import os

# File name of the checkpoint fetched by the wget cell above.
WEIGHTS_NAME = "groundingdino_swint_ogc.pth"
WEIGHTS_PATH = os.path.join(HOME, "weights", WEIGHTS_NAME)
# Sanity check: prints the path and whether the file exists.
print(WEIGHTS_PATH, "; exist:", os.path.isfile(WEIGHTS_PATH))

***Load GroundingDino data***

In [None]:
%cd {HOME}/GroundingDINO

from groundingdino.util.inference import load_model, load_image, predict, annotate
# Same path as computed in the previous cell; recomputed so this cell can run on its own.
WEIGHTS_PATH = os.path.join(HOME, "weights", WEIGHTS_NAME)
# Load GroundingDINO from its config and checkpoint.
# NOTE: this rebinds `model`, the name the Word2Vec cells used for their model.
model = load_model(CONFIG_PATH, WEIGHTS_PATH)

***Segmentation starts here***

In [None]:
# Hard-coded prompt vocabulary used for detection.
# NOTE(review): this overwrites the `selected_words` list computed by the Word2Vec cells — confirm that is intended.
selected_words = ['find', 'lines', 'mess', 'shocked', 'place','san', 'service', 'close', 'italy', 'official', 'nights', 'park', 'late', 'beautiful', 'really', 'relaxing', 'haircut', 'limited', 'vibe', 'judge',  'terre', 'people', 'rest', 'stars', 'highlighted', 'daily',  'terre', 'clean', 'classic', 'winding']

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt

# NOTE(review): this rebinds HOME to the *current* directory, which an earlier
# `%cd` may have changed — confirm where the outputs are meant to go.
HOME = os.getcwd()
data_folder = "/content/images"
output_folder = os.path.join(HOME, 'output_images')  # Directory to save output images
os.makedirs(output_folder, exist_ok=True)  # Create the directory if it doesn't exist

# Constants
TEXT_PROMPT = ' '.join(selected_words)  # Ensure 'selected_words' is defined
BOX_THRESHOLD = 0.1
TEXT_THRESHOLD = 0.1

# Get a list of image files
image_files = [f for f in os.listdir(data_folder) if f.endswith(('.jpg', '.jpeg', '.png'))]

# List to hold the output data: one [image_name, score, score, ...] row per image
output_data = []

# Process each image
for image_name in image_files:
    image_path = os.path.join(data_folder, image_name)
    image_source, image = load_image(image_path)

    boxes, logits, phrases = predict(
        model=model,
        image=image,
        caption=TEXT_PROMPT,
        box_threshold=BOX_THRESHOLD,
        text_threshold=TEXT_THRESHOLD
    )

    annotated_frame = annotate(image_source=image_source, boxes=boxes, logits=logits, phrases=phrases)

    # Store the image name and the confidence score of each bounding box
    image_data = [image_name] + [logit.max().item() for logit in logits]
    output_data.append(image_data)

    # Display and save the annotated image.
    fig = plt.figure(figsize=(8, 8))
    # GroundingDINO's annotate() returns the frame in BGR (OpenCV) channel
    # order; reverse the last axis so matplotlib shows true colours.
    plt.imshow(annotated_frame[..., ::-1])
    save_path = os.path.join(output_folder, f'annotated_{image_name}')  # Path to save the image
    # Save BEFORE show(): once the figure has been shown, savefig() writes a blank canvas.
    plt.savefig(save_path)
    plt.show()
    plt.close(fig)  # Close the figure to free memory

# Save the dataset to a text file outside of the output_images folder.
# Written once after the loop instead of being rewritten for every image.
dataset_path = os.path.join(HOME, 'dataset.txt')  # Adjust this path if you want it saved elsewhere
with open(dataset_path, 'w') as file:
    for data in output_data:
        file.write(f"{data}\n")  # Writing each entry in a new line

# Print the dataset
for data in output_data:
    print(data)


In [None]:
!zip -r output.zip images/

# ***Aerial View Segmentation***

***Convert tiff to jpg***

In [None]:
from PIL import Image

# Define the full path to the .tif file
# NOTE(review): absolute Colab path; assumes the imagery cell wrote Image.tif under /content — confirm.
tif_file_path = "/content/Image.tif"

# Define the full path where the .jpg file should be saved
jpg_file_path = "/content/NewImage.jpg"

# Load the TIFF image
# NOTE: rebinds `image`, which earlier cells used for other values.
image = Image.open(tif_file_path)

# Convert the image to JPEG and save it
# convert('RGB') is applied before saving as JPEG.
image.convert('RGB').save(jpg_file_path, 'JPEG', quality=90)  # Adjust the quality as needed

print(f"Converted image saved at: {jpg_file_path}")



***Segmentation starts***

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt

# Constants
# Prompt is all selected words joined by spaces; both thresholds are set to 0.1.
TEXT_PROMPT = ' '.join(selected_words)
BOX_THRESHOLD = 0.1
TEXT_THRESHOLD = 0.1

# Directly specify the path to the image
image_path = "/content/NewImage.jpg"  # Update this to the exact path of your image

# Load the image
# `image` is read again by the LangSAM predict cell further down.
image = Image.open(image_path)
image_source = image.copy()  # Preserve the original image for annotation

# Dummy prediction function
def predict(model, image, caption, box_threshold, text_threshold):
    """Placeholder predictor: ignores every argument and returns fixed demo output.

    Returns `(boxes, logits, phrases)` with a single hard-coded box
    `(x, y, x+w, y+h)`, one score and one phrase.
    NOTE: defining this rebinds the name `predict` imported from GroundingDINO.
    """
    demo_box = (20, 20, 70, 70)
    demo_score = 4.5
    demo_phrase = 'example phrase'
    return [demo_box], [demo_score], [demo_phrase]

# Dummy annotation function modified to not show text
def annotate(image_source, boxes, logits, phrases):
    """Placeholder annotator: draw red box outlines over the image, no labels.

    `boxes` holds `(x1, y1, x2, y2)` entries; `logits` and `phrases` are
    accepted but unused. The figure is shown and `image_source` is returned
    unchanged.
    NOTE: defining this rebinds the name `annotate` imported from GroundingDINO.
    """
    plt.figure(figsize=(8, 8))
    plt.imshow(image_source)
    for box in boxes:
        left, top = box[0], box[1]
        outline = plt.Rectangle(
            (left, top), box[2] - left, box[3] - top,
            fill=False, edgecolor='red', linewidth=2,
        )
        plt.gca().add_patch(outline)
    plt.axis('off')
    plt.show()
    return image_source

# Run prediction
# NOTE: `predict` here is the dummy defined above in this cell, so the result
# is its fixed demo box/score/phrase regardless of the image or prompt.
boxes, logits, phrases = predict(
    model=None,  # Define your model or replace with an actual model call
    image=image,
    caption=TEXT_PROMPT,
    box_threshold=BOX_THRESHOLD,
    text_threshold=TEXT_THRESHOLD
)

# Annotate the image
# Uses the dummy `annotate` above: shows the image with red box outlines.
annotated_frame = annotate(image_source=image_source, boxes=boxes, logits=logits, phrases=phrases)

# Prepare dataset output
# One row: [image file name, score, score, ...]
image_data = [os.path.basename(image_path)] + [logit for logit in logits]
dataset = [image_data]

# Output the dataset
print(dataset)


In [None]:
from samgeo.text_sam import LangSAM

In [None]:
# Instantiate samgeo's LangSAM with its default arguments.
sam = LangSAM()

In [None]:
# Prompt for LangSAM: all selected words joined by single spaces.
text_prompt = ' '.join(selected_words)

In [None]:
# Run text-prompted prediction on `image` (the PIL image opened in the aerial
# segmentation cell) with both thresholds at 0.1.
sam.predict(image, text_prompt, box_threshold=0.1, text_threshold=0.1)

In [None]:
# Prepare an empty list to hold your dataset entries
dataset = []

# Prefer the detections stored on the LangSAM object by `sam.predict(...)`.
# The global `boxes`/`logits` come from the dummy predictor above and only
# hold a fixed demo box, so they are used purely as a fallback.
# NOTE(review): relies on LangSAM exposing `.boxes` / `.logits` after predict — confirm for the installed samgeo version.
detected_boxes = getattr(sam, "boxes", None)
detected_logits = getattr(sam, "logits", None)
if detected_boxes is None or detected_logits is None:
    detected_boxes, detected_logits = boxes, logits


def _as_list(values):
    """Convert tensor/array-like values to plain Python lists; pass lists through."""
    return values.tolist() if hasattr(values, "tolist") else values


# Extract data from each detected object
for box, logit in zip(_as_list(detected_boxes), _as_list(detected_logits)):
    # Extract bounding box coordinates
    x1, y1, x2, y2 = box
    # Extract confidence score
    confidence_score = logit

    # Create a dictionary for each detected object
    detection_data = {
        "Coordinates": (x1, y1, x2, y2),
        "Confidence Score": confidence_score
    }

    # Append the dictionary to the dataset
    dataset.append(detection_data)

# Print or output the dataset
for data in dataset:
    print(data)


In [None]:
# Plot the LangSAM annotations with a green colormap, red boxes and the given title.
sam.show_anns(
    cmap="Greens",
    box_color="red",
    title="Automatic Segmentation",
    blend=True,
)

# ***Mapping***

In [None]:
!pip install matplotlib numpy basemap

***Get image coords and score***

In [None]:
import os

# Path to the text file
# NOTE(review): the detection cell writes `dataset.txt`; this reads `segSV.txt` — confirm the file is renamed/uploaded manually.
file_path = '/content/segSV.txt'  # Update this to the path of your text file

# Read the data from the text file
data = []
with open(file_path, 'r') as file:
    for line in file:
        # Clean up the line to remove any leading/trailing whitespace and newline characters
        # Then split the line by commas
        line_cleaned = line.strip().replace('[', '').replace(']', '').replace("'", "")
        if not line_cleaned:
            continue  # skip blank lines instead of failing on them below
        data.append(line_cleaned.split(','))

# Process each entry to extract coordinates and calculate average confidence
formatted_data = []

for entry in data:
    # The file name encodes the position as "<lat>_<lon>.<ext>"; drop whatever
    # extension is present rather than assuming a 4-character ".jpg".
    filename = entry[0].strip()
    coords = os.path.splitext(filename)[0]
    latitude, longitude = coords.split('_')
    latitude = round(float(latitude), 4)  # Round latitude to four decimal places
    longitude = round(float(longitude), 4)  # Round longitude to four decimal places

    # Convert each string in the list to a float before calculating the average
    # Start from entry[1] to skip the filename
    confidence_scores = [float(score.strip()) for score in entry[1:] if score.strip()]

    # An image with no detections has no scores; use 0.0 instead of dividing by zero.
    if confidence_scores:
        average_confidence = sum(confidence_scores) / len(confidence_scores)
    else:
        average_confidence = 0.0

    # Round the average confidence to three decimal places
    average_confidence = round(average_confidence, 3)

    # Create a tuple and add it to the list
    formatted_data.append((latitude, longitude, average_confidence))

# Print the formatted data
for item in formatted_data:
    print(item)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap

coordinates = formatted_data  # This should be a list of tuples (lat, lon, conf)

# Create a new map plot
fig, ax = plt.subplots()

# Define the bounding box for the map from `bbbox`, which keeps the map order
# [lon, lat, lon, lat]. sorted() guarantees min <= max for both axes, so the
# lower-left / upper-right corners passed to Basemap are never swapped.
lat_min, lat_max = sorted((bbbox[1], bbbox[3]))
lon_min, lon_max = sorted((bbbox[0], bbbox[2]))

# Create the map
# NOTE: this rebinds `m`, the name the first cells use for the leafmap widget.
m = Basemap(projection='merc', llcrnrlat=lat_min, urcrnrlat=lat_max, llcrnrlon=lon_min, urcrnrlon=lon_max, resolution='i', ax=ax)
m.drawmapboundary(fill_color='aqua')
m.fillcontinents(color='coral', lake_color='aqua')

if not coordinates:
    print("No coordinates to plot.")
else:
    # Project every (lat, lon) to map x/y and keep the confidence as the weight.
    x, y = [], []
    weights = []
    for lat, lon, conf in coordinates:
        x_proj, y_proj = m(lon, lat)  # Get projected x, y from longitude and latitude
        x.append(x_proj)
        y.append(y_proj)
        weights.append(conf)  # Add confidence to the weights list

    # Create the heatmap
    heatmap, xedges, yedges = np.histogram2d(x, y, bins=50, weights=weights, density=True)
    extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]

    # Draw on the same axes as the basemap; the high zorder is intended to keep
    # the heatmap above the filled continents.
    ax.imshow(heatmap.T, extent=extent, origin='lower', cmap='plasma', alpha=0.6, zorder=10)

plt.show()


In [None]:
import folium
from folium.plugins import HeatMap
import numpy as np

# Example coordinates
coordinates = formatted_data  # Your list of tuples [(lat, lon, conf), ...]
if not coordinates:
    raise ValueError("formatted_data is empty: nothing to plot on the heatmap")

# Calculate bounds for the map view
latitudes, longitudes = zip(*[(lat, lon) for lat, lon, conf in coordinates])
southwest = [min(latitudes), min(longitudes)]
northeast = [max(latitudes), max(longitudes)]

# Create a map centered around the average location.
# Named `folium_map` so the builtin `map` is not shadowed for the rest of the session.
map_center = [float(np.mean(latitudes)), float(np.mean(longitudes))]
folium_map = folium.Map(location=map_center, tiles='CartoDB positron', attr='Minimal')

# Add a heatmap; each point is (lat, lon, weight) with the confidence as weight
heatmap_data = [(lat, lon, conf) for lat, lon, conf in coordinates]
HeatMap(heatmap_data).add_to(folium_map)

# Fit map to bounds
folium_map.fit_bounds([southwest, northeast])

# Save to HTML or display
folium_map.save('heatmap.html')
folium_map
