<a href="https://colab.research.google.com/github/MananM20/pyspark/blob/main/dags.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 9),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

# Airflow runs every task non-interactively (nothing is attached to stdin) and
# each BashOperator in its own shell process, so ``read -p`` can never collect
# input and a variable set in one task is invisible to the next one.  The
# inputs are therefore DAG params, overridable per run, e.g.
#   airflow dags trigger calculator_dag --conf '{"operation": "*", "x": 6, "y": 7}'
# The Param schemas (integer operands, whitelisted operator) are validated by
# Airflow, so only these safe values are rendered into the shell commands.
with DAG(
    'calculator_dag',
    default_args=default_args,
    schedule_interval=None,
    params={
        'operation': Param('+', type='string', enum=['+', '-', '*', '/']),
        'x': Param(0, type='integer'),
        'y': Param(0, type='integer'),
    },
) as dag:
    # Report the operation selected for this run
    input_operation = BashOperator(
        task_id='input_operation',
        bash_command='echo "Operation: {{ params.operation }}"',
    )
    # Report the operands selected for this run
    input_x = BashOperator(
        task_id='input_x',
        bash_command='echo "First number (x): {{ params.x }}"',
    )
    input_y = BashOperator(
        task_id='input_y',
        bash_command='echo "Second number (y): {{ params.y }}"',
    )
    # Bash integer arithmetic: '/' is integer division and y == 0 is a bash error
    calculate = BashOperator(
        task_id='perform_calculation',
        bash_command='echo "The result is: $(( {{ params.x }} {{ params.operation }} {{ params.y }} ))"',
    )
    # Define task dependencies
    input_operation >> [input_x, input_y] >> calculate

In [None]:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 11),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

with DAG(
    'calculator_dag_auto',
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    # catchup otherwise follows ``catchup_by_default`` (True out of the box),
    # which would create one run for every 5-minute interval since start_date.
    catchup=False,
) as dag:
    # Placeholder tasks with the same ids and dependency shape as
    # ``calculator_dag``; they perform no work and exist only so the
    # scheduler has something to run every 5 minutes.
    input_operation = DummyOperator(task_id='input_operation')
    input_x = DummyOperator(task_id='input_x')
    input_y = DummyOperator(task_id='input_y')
    calculate = DummyOperator(task_id='perform_calculation')

    # Define task dependencies
    input_operation >> [input_x, input_y] >> calculate


In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib
def load_data():
    """Read the training dataset into a pandas DataFrame.

    ``your_dataset.csv`` is a placeholder path, resolved relative to the
    current working directory.
    """
    return pd.read_csv('your_dataset.csv')
def preprocess_data(data):
    """Return a preprocessed copy of *data*.

    Currently a pass-through placeholder: the result is a copy of the input,
    so later steps never mutate the caller's DataFrame.

    Args:
        data: DataFrame produced by ``load_data``.

    Returns:
        A new DataFrame equal to *data*.
    """
    # TODO: handle missing values, encode categorical variables, etc.
    preprocessed_data = data.copy()
    return preprocessed_data
def train_model(preprocessed_data, target_column='target_column', test_size=0.2, random_state=42):
    """Fit a random-forest classifier on *preprocessed_data*.

    Args:
        preprocessed_data: DataFrame containing the feature columns plus the target.
        target_column: Name of the label column; all other columns are features.
        test_size: Fraction of rows held out and used only to report accuracy.
        random_state: Seed for both the train/test split and the forest, so
            reruns on the same data produce the same model.

    Returns:
        The fitted ``RandomForestClassifier``.
    """
    # Split data into features and target
    X = preprocessed_data.drop(columns=[target_column])
    y = preprocessed_data[target_column]
    # Split data into training and hold-out sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    model = RandomForestClassifier(random_state=random_state)
    model.fit(X_train, y_train)
    # Use the held-out rows instead of silently discarding them
    print(f"Hold-out accuracy: {model.score(X_test, y_test):.3f}")
    return model
