In [None]:
import sys, os
from pyspark.sql.functions import col,lit, current_timestamp,concat,desc
from centreon_apirest_transformations import get_token
import requests
import json 
import datetime

In [None]:
# Databricks job parameters: each value is read from the notebook widget of
# the same name, in the order listed below.
(
    systemSource,
    tableSource,
    isIncremental,
    isSensitive,
    isDev,
    trigger,
    schemaName,
) = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "systemSource",
        "tableSource",
        "isIncremental",
        "isSensitive",
        "isDev",
        "trigger",
        "schemaName",
    )
)

In [None]:

# --- 1. Validate and normalise the job parameters ---------------------------
# `trigger` is part of the None check too: str(None).lower() is the non-empty
# string "none", which would slip through the emptiness assert further down.
assert None not in [systemSource, tableSource, isDev, isSensitive, isIncremental, trigger, schemaName], "None is not a valid input"

systemSource = str(systemSource).strip().lower()
tableSource = str(tableSource).strip()
isIncremental = str(isIncremental).strip().capitalize()
isSensitive = str(isSensitive).strip().capitalize()
isDev = str(isDev).strip().capitalize()
trigger = str(trigger).strip().lower()
schemaName = str(schemaName).strip().lower()

assert bool(systemSource), "systemSource: '' is not a valid input"
assert bool(tableSource), "tableSource:  '' is not a valid input"
assert bool(trigger), "trigger:  '' is not a valid input"
assert bool(schemaName), "schemaName:  '' is not a valid input"

# Accepted spellings of a boolean flag (after .capitalize()) and the subset
# that means "true". Membership tests replace eval() on user-supplied text.
VALID_BOOL_FLAGS = ("True", "False", "0", "1")
TRUE_BOOL_FLAGS = ("True", "1")

assert isDev in VALID_BOOL_FLAGS, f"isDev: {isDev} is not a valid input, expected True or False"
assert isIncremental in VALID_BOOL_FLAGS, f"isIncremental: {isIncremental} is not a valid input, expected True or False"
assert isSensitive in VALID_BOOL_FLAGS, f"isSensitive: {isSensitive} is not a valid input, expected True or False"

isDev = isDev in TRUE_BOOL_FLAGS
isIncremental = isIncremental in TRUE_BOOL_FLAGS
isSensitive = isSensitive in TRUE_BOOL_FLAGS

# --- 2. Environment-dependent object names ----------------------------------
if isDev:
    environment = "dev"
    control_sources = "landing.dev.sources"
    env_path = "/dev"
    control_incremental = "landing.dev.incremental"
    schema_adb = 'dev'
    token_table = "landing.dev.token_table"
else:
    environment = "prod"
    control_sources = "landing.control.sources"
    env_path = ""
    control_incremental = "landing.control.incremental"
    schema_adb = 'staging'
    token_table = "landing.control.token_table"

# --- 3. Look up the source definition in the control table ------------------
# The filter is expressed with Column comparisons rather than an interpolated
# SQL string, so parameter values are always bound as literals and cannot
# alter the query (quotes, comments, extra predicates).
par_sources = (
    spark.table(control_sources)
    .where(
        (col("systemSource") == systemSource)
        & (col("tableSource") == tableSource)
        & (col("isIncremental") == isIncremental)
        & (col("isSensitive") == isSensitive)
        & (col("isDev") == isDev)
        & (col("trigger") == trigger)
        & (col("schemaName") == schemaName)
    )
    .select("id", "id_col", "incr_col", "incr_col_type", "incr_size")
    .limit(1)
)

# Collect once and reuse the result: count() followed by collect() would run
# the same query twice.
matching_sources = par_sources.collect()
assert len(matching_sources) == 1, f"No Table Found in {control_sources} Based on the Chosen Criteria"
source_id, id_col, incr_col, incr_col_type, incr_size = matching_sources[0]

incremental = 'incremental' if isIncremental else 'snapshot'
sensitive = 'sensitive' if isSensitive else 'general'

# NOTE(review): eval() executes whatever text is stored in the control table's
# id_col column. It is kept because the stored format is not visible from this
# notebook; if the column only ever holds a Python list literal,
# ast.literal_eval would be the safe replacement -- confirm and switch.
merge_cols = eval(id_col)
id_col = merge_cols[0]

