In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install duckdb
!pip install astropy
!pip install astroquery
!pip install aiohttp
!pip install aiofiles
!pip install xmltodict
!pip install tqdm
%cd /content/drive/MyDrive/spaceapps

In [None]:
# async version
import asyncio
import aiofiles
from aiohttp import ClientSession, CookieJar, ClientTimeout
import xmltodict


total_rows = 1_811_709_771
active_jobs = []
retrieved_rows = 0
retrieved_batches = []
batch_siz = 50_000_000



class GaiaQueryJob:
  start_job_url = 'https://gea.esac.esa.int/tap-server/tap/async'
  query = """
with gaia_ap as (
select source_id, mg_gspphot from gaiadr3.astrophysical_parameters where mg_gspphot is not null
),
gs as (
	select source_id, ra, dec, distance_gspphot from gaiadr3.gaia_source where ra is not null and dec is not null and distance_gspphot is not null and random_index >= {0} and random_index < {1}
)


select gs.source_id, gs.ra, gs.dec, gs.distance_gspphot, gaia_ap.mg_gspphot from gs join gaia_ap on gs.source_id = gaia_ap.source_id
"""

  def __init__(self, start, end, session):
    self.start = start
    self.end = end
    self.session = session
    self.jobId = None
    self.phase = 'PENDING'

  async def launch_job(self):
    data = {
        'PHASE': 'RUN',
        'LANG': 'ADQL',
        'REQUEST': 'doQuery',
        'FORMAT': 'csv',
        'QUERY': GaiaQueryJob.query.format(self.start, self.end)
    }
    async with self.session.post(GaiaQueryJob.start_job_url, data=data) as req:
        res = xmltodict.parse(await req.text())['uws:job']
        self.jobId = res['uws:jobId']
        self.phase = res.get('uws:phase', self.phase)

  async def get_phase(self):
    if self.jobId == None: return
    async with self.session.get(f'{GaiaQueryJob.start_job_url}/{self.jobId}') as req:
        res = xmltodict.parse(await req.text())['uws:job']
        self.phase = res.get('uws:phase', self.phase)

  async def run(self):
    try:
      await self.launch_job()
      print(f'Job launched: {self.jobId}')
    except Exception as e:
      print(f'Job launch failed: {e}')

    try:
      while self.phase != 'ABORTED' and self.phase !='ERROR' and self.phase != 'COMPLETED':
        await asyncio.sleep(60)
        await self.get_phase()
    except Exception as e:
      print(f'Failed to fetch phase: {e}')

    if self.phase == 'ABORTED' or self.phase == 'ERROR':
      print(f'Job {self.jobId} failed: {self.phase}')
      return

    print(f'Job {self.jobId} completed')

    async with self.session.get(f'{GaiaQueryJob.start_job_url}/{self.jobId}/results/result') as req:
      async with aiofiles.open(f'gaia_data/{self.start}-{self.end}.csv', 'wb+') as f:
        async for data in req.content.iter_chunked(16384):
          await f.write(data)

    print(f'Job {self.jobId} results saved')




async def get_tables(session):
  async with session.get('https://gea.esac.esa.int/tap-server/tap/jobs/async') as req:
    jobs = xmltodict.parse(await req.text())['uws:jobs']['uws:job']
    jobs = list(map(lambda x: x['uws:jobId'], jobs))
    assert len(jobs) == 37
  return jobs

async def download(jobId, session):
  async with session.get(f'https://gea.esac.esa.int/tap-server/tap/async/{jobId}/results/result', timeout=None) as req:
      async with aiofiles.open(f'gaia_data/{jobId}.csv', 'wb+') as f:
        async for data in req.content.iter_chunked(16384):
          await f.write(data)

async def bulk_download(jobs, session):
  tasks = []
  for job in jobs:
    tasks.append(download(job, session))
  await asyncio.gather(*tasks)

no_timeout = ClientTimeout(
    total=None
)

async def main():
  async with ClientSession(cookie_jar=CookieJar(), timeout=no_timeout) as session:
    await session.post('https://gea.esac.esa.int/tap-server/login', data={}) #username passowrd here
    print('Logged into Gaia')
    jobs = await get_tables(session)
    await bulk_download(jobs, session)
    # tasks = []
    # for i in range(0, total_rows, batch_siz):
    #   tasks.append(GaiaQueryJob(i, i+batch_siz, session).run())
    # await asyncio.gather(*tasks)

