In [1]:
# Prerequisites: the required Maven dependencies must already be installed (this is done by the CloudFormation stack).
# start pyspark: spark/bin/pyspark 

import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf
from os.path import expanduser
import os
from airflow.models import DAG
from datetime import datetime
from datetime import timedelta
import pyspark.sql.functions as F
from airflow.operators.bash import BashOperator
from pathlib import Path
import findspark
findspark.init()
import pyspark
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from cassandra.cluster import Cluster
from pyspark.sql.functions import when, regexp_replace, regexp_extract, col
import requests
from time import sleep
import random
from multiprocessing import Process
import boto3
import json
import sqlalchemy
import uuid
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator

In [None]:
# Resolve Airflow's home directory and fail fast if it does not exist.
# AIRFLOW_HOME takes precedence when set; otherwise fall back to ~/airflow.
home = expanduser("~")
airflow_dir = expanduser(os.environ.get("AIRFLOW_HOME", os.path.join(home, 'airflow')))
assert os.path.isdir(airflow_dir), f"Airflow home directory not found: {airflow_dir}"

In [2]:

# Make sure the DAG folder inside Airflow's home exists so DAG files can be written there.
# AIRFLOW_HOME takes precedence when set; otherwise fall back to ~/airflow.
home = expanduser("~")
airflow_dir = expanduser(os.environ.get("AIRFLOW_HOME", os.path.join(home, 'airflow')))
Path(airflow_dir, "dags").mkdir(parents=True, exist_ok=True)

In [None]:


def ETL(data_path="/home/kafka/Documents/pinterest_project/data/*.json"):
    """Load the raw Pinterest pin records, clean them and return a Cassandra-ready DataFrame.

    Steps:
      1. Start (or reuse) a local SparkSession that uses every CPU core.
      2. Read every JSON file matched by ``data_path``.
      3. Drop duplicate rows and replace the scraper's placeholder error strings with nulls.
      4. Drop rows without an ``image_src`` (we don't want a row without a pin).
      5. Expand abbreviated follower counts ("12k", "1.2M") and cast them to int.
      6. Rename ``index`` to ``idx`` and select the columns in the same order as the
         Cassandra ``pinterest`` table.

    Args:
        data_path: glob of JSON files to load. Defaults to the local project data folder.

    Returns:
        pyspark.sql.DataFrame: the cleaned DataFrame.
    """
    # SparkSession.Builder exposes master(); setMaster() only exists on SparkConf.
    spark = (
        SparkSession.builder
        .master(f"local[{multiprocessing.cpu_count()}]")
        .appName("s3tospark")
        .getOrCreate()
    )

    # Note: despite the app name, the default path is on the local filesystem, not S3.
    df = spark.read.json(data_path)

    # Remove exact duplicate records.
    df = df.dropDuplicates()

    # Replace placeholder error strings with real nulls. follower_count gets "0"
    # instead so that it still casts to an integer below.
    df = (
        df.replace({'No description available Story format': None}, subset=['description'])
        .replace({'No description available': None}, subset=['description'])
        .replace({'Image src error.': None}, subset=['image_src'])
        .replace({'User Info Error': None}, subset=['poster_name'])
        .replace({'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': None}, subset=['tag_list'])
        .replace({'No Title Data Available': None}, subset=['title'])
        .replace({'User Info Error': "0"}, subset=['follower_count'])
    )

    # Strip the "Local save in " prefix from save_location.
    # NOTE(review): save_location is not part of the final column selection below.
    df = df.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))

    # Drop the rows with null values in 'image_src', as we don't want a row without a pin.
    df = df.na.drop(subset=["image_src"])

    # Expand the "k"/"M" suffixes numerically. The old string substitution turned
    # "1.2M" into "1.2000000", which does not cast to the intended 1200000.
    # Round before the int cast to absorb floating-point error.
    followers = F.col('follower_count')
    df = df.withColumn(
        'follower_count',
        F.round(
            when(followers.endswith('M'),
                 regexp_replace(followers, 'M$', '').cast('double') * 1000000)
            .when(followers.endswith('k'),
                  regexp_replace(followers, 'k$', '').cast('double') * 1000)
            .otherwise(followers.cast('double'))
        ).cast('int'),
    )

    # Cassandra does not allow "index" as a column name, so rename it to "idx".
    df = df.withColumnRenamed('index', 'idx')
    # Keep only the table's columns, in the table's column order.
    df = df.select('idx', 'title', 'poster_name', 'category', 'follower_count',
                   'description', 'image_src', 'is_image_or_video', 'tag_list', 'unique_id')
    return df



