In [37]:
import threading
import datetime
import random
import time
import csv
import re
import urllib.request
import xml.etree.ElementTree as ET
from urllib.error import HTTPError
import requests
from bs4 import BeautifulSoup as bs

date = datetime.datetime.now()
currDate = '{}/{}/{}'.format(date.day,date.month,date.year)
random.seed(datetime.datetime.now())

# Directories for the BBC news webpages I'm interested in
BBCArticleURLs = ('world','uk','business','politics','health',
'education','science_and_environment','technology','entertainment_and_arts',
'world/africa','world/asia','world/europe','world/latin_america','world/middle_east',
'world/us_and_canada','england','northern_ireland','scotland','wales')

def getArticles(dir, website):
    try:
        if website == 'BBC':
            tree = ET.parse(source=urllib.request.urlopen('http://feeds.bbci.co.uk/news/'+dir+'/rss.xml'))
        else:
            return None
    except HTTPError as err:
        print(err)
        return None
    except ET.ParseError as err:
        return None
    else:
        root = tree.getroot()
        allArticles = []
        for item in root.iter('item'):
            article = {}
            for elem in item:
                if elem.tag == 'title':
                    article['title'] = elem.text.strip()
                elif elem.tag == 'link':
                    article['link'] = elem.text.strip()
                elif elem.tag == 'description':
                    article['description'] = elem.text.strip()
                elif elem.tag == 'pubDate':
                    article['pubDate'] = elem.text.strip()
                elif website == 'BBC' and elem.tag.endswith('creator'):
                    article['author'] = elem.text.strip()
            allArticles.append(article)
        return allArticles

def extract_data(url):
    response = requests.get(url)
    soup = bs(response.content, 'html.parser')

    articles = soup.find_all('article')
    article_data = []
    for idx, article in enumerate(articles):
        title = article.find('h2').text.strip() if article.find('h2') else None
        description = article.find('p', class_='story__excerpt').text.strip() if article.find('p', class_='story__excerpt') else None
        time = article.find('time').text.strip() if article.find('time') else None
        article_data.append({'id': idx+1, 'title': title, 'description': description, 'time': time, 'source': url})

    return article_data

def preprocess(text):
    clean_text = re.sub('<.*?>', '', text)
    clean_text = re.sub('[^a-zA-Z]', ' ', clean_text)
    clean_text = clean_text.lower()
    clean_text = re.sub(' +', ' ', clean_text)
    return clean_text

def clean_data(data):
    cleaned_data = []
    for article in data:
        article['title'] = preprocess(article['title']) if article.get('title') else None
        article['description'] = preprocess(article['description']) if article.get('description') else None
        cleaned_data.append(article)
    return cleaned_data

