In [5]:
import os

from pyspark.sql.functions import expr, lit
from pyspark.sql.types import *

In [6]:
def spark_start(project_path, metastore=None):
    """Build (or fetch) the local, Hive-enabled SparkSession for this notebook.

    Parameters
    ----------
    project_path : str
        Directory under which the ``spark-warehouse`` (SQL warehouse) and
        ``spark-tmp`` (Spark scratch space) directories are placed.
    metastore : str, optional
        Directory passed to the driver JVM as ``derby.system.home``.
        When ``None`` (the default) the option is not set at all, instead of
        passing the literal string ``"None"`` to the JVM.

    Returns
    -------
    pyspark.sql.SparkSession
        The session returned by ``getOrCreate()``; if a session already
        exists in this process it is reused.
    """
    from pyspark.sql import SparkSession

    warehouse_location = os.path.join(project_path, 'spark-warehouse')
    local_dir = os.path.join(project_path, 'spark-tmp')

    builder = (
        SparkSession.builder
        .appName("LSD2")
        .config("spark.sql.warehouse.dir", warehouse_location)
        .config('spark.master', "local[12]")
        .config('spark.driver.memory', '16G') # 128
        .config('spark.local.dir', local_dir)
        .config('spark.memory.offHeap.enabled', 'true')
        .config('spark.memory.offHeap.size', '16G') # 256
        # NOTE(review): Spark 3.x renames this key to
        # spark.sql.execution.arrow.pyspark.enabled; confirm against the
        # installed Spark version.
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.driver.maxResultSize", "16G")
    )

    # Only point Derby at an explicit directory when the caller supplied one;
    # formatting None into the option would yield "-Dderby.system.home=None".
    if metastore is not None:
        builder = builder.config(
            "spark.driver.extraJavaOptions",
            f"-Dderby.system.home={metastore}",
        )

    return builder.enableHiveSupport().getOrCreate()

In [7]:
# Project root that holds the Spark warehouse, scratch space and metastore.
# Override with the LSD2_ROOT_DIR environment variable to run the notebook
# somewhere other than the original author's directory.
root_dir = os.environ.get("LSD2_ROOT_DIR", "/epyc/users/ctslater")


In [8]:
# The Derby metastore directory lives directly under the project root.
metastore_dir = os.path.join(root_dir, 'metastore_db')
spark = spark_start(root_dir, metastore=metastore_dir)

In [9]:
#
# Explicit schema for the Gaia source CSV files, in CSV column order.
# These types are not authoritative at all, just reasonable judgement.
#
# Each column name is paired with its Spark type in a single table so the
# two cannot drift out of alignment; col_names / col_types are derived below.
#
gaia_columns = [
    ("solution_id", LongType()),
    ("designation", StringType()),
    ("source_id", LongType()),
    ("random_index", LongType()),
    ("ref_epoch", FloatType()),
    ("ra", DoubleType()),
    ("ra_error", FloatType()),
    ("dec", DoubleType()),
    ("dec_error", FloatType()),
    ("parallax", FloatType()),
    ("parallax_error", FloatType()),
    ("parallax_over_error", FloatType()),
    ("pmra", FloatType()),
    ("pmra_error", FloatType()),
    ("pmdec", FloatType()),
    ("pmdec_error", FloatType()),
    ("ra_dec_corr", FloatType()),
    ("ra_parallax_corr", FloatType()),
    ("ra_pmra_corr", FloatType()),
    ("ra_pmdec_corr", FloatType()),
    ("dec_parallax_corr", FloatType()),
    ("dec_pmra_corr", FloatType()),
    ("dec_pmdec_corr", FloatType()),
    ("parallax_pmra_corr", FloatType()),
    ("parallax_pmdec_corr", FloatType()),
    ("pmra_pmdec_corr", FloatType()),
    ("astrometric_n_obs_al", IntegerType()),
    ("astrometric_n_obs_ac", IntegerType()),
    ("astrometric_n_good_obs_al", IntegerType()),
    ("astrometric_n_bad_obs_al", IntegerType()),
    ("astrometric_gof_al", FloatType()),
    ("astrometric_chi2_al", FloatType()),
    ("astrometric_excess_noise", FloatType()),
    ("astrometric_excess_noise_sig", FloatType()),
    ("astrometric_params_solved", IntegerType()),
    ("astrometric_primary_flag", StringType()),
    ("astrometric_weight_al", FloatType()),
    ("astrometric_pseudo_colour", FloatType()),
    ("astrometric_pseudo_colour_error", FloatType()),
    ("mean_varpi_factor_al", FloatType()),
    ("astrometric_matched_observations", IntegerType()),
    ("visibility_periods_used", IntegerType()),
    ("astrometric_sigma5d_max", FloatType()),
    ("frame_rotator_object_type", IntegerType()),
    ("matched_observations", IntegerType()),
    ("duplicated_source", StringType()),
    ("phot_g_n_obs", IntegerType()),
    ("phot_g_mean_flux", DoubleType()),
    ("phot_g_mean_flux_error", FloatType()),
    ("phot_g_mean_flux_over_error", FloatType()),
    ("phot_g_mean_mag", FloatType()),
    ("phot_bp_n_obs", IntegerType()),
    ("phot_bp_mean_flux", FloatType()),
    ("phot_bp_mean_flux_error", FloatType()),
    ("phot_bp_mean_flux_over_error", FloatType()),
    ("phot_bp_mean_mag", FloatType()),
    ("phot_rp_n_obs", IntegerType()),
    ("phot_rp_mean_flux", FloatType()),
    ("phot_rp_mean_flux_error", FloatType()),
    ("phot_rp_mean_flux_over_error", FloatType()),
    ("phot_rp_mean_mag", FloatType()),
    ("phot_bp_rp_excess_factor", FloatType()),
    ("phot_proc_mode", IntegerType()),
    ("bp_rp", FloatType()),
    ("bp_g", FloatType()),
    ("g_rp", FloatType()),
    ("radial_velocity", FloatType()),
    ("radial_velocity_error", FloatType()),
    ("rv_nb_transits", IntegerType()),
    ("rv_template_teff", FloatType()),
    ("rv_template_logg", FloatType()),
    ("rv_template_fe_h", FloatType()),
    ("phot_variable_flag", StringType()),
    ("l", DoubleType()),
    ("b", DoubleType()),
    ("ecl_lon", DoubleType()),
    ("ecl_lat", DoubleType()),
    ("priam_flags", IntegerType()),
    ("teff_val", FloatType()),
    ("teff_percentile_lower", FloatType()),
    ("teff_percentile_upper", FloatType()),
    ("a_g_val", FloatType()),
    ("a_g_percentile_lower", FloatType()),
    ("a_g_percentile_upper", FloatType()),
    ("e_bp_min_rp_val", FloatType()),
    ("e_bp_min_rp_percentile_lower", FloatType()),
    ("e_bp_min_rp_percentile_upper", FloatType()),
    ("flame_flags", IntegerType()),
    ("radius_val", FloatType()),
    ("radius_percentile_lower", FloatType()),
    ("radius_percentile_upper", FloatType()),
    ("lum_val", FloatType()),
    ("lum_percentile_lower", FloatType()),
    ("lum_percentile_upper", FloatType()),
]

