In [1]:
"""
Inicialización general
"""

"""
Codigo general que crea variables que se usan en varias celdas de codigo de este Jupyter notebook. En general crea variables para accesar los archivos de datos de GAIA ya descargados 
en la computadora localmente. Carga varios módulos de librerias externas para utilizar su codigo.
"""

import numpy as np
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import dask.dataframe as dd
import dask.array as da
import pandas as pd
from datetime import datetime
import logging
import os

def file_line_count(file_path):
	queryResultLineCount = 0
	with open(file_path) as f:
		queryResultLineCount = sum(1 for _ in f)
	return queryResultLineCount

def assert_query_result_file(file_path):
	"""
	true --> file is good
	false --> query limit reached
	"""
	lineThreshold = 3_000_000
	return file_line_count(file_path) <= lineThreshold

baseDataDir = "C:\\Users\\Ramon\\Documents\\Uni\\Metodos Computacionales en Astrofisica\\GAIA\\GAIA Datos"
csvDataDir  = f"{baseDataDir}\\CSV"
voTableDir  = f"{baseDataDir}\\VOTables-DecOnlyFilter-Test"
testDataDir = f"{csvDataDir}-Test"
resultDir   = f"{baseDataDir}\\Results"
limitedResultDir = f"{baseDataDir}\\Resultados-Restringidos"

queryTemplate = \
"""
SELECT gaia_source.solution_id,gaia_source.designation,gaia_source.source_id,gaia_source.ra,gaia_source.ra_error,gaia_source.dec,gaia_source.dec_error,gaia_source.parallax,gaia_source.parallax_error,
	gaia_source.phot_g_n_obs,gaia_source.phot_g_mean_flux,gaia_source.phot_g_mean_flux_over_error,gaia_source.phot_g_mean_mag,gaia_source.phot_bp_n_obs,gaia_source.phot_bp_mean_flux,gaia_source.phot_bp_mean_flux_over_error,
	gaia_source.phot_bp_mean_mag,gaia_source.phot_rp_n_obs,gaia_source.phot_rp_mean_flux,gaia_source.phot_rp_mean_flux_over_error,gaia_source.phot_rp_mean_mag,gaia_source.phot_bp_rp_excess_factor,gaia_source.phot_proc_mode,
	gaia_source.bp_rp,gaia_source.bp_g,gaia_source.radial_velocity,gaia_source.radial_velocity_error,gaia_source.phot_variable_flag,gaia_source.teff_val,gaia_source.a_g_val,gaia_source.e_bp_min_rp_val,gaia_source.e_bp_min_rp_percentile_upper,
	gaia_source.lum_val, gaiadr2.gaia_source.phot_variable_flag
FROM gaiadr2.gaia_source 
WHERE (gaiadr2.gaia_source.dec BETWEEN -30 AND 70 
	   AND gaiadr2.gaia_source.ra >= %s AND gaiadr2.gaia_source.ra < %s)
ORDER BY gaiadr2.gaia_source.dec
"""

In [None]:
"""
Automatización de query de GAIA
"""

"""
Codigo con el que intenté automatizar la recaudación de datos de GAIA iniciales, solo tomando en cuenta el filtro de declinación ya incluido en el query. Aunque este 
algoritmo funcionó para varios intervalos, eventualmente me encontré con lo que sospecho son limitaciones de la interfaz de GAIA. Con ciertos intervalos no pudo 
terminar de descargar los resultados, lo que causó que produzca archivos incompletos de información, aún despues de decrementar el intervalo del query de AR a 
un intervalo de 0.25. En total, este algoritmo funcionó bien para objetos con AR de 180 hasta 274, con excepciones para los intervalos de AR 265.5-265.75 y 266-266.25. 

El límite de la interfaz no la he confirmado con documentación oficial. Esto es basado en mis observaciones que el proceso empieza a fallar entre más tiempo lleva
corriendo, y también con el tamaño de los resultados que traté de descargar.
"""

from astroquery.gaia import Gaia
import _thread
import threading
from time import sleep
import logging
import os

