In [None]:
# Install Apache Airflow (the PyPI package is named "apache-airflow", not "airflow")

!pip install apache-airflow -q

In [None]:
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, date
import requests
from bs4 import BeautifulSoup
import pandas as pd
import os

[2025-06-08T22:15:17.009+0000] {utils.py:162} INFO - NumExpr defaulting to 2 threads.


In [None]:
# Scraping callable used by the DAG that is defined further down in this cell.
# (DAG `tags` are labels used to categorize and organize DAGs.)

def _parse_matching_articles(html, today):
    """Return the listing entries in ``html`` that pass the title/date filters.

    Args:
        html: Markup of the news listing page.
        today: ``datetime.date`` used as the cut-off; only articles dated
            strictly after it are kept.

    Returns:
        A list of dicts with the keys ``'title'``, ``'date'`` (a
        ``datetime.date``) and ``'url'``.
    """
    soup = BeautifulSoup(html, 'html.parser')

    title_elements = soup.find_all('a', class_='archive-article__link')
    date_elements = soup.find_all('p', class_='archive-article__date')

    # Titles and dates are paired purely by position; zip() would silently
    # truncate on a count mismatch, so make that situation visible in the logs.
    if len(title_elements) != len(date_elements):
        print(
            f"Warning: found {len(title_elements)} title link(s) but "
            f"{len(date_elements)} date element(s); titles and dates may be misaligned."
        )

    results = []
    for title_tag, date_tag in zip(title_elements, date_elements):
        title_text = title_tag.get_text(strip=True)
        date_text = date_tag.get_text(strip=True)

        # Dates are expected in the form "June 9, 2025".
        try:
            pub_date = datetime.strptime(date_text, '%B %d, %Y').date()
        except ValueError:
            print(f"Skipping article due to invalid date format: {date_text}")
            continue

        # 'austim' is matched on purpose to catch the common misspelling.
        lowered_title = title_text.lower()
        if 'autism' not in lowered_title and 'austim' not in lowered_title:
            continue

        # NOTE(review): only future-dated entries are kept, as the function's
        # contract states -- confirm this is the intended filter for a news archive.
        if pub_date <= today:
            continue

        # An anchor without href used to raise KeyError and abort the whole
        # scrape; skip just that entry instead.
        link = title_tag.get('href')
        if not link:
            print(f"Skipping article without a link: {title_text}")
            continue

        results.append({
            'title': title_text,
            'date': pub_date,
            'url': link
        })

    return results


def _append_new_articles(results, full_csv_path):
    """Append rows from ``results`` whose URL is not yet stored in the CSV.

    Args:
        results: List of article dicts containing at least a ``'url'`` key.
        full_csv_path: Path of the CSV file to append to (created if missing).

    Returns:
        The number of rows actually written.
    """
    if not results:
        return 0

    df = pd.DataFrame(results).drop_duplicates(subset='url')

    # The header is needed when the file is missing *or* empty; checking only
    # for existence would leave an empty pre-existing file without a header.
    has_header = False
    if os.path.exists(full_csv_path) and os.path.getsize(full_csv_path) > 0:
        try:
            existing = pd.read_csv(full_csv_path)
        except pd.errors.EmptyDataError:
            existing = None
        if existing is not None:
            has_header = True
            # Drop rows saved by an earlier run so repeated runs do not
            # accumulate duplicates.
            if 'url' in existing.columns:
                df = df[~df['url'].isin(existing['url'])]

    if df.empty:
        return 0

    df.to_csv(full_csv_path, mode='a', header=not has_header, index=False)
    return len(df)


def scrape_autism_alliance_news():
    """
    Scrapes news from autismalliance.ca, filters for 'autism'/'austim' in title
    and future dates, and appends articles not yet stored to a CSV.

    The CSV is written to /opt/airflow/dags/scraped_data/autism_alliance_news.csv;
    the directory is created if it does not exist. Articles whose URL is already
    present in the file are not written again.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or the
            server answers with a 4xx/5xx status.
        Exception: Any other error raised while parsing or saving. Errors are
            logged and re-raised so that the calling task is reported as failed
            instead of silently succeeding.
    """
    url = 'https://autismalliance.ca/news/?news-topic=&year=&page=1'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise an exception for HTTP errors (4xx or 5xx)

        results = _parse_matching_articles(response.text, date.today())

        output_dir = '/opt/airflow/dags/scraped_data'
        csv_filename = 'autism_alliance_news.csv'
        full_csv_path = os.path.join(output_dir, csv_filename)

        os.makedirs(output_dir, exist_ok=True)

        written = _append_new_articles(results, full_csv_path)
        if written:
            print(f"{written} new article(s) collected and saved to {full_csv_path}.")
        else:
            print("No new articles with 'autism/austim' and a future date found.")

    except requests.exceptions.RequestException as e:
        print(f"Error during web request: {e}")
        raise  # propagate so the failure is not reported as a success
    except Exception as e:
        print(f"An unexpected error occurred during scraping: {e}")
        raise

# --- Airflow DAG Definition ---

# Weekly pipeline with three tasks wired in a straight line:
# start marker -> scraper -> finish marker.
with DAG(
    dag_id='autism_alliance_news_scraper',
    schedule='@weekly',
    start_date=datetime(2025, 6, 9),
    catchup=False,
    tags=['web_scraping', 'news', 'autism_alliance'],
) as dag:
    # Marker task: only echoes a message when the run begins.
    start_task = BashOperator(task_id='start_scraping_process', bash_command='echo "Starting web scraping process..."')

    # Runs the scraping function defined earlier in this file.
    scrape_news_task = PythonOperator(task_id='scrape_autism_alliance_news_data', python_callable=scrape_autism_alliance_news)

    # Marker task: only echoes a message once the scraper task is done.
    end_task = BashOperator(task_id='finish_scraping_process', bash_command='echo "Web scraping process finished."')

    # Declare the ordering explicitly, one edge at a time.
    start_task.set_downstream(scrape_news_task)
    scrape_news_task.set_downstream(end_task)