# Parallel views of the table above, kept under their original names.
col_names = [column_name for column_name, _ in gaia_columns]
col_types = [column_type for _, column_type in gaia_columns]

# Every column is declared nullable.
fields = [StructField(column_name, column_type, True)
          for column_name, column_type in gaia_columns]
schema = StructType(fields)

In [16]:

# Load a single Gaia source file with the explicit schema (quick check on one file).
single_filename = "/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_1763645856731413248_1763758698407549184.csv.gz"
# The CSV reader option is spelled `inferSchema`; the previous `infer_schema`
# key is not a recognised option and was silently ignored.
df = spark.read.load(single_filename, format="csv", schema=schema, inferSchema=False, header="true")

In [10]:
%%time

# Read every Gaia source CSV with the explicit schema.
gaia_wildcard = "/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_*.csv.gz"
# The CSV reader option is spelled `inferSchema`; the previous `infer_schema`
# key is not a recognised option and was silently ignored.
df_in = spark.read.load(gaia_wildcard, format="csv", schema=schema, inferSchema=False, header="true")

gaia = (
    df_in
    # hpix12 = source_id // 34359738368 (2**35). Column `/` is floating-point
    # division, which produced a fractional double and went through a lossy
    # long -> double conversion; a 35-bit right shift keeps the value an
    # exact integer.
    .withColumn("hpix12", expr("shiftright(source_id, 35)"))
    # Declination zone index: 60 zones per degree, counted from dec = -90.
    .withColumn("zone", ((df_in['dec'] + 90.0) * 60).cast(IntegerType()))
)


CPU times: user 23.9 ms, sys: 7.76 ms, total: 31.6 ms
Wall time: 58.8 s


In [16]:
%%time 

# Bucket count for the output table; the shuffle and the explicit repartition
# use the same number so the data is partitioned on "zone" consistently.
NUM_BUCKETS = 500
# Zone geometry in degrees. ZONE_HEIGHT_DEG must stay consistent with the
# zone definition used when the "zone" column is computed: int((dec + 90) * 60).
ZONE_HEIGHT_DEG = 1 / 60.0
BORDER_HEIGHT_DEG = 5 / 3600.0

spark.conf.set("spark.sql.shuffle.partitions", str(NUM_BUCKETS))

# Rows lying within BORDER_HEIGHT_DEG above the lower edge of their zone.
# Zone 0 is excluded because it has no zone below it.
in_lower_border = (
    (gaia['zone'] > 0)
    & (((gaia['dec'] + 90) % ZONE_HEIGHT_DEG) < BORDER_HEIGHT_DEG)
)

# Copy the border rows into the zone below (dup = 1) so that zone also sees
# them. Previously the copies kept their original zone, so they landed in the
# same bucket as their originals and provided no overlap for the neighbour.
# NOTE(review): this changes the contents of the written table; confirm that
# downstream readers expect the dup = 1 rows in zone - 1.
# Replacing "zone" in place keeps the column order, which the positional
# union below relies on.
gaia_duplicate_sources = (
    gaia.where(in_lower_border)
        .withColumn("zone", gaia['zone'] - 1)
        .withColumn("dup", lit(1))
)
gaia_duplicated = gaia_duplicate_sources.union(gaia.withColumn("dup", lit(0)))

# Write a bucketed, sorted parquet table and register it in the metastore.
(
    gaia_duplicated
    .repartition(NUM_BUCKETS, "zone")
    .write
    .bucketBy(NUM_BUCKETS, "zone")
    .sortBy("zone", "ra")
    .format("parquet")
    .option("path", "file:///epyc/data/gaia_dr2_1am/")
    .saveAsTable("gaia_dr2_1am")
)

#gaia.write.sortBy("ra").format("parquet").save(
#    os.path.join("/epyc/data/gaia_dr2", "gaia_dr2_test.parquet"), partitionBy="zone")

CPU times: user 3.65 s, sys: 1.08 s, total: 4.73 s
Wall time: 6h 10min 29s


In [None]:
# List the tables registered in the session catalog's current database.
spark.catalog.listTables()