def save_model(model, path='trained_model.pkl'):
    """Serialize *model* to *path* with ``joblib.dump``.

    Args:
        model: Fitted estimator to persist.
        path: Destination file; defaults to ``trained_model.pkl`` in the
            current working directory.
    """
    joblib.dump(model, path)
    print("Model saved successfully.")
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 9),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}
with DAG('ml_pipeline_dag', default_args=default_args, schedule_interval=None) as dag:
    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )
    # Each downstream callable takes the previous step's result as a parameter
    # that is not an Airflow context key, so it must be wired in explicitly:
    # ``.output`` pulls the upstream task's return value from XCom at run time.
    # (``provide_context`` is a deprecated no-op in Airflow 2 and was dropped.)
    # NOTE(review): a DataFrame / fitted model only round-trips through XCom if
    # the configured XCom backend can serialize it (e.g. ``enable_xcom_pickling``
    # or a custom backend) — confirm for this deployment.
    preprocess_data_task = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data,
        op_kwargs={'data': load_data_task.output},
    )
    train_model_task = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
        op_kwargs={'preprocessed_data': preprocess_data_task.output},
    )
    save_model_task = PythonOperator(
        task_id='save_model',
        python_callable=save_model,
        op_kwargs={'model': train_model_task.output},
    )
    load_data_task >> preprocess_data_task >> train_model_task >> save_model_task

In [None]:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib
def load_data():
    """Read the training dataset into a pandas DataFrame.

    ``your_dataset.csv`` is a placeholder path, resolved relative to the
    current working directory.
    """
    return pd.read_csv('your_dataset.csv')
def preprocess_data(data):
    """Return a preprocessed copy of *data*.

    Currently a pass-through placeholder: the result is a copy of the input,
    so later steps never mutate the caller's DataFrame.

    Args:
        data: DataFrame produced by ``load_data``.

    Returns:
        A new DataFrame equal to *data*.
    """
    # TODO: handle missing values, encode categorical variables, etc.
    preprocessed_data = data.copy()
    return preprocessed_data
def train_model(preprocessed_data, target_column='target_column', test_size=0.2, random_state=42):
    """Fit a random-forest classifier on *preprocessed_data*.

    Args:
        preprocessed_data: DataFrame containing the feature columns plus the target.
        target_column: Name of the label column; all other columns are features.
        test_size: Fraction of rows held out and used only to report accuracy.
        random_state: Seed for both the train/test split and the forest, so
            reruns on the same data produce the same model.

    Returns:
        The fitted ``RandomForestClassifier``.
    """
    # Split data into features and target
    X = preprocessed_data.drop(columns=[target_column])
    y = preprocessed_data[target_column]
    # Split data into training and hold-out sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    model = RandomForestClassifier(random_state=random_state)
    model.fit(X_train, y_train)
    # Use the held-out rows instead of silently discarding them
    print(f"Hold-out accuracy: {model.score(X_test, y_test):.3f}")
    return model
def save_model(model, path='trained_model.pkl'):
    """Serialize *model* to *path* with ``joblib.dump``.

    Args:
        model: Fitted estimator to persist.
        path: Destination file; defaults to ``trained_model.pkl`` in the
            current working directory.
    """
    joblib.dump(model, path)
    print("Model saved successfully.")
def do_nothing():
    """Placeholder task callable: intentionally performs no work."""
    return None
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 9),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}
with DAG(
    'ml_pipeline_dag_auto',
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    # catchup otherwise follows ``catchup_by_default`` (True out of the box),
    # which would create one run for every 5-minute interval since start_date.
    catchup=False,
) as dag:
    # Placeholder pipeline: same task ids and ordering as ``ml_pipeline_dag``,
    # but every step runs ``do_nothing``.
    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=do_nothing,
    )
    preprocess_data_task = PythonOperator(
        task_id='preprocess_data',
        python_callable=do_nothing,
    )
    train_model_task = PythonOperator(
        task_id='train_model',
        python_callable=do_nothing,
    )
    save_model_task = PythonOperator(
        task_id='save_model',
        python_callable=do_nothing,
    )
    load_data_task >> preprocess_data_task >> train_model_task >> save_model_task

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Initialize SparkSession
# NOTE(review): this is module-level code, so it runs whenever the file is
# imported (including Airflow's periodic DAG-file parsing), not only when a
# task executes.
spark = (
    SparkSession.builder
    .appName("ML Pipeline DAG")
    .getOrCreate()
)

