# Lezione Completa: Costruzione di una Pipeline Dati per Video YouTube

**Obiettivo del Notebook:** In questa sessione, costruiremo passo dopo passo una pipeline di dati completa per raccogliere, elaborare, validare e arricchire informazioni provenienti da video di YouTube. Impareremo a interagire con API esterne, gestire dati testuali, effettuare pulizia e trasformazioni, e infine generare *embedding* semantici dal testo. Questo notebook è pensato per essere una guida didattica completa per data scientist e data engineer.

**Cosa imparerai:**
* Comprendere la struttura di un progetto di data pipeline.
* Interagire con l'API di YouTube Data v3 per recuperare metadati dei video.
* Utilizzare `youtube_transcript_api` per scaricare le trascrizioni dei video.
* Eseguire l'ispezione e la validazione dei dati raccolti per identificare problemi di qualità.
* Effettuare operazioni di pulizia e pre-elaborazione su dati testuali (titoli e trascrizioni) basate sui risultati della validazione.
* Convertire e standardizzare i tipi di dati utilizzando la libreria Polars.
* Generare *embedding* testuali con `sentence-transformers` per catturare il significato semantico del testo.
* Strutturare una pipeline di dati modulare e riutilizzabile in Python.
* Salvare i dati elaborati in formati efficienti come Parquet.
* Comprendere come automatizzare tale pipeline utilizzando GitHub Actions.

Questo notebook è pensato per essere interattivo e auto-eseguibile. Sentiti libero di modificare il codice, sperimentare con i parametri e osservare i risultati.

## 0. Prerequisiti e Istruzioni per l'Esecuzione

Prima di iniziare, assicurati di aver configurato correttamente il tuo ambiente.

### 0.1. Ambiente Virtuale (Consigliato)
Per mantenere le dipendenze del progetto isolate, è buona pratica utilizzare un ambiente virtuale (ad es. con `venv` o `conda`).
```bash
# Esempio con venv
python -m venv .venv
source .venv/bin/activate  # Su macOS/Linux
# .venv\Scripts\activate    # Su Windows (prompt dei comandi)
# .venv\Scripts\Activate.ps1 # Su Windows (PowerShell)
```

### 0.2. Installazione delle Librerie Python
Le seguenti librerie sono necessarie. Puoi installarle eseguendo il comando sottostante nel terminale del tuo ambiente virtuale, o direttamente in una cella di codice del notebook anteponendo `!pip install ...`. È consigliabile utilizzare un file `requirements.txt` per gestire le dipendenze in progetti reali.

Un esempio di file `requirements.txt` potrebbe contenere:
```
requests
polars
youtube-transcript-api
sentence-transformers
pandas
jupyterlab
matplotlib
accelerate
```
E poi installarle con:
```bash
pip install -r requirements.txt
```
Oppure, per una installazione diretta:
```bash
pip install requests polars youtube-transcript-api sentence-transformers pandas jupyterlab matplotlib accelerate
```

