# Apitube Ingestion

### Documentazione del Servizio Apitube
- news rest api service:  https://apitube.io/
- documentation:          https://docs.apitube.io/guides/user-guide/what-is-apitube
- response structure:     https://docs.apitube.io/platform/news-api/response-structure
- dashboard api key:      https://dashboard.apitube.io/
- cookbook:               https://apitube.io/cookbook

### Setup

##### WS Client

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ExternalFunctionRequestHttpMethod

# Databricks workspace client, created with no arguments so host and
# credentials come from the SDK's default authentication (presumably the
# notebook's own context when run inside Databricks — confirm).
# getdata_daily / getdata_weekly use it to call the "api-apitube-http" connection.
w = WorkspaceClient()

##### Expected Schema

In [0]:

# Detailed Spark DDL schema of the APITube response body.
# enrichData() passes it to from_json() to parse the `raw_response` string,
# then explodes `results` into one row per article.
# `topics` is a field of each `results` element (a sibling of `categories`
# and `industries`) despite its shallower indentation below.
# NOTE(review): keep in sync with the documented response structure
# (https://docs.apitube.io/platform/news-api/response-structure).
response_schema = """
STRUCT<
  status: STRING,
  page: LONG,
  per_page: LONG,
  path: STRING,
  has_next_pages: BOOLEAN,
  next_page: STRING,
  has_previous_page: BOOLEAN,
  previous_page: STRING,
  export: STRUCT<
    json: STRING,
    xlsx: STRING,
    csv: STRING,
    tsv: STRING,
    xml: STRING
  >,
  request_id: STRING,
  results: ARRAY<STRUCT<
    id: LONG,
    href: STRING,
    published_at: STRING,
    title: STRING,
    description: STRING,
    body: STRING,
    language: STRING,

    author: STRUCT<
      id: LONG,
      name: STRING
    >,

    image: STRING,

    categories: ARRAY<STRUCT<
      id: STRING,
      name: STRING,
      score: DOUBLE,
      taxonomy: STRING,
      links: STRUCT<
        self: STRING
      >
    >>,

  topics: ARRAY<STRUCT<
    id: STRING,
    name: STRING,
    score: DOUBLE,
    taxonomy: STRING,
    links: STRUCT<
      self: STRING
    >
  >>,

    industries: ARRAY<STRUCT<
      id: LONG,
      name: STRING,
      links: STRUCT<
        self: STRING
      >
    >>,

    entities: ARRAY<STRUCT<
      id: LONG,
      name: STRING,
      links: STRUCT<
        self: STRING,
        wikipedia: STRING,
        wikidata: STRING
      >,
      types: ARRAY<STRING>,
      language: STRING,
      frequency: LONG,
      title: STRUCT<
        pos: ARRAY<STRUCT<
          start: LONG,
          end: LONG
        >>
      >,
      body: STRUCT<
        pos: ARRAY<STRUCT<
          start: LONG,
          end: LONG
        >>
      >
    >>,

    source: STRUCT<
      id: LONG,
      domain: STRING,
      home_page_url: STRING,
      type: STRING,
      bias: STRING,
      rankings: STRUCT<
        opr: DOUBLE
      >,
      location: STRUCT<
        country_name: STRING,
        country_code: STRING
      >,
      favicon: STRING
    >,

    sentiment: STRUCT<
      overall: STRUCT<
        score: DOUBLE,
        polarity: STRING
      >,
      title: STRUCT<
        score: DOUBLE,
        polarity: STRING
      >,
      body: STRUCT<
        score: DOUBLE,
        polarity: STRING
      >
    >,

    summary: ARRAY<STRUCT<
      sentence: STRING,
      sentiment: STRUCT<
        score: DOUBLE,
        polarity: STRING
      >
    >>,

    keywords: ARRAY<STRING>,
    links: ARRAY<STRING>,

    media: ARRAY<STRUCT<
      url: STRING,
      type: STRING,
      format: STRING
    >>,

    story: STRUCT<
      id: LONG,
      uri: STRING
    >,

    is_duplicate: BOOLEAN,
    is_paywall: BOOLEAN,
    is_breaking: BOOLEAN,

    read_time: LONG,
    sentences_count: LONG,
    paragraphs_count: LONG,
    words_count: LONG,
    characters_count: LONG
  >>
>
"""

##### Kafka Setup