# --- 4. Raw-zone destination and API endpoint -------------------------------
# The path is partitioned by the year/month/day/hour at which this cell runs
# (cluster-local time, components not zero-padded).
now = datetime.datetime.now()
year_created, month_created, day_created, hour_created = (now.year, now.month, now.day, now.hour)
path_raw = f"abfss://raw@storageaccount.dfs.core.windows.net{env_path}/{sensitive}/{systemSource}/{incremental}/{schemaName}/{tableSource}/{year_created}/{month_created}/{day_created}/{hour_created}/"
endpoint = tableSource
api_url = "***********************"

In [None]:
# Make sure the token table exists before it is read.
# CREATE TABLE IF NOT EXISTS (instead of CREATE OR REPLACE) guarantees that an
# existing table -- e.g. one created by a concurrent run between the existence
# check and the DDL -- is never replaced and its stored tokens never lost.
if not spark.catalog.tableExists(token_table):
    spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {token_table}
            (
            id BIGINT GENERATED ALWAYS AS IDENTITY,
            token STRING,
            date_generation TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
            )
            CLUSTER BY (date_generation)
            TBLPROPERTIES(
            'delta.feature.allowColumnDefaults' = 'supported',
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true',
            'delta.deletedFileRetentionDuration' = 'interval 60 days',
            'delta.timeUntilArchived' = '730 days',
            'delta.enableDeletionVectors' = 'true'
            )
              """)
    # The retention key must carry the `delta.` prefix to be treated as a
    # Delta table property; other spellings are stored as inert user metadata.

In [None]:
# Load the most recently generated token from the token table.
# Falls back to the placeholder 'init' when no usable token is available
# (table unreadable, table empty, or stored token null/empty).
try:
    latest_token_row = (spark.table(token_table)
                .orderBy(desc("date_generation"))
                .select("token")).first()
except Exception as token_error:
    # Best effort: report why the lookup failed instead of hiding it, then
    # continue with the placeholder. `except Exception` (rather than a bare
    # except) lets KeyboardInterrupt / SystemExit propagate.
    print(f"Could not read a token from {token_table}: {token_error}")
    latest_token_row = None

# first() returns None for an empty table.
if latest_token_row is not None and latest_token_row[0]:
    token = latest_token_row[0]
else:
    token = 'init'

In [None]:
# Download every page of the endpoint into `request_data`.
url = api_url + endpoint
headers = {
    'Accept': 'application/json',
    'X-AUTH-TOKEN': token
}

params = {
    'show_service': 'true',
    'limit': 100000,
    'page': 1,
    'search': '{"host.name":{"$lk":"%CLIENTE%"}}'
}

# Consecutive 401 responses tolerated before giving up; without a bound, a
# token that can never be refreshed would keep this loop spinning forever.
MAX_AUTH_RETRIES = 3
# (connect, read) timeouts in seconds. requests waits indefinitely when no
# timeout is given, which would hang the job on an unresponsive server.
REQUEST_TIMEOUT_SECONDS = (30, 600)

request_data = []
auth_retries = 0

while True:
    response = requests.get(url, params=params, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)

    if response.status_code == 401:
        if auth_retries >= MAX_AUTH_RETRIES:
            # Still unauthorized after refreshing: surface it as an HTTPError.
            response.raise_for_status()
        auth_retries += 1

        # presumably get_token stores a fresh token in token_table -- the
        # newest row is re-read right after the call; verify against the helper.
        get_token(environment,token_table)

        token = (spark.table(token_table)
            .orderBy(desc("date_generation"))
            .select("token")).first()[0]

        headers['X-AUTH-TOKEN'] = token
        continue  # retry the same page with the new token

    # Any other 4xx/5xx aborts the run.
    response.raise_for_status()
    auth_retries = 0

    if response.status_code != 200:
        # A non-error status other than 200 carries no usable page. Fail
        # loudly rather than stop paging and let a truncated result be
        # written downstream as if it were complete.
        raise RuntimeError(f"Request failed: {response.status_code} {response.text}")

    data = response.json()['result']
    if not data:
        break  # an empty page marks the end of the result set
    request_data.extend(data)
    params['page'] += 1

In [None]:
# Turn the downloaded records into a DataFrame: each record is serialised to
# a JSON string, the strings are distributed as an RDD (despite its name,
# `json_df` is that RDD), and Spark infers the schema while reading them.
json_df = sc.parallelize(request_data).map(json.dumps)

# Exact duplicate rows are removed in the same step.
df = spark.read.json(json_df).dropDuplicates()

In [None]:
# Persist the result as JSON files under the raw path, replacing whatever is
# already stored there.
# NOTE(review): "overwriteSchema" looks like a Delta Lake writer option and is
# probably ignored by the JSON writer -- confirm before removing.
(
    df.write
    .format("json")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(path_raw)
)