### 0.3. Chiave API di YouTube
Per recuperare i metadati dei video da YouTube (Passo 2.1 della pipeline), è **necessaria** una chiave API di YouTube Data v3.
1.  Ottieni una chiave API dalla [Google Cloud Console](https://console.cloud.google.com/apis/credentials).
2.  Nel tuo progetto Google Cloud, abilita l'API "YouTube Data API v3".
3.  **Importante per la Sicurezza:** Imposta la tua chiave API come variabile d'ambiente chiamata `YT_API_KEY`. **Non inserire mai la chiave API direttamente nel codice o committarla in un repository Git.**
    * **Linux/macOS (terminale):** `export YT_API_KEY='LA_TUA_CHIAVE_API'`
    * **Windows (PowerShell):** `$env:YT_API_KEY='LA_TUA_CHIAVE_API'`
    * **Windows (Prompt dei comandi):** `set YT_API_KEY=LA_TUA_CHIAVE_API`
    *(Per rendere l'impostazione persistente, aggiungi questi comandi al file di configurazione della tua shell, come `.bashrc`, `.zshrc`, o usa strumenti come `python-dotenv` in un ambiente di sviluppo locale, assicurandoti che il file `.env` non sia tracciato da Git)*.

    **Nota Importante:** Se la variabile d'ambiente `YT_API_KEY` non è configurata correttamente, la pipeline utilizzerà dati segnaposto per il recupero degli ID video, permettendo comunque di eseguire i passaggi successivi con dati fittizi. Questo è utile per testare il flusso della pipeline senza chiamate API reali.

### 0.4. Esecuzione del Notebook
1.  Salva questo file (ad es. `Lezione_Completa_Pipeline_YouTube.ipynb`) in una cartella di progetto.
2.  Assicurati che le dipendenze siano installate e la chiave API (se desideri dati reali) sia configurata.
3.  Avvia JupyterLab o Jupyter Notebook: `jupyter lab` o `jupyter notebook`.
4.  Apri il notebook e esegui le celle in sequenza (solitamente con `Shift + Enter`).

In [3]:
!pip install requests polars youtube-transcript-api sentence-transformers pandas jupyterlab matplotlib accelerate

Collecting youtube-transcript-api
  Downloading youtube_transcript_api-1.0.3-py3-none-any.whl.metadata (23 kB)
Collecting jupyterlab
  Downloading jupyterlab-4.4.3-py3-none-any.whl.metadata (16 kB)
Collecting async-lru>=1.0.0 (from jupyterlab)
  Downloading async_lru-2.0.5-py3-none-any.whl.metadata (4.5 kB)
Collecting jupyter-lsp>=2.0.0 (from jupyterlab)
  Downloading jupyter_lsp-2.2.5-py3-none-any.whl.metadata (1.8 kB)
Collecting jupyter-server<3,>=2.4.0 (from jupyterlab)
  Downloading jupyter_server-2.16.0-py3-none-any.whl.metadata (8.5 kB)
Collecting jupyterlab-server<3,>=2.27.1 (from jupyterlab)
  Downloading jupyterlab_server-2.27.3-py3-none-any.whl.metadata (5.9 kB)
Collecting jupyter-client>=6.1.12 (from ipykernel>=6.5.0->jupyterlab)
  Downloading jupyter_client-8.6.3-py3-none-any.whl.metadata (8.3 kB)
Collecting jupyter-events>=0.11.0 (from jupyter-server<3,>=2.4.0->jupyterlab)
  Downloading jupyter_events-0.12.0-py3-none-any.whl.metadata (5.8 kB)
Collecting jupyter-server-term

In [9]:
import os
from getpass import getpass

# Configurazione della YouTube API Key
youtube_api_key = os.environ.get("YOUTUBE_API_KEY")
if not youtube_api_key:
    youtube_api_key = getpass("Inserisci la tua YouTube API Key: ")
    os.environ["YOUTUBE_API_KEY"] = youtube_api_key  # La imposta per la sessione corrente
else:
    print("YouTube API Key letta correttamente dalla variabile d'ambiente.")

# (Opzionale) Configurazione per OpenAI API Key
# openai_api_key = os.environ.get("OPENAI_API_KEY")
# if not openai_api_key:
#     openai_api_key = getpass("Inserisci la tua OpenAI API Key: ")
#     os.environ["OPENAI_API_KEY"] = openai_api_key
# else:
#     print("OpenAI API Key letta correttamente dalla variabile d'ambiente.")

print("Configurazione YouTube API Key completata.")

YouTube API Key letta correttamente dalla variabile d'ambiente.
Configurazione YouTube API Key completata.


In [10]:
import os

print("Valore di YOUTUBE_API_KEY in os.environ:", os.environ.get("YOUTUBE_API_KEY"))

Valore di YOUTUBE_API_KEY in os.environ: AIzaSyAXPOzEm_T-WDf5EAkt0lKW1Va9TJDio4Y


In [12]:
# *** Copia lo stesso valore anche in YT_API_KEY, che è quello che getVideoIDs si aspetta ***
os.environ["YT_API_KEY"] = os.environ["YOUTUBE_API_KEY"]

print("Configurazione YouTube API Key completata.")

Configurazione YouTube API Key completata.


## 1. Struttura del Progetto e dei Dati

Prima di addentrarci nel codice, è utile capire come un progetto di data pipeline potrebbe essere strutturato e come i dati fluiscono attraverso di esso. In uno scenario reale, separeremmo logica, configurazioni e dati.

### 1.1. Organizzazione dei File (Esempio)

In un progetto più strutturato, potremmo avere una cartella principale (es. `youtube_data_pipeline_project`) con una struttura simile a questa:

```
youtube_data_pipeline_project/
|-- notebooks/                    # Contiene i Jupyter Notebooks come questo
|   |-- Lezione_Completa_Pipeline_YouTube.ipynb
|-- src/                          # Codice sorgente della pipeline
|   |-- functions.py              # Modulo con le funzioni riutilizzabili della pipeline
|   |-- data_pipeline.py          # Script principale per eseguire l'intera pipeline (es. da riga di comando)
|-- data/                         # Dati grezzi, intermedi e finali (spesso ignorata da Git se i dati sono grandi)
|   |-- video-ids.parquet
|   |-- video-transcripts.parquet
|   |-- video-transcripts-transformed.parquet
|   |-- video-index.parquet
|-- requirements.txt              # Dipendenze Python del progetto
|-- .env                          # (Opzionale, per sviluppo locale) Variabili d'ambiente come YT_API_KEY (DA IGNORARE IN GIT!)
|-- .gitignore                    # File per specificare cosa Git deve ignorare (es. .env, data/, __pycache__/)
|-- README.md                     # Descrizione del progetto
|-- .github/workflows/            # (Per l'automazione) File YAML per GitHub Actions
|   |-- youtube_data_pipeline.yml
```

**Spiegazione:**
* **`notebooks/`**: Qui risiedono i notebook per l'analisi esplorativa, la prototipazione e la documentazione interattiva come questa lezione.
* **`src/`**: Contiene il codice Python modulare.
    * `functions.py`: Definisce le funzioni specifiche per ogni passaggio della pipeline (estrazione dati, trasformazione, ecc.). Questo promuove la riutilizzabilità e la testabilità del codice. Nel nostro notebook, incorporeremo queste funzioni direttamente per chiarezza didattica, ma in un progetto reale sarebbero in questo file separato.
    * `data_pipeline.py`: Uno script che importa le funzioni da `functions.py` e le orchestra per eseguire l'intera pipeline dall'inizio alla fine. È utile per l'esecuzione automatizzata.
* **`data/`**: Questa cartella è cruciale. È dove vengono salvati tutti i dati generati dalla pipeline. Idealmente, ogni passaggio intermedio salva il suo output qui, tipicamente in formati efficienti come Parquet. Per questo notebook, creeremo una cartella `data` nella stessa directory del notebook.
* **`requirements.txt`**: Elenca tutte le librerie Python necessarie per eseguire il progetto, facilitando la creazione di ambienti riproducibili.
* **`.env`**: (Opzionale, per sviluppo locale) Un file per memorizzare variabili d'ambiente come la chiave API di YouTube. **Questo file non deve MAI essere committato su Git.**
* **`.gitignore`**: Specifica a Git quali file e cartelle ignorare (es. `.venv/`, `__pycache__/`, `data/` se i dati sono troppo grandi o sensibili, `*.env`).
* **`README.md`**: Fornisce una descrizione del progetto, come configurarlo ed eseguirlo.
* **`.github/workflows/`**: Contiene i file di configurazione YAML per GitHub Actions, usati per automatizzare l'esecuzione della pipeline (vedi Sezione 9).

Per questa lezione, semplificheremo leggermente la struttura: le funzioni saranno definite direttamente nel notebook e tutti i file di output verranno salvati in una cartella `data/` creata nella stessa directory di questo notebook.

### 1.2. Flusso dei Dati

La pipeline che costruiremo seguirà questo flusso di dati:
1.  **ID Video e Metadati**: Interrogazione dell'API di YouTube -> `video-ids.parquet`
2.  **Trascrizioni**: Lettura da `video-ids.parquet`, interrogazione API trascrizioni -> `video-transcripts.parquet`
3.  **Validazione e Trasformazione**: Lettura da `video-transcripts.parquet`, pulizia e standardizzazione -> `video-transcripts-transformed.parquet`
4.  **Embedding**: Lettura da `video-transcripts-transformed.parquet`, generazione embedding -> `video-index.parquet` (output finale)

L'uso del formato **Parquet** è preferibile per i dati tabellari intermedi e finali perché è un formato colonnare efficiente in termini di spazio e velocità di lettura/scrittura, specialmente con librerie come Polars e Pandas.

## 2. Setup Iniziale: Importazioni e Configurazioni

Iniziamo importando tutte le librerie Python che utilizzeremo nel corso del notebook e configurando alcuni parametri di base, come la creazione della directory di output per i nostri dati.

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import requests
import json
import polars as pl # Libreria per data manipulation ad alte prestazioni
from youtube_transcript_api import YouTubeTranscriptApi # Per scaricare le trascrizioni
from sentence_transformers import SentenceTransformer # Per generare embedding testuali
import os # Per interazioni con il sistema operativo (es. gestione path)
import time # Per misurare i tempi di esecuzione
import datetime # Per gestire date e orari

# --- Configurazioni Globali ---
DATA_DIR = '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data' # Nome della directory dove salveremo i dati
YT_CHANNEL_ID = 'UCa9gErQ9AE5jT2DZLjXBIdA' # ID del canale YouTube di Shaw Talebi (esempio)

# Creazione della directory 'data' se non esiste già
# Qui salveremo i file intermedi e finali della nostra pipeline.
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)
    print(f"Directory '{DATA_DIR}' creata con successo.")
else:
    print(f"Directory '{DATA_DIR}' già esistente.")

Directory '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data' già esistente.


## 3. Definizione delle Funzioni della Pipeline

Per rendere il nostro codice modulare, riutilizzabile e più facile da comprendere, definiamo una serie di funzioni, ognuna responsabile di un compito specifico all'interno della pipeline. In un progetto reale, queste funzioni risiederebbero in un file separato (es. `src/functions.py`).

Ogni funzione è documentata per spiegare il suo scopo, i parametri e cosa restituisce.

In [5]:
# --- Funzioni per l'Interazione con l'API di YouTube ---
def getVideoRecords(response: requests.models.Response) -> list:
    """
    Estrae i record video significativi dalla risposta JSON dell'API di Youtube.
    Per ogni video, recupera ID, data di pubblicazione e titolo.

    Args:
        response: L'oggetto Response da una chiamata requests.get all'API di YouTube.

    Returns:
        Una lista di dizionari, dove ogni dizionario rappresenta un video.
    """
    video_record_list = []
    for raw_item in json.loads(response.text)['items']:
        # Processa solo item di tipo 'youtube#video'
        if raw_item['id']['kind'] != "youtube#video":
            continue
        video_record = {}
        video_record['video_id'] = raw_item['id']['videoId']
        video_record['datetime'] = raw_item['snippet']['publishedAt']
        video_record['title'] = raw_item['snippet']['title']
        video_record_list.append(video_record)
    return video_record_list

def getVideoIDs(channel_id: str, output_path: str = os.path.join(DATA_DIR, 'video-ids.parquet')):
    """
    Recupera tutti gli ID video, le date di pubblicazione e i titoli per un dato canale YouTube
    e salva i risultati in un file Parquet.
    Gestisce la paginazione per recuperare tutti i video.
    Se la chiave API non è fornita, crea un file Parquet segnaposto.

    Args:
        channel_id: L'ID del canale YouTube da cui estrarre i video.
        output_path: Il percorso del file Parquet dove salvare i dati.
    """
    page_token = None
    url = 'https://www.googleapis.com/youtube/v3/search'
    my_key = os.getenv('YT_API_KEY') # Recupera la chiave API dalla variabile d'ambiente

    if not my_key:
        print("ATTENZIONE: Chiave API di YouTube (YT_API_KEY) non trovata nelle variabili d'ambiente.")
        print(f"Verranno creati dati segnaposto in '{output_path}'.")
        # Dati segnaposto se la chiave API non è disponibile
        placeholder_df = pl.DataFrame({
            'video_id': ['esempio_id_1', 'esempio_id_2', 'esempio_id_3'],
            'datetime': ['2023-01-01T00:00:00Z', '2023-01-15T10:00:00Z', '2023-02-01T12:30:00Z'],
            'title': ['Video Esempio 1 (Segnaposto API)', 'Altro Video Interessante (Segnaposto API)', 'Tutorial Fantastico (Segnaposto API)']
        })
        placeholder_df.write_parquet(output_path)
        print(f"File segnaposto creato: {output_path}")
        return

    video_record_list = []
    print(f"Recupero ID video per il canale: {channel_id}...")
    try:
        while page_token != 0: # Il token 0 indica che non ci sono più pagine
            params = {
                "key": my_key,
                'channelId': channel_id,
                'part': ["snippet","id"],
                'order': "date",
                'maxResults': 50, # Massimo consentito per pagina
                'pageToken': page_token
            }
            response = requests.get(url, params=params)
            response.raise_for_status() # Solleva un errore per status HTTP cattivi (4xx o 5xx)

            data = json.loads(response.text)
            video_record_list.extend(getVideoRecords(response))

            page_token = data.get('nextPageToken')
            if not page_token: # Se non c'è nextPageToken, abbiamo finito
                page_token = 0

            print(f"Recuperati {len(video_record_list)} video finora... Prossima pagina token: {page_token if page_token else 'Nessuna'}")
            time.sleep(0.5) # Cortesia verso l'API, evita di fare troppe richieste ravvicinate

    except requests.exceptions.RequestException as e:
        print(f"Errore durante la richiesta API a YouTube: {e}")
        if not video_record_list: # Se non abbiamo raccolto nessun dato e c'è un errore
            print(f"Creazione di un file segnaposto a causa dell'errore API in '{output_path}'.")
            # Crea un DataFrame vuoto con lo schema corretto se l'API fallisce e non ci sono dati
            placeholder_df = pl.DataFrame({'video_id': [], 'datetime': [], 'title': []},
                                        schema={'video_id': pl.Utf8, 'datetime': pl.Utf8, 'title': pl.Utf8})
            placeholder_df.write_parquet(output_path)
            return
    except json.JSONDecodeError as e:
        print(f"Errore durante la decodifica della risposta JSON da YouTube: {e}")
        # Considera una gestione dell'errore simile a quella sopra se necessario
        return

    if video_record_list:
        df_videos = pl.DataFrame(video_record_list)
        df_videos.write_parquet(output_path)
        print(f"Totale {len(df_videos)} ID video e metadati salvati in '{output_path}'")
    elif not os.path.exists(output_path): # Se la lista è vuota e non esiste un file segnaposto
        print("Nessun record video recuperato e nessun file segnaposto esistente. Creazione di un file vuoto.")
        df_empty = pl.DataFrame({'video_id': [], 'datetime': [], 'title': []},
                                schema={'video_id': pl.Utf8, 'datetime': pl.Utf8, 'title': pl.Utf8})
        df_empty.write_parquet(output_path)
        print(f"File vuoto creato: {output_path}")

# --- Funzioni per la Gestione delle Trascrizioni Video ---
def extractTranscriptText(transcript_list_of_dicts: list) -> str:
    """
    Estrae e concatena il testo da una lista di dizionari di trascrizione.
    Ogni dizionario nella lista dovrebbe avere una chiave 'text'.

    Args:
        transcript_list_of_dicts: Lista di dizionari fornita da YouTubeTranscriptApi.

    Returns:
        Una stringa unica contenente tutto il testo della trascrizione.
    """
    text_list = [part['text'] for part in transcript_list_of_dicts]
    return ' '.join(text_list)

def getVideoTranscripts(ids_path: str = os.path.join(DATA_DIR, 'video-ids.parquet'),
                        output_path: str = os.path.join(DATA_DIR, 'video-transcripts.parquet')):
    """
    Legge gli ID video da un file Parquet, scarica le loro trascrizioni (italiano o inglese)
    e salva i risultati (incluso ID, datetime, titolo, trascrizione) in un nuovo file Parquet.
    Se una trascrizione non è disponibile, inserisce 'n/d'.

    Args:
        ids_path: Percorso del file Parquet contenente gli ID video.
        output_path: Percorso del file Parquet dove salvare i dati con le trascrizioni.
    """
    if not os.path.exists(ids_path):
        print(f"File degli ID video '{ids_path}' non trovato. Esegui prima getVideoIDs.")
        print(f"Creazione di un file segnaposto per le trascrizioni in '{output_path}'.")
        # Dati segnaposto se il file degli ID non esiste
        placeholder_df = pl.DataFrame({
            'video_id': ['esempio_id_1', 'esempio_id_2'],
            'datetime': ['2023-01-01T00:00:00Z', '2023-01-15T10:00:00Z'],
            'title': ['Video Esempio 1 (Segnaposto Trasc.)', 'Altro Video (Segnaposto Trasc.)'],
            'transcript': ['Questa è una trascrizione di esempio (segnaposto).', 'n/d']
        })
        placeholder_df.write_parquet(output_path)
        return

    df = pl.read_parquet(ids_path)
    if df.is_empty():
        print(f"Il file degli ID video '{ids_path}' è vuoto. Nessuna trascrizione da recuperare.")
        # Aggiunge una colonna 'transcript' vuota se il DataFrame degli ID è vuoto ma esiste
        df = df.with_columns(pl.lit(None).cast(pl.Utf8).alias("transcript"))
        df.write_parquet(output_path)
        print(f"File vuoto delle trascrizioni video creato in '{output_path}'.")
        return

    transcript_text_list = []
    total_videos = len(df)
    print(f"Recupero delle trascrizioni per {total_videos} video...")

    for i, video_id in enumerate(df['video_id']):
        try:
            # Richiede la trascrizione in italiano, fallback in inglese
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['it', 'en'])
            transcript_text = extractTranscriptText(transcript_list)
        except Exception as e:
            # print(f"  ATTENZIONE: Impossibile recuperare la trascrizione per {video_id}: {e}")
            transcript_text = "n/d" # Not defined/Not available
        transcript_text_list.append(transcript_text)

        if (i + 1) % 10 == 0 or (i + 1) == total_videos: # Logga ogni 10 video o all'ultimo video
            print(f"  Elaborati {i+1}/{total_videos} video...")

    df = df.with_columns(pl.Series(name="transcript", values=transcript_text_list))
    df.write_parquet(output_path)
    print(f"Trascrizioni video salvate in '{output_path}'.")