def queryLaunch(min_ra, max_ra):
	queryRaInterval = max_ra - min_ra
	queryFinished = False
	while not queryFinished:
		try:
			print(f"\nLaunching query {min_ra}-{max_ra}")
			query = queryTemplate % (min_ra, max_ra)
			filePath = f"{testDataDir}\\ra_{min_ra}-{max_ra}-result-script.csv"
			job = Gaia.launch_job_async(query, dump_to_file=True, output_format="csv", output_file=filePath)
			print(f"{min_ra} - {max_ra} => {job}\n")
			if not assert_query_result_file(filePath):
				print(f"Query limit reached for ra_{min_ra}-{max_ra}. Trying smaller granularity")
				print(f"Deleting file {filePath}")
				os.remove(filePath)
				midRa = min_ra + (queryRaInterval / 2)
				queryLaunch(min_ra, midRa)
				queryLaunch(midRa, max_ra)
		except Exception as Argument:
			logging.exception(f"Exception occurred during query for ra {min_ra}-{max_ra}. Sleeping for 30 seconds.")
			sleep(30)
			print(f"Retrying {min_ra}-{max_ra} query")
		else:
			queryFinished = True

# Constants
raInterval = 5
raLimit = 360
maxQueries = 1
#-----------------

minRa = 0
while (minRa + raInterval <= raLimit):
	maxRa = minRa + raInterval
	try:
		queryLaunch(minRa, maxRa)
	except Exception as Argument:
		logging.exception(f"Error in query for ra_{minRa}-{maxRa}")
	minRa = maxRa

In [None]:
"""
Validación de datos de declinación
"""

"""
Validación básica de datos recaudados con solo el filtro de la declinación. Esta validación funciona con un query que cuenta cuantos resultados deben 
de existir en los archivos locales de mi computadora que contienen estos datos. Este codigo principalmente sirve para validar que el código automático
para recaudar datos (ubicado en la celda de código 'Automatización de query de GAIA') funcionó correctamente.
"""

from astroquery.gaia import Gaia
import _thread
import threading
from time import sleep
import re

countQuery = \
"""
SELECT COUNT(gaiadr2.gaia_source.solution_id)
FROM gaiadr2.gaia_source 
WHERE (gaiadr2.gaia_source.dec BETWEEN -30 AND 70 
	   AND gaiadr2.gaia_source.ra >= %s AND gaiadr2.gaia_source.ra < %s)
"""

def count_query_launch(min_ra, max_ra, file_path):
	while True:
		try:
			print(f"\nLaunching query {min_ra}-{max_ra}")
			query = countQuery % (min_ra, max_ra)
			job = Gaia.launch_job_async(query)
			count = job.get_results()['count'][0]
			scriptResultCount = file_line_count(file_path) - 1
			return scriptResultCount, count
		except Exception as Argument:
			logging.exception(f"Error occurred when retrieving results for {min_ra}-{max_ra}. Sleeping for 30 secs.")
			sleep(30)
			print("Retrying...")

checkMinRa = 0
dataDir = csvDataDir
incorrectQueryResults = []
totalScriptLines, totalQueryLines = 0, 0
for f in os.listdir(dataDir):
	if not f.endswith("csv"):
		print(f"Skipping unfinished download query {f}")
	else:
		try:
			match = re.search("ra_(\d*\.?\d*)-(\d*\.?\d*)", f)
			min_ra = float(match.groups()[0])
			max_ra = float(match.groups()[1])
			if min_ra >= checkMinRa:
				scriptCount, queryCount = count_query_launch(min_ra, max_ra, f"{dataDir}\\{f}")
				totalScriptLines += scriptCount
				totalQueryLines += queryCount
				missingResults = queryCount - scriptCount
				if missingResults != 0:
					print(f"{missingResults} results missing for {f}")
					incorrectQueryResults.append([min_ra, max_ra])
		except Exception as Argument:
			logging.exception(f"Exception while processing {f}")
			break

print(incorrectQueryResults)
print(f"Script: {totalScriptLines}", f"Query: {totalQueryLines}")

In [None]:
"""
Traductor de VOTable a CSV
"""

