# Wrangler Prefect Walkthrough

### Introduction

In this lesson, we'll walk through the wrangler prefect lab.  The lab uses the Texas drink receipts API.

### Project Justification

Before moving on, let's talk through why this topic can make for a good project.  We can use alcohol receipts as a proxy for a company's revenue, and here we can get insights on a per-store basis that we cannot get from a company report.

For example, in the `etl/tx_drinks/seed` file, you can see that we chose restaurants that are publicly listed.  The last set of restaurants are different Applebees chains.  We could imagine a project showing the following.

* Overview analysis
    1. Showing a relationship between drink revenue and overall revenue
    2. A dashboard with an overall map display of revenue per store
    3. Then detail about an individual store, as we click on a store
        * E.g., revenue per month, monthly sales in previous year vs. current

* Deeper analysis: if given more time, we could explore location-based metrics.
    1. Does income of region predict revenue?
    2. How about population size?
    3. Or sales of nearby competitors (e.g., Chilis)
    4. Outliers: are there over- or underperformers that we should look more deeply at?

Stats like this would allow us to (1) justify where to open or close retail locations of similar stores, and (2) flag stores whose performance deserves a closer look.


### Getting set up

To be able to run the code, we should set up our virtual environment and install the necessary packages.

```bash
python3 -m venv ./venv
```

```bash
source venv/bin/activate
```

Now that the environment is activated, we can install the packages in the `requirements.txt` file.

```bash
pip3 install -r requirements.txt
```

If you look at the `requirements.txt` file, you can see that we installed the following packages.

```
prefect
requests
pandas
awswrangler
python-dotenv
```

And then we should set `PYTHONPATH` to the path of our `revenue_tracker` folder.  This means that when we import files, the import will always be resolved relative to the specified `revenue_tracker` folder.

In my environment, I set this by running the following in bash.

```bash
export PYTHONPATH=/Users/jeffreykatz/Documents/jigsaw/curriculum/1-career-services/prefect-lessons/9-aws-wrangler-lab/revenue_tracker
```

But you should navigate to your `revenue_tracker` folder, and type `pwd` to find the absolute path to your folder.  

Now, when in our code we have something like:

```python
import settings
```

It will look for the module in the `revenue_tracker` folder.

From there, we should set the environment variables, which you can see in the `.env` file.  As you can see, we need to specify a bucket to store our data, as well as a name for our `glue_db` (eg. `revenue_tracker` is fine).
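
As a rough illustration of how these values might be read in code, here is a minimal sketch. The variable names `BUCKET` and `GLUE_DB` and the `load_settings` helper are assumptions made for this example, so match them to whatever your `.env` file actually defines.

```python
import os

# Hypothetical variable names; match them to the keys in your .env file.
REQUIRED_KEYS = ("BUCKET", "GLUE_DB")


def load_settings(env=None):
    """Return the bucket and Glue database names, failing loudly if one is missing."""
    # With python-dotenv installed, calling dotenv.load_dotenv() beforehand
    # copies the entries of the .env file into os.environ.
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {"bucket": env["BUCKET"], "glue_db": env["GLUE_DB"]}
```

Failing early on a missing value tends to be easier to debug than a confusing error later, when wrangler tries to write to an empty bucket name.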

### Kicking off the project

A good place to start with the project is the `console.py` file.  At the bottom, you can find some useful functions to try out.

* `find_all_receipts`
    * This makes a request to the API to retrieve our receipt records.  Take a look at the function to see how it works, and then call the function (you can pass in the name specified in the console: `name = "THE HOUSTON CHEESECAKE FACTORY CORPORATION"`).
    
    
* `receipts_adapter.coerce_df`
    * This function is tricky.  The idea is to use pandas to convert as many columns as possible into either a numeric or datetime column.  We want to coerce our dataframe before we store the data in s3.  This way, when we save the data and eventually crawl it with Glue, Athena will see those numeric/datetime datatypes.  Notice that we return the dataframe unchanged if it is empty, which may occur if we do not get records back from the api.

> So before moving on, give those two methods a shot.  Run the console, and call the `find_all_receipts` and `coerce_df` functions.  Then use the `dtypes` property to confirm that we have more numeric data after passing our dataframe to the `coerce_df` function. 
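
To make the coercion idea concrete, here is one possible sketch of such a function. It is not the lab's actual `coerce_df`, just an illustration of the approach: try numeric first, then datetime, and only convert a column when every non-null value parses. The column names in the usage note are illustrative assumptions.

```python
import warnings

import pandas as pd
from pandas.api import types as ptypes


def coerce_df(df):
    """Convert text columns to numeric or datetime when every non-null value parses."""
    if df.empty:
        return df  # nothing came back from the API, so there is nothing to coerce
    coerced = df.copy()
    for col in coerced.columns:
        series = coerced[col]
        if ptypes.is_numeric_dtype(series) or ptypes.is_datetime64_any_dtype(series):
            continue  # already typed
        present = series.notna()
        as_number = pd.to_numeric(series, errors="coerce")
        if as_number[present].notna().all():
            coerced[col] = as_number
            continue
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")  # silence date-format inference warnings
            as_date = pd.to_datetime(series, errors="coerce")
        if as_date[present].notna().all():
            coerced[col] = as_date
    return coerced
```

Calling `coerce_df(df).dtypes` on a dataframe whose `total_receipts` column holds strings like `"1200"` should then report a numeric dtype for that column, while a name column stays as text.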


* `aws_utils.write_to_s3`
    * This lives in `aws_utils` because there are multiple places where we may want to write to s3, and we wanted to avoid repeating the code.  Take a look at the function.  Notice that we specify a partition column and a default mode of `append` for the dataset.
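
For reference, a helper along these lines could look like the sketch below. It relies on wrangler's `wr.s3.to_parquet` with `dataset=True`, which is what enables the `mode` and `partition_cols` options. The `writer` parameter is an addition for this example so the function can be exercised without AWS access; the lab's own signature may differ.

```python
def write_to_s3(df, path, partition_cols, mode="append", writer=None):
    """Write df as a partitioned parquet dataset under path (e.g. s3://my-bucket/receipts/)."""
    if writer is None:
        # Imported lazily so this module loads even where awswrangler is not installed.
        import awswrangler as wr

        writer = wr.s3.to_parquet
    # dataset=True is required for mode and partition_cols to take effect.
    return writer(
        df=df,
        path=path,
        dataset=True,
        mode=mode,
        partition_cols=partition_cols,
    )
```

With `mode="append"`, repeated runs add new files to the dataset rather than replacing what is already there, which fits the plan of periodically adding new receipts.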
    

#### Next steps

* `find_and_coerce`

So this is your first task.  In the run.py file, complete the `find_and_coerce` function so that, given a restaurant name, it finds the receipts and then coerces the resulting dataframe (you can use the functions above).

* Update the `find` method
    * One thing to note is that by default, the api only returns 1000 records.  So to seed our database, we would [change the limit](https://dev.socrata.com/docs/paging.html) to the max of `50_000` records.  This would give us an initial dataset, and we could schedule our prefect code to repeatedly update this (more below).
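
As a small sketch of what changing the limit means in practice, the function below builds a request URL with Socrata's `$limit` and `$offset` paging parameters. The base URL is a placeholder and the `taxpayer_name` field is an assumption, so substitute the endpoint and field used in the lab's client.

```python
from urllib.parse import urlencode

# Placeholder endpoint; use the resource URL from the lab's receipts client.
BASE_URL = "https://example.com/resource/receipts.json"


def build_receipts_url(name, limit=50_000, offset=0):
    """Build a query URL that asks for up to `limit` records for one taxpayer."""
    params = {
        "taxpayer_name": name,  # assumed field name, used as a simple equality filter
        "$limit": limit,        # overrides the default page size of 1000
        "$offset": offset,      # skip this many records, for paging past the limit
    }
    return f"{BASE_URL}?{urlencode(params)}"
```

Note that `urlencode` percent-encodes the `$` as `%24`, which is standard URL encoding for that character.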

* Seed the bucket

Then use the `find_and_coerce` function to return the coerced dataframe.  And use our `write_to_s3` function to write our data to s3.  **Note**: Before you do so, you'll first need to create a bucket to store your data, and fill in the correct information in the `.env` file.

* From there go to the s3 bucket, and confirm that the data was written.

* Add a function called `read_from_s3` that will use wrangler to read from the bucket specified in your `.env` file.  Import this in your `console.py`, and confirm that it does read back the data.  If you call `.dtypes` on the dataframe, you should see that many of the columns were retrieved as numeric.
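
If you get stuck on `read_from_s3`, one possible shape is sketched below. It uses wrangler's `wr.s3.read_parquet` with `dataset=True` so that partition columns are read back as well. The `reader` parameter is an addition for this example that lets you try the function without touching AWS.

```python
def read_from_s3(path, reader=None):
    """Read a parquet dataset (e.g. s3://my-bucket/receipts/) back into a dataframe."""
    if reader is None:
        # Imported lazily so this module loads even where awswrangler is not installed.
        import awswrangler as wr

        reader = wr.s3.read_parquet
    # dataset=True tells wrangler to treat the path as a partitioned dataset.
    return reader(path=path, dataset=True)
```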

### Setting up Glue/Athena

So after we have seeded our data lake with data, the next step is to tell Glue to crawl it so that we can query the data with a query engine like Athena.

For this, we can continue on at the bottom of the `console.py` file.  

* `athena.migrations.create_db`
    * This creates a database in glue.
    
* `athena.migrations.crawl_dataset("receipts")`
    * This creates a new table in our database called `receipts`, sets the specified datatypes, and seeds with our data.
    
* `display_schema("receipts")`
    * Call `display_schema` to confirm the data is of the correct type

After running `create_db` and `crawl_dataset`, we should see our database seeded with some initial data.

* `athena.queries.read_query`

From there, we can query our dataset in Athena using our `read_query(query)` function.
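
To give a feel for what these helpers may be doing underneath, here is a hedged sketch built on three wrangler calls: `wr.catalog.create_database`, `wr.s3.store_parquet_metadata`, and `wr.athena.read_sql_query`. The function bodies and the `s3://{bucket}/{table}/` layout are assumptions, not the lab's actual code. The wrangler module is passed in as `wr` so the functions can be tried with a stand-in object.

```python
def create_db(database, wr):
    """Create the Glue database if it does not exist yet."""
    wr.catalog.create_database(name=database, exist_ok=True)


def crawl_dataset(table, bucket, database, wr):
    """Register the parquet dataset as a Glue table and return its column types."""
    path = f"s3://{bucket}/{table}/"  # assumed layout: one folder per table
    columns, _partitions, _values = wr.s3.store_parquet_metadata(
        path=path, database=database, table=table, dataset=True
    )
    return columns


def read_query(query, database, wr):
    """Run a SQL query in Athena and return the result as a dataframe."""
    return wr.athena.read_sql_query(sql=query, database=database)


# Typical use, given AWS credentials:
#   import awswrangler as wr
#   create_db("revenue_tracker", wr)
#   crawl_dataset("receipts", "my-bucket", "revenue_tracker", wr)
#   read_query("SELECT * FROM receipts LIMIT 5", "revenue_tracker", wr)
```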

### Seeding a list of LLCs

Eventually, we would like this to work for a list of LLCs.  And we want this to be a common list of LLCs regardless of whose computer is running this.

So in the `console.py` file you can see that we call `write_llcs`, which writes a list of LLC names defined in `seed/restaurant_llcs.py` to a parquet file in S3.  Then we can read them and get back a list of LLCs.

### Our Run file

Ok, so now we have seeded and crawled our database with some receipts data.  If you look at the `etl/tx_drinks/run.py` file, you can see that our flow works similarly, but slightly differently from what we just walked through.

The main issue is that when we have a new restaurant we want to pull all of its data, but when we already have restaurant data in S3, we want to pull in only the new data.

* `find_and_coerce`
    * This calls `receipts_client.find`, which first looks for new data or, if we don't have receipts for the restaurant in the db, finds all data.
    * `receipts_client.find_recent()`
        * `queries.find_last_end_date(name)`
            * It does this by first finding the last obligation end_date of the specified restaurant.
        * `receipts_client.find_receipts_after(name, last_end_date)`
            * Then, given that end date, we query the api only for restaurant receipts after that date.
        * `find_all_receipts()`
            * Called if there are no existing receipts.

* `find_and_coerce_llcs()`
    * Calls `find_and_coerce` for all LLCs stored in our s3 file.
    * From there, we coerce the dataframe, and store it in s3.

So the above approach will allow us to just query for new receipts that we have not seen before.
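
The branching described above can be sketched as follows. The three lookups are passed in as plain functions so the logic can be tried without Athena or the API; in the lab they correspond to `queries.find_last_end_date`, `receipts_client.find_receipts_after`, and `find_all_receipts`. The `after_date_filter` helper and its column argument are hypothetical, shown only to illustrate how a SoQL `$where` clause could restrict results to newer rows.

```python
def find(name, find_last_end_date, find_receipts_after, find_all_receipts):
    """Fetch only new receipts when we have history, otherwise fetch everything."""
    last_end_date = find_last_end_date(name)
    if last_end_date is None:
        # No receipts stored for this restaurant yet, so pull the full history.
        return find_all_receipts(name)
    return find_receipts_after(name, last_end_date)


def after_date_filter(date_column, last_end_date):
    """Build a SoQL $where clause keeping only rows after last_end_date."""
    return f"{date_column} > '{last_end_date}'"
```

Because the decision lives in an ordinary function, it can be checked in the console with stub functions before any scheduled run depends on it.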

### Flow

Now onto the `flow.py` file.  Notice that once again there is **hardly any code** in the flow.py file.

The only exception is the `find_and_write_receipts` flow.  And even there, we have another function in run.py that essentially does the same thing.

So again, the point is to test and write as much logic as possible outside of our workflow manager, prefect.  This makes it easier to test, and rapidly speeds up the feedback loop when we try our code, instead of the slow task of waiting for a flow to run.
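
To illustrate the point, here is a sketch of the kind of plain function that can hold the logic, with its collaborators passed in so it can be tested with stand-ins. The signature is an assumption for this example rather than the lab's actual code.

```python
def find_and_write_receipts(names, find_and_coerce, write):
    """For each restaurant, fetch and coerce its receipts, then write non-empty results."""
    written = []
    for name in names:
        df = find_and_coerce(name)
        if len(df) == 0:
            continue  # no new receipts for this restaurant
        write(df)
        written.append(name)
    return written


# A function in flow.py decorated with Prefect's @flow would then only need
# to call this function with the real find_and_coerce and write_to_s3.
```

A quick check in the console, with lists standing in for dataframes, runs in milliseconds: `find_and_write_receipts(["A", "B"], {"A": [1], "B": []}.get, print)` writes only the data for `"A"`.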

### Review

So think about the steps we needed to perform.

A. Call data and store in s3

1. Call an API
2. Store the results in s3
3. But first, coerce the data so that as many columns as possible are numeric or datetime

B. Set up glue

1. Create the database
2. Crawl the bucket to create a new table
3. Try querying the table through athena

C. Only store new data

1. Use athena to find the last date for a given restaurant
2. Then query the api only for information after that date
3. Then loop through for all restaurants

D. Integrate prefect

* Finally, create a flow.py file that will, for each restaurant:
    * find_and_coerce the recent data
    * write the data to s3