def save_to_csv(file_name, articles):
    with open(file_name, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['id', 'title', 'description', 'time', 'source']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for article in articles:
            writer.writerow(article)

def writeCSV(articleList, dir, invalid, website):
    if invalid:
        with open('errorLog.csv', 'a', encoding="utf-8") as file:
            fields = ['date', 'website', 'dir', 'articleTitle']
            writeObj = csv.DictWriter(file, fieldnames=fields,lineterminator='\n')

            for article in articleList:
                writeObj.writerow({
                    'date': currDate,
                    'website': website,
                    'dir': dir,
                    'articleTitle': article['title'],
                    'author': article.get('author', ''),
                    'description': article.get('description', ''),
                    'pubDate': article.get('pubDate', '')
                })
    else:
        if website == 'BBC':
            with open('BBCinfoXML.csv', 'a', encoding="utf-8") as file:
                fields = ['date', 'dir', 'articleTitle', 'author', 'description', 'pubDate']
                writeObj = csv.DictWriter(file, fieldnames=fields,lineterminator='\n')

                for article in articleList:
                    writeObj.writerow({
                        'date': currDate,
                        'dir': dir,
                        'articleTitle': article['title'],
                        'author': article.get('author', ''),
                        'description': article.get('description', ''),
                        'pubDate': article.get('pubDate', '')
                    })

def scrape(dir, website):
    allArticles = getArticles(dir, website)
    if allArticles != None:
        writeCSV(allArticles, dir, 0, website)
        print('Downloaded articles from section: {} - {}'.format(website, dir))
    else:
        badscrapeMsg = 'Error could not scrape from section: {}'.format(dir)
        badscrape = []
        badscrape.append(badscrapeMsg)
        writeCSV(badscrape, dir, 1, website)
        print('############ Failed to download articles from section: {} ############ '.format(dir))

def BBCControl():
    for target in BBCArticleURLs:
        scrape(target, 'BBC')
        time.sleep(random.random())

def main():
    # URLs of Dawn and BBC
    urls = ['https://www.dawn.com/', 'https://www.bbc.com/']
    filename = "dawn.csv"

    # Extracting data from URLs
    all_data = []
    for url in urls:
        articles = extract_data(url)
        all_data.extend(articles)

    # Cleaning data
    cleaned_data = clean_data(all_data)

    # Saving data to CSV
    save_to_csv(filename, cleaned_data)
    threading.Thread(target=BBCControl).start()

if __name__ == "__main__":
    main()


since Python 3.9 and will be removed in a subsequent version. The only 
supported seed types are: None, int, float, str, bytes, and bytearray.
  random.seed(datetime.datetime.now())


Downloaded articles from section: BBC - world
Downloaded articles from section: BBC - uk
Downloaded articles from section: BBC - business
Downloaded articles from section: BBC - politics
Downloaded articles from section: BBC - health
Downloaded articles from section: BBC - education
Downloaded articles from section: BBC - science_and_environment
Downloaded articles from section: BBC - technology
Downloaded articles from section: BBC - entertainment_and_arts
Downloaded articles from section: BBC - world/africa
Downloaded articles from section: BBC - world/asia
Downloaded articles from section: BBC - world/europe
Downloaded articles from section: BBC - world/latin_america
Downloaded articles from section: BBC - world/middle_east
Downloaded articles from section: BBC - world/us_and_canada
Downloaded articles from section: BBC - england
Downloaded articles from section: BBC - northern_ireland
Downloaded articles from section: BBC - scotland


In [41]:
def transform(dawn_file, bbc_file, output_file):
    combined_data = []
    dawn_id = 0
    bbc_id = 0
    
    # Read data from dawn.csv
    with open(dawn_file, 'r', newline='', encoding='utf-8') as dawn_csv:
        dawn_reader = csv.DictReader(dawn_csv)
        for row in dawn_reader:
            dawn_id += 1
            combined_data.append({'Id': dawn_id, 'title': row['title'], 'source': 'Dawn'})

    # Read data from BBCinfoXML.csv
    with open(bbc_file, 'r', newline='', encoding='utf-8') as bbc_csv:
        bbc_reader = csv.reader(bbc_csv)
        for row in bbc_reader:
            if row:
                bbc_id += 1
                combined_data.append({'Id': dawn_id + bbc_id, 'title': row[2], 'source': 'BBC'})

    # Write the combined data to a new CSV file
    with open(output_file, 'w', newline='', encoding='utf-8') as output_csv:
        fieldnames = ['Id', 'title', 'source']
        writer = csv.DictWriter(output_csv, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(combined_data)

# Usage
transform('dawn.csv', 'BBCinfoXML.csv', 'cleaned.csv')


In [6]:
!pip install apache-airflow



Collecting apache-airflow
  Using cached apache_airflow-2.9.1-py3-none-any.whl.metadata (43 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Using cached alembic-1.13.1-py3-none-any.whl.metadata (7.4 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Using cached argcomplete-3.3.0-py3-none-any.whl.metadata (16 kB)
Collecting asgiref (from apache-airflow)
  Using cached asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting blinker>=1.6.2 (from apache-airflow)
  Using cached blinker-1.8.2-py3-none-any.whl.metadata (1.6 kB)
Collecting colorlog<5.0,>=4.0.2 (from apache-airflow)
  Using cached colorlog-4.8.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Using cached ConfigUpdater-3.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting connexion<3.0,>=2.10.0 (from connexion[flask]<3.0,>=2.10.0->apache-airflow)
  Using cached connexion-2.14.2-py2.py3-none-any.whl.metadata (28 kB)
Collecting cron-descriptor>=1.2.24 (from apac

In [8]:
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from my_module import extract_data, clean_data, save_to_csv, transform

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 5, 12),
    'retries': 3,
}

with DAG('mlops_dag', default_args=default_args, schedule_interval='@daily') as dag:
    
    def extract_transform_load():
        extract_data(['https://www.dawn.com/', 'https://www.bbc.com/'])
        clean_data('dawn.csv')
        transform('dawn.csv', 'BBCinfoXML.csv', 'cleaned.csv')
        save_to_csv('cleaned.csv')
    
    run_etl = PythonOperator(
        task_id='run_etl',
        python_callable=extract_transform_load
    )
    
    run_etl


[[34m2024-05-12T15:47:43.658+0500[0m] {[34mlogging_config.py:[0m71} ERROR[0m - Unable to load the config, contains a configuration error.[0m


ValueError: Unable to configure formatter 'airflow'

export GOOGLE_APPLICATION_CREDENTIALS="C:\\Users\\gladi\\OneDrive\\Desktop\\mlops a3\\turing-reach-413612-f3ac0771dced.json"