def load_data():
    """Read the Automobile CSV into a Spark DataFrame.

    The first row is used as the header and column types are inferred.

    Returns:
        A ``pyspark.sql.DataFrame`` with one column per CSV header field.
    """
    # Obtain the session here rather than using the module-level ``spark``:
    # this file may stop that session right after defining the DAG, and
    # getOrCreate() transparently starts a fresh one in that case.
    session = SparkSession.builder.appName("ML Pipeline DAG").getOrCreate()
    # NOTE(review): Spark reads through Hadoop filesystems; an https:// URL may
    # not be readable with the cluster's configuration (the s3a:// form of the
    # same object is the usual choice) — confirm.
    data = session.read.csv(
        'https://caps-newbucket.s3.amazonaws.com/Automobile.csv',
        header=True,
        inferSchema=True,
    )
    return data

def preprocess_data(**kwargs):
    """Assemble the loaded columns into a ``features`` vector plus a ``label`` column.

    The last column of the ``load_data`` result is the target; every other
    column is a feature. The target is renamed to ``label``, the column
    ``RandomForestClassifier`` reads by default.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the ``load_data`` result from XCom.

    Returns:
        DataFrame with exactly two columns: ``features`` and ``label``.
    """
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='load_data')
    # TODO: handle missing values / encode categorical columns — VectorAssembler
    # only accepts numeric, boolean and vector input columns.
    *feature_cols, target_col = data.columns
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    preprocessed_data = (
        assembler.transform(data)
        .withColumnRenamed(target_col, 'label')
        .select('features', 'label')
    )
    return preprocessed_data

def train_model(**kwargs):
    """Fit a Spark random-forest classifier on the ``preprocess_data`` output.

    The label column is whichever single column besides ``features`` is
    present, so both a ``label`` column and a differently named target work.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the preprocessed DataFrame from XCom.

    Returns:
        The fitted random-forest model returned by ``RandomForestClassifier.fit``.

    Raises:
        ValueError: if the input does not have exactly one non-``features`` column.
    """
    ti = kwargs['ti']
    preprocessed_data = ti.xcom_pull(task_ids='preprocess_data')
    label_cols = [c for c in preprocessed_data.columns if c != 'features']
    if len(label_cols) != 1:
        raise ValueError(
            f"Expected exactly one label column besides 'features', got {label_cols}"
        )
    # The classifier reads 'label' by default; name the column explicitly so a
    # target kept under another name is still found.
    rf = RandomForestClassifier(featuresCol='features', labelCol=label_cols[0])
    model = rf.fit(preprocessed_data)
    return model

def save_model(**kwargs):
    """Persist the model produced by the ``train_model`` task.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the fitted model from XCom.
    """
    ti = kwargs['ti']
    model = ti.xcom_pull(task_ids='train_model')
    # overwrite() lets retries and later runs replace a model already saved at
    # this path; a plain save() fails when the path exists.
    model.write().overwrite().save('hdfs://path/to/trained_model')  # placeholder path
    print("Model saved successfully.")

default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2024, 3, 11),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)

# Manually triggered (no schedule) four-step PySpark pipeline.
# NOTE(review): each step hands a Spark DataFrame / model to the next through
# XCom; such objects are bound to the producing process's SparkSession and are
# generally not XCom-serializable — confirm this works in the target deployment.
with DAG('pyspark_ml_pipeline_dag', default_args=default_args, schedule_interval=None) as dag:
    load_data_task = PythonOperator(task_id='load_data', python_callable=load_data)
    preprocess_data_task = PythonOperator(task_id='preprocess_data', python_callable=preprocess_data)
    train_model_task = PythonOperator(task_id='train_model', python_callable=train_model)
    save_model_task = PythonOperator(task_id='save_model', python_callable=save_model)

    # Linear chain: load -> preprocess -> train -> save
    load_data_task.set_downstream(preprocess_data_task)
    preprocess_data_task.set_downstream(train_model_task)
    train_model_task.set_downstream(save_model_task)

# Stop the SparkSession only when this file runs as a script / notebook.
# When Airflow imports the file to load its DAGs, ``__name__`` is not
# "__main__", so the session is not torn down before task callables run.
if __name__ == "__main__":
    spark.stop()