# --- Funzioni per la Pulizia e Trasformazione dei Dati ---
def handleSpecialStrings(df: pl.DataFrame) -> pl.DataFrame:
    """
    Pulisce caratteri speciali HTML-encoded e altre stringhe specifiche dalle colonne 'title' e 'transcript'.

    Args:
        df: DataFrame Polars contenente le colonne 'title' e/o 'transcript'.

    Returns:
        DataFrame Polars con le stringhe pulite.
    """
    # Mappatura dei caratteri speciali comuni e loro sostituzioni
    replacements = {
        '&#39;': "'",      # Apostrofo HTML entity
        '&amp;': "&",      # E commerciale HTML entity
        '&quot;': '"',   # Virgolette HTML entity
        '“': '"',        # Virgolette doppie aperte
        '”': '"',        # Virgolette doppie chiuse
        '‘': "'",        # Virgoletta singola aperta
        '’': "'",        # Virgoletta singola chiusa
        '…': "...",      # Puntini di sospensione
        'Sha ': 'Shaw ', # Correzione specifica (esempio)
        'SHA ': 'Shaw '  # Correzione specifica (esempio)
    }
    columns_to_clean = ['title', 'transcript']

    for col_name in columns_to_clean:
        if col_name in df.columns:
            print(f"Pulizia caratteri speciali nella colonna: {col_name}")
            # Applica tutte le sostituzioni in una volta per efficienza se possibile
            # Polars permette .str.replace_all con più pattern, ma qui lo facciamo iterativamente per chiarezza
            temp_col = df[col_name]
            for special_str, replacement_str in replacements.items():
                temp_col = temp_col.str.replace_all(special_str, replacement_str)
            df = df.with_columns(temp_col.alias(col_name))
    return df

