In [1]:
from fink_filters.classification import extract_fink_classification
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F

import time as t

from dateutil import rrule
from datetime import datetime, timedelta, date
import numpy as np


from pyspark.sql.functions import col, create_map, lit
from itertools import chain

from fink_fat.orbit_fitting.orbfit_cluster import orbit_wrapper


import exploring_script as es



In [2]:
def get_ast_traj(df):
    class_df = df.withColumn(
        "class",
        extract_fink_classification(
            df["cdsxmatch"],
            df["roid"], 
            df["mulens"],
            df["snn_snia_vs_nonia"], 
            df["snn_sn_vs_all"], 
            df["rf_snia_vs_nonia"],
            df["candidate.ndethist"], 
            df["candidate.drb"], 
            df["candidate.classtar"], 
            df["candidate.jd"], 
            df["candidate.jdstarthist"], 
            df["rf_kn_vs_nonkn"], 
            df["tracklet"]
        )
    )
    
    sso_class = class_df.filter(class_df["class"] == "Solar System MPC")
    w = Window.partitionBy('candidate.ssnamenr')
    sso_class = sso_class.select(
        sso_class["objectId"],
        sso_class["candidate.candid"],
        sso_class["candidate.ra"],
        sso_class["candidate.dec"],
        sso_class["candidate.jd"],
        sso_class["candidate.nid"],
        sso_class["candidate.fid"],
        sso_class["candidate.ssnamenr"],
        sso_class["candidate.ssdistnr"],
        sso_class["candidate.magpsf"],
        sso_class["candidate.sigmapsf"],
        sso_class["candidate.magnr"],
        sso_class["candidate.sigmagnr"],
        sso_class["candidate.magzpsci"],
        sso_class["candidate.isdiffpos"],
        F.count('candidate.ssnamenr').over(w).alias('nb_detection')
    )
    return sso_class

In [3]:
df = spark.read.format("parquet").load("/user/julien.peloton/archive/science/year=2022/month=11")

In [4]:
confirmed_sso = get_ast_traj(df).toPandas()

In [5]:
def is_in_tw(x, tw_traj):
    jd = x["jd"]
    
    diff_jd = np.diff(jd)
    
    return np.all(diff_jd < tw_traj)

def get_traj_limit(df, tw, limit):
    list_sso_gb = df.sort_values("jd").groupby("ssnamenr").agg(list)
    
    list_limit_sso = list_sso_gb[list_sso_gb.apply(lambda x: is_in_tw(x, tw), axis=1)]
    
    for df_col in list_limit_sso.columns:
        list_limit_sso[df_col] = list_limit_sso[df_col].apply(lambda x: x[:limit])
    
    list_limit_sso = list_limit_sso.reset_index()
    all_ssnamenr = {
        sso_name:id_traj 
        for sso_name, id_traj in zip(list_limit_sso["ssnamenr"], np.arange(len(list_limit_sso)))
    }
    list_limit_sso["trajectory_id"] = list_limit_sso["ssnamenr"].map(all_ssnamenr)
    
    return list_limit_sso.set_index(["ssnamenr", "trajectory_id"]).apply(pd.Series.explode).reset_index()

In [6]:
TW = 15

nb_traj_limit = 10000

In [7]:
def compute_orbit(nb_point):
    confirmed_point_limit = confirmed_sso[confirmed_sso["nb_detection"] >= nb_point]
    prep_to_orbit = get_traj_limit(confirmed_sso, TW, nb_point)
    
    prep_to_orbit.to_parquet("res_orbit_nb_point/{}_point_traj.parquet".format(nb_point))
    
    # transform the local pandas dataframe into a spark dataframe
    sparkDF = spark.createDataFrame(prep_to_orbit)

    spark_gb = (
        sparkDF.groupby("trajectory_id")
        .agg(
            F.sort_array(
                F.collect_list(F.struct("jd", "ra", "dec", "fid", "magpsf"))
            ).alias("collected_list")
        )
        .withColumn("ra", F.col("collected_list.ra"))
        .withColumn("dec", F.col("collected_list.dec"))
        .withColumn("fid", F.col("collected_list.fid"))
        .withColumn("magpsf", F.col("collected_list.magpsf"))
        .withColumn("jd", F.col("collected_list.jd"))
        .drop("collected_list")
    )
    
    spark_gb = spark_gb.limit(nb_traj_limit)

    max_core = int(dict(spark.sparkContext.getConf().getAll())["spark.cores.max"])
    spark_gb = spark_gb.repartition(1 if nb_traj_limit // max_core == 0 else nb_traj_limit // max_core)

    spark_column = spark_gb.withColumn(
        "orbital_elements",
        orbit_wrapper(
            spark_gb.ra,
            spark_gb.dec,
            spark_gb.magpsf,
            spark_gb.fid,
            spark_gb.jd,
            spark_gb.trajectory_id,
            "/tmp/ramdisk/roman",
            30,
            20,
            None,
            verbose=3,
        ),
    )

    res_orbit = spark_column.toPandas()

    orbital_columns = [
        "ref_epoch",
        "a",
        "e",
        "i",
        "long. node",
        "arg. peric",
        "mean anomaly",
        "rms_a",
        "rms_e",
        "rms_i",
        "rms_long. node",
        "rms_arg. peric",
        "rms_mean anomaly",
        "chi_reduced",
    ]

    split_df = pd.DataFrame(
        res_orbit["orbital_elements"].tolist(), columns=orbital_columns
    )
    orbit_results = pd.concat([res_orbit["trajectory_id"], split_df], axis=1)
    orbit_results.to_parquet("res_orbit_nb_point/{}_point_orbit.parquet".format(nb_point))

In [8]:
for i in np.arange(3, 11):
    t_before = t.time()
    compute_orbit(i)
    print("{} point / elapsed time: {}".format(i, t.time() - t_before))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


3 point / elapsed time: 389.13769006729126
4 point / elapsed time: 391.4380407333374
5 point / elapsed time: 399.9971191883087
6 point / elapsed time: 407.40319657325745
7 point / elapsed time: 412.00954723358154
8 point / elapsed time: 411.9934003353119
9 point / elapsed time: 414.9642653465271
10 point / elapsed time: 416.444669008255