In [0]:
def kafka_setup(
    *,
    eh_namespace="aeh-ag83-cm-eventhub-001",
    eh_name="eventhub-001",
    access_key_name="aar-ag83-cm-eventhub-001",
    secret_scope="scp-ws-pipelines",
    secret_name="scr-eventhub-key",
):
    """Build the Spark Kafka options used to publish to Azure Event Hubs.

    The Event Hubs namespace is reached through its Kafka endpoint on port
    9093 over SASL_SSL/PLAIN. The SASL user name is the literal
    ``$ConnectionString`` and the password is the namespace connection string,
    whose shared-access key is read from a Databricks secret.

    All arguments default to the values previously hard-coded here, so
    ``kafka_setup()`` behaves as before. They are parameters so the target can
    later be supplied from job/cluster configuration.

    Args:
        eh_namespace: Event Hubs namespace (host prefix of ``*.servicebus.windows.net``).
        eh_name: Event Hub name, used as the Kafka ``topic``.
        access_key_name: name of the shared-access policy.
        secret_scope: Databricks secret scope holding the policy key.
        secret_name: key of the secret inside ``secret_scope``.

    Returns:
        dict of options suitable for ``df.write.format("kafka").options(**opts)``.
        The dict embeds the secret key: do not print or display it.
    """
    # Shared-access key, kept out of the notebook in a Databricks secret.
    access_key_value = dbutils.secrets.get(scope=secret_scope, key=secret_name)

    connection_string = (
        f"Endpoint=sb://{eh_namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={access_key_name};"
        f"SharedAccessKey={access_key_value}"
    )

    # Options for the Kafka sink (send_kafka writes with them). The
    # "kafka."-prefixed entries are forwarded to the Kafka client.
    return {
        "kafka.bootstrap.servers": f"{eh_namespace}.servicebus.windows.net:9093",
        "topic": eh_name,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": (
            "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="$ConnectionString" password="{connection_string}";'
        ),
        "kafka.request.timeout.ms": 60000,
        # NOTE(review): the entries below are consumer/read-side settings;
        # presumably unused by a batch write, kept so the same dict can also
        # configure a reader — confirm before removing.
        "kafka.session.timeout.ms": 30000,
        "maxOffsetsPerTrigger": 50000,
        "failOnDataLoss": "false",
        "startingOffsets": "latest",
    }

### Apitube getData

##### Get daily data

In [0]:
from pyspark.sql.functions import from_json, explode, col, current_timestamp
from pyspark.sql import Row

def getdata_daily(process_date: str):
    """Fetch the APITube articles published on a single day.

    The request goes through the Databricks HTTP connection
    ``api-apitube-http`` with ``process_date`` sent verbatim as the
    ``published_at`` filter (presumably ``YYYY-MM-DD``; it is not validated here).

    Args:
        process_date: day to query.

    Returns:
        A single-row DataFrame with the unparsed JSON body in ``raw_response``
        and the fetch time in ``getdata_timestamp``.
    """
    # More APITube filters (e.g. "title", "language.code", "location.name",
    # "ignore.location.name") can be added to this dict.
    query = {"published_at": process_date}

    api_reply = w.serving_endpoints.http_request(
        conn="api-apitube-http",
        method=ExternalFunctionRequestHttpMethod.GET,
        path="",
        params=query,
        headers={},
    )

    # Keep the body as a raw JSON string; enrichData() parses it later.
    payload_row = Row(raw_response=api_reply.text)
    return (
        spark.createDataFrame([payload_row])
        .withColumn("getdata_timestamp", current_timestamp())
    )

##### Get weekly data

In [0]:
from pyspark.sql.functions import from_json, explode, col, current_timestamp
from pyspark.sql import Row
from datetime import datetime, timedelta

def getdata_weekly(process_date: str):
    """Fetch the APITube articles of the window starting at *process_date*.

    The window sent to APITube is ``published_at.start = process_date`` and
    ``published_at.end = process_date + 6 days``.
    NOTE(review): whether APITube treats a date-only ``published_at.end`` as
    inclusive (7 days) or exclusive (6 days) is not verified here — confirm.

    Unlike the previous version, the frame is no longer displayed here.
    Rendering is left to the caller, as in getdata_daily, so test_weekly shows
    it once and the scheduled main loop does not render it on every iteration.

    Args:
        process_date: first day of the window, ``YYYY-MM-DD``.

    Returns:
        A single-row DataFrame with the unparsed JSON body in ``raw_response``
        and the fetch time in ``getdata_timestamp``.

    Raises:
        ValueError: if *process_date* is not a ``YYYY-MM-DD`` date.
    """
    window_start = datetime.strptime(process_date, "%Y-%m-%d")
    process_week_start = process_date
    process_week_end = (window_start + timedelta(days=6)).strftime("%Y-%m-%d")

    print(f"Sto considerando la settimana con data inizio: {process_week_start} e data fine: {process_week_end}")

    # Other APITube filters (e.g. "title") can be added to params.
    response = w.serving_endpoints.http_request(
        conn="api-apitube-http",
        method=ExternalFunctionRequestHttpMethod.GET,
        path="",
        params={
            "published_at.start": process_week_start,
            "published_at.end": process_week_end,
        },
        headers={},
    )

    # Keep the body as a raw JSON string; enrichData() parses it later.
    df_raw = spark.createDataFrame([Row(raw_response=response.text)])
    df_raw = df_raw.withColumn("getdata_timestamp", current_timestamp())

    return df_raw

