In [None]:
%%capture
!pip install contextily skyfield pytz astral gdown

In [None]:
%%capture

import gdown

URL = 'https://drive.google.com/drive/folders/1yDLSHMktxFiy3fmgvi2ptyPrxzJDlMJx?usp=sharing'
gdown.download_folder(URL, quiet=True)

In [None]:
import pandas as pd
from ast import literal_eval
from shapely.geometry import Polygon

def create_polygon(coords):
    return Polygon(coords)

building_heights_df = pd.read_csv('times city buildings/building_heights.csv')

custom_buildings_df = pd.read_csv('times city buildings/custom_buildings.csv')
osm_buildings_df = pd.read_csv('times city buildings/buildings.csv')

buildings_df = pd.concat([custom_buildings_df, osm_buildings_df], ignore_index=True)
houses_df = pd.merge(buildings_df, building_heights_df, on='way_id', how='inner')

houses_df['building_border'] = houses_df['building_border'].apply(literal_eval)
houses_df['building_polygon'] = houses_df['building_border'].apply(create_polygon)

houses_df.head()

In [None]:
%%capture

import gdown

STAR_NAMES = 'star_names.csv'
STAR_NAMES_URL = 'https://drive.google.com/file/d/1LtZqSzPvtkDZNswYLk999i7hWgG5HhrI/view?usp=sharing'

gdown.download(STAR_NAMES_URL, STAR_NAMES, quiet=True, fuzzy=True)

In [None]:
import pandas as pd
from skyfield.data import hipparcos
from skyfield.api import load

hipparcos_df = hipparcos.load_dataframe(load.open(hipparcos.URL))
star_df = pd.read_csv(STAR_NAMES)
common_star_df = pd.merge(hipparcos_df, star_df, left_on='hip', right_on='hip_id', how='inner')

common_star_df = common_star_df[common_star_df['magnitude'] <= 1.0]
common_star_df.shape

In [None]:
LOCATIONS = [
    (20.99380122842269, 105.8682189914924),
    (20.99433864645776, 105.86750935267474),
    (20.99308925121348, 105.86827516924997),
    (20.99201495512747, 105.86562240635608),
    (20.9937060628234, 105.86958776126532),
]
LOCATIONS

In [None]:
import math

def distance_to_horizon(height):
    earth_radius = 6371000
    distance = math.sqrt(2 * earth_radius * height + height**2)
    return distance

horizon_distance = distance_to_horizon(1.7)
horizon_distance

In [None]:
from geopy.distance import distance
from math import radians

def get_star_position(lat, lon, azimuth, horizon_distance):
    start = (lat, lon)
    new_point = distance(meters=horizon_distance).destination(start, azimuth)
    return new_point.latitude, new_point.longitude

def get_star_plot_position(lat, lon, azimuth):
    start = (lat, lon)
    new_point = distance(meters=500).destination(start, azimuth)
    return new_point.latitude, new_point.longitude

In [None]:
import pytz
from datetime import datetime, time, timedelta

TIMEZONE = 'Asia/Ho_Chi_Minh'
LOCAL_TIMEZONE = pytz.timezone(TIMEZONE)

def get_start_time():
  dt = datetime.now(pytz.utc)
  vietnam_tz = pytz.timezone('Asia/Ho_Chi_Minh')
  dt_vietnam = dt.astimezone(vietnam_tz)
  return dt_vietnam.replace(hour=21, minute=30, second=0, microsecond=0)

def is_right_time(dt):
  vietnam_tz = pytz.timezone('Asia/Ho_Chi_Minh')
  if dt.tzinfo is None:
      dt = vietnam_tz.localize(dt)

  dt = dt.astimezone(vietnam_tz)
  start_time = time(21, 30)

  today_midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
  end_time = (today_midnight + timedelta(days=1)).time()

  current_time = dt.time()
  return start_time <= current_time or current_time <= end_time

def is_out_of_range(t):
    today = datetime.now(pytz.utc)
    future = today + timedelta(days=1)
    current_dt = t.astimezone(LOCAL_TIMEZONE)
    return current_dt > future

def is_above_horizon(position):
  alt, _, _ = position
  return alt.degrees > 0

In [None]:
from shapely.geometry import Point, LineString, Polygon
from shapely.ops import nearest_points
from geopy.distance import geodesic

def get_distance(start_point, intersection_point):
    point1 = (start_point.x, start_point.y)
    point2 = (intersection_point.x, intersection_point.y)
    return geodesic(point1, point2).meters

def get_angle(height, distance):
    tangent = height / distance
    angle_radians = math.atan(tangent)
    return math.degrees(angle_radians)

def is_blocked_by_building(location, star):
    stars_blocked, starts_visible = [], []

    lat, lon = location
    star_lat, star_lon, star_alt, star_name = star

    min_distance = float('inf')
    building_height = None

    for building in houses_df.itertuples(index=False):
      start_point = Point(lat, lon)
      end_point = Point(star_lat, star_lon)

      line_of_sight = LineString([start_point, end_point])
      intersection = line_of_sight.intersection(building.building_polygon)

      if intersection.is_empty:
          continue

      if intersection.geom_type == 'Point':
          distance = get_distance(start_point, intersection)
          if distance < min_distance:
              min_distance = distance
              building_height = building.height
      elif intersection.geom_type == 'MultiPoint':
          for point in intersection.geom:
              distance = get_distance(start_point, point)
              if distance < min_distance:
                  min_distance = distance
                  building_height = building.height
      elif intersection.geom_type == 'LineString':
          nearest_point_on_line = nearest_points(start_point, intersection)[1]
          distance = get_distance(start_point, nearest_point_on_line)
          if distance < min_distance:
              min_distance = distance
              building_height = building.height
      elif intersection.geom_type == 'MultiLineString':
          for line in intersection.geoms:
              nearest_point_on_line = nearest_points(start_point, line)[1]
              distance = get_distance(start_point, nearest_point_on_line)
              if distance < min_distance:
                  min_distance = distance
                  building_height = building.height

    if building_height != None:
        angle = get_angle(building_height, min_distance)
        if angle > star_alt:
            # print('Blocked (below building)', star_name, angle, star_alt)
            return True
        elif angle < star_alt:
            # print('Visible (above building)', star_name, angle, star_alt)
            return False
    else:
        # print('Visible', star_name, star_alt)
        return False