"""
Codigo de ayuda para convertir datos locales en el formato de VOTable a un archivo CSV para usar facilmente con Dask. Esto me permite poder analizar 200+ GB de datos en mi 
computadora sin pasar los limites de memoria de mi hardware. Sin embargo, este codigo en si solo funcionó con archivos de VOTable que medían menos de 4 GB, por lo que no 
lo pude utilizar como solución general. Esto se debe a un error aún no resuelto en la libreria externa de astropy.io.votable que causa que la función de parse_single_table 
ocupe más memoria de lo que debería. Más información se puede encontrar en el foro oficial de astropy: https://github.com/astropy/astropy/issues/8946 

Este codigo solo funcionó para los archivos de los intervalos de AR de 285-287.5, 290-292.5, y 292.5-295
"""

from astropy.io.votable import parse_single_table
from astropy.io import ascii
import gc

columns = ['designation', 'ra', 'dec', 'phot_g_mean_mag', 'phot_bp_mean_mag', 'phot_rp_mean_mag']

for file in os.listdir(voTableDir):
	if file.endswith("gz"):
		table = None
		writeResult = None
		try:
			print(f"{datetime.now()} Processing {file}")
			interimTable = parse_single_table(f"{voTableDir}\\{file}", columns=columns)
			table = interimTable.to_table()
			print(f"{datetime.now()} Table successfully read to memory. Writing to csv file.")
			writeResult = ascii.write(table, f"{testDataDir}\\{file}.csv", format="csv", strip_whitespace=False, fast_writer=True)
			print("\n")
		except Exception as Argument:
			logging.exception(f"Error processing file {file}")
		finally:
			gc.collect()
	else:
		print(f"Skipping {file}")

In [None]:
"""
Filtro de magnitud visible
"""

"""
Aplica un filtro a los datos de GAIA (estos ya habiendo sido filtrados en base a su declinación) en base a su magnitud visible usando la siguiente transformación fotométrica:

V = 0.01760 + 0.006860(GBP - GRP) + 0.1732(GBP - GRP)^2 + G

Donde:
GBP = phot_bp_mean_mag
GRP = phot_rp_mean_mag
G   = phot_g_mean_mag
V   = magnitud en el espectro visible Johnson-Cousins
"""

def visible_mag(g_mag, bp_mag, rp_mag):
	bpMinusRp = bp_mag - rp_mag
	return 0.01760 + (0.006860 * bpMinusRp) + (0.1732 * da.float_power(bpMinusRp, 2)) + g_mag

# Don't truncate text fields in the display
pd.set_option("display.max_colwidth", None)

csvPath = f"{csvDataDir}\\*.csv"
df = dd.read_csv(csvPath, assume_missing=True)

filteredByMag = df[visible_mag(df.phot_g_mean_mag, df.phot_bp_mean_mag, df.phot_rp_mean_mag) <= 16]
visibleMagAppended = filteredByMag.assign(visible_mean_mag=visible_mag(filteredByMag['phot_g_mean_mag'], filteredByMag['phot_bp_mean_mag'], filteredByMag['phot_rp_mean_mag']))
print([len(df), len(visibleMagAppended)])
visibleMagAppended.to_csv(f"{resultDir}\\objetos-visibles-particion-*.csv")
visibleMagAppended.head()

In [None]:
"""
Ajuste de límites
"""

def apply_dec_mag_limits(visible_mean_mag, dec):
	return (visible_mean_mag >= 6) & (visible_mean_mag <= 12) & (dec >= 15) & (dec <= 45)

csvPath = f"{resultDir}\\*.csv"
df = dd.read_csv(csvPath, assume_missing=True)
newDecMagLimitFilter = df[apply_dec_mag_limits(df.visible_mean_mag, df.dec)]
finalResult = newDecMagLimitFilter[['designation', 'ra', 'ra_error', 'dec', 'dec_error', 'visible_mean_mag', 'parallax', 'parallax_error']]
print(len(finalResult))
finalResult = finalResult.repartition(npartitions=1)
finalResult.to_csv(f"{limitedResultDir}\\Resultados.csv")
finalResult.head(npartitions=-1)