### Apitube Enrich Data

In [0]:
def enrichData(df_raw):
    """Turn the raw APITube response into one row per article.

    ``raw_response`` is parsed with ``response_schema``. The ``results`` array
    is exploded, so a response without results yields no rows. Each article
    struct is then flattened into top-level columns.

    Args:
        df_raw: frame with ``raw_response`` (JSON text) and ``getdata_timestamp``.

    Returns:
        One row per article: the article fields, ``getdata_timestamp`` and a
        new ``enrich_timestamp``.
    """
    decoded = from_json(col("raw_response"), response_schema).alias("data")
    parsed = df_raw.select(decoded, col("getdata_timestamp"))

    one_per_article = parsed.select(
        explode(col("data.results")).alias("article"),
        col("getdata_timestamp"),
    )
    flattened = one_per_article.select("article.*", "getdata_timestamp")

    return flattened.withColumn("enrich_timestamp", current_timestamp())

### Apitube Load Data to Delta Table

In [0]:
from pyspark.sql.functions import current_timestamp

def loadData(
    articles,
    *,
    catalog_name="das_information_technologies",
    schema_name="apitube",
    table_name="apitube_articles",
):
    """Append *articles* to a Delta table and return them.

    A ``loaddata_timestamp`` column (``current_timestamp()``) is added before
    the write. The write uses mode "append" with no de-duplication, so
    processing the same date twice stores the same articles twice.

    Args:
        articles: DataFrame of articles (typically the output of enrichData).
        catalog_name: existing catalog of the target table.
        schema_name: existing schema of the target table.
        table_name: target table, created on first write if missing.

    Returns:
        The DataFrame with ``loaddata_timestamp`` added. It is lazy, so using it
        in another action (e.g. send_kafka) re-executes it; presumably
        ``loaddata_timestamp`` then differs from the stored value — confirm.
    """
    full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

    articles = articles.withColumn("loaddata_timestamp", current_timestamp())

    # "append" accumulates rows across runs (use "overwrite" to replace the table).
    articles.write.mode("append").format("delta").saveAsTable(full_table_name)

    # count() runs a separate Spark job over the same lineage as the write.
    num_records = articles.count()
    print(f"Tabella scritta: {full_table_name} - Numero record: {num_records}")

    return articles

### Kafka send records

In [0]:
import pyspark.sql.types as T
from pyspark.sql.functions import col, to_json, struct, array, struct as pyspark_struct, lit

def send_kafka(df, KAFKA_OPTIONS):
    """Publish every row of *df* to Kafka as a JSON message.

    All columns of a row are serialized (pretty-printed) with to_json into the
    message ``value``. Each message carries a ``content-type: application/json``
    header, and the number of records is printed at the end.

    Args:
        df: DataFrame to publish.
        KAFKA_OPTIONS: options for the Kafka sink, e.g. from kafka_setup().
    """
    json_header = struct(
        lit("content-type").alias("key"),
        lit("application/json").cast("binary").alias("value"),
    )

    payload = (
        df.withColumn("value", to_json(struct(*df.columns), {"pretty": "true"}))
        .withColumn("headers", array(json_header))
        .selectExpr("CAST(value AS STRING) as value", "headers")
    )

    writer = payload.write.format("kafka").options(**KAFKA_OPTIONS)
    writer.option("includeHeaders", "true").save()

    # count() runs a separate Spark job; it counts rows, not broker acknowledgements.
    sent = payload.count()
    print(f"Spediti a Kafka #record: {sent}")

### Wait Function

