# Dagster Pipeline for Telegram Medical Data

This notebook demonstrates how to use the Dagster pipeline for processing Telegram medical data.

## Overview
The pipeline consists of the following operations:
1. **scrape_telegram_data**: Scrapes data from Telegram channels using Telethon
2. **load_raw_to_postgres**: Loads raw data to PostgreSQL
3. **run_dbt_transformations**: Runs dbt transformations
4. **run_yolo_enrichment**: Processes images with YOLO
5. **generate_pipeline_report**: Creates execution reports

## Setup
Make sure you have:
- All dependencies installed in your environment
- Environment variables configured (.env file)
- PostgreSQL database running

## Imports and Setup

In [13]:
# Import required libraries
import sys
import os
from pathlib import Path

# Add project root to Python path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

# Import Dagster components
from dagster import (
    execute_job, 
    DagsterInstance, 
    get_dagster_logger,
    job, 
    op
)

# Import pipeline components
from dags.telegram_pipeline import (
    telegram_pipeline_job,
    scrape_telegram_data,
    load_raw_to_postgres,
    run_dbt_transformations,
    run_yolo_enrichment,
    generate_pipeline_report
)

# Import source modules
from src.scraper.telegram_scraper import TelegramScraper
from src.loader.postgres_loader import PostgresLoader
from src.enrich.yolo_enricher import YOLOEnricher
from src.dbt_runner.dbt_executor import DBTExecutor
from src.utils.config import get_config

print("✅ All imports successful!")

✅ All imports successful!


## Environment Check

Let's verify that our environment is properly configured and all components are working.

In [14]:
# Check environment variables
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Required environment variables
required_vars = [
    'TELEGRAM_API_ID',
    'TELEGRAM_API_HASH', 
    'TELEGRAM_PHONE'
]

print("🔍 Checking Environment Variables:")
print("=" * 50)

missing_vars = []
for var in required_vars:
    value = os.getenv(var)
    if value:
        # Mask sensitive values (API credentials, passwords, and the phone number)
        if any(token in var for token in ('API', 'HASH', 'PASSWORD', 'PHONE')):
            display_value = value[:4] + "..." + value[-4:] if len(value) > 8 else "***"
        else:
            display_value = value
        print(f"✅ {var}: {display_value}")
    else:
        print(f"❌ {var}: NOT SET")
        missing_vars.append(var)

if missing_vars:
    print(f"\n⚠️  Missing environment variables: {missing_vars}")
    print("Please set these in your .env file")
else:
    print("\n✅ All required environment variables are set!")

# Check Dagster instance
try:
    instance = DagsterInstance.ephemeral()
    print("✅ Dagster instance created successfully")
except Exception as e:
    print(f"❌ Dagster instance error: {e}")

🔍 Checking Environment Variables:
✅ TELEGRAM_API_ID: ***
✅ TELEGRAM_API_HASH: 041e...b3e7
✅ TELEGRAM_PHONE: +251...9456

✅ All required environment variables are set!


2025-07-16 03:51:33,063 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 03:51:33,064 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 03:51:33,147 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 03:51:33,158 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 03:51:33,160 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 03:51:33,226 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816


✅ Dagster instance created successfully


## Pipeline Components Overview

Let's examine the individual operations that make up our pipeline.

In [15]:
# Display pipeline structure
print("📋 Pipeline Components:")
print("=" * 50)

components = [
    ("scrape_telegram_data", "Scrapes Telegram channels for medical data"),
    ("load_raw_to_postgres", "Loads raw data to PostgreSQL database"),
    ("run_dbt_transformations", "Runs dbt models and transformations"),
    ("run_yolo_enrichment", "Processes images with YOLO for medical content"),
    ("generate_pipeline_report", "Creates comprehensive execution report")
]

for i, (name, description) in enumerate(components, 1):
    print(f"{i}. {name}")
    print(f"   └─ {description}")

print("\nMain Job: telegram_pipeline_job")
print("   └─ Orchestrates all operations in sequence")

# Show job definition
print("\nJob Definition:")
print(f"   {telegram_pipeline_job.__doc__}")

📋 Pipeline Components:
1. scrape_telegram_data
   └─ Scrapes Telegram channels for medical data
2. load_raw_to_postgres
   └─ Loads raw data to PostgreSQL database
3. run_dbt_transformations
   └─ Runs dbt models and transformations
4. run_yolo_enrichment
   └─ Processes images with YOLO for medical content
5. generate_pipeline_report
   └─ Creates comprehensive execution report

Main Job: telegram_pipeline_job
   └─ Orchestrates all operations in sequence

Job Definition:
   
    Main Dagster job for the Telegram medical data pipeline.
    
    This job orchestrates the following operations:
    1. scrape_telegram_data: Scrapes data from Telegram channels
    2. load_raw_to_postgres: Loads raw data to PostgreSQL
    3. run_dbt_transformations: Runs dbt transformations
    4. run_yolo_enrichment: Processes images with YOLO
    5. generate_pipeline_report: Generates execution report
    


## Individual Operation Testing

Let's test each operation individually to ensure they work correctly.

### Test Scraping Operation

In [16]:
# Test scraping operation
print("🧪 Testing Scraping Operation")
print("=" * 50)

try:
    # Create a test context
    from dagster import build_op_context
    
    context = build_op_context()
    
    # Test the scraping operation
    result = scrape_telegram_data(context)
    
    print(f"✅ Scraping operation completed!")
    print(f"Status: {result.get('status', 'unknown')}")
    print(f"Messages scraped: {result.get('message_count', 0)}")
    print(f"Channels processed: {result.get('channels_processed', 0)}")
    
    if result.get('status') == 'failed':
        print(f"❌ Error: {result.get('error', 'Unknown error')}")
    
except Exception as e:
    print(f"❌ Scraping test failed: {e}")
    print("This might be due to:")
    print("- Missing Telegram credentials")
    print("- Network connectivity issues")
    print("- Rate limiting from Telegram")

🧪 Testing Scraping Operation


2025-07-16 03:51:33,318 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 03:51:33,318 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 03:51:33,402 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 03:51:33,414 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 03:51:33,414 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 03:51:33,500 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 03:51:33,506 - dagster.builtin - INFO - Starting Telegram data scraping operation
2025-07-16 03:51:33,508 - src.scraper.telegram_scraper - INFO - Initialized TelegramScraper with 3 target channels
2025-07-16 03:51:33,511 - telethon.network.mtprotosender - INFO - Connecting to 149.154.167.91:443/TcpFull...
2025-07-16 03:51:33,673 - telethon.network.mtprotosender - INFO - Connection to 149.154.167.91:443/TcpFull complete!

✅ Scraping operation completed!
Status: success
Messages scraped: 1975
Channels processed: 3


### Test Loading Operation

In [17]:
# Test loading operation
print("🧪 Testing Loading Operation")
print("=" * 50)

try:
    # Create a test context
    context = build_op_context()
    
    # Mock scrape results for testing
    mock_scrape_results = {
        'status': 'success',
        'message_count': 0,
        'channels_processed': 0
    }
    
    # Test the loading operation
    result = load_raw_to_postgres(context, mock_scrape_results)
    
    print(f"✅ Loading operation completed!")
    print(f"Status: {result.get('status', 'unknown')}")
    print(f"Records loaded: {result.get('records_loaded', 0)}")
    
    if result.get('status') == 'failed':
        print(f"❌ Error: {result.get('error', 'Unknown error')}")
    
except Exception as e:
    print(f"❌ Loading test failed: {e}")
    print("This might be due to:")
    print("- Database connection issues")
    print("- Missing database tables")
    print("- Incorrect database URL")

🧪 Testing Loading Operation


2025-07-16 04:11:22,783 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:22,785 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:23,912 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:23,922 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:23,924 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:23,999 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:24,004 - dagster.builtin - INFO - Starting raw data loading to PostgreSQL


Connected to PostgreSQL successfully
Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,122 - dagster.builtin - ERROR - Error loading file CheMed123.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,158 - dagster.builtin - ERROR - Error loading file lobelia4cosmetics.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,200 - dagster.builtin - ERROR - Error loading file tikvahpharma.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,224 - dagster.builtin - ERROR - Error loading file CheMed123.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,253 - dagster.builtin - ERROR - Error loading file lobelia4cosmetics.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



Error loading messages: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^



2025-07-16 04:11:24,305 - dagster.builtin - ERROR - Error loading file tikvahpharma.json: column "media_type" of relation "raw_messages" does not exist
LINE 4: ...           message_text, message_date, has_media, media_type...
                                                             ^

2025-07-16 04:11:24,307 - dagster.builtin - INFO - Raw data loading completed: {'status': 'success', 'records_loaded': 0, 'files_processed': 0}


✅ Loading operation completed!
Status: success
Records loaded: 0
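
The log above shows every insert failing because `raw_messages` has no `media_type` column, yet the operation still reports `success` with zero records loaded. The following is a minimal sketch of how the schema gap could be detected and a migration statement generated. The helper names are hypothetical, the `existing` list stands in for a query against `information_schema.columns`, and `TEXT` is an assumed column type rather than the loader's actual definition.

```python
def missing_columns(expected, existing):
    """Return expected column names that are absent from the table, in order."""
    existing_set = set(existing)
    return [name for name in expected if name not in existing_set]


def build_add_column_sql(table, column_types):
    """Build one ALTER TABLE statement per column (PostgreSQL syntax)."""
    return [
        f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {name} {sql_type};"
        for name, sql_type in column_types.items()
    ]


# Columns named in the failing INSERT from the error message above.
expected = ["message_text", "message_date", "has_media", "media_type"]
# Hypothetical current columns; in practice, read them from the database.
existing = ["message_text", "message_date", "has_media"]

gaps = missing_columns(expected, existing)
# TEXT is an assumption; use the type the loader actually writes.
statements = build_add_column_sql("raw_messages", {name: "TEXT" for name in gaps})
for statement in statements:
    print(statement)
```

A loader that finds files to process but loads zero records would arguably be better reported as `failed`, so that downstream steps do not run against an empty table.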


### Test dbt Operation

In [18]:
# Test dbt operation
print("🧪 Testing DBT Transformations")
print("=" * 50)

try:
    # Create a test context
    context = build_op_context()
    
    # Test the dbt operation
    result = run_dbt_transformations(context)
    
    print(f"✅ DBT operation completed!")
    print(f"Status: {result.get('status', 'unknown')}")
    print(f"Debug success: {result.get('debug_success', False)}")
    print(f"Run success: {result.get('run_success', False)}")
    print(f"Test success: {result.get('test_success', False)}")
    
    if result.get('status') == 'failed':
        print(f"❌ Error: {result.get('error', 'Unknown error')}")
    
except Exception as e:
    print(f"❌ DBT test failed: {e}")
    print("This might be due to:")
    print("- Missing dbt configuration")
    print("- Database connection issues")
    print("- Missing dbt models")

🧪 Testing DBT Transformations


2025-07-16 04:11:24,360 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:24,361 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:24,451 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:24,466 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:24,467 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:24,563 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:24,567 - dagster.builtin - INFO - Starting dbt transformations
2025-07-16 04:11:24,569 - dagster.builtin - INFO - Running dbt debug...
2025-07-16 04:11:24,578 - dagster.builtin - ERROR - dbt debug failed: [WinError 2] The system cannot find the file specified


✅ DBT operation completed!
Status: failed
Debug success: False
Run success: False
Test success: False
❌ Error: dbt configuration error
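
`[WinError 2] The system cannot find the file specified` usually means Windows could not locate the executable being launched, which suggests the `dbt` command is not on the `PATH` seen by the notebook kernel. The sketch below uses only the standard library to check this before any subprocess call. The function names are hypothetical and are not part of the pipeline's `DBTExecutor`.

```python
import shutil


def resolve_cli(name):
    """Return the absolute path of a command-line tool, or None if it is not on PATH."""
    return shutil.which(name)


def describe_cli(name):
    """Produce a one-line diagnostic suitable for logging before a subprocess call."""
    path = resolve_cli(name)
    if path is None:
        return f"{name}: not found on PATH (activate the environment where it is installed)"
    return f"{name}: {path}"


print(describe_cli("dbt"))
```

If the command is reported as missing, installing dbt into the kernel's environment, or starting Jupyter from the environment where dbt is installed, is the likely remedy.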


### Test YOLO Operation

In [19]:
# Test YOLO operation
print("🧪 Testing YOLO Enrichment")
print("=" * 50)

try:
    # Create a test context
    context = build_op_context()
    
    # Test the YOLO operation
    result = run_yolo_enrichment(context)
    
    print(f"✅ YOLO operation completed!")
    print(f"Status: {result.get('status', 'unknown')}")
    print(f"Images processed: {result.get('images_processed', 0)}")
    print(f"Detections found: {result.get('detections_found', 0)}")
    
    if result.get('status') == 'failed':
        print(f"❌ Error: {result.get('error', 'Unknown error')}")
    
except Exception as e:
    print(f"❌ YOLO test failed: {e}")
    print("This might be due to:")
    print("- Missing YOLO model files")
    print("- No images found for processing")
    print("- GPU/CUDA issues")

🧪 Testing YOLO Enrichment


2025-07-16 04:11:24,639 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:24,642 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:24,730 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:24,742 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:24,743 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:24,824 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:24,831 - dagster.builtin - INFO - Starting YOLO image enrichment
2025-07-16 04:11:24,849 - dagster.builtin - INFO - Processing 2535 images with YOLO


Error loading YOLO model: Weights only load failed. This file can still be loaded, to do so you have two options, do those steps only if you trust the source of the checkpoint. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL ultralytics.nn.tasks.DetectionModel was not an allowed global by default. Please use `torch.serialization.add_safe_globals([ultralytics.nn.tasks.DetectionModel])` or the `torch.serialization.safe_globals([ultralytics.nn.tasks.DetectionModel])` context manager to allowlist this global if you trust this class/function.

Check the documentation of t

2025-07-16 04:11:24,882 - dagster.builtin - ERROR - Error in run_yolo_enrichment: Weights only load failed. This file can still be loaded, to do so you have two options, do those steps only if you trust the source of the checkpoint. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL ultralytics.nn.tasks.DetectionModel was not an allowed global by default. Please use `torch.serialization.add_safe_globals([ultralytics.nn.tasks.DetectionModel])` or the `torch.serialization.safe_globals([ultralytics.nn.tasks.DetectionModel])` context manager to allowlist this global if you 

❌ YOLO test failed: Weights only load failed. This file can still be loaded, to do so you have two options, do those steps only if you trust the source of the checkpoint. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL ultralytics.nn.tasks.DetectionModel was not an allowed global by default. Please use `torch.serialization.add_safe_globals([ultralytics.nn.tasks.DetectionModel])` or the `torch.serialization.safe_globals([ultralytics.nn.tasks.DetectionModel])` context manager to allowlist this global if you trust this class/function.

Check the documentation of torch.l
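
The error text itself names one remedy: allowlisting `ultralytics.nn.tasks.DetectionModel` through `torch.serialization.add_safe_globals`. A hedged sketch of that step follows, to be called before the YOLO model is loaded and only for a checkpoint from a trusted source. The wrapper function is hypothetical. Loading may also surface further unsupported globals that need the same treatment, and upgrading `ultralytics` to a release that supports PyTorch 2.6 may resolve the problem without this step.

```python
def allowlist_detection_model():
    """Allowlist ultralytics' DetectionModel for torch.load(weights_only=True).

    Returns True if the class was registered, and False if torch or
    ultralytics is not installed or the installed PyTorch predates the
    allowlist API.
    """
    try:
        import torch
        from ultralytics.nn.tasks import DetectionModel
    except ImportError:
        return False

    add_safe_globals = getattr(torch.serialization, "add_safe_globals", None)
    if add_safe_globals is None:
        return False

    # Only do this for checkpoints you trust: it widens what unpickling may construct.
    add_safe_globals([DetectionModel])
    return True


registered = allowlist_detection_model()
print(f"DetectionModel allowlisted: {registered}")
```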

## Full Pipeline Execution

Now let's run the complete pipeline and see how all operations work together.

In [20]:
# Execute the full pipeline
print("🚀 Executing Full Pipeline")
print("=" * 50)

try:
    # Execute the job
    result = execute_job(telegram_pipeline_job)
    
    if result.success:
        print("✅ Pipeline executed successfully!")
        print(f"Run ID: {result.run_id}")
        
        # Get execution summary
        for event in result.all_events:
            if event.event_type_value == "STEP_SUCCESS":
                print(f"✅ Step {event.step_key} completed successfully")
            elif event.event_type_value == "STEP_FAILURE":
                print(f"❌ Step {event.step_key} failed")
                print(f"   Error: {event}")
    
    else:
        print("❌ Pipeline execution failed!")
        for event in result.all_events:
            if event.event_type_value == "STEP_FAILURE":
                print(f"❌ Step {event.step_key} failed: {event}")
    
except Exception as e:
    print(f"❌ Pipeline execution error: {e}")
    print("Check your environment configuration and try again.")

🚀 Executing Full Pipeline
❌ Pipeline execution error: execute_job() missing 1 required positional argument: 'instance'
Check your environment configuration and try again.
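
The failure above is an API usage problem, not an environment problem: `execute_job` requires an `instance` argument and expects a reconstructable job. For running a job inside a notebook, `JobDefinition.execute_in_process` is usually the simpler route. The sketch below assumes `telegram_pipeline_job` is an ordinary `@job` definition, and the helper name `run_in_process` is hypothetical.

```python
def run_in_process(job_def, run_config=None):
    """Run a Dagster job in the current process and summarise step outcomes."""
    # raise_on_error=False returns a result object even when a step fails,
    # so the failing steps can be listed instead of raising immediately.
    result = job_def.execute_in_process(run_config=run_config, raise_on_error=False)

    summary = {
        "success": result.success,
        "run_id": result.run_id,
        "succeeded": [],
        "failed": [],
    }
    for event in result.all_events:
        if event.event_type_value == "STEP_SUCCESS":
            summary["succeeded"].append(event.step_key)
        elif event.event_type_value == "STEP_FAILURE":
            summary["failed"].append(event.step_key)
    return summary
```

In the notebook this would be called as `summary = run_in_process(telegram_pipeline_job)`. The returned dictionary lists which steps succeeded and which failed.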


## Pipeline Monitoring and Reporting

Let's examine the pipeline results and generate reports.

In [21]:
# Analyze pipeline results
print("📊 Pipeline Results Analysis")
print("=" * 50)

try:
    # DagsterInstance.ephemeral() starts with empty run storage, so this only
    # lists runs that were executed against this same instance object
    instance = DagsterInstance.ephemeral()
    
    # List recent run records; start and end times live on the record, not on the run
    records = instance.get_run_records(limit=5)
    
    if records:
        print("📈 Recent Pipeline Runs:")
        for record in records:
            run = record.dagster_run
            status = "✅ SUCCESS" if run.status.value == "SUCCESS" else "❌ FAILED"
            print(f"   {run.run_id[:8]}... - {status} - {record.start_time}")
        
        # Get the most recent run
        latest_record = records[0]
        latest_run = latest_record.dagster_run
        print("\nLatest Run Details:")
        print(f"   Run ID: {latest_run.run_id}")
        print(f"   Status: {latest_run.status.value}")
        print(f"   Start Time: {latest_record.start_time}")
        print(f"   End Time: {latest_record.end_time}")
        
        if latest_run.status.value == "SUCCESS":
            print("   ✅ Pipeline completed successfully!")
        else:
            print("   ❌ Pipeline failed - check logs for details")
    
    else:
        print("📭 No pipeline runs found")
        print("Run the pipeline first to see results")
        
except Exception as e:
    print(f"❌ Error analyzing results: {e}")

📊 Pipeline Results Analysis


2025-07-16 04:11:24,966 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:24,969 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:25,060 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816
2025-07-16 04:11:25,071 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2025-07-16 04:11:25,074 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2025-07-16 04:11:25,159 - alembic.runtime.migration - INFO - Running stamp_revision  -> 46b412388816


📭 No pipeline runs found
Run the pipeline first to see results


## Configuration and Customization

Learn how to customize the pipeline for your specific needs.

In [22]:
# Show pipeline configuration options
print("⚙️  Pipeline Configuration")
print("=" * 50)

# Get default configuration
from dags.telegram_pipeline import get_pipeline_config

config = get_pipeline_config()
print("📋 Default Configuration:")
for op_name, op_config in config.get('ops', {}).items():
    print(f"   {op_name}:")
    for key, value in op_config.get('config', {}).items():
        print(f"     {key}: {value}")

print("\n🔧 Customization Options:")
print("   1. Modify scrape_telegram_data config for different channels")
print("   2. Adjust load_raw_to_postgres batch sizes")
print("   3. Configure dbt project settings")
print("   4. Set YOLO confidence thresholds")
print("   5. Add custom schedules")

# Example: Custom configuration
print("\n💡 Example Custom Configuration:")
custom_config = {
    "ops": {
        "scrape_telegram_data": {
            "config": {
                "max_messages_per_channel": 500,  # Reduced from 1000
                "include_media": True
            }
        },
        "run_yolo_enrichment": {
            "config": {
                "confidence_threshold": 0.7,  # Increased from 0.5
                "model_path": "models/custom_yolo.pt"
            }
        }
    }
}
print("   Custom config created for testing")

⚙️  Pipeline Configuration
📋 Default Configuration:
   scrape_telegram_data:
     max_messages_per_channel: 1000
     include_media: True
   load_raw_to_postgres:
     batch_size: 1000
     retry_attempts: 3
   run_dbt_transformations:
     dbt_project_dir: dbt
     profiles_dir: dbt
   run_yolo_enrichment:
     confidence_threshold: 0.5
     model_path: models/yolo_medical.pt

🔧 Customization Options:
   1. Modify scrape_telegram_data config for different channels
   2. Adjust load_raw_to_postgres batch sizes
   3. Configure dbt project settings
   4. Set YOLO confidence thresholds
   5. Add custom schedules

💡 Example Custom Configuration:
   Custom config created for testing
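
The cell above builds `custom_config` but never applies it. One way to use it is to merge the overrides into the defaults and pass the result as `run_config` when executing the job. The sketch below shows the merge step only. `merge_run_config` is a hypothetical helper, and the default values are copied from the output above, with two ops shown for brevity.

```python
import copy


def merge_run_config(base, overrides):
    """Recursively merge overrides into a copy of base; overrides win on conflicts."""
    merged = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_run_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Defaults as printed by get_pipeline_config() above (subset).
default_config = {
    "ops": {
        "scrape_telegram_data": {
            "config": {"max_messages_per_channel": 1000, "include_media": True}
        },
        "load_raw_to_postgres": {
            "config": {"batch_size": 1000, "retry_attempts": 3}
        },
    }
}
overrides = {
    "ops": {"scrape_telegram_data": {"config": {"max_messages_per_channel": 500}}}
}

run_config = merge_run_config(default_config, overrides)
print(run_config["ops"]["scrape_telegram_data"]["config"])
```

The merged dictionary can then be supplied through the `run_config` argument of the job's execution call. Keys that are not overridden, such as `include_media`, keep their default values.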


## Troubleshooting and Debugging

Common issues and how to resolve them.

In [23]:
# Troubleshooting guide
print("🔧 Troubleshooting Guide")
print("=" * 50)

troubleshooting = {
    "Telegram Connection Issues": [
        "Check TELEGRAM_API_ID and TELEGRAM_API_HASH in .env",
        "Verify phone number format (+1234567890)",
        "Ensure internet connectivity",
        "Check for rate limiting"
    ],
    "Database Connection Issues": [
        "Verify DATABASE_URL format",
        "Check PostgreSQL is running",
        "Ensure database exists",
        "Check user permissions"
    ],
    "dbt Issues": [
        "Run 'dbt debug' to check configuration",
        "Verify dbt project structure",
        "Check profiles.yml setup",
        "Ensure dbt dependencies installed"
    ],
    "YOLO Issues": [
        "Check YOLO model files exist",
        "Verify CUDA/GPU setup",
        "Check image file paths",
        "Ensure ultralytics is installed"
    ],
    "Dagster Issues": [
        "Check dagster.yaml configuration",
        "Verify workspace setup",
        "Check Python environment",
        "Review Dagster logs"
    ]
}

for issue, solutions in troubleshooting.items():
    print(f"\n❓ {issue}:")
    for i, solution in enumerate(solutions, 1):
        print(f"   {i}. {solution}")

print("\n📞 For more help:")
print("   - Check Dagster documentation")
print("   - Review pipeline logs")
print("   - Test individual operations")

🔧 Troubleshooting Guide

❓ Telegram Connection Issues:
   1. Check TELEGRAM_API_ID and TELEGRAM_API_HASH in .env
   2. Verify phone number format (+1234567890)
   3. Ensure internet connectivity
   4. Check for rate limiting

❓ Database Connection Issues:
   1. Verify DATABASE_URL format
   2. Check PostgreSQL is running
   3. Ensure database exists
   4. Check user permissions

❓ dbt Issues:
   1. Run 'dbt debug' to check configuration
   2. Verify dbt project structure
   3. Check profiles.yml setup
   4. Ensure dbt dependencies installed

❓ YOLO Issues:
   1. Check YOLO model files exist
   2. Verify CUDA/GPU setup
   3. Check image file paths
   4. Ensure ultralytics is installed

❓ Dagster Issues:
   1. Check dagster.yaml configuration
   2. Verify workspace setup
   3. Check Python environment
   4. Review Dagster logs

📞 For more help:
   - Check Dagster documentation
   - Review pipeline logs
   - Test individual operations
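
For the "Verify DATABASE_URL format" item, a quick structural check can be done with the standard library before attempting a connection. This is a sketch under the assumption that the project uses a URL of the form `postgresql://user:password@host:port/dbname`. The function name and the example URLs are illustrative only.

```python
from urllib.parse import urlparse


def check_database_url(url):
    """Return a list of problems found in a PostgreSQL connection URL (empty if none)."""
    parsed = urlparse(url)
    problems = []
    # Accept driver-qualified schemes such as postgresql+psycopg2 as well.
    base_scheme = parsed.scheme.split("+")[0]
    if base_scheme not in ("postgresql", "postgres"):
        problems.append(f"unexpected scheme: {parsed.scheme or '(none)'}")
    if not parsed.hostname:
        problems.append("missing host")
    if not parsed.username:
        problems.append("missing user")
    if not parsed.path.lstrip("/"):
        problems.append("missing database name")
    return problems


# Placeholder credentials for illustration only.
print(check_database_url("postgresql://user:secret@localhost:5432/telegram"))
print(check_database_url("mysql://localhost/"))
```

An empty list means the URL is structurally plausible. It does not confirm that the server is reachable or that the credentials are valid.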


## Next Steps

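With the failures recorded above resolved (the `raw_messages` schema mismatch, the missing dbt executable, the YOLO checkpoint load error, and the `execute_job` call), the pipeline can be prepared for production use.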

In [24]:
# Next steps and recommendations
print("Next Steps")
print("=" * 50)

next_steps = [
    "1. Set up production environment variables",
    "2. Configure database for production use",
    "3. Set up monitoring and alerting",
    "4. Create custom schedules for automation",
    "5. Add data quality checks",
    "6. Implement error handling and retries",
    "7. Set up logging and metrics collection",
    "8. Create backup and recovery procedures"
]

for step in next_steps:
    print(f"   {step}")

print("\n🚀 Ready to deploy!")
print("   Your Dagster pipeline is configured and ready for production use.")
print("   Use 'dagster dev' to start the UI and monitor your pipelines.")

Next Steps
   1. Set up production environment variables
   2. Configure database for production use
   3. Set up monitoring and alerting
   4. Create custom schedules for automation
   5. Add data quality checks
   6. Implement error handling and retries
   7. Set up logging and metrics collection
   8. Create backup and recovery procedures

🚀 Ready to deploy!
   Your Dagster pipeline is configured and ready for production use.
   Use 'dagster dev' to start the UI and monitor your pipelines.
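
For step 6, "Implement error handling and retries", the idea can be sketched with a generic retry helper that uses exponential backoff. This is a standard-library illustration, not the pipeline's implementation. Dagster also provides its own `RetryPolicy` for ops, which is likely the better fit inside a job. The names `call_with_retries` and `flaky_load` are hypothetical.

```python
import time


def call_with_retries(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on an exception, wait with exponential backoff and try again.

    The last exception is re-raised once all attempts are used up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Demonstration with a function that fails twice before succeeding.
calls = {"count": 0}


def flaky_load():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "loaded"


delays = []  # record the delays instead of actually sleeping
outcome = call_with_retries(flaky_load, attempts=3, sleep=delays.append)
print(outcome, delays)
```

Injecting `sleep` keeps the helper testable without real waiting. In production code the default `time.sleep` would apply.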
