# Data Acquisition (ETL)

**Starting Point:**  
The article data originates from a Kafka datastream. It is not normalized (so it cannot be analyzed directly) and requires Active Directory login (making collaboration difficult).  
- [View Kafka topic data (AKHQ)](https://akhq.pdp.production.admin.srgssr.ch/ui/strimzi/topic/articles-v2/data?sort=NEWEST&partition=All)

**Processing steps:**

1. Read article data from the Delta table populated from Kafka.
2. Flatten and transform nested fields (e.g., titles, resources, contributors) using a SQL view.
3. Create a Spark DataFrame from the flattened view and inspect the results.
4. Write the DataFrame to a Delta table for analytics and automation.
5. Export a Parquet sample of less than 25 MB containing only public data for sharing (e.g., via GitHub).

**Goal:**  
The data should be available as a Parquet file for sharing. Since the dataset is large (5GB), only a public sample is exported for easy distribution.

**Access Control:**  
To protect sensitive information, data distribution is based on user access rights. Entitled users can access the full confidential dataset, while restricted users receive only the public sample.

In [0]:
from pyspark.sql.utils import AnalysisException

def has_read_permission(table_name):
    try:
        spark.sql(f"SELECT 1 FROM {table_name} LIMIT 1")
        return True
    except AnalysisException:
        return False

has_read_access_udp_articles_v2 = has_read_permission("udp_prd_atomic.pdp.articles_v2")

## Step-01: Read from Kafka (SQL)

The following steps read article data from the Delta table `udp_prd_atomic.pdp.articles_v2`, which is assumed to be populated from a Kafka stream. The original Kafka data contains nested lists and complex structures (e.g., multilingual fields or arrays of resources). The SQL view `articles_flat` flattens this nested data by extracting the relevant fields. Where multiple values exist (such as titles in different languages), it always selects the first entry; it does not prioritize a particular language such as German (`'de'`). The flattened fields are kept together with the Kafka metadata (key, topic, partition, offset, timestamp) for further analysis in a flat, tabular format.
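
To make the "first entry" rule concrete, here is a minimal plain-Python sketch of what the view does for multilingual fields such as `title`, `lead`, and `kicker`. The sample record and its `language` key are hypothetical and only serve as illustration; only the `content` key is taken from the SQL.

```python
def first_content(entries):
    """Return the `content` of the first entry, or None if there is none.

    Mirrors: try_element_at(transform(coalesce(entries, array()), x -> x.content), 1)
    """
    contents = [entry.get("content") for entry in (entries or [])]
    return contents[0] if contents else None


# Hypothetical record: the `language` key is an assumption for illustration.
article = {
    "title": [
        {"language": "fr", "content": "Bonjour"},
        {"language": "de", "content": "Hallo"},
    ],
    "lead": None,  # a missing field yields None instead of raising an error
}

print(first_content(article["title"]))  # Bonjour
print(first_content(article["lead"]))   # None
```

Because only the order of the entries decides the result, the French title wins in this example even though a German one exists.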

In [0]:
%sql
-- Only run main logic if has_read_access_udp_articles_v2 is true

-- Uncomment below to run only when has_read_access_udp_articles_v2 = true
-- IF has_read_access_udp_articles_v2 THEN
CREATE OR REPLACE TEMP VIEW articles_flat AS
WITH base AS (
  SELECT
    key,
    topic,
    partition,
    offset,
    timestamp,
    value AS v
  FROM udp_prd_atomic.pdp.articles_v2
)
SELECT
  -- flat metadata
  v.id AS id,
  v.publisher AS publisher,
  v.provenance AS provenance,
  v.modificationDate AS modificationDate,
  v.releaseDate AS releaseDate,
  -- TITLE: always first entry
  try_element_at(transform(coalesce(v.title, array()), x -> x.content), 1) AS title_auto,
  -- LEAD: always first entry
  try_element_at(transform(coalesce(v.lead, array()), x -> x.content), 1) AS lead_auto,
  -- KICKER: always first entry
  try_element_at(transform(coalesce(v.kicker, array()), x -> x.content), 1) AS kicker_auto,
  -- Extract IDs specifically (not a Map!)
  try_element_at(
    transform(
      filter(coalesce(v.identifiers, array()), x -> x.type = 'urn'   AND x.value IS NOT NULL),
      x -> x.value
    ),
    1
  ) AS id_urn,
  try_element_at(
    transform(
      filter(coalesce(v.identifiers, array()), x -> x.type = 'srgId' AND x.value IS NOT NULL),
      x -> x.value
    ),
    1
  ) AS id_srg,
  -- First available image
  aggregate(
    coalesce(v.resources, array()),
    CAST(NULL AS STRING),
    (acc, r) -> coalesce(acc, r.picture.locator.url),
    acc -> acc
  ) AS picture_url,
  -- content.text -> single string (items joined with spaces)
  CASE
    WHEN v.content IS NOT NULL
      THEN concat_ws(' ', coalesce(v.content.text, CAST(array() AS ARRAY<STRING>)))
    ELSE NULL
  END AS content_text_csv,
  -- contributors -> CSV of names
  CASE
    WHEN v.contributors IS NOT NULL
      THEN concat_ws(
        ', ',
        transform(
          coalesce(v.contributors, array()),
          c -> coalesce(
            c.name,
            c.agent.person.name,
            c.agent.team.name,
            c.agent.department.name
          )
        )
      )
    ELSE NULL
  END AS contributors_csv,
  -- first non-null locator.url across resources (document, picture, link); a single value despite the _csv suffix
  try_element_at(
    filter(
      flatten(
        transform(
          coalesce(v.resources, array()),
          r -> array(
            r.document.locator.url,
            r.picture.locator.url,
            r.link.locator.url
          )
        )
      ),
      x -> x IS NOT NULL
    ),
    1
  ) AS resources_locator_urls_csv,
  -- Keywords -> CSV
  CASE
    WHEN v.keywords IS NOT NULL
      THEN concat_ws(',', coalesce(v.keywords, CAST(array() AS ARRAY<STRING>)))
    ELSE NULL
  END AS keywords_csv,
  -- Kafka metadata (key safely as STRING)
  CAST(key AS STRING) AS key,
  topic,
  partition,
  offset,
  timestamp
FROM base;
-- END IF
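
The remaining expressions in the view above follow three patterns: filter then take the first match (identifiers), take the first non-null value via a fold (picture URL), and coalesce then join (contributors). The following is a minimal plain-Python sketch of these patterns, not the view itself. The sample record and all of its values are hypothetical, and the join relies on Spark's `concat_ws` skipping null entries.

```python
def first_identifier(identifiers, id_type):
    """First non-null `value` among identifiers of the given `type`."""
    values = [
        item["value"]
        for item in (identifiers or [])
        if item.get("type") == id_type and item.get("value") is not None
    ]
    return values[0] if values else None


def first_picture_url(resources):
    """First non-null picture.locator.url, like the aggregate(...) fold."""
    for resource in resources or []:
        url = ((resource.get("picture") or {}).get("locator") or {}).get("url")
        if url is not None:
            return url
    return None


def contributors_csv(contributors):
    """Join the first available name per contributor; None if the field is missing."""
    if contributors is None:
        return None
    names = []
    for contributor in contributors:
        agent = contributor.get("agent") or {}
        candidates = [
            contributor.get("name"),
            (agent.get("person") or {}).get("name"),
            (agent.get("team") or {}).get("name"),
            (agent.get("department") or {}).get("name"),
        ]
        name = next((c for c in candidates if c is not None), None)
        if name is not None:  # concat_ws skips nulls
            names.append(name)
    return ", ".join(names)


# Hypothetical record; every value below is invented for illustration.
value = {
    "identifiers": [
        {"type": "srgId", "value": None},
        {"type": "urn", "value": "urn:example:article:1"},
        {"type": "srgId", "value": "12345"},
    ],
    "resources": [
        {"link": {"locator": {"url": "https://example.org/more"}}},
        {"picture": {"locator": {"url": "https://example.org/img.jpg"}}},
    ],
    "contributors": [
        {"name": "Alice"},
        {"agent": {"team": {"name": "Newsroom"}}},
        {"agent": {}},
    ],
}

print(first_identifier(value["identifiers"], "urn"))    # urn:example:article:1
print(first_identifier(value["identifiers"], "srgId"))  # 12345
print(first_picture_url(value["resources"]))            # https://example.org/img.jpg
print(contributors_csv(value["contributors"]))          # Alice, Newsroom
```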

## Step-02: Create DataFrame and Visually Inspect Results

The code below runs a Spark SQL query against the temporary view `articles_flat`, loads the result into a Spark DataFrame of the same name (`articles_flat`), sorts it by `modificationDate` in descending order, and displays it for visual inspection. This DataFrame is the input for the write steps that follow.

In [0]:
if has_read_access_udp_articles_v2:
    articles_flat = spark.sql("SELECT * FROM articles_flat").orderBy("modificationDate", ascending=False)
    display(articles_flat, truncate=False)

## Step-03: Save Data to Delta Table

In the next steps, the data is saved both to a Delta table (for better automation and analytics) and to a Parquet file (for sharing).

### Write to (private) Delta Table - all articles

The following code writes the transformed DataFrame `articles_flat` to the Delta table `swi_audience_prd.pdp_articles_v2.articles_v2_flat`. It uses the **Delta** format in **overwrite** mode with `overwriteSchema` enabled, so each run replaces both the data and the schema of the target table.

- Contains all articles (**confidential**)

In [0]:
if has_read_access_udp_articles_v2:
    articles_flat.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("swi_audience_prd.pdp_articles_v2.articles_v2_flat")


### Write to (public) Parquet File - selected articles, manual upload to GitHub

Export a <25 MB sample of the data with only public data as a Parquet file for easy sharing via GitHub.  
**Note:** The Parquet file must be manually downloaded from Databricks and then uploaded to your GitHub repository.
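
The export cell below sizes the sample with a simple row budget: the 25 MB limit divided by the average row size, multiplied by a factor of 2. The sketch below works through that arithmetic in plain Python. The average row size of 4096 bytes, the parameter name `compression_factor`, and the helper `fits_budget` are assumptions for illustration. The size check is not part of the notebook; it is one possible way to confirm that the heuristic factor actually kept the file under the limit.

```python
import math
import os
import tempfile

MAX_BYTES = 25 * 1024 * 1024  # 25 MB budget, as in the export cell


def estimate_max_rows(avg_row_size, max_bytes=MAX_BYTES, compression_factor=2):
    """Row budget: bytes available / bytes per row, scaled by a heuristic factor."""
    return int(math.floor((max_bytes / avg_row_size) * compression_factor))


def fits_budget(path, max_bytes=MAX_BYTES):
    """Check the size of the written file against the budget."""
    return os.path.getsize(path) <= max_bytes


# Hypothetical average row size of 4 KiB (the notebook measures it from a sample).
print(estimate_max_rows(4096))  # 12800

# Demonstrate the size check on a small stand-in file (not real Parquet content).
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
    tmp.write(b"\0" * 1024)
print(fits_budget(tmp.name))  # True
os.remove(tmp.name)
```

If the check fails for a real export, lowering the factor and re-running the export is the obvious adjustment.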

In [0]:
if has_read_access_udp_articles_v2:
    from pyspark.sql.functions import col, current_date
    import math
    import shutil
    import os

    # Estimate average row size in bytes using a sample of 100,000 rows
    sample_row = articles_flat.limit(100000).toPandas()
    avg_row_size = sample_row.memory_usage(index=True, deep=True).sum() / len(sample_row)

    # Set maximum file size to 25 MB
    max_bytes = 25 * 1024 * 1024  # 25 MB

    # The estimate uses the pandas in-memory size; Parquet is assumed to be more compact,
    # so the row count is doubled (heuristic factor of 2)
    max_rows = int(math.floor((max_bytes / avg_row_size) * 2))

    # Filter articles to only those published (releaseDate today or earlier), then sample up to max_rows
    articles_flat_sampled = (
        articles_flat
        .filter(col("releaseDate") <= current_date())
        .limit(max_rows)
    )

    # Write the sampled DataFrame as a single Parquet file to a temporary directory
    parquet_temp_dir = "/Volumes/swi_audience_prd/pdp_articles_v2/pdp_articles_v2_volume/export_articles_v2_sample25mb_tmp_parquet"
    parquet_final_path = "/Volumes/swi_audience_prd/pdp_articles_v2/pdp_articles_v2_volume/export_articles_v2_sample25mb.parquet"

    (articles_flat_sampled.coalesce(1)
        .write
        .mode("overwrite")
        .parquet(parquet_temp_dir)
    )

    # Move the single Parquet part file to the final destination
    files = [f for f in os.listdir(parquet_temp_dir) if f.endswith(".parquet")]
    if files:
        src_file = os.path.join(parquet_temp_dir, files[0])
        shutil.move(src_file, parquet_final_path)

    # Remove the temporary directory after moving the file
    shutil.rmtree(parquet_temp_dir)

Next, complete the following steps manually:

1. **Locate the Parquet file in Databricks:**
   - Navigate to [Databricks Volume Browser](https://adb-4119964566130471.11.azuredatabricks.net/explore/data/volumes/swi_audience_prd/pdp_articles_v2/pdp_articles_v2_volume?o=4119964566130471) in the Databricks workspace file browser.

2. **Download the file:**
   - Right-click on `export_articles_v2_sample25mb.parquet` and select **"Download"** to save the file to your local machine.

3. **Upload the file to GitHub:**
   - Go to [GitHub Folder](https://github.com/Tao-Pi/CAS-Applied-Data-Science/tree/main/Module-3/01_Module%20Final%20Assignment).
   - Click **"Add file"** > **"Upload files"**.
   - Drag and drop `export_articles_v2_sample25mb.parquet` or use the file picker to select it.
   - Commit the changes to upload the file.


## Step-04: Load Data Based on User Rights

The next step is to load the data, with access determined by user rights:

- **Restricted users** can load only the public data sample (e.g., the Parquet file exported for sharing).
- **Entitled users** can load the full, confidential dataset from the Delta table.

This ensures that sensitive information is only accessible to authorized users, while still allowing broader access to public data for collaboration and analysis.

In [0]:
import pandas as pd

if has_read_access_udp_articles_v2:
    # Entitled users: full confidential dataset from the Delta table
    srgssr_article_corpus = spark.table("swi_audience_prd.pdp_articles_v2.articles_v2_flat")
else:
    # Restricted users: public sample from GitHub
    url = "https://github.com/Tao-Pi/CAS-Applied-Data-Science/raw/main/Module-3/01_Module%20Final%20Assignment/export_articles_v2_sample25mb.parquet"
    srgssr_article_corpus = pd.read_parquet(url)

In [0]:
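# Convert the pandas sample to a Spark DataFrame so that the cells below,
# which use the Spark DataFrame API, work for both user groups
if isinstance(srgssr_article_corpus, pd.DataFrame):
    srgssr_article_corpus = spark.createDataFrame(srgssr_article_corpus)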

# Dataset Overview

In this chapter, we provide a brief overview of the dataset used for analysis. We indicate whether the loaded dataset is the full confidential version or the public sample, report the total number of articles available, and present a first look at the articles data to understand its structure and content.


## Step 1: Check Dataset Version (Confidential vs Public)

Here we check and indicate whether the loaded dataset is the full confidential version or the public sample.

In [0]:
def format_rowcount(n):
    # Render a row count as a rounded-down, human-readable phrase
    if n >= 1_000_000:
        return f"more than {n // 1_000_000} million"
    elif n >= 1_000:
        return f"more than {n // 1_000} thousand"
    else:
        return f"{n}"

if has_read_access_udp_articles_v2:
    rowcount = srgssr_article_corpus.count()
    print(f"Congrats: you have successfully read the full dataset. It contains the full corpus of {format_rowcount(rowcount)} articles published by SRG-SSR as plain text, together with some relevant metadata. You can now access the DataFrame object as 'srgssr_article_corpus' in Python.")
else:
    if isinstance(srgssr_article_corpus, pd.DataFrame):
        rowcount = len(srgssr_article_corpus)
    else:
        rowcount = srgssr_article_corpus.count()
    print(f"Congrats: you have successfully read the publicly available (sampled) dataset. It contains an excerpt of {format_rowcount(rowcount)} articles from SRG-SSR as plain text, together with some relevant metadata. You can now access the DataFrame object as 'srgssr_article_corpus' in Python.")


## Step 2: Overview of the Data

In this step, we provide an overview of the data contained in the loaded dataset. This includes a summary of the available articles and a first look at their structure and content.

In [0]:
# Fetch the first row once and reuse it for the example values
first_rows = srgssr_article_corpus.head(1)
first_row = first_rows[0].asDict() if first_rows else {}
cols_info = [
    {
        "column": name,
        "type": str(dtype),
        "example": first_row.get(name, None)
    }
    for name, dtype in srgssr_article_corpus.dtypes
]

display(cols_info, ["column", "type", "example"])

## Step 3: A Closer Look

In this step, we take a deeper look at the loaded dataset, exploring its structure and content in more detail.

In [0]:
display(srgssr_article_corpus)