# Notebook: Data Pipelines with Dagster

## Section 1: Introduction

In this notebook, we'll work through a real-world problem end to end. We'll focus on
writing clean, reusable code and use Dagster to orchestrate and monitor the whole
data pipeline.

### The example: Video segmentation, transcription, and summarization
Sometimes we don't have the time to watch a video in full. As an example of a data
pipeline that needs workflow management, we'll therefore consider the following processes:

- Video scene detection
- Video segmentation
- Transcription
- Summarization

In the following, we'll introduce the tools used in each step, and then show how to
**orchestrate them** in a pipeline using Dagster.


## Section 2: Tool Overview

There are many off-the-shelf solutions for the problems mentioned. To solve our little
problem of segmenting, transcribing, and summarizing a video, we'll use:

1. **PySceneDetect** for scene detection in videos (https://www.scenedetect.com/)
2. **MoviePy** for cutting and manipulating video files (https://pypi.org/project/moviepy/)
3. **Faster Whisper** for audio transcription (https://github.com/SYSTRAN/faster-whisper)
4. **Ollama / LLMs** for text summarization (https://ollama.com/search)
5. **Dagster** for pipeline orchestration (https://dagster.io/)


## Scene Detection

Scene detection compares consecutive video frames (for example, their color and brightness)
to assess whether the setting (i.e., the *scene*) has changed considerably. You've probably
seen this before on YouTube:

https://www.youtube.com/watch?v=TZe5UqlUg0c
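To make the idea concrete, here is a deliberately simplified sketch of content-based cut detection. It assumes each frame has already been reduced to a flat list of grayscale values (0-255); `mean_abs_diff`, `detect_cuts`, and `cuts_to_scenes` are hypothetical helpers for illustration. This is not PySceneDetect's implementation (its `ContentDetector` works on HSV color channels and also enforces a minimum scene length); it only shows the core step: compare each frame with the previous one and report a cut when the average change exceeds a threshold.

```python
from typing import List, Sequence, Tuple


def mean_abs_diff(a: Sequence[int], b: Sequence[int]) -> float:
    """Average absolute per-pixel difference between two equally sized frames."""
    return sum(abs(x - y) for x, y in zip(a, b)) / len(a)


def detect_cuts(frames: List[Sequence[int]], threshold: float = 27.0) -> List[int]:
    """Return the indices of frames that start a new scene.

    A cut is reported at frame i when it differs from frame i - 1 by more than
    `threshold` on average (a toy stand-in for a real content score).
    """
    return [
        i
        for i in range(1, len(frames))
        if mean_abs_diff(frames[i - 1], frames[i]) > threshold
    ]


def cuts_to_scenes(cuts: List[int], n_frames: int, fps: float) -> List[Tuple[float, float]]:
    """Turn cut indices into (start_seconds, end_seconds) pairs covering the whole video."""
    edges = [0, *cuts, n_frames]
    return [(start / fps, end / fps) for start, end in zip(edges, edges[1:])]


# Toy "video" with 4 pixels per frame: 4 dark, 3 bright, then 3 mid-gray frames.
frames = [[10, 12, 11, 10]] * 4 + [[200, 205, 198, 202]] * 3 + [[100, 101, 99, 100]] * 3
cuts = detect_cuts(frames)
print(cuts)                                        # → [4, 7]
print(cuts_to_scenes(cuts, len(frames), fps=2.0))  # → [(0.0, 2.0), (2.0, 3.5), (3.5, 5.0)]
```

The default threshold of 27.0 simply mirrors the value passed to `ContentDetector` in the cell below; the toy score and PySceneDetect's score are not directly comparable.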

Let's take a look at how that works in practice!

In [1]:
# Taking a closer look at scene detection with PySceneDetect...
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector

# select a sample video
video_path = "data/example_video.mp4"
video = open_video(video_path)

# set up PySceneDetect's SceneManager with a content-based detector
# (a lower threshold makes the detector more sensitive to changes)
scene_manager = SceneManager()
scene_manager.add_detector(ContentDetector(threshold=27.0))

# detect scenes and convert each (start, end) timecode pair to seconds
scene_manager.detect_scenes(video)
scene_list = scene_manager.get_scene_list()
scene_times = [(start.get_seconds(), end.get_seconds()) for start, end in scene_list]

def fmt(seconds):
    """Show times below one minute in seconds, longer ones in (decimal) minutes."""
    return f"{seconds:.2f}s" if seconds < 60 else f"{seconds / 60:.2f}min"

for k, (start_time, end_time) in enumerate(scene_times, start=1):
    print(f"Scene {k}: start {fmt(start_time)}, end {fmt(end_time)}")

### output: temporal edges of each individual recognized scene!

Scene 1: start 0.00s, end 7.60s
Scene 2: start 7.60s, end 21.00s
Scene 3: start 21.00s, end 6.60min
Scene 4: start 6.60min, end 6.80min
Scene 5: start 6.80min, end 13.28min
Scene 6: start 13.28min, end 13.41min
Scene 7: start 13.41min, end 18.23min
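Note that the minute values above are decimal minutes: `6.60min` means roughly 6 minutes and 36 seconds, not 6:60. If clock-style timestamps are preferred, a small helper like the hypothetical `to_timestamp` below could replace the time formatting in the loop. (PySceneDetect's `FrameTimecode` objects, i.e. the `start`/`end` values in `scene_list`, also offer `get_timecode()`, which returns an `HH:MM:SS.nnn` string.)

```python
def to_timestamp(seconds: float) -> str:
    """Format a duration in seconds as MM:SS.ss (minutes are not capped at 60)."""
    # round first so that e.g. 59.999 s becomes 01:00.00 instead of 00:60.00
    minutes, secs = divmod(round(seconds, 2), 60)
    return f"{int(minutes):02d}:{secs:05.2f}"


# Scene 3 from the output above starts at 21.00 s; 6.60 min is about 396 s
print(to_timestamp(21.0))   # → 00:21.00
print(to_timestamp(396.0))  # → 06:36.00
```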


## Video Segmentation

Now that we know where each scene starts and ends, we can use this information to cut the
original video into individual scenes. For this, we'll use MoviePy.

Let's see how that works!

In [2]:
# testing video segmentation with MoviePy...
# (MoviePy >= 2.0 API; MoviePy 1.x used moviepy.editor and .subclip instead)
from moviepy import VideoFileClip
import os

# let's first set up a folder to store the clipped videos in:
output_dir = "data/scenes_demo"
os.makedirs(output_dir, exist_ok=True)

# load the video as a VideoFileClip (MoviePy's clip class for video files)
video_path = "data/example_video.mp4"
video_clip = VideoFileClip(video_path)

# for each scene, cut ("clip") its time range out of video_clip and save it;
# scene_times comes from the scene detection cell above
try:
    for idx, (start_time, end_time) in enumerate(scene_times):
        subclip = video_clip.subclipped(start_time, end_time)
        out_path = os.path.join(output_dir, f"scene_{idx+1}.mp4")
        subclip.write_videofile(out_path)
finally:
    # release the file handles even if writing one of the clips fails
    video_clip.close()

print(f"Scenes saved in {output_dir}")

### output: videos of the individual scenes saved in our output folder!

MoviePy - Building video data/scenes_demo/scene_1.mp4.
MoviePy - Writing audio in scene_1TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_1.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_1.mp4
MoviePy - Building video data/scenes_demo/scene_2.mp4.
MoviePy - Writing audio in scene_2TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_2.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_2.mp4
MoviePy - Building video data/scenes_demo/scene_3.mp4.
MoviePy - Writing audio in scene_3TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_3.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_3.mp4
MoviePy - Building video data/scenes_demo/scene_4.mp4.
MoviePy - Writing audio in scene_4TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_4.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_4.mp4
MoviePy - Building video data/scenes_demo/scene_5.mp4.
MoviePy - Writing audio in scene_5TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_5.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_5.mp4
MoviePy - Building video data/scenes_demo/scene_6.mp4.
MoviePy - Writing audio in scene_6TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_6.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_6.mp4
MoviePy - Building video data/scenes_demo/scene_7.mp4.
MoviePy - Writing audio in scene_7TEMP_MPY_wvf_snd.mp3
MoviePy - Done.
MoviePy - Writing video data/scenes_demo/scene_7.mp4
MoviePy - Done !
MoviePy - video ready data/scenes_demo/scene_7.mp4
Scenes saved in data/scenes_demo
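The segmentation cell mixes two concerns: deciding *which* clips to write and actually encoding them. In the spirit of clean, reusable code, the bookkeeping part can be pulled into a pure function that is testable without touching any video file. The sketch below is one possible structure under that assumption; `plan_clips` is a hypothetical helper, not part of MoviePy.

```python
import os
from typing import List, Tuple


def plan_clips(
    scene_times: List[Tuple[float, float]], output_dir: str
) -> List[Tuple[str, float, float]]:
    """Map each (start, end) scene to the file it should be written to.

    Raises ValueError for empty or overlapping scenes, so bad detector output
    fails early instead of producing broken clips.
    """
    plan = []
    previous_end = 0.0
    for idx, (start, end) in enumerate(scene_times, start=1):
        if end <= start or start < previous_end:
            raise ValueError(f"scene {idx} has invalid bounds ({start}, {end})")
        plan.append((os.path.join(output_dir, f"scene_{idx}.mp4"), start, end))
        previous_end = end
    return plan


# The MoviePy loop would then only execute the plan, e.g.:
#     for out_path, start, end in plan_clips(scene_times, output_dir):
#         video_clip.subclipped(start, end).write_videofile(out_path)
for path, start, end in plan_clips([(0.0, 7.6), (7.6, 21.0)], "data/scenes_demo"):
    print(path, start, end)
```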


## Video Transcription

We've now segmented the video into individual scenes and saved each one as a separate file
(for traceability). We don't want to listen to the scenes, though; we'd like to have
the text transcribed for us.

Here's how we'll go about this:

In [None]:
# video transcription using Faster Whisper...
from faster_whisper import WhisperModel
import os

# we'll use the tiny Whisper model, which runs locally on a CPU without problems
model = WhisperModel("tiny", device="cpu")

# set up directories for input scenes and output transcripts
scenes_dir = "data/scenes_demo"
transcripts_dir = "data/transcripts_demo"
os.makedirs(transcripts_dir, exist_ok=True)

# for every video file in the scenes directory (sorted for a predictable order)...
for file_name in sorted(os.listdir(scenes_dir)):
    if file_name.lower().endswith(".mp4"):

        # ...we fetch the input path and set the output file path...
        file_path = os.path.join(scenes_dir, file_name)
        txt_filename = os.path.splitext(file_name)[0] + ".txt"
        txt_path = os.path.join(transcripts_dir, txt_filename)

        # ...we use the Whisper model to transcribe the scene's audio track
        # (segments is a generator: transcription runs while we iterate over it)...
        segments, _ = model.transcribe(file_path)
        transcript = "\n".join(segment.text.strip() for segment in segments)

        # ...and we write the resulting text to a .txt file in the output folder
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(transcript)

print(f"Transcripts saved in {transcripts_dir}")

### output: transcribed text of each single scene in separate .txt files!



Transcripts saved in data/transcripts_demo
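The transcription cell keeps only the segment texts and drops their timing. faster-whisper's segments also carry `start` and `end` times in seconds, so a transcript can keep timestamps, which helps when jumping back into a scene. The sketch below uses a hypothetical `Segment` stand-in with just those three fields so it runs without a model; `format_segments` is likewise a hypothetical helper. With real output, you would pass the `segments` returned by `model.transcribe(...)` instead.

```python
from typing import Iterable, NamedTuple


class Segment(NamedTuple):
    """Hypothetical stand-in for a faster-whisper segment (only the fields used here)."""
    start: float
    end: float
    text: str


def format_segments(segments: Iterable[Segment]) -> str:
    """Render segments as one '[start-end] text' line each, times in seconds."""
    return "\n".join(
        f"[{seg.start:.2f}s-{seg.end:.2f}s] {seg.text.strip()}" for seg in segments
    )


demo = [Segment(0.0, 2.5, " Hello and welcome."), Segment(2.5, 6.0, " Today we talk about pipelines.")]
print(format_segments(demo))
# [0.00s-2.50s] Hello and welcome.
# [2.50s-6.00s] Today we talk about pipelines.
```

Because the function iterates over its input exactly once, it also works with faster-whisper's lazy segment generator.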


## Transcript Summarization

With the scenes transcribed, the only thing left is to summarize the transcripts to save
us some reading time.

Here's how we'll do that:

In [None]:
# summarizing natural language texts with a small Ollama model (llama3.2:1b);
# assumes a local Ollama server is running and the model has been pulled
# beforehand (ollama pull llama3.2:1b)
import os
from ollama import chat, ChatResponse

# input folder from the transcription step, and an output folder for the summaries
transcripts_dir = "data/transcripts_demo"
summary_dir = "data/summaries_demo"
os.makedirs(summary_dir, exist_ok=True)

# we'll need to give the LLM an instruction on what to do; the blank line at the
# end separates the instruction from the transcript appended to it
prompt = (
    "Summarize the following text. Do not verify facts, and do not add "
    "commentary. Only output a concise summary of five sentences maximum.\n\n"
)

# look through all the .txt files in transcripts_dir...
for file_name in sorted(os.listdir(transcripts_dir)):
    if file_name.lower().endswith(".txt"):

        # fetch the specific transcript and extract the text
        file_path = os.path.join(transcripts_dir, file_name)
        with open(file_path, "r", encoding="utf-8") as f:
            transcript = f.read()

        # skip scenes without speech: there is nothing to summarize
        if not transcript.strip():
            continue

        # now we can call the LLM to summarize the transcript
        response: ChatResponse = chat(
            model="llama3.2:1b",
            messages=[{"role": "user", "content": prompt + transcript}],
        )
        result = response.message.content

        # write the summary to summary_dir, e.g. scene_3_summary.txt
        txt_filename = os.path.splitext(file_name)[0] + "_summary.txt"
        txt_path = os.path.join(summary_dir, txt_filename)
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(result)

print(f"Summaries saved in {summary_dir}")
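A design alternative for the prompt: instead of concatenating instruction and transcript into one user message, the instruction can go into a `system` message, which Ollama's chat API accepts alongside `user` messages. This keeps the transcript clearly separated from the instruction. The helper `build_messages` and the constant `SUMMARY_INSTRUCTION` are hypothetical names; the resulting list would be passed as `messages=` to `chat(...)`.

```python
from typing import Dict, List

SUMMARY_INSTRUCTION = (
    "Summarize the following text. Do not verify facts, and do not add "
    "commentary. Only output a concise summary of five sentences maximum."
)


def build_messages(transcript: str, instruction: str = SUMMARY_INSTRUCTION) -> List[Dict[str, str]]:
    """Return a chat message list with the instruction as a system message."""
    return [
        {"role": "system", "content": instruction},
        {"role": "user", "content": transcript.strip()},
    ]


messages = build_messages("  So today we look at three ways to cut a video.\n")
print(messages[1]["content"])  # → So today we look at three ways to cut a video.
```

Usage would then be `chat(model="llama3.2:1b", messages=build_messages(transcript))`. Whether a 1B model follows system instructions more reliably than inline ones is worth checking on your own transcripts.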

## Putting It All Together

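We have now built all of the parts we need for the entire process; however, we had to run
them manually, one cell after another, and the cells silently depend on each other's
variables (e.g., the segmentation cell relies on `scene_times` from the detection cell).

We don't want that.

To make everything reproducible and robust, we'll move all of the above into a clean
pipeline.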

## Now, Let's Talk About the Pipeline

Instead of running each step manually, we can define a **Dagster pipeline**:

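- Each step (scene detection, segmentation, transcription, summarization) produces an
  **asset**: a persisted result (scene list, clip files, transcripts, summaries) together
  with the function that computes it.
- Dagster works out the dependencies between assets and runs them in the right order.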
- Advantages:
    - Reproducibility
    - Observability
    - Easy orchestration of complex workflows
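To see what handling dependencies and execution order means mechanically, here is a toy, plain-Python imitation; it is not Dagster's code. Each step is a function whose parameter names refer to the upstream steps it needs (similar in spirit to how Dagster links `@asset` functions by argument name), and the standard library's `graphlib.TopologicalSorter` (Python 3.9+) derives a valid execution order. The step bodies and `run_steps` are hypothetical placeholders.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict


# Dummy placeholders for the four notebook steps. A parameter name refers to
# the upstream step whose result the function needs.
def scene_times():
    return [(0.0, 7.6), (7.6, 21.0)]


def scene_clips(scene_times):
    return [f"scene_{i}.mp4" for i, _ in enumerate(scene_times, start=1)]


def transcripts(scene_clips):
    return {clip: f"transcript of {clip}" for clip in scene_clips}


def summaries(transcripts):
    return {clip: text.upper() for clip, text in transcripts.items()}


def run_steps(steps: Dict[str, Callable[..., Any]]) -> Dict[str, Any]:
    """Infer dependencies from parameter names, then run steps in topological order."""
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in steps.items()}
    results: Dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        deps = inspect.signature(steps[name]).parameters
        results[name] = steps[name](**{dep: results[dep] for dep in deps})
    return results


# Registration order does not matter; the dependency graph decides the execution order.
steps = {fn.__name__: fn for fn in (summaries, scene_clips, transcripts, scene_times)}
results = run_steps(steps)
print(list(results))  # → ['scene_times', 'scene_clips', 'transcripts', 'summaries']
```

In Dagster, functions decorated with `@asset` play this role, and Dagster adds run history, logs, and its web UI on top of the ordering itself.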

You can check the full Dagster pipeline in `pipeline.py`.