def setDatatypes(df: pl.DataFrame) -> pl.DataFrame:
    """
    Converte la colonna 'datetime' da stringa a tipo pl.Datetime.
    Gestisce potenziali errori durante la conversione.

    Args:
        df: DataFrame Polars con una colonna 'datetime' di tipo stringa.

    Returns:
        DataFrame Polars con la colonna 'datetime' convertita.
    """
    if 'datetime' in df.columns:
        print("Conversione della colonna 'datetime' al tipo pl.Datetime.")
        try:
            # Tenta la conversione, specificando il formato atteso. 'strict=False' permette una certa flessibilità.
            df = df.with_columns(pl.col('datetime').str.to_datetime(format="%Y-%m-%dT%H:%M:%SZ", strict=False).cast(pl.Datetime))
        except Exception as e:
            print(f"Errore durante la conversione della colonna 'datetime': {e}.")
            print("La colonna 'datetime' potrebbe rimanere di tipo stringa o contenere valori nulli dove la conversione è fallita.")
    return df

def transformData(input_path: str = os.path.join(DATA_DIR, 'video-transcripts.parquet'),
                  output_path: str = os.path.join(DATA_DIR, 'video-transcripts-transformed.parquet')):
    """
    Applica trasformazioni ai dati (pulizia stringhe, conversione tipi) letti da un file Parquet
    e salva il risultato in un nuovo file Parquet.

    Args:
        input_path: Percorso del file Parquet di input.
        output_path: Percorso del file Parquet di output per i dati trasformati.
    """
    if not os.path.exists(input_path):
        print(f"File '{input_path}' non trovato. Esegui prima getVideoTranscripts.")
        print(f"Creazione di un file segnaposto per i dati trasformati in '{output_path}'.")
        # Dati segnaposto se il file di input non esiste
        placeholder_df = pl.DataFrame({
            'video_id': ['esempio_id_1'],
            'datetime': ['2023-01-01T00:00:00Z'],
            'title': ['Titolo &amp; Esempio (Segnaposto Transf.)'],
            'transcript': ['Testo con caratteri &#39;speciali&#39; (segnaposto transf).']
        })
        placeholder_df = handleSpecialStrings(placeholder_df) # Applica comunque la pulizia
        placeholder_df = setDatatypes(placeholder_df)       # e la conversione tipi
        placeholder_df.write_parquet(output_path)
        return

    df = pl.read_parquet(input_path)
    if df.is_empty():
        print(f"Il file '{input_path}' è vuoto. Nessun dato da trasformare.")
        df.write_parquet(output_path) # Salva un file vuoto con lo schema corretto
        print(f"File vuoto dei dati trasformati creato in '{output_path}'.")
        return

    print(f"Applicazione trasformazioni (pulizia stringhe, tipi di dati) al file: {input_path}")
    df = handleSpecialStrings(df)
    df = setDatatypes(df)

    df.write_parquet(output_path)
    print(f"Dati trasformati salvati in '{output_path}'.")

# --- Funzione per la Creazione di Embedding Testuali ---
def createTextEmbeddings(input_path: str = os.path.join(DATA_DIR, 'video-transcripts-transformed.parquet'),
                         output_path: str = os.path.join(DATA_DIR, 'video-index.parquet')):
    """
    Carica i dati trasformati, genera embedding per le colonne 'title' e 'transcript'
    utilizzando un modello SentenceTransformer, e salva il DataFrame risultante (video index)
    in un file Parquet.

    Args:
        input_path: Percorso del file Parquet con i dati trasformati.
        output_path: Percorso del file Parquet dove salvare l'indice video con gli embedding.
    """
    if not os.path.exists(input_path):
        print(f"File dati trasformati '{input_path}' non trovato. Esegui prima transformData.")
        print(f"Creazione di un file segnaposto per l'indice video in '{output_path}'.")
        # Dati segnaposto se il file di input non esiste
        placeholder_df = pl.DataFrame({
            'video_id': ['esempio_id_1'],
            'datetime': [datetime.datetime(2023,1,1)], # Usa un vero datetime per coerenza
            'title': ['Titolo Pulito (Segnaposto Emb.)'],
            'transcript': ['Trascrizione pulita (segnaposto emb).']
        })
        # Nota: Non generiamo embedding fittizi qui per semplicità, ma potresti farlo.
        placeholder_df.write_parquet(output_path)
        return

    df = pl.read_parquet(input_path)
    if df.is_empty():
        print(f"Il file '{input_path}' è vuoto. Nessun dato per creare embedding.")
        df.write_parquet(output_path) # Salva un file vuoto con lo schema corretto
        print(f"File vuoto dell'indice video creato in '{output_path}'.")
        return

    # Modello SentenceTransformer pre-addestrato. 'all-MiniLM-L6-v2' è un buon compromesso tra performance e dimensione.
    model_name = 'all-MiniLM-L6-v2'
    print(f"Caricamento del modello SentenceTransformer: '{model_name}'...")
    try:
        model = SentenceTransformer(model_name)
    except Exception as e:
        print(f"ERRORE CRITICO: Impossibile caricare il modello SentenceTransformer '{model_name}': {e}.")
        print("La generazione degli embedding verrà saltata. Controlla la connessione internet o l'installazione del modello.")
        df.write_parquet(output_path) # Salva il df senza embedding
        return

    columns_to_embed = ['title', 'transcript']
    print(f"Generazione degli embedding per le colonne: {', '.join(columns_to_embed)}...")

    df_final_parts = [df] # Lista per contenere il DataFrame originale e i DataFrame degli embedding

    for column_name in columns_to_embed:
        if column_name not in df.columns or df[column_name].is_empty() or df[column_name].null_count() == len(df[column_name]):
            print(f"  ATTENZIONE: Colonna '{column_name}' mancante, vuota o con tutti valori null. Embedding saltato per questa colonna.")
            continue

        print(f"  Processando colonna: '{column_name}'...")
        # Sostituisce i null con stringa vuota prima di generare embedding per evitare errori
        texts_to_embed = df[column_name].fill_null("").to_list()

        if not texts_to_embed:
            print(f"  Nessun dato testuale valido da cui generare embedding per la colonna '{column_name}'.")
            continue

        # Genera gli embedding. show_progress_bar è utile per dataset grandi.
        embedding_arr = model.encode(texts_to_embed, show_progress_bar=True)
        embedding_dim = embedding_arr.shape[1]

        # Crea nomi di colonna per gli embedding (es. title_embedding_0, title_embedding_1, ...)
        embedding_col_names = [f"{column_name}_embedding_{i}" for i in range(embedding_dim)]

        # Crea un DataFrame Polars con gli embedding
        df_embedding = pl.DataFrame(embedding_arr, schema=embedding_col_names)
        df_final_parts.append(df_embedding)
        print(f"  Embedding per '{column_name}' generati (dimensione: {embedding_dim}).")

    # Concatena il DataFrame originale con i nuovi DataFrame di embedding orizzontalmente
    if len(df_final_parts) > 1:
        df_with_embeddings = pl.concat(df_final_parts, how='horizontal')
    else: # Se nessun embedding è stato generato (es. colonne mancanti o vuote)
        df_with_embeddings = df

    df_with_embeddings.write_parquet(output_path)
    print(f"Indice video con embedding salvato in '{output_path}'.")

