# Homework solution. Week 2.

Create a virtual environment with conda:
```
conda create -n zoom_week_2 python=3.9
conda activate zoom_week_2
pip install -r requirements.txt
```

## Question 1. Load January 2020 data

Using the `etl_web_to_gcs.py` flow that loads taxi data into GCS as a guide, create a flow that loads the green taxi CSV dataset for January 2020 into GCS and run it. Look at the logs to find out how many rows the dataset has.

How many rows does that dataset have?

Let's run our modified script:

python etl_web_to_gcs.py --color=green --year=2020 --month=1

00:11:22.434 | INFO    | prefect.engine - Created flow run 'swinging-moth' for flow 'etl-web-to-gcs'
URL: https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-01.csv.gz
00:11:22.656 | INFO    | Flow run 'swinging-moth' - Created task run 'fetch-b4598a4a-0' for task 'fetch'
00:11:22.657 | INFO    | Flow run 'swinging-moth' - Executing 'fetch-b4598a4a-0' immediately...
  return pd.read_csv(dataset_url)
00:11:26.200 | INFO    | Task run 'fetch-b4598a4a-0' - Finished in state Completed()
00:11:26.229 | INFO    | Flow run 'swinging-moth' - Created task run 'clean-b9fd7e03-0' for task 'clean'
00:11:26.230 | INFO    | Flow run 'swinging-moth' - Executing 'clean-b9fd7e03-0' immediately...
00:11:26.307 | INFO    | Task run 'clean-b9fd7e03-0' -    VendorID lpep_pickup_datetime  ... trip_type congestion_surcharge
0       2.0  2019-12-

### Answer: 447770
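
The `--color`, `--year` and `--month` flags and the URL printed in the log suggest how the modified script is parameterized. The following is only a minimal sketch of that part: `build_url` and `parse_args` are hypothetical helper names, and the real `etl_web_to_gcs.py` (not shown here) may be organized differently.

```python
import argparse

# Base of the release URLs, taken from the "URL:" line in the log.
BASE_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"


def build_url(color: str, year: int, month: int) -> str:
    """Build a dataset URL in the same form as the one printed in the log."""
    dataset_file = f"{color}_tripdata_{year}-{month:02d}"
    return f"{BASE_URL}/{color}/{dataset_file}.csv.gz"


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the three flags used on the command line (hypothetical defaults)."""
    parser = argparse.ArgumentParser(description="Load taxi data into GCS")
    parser.add_argument("--color", default="green")
    parser.add_argument("--year", type=int, default=2020)
    parser.add_argument("--month", type=int, default=1)
    return parser.parse_args(argv)


# Same flags as in the command above, passed explicitly for illustration.
args = parse_args(["--color=green", "--year=2020", "--month=1"])
print("URL:", build_url(args.color, args.year, args.month))
```

Zero-padding the month (`{month:02d}`) is what turns `--month=1` into the `2020-01` part of the file name.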

## Question 2. Scheduling with Cron

Cron is a common scheduling specification for workflows. 

Using the flow in `etl_web_to_gcs.py`, create a deployment to run on the first of every month at 5am UTC. What’s the cron schedule for that?
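
A standard cron expression has five space-separated fields: minute, hour, day of month, month, and day of week. "The first of every month at 5am" therefore fixes the first three fields and leaves the last two as wildcards. The sketch below only labels the fields of an expression so the mapping is easy to read; `describe_cron` is a hypothetical helper and not part of Prefect.

```python
# Names of the five fields of a standard cron expression, in order.
FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Pair each field of a five-field cron expression with its name."""
    fields = expression.split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expression!r}")
    return dict(zip(FIELD_NAMES, fields))


# Minute 0, hour 5, day 1 of every month, on any day of the week.
schedule = describe_cron("0 5 1 * *")
for name, value in schedule.items():
    print(f"{name:>12}: {value}")
```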

Let's create a deployment and apply it:

prefect deployment build ./etl_web_to_gcs.py:etl_web_to_gcs -n "My Taxi Flow" --cron "0 5 1 * *"
prefect deployment apply ./etl_web_to_gcs-deployment.yaml

Found flow 'etl-web-to-gcs'
Deployment YAML created at '/Users/a_kulesh/Workspace/education/courses/data-engineering-zoomcamp/_fork/data-engineering-zoomcamp/cohorts/2023/week_2_workflow_orchestration/etl_web_to_gcs-deployment.yaml'.
Deployment storage None does not have upload capabilities; no files uploaded.
Successfully loaded 'My Taxi Flow'
Deployment 'etl-web-to-gcs/My Taxi Flow' successfully created with id 'cef6a1b9-faef-46d2-be09-6b14d469deef'.

To execute flow runs from this deployment, start an agent that pulls work from the 'default' work queue:
$ prefect agent start -q 'default'


## Question 3. Loading data to BigQuery 

Using `etl_gcs_to_bq.py` as a starting point, modify the script for extracting data from GCS and loading it into BigQuery. This new script should not fill or remove rows with missing values. (The script is really just doing the E and L parts of ETL).

The main flow should print the total number of rows processed by the script. Set the flow decorator to log the print statement.

Parametrize the entrypoint flow to accept a list of months, a year, and a taxi color. 

Make any other necessary changes to the code for it to function as required.

Create a deployment for this flow to run in a local subprocess with local flow code storage (the defaults).

Make sure you have the parquet data files for Yellow taxi data for Feb. 2019 and March 2019 loaded in GCS. Run your deployment to append this data to your BigQuery table. How many rows did your flow code process?
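
The parametrization asked for here (a list of months, a year, and a color) can be sketched without any Prefect code. The example below is an assumption about how such an entrypoint might parse its arguments: `parse_months`, `gcs_path` and `parse_args` are hypothetical names, and the blob path format is inferred from the `extract_from_gcs` log line further down. For the print requirement, the `log_prints=True` argument of Prefect 2's `@flow` decorator is presumably what the task refers to.

```python
import argparse


def parse_months(value: str) -> list:
    """Turn a string such as "2,3" into [2, 3], rejecting invalid months."""
    months = [int(part) for part in value.split(",") if part.strip()]
    for month in months:
        if not 1 <= month <= 12:
            raise argparse.ArgumentTypeError(f"invalid month: {month}")
    return months


def gcs_path(color: str, year: int, month: int) -> str:
    """Blob path in the form seen in the log (assumed naming convention)."""
    return f"data/{color}/{color}_tripdata_{year}-{month:02d}.parquet"


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Load taxi data from GCS into BigQuery")
    parser.add_argument("--color", default="yellow")
    parser.add_argument("--year", type=int, default=2019)
    parser.add_argument("--months", type=parse_months, default=[2, 3])
    return parser.parse_args(argv)


# One blob per requested month; the real flow would load each file and
# add up the row counts before printing the total.
args = parse_args(["--color=yellow", "--year=2019", "--months=2,3"])
paths = [gcs_path(args.color, args.year, month) for month in args.months]
print(paths)
```

Accepting the months as a single comma-separated flag keeps the command line short; a repeated `--month` flag would work equally well.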

Uploading data to GCS:

python etl_web_to_gcs.py --color=yellow --year=2019 --month=2
python etl_web_to_gcs.py --color=yellow --year=2019 --month=3

00:29:48.267 | INFO    | prefect.engine - Created flow run 'manipulative-crayfish' for flow 'etl-web-to-gcs'
URL: https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-02.csv.gz
00:29:48.497 | INFO    | Flow run 'manipulative-crayfish' - Created task run 'fetch-b4598a4a-0' for task 'fetch'
00:29:48.500 | INFO    | Flow run 'manipulative-crayfish' - Executing 'fetch-b4598a4a-0' immediately...
00:33:08.447 | INFO    | Task run 'fetch-b4598a4a-0' - Finished in state Completed()
00:33:08.480 | INFO    | Flow run 'manipulative-crayfish' - Created task run 'clean-b9fd7e03-0' for task 'clean'
00:33:08.481 | INFO    | Flow run 'manipulative-crayfish' - Executing 'clean-b9fd7e03-0' immediately...
00:33:08.575 | INFO    | Task run 'clean-b9fd7e03-0' -    VendorID tpep_pickup_datetime  ... total_amount  congestion_surcharge
0

Uploading data from GCS to BigQuery:

python etl_gcs_to_bq.py --color=yellow --year=2019 --months=2,3

00:51:10.998 | INFO    | prefect.engine - Created flow run 'fantastic-jacamar' for flow 'etl-gcs-to-bq'
00:51:11.201 | INFO    | Flow run 'fantastic-jacamar' - Created task run 'extract_from_gcs-968e3b65-0' for task 'extract_from_gcs'
00:51:11.203 | INFO    | Flow run 'fantastic-jacamar' - Executing 'extract_from_gcs-968e3b65-0' immediately...
00:51:11.909 | INFO    | Task run 'extract_from_gcs-968e3b65-0' - Downloading blob named data/yellow/yellow_tripdata_2019-02.parquet from the dtc_data_lake_virtual-dynamo-375412 bucket to data/gcs/data/yellow/yellow_tripdata_2019-02.parquet/data/yellow/yellow_tripdata_2019-02.parquet
00:51:27.658 | INFO    | Task run 'extract_from_gcs-968e3b65-0' - Finished in state Completed()
00:51:27.690 | INFO    | Flow run 'fantastic-jacamar' - Created task run 'transform-a7d916b4-0' for task 'transform'
00:51:27.691 | INFO    | Flow run

## Question 4. GitHub Storage Block

Using the `web_to_gcs` script from the videos as a guide, you want to store your flow code in a GitHub repository for collaboration with your team. Prefect can look in the GitHub repo to find your flow code and read it. Create a GitHub storage block from the UI or in Python code and use that in your Deployment instead of storing your flow code locally or baking your flow code into a Docker image. 

Note that you will have to push your code to GitHub yourself; Prefect will not push it for you.

Run your deployment in a local subprocess (the default if you don’t specify an infrastructure). Use the Green taxi data for the month of November 2020.

How many rows were processed by the script?

## Question 5. Email or Slack notifications

It’s often helpful to be notified when something with your dataflow doesn’t work as planned. Choose one of the options below for creating email or Slack notifications.

The hosted Prefect Cloud lets you avoid running your own server and has Automations that allow you to get notifications when certain events occur or don’t occur. 

Create a free forever Prefect Cloud account at app.prefect.cloud and connect your workspace to it following the steps in the UI when you sign up. 

Set up an Automation that will send yourself an email when a flow run completes. Run the deployment used in Q4 for the Green taxi data for April 2019. Check your email to see the notification.

Alternatively, use a Prefect Cloud Automation or a self-hosted Orion server Notification to get notifications in a Slack workspace via an incoming webhook. 

Join my temporary Slack workspace with [this link](https://join.slack.com/t/temp-notify/shared_invite/zt-1odklt4wh-hH~b89HN8MjMrPGEaOlxIw). 400 people can use this link and it expires in 90 days. 

In the Prefect Cloud UI create an [Automation](https://docs.prefect.io/ui/automations) or in the Prefect Orion UI create a [Notification](https://docs.prefect.io/ui/notifications/) to send a Slack message when a flow run enters a Completed state. Here is the Webhook URL to use: https://hooks.slack.com/services/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

Test the functionality.

Alternatively, you can grab the webhook URL from your own Slack workspace and Slack App that you create. 
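
Before wiring a webhook into Prefect, it can help to check by hand that it accepts messages. Slack incoming webhooks take a JSON body with a `text` field sent via HTTP POST. The sketch below uses only the standard library and is independent of Prefect; `build_slack_request` and `send_slack_message` are hypothetical helper names, and the URL is a placeholder to be replaced with the webhook of your own Slack app.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, message: str) -> urllib.request.Request:
    """Prepare a POST request carrying the message as a JSON 'text' field."""
    payload = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_slack_message(webhook_url: str, message: str) -> int:
    """Send the message and return the HTTP status code (needs network access)."""
    request = build_slack_request(webhook_url, message)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Placeholder URL (assumption): nothing is sent here, the request is only built.
request = build_slack_request(
    "https://hooks.slack.com/services/XXX/YYY/ZZZ",
    "Flow run completed",
)
print(request.get_method(), request.data.decode("utf-8"))
```

Calling `send_slack_message` with a real webhook URL should post the text to the channel the webhook is bound to.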


How many rows were processed by the script?

## Question 6. Secrets

Prefect Secret blocks provide secure, encrypted storage in the database and obfuscation in the UI. Create a secret block in the UI that stores a fake 10-digit password to connect to a third-party service. Once you’ve created your block in the UI, how many characters are shown as asterisks (*) on the next page of the UI?