In [None]:
#pip install apache-airflow

In [None]:
#Importing libraries
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from scraping_merging_data import scrape_rightmove, scrape_zoopla, merge_data
from writing_to_database import write_merged_data_to_db

import requests
from bs4 import BeautifulSoup

import pandas as pd
import numpy as np

import time

In [None]:
#Defining the DAG and its operators
pages = 10

#All four steps live in ONE DAG: Airflow cannot set dependencies between
#tasks that belong to different DAGs, so the chain at the bottom of this cell
#only works when every operator is attached to the same DAG object.
dag_property_pipeline = DAG('property_data_pipeline',
                            description = 'Scraping Rightmove and Zoopla, merging the data and storing it in the database',
                            schedule_interval = '@monthly',
                            start_date = datetime(2022, 4, 26),
                            catchup = False)


def _merge_scraped_data(ti):
    """Merge the results returned by the two scraping tasks.

    The scraped data is pulled from XCom when the task runs, instead of
    calling the scrapers while the DAG file is being parsed.

    Args:
        ti: the running task instance, supplied by Airflow from the task
            context because the parameter is named ``ti``.

    Returns:
        Whatever ``merge_data`` returns; PythonOperator pushes it to XCom so
        that the store task can pick it up.
    """
    #NOTE(review): on Airflow 1.10 the operator would additionally need
    #provide_context=True for ``ti`` to be passed in - confirm the version.
    return merge_data(new_rightmove_data = ti.xcom_pull(task_ids = 'rightmove_task'),
                      new_zoopla_data = ti.xcom_pull(task_ids = 'zoopla_task'))


def _store_merged_data(ti):
    """Write the output of the merge task to the database.

    Args:
        ti: the running task instance, supplied by Airflow from the task
            context because the parameter is named ``ti``.
    """
    write_merged_data_to_db(df = ti.xcom_pull(task_ids = 'merge_task'))


#NOTE(review): the return values of the callables below travel between tasks
#via XCom. If they are pandas DataFrames, the configured XCom backend must be
#able to serialize them (e.g. XCom pickling enabled) - confirm.

#Scraping Rightmove
rightmove_operator = PythonOperator(task_id = 'rightmove_task',
                                    python_callable = scrape_rightmove,
                                    op_kwargs = {"pages":pages},
                                    dag = dag_property_pipeline)

#Scraping Zoopla
zoopla_operator = PythonOperator(task_id = 'zoopla_task',
                                 python_callable = scrape_zoopla,
                                 op_kwargs = {"pages":pages},
                                 dag = dag_property_pipeline)

#Merging data
merge_operator = PythonOperator(task_id = 'merge_task',
                                python_callable = _merge_scraped_data,
                                dag = dag_property_pipeline)

#Storing data in database
store_operator = PythonOperator(task_id = 'store_task',
                                python_callable = _store_merged_data,
                                dag = dag_property_pipeline)

#Scheduling operators: scrape both sites, then merge, then store
rightmove_operator >> zoopla_operator >> merge_operator >> store_operator