In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.drive.operators.drive import GoogleDriveUploadOperator
from airflow.providers.github.operators.github import GithubFileOperator
import requests
from bs4 import BeautifulSoup
import re
import string
import dvc



In [None]:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

def extract_data(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    
    links = [a['href'] for a in soup.find_all('a', href=True)]
    titles = [h2.text for h2 in soup.find_all('h2')]
    descriptions = [p.text for p in soup.find_all('p')]
    
    return links, titles, descriptions

def preprocess_text(text):
    # Convert to lowercase
    text = text.lower()
    
    # Remove punctuation
    text = re.sub('['+string.punctuation+']', '', text)
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove numbers
    text = re.sub(r'\d+', '', text)
    
    # Remove common stopwords (e.g., "the", "and", etc.)
    stopwords = ['the', 'and', 'a', 'of', 'to', 'in', 'for', 'is', 'on', 'that', 'with']
    text = ' '.join([word for word in text.split() if word not in stopwords])
    
    return text

def transform_data(**kwargs):
    # Extract data from (link unavailable) and (link unavailable)
    dawn_links, dawn_titles, dawn_descriptions = extract_data('https://www.dawn.com')
    bbc_links, bbc_titles, bbc_descriptions = extract_data('https://www.bbc.com')
    
    # Preprocess and transform the extracted data
    dawn_titles = [preprocess_text(title) for title in dawn_titles]
    dawn_descriptions = [preprocess_text(description) for description in dawn_descriptions]
    bbc_titles = [preprocess_text(title) for title in bbc_titles]
    bbc_descriptions = [preprocess_text(description) for description in bbc_descriptions]
    
    return dawn_links, dawn_titles, dawn_descriptions, bbc_links, bbc_titles, bbc_descriptions

def store_data(**kwargs):
    # Store the processed data on Google Drive
    dawn_links, dawn_titles, dawn_descriptions, bbc_links, bbc_titles, bbc_descriptions = kwargs['ti'].xcom_pull(task_ids='transform_data')
    drive_folder = "15O-_4K4xvYGkS2HrcYD6JQOl1MFzySRx"
    GoogleDriveUploadOperator(
        task_id='upload_to_drive',
        folder=drive_folder,
        file_name='data.zip',
        data='data'
    )
    # Implement Data Version Control (DVC) to track versions of the data
    dvc.init()
    dvc.add('data')
    dvc.commit('-m "Initial commit"')
    dvc.push()
    # Version the metadata against each DVC push to Github repo
    GithubFileOperator(
        task_id='store_metadata',
        repo='Ali20782/MLOps_A2',
        file_path='data/metadata.txt',
        commit_message='Initial commit',
        overwrite=True
    )

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data
)

store_task = PythonOperator(
    task_id='store_data',
    python_callable=store_data
)

dag.append(extract_task)
dag.append(transform_task)
dag.append(store_task)