# Develop a Scheduled Data Pipeline with Prefect

<hr>

## Environment Setup

The code in this notebook uses `prefect` for orchestration *(figuring out what to do, and in what order)* and `dask` for execution *(doing the things)*.

It relies on the following additional non-builtin libraries:


* `pyspark`: read in data from the server and manipulate it
* `dask-saturn`: create and interact with Saturn Cloud `Dask` clusters ([link](https://github.com/saturncloud/dask-saturn))

In [None]:
import json
import os
import uuid
from datetime import datetime, timedelta

import prefect
from prefect import task, Flow, Parameter
# Prefect 0.x import path (Prefect 0.14+ also offers `from prefect.executors import DaskExecutor`)
from prefect.engine.executors.dask import DaskExecutor

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

from dask_saturn import SaturnCluster

# One local SparkSession shared by every task in this notebook
spark = SparkSession.builder.master("local").appName("tlcdata").getOrCreate()
sc = spark.sparkContext

<hr>

## Define Tasks

`prefect` refers to a workload as a "flow", which comprises multiple individual things to do called "tasks". From [the Prefect docs](https://docs.prefect.io/core/concepts/tasks.html):

> A task is like a function: it optionally takes inputs, performs an action, and produces an optional result.
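What a task adds on top of a plain function is run-time behavior, such as the retry policy declared on `extract()` below (`max_retries=10, retry_delay=timedelta(seconds=10)`). The plain-Python sketch below imitates that idea with a hypothetical `simple_task` decorator and a hypothetical `flaky_extract` function. It is not Prefect's implementation, only an illustration of "a function plus a retry policy".

```python
import time
from datetime import timedelta
from functools import wraps


def simple_task(max_retries: int = 0, retry_delay: timedelta = timedelta(0)):
    """Hypothetical stand-in for a task decorator: call the function, retrying on failure."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt >= max_retries:
                        raise  # out of retries: let the failure propagate
                    attempt += 1
                    time.sleep(retry_delay.total_seconds())
        return wrapper
    return decorator


calls = {"n": 0}


@simple_task(max_retries=3, retry_delay=timedelta(seconds=0))
def flaky_extract(url: str) -> str:
    # Fails twice, then succeeds, to exercise the retry loop
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("server not ready")
    return f"data from {url}"


print(flaky_extract("https://example.com/"), "after", calls["n"], "attempts")
```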

The goal of this notebook's flow is to run, on a schedule, an extract-transform-load (ETL) pipeline for NYC Taxi & Limousine Commission (TLC) trip records: the yellow and green taxi trip data are read from cloud storage, combined, and loaded into a database.

That work can be broken down into the following tasks:

* `get_trial_id()`: assign a unique ID to each run
* `extract()`: read the yellow and green taxi trip CSV files (three years of data) from cloud storage
* `transform()`: add hourly pickup and drop-off columns, combine the two datasets, and save the result in column-oriented (Parquet) and row-oriented (Avro) formats
* `load()`: write the combined dataset to a CSV file and to a database table
* `get_trial_summary()`: collect summary statistics for the run (currently the row count) in one object
* `write_trial_summary()`: log the trial summary
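The difference between the column-oriented and row-oriented layouts (Parquet and Avro in this notebook) can be sketched in plain Python on a few made-up trip records. A row-oriented layout keeps each record together, while a column-oriented one keeps each field's values together, which is why columnar formats suit queries that scan a few columns across many rows. The helper names `to_columns` and `to_rows` are illustrative only.

```python
# Made-up sample records, stored row by row (one dict per trip)
rows = [
    {"VendorID": 1, "trip_distance": 1.5, "taxi_type": "yellow"},
    {"VendorID": 2, "trip_distance": 3.2, "taxi_type": "green"},
    {"VendorID": 2, "trip_distance": 0.8, "taxi_type": "yellow"},
]


def to_columns(records):
    """Row-oriented -> column-oriented: one list of values per field."""
    fields = list(records[0])
    return {name: [r[name] for r in records] for name in fields}


def to_rows(columns):
    """Column-oriented -> row-oriented: zip the per-field lists back into records."""
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


columns = to_columns(rows)
print(columns["trip_distance"])  # one contiguous list: cheap to scan or aggregate
print(to_rows(columns) == rows)  # the two layouts hold the same data
```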

In [None]:
import json
import os
import uuid
from datetime import timedelta

import prefect
import pyspark.sql.functions as F
from prefect import task
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

# Column names and types shared by the yellow and green trip records
TAXI_SCHEMA = StructType([
    StructField("VendorID", IntegerType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("store_and_fwd_flag", StringType()),
    StructField("RatecodeID", IntegerType()),
    StructField("PULocationID", IntegerType()),
    StructField("DOLocationID", IntegerType()),
    StructField("passenger_count", IntegerType()),
    StructField("trip_distance", FloatType()),
    StructField("fare_amount", FloatType()),
    StructField("extra", FloatType()),
    StructField("mta_tax", FloatType()),
    StructField("tip_amount", FloatType()),
    StructField("tolls_amount", FloatType()),
    StructField("ehail_fee", FloatType()),
    StructField("improvement_surcharge", FloatType()),
    StructField("total_amount", FloatType()),
    StructField("payment_type", IntegerType()),
    StructField("trip_type", IntegerType()),
])


def _conform(df: DataFrame, taxi_type: str) -> DataFrame:
    """Select the shared columns by name, cast them to TAXI_SCHEMA types, and tag the taxi type."""
    return df.select(
        [col(field.name).cast(field.dataType) for field in TAXI_SCHEMA.fields]
        + [lit(taxi_type).alias("taxi_type")]
    )


@task(max_retries=10, retry_delay=timedelta(seconds=10))
def extract(url: str) -> tuple:
    """Read the yellow and green trip CSV files found under `url`."""
    ypath = url + "yellow_tripdata_*.csv"
    gpath = url + "green_tripdata_*.csv"
    try:
        # Yellow and green files use different column orders and names, so read
        # them by header name instead of forcing one positional schema on both
        yellow_df = (spark.read.option("header", True).csv(ypath)
                     .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
                     .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
                     .withColumn("ehail_fee", lit(0.0))                         # yellow trips have no e-hail fee
                     .withColumn("trip_type", lit(None).cast(IntegerType())))   # nor a trip type
        green_df = (spark.read.option("header", True).csv(gpath)
                    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
                    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime"))
    except Exception as exc:
        # Spark fails as soon as a path cannot be resolved; Prefect then retries the task
        raise RuntimeError(f"No data fetched from {url}") from exc

    return _conform(yellow_df, "yellow"), _conform(green_df, "green")


@task
def transform(yellow_df: DataFrame, green_df: DataFrame) -> DataFrame:
    """Add hourly columns, combine both datasets, and write column- and row-oriented copies."""

    def add_hour_columns(df: DataFrame) -> DataFrame:
        # Truncate each timestamp to the start of its hour (24-hour clock, stays a timestamp)
        return (df.withColumn("pickup_hour", F.date_trunc("hour", col("pickup_datetime")))
                  .withColumn("dropoff_hour", F.date_trunc("hour", col("dropoff_datetime"))))

    # unionByName lines the columns up by name rather than by position
    taxi_df = add_hour_columns(yellow_df).unionByName(add_hour_columns(green_df))

    # The targets below must be on a filesystem Spark can write to (local, HDFS, S3, ...)
    # Column-oriented copy: Parquet stores the DataFrame's own schema
    taxi_df.write.mode("append").parquet("https://cloud.uni-koblenz.de/s/tTcoPwsBdoXnWcG/parquet/taxi_df.parquet")

    # Row-oriented copy: needs the spark-avro package; Spark derives the Avro schema from the DataFrame
    taxi_df.write.mode("append").format("avro").save("https://cloud.uni-koblenz.de/s/tTcoPwsBdoXnWcG/parquet/taxi_df.avro")

    # To read the copies back:
    # taxi_df_parquet = spark.read.parquet("https://cloud.uni-koblenz.de/s/tTcoPwsBdoXnWcG/parquet/taxi_df.parquet")
    # taxi_df_avro = spark.read.format("avro").load("https://cloud.uni-koblenz.de/s/tTcoPwsBdoXnWcG/parquet/taxi_df.avro")

    return taxi_df


@task
def load(taxi_df: DataFrame, path: str) -> None:
    """Write the combined dataset to CSV and to a SQL Server table."""
    # CSV copy of the output (Spark writes a directory of part files at `path`)
    taxi_df.write.mode("overwrite").option("header", True).csv(path)

    # Connection settings for the target database
    database = "TestDB"
    table = "dbo.tbl_spark_df"
    jdbc_url = f"jdbc:sqlserver://localhost;instanceName=SQLEXPRESS;databaseName={database};"
    # Credentials come from environment variables (these variable names are assumptions)
    user = os.environ["DB_USER"]
    password = os.environ["DB_PASSWORD"]

    # Replace the table contents; the Microsoft JDBC driver must be on Spark's classpath
    (taxi_df.write.mode("overwrite")
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .save())

    # To append to the table instead of replacing it:
    # taxi_df.write.jdbc(url=jdbc_url, table=table, mode="append",
    #                    properties={"user": user, "password": password,
    #                                "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"})


@task
def get_trial_id() -> str:
    """Generate a unique identifier for this trial."""
    return str(uuid.uuid4())


@task
def get_trial_summary(trial_id: str, taxi_df: DataFrame) -> dict:
    """Collect summary statistics for this run in one dict."""
    return {"id": trial_id, "data": {"num_obs": taxi_df.count()}}


@task(log_stdout=True)
def write_trial_summary(trial_summary: dict) -> None:
    """
    Write out a summary of the trial. Currently this just logs it to the
    Prefect logger.
    """
    logger = prefect.context.get("logger")
    logger.info(json.dumps(trial_summary))
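`transform()` adds `pickup_hour` and `dropoff_hour` columns that bucket each timestamp to the start of its hour. The plain-Python sketch below mirrors that truncation on made-up timestamps (the helper `to_hour_bucket` is illustrative, not part of the pipeline) and shows why the bucket key should use a 24-hour clock: with a 12-hour pattern (`hh` in Spark, `%I` in Python), 01:00 and 13:00 map to the same key.

```python
from datetime import datetime


def to_hour_bucket(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its hour."""
    return ts.replace(minute=0, second=0, microsecond=0)


early = datetime(2019, 1, 1, 1, 46, 40)   # made-up pickup times
late = datetime(2019, 1, 1, 13, 5, 12)

print(to_hour_bucket(early))  # 2019-01-01 01:00:00
print(to_hour_bucket(late))   # 2019-01-01 13:00:00

# 12-hour formatting drops the AM/PM distinction, so both trips land in one bucket
fmt_12h = "%Y-%m-%d %I:00:00"
print(early.strftime(fmt_12h) == late.strftime(fmt_12h))  # True
```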

<hr>

## Construct a Flow

Now that all of the task logic has been defined, the next step is to compose those tasks into a "flow". From [the Prefect docs](https://docs.prefect.io/core/concepts/flows.html):

> A Flow is a container for Tasks. It represents an entire workflow or application by describing the dependencies between tasks.

> Flows are DAGs, or "directed acyclic graphs." This is a mathematical way of describing certain organizational principles:

> * A graph is a data structure that uses "edges" to connect "nodes." Prefect models each Flow as a graph in which Task dependencies are modeled by Edges.
> * A directed graph means that edges have a start and an end: when two tasks are connected, one of them unambiguously runs first and the other one runs second.
> * An acyclic directed graph has no circular dependencies: if you walk through the graph, you will never revisit a task you've seen before.
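The flow built below is such a DAG. As a sketch, its task dependencies can be written as a plain-Python mapping (task to the set of upstream tasks, read off the `with Flow(...)` block) and passed to the standard library's `graphlib` (Python 3.9+) to find a valid run order and to show that a cycle is rejected. This models only the dependency structure; it is not how Prefect builds its graph internally.

```python
from graphlib import CycleError, TopologicalSorter

# task -> set of upstream tasks it depends on
flow_deps = {
    "p_url": set(),
    "extract": {"p_url"},
    "transform": {"extract"},
    "load": {"transform"},
    "get_trial_id": set(),
    "get_trial_summary": {"get_trial_id", "transform"},
    "write_trial_summary": {"get_trial_summary"},
}

# Any order in which every task comes after all of its upstream tasks
order = list(TopologicalSorter(flow_deps).static_order())
print(order)

# Making extract depend on write_trial_summary closes a loop, so this is no longer a DAG
cyclic = {**flow_deps, "extract": {"p_url", "write_trial_summary"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("cycle detected:", err.args[1])
```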

In [None]:


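import os
from datetime import datetime, timedelta
from prefect import Flow, Parameter
from prefect.schedules import IntervalSchedule

# Assumes Saturn Cloud sets this environment variable for the logged-in user
SATURN_USERNAME = os.environ["SATURN_USERNAME"]

# Re-run the pipeline on a fixed interval (the daily interval is an assumption; adjust as needed)
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(days=1),
)

with Flow(f"{SATURN_USERNAME}-tlcdata", schedule=schedule) as flow:
    param_url = Parameter(name="p_url", required=True)

    # extract returns (yellow_df, green_df); index the task result to wire up both outputs
    extracted = extract(url=param_url)
    taxi_df = transform(extracted[0], extracted[1])
    load(taxi_df=taxi_df, path=f"C:/Users/Soujanya/users_{int(datetime.now().timestamp())}.csv")

    trial_id = get_trial_id()

    # collect the trial summary in a dict
    trial_summary = get_trial_summary(trial_id=trial_id, taxi_df=taxi_df)

    # log the trial summary
    write_trial_summary(trial_summary)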

At this point, we have all of the work defined in tasks and arranged within a flow, but none of the tasks have run yet. In the next section, we'll do that using `Dask`.

<hr>

## Run the flow with Dask

If you run `flow.visualize()` on the code above, you'll see that this flow is not linear. Some tasks are independent of others and can run at the same time: for example, `get_trial_id()` depends on nothing else, so it can run while `extract()` is still reading data.

`Dask` is a framework that runs graphs like the one above, and Saturn Cloud makes it easy to create and manage Dask clusters. The code below configures a Dask cluster with three workers.

Next, you need to tell `prefect` to use this `Dask` cluster to do work! This is done with something called an "executor". From the Prefect docs:

> Prefect Executors implement the logic for how Tasks are run.
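To make the idea concrete, the sketch below is a toy executor in plain Python. It walks a dependency mapping with `graphlib` and submits every task whose upstream tasks have finished to a thread pool, so independent tasks (here `get_trial_id` and the `extract` chain) can run at the same time. The function `run_dag` and the placeholder task bodies are hypothetical; this illustrates what an executor decides, not how Prefect's `DaskExecutor` works internally.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_dag(deps, funcs, max_workers=3):
    """Run funcs[name](*upstream_results) as soon as all of a task's upstream tasks finish."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # also raises CycleError if the graph is not a DAG
    results, running = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            for name in sorter.get_ready():
                # Upstream results are passed in sorted-name order (a convention of this sketch)
                args = [results[u] for u in sorted(deps.get(name, ()))]
                running[pool.submit(funcs[name], *args)] = name
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                results[name] = fut.result()
                sorter.done(name)  # may unlock downstream tasks
    return results


deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "get_trial_id": set(),
    "get_trial_summary": {"get_trial_id", "transform"},
    "write_trial_summary": {"get_trial_summary"},
}
funcs = {
    "extract": lambda: ([1, 2], [3]),  # placeholder "yellow" and "green" rows
    "transform": lambda parts: parts[0] + parts[1],
    "load": lambda rows: len(rows),
    "get_trial_id": lambda: "trial-1",
    "get_trial_summary": lambda trial_id, rows: {"id": trial_id, "data": {"num_obs": len(rows)}},
    "write_trial_summary": lambda summary: summary,
}

results = run_dag(deps, funcs)
print(results["write_trial_summary"])  # {'id': 'trial-1', 'data': {'num_obs': 3}}
```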

In [None]:
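# The executor starts a temporary Saturn Cloud Dask cluster (three workers) for each flow run.
# The tasks use the notebook's SparkSession, which cannot be pickled and sent to remote
# workers; if a run fails with a serialization error, prefect.engine.executors.LocalExecutor
# keeps the Spark work in this process.
executor = DaskExecutor(
    cluster_class=SaturnCluster,
    cluster_kwargs={"n_workers": 3},
)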

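Finally, run the flow! The code below passes the required `p_url` parameter and hands all of the tasks to the `Dask` cluster configured above. Because the flow has a schedule attached, `flow.run()` keeps running it at each scheduled time until it is interrupted.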

In [None]:
# p_url is a required Parameter, so pass it together with the executor in a single run
flow.run(
    parameters={"p_url": "https://cloud.uni-koblenz.de/apps/files/DOES_NOT_EXIST"},
    executor=executor,
)