In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Initialize SparkSession
# NOTE(review): this is module-level code, so it runs whenever the file is
# imported (including Airflow's periodic DAG-file parsing), not only when a
# task executes.
spark = (
    SparkSession.builder
    .appName("ML Pipeline DAG")
    .getOrCreate()
)

def load_data():
    """Read the Automobile CSV into a Spark DataFrame.

    The first row is used as the header and column types are inferred.

    Returns:
        A ``pyspark.sql.DataFrame`` with one column per CSV header field.
    """
    # Obtain the session here rather than using the module-level ``spark``:
    # this file may stop that session right after defining the DAG, and
    # getOrCreate() transparently starts a fresh one in that case.
    session = SparkSession.builder.appName("ML Pipeline DAG").getOrCreate()
    # NOTE(review): Spark reads through Hadoop filesystems; an https:// URL may
    # not be readable with the cluster's configuration (the s3a:// form of the
    # same object is the usual choice) — confirm.
    data = session.read.csv(
        'https://caps-newbucket.s3.amazonaws.com/Automobile.csv',
        header=True,
        inferSchema=True,
    )
    return data

def preprocess_data(**kwargs):
    """Assemble the loaded columns into a ``features`` vector plus a ``label`` column.

    The last column of the ``load_data`` result is the target; every other
    column is a feature. The target is renamed to ``label``, the column
    ``RandomForestClassifier`` reads by default.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the ``load_data`` result from XCom.

    Returns:
        DataFrame with exactly two columns: ``features`` and ``label``.
    """
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='load_data')
    # TODO: handle missing values / encode categorical columns — VectorAssembler
    # only accepts numeric, boolean and vector input columns.
    *feature_cols, target_col = data.columns
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    preprocessed_data = (
        assembler.transform(data)
        .withColumnRenamed(target_col, 'label')
        .select('features', 'label')
    )
    return preprocessed_data

def train_model(**kwargs):
    """Fit a Spark random-forest classifier on the ``preprocess_data`` output.

    The label column is whichever single column besides ``features`` is
    present, so both a ``label`` column and a differently named target work.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the preprocessed DataFrame from XCom.

    Returns:
        The fitted random-forest model returned by ``RandomForestClassifier.fit``.

    Raises:
        ValueError: if the input does not have exactly one non-``features`` column.
    """
    ti = kwargs['ti']
    preprocessed_data = ti.xcom_pull(task_ids='preprocess_data')
    label_cols = [c for c in preprocessed_data.columns if c != 'features']
    if len(label_cols) != 1:
        raise ValueError(
            f"Expected exactly one label column besides 'features', got {label_cols}"
        )
    # The classifier reads 'label' by default; name the column explicitly so a
    # target kept under another name is still found.
    rf = RandomForestClassifier(featuresCol='features', labelCol=label_cols[0])
    model = rf.fit(preprocessed_data)
    return model
def do_nothing():
    """Placeholder task callable: intentionally performs no work."""
    return None

def save_model(**kwargs):
    """Persist the model produced by the ``train_model`` task.

    Keyword Args:
        ti: Airflow TaskInstance, used to pull the fitted model from XCom.
    """
    ti = kwargs['ti']
    model = ti.xcom_pull(task_ids='train_model')
    # overwrite() lets retries and later runs replace a model already saved at
    # this path; a plain save() fails when the path exists.
    model.write().overwrite().save('hdfs://path/to/trained_model')  # placeholder path
    print("Model saved successfully.")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 11),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

with DAG(
    'pyspark_ml_pipeline_dag_auto',
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    # catchup otherwise follows ``catchup_by_default`` (True out of the box),
    # which would create one run for every 5-minute interval since start_date.
    catchup=False,
) as dag:
    # Placeholder pipeline: same task ids and ordering as
    # ``pyspark_ml_pipeline_dag``, but every step runs ``do_nothing``.
    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=do_nothing,
    )

    preprocess_data_task = PythonOperator(
        task_id='preprocess_data',
        python_callable=do_nothing,
    )

    train_model_task = PythonOperator(
        task_id='train_model',
        python_callable=do_nothing,
    )

    save_model_task = PythonOperator(
        task_id='save_model',
        python_callable=do_nothing,
    )

    load_data_task >> preprocess_data_task >> train_model_task >> save_model_task