In [0]:
def waitNextData(min_frequency):
    """Block for *min_frequency* minutes before the next iteration.

    Args:
        min_frequency: minutes to wait (int or float). Must be >= 0, otherwise
            time.sleep raises ValueError.

    Returns:
        The number of seconds slept.
    """
    # Local import: `time` is otherwise only imported in the main-loop cell.
    import time

    # Minutes -> seconds (previously multiplied by 40, waiting only 2/3 of the time).
    sleep_seconds = min_frequency * 60
    print(f"Attendo {min_frequency} minuti prima della prossima data...")
    time.sleep(sleep_seconds)
    return sleep_seconds

### Test Apitube ETL

##### Daily ETL

In [0]:
def test_daily(process_date):
    """Run the daily ETL for *process_date*, displaying the raw and parsed frames.

    Not a dry run: loadData appends the parsed articles to the Delta table.
    """
    raw_frame = getdata_daily(process_date)
    display(raw_frame)

    parsed_articles = enrichData(raw_frame)
    display(parsed_articles)

    loadData(parsed_articles)

# Run the test
test_daily("2025-12-16")

##### Weekly ETL

In [0]:
def test_weekly(process_date):
    """Run the weekly ETL for the window starting at *process_date*.

    Displays the raw and parsed frames. Not a dry run: loadData appends the
    parsed articles to the Delta table.
    """
    raw_frame = getdata_weekly(process_date)
    display(raw_frame)

    parsed_articles = enrichData(raw_frame)
    display(parsed_articles)

    loadData(parsed_articles)

# Run the test
test_weekly("2025-12-01")

### Test Apitube ETL + Kafka send

In [0]:
# Manual end-to-end check for a single day: fetch, parse, append to the
# Delta table (loadData uses mode "append", so re-running stores duplicates)
# and publish the same rows to Kafka.
k_opt = kafka_setup()

df_raw = getdata_daily('2025-12-16')
display(df_raw)

enriched_articles = enrichData(df_raw)    
loaded_articles = loadData(enriched_articles)    
display(loaded_articles)

# Publishes the frame returned by loadData, including its loaddata_timestamp column.
send_kafka(loaded_articles, k_opt)



### Main Loop

In [0]:
from datetime import datetime, timedelta
import time

# Kafka options for publishing to Event Hubs (built once, reused every iteration).
k_opt = kafka_setup()

# ============================
# 1. Job parameters
# ============================

# Start date (yyyy-MM-dd), e.g. "2015-01-01"
dbutils.widgets.text("start_date", "2024-01-01", "Data iniziale (yyyy-MM-dd)")
start_date_str = dbutils.widgets.get("start_date")
start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

# Minutes to wait between one iteration and the next
dbutils.widgets.text("min_frequency", "1", "Frequenza minuti")
min_freq_str = dbutils.widgets.get("min_frequency")
min_frequency = int(min_freq_str)
if min_frequency < 0:
    # Fail fast: otherwise time.sleep() would only reject it after the first ingestion.
    raise ValueError(f"min_frequency must be >= 0, got {min_frequency}")

print(f"Data iniziale: {start_date}")
print(f"Frequenza: {min_frequency} minuti")

# ============================
# 2. Infinite loop over the dates
# ============================

# getdata_weekly queries [start, start + 6 days].
WINDOW_LAST_DAY_OFFSET = 6
# Days between consecutive window starts: the last day of one window is the
# first day of the next one.
# NOTE(review): if APITube treats published_at.end as inclusive, that boundary
# day is loaded (appended and sent) twice; if exclusive, the windows are exactly
# contiguous. Confirm before changing the step.
WINDOW_STEP_DAYS = 6

current_date = start_date

while True:
    process_date = current_date
    process_date_str = process_date.isoformat()
    window_last_day = process_date + timedelta(days=WINDOW_LAST_DAY_OFFSET)

    # Do not query a window that has not ended yet. Previously the loop kept
    # advancing into the future, querying empty windows and never revisiting
    # them, so articles published later in those windows were skipped forever.
    # NOTE(review): "today" is the driver's local clock — confirm it matches
    # the timezone APITube uses for published_at.
    if window_last_day >= datetime.now().date():
        print(f"Finestra {process_date_str} - {window_last_day.isoformat()} non ancora conclusa, attendo...")
        waitNextData(min_frequency)
        continue

    print(f"\n=== Elaboro data = {process_date_str} ===")

    df_raw = getdata_weekly(process_date_str)
    enriched_articles = enrichData(df_raw)
    loaded_articles = loadData(enriched_articles)
    send_kafka(loaded_articles, k_opt)

    print(f"Data elaborata: {process_date_str}")

    # Move to the next window start.
    current_date = current_date + timedelta(days=WINDOW_STEP_DAYS)

    waitNextData(min_frequency)