await main()







In [None]:
# sync version
from astroquery.gaia import Gaia
import json
import time

total_rows = 1_811_709_771
active_jobs = []
retrieved_rows = 0
retrieved_batches = []
batch_siz = 50_000_000
max_active_jobs = 3
query = """
with gaia_ap as (
select source_id, mg_gspphot from gaiadr3.astrophysical_parameters where mg_gspphot is not null
),
gs as (
	select source_id, ra, dec, distance_gspphot from gaiadr3.gaia_source where ra is not null and dec is not null and distance_gspphot is not null and random_index >= {0} and random_index < {1}
)


select gs.source_id, gs.ra, gs.dec, gs.distance_gspphot, gaia_ap.mg_gspphot from gs join gaia_ap on gs.source_id = gaia_ap.source_id
"""


def pick_next_batch():
  global active_jobs, retrieved_rows, retrieved_batches, batch_siz
  active_job_idx = list(map(lambda x: x[1], active_jobs))
  for i in range(0, total_rows, batch_siz):
    if i not in retrieved_batches and i not in active_job_idx:
      return i

def launch_new_job():
  global active_jobs, retrieved_rows, retrieved_batches, batch_siz
  # job variable fields: https://github.com/astropy/astroquery/blob/e8fff0b51bdc2f367b21c8564195599aafca19fd/astroquery/utils/tap/model/job.py#L52
  batch_idx = pick_next_batch()
  job = Gaia.launch_job_async(query.format(batch_idx, batch_idx+batch_siz), dump_to_file = True, output_file = f'gaia_data/{batch_idx}-{batch_idx+batch_siz-1}.csv', output_format = 'csv', background = True)
  active_jobs.append((job, batch_idx))
  print(f'Started job: {job.jobid}')

def check_active_jobs():
  global active_jobs, retrieved_rows, retrieved_batches, batch_siz
  remove_idx = []
  for idx, (job, batch_idx) in enumerate(active_jobs):
    try:
      job.get_phase(update=True)
      if not job.is_finished():
        print(f'Job {job.jobid} in phase: {job.get_phase()}')
        continue
      if (job.get_phase() != 'COMPLETED'):
        remove_idx.append(idx)
        continue
      job.save_results()
      retrieved_rows += batch_siz
      retrieved_batches.append(batch_idx)
      with open('checkpoint.json', 'w+') as f:
        json.dump([retrieved_batches, retrieved_rows], f)
      Gaia.remove_jobs(job.jobid)
      remove_idx.append(idx)
    except:
      print(f'Failed to check job: {job.jobid}')
  print(f'Current Progress: {(retrieved_rows/total_rows):.2f} {retrieved_rows}/{total_rows}')
  active_jobs = [ele for idx,ele in enumerate(active_jobs) if idx not in remove_idx]

def load_checkpoint():
  global active_jobs, retrieved_rows, retrieved_batches, batch_siz
  try:
    with open('checkpoint.json', 'r') as f:
      retrieved_batches, retrieved_rows = json.load(f)
  except:
    print(f'Failed to load checkpoint')


Gaia.login() #username and password here

load_checkpoint()

while retrieved_rows < total_rows:
  check_active_jobs()
  while (retrieved_rows < total_rows and len(active_jobs)<max_active_jobs and pick_next_batch() != None):
    launch_new_job()
  time.sleep(120)










In [None]:
import duckdb
from astropy import units as u
from astropy.coordinates.sky_coordinate import SkyCoord
import numpy as np
import cupy as cp
import math
import os
import json
from tqdm import tqdm
import zipfile

def celestial_to_cartesian(ra, dec, dis):
    x = dis*math.cos(math.radians(dec))*math.cos(math.radians(ra))
    y = dis*math.cos(math.radians(dec))*math.sin(math.radians(ra))
    z = dis*math.sin(math.radians(dec))
    return [x, y, z]

planets_all = duckdb.sql("select pl_name, ra, dec, sy_dist from 'exoplanets.csv'").fetchall()
planets_all = list(map(lambda x: (x[0], *celestial_to_cartesian(x[1],x[2],x[3])), filter(lambda x: x[3] != None, planets_all)))
gaia_files = os.listdir('gaia_data')