## 4. Validazione dei Dati e Giustificazione delle Trasformazioni

Prima di procedere con trasformazioni complesse o la generazione di embedding, è fondamentale **validare i dati** raccolti. Questa fase ci permette di capire la qualità dei dati, identificare problemi (es. valori mancanti, formati non corretti, caratteri speciali) e, di conseguenza, giustificare i passaggi di pulizia e trasformazione successivi.

Useremo il file `video-transcripts.parquet` (generato dal Passo 2 dell'esecuzione della pipeline, che vedremo più avanti) come esempio per questa fase di ispezione. In un flusso reale, questa validazione informerebbe la progettazione delle funzioni `handleSpecialStrings` e `setDatatypes`.

Le operazioni di validazione comuni includono:
* **Controllo dimensionale**: Quante righe e colonne abbiamo? (`df.shape`)
* **Controllo dei tipi di dato**: Le colonne hanno il tipo di dato atteso? (`df.dtypes` o `df.schema` in Polars)
* **Valori mancanti**: Ci sono valori nulli o mancanti e come sono distribuiti? (`df.null_count()`)
* **Unicità**: Quanti valori unici ci sono per colonna? Ci sono duplicati inattesi? (`df.n_unique()`)
* **Statistiche descrittive**: Per colonne numeriche (min, max, media, mediana) e per colonne testuali (lunghezza stringhe, pattern comuni).
* **Ispezione di stringhe particolari**: Ricerca di caratteri speciali, encoding errati, ecc.

Eseguiremo alcune di queste validazioni sul file che conterrà le trascrizioni.

In [6]:
path_video_transcripts_for_validation = os.path.join(DATA_DIR, 'video-transcripts.parquet')

# Assicuriamoci che il file esista (potrebbe essere stato creato con dati segnaposto se l'API non era disponibile)
if os.path.exists(path_video_transcripts_for_validation):
    print(f"Lettura del file '{path_video_transcripts_for_validation}' per la validazione...\n")
    df_validate = pl.read_parquet(path_video_transcripts_for_validation)

    if not df_validate.is_empty():
        print("--- Informazioni Generali ---")
        print(f"Forma del DataFrame (righe, colonne): {df_validate.shape}")
        print(f"Numero di righe uniche: {df_validate.n_unique()}\n")

        print("--- Schema e Tipi di Dato Iniziali ---")
        print(df_validate.schema) # Mostra i tipi di dato attuali
        # Osservazione: 'datetime' è probabilmente Utf8 (stringa) e necessita conversione.

        print("\n--- Conteggio Valori Nulli per Colonna ---")
        print(df_validate.null_count())
        # Osservazione: La colonna 'transcript' potrebbe avere nulli se alcune trascrizioni non sono state trovate ('n/d').

        print("\n--- Valori Unici per Colonna ---")
        for col_name in df_validate.columns:
            print(f"N. valori unici in '{col_name}': {df_validate[col_name].n_unique()}")
        # Osservazione: Se n. unico di 'video_id' < numero righe totali, ci sono duplicati.
        # 'transcript' potrebbe avere meno valori unici di 'video_id' se più video hanno la stessa trascrizione (improbabile) o più video hanno 'n/d'.

        print("\n--- Ispezione Primi Titoli per Caratteri Speciali ---")
        if 'title' in df_validate.columns:
            print(df_validate.select(pl.col('title')).head())
            # Osservazione: Cercare entità HTML come '&amp;', '&#39;', ecc. che necessitano pulizia.
            # La funzione handleSpecialStrings si occuperà di questo.
        else:
            print("Colonna 'title' non presente per l'ispezione.")

        # Validazione lunghezza trascrizioni (opzionale, può essere fatto dopo la trasformazione)
        # if 'transcript' in df_validate.columns:
        #     df_validate = df_validate.with_columns(df_validate['transcript'].str.len_chars().alias('transcript_length'))
        #     print("\n--- Statistiche Lunghezza Trascrizioni ---")
        #     print(df_validate.select(pl.col('transcript_length')).describe())
        #     # plt.hist(df_validate['transcript_length'].drop_nulls()) # Richiede matplotlib
        #     # plt.title('Distribuzione Lunghezza Trascrizioni')
        #     # plt.xlabel('Lunghezza')
        #     # plt.ylabel('Frequenza')
        #     # plt.show()

        print("\n--- Conclusione Validazione Preliminare ---")
        print("Questa ispezione preliminare ci mostra la necessità di:")
        print("1. Convertire la colonna 'datetime' in un formato data/ora effettivo.")
        print("2. Gestire i caratteri speciali nei titoli e nelle trascrizioni.")
        print("3. Assicurare una gestione coerente dei valori mancanti (es. 'n/d' per le trascrizioni).")
        print("Le funzioni `setDatatypes` e `handleSpecialStrings` nella pipeline sono progettate per affrontare questi problemi.")
    else:
        print(f"Il file '{path_video_transcripts_for_validation}' è vuoto. Impossibile eseguire la validazione.")
else:
    print(f"File '{path_video_transcripts_for_validation}' non trovato. Eseguire prima il Passo 2 della pipeline (getVideoTranscripts).")
    print("La validazione sarà saltata. Le funzioni di trasformazione gestiranno la creazione di file segnaposto se necessario.")

File '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data/video-transcripts.parquet' non trovato. Eseguire prima il Passo 2 della pipeline (getVideoTranscripts).
La validazione sarà saltata. Le funzioni di trasformazione gestiranno la creazione di file segnaposto se necessario.


## 5. Esecuzione della Pipeline Completa

Ora che tutte le funzioni necessarie sono state definite e abbiamo compreso l'importanza della validazione, possiamo eseguire la pipeline passo dopo passo. Misureremo anche il tempo di esecuzione di ciascun passaggio principale per monitorare le performance.

Ogni passaggio leggerà i dati dal file Parquet generato dal passaggio precedente e salverà il proprio output in un nuovo file Parquet, seguendo il flusso descritto nella Sezione 1.2.

In [14]:
print("INIZIO PIPELINE DATI YOUTUBE")
pipeline_start_time = time.time()
print(f"Ora di inizio: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("----------------------------------------------------------------------")

print("Passo 1: Estrazione degli ID Video e Metadati")
step1_start_time = time.time()
path_video_ids = os.path.join(DATA_DIR, 'video-ids.parquet')
# Eseguiamo la funzione per ottenere gli ID video. `YT_CHANNEL_ID` è definito nelle configurazioni globali.
getVideoIDs(channel_id=YT_CHANNEL_ID, output_path=path_video_ids)
step1_end_time = time.time()
print(f"---> Passo 1 completato in {round(step1_end_time - step1_start_time, 2)} secondi.\n")

print("Passo 2: Estrazione delle Trascrizioni Video")
step2_start_time = time.time()
path_video_transcripts = os.path.join(DATA_DIR, 'video-transcripts.parquet')
# Usiamo il file degli ID video generato nel Passo 1 come input
getVideoTranscripts(ids_path=path_video_ids, output_path=path_video_transcripts)
step2_end_time = time.time()
print(f"---> Passo 2 completato in {round(step2_end_time - step2_start_time, 2)} secondi.\n")

print("Passo 3: Validazione (già discussa), Trasformazione e Pulizia dei Dati")
# La validazione è stata discussa nella Sezione 4.
# Ora applichiamo le trasformazioni basate su quelle osservazioni.
step3_start_time = time.time()
path_video_transcripts_transformed = os.path.join(DATA_DIR, 'video-transcripts-transformed.parquet')
# Usiamo il file delle trascrizioni generato nel Passo 2 come input
transformData(input_path=path_video_transcripts, output_path=path_video_transcripts_transformed)
step3_end_time = time.time()
print(f"---> Passo 3 completato in {round(step3_end_time - step3_start_time, 2)} secondi.\n")

print("Passo 4: Generazione degli Embedding Testuali")
step4_start_time = time.time()
path_video_index = os.path.join(DATA_DIR, 'video-index.parquet')
# Usiamo il file trasformato generato nel Passo 3 come input
createTextEmbeddings(input_path=path_video_transcripts_transformed, output_path=path_video_index)
step4_end_time = time.time()
print(f"---> Passo 4 completato in {round(step4_end_time - step4_start_time, 2)} secondi.\n")

print("----------------------------------------------------------------------")
pipeline_end_time = time.time()
print(f"FINE PIPELINE DATI YOUTUBE. Tempo totale esecuzione: {round(pipeline_end_time - pipeline_start_time, 2)} secondi.")
print(f"Ora di fine: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

INIZIO PIPELINE DATI YOUTUBE
Ora di inizio: 2025-06-02 15:47:39
----------------------------------------------------------------------
Passo 1: Estrazione degli ID Video e Metadati
Recupero ID video per il canale: UCa9gErQ9AE5jT2DZLjXBIdA...
Recuperati 48 video finora... Prossima pagina token: CDIQAA
Recuperati 96 video finora... Prossima pagina token: CGQQAA
Recuperati 139 video finora... Prossima pagina token: Nessuna
Totale 139 ID video e metadati salvati in '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data/video-ids.parquet'
---> Passo 1 completato in 2.63 secondi.

Passo 2: Estrazione delle Trascrizioni Video
Recupero delle trascrizioni per 139 video...
  Elaborati 10/139 video...
  Elaborati 20/139 video...
  Elaborati 30/139 video...
  Elaborati 40/139 video...
  Elaborati 50/139 video...
  Elaborati 60/139 video...
  Elaborati 70/139 video...
  Elaborati 80/139 video...
  Elaborati 

Batches:   0%|          | 0/5 [00:00<?, ?it/s]

  Embedding per 'title' generati (dimensione: 384).
  Processando colonna: 'transcript'...


Batches:   0%|          | 0/5 [00:00<?, ?it/s]

  Embedding per 'transcript' generati (dimensione: 384).
Indice video con embedding salvato in '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data/video-index.parquet'.
---> Passo 4 completato in 2.66 secondi.

----------------------------------------------------------------------
FINE PIPELINE DATI YOUTUBE. Tempo totale esecuzione: 95.59 secondi.
Ora di fine: 2025-06-02 15:49:14


## 6. Ispezione dei Risultati Finali

Dopo aver eseguito l'intera pipeline, è fondamentale ispezionare l'output finale (`data/video-index.parquet`) per assicurarsi che i dati siano stati elaborati come previsto e che gli embedding siano stati aggiunti.

Questo file rappresenta il nostro "indice video", pronto per essere utilizzato in applicazioni di ricerca semantica, raccomandazione o altre analisi basate sul contenuto testuale dei video.

In [15]:
final_output_path = os.path.join(DATA_DIR, 'video-index.parquet')

if os.path.exists(final_output_path):
    print(f"Ispezione del file di output finale: '{final_output_path}'")
    df_final = pl.read_parquet(final_output_path)

    if not df_final.is_empty():
        print("\n--- Prime 5 righe del DataFrame finale ---")
        # Selezioniamo colonne rappresentative, inclusa un'anteprima della trascrizione
        cols_to_show_head = ['video_id', 'datetime', 'title']
        if 'transcript' in df_final.columns:
            cols_to_show_head.append('transcript')

        # Assicuriamoci che le colonne esistano prima di selezionarle
        existing_cols_for_head = [col for col in cols_to_show_head if col in df_final.columns]

        if existing_cols_for_head:
            if 'transcript' in existing_cols_for_head:
                # Mostra solo i primi 100 caratteri della trascrizione per leggibilità
                print(df_final.select(existing_cols_for_head).with_columns(pl.col('transcript').str.slice(0, 100).alias('transcript_preview')).head())
            else:
                print(df_final.select(existing_cols_for_head).head())
        else:
            print("Nessuna delle colonne di base ('video_id', 'datetime', 'title', 'transcript') trovata per l'anteprima.")
            print("Mostro tutte le colonne disponibili (prime 5 righe):")
            print(df_final.head())

        print("\n--- Schema del DataFrame finale (tipi di dati) ---")
        print(df_final.schema)
        # Verifica che 'datetime' sia di tipo Datetime e che ci siano colonne di embedding (float).

        # Controlla la presenza e il numero di colonne di embedding
        title_embedding_cols = [col for col in df_final.columns if 'title_embedding' in col]
        transcript_embedding_cols = [col for col in df_final.columns if 'transcript_embedding' in col]

        print("\n--- Informazioni sugli Embedding ---")
        if title_embedding_cols:
            print(f"Trovate {len(title_embedding_cols)} colonne di embedding per 'title' (es. da '{title_embedding_cols[0]}' a '{title_embedding_cols[-1]}').")
        else:
            print("Nessuna colonna di embedding per 'title' trovata. Controllare il log del Passo 4.")

        if transcript_embedding_cols:
            print(f"Trovate {len(transcript_embedding_cols)} colonne di embedding per 'transcript' (es. da '{transcript_embedding_cols[0]}' a '{transcript_embedding_cols[-1]}').")
        else:
            print("Nessuna colonna di embedding per 'transcript' trovata. Controllare il log del Passo 4.")

        print("\n--- Dimensioni totali del DataFrame finale ---")
        print(f"Numero di righe (video): {df_final.shape[0]}")
        print(f"Numero di colonne: {df_final.shape[1]}")
    else:
        print("Il file di output finale è vuoto. Controllare i log dei passaggi precedenti per eventuali errori o dati segnaposto.")
else:
    print(f"ERRORE: File di output finale '{final_output_path}' non trovato. La pipeline potrebbe non essere stata completata con successo.")

Ispezione del file di output finale: '/content/drive/MyDrive/SIAE/Week 1 - Foundations & Intro GenAI/Day 1 - Welcome & Setup/Esercizi/Soluzioni/6 - Github Actions - DE + Pipeline/data/video-index.parquet'

--- Prime 5 righe del DataFrame finale ---
shape: (5, 5)
┌─────────────┬─────────────────────┬────────────────────────────┬────────────┬────────────────────┐
│ video_id    ┆ datetime            ┆ title                      ┆ transcript ┆ transcript_preview │
│ ---         ┆ ---                 ┆ ---                        ┆ ---        ┆ ---                │
│ str         ┆ datetime[μs]        ┆ str                        ┆ str        ┆ str                │
╞═════════════╪═════════════════════╪════════════════════════════╪════════════╪════════════════════╡
│ nSH2vZjb2TA ┆ 2025-06-01 15:01:13 ┆ My PhD Defense [Applied    ┆ n/d        ┆ n/d                │
│             ┆                     ┆ Physic…                    ┆            ┆                    │
│ nPQkBGf55YA ┆ 2025-05-25 15:

## 7. Conclusioni e Prossimi Passi

Congratulazioni! Hai costruito ed eseguito con successo una pipeline completa per l'elaborazione di dati da video di YouTube. Abbiamo coperto i seguenti passaggi chiave:

1.  **Estrazione dei Metadati**: Recupero di ID video, titoli e date di pubblicazione tramite l'API di YouTube.
2.  **Estrazione delle Trascrizioni**: Download delle trascrizioni testuali dei video.
3.  **Validazione e Trasformazione dei Dati**: Ispezione dei dati per problemi di qualità e successiva applicazione di pulizia (caratteri speciali) e standardizzazione (tipi di dato).
4.  **Generazione di Embedding**: Creazione di rappresentazioni vettoriali semantiche per titoli e trascrizioni.
5.  **Salvataggio**: Memorizzazione efficiente dei dati intermedi e finali in formato Parquet.

**Prossimi Passi Potenziali:**

* **Analisi Esplorativa Approfondita**: Utilizzare il `video-index.parquet` finale per analisi più complesse (es. topic modeling sulle trascrizioni, analisi temporale delle pubblicazioni, correlazione tra titoli e popolarità – se avessimo dati di view count).
* **Costruzione di un Motore di Ricerca Semantica**: Sfruttare gli embedding generati per costruire un sistema che permetta di cercare video basandosi sul significato delle query e non solo sulle parole chiave.
* **Miglioramento della Pipeline**:
    * Aggiungere la gestione di errori più robusta e logging dettagliato.
    * Integrare il recupero di ulteriori metadati (es. conteggio visualizzazioni, like, commenti) se disponibili e utili.
    * Ottimizzare le performance per dataset molto più grandi.
    * Parametrizzare ulteriormente la pipeline (es. ID del canale, modello di embedding) per renderla più flessibile.
* **Deployment e Automazione Avanzata**: Oltre a GitHub Actions, esplorare strumenti come Apache Airflow o Kubeflow Pipelines per orchestrare pipeline di dati complesse in ambienti di produzione.
* **Versionamento dei Dati**: Utilizzare strumenti come DVC (Data Version Control) per tracciare le versioni dei dataset e dei modelli, garantendo la riproducibilità.

Questo notebook fornisce una solida base per comprendere e implementare pipeline di dati end-to-end. Le competenze acquisite sono trasferibili a molti altri domini e tipi di dati.

## 8. Automazione della Pipeline con GitHub Actions

Finora abbiamo eseguito la pipeline manualmente all'interno di questo Jupyter Notebook. Per un utilizzo più robusto e in scenari reali (es. aggiornare periodicamente i dati, deployment), è fondamentale automatizzare l'esecuzione della pipeline. GitHub Actions è uno strumento potente e integrato in GitHub che ci permette di farlo.

### 8.1 Cos'è GitHub Actions?
GitHub Actions è una piattaforma di Integrazione Continua e Continuous Delivery (CI/CD) che ti permette di automatizzare il tuo workflow di sviluppo software, inclusa la compilazione, il testing e il deployment del codice, direttamente da GitHub. Puoi creare workflow personalizzati che vengono eseguiti in risposta a eventi specifici (es. un push sul repository, la creazione di una Pull Request, una schedulazione temporale).

### 8.2 Perché Automatizzare questa Pipeline Dati?
Automatizzare la nostra pipeline per i dati di YouTube offre diversi vantaggi:
* **Aggiornamenti Automatici:** Può essere schedulata per recuperare nuovi video e trascrizioni automaticamente (es. ogni giorno, ogni settimana).
* **Riproducibilità:** Assicura che la pipeline venga eseguita sempre nello stesso ambiente configurato, riducendo errori dovuti a configurazioni locali diverse.
* **Tracciabilità:** Ogni esecuzione viene loggata, facilitando il debug e il monitoraggio.
* **Collaborazione:** Facilita la collaborazione, poiché il processo di aggiornamento dei dati è standardizzato e non dipende da un singolo sviluppatore.

### 8.3 Come Configurare una GitHub Action per questa Pipeline

Per automatizzare la nostra pipeline, idealmente utilizzeremmo gli script Python (`data_pipeline.py` e `functions.py` dalla cartella `src/` come descritto nella Sezione 1.1) piuttosto che questo notebook interattivo. Lo script `data_pipeline.py` è già progettato per essere eseguito da riga di comando.

**Struttura dei file per GitHub Actions (esempio):**
Supponiamo che il tuo repository GitHub abbia questa struttura:
```
youtube_data_pipeline_project/
|-- src/
|   |-- functions.py
|   |-- data_pipeline.py
|-- data/                     # Potrebbe essere vuota o contenere dati iniziali
|-- requirements.txt
|-- .github/workflows/
|   |-- youtube_data_pipeline.yml
```

**Passaggi principali per la configurazione:**

1.  **Prepara gli Script Python e le Dipendenze:**
    * Assicurati che `src/data_pipeline.py` e `src/functions.py` siano nel tuo repository. Lo script `data_pipeline.py` dovrebbe orchestrare la chiamata delle funzioni da `functions.py` per eseguire l'intera pipeline, leggendo e scrivendo file nella cartella `data/` (relativa alla posizione dello script).
    * Verifica che un file `requirements.txt` (contenente `polars`, `youtube_transcript_api`, `sentence-transformers`, `requests`, `accelerate`) sia presente nella root del progetto o in una posizione accessibile dal workflow (es. `src/requirements.txt`).

2.  **Crea un File di Workflow YAML:**
    Crea una cartella `.github/workflows/` nella root del tuo repository (se non esiste già). All'interno di questa cartella, crea un file YAML, ad esempio `youtube_data_pipeline.yml`.

3.  **Definisci il Workflow nel File YAML:**
    Ecco un esempio di come potrebbe apparire il contenuto del file `youtube_data_pipeline.yml`:

    ```yaml
    name: YouTube Data Pipeline - Aggiornamento Automatico

    on:
      schedule:
        # Esegue la pipeline ogni giorno alle 02:00 UTC (configurabile)
        - cron: '0 2 * * *'
      workflow_dispatch: # Permette l'avvio manuale dalla UI di GitHub Actions

    jobs:
      run-youtube-data-pipeline:
        runs-on: ubuntu-latest # Specifica il sistema operativo del runner
        
        defaults:
          run:
            # Imposta la directory di lavoro per gli script Python
            # Assumendo che data_pipeline.py e functions.py siano in src/
            # e requirements.txt nella root del progetto.
            working-directory: ./src

        steps:
          - name: Checkout repository
            # Azione per scaricare il codice del repository sul runner
            uses: actions/checkout@v4

          - name: Set up Python
            # Azione per configurare l'ambiente Python
            uses: actions/setup-python@v5
            with:
              python-version: '3.10' # Specifica la versione di Python

          - name: Install dependencies
            run: |
              python -m pip install --upgrade pip
              # Installa le dipendenze dal requirements.txt nella root del progetto
              pip install -r ../requirements.txt
            # Nota: il path di requirements.txt è relativo a working-directory (./src)
            # Se requirements.txt fosse in ./src, sarebbe pip install -r requirements.txt

          - name: Run YouTube Data Pipeline Script
            env:
              # Usa un secret di GitHub per la chiave API. MAI hardcodarla!
              YT_API_KEY: ${{ secrets.YT_API_KEY }}
            # Esegue lo script principale della pipeline. Lo script si aspetta di trovare/creare una cartella 'data'
            # relativa alla sua posizione (quindi src/data/ se DATA_DIR non è un path assoluto).
            # Assicurati che data_pipeline.py gestisca correttamente i path per la cartella 'data'.
            # Una soluzione comune è creare la cartella 'data' relativa allo script o usare path assoluti gestiti da config.
            # Per questo esempio, assumiamo che gli script creino/usino una cartella 'data' all'interno di 'src' o
            # che i path in functions.py siano relativi a 'src/'.
            # Idealmente, functions.py e data_pipeline.py dovrebbero usare path relativi
            # che puntino a una cartella 'data' nella root del progetto, es. ../data/
            # Modifica: Creiamo la cartella data nella root prima di eseguire lo script
          - name: Create data directory in project root if not exists
            run: mkdir -p ../data
            working-directory: . # Temporaneamente cambia working-directory per creare la cartella data nella root
            
          - name: Run YouTube Data Pipeline Script (con data dir nella root)
            env:
              YT_API_KEY: ${{ secrets.YT_API_KEY }}
            # Assumendo che gli script in src/ siano stati modificati per usare '../data' come DATA_DIR
            run: python data_pipeline.py
            working-directory: ./src # Ritorna alla working-directory per lo script

          - name: Commit and push data changes (Opzionale)
            # Questo passo è opzionale. Gestisce il commit dei file Parquet generati.
            # Per dataset grandi, considera Git LFS o storage esterno (S3, GCS).
            run: |
              git config --global user.name 'GitHub Action Bot'
              git config --global user.email 'action-bot@github.com'
              # Controlla se ci sono modifiche nella cartella '../data/' (relativa a ./src)
              # Attenzione: il path qui deve essere corretto rispetto alla working-directory corrente (./src)
              if ! git diff --quiet ../data/; then
                git add ../data/
                git commit -m "Aggiornamento automatico dati video YouTube ($(date +'%Y-%m-%d %H:%M:%S'))"
                git push
              else
                echo "Nessuna modifica ai dati da committare."
              fi
            working-directory: ./src # Assicura che i comandi git siano eseguiti da ./src
            continue-on-error: true # Continua anche se il push fallisce (es. per fork senza permessi)
    ```

    **Spiegazione dettagliata del file YAML:**
    * `name`: Nome del workflow (es. `YouTube Data Pipeline - Aggiornamento Automatico`).
    * `on`: Definisce gli eventi che attivano il workflow.
        * `schedule - cron: '0 2 * * *'`: Esegue il workflow a intervalli regolari (qui, ogni giorno alle 02:00 UTC).
        * `workflow_dispatch`: Permette di avviare il workflow manualmente dalla tab "Actions" del repository GitHub.
    * `jobs`: Contiene uno o più job (attività) da eseguire.
        * `run-youtube-data-pipeline`: Nome del nostro job.
        * `runs-on: ubuntu-latest`: Specifica il tipo di runner (macchina virtuale) su cui eseguire il job.
        * `defaults.run.working-directory: ./src`: Imposta la directory di lavoro predefinita per i comandi `run` all'interno del job a `./src`. Questo è cruciale se `data_pipeline.py` si aspetta di essere eseguito da `src/` e i path ai dati (es. `../data/`) sono relativi a quella posizione.
        * `steps`:
            * `actions/checkout@v4`: Azione standard per fare il checkout (scaricare) il codice del tuo repository sul runner.
            * `actions/setup-python@v5`: Azione per configurare l'ambiente Python alla versione specificata (`3.10`).
            * `Install dependencies`: Esegue i comandi per installare le librerie Python. Il path `../requirements.txt` è relativo alla `working-directory` (`./src`), quindi punta alla root del progetto.
            * `Create data directory...`: Questo step è stato aggiunto per creare la cartella `data` nella root del progetto, assumendo che gli script in `src/` siano stati adattati per salvare i dati lì (es. usando `os.path.join('..', 'data', 'nomefile.parquet')`).
            * `Run YouTube Data Pipeline Script...`: Esegue lo script principale.
                * `env.YT_API_KEY: ${{ secrets.YT_API_KEY }}`: **Fondamentale per la sicurezza.** La chiave API di YouTube non deve essere scritta nel codice o nel workflow. Va memorizzata come un "secret" nelle impostazioni del tuo repository GitHub (Settings -> Secrets and variables -> Actions -> New repository secret). Il workflow la leggerà da lì e la imposterà come variabile d'ambiente per lo script Python.
                * `run: python data_pipeline.py`: Esegue lo script. È importante che `data_pipeline.py` (e `functions.py`) sappiano dove leggere/scrivere i file di dati (es. in una cartella `../data/` relativa a `src/` se `DATA_DIR` è definito come `os.path.join('..', 'data')` negli script).
            * `Commit and push data changes (Opzionale)`: Questo passo mostra come fare il commit dei file di dati generati (es. i file Parquet nella cartella `data/` nella root del progetto) e pusharli al repository. **Attenzione ai percorsi qui (`../data/`) relativi alla `working-directory` (`./src`).**

4.  **Aggiungi la Chiave API come Secret su GitHub:**
    * Vai al tuo repository su GitHub.
    * Settings -> Secrets and variables -> Actions.
    * Clicca su "New repository secret".
    * Nomina il secret `YT_API_KEY` e incolla la tua chiave API come valore.

5.  **Adatta gli Script Python per i Path dei Dati:**
    È cruciale che i tuoi script Python (`functions.py` e `data_pipeline.py`) gestiscano correttamente i percorsi per la cartella `data`. Se il workflow esegue gli script da `src/`, e vuoi che i dati siano salvati in `youtube_data_pipeline_project/data/`, allora negli script dovrai usare percorsi relativi come `os.path.join('..', 'data', 'nomefile.parquet')` per `DATA_DIR` o per i singoli path dei file.
    Ad esempio, in `functions.py` o all'inizio di `data_pipeline.py`, potresti definire `DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')` per far sì che punti alla cartella `data` nella root del progetto, assumendo che `functions.py` sia in `src`.

6.  **Commit e Push del Workflow:**
    Fai il commit del file `.github/workflows/youtube_data_pipeline.yml`, degli script Python aggiornati (se necessario per i path) e del file `requirements.txt`. Pusha tutto sul tuo repository GitHub. GitHub Actions rileverà automaticamente il nuovo workflow.

### 8.4 Considerazioni Aggiuntive sull'Automazione
* **Gestione degli Output dei Dati**: Se i file Parquet diventano molto grandi, fare il commit diretto su Git non è ideale. Considera:
    * **Git LFS (Large File Storage)**: Per versionare file grandi con Git.
    * **Artefatti del Workflow**: Salva i dati come artefatti dell'esecuzione di GitHub Actions.
    * **Storage Cloud Esterno**: Carica gli output su servizi come Amazon S3, Google Cloud Storage, Azure Blob Storage.
* **Logging e Notifiche**: GitHub Actions fornisce log dettagliati per ogni esecuzione. Puoi configurare notifiche (es. email, Slack) in caso di fallimento del workflow.
* **Test del Workflow**: Prima di affidarti completamente alla schedulazione, testa il workflow triggerandolo manualmente (`workflow_dispatch`) o pushando su un branch di sviluppo dedicato.
* **Costi delle API e Quote**: Le esecuzioni automatiche frequenti dell'API di YouTube potrebbero consumare la tua quota API gratuita o generare costi. Monitora l'utilizzo e adatta la frequenza di schedulazione di conseguenza.

Automatizzare la tua pipeline dati con GitHub Actions è un passo importante verso la creazione di sistemi di data science più robusti, affidabili e manutenibili.