def instantiate_cassandra():
    """Create the ``pinterest_project`` keyspace and its ``pinterest`` table.

    Both statements use ``IF NOT EXISTS``, so the function can be re-run safely.
    The table name is keyspace-qualified because this session never selects a
    keyspace, and an unqualified CREATE TABLE would be rejected. The driver
    connection is always shut down afterwards.
    """
    # Cluster() with no arguments uses the driver's default contact point
    # (presumably a local node — confirm for other deployments).
    cluster = Cluster()
    try:
        session = cluster.connect()
        # NOTE(review): replication_factor 3 assumes at least three nodes.
        session.execute("CREATE KEYSPACE IF NOT EXISTS pinterest_project WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};")
        session.execute("CREATE TABLE IF NOT EXISTS pinterest_project.pinterest(idx int PRIMARY KEY, title text, poster_name text, category text, follower_count int, description text, image_src text, is_image_or_video text, tag_list text, unique_id text);")
    finally:
        cluster.shutdown()

def sending_data_to_cassandra(dataframe=None):
    """Insert every row of the cleaned Pinterest DataFrame into ``pinterest_project.pinterest``.

    Args:
        dataframe: DataFrame whose first ten columns are, in order, idx, title,
            poster_name, category, follower_count, description, image_src,
            is_image_or_video, tag_list and unique_id. When omitted, the
            notebook-global ``df`` is used (the previous behaviour).
    """
    rows_source = df if dataframe is None else dataframe
    cluster = Cluster()
    try:
        session = cluster.connect()
        session.execute("USE pinterest_project;")
        prepared_insert = session.prepare(
            """ 
            INSERT INTO pinterest (idx, title, poster_name, category, follower_count, description, image_src, is_image_or_video, tag_list, unique_id) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
            """
        )
        # Stream rows to the driver one partition at a time instead of
        # collect()-ing the whole DataFrame into memory at once.
        for row in rows_source.toLocalIterator():
            session.execute(prepared_insert, tuple(row[:10]))
    finally:
        cluster.shutdown()




In [None]:

class ETL:
    """Batch job that loads Pinterest pins from S3 with Spark, cleans them and writes them to Cassandra.

    Call the methods in this order; each one stores its result on ``self.df``::

        etl = ETL()
        etl.load_data_from_aws_s3()
        etl.data_cleaning()
        etl.data_transformation()
        etl.sending_data_to_cassandra()
    """

    def __init__(self):
        '''
        Initialise and configure spark setting.

        Starts (or reuses) a local SparkSession that uses every CPU core and maps
        ``s3://`` onto Hadoop's S3A filesystem. If ``AWS_ACCESS_KEY_ID`` and
        ``AWS_SECRET_ACCESS_KEY`` are set in the environment, they are installed as
        static S3A credentials. Otherwise the credential provider is left unchanged.
        '''
        findspark.init()
        # SparkSession.Builder exposes master(); setMaster() only exists on SparkConf.
        self.spark = (
            SparkSession.builder
            .master(f"local[{multiprocessing.cpu_count()}]")
            .appName("s3tospark")
            .getOrCreate()
        )
        # There is no global SparkContext `sc` here; use this session's context.
        hadoop_conf = self.spark.sparkContext._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        # Never hard-code credentials in the notebook; read them from the environment.
        access_key = os.environ.get("AWS_ACCESS_KEY_ID")
        secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
        if access_key and secret_key:
            hadoop_conf.set('fs.s3a.access.key', access_key)
            hadoop_conf.set('fs.s3a.secret.key', secret_key)
            # Keys set directly on the Hadoop configuration take no "spark.hadoop." prefix.
            hadoop_conf.set('fs.s3a.aws.credentials.provider',
                            'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')

    def load_data_from_aws_s3(self, path="s3a://basicaccountstack-pinterestdataeng-proje-datalake-tcvpj2nf0cpq/pins/*.json"):
        """
        Load the raw pin JSON files from the AWS S3 bucket into ``self.df``.

        Args:
            path: glob of JSON objects to read; defaults to the project's data-lake bucket.
        """
        self.df = self.spark.read.json(path)

    def data_cleaning(self):
        """
        Remove duplicates and replace error values with None.

        Drops exact duplicate rows and replaces the placeholder error strings with
        nulls. ``follower_count`` gets "0" instead, so it still casts to int later.
        Also strips the "Local save in " prefix from ``save_location`` and drops
        rows without an ``image_src``.
        """
        self.df = self.df.dropDuplicates()

        self.df = (
            self.df
            .replace({'No description available Story format': None}, subset=['description'])
            .replace({'No description available': None}, subset=['description'])
            .replace({'Image src error.': None}, subset=['image_src'])
            .replace({'User Info Error': None}, subset=['poster_name'])
            .replace({'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': None}, subset=['tag_list'])
            .replace({'No Title Data Available': None}, subset=['title'])
            .replace({'User Info Error': "0"}, subset=['follower_count'])
        )
        self.df = self.df.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))
        # Drop the rows with null values in 'image_src', as we don't want a row without a pin.
        self.df = self.df.na.drop(subset=["image_src"])

    def data_transformation(self):
        """
        Convert follower counts to int and shape ``self.df`` for the Cassandra table.

        "k"/"M" suffixes are expanded numerically, so "1.2M" becomes 1200000; the old
        string substitution produced "1.2000000". Then ``index`` is renamed to
        ``idx``, and the columns are selected in the table's column order.
        """
        followers = F.col('follower_count')
        self.df = self.df.withColumn(
            'follower_count',
            # Round before the int cast to absorb floating-point error.
            F.round(
                when(followers.endswith('M'),
                     regexp_replace(followers, 'M$', '').cast('double') * 1000000)
                .when(followers.endswith('k'),
                      regexp_replace(followers, 'k$', '').cast('double') * 1000)
                .otherwise(followers.cast('double'))
            ).cast('int'),
        )
        # Cassandra does not allow "index" as a column name, so rename it to "idx".
        self.df = self.df.withColumnRenamed('index', 'idx')
        self.df = self.df.select('idx', 'title', 'poster_name', 'category', 'follower_count',
                                 'description', 'image_src', 'is_image_or_video', 'tag_list', 'unique_id')

    def sending_data_to_cassandra(self):
        """
        Create the keyspace and table if needed, then insert every row of ``self.df``.

        Uses ``IF NOT EXISTS`` so the job can be re-run. The driver connection is
        always shut down afterwards.
        """
        cluster = Cluster()
        try:
            session = cluster.connect()
            # NOTE(review): replication_factor 3 assumes at least three nodes.
            session.execute("CREATE KEYSPACE IF NOT EXISTS pinterest_project WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};")
            session.execute("USE pinterest_project;")
            session.execute("CREATE TABLE IF NOT EXISTS pinterest(idx int PRIMARY KEY, title text, poster_name text, category text, follower_count int, description text, image_src text, is_image_or_video text, tag_list text, unique_id text);")
            prepared_insert = session.prepare(
                """ 
                INSERT INTO pinterest (idx, title, poster_name, category, follower_count, description, image_src, is_image_or_video, tag_list, unique_id) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """
            )
            # Stream rows partition by partition instead of collect()-ing everything.
            for row in self.df.toLocalIterator():
                session.execute(prepared_insert, tuple(row[:10]))
        finally:
            cluster.shutdown()

    


In [None]:
# NOTE(review): `script` is not defined or imported anywhere in this notebook, and
# `dag` is only bound in a later cell, so on a fresh kernel this raises NameError.
t1 = PythonOperator(task_id='python_script', python_callable=script.main, dag=dag)