In [None]:
for (pl_name, _, _, _) in tqdm(planets_all):
  if (not os.path.exists(f'staged_data/{pl_name}')):
    os.mkdir(f'staged_data/{pl_name}')
    print(f'Created dir {pl_name}')


In [None]:

gpu_kernel = cp.RawKernel(r'''
extern "C" __global__
void my_kernel(const int N, const float* d, const float* de, const float* ra, const float* mg, const float* base, float* y) {
  int idx = blockDim.x * blockIdx.x + threadIdx.x;
  if (idx < N) {
      float sra, cra, sde, cde;
      sincosf(0.017453292 * ra[idx], &sra, &cra);
      sincosf(0.017453292 * de[idx], &sde, &cde);
      y[idx] = mg[idx]+2.5*log10f(powf(d[idx]*cra*cde-base[0], 2)+powf(d[idx]*cde*sra-base[1], 2)+powf(d[idx]*sde, 2))-5;
  }
}
''', 'my_kernel')

async def main():
  # folder level checkpoint
  checkpoint = []
  try:
    with open('checkpoint.json', 'r') as f:
      checkpoint = json.load(f)
  except:
    pass
  for gaia_file in gaia_files:
    if gaia_file in checkpoint: continue
    stars_all = duckdb.sql(f"select source_id as id,ra,dec,distance_gspphot as dist,mg_gspphot as mag from 'gaia_data/{gaia_file}'").fetchnumpy()
    n = stars_all['ra'].shape[0]
    d = cp.asarray(np.float32(stars_all['dist']))
    de = cp.asarray(np.float32(stars_all['dec']))
    ra = cp.asarray(np.float32(stars_all['ra']))
    mg = cp.asarray(np.float32(stars_all['mag']))
    blk_siz = 128
    grid_siz = math.floor((n+blk_siz-1)/blk_siz)
    stars_map = {}
    tot = 0
    with zipfile.ZipFile(f'staged_data/{gaia_file}.zip', 'w') as zf:
      for (pl_name, px, py, pz) in tqdm(planets_all):
        # file level checkpoint
        # if os.path.exists(f'staged_data/{pl_name}/{gaia_file}.json'): continue
        base = cp.array([px, py, pz], dtype=cp.float32)
        res_vec = cp.zeros((n, ), dtype=cp.float32)
        gpu_kernel((grid_siz, ), (blk_siz, ), (n, d, de, ra, mg, base, res_vec))
        res = res_vec.get()
        mask = np.where(res<=6)[0]
        out = [[int(stars_all['id'][idx]), x - px, y - py, z - pz, np.float64(res[idx])] for idx in mask for x, y, z in [celestial_to_cartesian(stars_all['ra'][idx], stars_all['dec'][idx], stars_all['dist'][idx])]]
        zf.writestr(f'{pl_name}.json', json.dumps(out))
    checkpoint.append(gaia_file)
    with open('checkpoint.json', 'w+') as f:
      json.dump(checkpoint, f)

await main()

In [None]:
for gaia_file in gaia_files[:1]:
  stars_all = duckdb.sql(f"select source_id as id,ra,dec,distance_gspphot as dist,mg_gspphot as mag from 'gaia_data/{gaia_file}'").fetchnumpy()


In [None]:
from cupyx.profiler import benchmark

gpu_kernel = cp.RawKernel(r'''
extern "C" __global__
void my_kernel(const int N, const float* d, const float* de, const float* ra, const float* mg, const float* base, float* y) {
  int idx = blockDim.x * blockIdx.x + threadIdx.x;
  if (idx < N) {
      float sra, cra, sde, cde;
      sincosf(0.017453292 * ra[idx], &sra, &cra);
      sincosf(0.017453292 * de[idx], &sde, &cde);
      y[idx] = mg[idx]+2.5*log10f(powf(d[idx]*cra*cde-base[0], 2)+powf(d[idx]*cde*sra-base[1], 2)+powf(d[idx]*sde, 2))-5;
  }
}
''', 'my_kernel')