# Stop the SparkSession only when this file runs as a script / notebook.
# When Airflow imports the file to load its DAGs, ``__name__`` is not
# "__main__", so the session is not torn down before task callables run.
if __name__ == "__main__":
    spark.stop()


In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import requests
import json
def fetch_json(url, timeout=30):
    """Fetch the body of *url* as text.

    Args:
        url: Address to GET.
        timeout: Seconds ``requests`` waits to connect / for data before
            raising; without a timeout the call may hang indefinitely and the
            task would never finish.

    Returns:
        The response body as ``str`` (not parsed; callers store it verbatim).

    Raises:
        Exception: if the server answers with any status other than 200.
    """
    print("Fetching JSON data...")
    response = requests.get(url=url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch data, status code: {response.status_code}")
    return response.text
def upload_json_to_s3(bucket_name, file_name, json_data):
    """Store *json_data* in *bucket_name* under ``asset/<file_name>.json``.

    Uses the ``s3_conn`` Airflow connection and replaces any existing object
    at that key.
    """
    print("Uploading JSON data to S3...")
    target_key = f"asset/{file_name}.json"
    s3 = S3Hook('s3_conn')
    s3.load_string(
        string_data=json_data,
        bucket_name=bucket_name,
        key=target_key,
        replace=True,
    )
    print(f"Success: JSON object has been uploaded to {target_key} S3 key path.")
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 9, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Manually triggered two-step DAG: fetch a JSON document over HTTP, then write
# it to S3. The upload task receives the fetched text via a templated XCom pull.
# Defined as a context manager so Airflow (2.4+) auto-registers it: a plain
# ``dag = DAG(...)`` is only discovered through the module-level name ``dag``,
# which a later ``from airflow.decorators import dag`` in the same file rebinds.
with DAG(
    dag_id='dump_to_s3',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    fetch_json_task = PythonOperator(
        task_id="fetch_json",
        python_callable=fetch_json,
        op_args=["http://engineering-exam.s3-website.ap-southeast-2.amazonaws.com/"],
    )
    upload_json_task = PythonOperator(
        task_id="upload_json_to_s3",
        python_callable=upload_json_to_s3,
        op_kwargs={
            'bucket_name': "INSERT YOUR S3 BUCKET NAME",  # placeholder: set before running
            'file_name': "airflow_tester",
            'json_data': "{{ ti.xcom_pull(task_ids='fetch_json') }}",
        },
    )
    fetch_json_task >> upload_json_task

In [None]:
from airflow import Dataset
from airflow.decorators import dag, task
from pendulum import datetime
from airflow.providers.amazon.aws.operators.s3 import (
    S3ListPrefixesOperator,
    S3CreateObjectOperator,
)
# Bucket that is scanned for folders and written to
MY_S3_BUCKET = "myexamplebucketone"
# Only root-level "folders" whose names start with this prefix are used
MY_S3_FOLDER_PREFIX = "e"
# Separator marking a folder boundary in S3 keys
MY_S3_BUCKET_DELIMITER = "/"
# Body written into each created object
MY_DATA = "Hi S3 bucket!"
# Object name created inside every matched folder
MY_FILENAME = "my_message.txt"
# Airflow connection id used for every S3 call
AWS_CONN_ID = "aws_conn"
# Manually triggered DAG (schedule=None): list matching S3 folders, write
# MY_DATA to MY_FILENAME in each, and return one Dataset per written object.
# Documentation here is kept in comments because @dag / @task use function
# docstrings as the DAG / task doc_md.
@dag(
    dag_id="upstream_datasets_taskflow_usecase_pgs",
    start_date=datetime(2024, 3, 11),
    schedule=None,
    catchup=False,
    tags=["datasets", "taskflow", "usecase"],
)
def upstream_datasets_taskflow_usecase():
    # list all root level folders in my S3 bucket that start with my prefix
    list_folders_in_bucket = S3ListPrefixesOperator(
        task_id="list_folders_in_bucket",
        aws_conn_id=AWS_CONN_ID,
        bucket=MY_S3_BUCKET,
        prefix=MY_S3_FOLDER_PREFIX,
        delimiter=MY_S3_BUCKET_DELIMITER,
    )
    # create a separate Dataset object for each of the folders
    # (MY_FILENAME is appended to each folder directly, so each listed prefix
    # presumably already ends with MY_S3_BUCKET_DELIMITER — verify)
    @task
    def create_datasets_from_s3_folders(folder_list):
        datasets_lists = []
        for folder in folder_list:
            uri = f"s3://{MY_S3_BUCKET}{MY_S3_BUCKET_DELIMITER}{folder}{MY_FILENAME}"
            datasets_lists.append(Dataset(uri))
        print(f"These datasets were created: {datasets_lists}")
        # returning a list of Datasets (2.5 feature)
        return datasets_lists
    # .output passes the operator's XCom result (the prefix list) to the task
    list_of_datasets = create_datasets_from_s3_folders(list_folders_in_bucket.output)
    # write MY_DATA into a new MY_FILENAME in each of the S3 folders
    # (one mapped task instance per Dataset; s3_key receives the full s3:// URI
    # and no s3_bucket is passed)
    write_file_to_S3 = S3CreateObjectOperator.partial(
        task_id="write_file_to_S3", aws_conn_id=AWS_CONN_ID, data=MY_DATA, replace=True
    ).expand(
        s3_key=list_of_datasets.map(
            lambda x: x.uri
        )  # retrieving the keys from the Datasets
    )
    # since outlets is not a mappable parameter, use a task flow task to produce to the datasets
    # this too is only possible in 2.5+
    # NOTE(review): this task declares no ``outlets`` and just returns each
    # Dataset unchanged — confirm it really emits dataset events in the
    # deployed Airflow version.
    @task
    def produce_to_datasets(dataset_obj):
        return dataset_obj
    write_file_to_S3 >> produce_to_datasets.expand(dataset_obj=list_of_datasets)
# Calling the @dag-decorated function builds the DAG object
upstream_datasets_taskflow_usecase()

In [None]:
"""
### Upload Data to S3

A simple DAG that shows how to upload data to S3.

Uploads 5 small text files to S3, one per dynamically generated PythonOperator task.
This pattern of dynamically generating tasks can be used whenever the list of items in your DAG is known in advance.
"""


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta
import os


S3_CONN_ID = 'group7'  # Airflow connection id passed to S3Hook
BUCKET = 'caps-newbucket'  # destination bucket for every upload

name = 'workshop'  # swap your name here (used in object names and task ids)


def upload_to_s3(file_name):
    """Upload a small generated text object to ``BUCKET``.

    The object key is ``globetelecom/<name>_file_<file_name>.txt`` and its body
    is ``"Putting some data in for task <file_name>"``; an existing object at
    that key is replaced.

    Args:
        file_name: Suffix identifying the task (the loop index when called from
            the ``s3_upload`` DAG).
    """
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
    sample_file = "{0}_file_{1}.txt".format(name, file_name)  # swap your name here
    # Upload straight from memory: writing a scratch file into the worker's
    # current directory first left files behind, and the manual open()/close()
    # leaked the handle if the write raised.
    s3_hook.load_string(
        "Putting some data in for task {0}".format(file_name),
        key='globetelecom/{0}'.format(sample_file),
        bucket_name=BUCKET,
        replace=True,
    )




# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG('s3_upload',
         start_date=datetime(2024, 3, 12),
         max_active_runs=1,
         # NOTE(review): when both day-of-month and day-of-week are restricted,
         # cron matches if EITHER field matches, so this fires at 12:00 on days
         # 8-14 and 22-28 of every month AND on every Saturday. If the intent
         # was "2nd and 4th Saturday only", this expression does not say that.
         schedule_interval='0 12 8-14,22-28 * 6',
         default_args=default_args,
         catchup=False,  # don't create runs for past intervals since start_date
         ) as dag:

    t0 = DummyOperator(task_id='start')

    # Fan out five upload tasks (i = 0..4), each downstream of 'start'
    for i in range(5):
        generate_files = PythonOperator(
            task_id='generate_file_{0}_{1}'.format(name, i),  # task id is generated dynamically
            python_callable=upload_to_s3,
            op_kwargs={'file_name': i},
        )

        t0 >> generate_files