In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def extract_data():
    """Scrape recent "data scientist" job postings in France and store them in MongoDB.

    This single task runs the whole pipeline: open the search results, dismiss
    the pop-up, collect the job descriptions and persist them.
    """
    # Imported locally so this cell works on its own; without them `By` and
    # `DataExtractor` are undefined and the task fails with NameError.
    from selenium.webdriver.common.by import By
    from helpers import DataExtractor

    data_extractor = DataExtractor(remote_url="http://localhost:4444")
    try:
        data_extractor.fill_search_bar_by_link(job="data scientist", location="France", from_last_days=3)
        # Click the "reject all" button (presumably a OneTrust cookie banner)
        # so it does not block the page.
        data_extractor.driver.find_element(By.ID, "onetrust-reject-all-handler").click()
        data = data_extractor.get_job_descriptions()
        data_extractor.store_data_in_mongo(data)
    finally:
        # Always release the extractor, even when scraping or storing fails.
        data_extractor.close()

default_args = {
    'owner': 'you',
    # Use a fixed start date: Airflow recommends against dynamic values such
    # as datetime.now(), which change every time the DAG file is parsed.
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Run once every 24 hours. catchup=False stops Airflow from back-filling
# every interval between the (past) start_date and now.
dag = DAG(dag_id='data_extractor_dag',
          default_args=default_args,
          schedule_interval=timedelta(hours=24),
          catchup=False)

# Single task that runs the whole scrape-and-store pipeline.
extract_data_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag)


Yes, that's correct. In this example, there is only one task defined in the DAG: extract_data_task. The task is a PythonOperator, and it calls the extract_data() function when it runs.
The extract_data() function in turn calls the methods of your DataExtractor class to search for jobs, dismiss the cookie pop-up, extract the job descriptions, and store them in MongoDB.

You can add more tasks to the DAG if your pipeline includes more steps. For example, you could add a task to perform data analysis and another task to update the dashboard with the results.

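You can also use other types of operators, such as BashOperator, alongside PythonOperator to perform other tasks.
To build a more complex pipeline with multiple tasks, you can group tasks with TaskGroups (SubDAGs still exist but are deprecated in Airflow 2) or use the ShortCircuitOperator to skip downstream tasks when a condition is not met.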

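Keep in mind that each task should be atomic and idempotent, and should only perform one specific action. This makes it easier to troubleshoot and maintain your pipeline. For example, the single extract_data task above both scrapes and stores the data; splitting it into separate extract and store tasks, as in the next example, follows this advice.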

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def extract_data():
    """Scrape recent "data scientist" job postings in France and return them.

    The PythonOperator pushes the return value to XCom (key ``return_value``),
    which is what ``xcom_pull(task_ids='extract_data')`` reads downstream.
    """
    # Imported locally so this cell works on its own; without them `By` and
    # `DataExtractor` are undefined and the task fails with NameError.
    from selenium.webdriver.common.by import By
    from helpers import DataExtractor

    data_extractor = DataExtractor(remote_url="http://localhost:4444")
    try:
        data_extractor.fill_search_bar_by_link(job="data scientist", location="France", from_last_days=3)
        # Click the "reject all" button (presumably a OneTrust cookie banner)
        # so it does not block the page.
        data_extractor.driver.find_element(By.ID, "onetrust-reject-all-handler").click()
        return data_extractor.get_job_descriptions()
    finally:
        # Always release the extractor, even when scraping fails.
        data_extractor.close()

def store_data(**kwargs):
    """Pull the data returned by ``extract_data`` from XCom and store it in MongoDB.

    Args:
        **kwargs: Airflow task context; must contain the task instance ``ti``.

    Raises:
        ValueError: if ``extract_data`` left no value in XCom.
    """
    data = kwargs['ti'].xcom_pull(task_ids='extract_data')
    if data is None:
        # xcom_pull returns None when nothing was pushed; fail loudly instead
        # of silently writing nothing to MongoDB.
        raise ValueError("No data found in XCom for task 'extract_data'")

    # Tasks do not share Python objects, so this task needs its own extractor
    # (referencing one from another function raises NameError).
    from helpers import DataExtractor

    data_extractor = DataExtractor(remote_url="http://localhost:4444")
    try:
        data_extractor.store_data_in_mongo(data)
    finally:
        data_extractor.close()

def analyze_data(**kwargs):
    """Analyze the scraped job data and return the result for ``update_dashboard``.

    The data is pulled from ``extract_data``. ``store_data`` returns nothing,
    so pulling from it would always yield None.

    Args:
        **kwargs: Airflow task context; must contain the task instance ``ti``.

    Returns:
        dict: placeholder result ``{"job_count": <number of records>}``
        (0 when no data was found). The return value is pushed to XCom.
    """
    data = kwargs['ti'].xcom_pull(task_ids='extract_data')
    # TODO: replace this placeholder with the real analysis.
    # Assumes `data` supports len() (e.g. a list of job records) -- confirm.
    analysis_result = {"job_count": len(data) if data is not None else 0}
    return analysis_result

def update_dashboard(**kwargs):
    """Refresh the dashboard with the output of the ``analyze_data`` task.

    Currently a stub: it only fetches the analysis result from XCom.

    Args:
        **kwargs: Airflow task context; must contain the task instance ``ti``.
    """
    task_instance = kwargs['ti']
    analysis_result = task_instance.xcom_pull(task_ids='analyze_data')
    # TODO: send `analysis_result` to the dashboard.

default_args = {
    'owner': 'you',
    # Use a fixed start date: Airflow recommends against dynamic values such
    # as datetime.now(), which change every time the DAG file is parsed.
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Run once every 24 hours; catchup=False prevents back-filling past intervals.
dag = DAG(dag_id='data_extractor_dag',
          default_args=default_args,
          schedule_interval=timedelta(hours=24),
          catchup=False)

# provide_context=True makes Airflow 1.x pass the task context (including
# `ti`) as keyword arguments; Airflow 2 does this automatically.
extract_data_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    provide_context=True,
    dag=dag)

store_data_task = PythonOperator(
    task_id='store_data',
    python_callable=store_data,
    provide_context=True,
    dag=dag)

# A PythonOperator cannot be created without a python_callable.
analyze_data_task = PythonOperator(
    task_id='analyze_data',
    python_callable=analyze_data,
    provide_context=True,
    dag=dag)

update_dashboard_task = PythonOperator(
    task_id='update_dashboard',
    python_callable=update_dashboard,
    provide_context=True,
    dag=dag)

# Without explicit dependencies the tasks would run in parallel and each
# xcom_pull could run before its upstream task has pushed anything.
extract_data_task >> store_data_task >> analyze_data_task >> update_dashboard_task


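In the first function, extract_data(), the extracted data is returned rather than pushed explicitly with xcom_push(). Airflow automatically pushes the return value of a PythonOperator's callable to XCom (Airflow's cross-communication mechanism, stored in the metadata database) under the key return_value.
The returned value is not passed to the next task as a parameter; a downstream task has to pull it from XCom explicitly.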

For example, the store_data() function can access the data returned by the extract_data() function by calling kwargs['ti'].xcom_pull(task_ids='extract_data'), which returns the value that extract_data() returned.

For large datasets, it is generally better to read and write the data from a remote storage, rather than passing the data between tasks.

Passing large datasets between tasks through XCom could cause performance issues and may even cause the pipeline to fail due to memory or size constraints. Because XCom values are stored in Airflow's metadata database, large values can also strain the scheduler and workers, which could lead to unexpected behavior.

Instead, you can store the data in a remote storage such as an object storage or a database and read and write the data from there. This way, you can avoid the need to pass large datasets between tasks and the data can be accessed by different tasks and even different DAGs.

For example, you can store the scraped data in a remote storage such as S3 or GCS and use Airflow's S3KeySensor or GCSObjectExistenceSensor (GoogleCloudStorageObjectSensor in Airflow 1.10) to wait until the data is available before the next task runs.

You can also use a database like MongoDB or PostgreSQL to store the scraped data, and then use the MongoHook or PostgresHook to read and write the data from the database.

By using a remote storage system, you also gain data persistence and backups, and, depending on the storage system, versioning.

It's also worth noting that depending on your use case, it might be more efficient to use a distributed data processing framework like Apache Hadoop, Apache Spark, or Apache Flink to process large datasets, rather than using Airflow alone.

If you want to avoid writing the data to your local disk before uploading it, you can keep it in an in-memory file-like object such as io.BytesIO and upload that object directly to MinIO (for example with the MinIO client's put_object()). Note that the example below does not show this: it passes the scraped data between tasks through XCom and loads it into MongoDB.

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from selenium.webdriver.common.by import By
from helpers import DataExtractor
import pickle

# Task to extract the data
def extract_data(**kwargs):
    """Scrape recent "data scientist" job postings in France and push them to XCom.

    The data is pushed under the key ``scraped_data`` so that ``load_data``
    can pull it.

    Args:
        **kwargs: Airflow task context; must contain the task instance ``ti``.
    """
    data_extractor = DataExtractor(remote_url="http://20.224.70.229:4444")
    try:
        data_extractor.fill_search_bar_by_link(job="data scientist", location="France", from_last_days=3)
        data_extractor.remove_pop_up()
        data = data_extractor.get_job_descriptions()

        # store the data into xcom to be used by the next task
        kwargs['ti'].xcom_push(key='scraped_data', value=data)
    finally:
        # Release the extractor even if scraping or the XCom push fails.
        data_extractor.close()


# Task to load the data
def load_data(**kwargs):
    """Pull the data pushed by ``extract_data`` and load it into MongoDB.

    Args:
        **kwargs: Airflow task context; must contain the task instance ``ti``.

    Raises:
        ValueError: if no ``scraped_data`` XCom exists for ``extract_data``.
    """
    # Retrieve the data from the XCom
    data = kwargs['ti'].xcom_pull(key='scraped_data', task_ids='extract_data')
    print("this is the data: ", data)
    if data is None:
        # xcom_pull returns None when nothing was pushed; fail the task so
        # Airflow retries it instead of loading nothing.
        raise ValueError("No 'scraped_data' XCom found for task 'extract_data'")

    # Load the data into MongoDB
    data_extractor = DataExtractor(remote_url="http://20.224.70.229:4444")
    try:
        data_extractor.store_data_in_mongo(data, bulk=False)
    finally:
        # Close the connection even if the insert fails
        data_extractor.close()




# Default arguments applied to every task (operator) in the DAG.
# DAG-level settings such as catchup and schedule_interval are NOT read from
# default_args, so they are passed to DAG() directly below.
default_args = {
    "owner": "abdessamad",  # the owner of the DAG
    # Fixed start date: Airflow recommends against dynamic values such as
    # datetime.now(), which change every time the DAG file is parsed.
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": True,  # a task runs only if its previous run succeeded
    "retries": 2,  # the number of retries
    "retry_delay": timedelta(hours=1),  # the delay between retries
}

# Instantiate the DAG
dag = DAG('data_extraction_and_loading_17',
          default_args=default_args,
          # None = manual triggers only; use '@hourly' to run every hour.
          schedule_interval=None,
          catchup=False)  # the DAG does not catch up with the past

# Instantiate the extract task
extract_data_task = PythonOperator(task_id='extract_data',
                                   python_callable=extract_data,
                                   provide_context=True,
                                   dag=dag)

# Instantiate the load task
load_data_task = PythonOperator(task_id='load_data',
                                python_callable=load_data,
                                provide_context=True,
                                trigger_rule="all_success",
                                dag=dag)

# Set the task dependencies
extract_data_task >> load_data_task