def f(stars_all, planets_all, n, d, de, ra, mg):
  blk_siz = 128
  grid_siz = math.floor((n+blk_siz-1)/blk_siz)
  stars_map = {}
  tot = 0
  for (pl_name, px, py, pz) in planets_all[:1]:
    base = cp.array([px, py, pz], dtype=cp.float32)
    res_vec = cp.zeros((n, ), dtype=cp.float32)
    gpu_kernel((grid_siz, ), (blk_siz, ), (n, d, de, ra, mg, base, res_vec))
    res = res_vec.get()
    mask = np.where(res<=6)[0]
    out = [[int(stars_all['id'][idx]), *celestial_to_cartesian(stars_all['ra'][idx], stars_all['dec'][idx], stars_all['dist'][idx]), np.float64(res[idx])] for idx in mask]
    with open(f'exoplanet_stars_map/{pl_name}.json', 'r+') as f:
      temp = json.load(f)
      temp += out
      f.seek(0)
      json.dump(temp, f)
      f.truncate()

n = stars_all['ra'].shape[0]
d = cp.asarray(np.float32(stars_all['dist']))
de = cp.asarray(np.float32(stars_all['dec']))
ra = cp.asarray(np.float32(stars_all['ra']))
mg = cp.asarray(np.float32(stars_all['mag']))

print(benchmark(f, (stars_all, planets_all, n, d, de, ra, mg), n_repeat=100))


In [None]:

f = lambda x: x[3]+2.5*math.log10(base[2]*base[2] + x[2]*x[2] - 2*base[2]*x[2]*(math.sin(base[1])*math.sin(x[1])+math.cos(base[1])*math.cos(x[1])*math.cos(x[0]-base[2])))

res_classic = np.apply_along_axis(f, 1, stars)

print(res_classic)

In [None]:

stars_vec = np.column_stack((d*d, d*np.sin(de), d*np.cos(de)*np.cos(ra - base[0])))
res_vec = np.add(mg, 2.5*np.log10(np.dot(stars_vec,np.array([1, -2*base[2]*math.sin(base[1]), -2*base[2]*math.cos(base[1])])) + (base[2]*base[2])))

res_vec

In [None]:
import matplotlib.pyplot as plt

print(np.min(res_vec))
print((res_vec < 11).sum())

plt.hist(res_vec, bins=10)

In [None]:
%%timeit
gbase = cp.array([base[0], base[1], base[2]]);
gd = cp.asarray(stars_all['dist'])
gde = cp.asarray(stars_all['dec'])
gra = cp.asarray(stars_all['ra'])
gmg = cp.asarray(stars_all['mag'])

stars_vec = cp.column_stack((gd*gd, gd*cp.sin(gde), gd*cp.cos(gde)*cp.cos(gra - gbase[0])))
res_vec = cp.add(gmg, 2.5*cp.log10(cp.dot(stars_vec,cp.array([1, -2*base[2]*math.sin(base[1]), -2*base[2]*math.cos(base[1])]))))
cp.asnumpy(res_vec)

In [None]:
%%timeit
gpu_kernel = cp.RawKernel(r'''
extern "C" __global__
void my_kernel(const float* d, const float* de, const float* ra, const float* mg, const float* base, float* y) {
  int idx = blockDim.x * blockIdx.x + threadIdx.x;
  y[idx] = mg[idx]+2.5*log10f(base[2]*base[2] + d[idx]*d[idx] - 2*base[2]*d[idx]*(sinf(base[1])*sinf(de[idx])+cosf(base[1])*cosf(de[idx])*cosf(ra[idx]-base[2])));
}
''', 'my_kernel')

gbase = cp.array([base[0], base[1], base[2]], dtype=cp.float32);
gd = cp.asarray(d)
gde = cp.asarray(de)
gra = cp.asarray(ra)
gmg = cp.asarray(mg)
res_vec = cp.zeros((1000000, ), dtype=cp.float32)

gpu_kernel((50000, ), (20, ), (gd, gde, gra, gmg, gbase, res_vec))



In [None]:
import zipfile
import os
import json
import tqdm
import duckdb

planets_all = duckdb.sql("select pl_name, ra, dec, sy_dist from 'exoplanets.csv'").fetchall()
planets_all = list(map(lambda x: x[0], filter(lambda x: x[3] != None, planets_all)))

zfs = []

for staged_file in os.listdir('staged_data'):
  zfs.append(zipfile.ZipFile(f'staged_data/{staged_file}', 'r'))


with zipfile.ZipFile('test.zip', 'w') as final:
  for pl_name in tqdm.tqdm(planets_all[:3]):
    stars = []
    for zf in zfs:
      stars += json.loads(zf.read(f'{pl_name}.json').decode('utf-8'))
    final.writestr(f'{pl_name}.json', json.dumps(stars))