In [None]:
import matplotlib.pyplot as plt
import contextily as ctx
import numpy as np
import matplotlib.dates as mdates

def mean_star_position(points):
    sum_lat = sum(point['star_lat'] for point in points)
    sum_lon = sum(point['star_lon'] for point in points)
    n = len(points)

    mean_lat = sum_lat / n
    mean_lon = sum_lon / n

    return mean_lat, mean_lon

def plot_stars(point, stars):
  plt.figure(figsize=(10, 8), dpi=300)

  start_lat, start_lon = point
  plt.scatter(start_lon, start_lat, color='red', s=50)

  star_colors = plt.cm.rainbow(np.linspace(0, 0.9, len(stars)))
  legend_elements = []

  for idx, (star_name, info) in enumerate(stars.items(), start=0):
      if not info:
        continue

      star_lat, star_lon = mean_star_position(info)
      color = star_colors[idx]

      plt.scatter(star_lon, star_lat, color=color, s=50)
      plt.plot([start_lon, star_lon], [start_lat, star_lat], color=color, linestyle='--', linewidth=1)

      # Calculate the midpoint of the line
      mid_lon = (start_lon + star_lon) / 2
      mid_lat = (start_lat + star_lat) / 2

      # Calculate the angle of the line in degrees
      dx = star_lon - start_lon
      dy = star_lat - start_lat
      angle = np.degrees(np.arctan2(dy, dx))

      # Add text parallel to the line
      plt.text(mid_lon, mid_lat, star_name, color='black', fontsize=10, ha='center', va='center', rotation=angle, rotation_mode='anchor')

      legend_elements.append(
          plt.Line2D([0], [0], marker='o', color='w', label=star_name, markerfacecolor=color, markersize=10)
      )

  plt.axis('equal')
  plt.xlabel('Longitude')
  plt.ylabel('Latitude')
  plt.legend(handles=legend_elements, loc='upper right')
  ctx.add_basemap(plt.gca(), crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik)
  plt.show()

def plot_position(star_name, observations):
  azimuths = [obs['azimuth'] for obs in observations]
  altitudes = [obs['altitude'] for obs in observations]

  plt.style.use('default')
  plt.figure(figsize=(10, 6), dpi=300)
  plt.scatter(azimuths, altitudes,)

  plt.title(star_name.capitalize())
  plt.xlabel('Azimuth (°)')
  plt.ylabel('Altitude (°)')

  plt.ylim(0, 90)

  plt.grid(True, linestyle='--', alpha=0.7)
  plt.show()

def plot_timeline(data):
  plt.style.use('default')
  fig, ax = plt.subplots(figsize=(10, 6), dpi=300)

  y_positions = {star: i for i, star in enumerate([p for p, e in data.items() if e])}

  for star, events in data.items():
      if not events:
        continue

      times = [datetime.fromisoformat(event['event_time']) for event in events]
      ax.plot(times, [y_positions[star]] * len(times), 'o')

  ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
  plt.xticks(rotation=45)

  ax.set_yticks(list(y_positions.values()))
  ax.set_yticklabels(list(y_positions.keys()))

  plt.tight_layout()
  plt.show()

In [None]:
from datetime import datetime, timezone
from skyfield.api import load, wgs84, Topos, Star

EPH = load('de421.bsp')

for (lat, lon) in LOCATIONS:
  location = Topos(
      latitude_degrees=lat,
      longitude_degrees=lon,
      elevation_m=0,
  )
  observer = EPH['earth'] + location

  stars = {}

  ts = load.timescale()
  for index, star_data in common_star_df.iterrows():
    star_name = star_data['name']

    stars[star_name] = []

    night_dt = get_start_time()
    t = ts.from_datetime(night_dt)

    while not is_out_of_range(t):
      dt = t.utc_datetime()

      time_ok = is_right_time(dt)
      if not time_ok:
        break

      star = Star.from_dataframe(star_data)

      astrometric = observer.at(t).observe(star)
      position = astrometric.apparent().altaz()

      can_be_seen = is_above_horizon(position)
      if can_be_seen:
          alt, az, _ = position

          local_dt = t.astimezone(LOCAL_TIMEZONE)
          event_time = local_dt.strftime('%Y-%m-%dT%H:%M:%S')

          star_lat, star_lon = get_star_position(lat, lon, az.degrees, horizon_distance)

          is_blocked = is_blocked_by_building(
              (lat, lon),
              (star_lat, star_lon, alt.degrees, star_name),
          )
          if not is_blocked:
            star_lat, star_lon = get_star_plot_position(lat, lon, az.degrees)

            stars[star_name].append({
                'event_time': event_time,
                'azimuth': az.degrees,
                'altitude': alt.degrees,
                'magnitude': star_data['magnitude'],
                'star_lat': star_lat,
                'star_lon': star_lon,
            })

      t += timedelta(hours=1)

  print(stars)
  plot_stars((lat, lon), stars)

  plot_timeline(stars)

  for star_name, observations in stars.items():
    if not observations:
      continue
    plot_position(star_name, observations)