In [None]:
# src/main.py
import asyncio
from dotenv import load_dotenv
from src.scraping.telegram_scraper import TelegramScraper
from src.scraping.channel_registry import CHANNEL_REGISTRY

async def run_scraping():
    """Scrape every channel in CHANNEL_REGISTRY and print each outcome.

    Loads environment variables, creates a TelegramScraper and, inside the
    scraper's client context, awaits ``scrape_channel`` for each registry
    value in turn. A truthy result is reported as a success, a falsy one as
    a failure. Any exception raised along the way is printed and swallowed,
    so this coroutine itself does not raise for scraping errors.
    """
    load_dotenv()  # Load environment variables

    telegram = TelegramScraper()

    try:
        async with telegram.client:
            # Walk the registered channels one at a time
            for target in CHANNEL_REGISTRY.values():
                if await telegram.scrape_channel(target):
                    print(f"Successfully scraped {target}")
                    continue
                print(f"Failed to scrape {target}")
    except Exception as err:
        print(f"Scraping failed: {str(err)}")

if __name__ == "__main__":
    # Script entry point: drive the scraping coroutine to completion.
    scrape_job = run_scraping()
    asyncio.run(scrape_job)
    

In [None]:
# Example of manually calling storage functions
from datetime import datetime
from src.scraping.storage_manager import DataLakeManager

# Initialize storage
storage = DataLakeManager()

# Capture the timestamp once so the record's "date" field and the `date`
# argument passed to write_to_datalake come from the same instant; two
# separate datetime.now() calls could straddle midnight and disagree.
now = datetime.now()

# Example data to store
sample_data = [
    {
        "id": 123,
        "text": "Medical supplies available",
        "date": now.isoformat(),
    }
]

# Call storage function
storage.write_to_datalake(
    data=sample_data,
    data_type="messages",
    channel="chemed",
    date=now.strftime('%Y-%m-%d'),
)


In [None]:
# src/pipeline.py
import asyncio
from typing import List, Dict
from src.scraping.telegram_scraper import TelegramScraper
from src.scraping.storage_manager import DataLakeManager

class MedicalDataPipeline:
    """Scrape every registered channel and hand the results to storage.

    Combines a ``TelegramScraper`` with a ``DataLakeManager``: ``execute``
    scrapes each value of ``CHANNEL_REGISTRY``, drops messages without text,
    and calls ``write_to_datalake`` once per channel.
    """

    def __init__(self):
        self.scraper = TelegramScraper()
        self.storage = DataLakeManager()

    async def execute(self):
        """Run the full pipeline: scrape all channels, then process and store.

        Both steps run inside the scraper's client context.

        Raises:
            Exception: any error raised while scraping or storing is printed
                and then re-raised unchanged.
        """
        try:
            async with self.scraper.client:
                # 1. Scrape data
                raw_data = await self._scrape_all_channels()

                # 2. Process and store
                self._process_and_store(raw_data)

        except Exception as e:
            print(f"Pipeline failed: {str(e)}")
            raise

    async def _scrape_all_channels(self) -> Dict[str, List[Dict]]:
        """Scrape all registered channels.

        Returns:
            A dict keyed by each ``CHANNEL_REGISTRY`` value, mapped to
            whatever ``scrape_channel`` returned for it.
        """
        from src.scraping.channel_registry import CHANNEL_REGISTRY

        results = {}
        for channel in CHANNEL_REGISTRY.values():
            messages = await self.scraper.scrape_channel(channel)
            results[channel] = messages
        return results

    def _process_and_store(self, data: Dict[str, List[Dict]]):
        """Preprocess each channel's messages and write them to storage.

        Channels whose scrape result is ``None`` or a bool (no message list
        came back) are reported and skipped instead of crashing the run.
        """
        for channel_name, messages in data.items():
            if messages is None or isinstance(messages, bool):
                print(f"No messages returned for {channel_name}; skipping storage")
                continue

            # Add any preprocessing here
            processed = self._preprocess(messages)

            # Store to data lake
            # NOTE(review): no `date` is passed, so this relies on
            # write_to_datalake supplying its own default -- confirm.
            self.storage.write_to_datalake(
                data=processed,
                data_type="messages",
                channel=channel_name
            )

    def _preprocess(self, messages: List[Dict]) -> List[Dict]:
        """Return only the messages that have a truthy ``'text'`` value.

        ``None`` or a bool in place of a message list is treated as "no
        messages" and yields an empty list rather than a ``TypeError``.
        """
        if messages is None or isinstance(messages, bool):
            return []
        return [msg for msg in messages if msg.get('text')]  # Filter empty messages

# Entry point: build the pipeline and run it to completion when executed as a script
if __name__ == "__main__":
    pipeline = MedicalDataPipeline()
    pipeline_run = pipeline.execute()
    asyncio.run(pipeline_run)
    