In [0]:
%run ../../../utils/commom_libs

In [0]:
%run ../../../utils/yaml_manager

In [0]:
%run ../../../connector/databricks_api

In [0]:
class WorkflowScheduleControl:
    """Trigger Databricks jobs whose YAML-configured schedule is currently due.

    The class reads the schedule YAML files that match a country, turns them
    into a Spark DataFrame with one row per (pipeline, trigger time), starts
    the enabled pipelines flagged as due through the Databricks API
    connector, and appends one audit row per launch attempt to a Delta table.

    ``LoggerScan``, ``YamlManager``, ``DatabricksApi``, ``spark``, ``T``,
    ``F``, ``dt`` and ``os`` are expected to be in scope already (presumably
    provided by the notebooks pulled in with ``%run`` — confirm).
    """

    def __init__(self, country: str, environment: str = "uat"):
        """Set up the helpers used to read schedules and start jobs.

        Args:
            country: Country code used to select YAML files; lower-cased
                before use.
            environment: Value sent as the ``environment`` job parameter.
                Defaults to ``"uat"``, the value that used to be hard-coded.
        """
        self.country = country.lower()
        self.environment = environment
        self.logger_scan = LoggerScan()
        self.yaml_manager = YamlManager(project_name="EJIT-01")
        self.adb_conn = DatabricksApi(scope="DataEngTeam", base_url="adb-url", secret_key="adb-token-api")
        # Response of the most recent start_adb_job() call ({} if it failed).
        self.parameter_sent = {}
        # Tolerance (minutes) around "now"; refreshed by get_schedule_pipeline.
        self.margin_time = 0

    @property
    def generate_schedule_schema(self):
        """Return the Spark schema of the rows built by ``get_yaml_files``."""
        return T.StructType([
            T.StructField('country', T.StringType(), True),
            T.StructField('partner_id', T.StringType(), True),
            T.StructField('partner_name', T.StringType(), True),
            T.StructField('pipeline', T.StringType(), True),
            T.StructField('is_enable_pipeline', T.BooleanType(), True),
            T.StructField('pipeline_schedule', T.ArrayType(T.StringType(), True), True),
            T.StructField('job_id', T.StringType(), True),
            T.StructField('margin_time', T.LongType(), True),
            T.StructField('target_date', T.DateType(), True),
            T.StructField('load_ts', T.TimestampType(), True)
        ])

    @property
    def generate_schema_log_table(self):
        """Return the Spark schema of the audit rows written to the log table."""
        return T.StructType([
            T.StructField('country', T.StringType(), True),
            T.StructField('partner_id', T.StringType(), True),
            T.StructField('partner_name', T.StringType(), True),
            T.StructField('pipeline_name', T.StringType(), True),
            T.StructField('log_date', T.DateType(), True),
            T.StructField('idempotency_token', T.StringType(), True),
            T.StructField('job_id', T.StringType(), True),
            T.StructField('run_id', T.LongType(), True),
            T.StructField('schedule_yaml_time', T.StringType(), True),
            T.StructField('execution_at', T.StringType(), True),
            T.StructField('job_parameters', T.MapType(T.StringType(), T.StringType(), True), True)
        ])

    @property
    def get_yaml_files(self) -> list[dict]:
        """Build one schedule record per pipeline found in the country's YAML files.

        Every path returned by ``yaml_manager.get_all_files`` whose file name
        (without the ``.yaml`` suffix) contains ``self.country`` is loaded
        through ``yaml_manager.load_yaml_file``.

        Returns:
            A list of dicts whose keys match ``generate_schedule_schema``.

        Raises:
            KeyError: If a loaded YAML document lacks one of the expected
                keys (``name``, ``schedule``, ``margin_time``, ``pipelines``,
                ``enable``, ``job_id``, ``trigger_at``).
        """
        matching_files = []
        for yaml_file_path in self.yaml_manager.get_all_files:
            # Use the last path component instead of a fixed position in the
            # path, so the lookup does not depend on how deep the repo lives.
            file_name = os.path.basename(yaml_file_path).removesuffix(".yaml")
            # NOTE(review): substring match, so a country code that happens to
            # appear elsewhere in a file name also matches — confirm intended.
            if self.country in file_name:
                matching_files.append(file_name)

        job_to_run_list = []
        for file_name in matching_files:
            data = self.yaml_manager.load_yaml_file(file_name)
            file_country = file_name.split("_")[0]
            for partner_id, partner in data.items():
                schedule = partner["schedule"]
                for pipeline_name, pipeline in schedule["pipelines"].items():
                    job_to_run_list.append({
                        "country": file_country,
                        "partner_name": partner["name"],
                        "partner_id": partner_id,
                        "pipeline": pipeline_name,
                        "margin_time": schedule["margin_time"],
                        "is_enable_pipeline": pipeline["enable"],
                        "job_id": pipeline["job_id"],
                        "pipeline_schedule": pipeline["trigger_at"],
                        "load_ts": dt.datetime.now(),
                        "target_date": dt.date.today(),
                    })
        return job_to_run_list

    @property
    def get_schedule_pipeline(self) -> "DataFrame":
        """Return the schedule DataFrame with the execution-window columns added.

        ``pipeline_schedule`` is exploded so each trigger time gets its own
        row. ``self.margin_time`` is set from the first row as a side effect
        (0 when there is no row, the value is null, or it cannot be read).

        Returns:
            The exploded schedule plus ``past_utc_date``, ``current_utc_date``,
            ``future_utc_date`` and the boolean ``is_prepared_to_execute``.
        """
        config_df = spark.createDataFrame(
            self.get_yaml_files, self.generate_schedule_schema
        ).withColumn("pipeline_schedule", F.explode(F.col("pipeline_schedule")))

        # Only the first row's margin is used for every pipeline.
        try:
            first_row = config_df.select("margin_time").first()
        except Exception as e:
            # Best effort, as before, but no longer silent.
            self.logger_scan.warning(f"FAILED to read margin_time, using 0 - {e}.")
            first_row = None

        if first_row is None or first_row["margin_time"] is None:
            self.margin_time = 0
        else:
            self.margin_time = first_row["margin_time"]

        # Take "now" once so the three window columns share the same instant.
        now_utc = dt.datetime.utcnow()
        margin = dt.timedelta(minutes=self.margin_time)

        # NOTE(review): is_prepared_to_execute compares an "HH:mm" string with
        # timestamp literals and therefore relies on Spark's implicit casts —
        # confirm it yields the intended window check.
        return config_df \
            .withColumn("past_utc_date", F.lit(now_utc - margin)) \
            .withColumn("current_utc_date", F.lit(now_utc)) \
            .withColumn("future_utc_date", F.lit(now_utc + margin)) \
            .withColumn("is_prepared_to_execute", F.date_format(F.col("pipeline_schedule"), "HH:mm").between(F.col("past_utc_date"), F.col("future_utc_date")))

    def start_adb_job(self, parameter_payload: dict):
        """Start one job through the Databricks API connector.

        The API response is stored in ``self.parameter_sent``. Failures are
        logged as warnings and not re-raised, so one failing job does not
        stop the remaining launches.

        Args:
            parameter_payload: Payload passed unchanged to
                ``self.adb_conn.run_job``.
        """
        # Drop the previous response first: otherwise a failed launch would
        # leave the earlier job's run_id behind and it would be logged as if
        # it belonged to this job.
        self.parameter_sent = {}
        try:
            self.logger_scan.info("START Databricks API JOB .....")
            response_api = self.adb_conn.run_job(parameter_payload)
            self.parameter_sent.update(response_api)
            get_run_id = response_api["run_id"]
            self.logger_scan.info(f"FINISHED Databricks API JOB with run_id {get_run_id}....")
        except Exception as e:
            self.logger_scan.warning(f"FAILED Databricks API JOB - {e}.")

    def trigger_job_schedule(self):
        """Start every enabled pipeline that is due and log each attempt.

        Rows of ``get_schedule_pipeline`` with ``is_enable_pipeline`` and
        ``is_prepared_to_execute`` both true are launched one by one with
        ``start_adb_job``; the collected audit rows are then appended to the
        log table (the append is attempted even when nothing was due).
        """
        log_parameter_list = []
        self.logger_scan.info("GENERATE SCHEMA - on Yaml Files for a Temporary DF")

        schedule_info_df = self.get_schedule_pipeline \
            .filter(F.col("is_enable_pipeline") == True) \
            .filter(F.col("is_prepared_to_execute") == True)

        # Collect once: the emptiness check, the count and the iteration all
        # use the same rows instead of running three separate Spark actions.
        due_rows = schedule_info_df.collect()

        if not due_rows:
            self.logger_scan.info("NO SCHEDULE - for RUN based on schedule from yaml check if it is enabled")
        else:
            self.logger_scan.info(f"WE HAVE {len(due_rows)} schedules prepared to Run in a interval of the margin_time: {self.margin_time} minutes")

            self.logger_scan.info("GET PARAMETERS -  based on schedule from yaml")
            for row in due_rows:
                parameters_data_dict = {
                    "job_id": row['job_id'],
                    "job_parameters": {
                        "country": row['country'],
                        "partner_name": row['partner_name'],
                        "pipeline_name": row['pipeline'],
                        "environment": self.environment
                    }
                }
                self.logger_scan.info(f"START PROCESS - for RUN job_id: {row['job_id']} based on schedule from yaml")
                self.start_adb_job(parameters_data_dict)

                schedule_parameters = {
                    "country": row['country'],
                    "partner_id": row['partner_id'],
                    "partner_name": row['partner_name'],
                    "pipeline_name": row['pipeline'],
                    "log_date": dt.date.today(),
                    "execution_at": dt.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                    "schedule_yaml_time": row['pipeline_schedule']
                }
                # parameter_sent holds only this launch's response, so a
                # failed launch is logged without a run_id.
                combined_parameters = {**schedule_parameters, **parameters_data_dict, **self.parameter_sent}
                log_parameter_list.append(combined_parameters)
                self.logger_scan.info(f"""FINISHED PROCESS - pipeline: {row['pipeline']}, job_id: {row['job_id']}, Schedule_Time: {row['pipeline_schedule']}, country: {row['country']} and partner: {row['partner_name']}
                """)

        self._save_execution_log(log_parameter_list)

    def _save_execution_log(self, log_parameter_list: list) -> None:
        """Append the audit rows to the Delta log table; failures are only logged."""
        try:
            self.logger_scan.info("START SAVE LOG ON DELTA TABLE")
            # NOTE(review): the table written here is named
            # "eng_team_log__workflow_job" while the log messages mention
            # "eng_team.log__workflow_job" — confirm which one is intended.
            spark.createDataFrame(log_parameter_list, self.generate_schema_log_table)\
                .write\
                .format("delta") \
                .mode("append") \
                .option("mergeSchema", "true") \
                .option("overwriteSchema", "true") \
                .saveAsTable("eng_team_log__workflow_job")  # add table from UnityCatalog
            self.logger_scan.info("FINISHED SAVE LOG ON eng_team.log__workflow_job successfully.")
        except Exception as e:
            self.logger_scan.warning(f"FAILED SAVE LOG eng_team.log__workflow_job - {e}.")