In [None]:
# FIXME: draft cell — it cannot run as written:
#   * `python_callable=_ETL.,` is a SyntaxError (no attribute name after the dot),
#     and `_ETL` is not defined anywhere in this notebook;
#   * all three PythonOperators get the same task_id "read_data_from_s3";
#     task ids must be unique within a DAG;
#   * `_choosing_best_model` and `training_model_tasks` are only defined in a later cell;
#   * none of these operators is given a DAG (no `dag=` and no `with DAG` context).
read_data_from_s3 = [
                PythonOperator(
                task_id="read_data_from_s3",
                python_callable=_ETL.,
                op_kwargs={
                "model": model_id
                }
                ) for model_id in ['A', 'B', 'C']
                ]
choosing_best_model = BranchPythonOperator(
                    task_id="choosing_best_model",
                    python_callable=_choosing_best_model
                    )
accurate = BashOperator(
            task_id="accurate",
            bash_command="echo 'accurate'"
            )
inaccurate = BashOperator(
                task_id="inaccurate",
                bash_command=" echo 'inaccurate'"
                )
training_model_tasks >> choosing_best_model >> [accurate, inaccurate]


In [None]:
# Per-task defaults: owned by Michael, no dependency on previous runs, no e-mail
# alerts, one retry after 5 minutes, scheduled between 2023-01-10 and 2023-01-20.
# NOTE(review): the next cell reassigns `default_args`, so these values are
# replaced before any DAG below is built.
default_args = dict(
    owner='Michael',
    depends_on_past=False,
    email=['h1m1w1@googlemail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    start_date=datetime(2023, 1, 10),
    retry_delay=timedelta(minutes=5),
    end_date=datetime(2023, 1, 20),
)

In [None]:

# Set the default arguments applied to every task of the tutorial DAG below.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}


# schedule_interval is a DAG argument, not a task argument; Airflow does not read it
# from default_args, so it is passed to the DAG directly.
# NOTE(review): with a 2018 start_date and catchup left at its default, the scheduler
# may backfill one run per day since then — consider catchup=False.
# NOTE(review): dag_id 'my_dag' is also used by the branching example in a later cell.
dag = DAG(
    dag_id='my_dag',
    description='Simple tutorial DAG',
    default_args=default_args,
    schedule_interval='@daily',
)

# NOTE(review): source1_to_s3, source2_to_hdfs, source3_to_s3 and config are not
# defined anywhere in this notebook; define them before running this cell.
src1_s3 = PythonOperator(
    task_id='source1_to_s3',
    python_callable=source1_to_s3,
    dag=dag)

# provide_context is deprecated in Airflow 2: the context is passed automatically to
# callables that accept it, so the argument is dropped.
src2_hdfs = PythonOperator(
    task_id='source2_to_hdfs',
    python_callable=source2_to_hdfs,
    op_kwargs={'config': config},
    dag=dag)

src3_s3 = PythonOperator(
    task_id='source3_to_s3',
    python_callable=source3_to_s3,
    dag=dag)

spark_job = BashOperator(
    task_id='spark_task_etl',
    bash_command='spark-submit --master spark://localhost:7077 spark_job.py',
    dag=dag)

# The Spark job runs only after all three sources have been loaded.
[src1_s3, src2_hdfs, src3_s3] >> spark_job

In [None]:
# FIXME: draft cell — it fails before running: the `PythonOperator(` opened for
# read_data_from_s3 below is never closed, which is a SyntaxError.
with DAG(dag_id='pin_dag',
         default_args=default_args,
         # cron expression: run every minute
         schedule_interval='*/1 * * * *',
         catchup=False,
         tags=['test']
         ) as dag:
    # Define the tasks. Here we are going to define only one bash operator
    # FIXME: rebinding `ETL` to an instance shadows the ETL class/function for every
    # later cell. Because it runs at DAG-definition time, it also builds a SparkSession
    # whenever this code is executed (presumably the ETL class above — confirm).
    ETL = ETL()
    
    # FIXME: bash_command is a BashOperator argument and there is no python_callable;
    # this was presumably meant to be a BashOperator (see the comment above).
    test_task = PythonOperator(
        task_id='write_date_file',
        bash_command='cd ~/Desktop && date >> ai_core.txt',
        dag=dag)
    
    # FIXME: `_ETL` is undefined (the instance above is bound to `ETL`).
    read_data_from_s3 = [
                PythonOperator(
                task_id="read_data_from_s3",
                python_callable=_ETL.load_data_from_aws_s3,

                ]
# FIXME: the statements below are outside the `with DAG` block, so these operators get
# no DAG. `_choosing_best_model` and `training_model_tasks` are only defined in a later
# cell, so they raise NameError on a fresh kernel.
choosing_best_model = BranchPythonOperator(
                    task_id="choosing_best_model",
                    python_callable=_choosing_best_model
                    )
accurate = BashOperator(
            task_id="accurate",
            bash_command="echo 'accurate'"
            )
inaccurate = BashOperator(
                task_id="inaccurate",
                bash_command=" echo 'inaccurate'"
                )
training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from random import randint
def _choosing_best_model(ti):
    accuracies = ti.xcom_pull(task_ids=[
    'training_model_A',
    'training_model_B',
    'training_model_C'
    ])
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'

def _training_model(model):
    return randint(1, 10)

with DAG(
    "my_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # One "training" task per model id (A, B, C); each returns a random score.
    training_model_tasks = [
        PythonOperator(
            task_id=f"training_model_{model_id}",
            python_callable=_training_model,
            op_kwargs={"model": model_id},
        )
        for model_id in "ABC"
    ]

    # Returns the task id ('accurate' or 'inaccurate') to follow next.
    choosing_best_model = BranchPythonOperator(
        task_id="choosing_best_model",
        python_callable=_choosing_best_model,
    )

    accurate = BashOperator(task_id="accurate", bash_command="echo 'accurate'")
    inaccurate = BashOperator(task_id="inaccurate", bash_command=" echo 'inaccurate'")

    # Fan in from the training tasks, then fan out to the two possible outcomes.
    choosing_best_model.set_upstream(training_model_tasks)
    choosing_best_model.set_downstream([accurate, inaccurate])

In [None]:
# airflowRedditPysparkDag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os

import shlex

# Input arguments for downloading the S3 data and for the Spark jobs.
# REMARK: replace `srcDir` and `redditFile` with the full paths containing your
# PySpark scripts and where the Reddit file will be stored, respectively.
s3Bucket = '<YOUR_S3_BUCKET>'
s3Key = '<YOUR_S3_KEY>'
# NOTE(review): os.getcwd() is evaluated when this code runs, so these paths depend on
# the working directory of the process that loads the DAG, not on this file's location.
redditFile = os.getcwd() + '/data/RC-s3-2007-10'
srcDir = os.getcwd() + '/src/'
sparkSubmit = '/usr/local/spark/bin/spark-submit'

## Define the DAG object
# NOTE(review): with a 2016 start_date and default catchup, the scheduler may create a
# run for every day since then.
default_args = {
    'owner': 'insight-dan',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 15),
    'retries': 5,
    'retry_delay': timedelta(minutes=1),
}
# timedelta(1) is one day, i.e. a daily schedule.
dag = DAG('s3RedditPyspark', default_args=default_args, schedule_interval=timedelta(1))

# Three tasks: one downloads the S3 data; two Spark jobs depend on that download
# succeeding. Every argument is shell-quoted because it lands in a bash command line.
# Unquoted, a placeholder such as '<YOUR_S3_BUCKET>' is parsed as a redirection, and
# paths containing spaces are split.

# Task to download the data.
downloadData = BashOperator(
    task_id='download-data',
    bash_command=' '.join(map(shlex.quote, [
        'python', srcDir + 'python/s3-reddit.py', s3Bucket, s3Key, redditFile])),
    dag=dag)

# Task to compute the number of unique authors.
numUniqueAuthors = BashOperator(
    task_id='unique-authors',
    bash_command=' '.join(map(shlex.quote, [
        sparkSubmit, srcDir + 'pyspark/numUniqueAuthors.py', redditFile])),
    dag=dag)

# Task to compute the average upvotes.
averageUpvotes = BashOperator(
    task_id='average-upvotes',
    bash_command=' '.join(map(shlex.quote, [
        sparkSubmit, srcDir + 'pyspark/averageUpvote.py', redditFile])),
    dag=dag)

# Both Spark jobs run only after the download task has succeeded.
downloadData >> [numUniqueAuthors, averageUpvotes]