# Workers and Queues Lab

### Introduction

In this lesson, we'll work with Prefect deployments to schedule requests to the Spotify API.  Let's get started.

### Getting set up

You'll notice that we are again working with the Spotify codebase from the previous lab.

* Do **not** create a Python environment for this lab, as it will likely mess things up.  In fact, make sure that no Python environment is activated:
```bash
    deactivate
    conda deactivate
```

* Install the libraries in the `requirements.txt` file.

* Add your Spotify API keys to the `etl/spotify_extractor/.env` file.  If you do not have Spotify API keys, you can see the docs on how to create them [here](https://developer.spotify.com/documentation/web-api/tutorials/getting-started).

* From there run `python3 spotify_workflow.py`, which will run the workflow.  Confirm that the track data is saved in the `data` folder.  

> Please look at the file itself, which should look like the following:
```csv
,track_id,ranking,date,playlist_id
0,4xhsWYTOGcal8zt0J161CU,1,2024-01-11,37i9dQZEVXbLRQDuF5jeBp
1,0mflMxspEfB0VbI1kyLiAv,2,2024-01-11,37i9dQZEVXbLRQDuF5jeBp
```

### Loading to a database

* Create the database

    * Create a database called `spotify` in your local postgres instance to hold the top songs
    * Then run the file in the migrations folder to create a new table in the database
    
* Connect to the database
    * If you look in the `db.py` file, you can see that we have added some code that uses the sqlalchemy library to connect to the database.

* From there, add the following function to the `listings_adapter.py` file
```python
    def load_to_postgres(df, engine, table_name = 'tracks'):
        df.to_sql(table_name, engine, if_exists='append', index=False)
```
The function takes a dataframe and uses the `to_sql` method to append it to the specified table (`tracks` by default).

Make sure that the function works properly by running the corresponding code in the console, and then connecting to postgres to confirm that the data has been written.
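One way to sanity-check the function without touching Postgres is sketched below. It assumes pandas is installed and uses an in-memory SQLite connection as a stand-in for the sqlalchemy engine (the `to_sql` method accepts either). The two sample rows are copied from the CSV above.

```python
import sqlite3

import pandas as pd


def load_to_postgres(df, engine, table_name='tracks'):
    df.to_sql(table_name, engine, if_exists='append', index=False)


# Assumption: an in-memory SQLite connection stands in for the Postgres engine
conn = sqlite3.connect(":memory:")

df = pd.DataFrame({
    "track_id": ["4xhsWYTOGcal8zt0J161CU", "0mflMxspEfB0VbI1kyLiAv"],
    "ranking": [1, 2],
    "date": ["2024-01-11", "2024-01-11"],
    "playlist_id": ["37i9dQZEVXbLRQDuF5jeBp"] * 2,
})

load_to_postgres(df, conn)
load_to_postgres(df, conn)  # if_exists='append' adds the same rows a second time

row_count = conn.execute("select count(*) from tracks").fetchone()[0]
columns = [col[1] for col in conn.execute("pragma table_info(tracks)")]
```

Because `index=False` is passed, the dataframe index is not written as a column, and because of `if_exists='append'`, loading the same dataframe twice stores every row twice.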

* You can also see the `read_sql` function
    * This allows us to use pandas to read data from our postgres database
```python
    from sqlalchemy import text
    def read_sql(query, engine):
        df = pd.DataFrame(engine.connect().execute(text(query)))
        return df
```

```sql
spotify=# select * from tracks limit 3;
        track_id        | ranking |    date    |      playlist_id
------------------------+---------+------------+------------------------
 7gaA3wERFkFkgivjwbSvkG |       1 | 2024-01-17 | 37i9dQZEVXbLRQDuF5jeBp
 52eIcoLUM25zbQupAZYoFh |       2 | 2024-01-17 | 37i9dQZEVXbLRQDuF5jeBp
 4xhsWYTOGcal8zt0J161CU |       3 | 2024-01-17 | 37i9dQZEVXbLRQDuF5jeBp
```

### Revisiting our process

So now we have code that pulls from the Spotify API, coerces the data, and saves it to a csv.  From there, we can read the csv into a dataframe and load it into the postgres database.


Eventually, we'll schedule this to run every day.  This makes sense, as the playlist is for the [top fifty songs](https://open.spotify.com/playlist/37i9dQZEVXbLRQDuF5jeBp) in the USA, which will change each day.

But we do not want to keep appending to the same CSV file.  So instead, let's update the function that writes the CSV file.

* Update `write_to_csv`
    * Now instead of writing to a file `./data/track_listings.csv`, update the `write_to_csv` function so that the date is included in the file name.  For example, if the date is Jan 17 2024 the track listings file path should be `./data/2024-01-17-track_listings.csv`.
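As a rough sketch of the file naming only, the helper below builds the dated path. The name `dated_csv_path` and its arguments are hypothetical (they are not part of the lab codebase), and how you wire it into `write_to_csv` depends on that function's existing signature.

```python
from datetime import date
from pathlib import Path


def dated_csv_path(run_date, data_dir="./data"):
    """Build a file path inside data_dir whose name starts with the ISO date."""
    return Path(data_dir) / f"{run_date.isoformat()}-track_listings.csv"


# Hypothetical usage: in the workflow you would likely pass date.today()
path = dated_csv_path(date(2024, 1, 17))
```

Using the ISO format (`YYYY-MM-DD`) means the file names sort in chronological order.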



* `find_recent_files`
    * So now we can `write_to_csv`, and then load the csv files into the database.  However, we want to make sure we are not re-inserting the same data into the database.  To prevent this, write a function that only finds files dated later than the most recently loaded data:
        * First, use the `read_sql` function to find the most recent date for which we loaded data into the database.
        * Then return only the file names with a later date.
        * For example, you can see that we have included files from 2024 and 2025.  If the 2024 data is already in the database, the function should return a list containing only the file from 2025.
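The filtering step could look something like the sketch below. To stay self-contained, it takes the most recent loaded date as an argument (`last_loaded`, a hypothetical parameter) rather than querying the database; in the lab that value would come from `read_sql`, for example with a query along the lines of `select max(date) from tracks`. The file names in the demo are made up.

```python
import tempfile
from datetime import date
from pathlib import Path


def find_recent_files(last_loaded, data_dir="./data"):
    """Return CSV paths whose date prefix is later than last_loaded."""
    recent = []
    for path in sorted(Path(data_dir).glob("*-track_listings.csv")):
        file_date = date.fromisoformat(path.name[:10])  # YYYY-MM-DD prefix
        # With an empty table there is no last date, so every file is new
        if last_loaded is None or file_date > last_loaded:
            recent.append(path)
    return recent


# Demo with throwaway files standing in for the data folder
with tempfile.TemporaryDirectory() as tmp:
    for name in ("2024-01-17-track_listings.csv", "2025-01-17-track_listings.csv"):
        (Path(tmp) / name).touch()
    recent_names = [p.name for p in find_recent_files(date(2024, 1, 17), tmp)]
```

The comparison is strict (`>`), so a file dated the same day as the latest loaded data is treated as already loaded.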

* `load_files_to_postgres`
    * The `load_files_to_postgres` function calls the `find_recent_files` function, reads those csv files, and loads them into the database.
    * You can see in the `2025` csv file that we added a track, `sample-data`, that should be loaded into postgres after calling the `load_files_to_postgres` function.
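A minimal sketch of the loading loop is shown below, under a few assumptions: the list of files is passed in as an argument instead of being looked up inside the function, an in-memory SQLite connection stands in for the Postgres engine, and the CSV contents (including the date) are made up for illustration. The detail worth noting is `index_col=0`: the CSV files start with an unnamed index column, which should not be written to the `tracks` table.

```python
import io
import sqlite3

import pandas as pd

# Made-up file contents in the same layout as the CSV shown earlier
csv_text = """,track_id,ranking,date,playlist_id
0,sample-data,1,2025-01-17,37i9dQZEVXbLRQDuF5jeBp
"""


def load_files_to_postgres(files, engine, table_name="tracks"):
    """Append every CSV in files to table_name."""
    for csv_file in files:
        # index_col=0 consumes the unnamed leading index column
        df = pd.read_csv(csv_file, index_col=0)
        df.to_sql(table_name, engine, if_exists="append", index=False)


conn = sqlite3.connect(":memory:")  # stand-in for the Postgres engine
load_files_to_postgres([io.StringIO(csv_text)], conn)
rows = conn.execute(
    "select track_id, ranking, date, playlist_id from tracks"
).fetchall()
```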

### Updating the flow

Now in the `spotify_workflow.py` file, add the following:
    
1. New `load_files_to_postgres` task
    * This should call the function in the adapter
    
2. Add the task to the flow
    * Add this `load_files_to_postgres` task to the flow

Run the `spotify_workflow.py` file, and confirm that the data is loaded to postgres (there will be a new record with `sample-data` as the track each time you run it).

### Setting up Deployments

Ok, so now we can see the entrypoint to our prefect workflow as the `extract_and_write` function in the `spotify_workflow.py` file.

* `extract_and_write(playlist_id)`

Next let's turn this into a deployment.  Use the `serve` method to:

* name the deployment `get-songs-deployment`
* pass through the necessary parameters to call the workflow
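The two bullets above map onto the `name` and `parameters` arguments of `serve`. The sketch below assumes a Prefect version whose flows expose a `.serve()` method; the helper names `deployment_settings` and `serve_flow` are hypothetical, and the flow is passed in as an argument so the snippet stands on its own.

```python
def deployment_settings(playlist_id):
    """Keyword arguments for serve: the deployment name plus the flow parameters."""
    return {
        "name": "get-songs-deployment",
        "parameters": {"playlist_id": playlist_id},
    }


def serve_flow(flow_fn, playlist_id):
    """Serve the given Prefect flow; this call blocks until it is interrupted."""
    flow_fn.serve(**deployment_settings(playlist_id))


# Assumption: the playlist id used in the sample data above
settings = deployment_settings("37i9dQZEVXbLRQDuF5jeBp")
```

In `spotify_workflow.py`, this would be invoked as `serve_flow(extract_and_write, playlist_id)`, or equivalently by calling `extract_and_write.serve(...)` directly with the same two arguments.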

Then re-run the `spotify_workflow.py` file.

* You can see that this created a deployment.

From a new tab, run the following:
    
`prefect server start`

And then click on deployments, where you should see the `get-songs-deployment`.

<img src="./get-songs-deployment.png">

From there, if you click on the `get-songs-deployment`, followed by the `Runs` or `Upcoming` tabs, you can see that nothing has been run, and nothing is scheduled.  

Let's change that.

> First import both the interval schedule and the Deployment class with the following code: 

```python
from prefect.server.schemas.schedules import IntervalSchedule
from prefect.deployments.deployments import Deployment
```

Create a prefect deployment using the `Deployment.build_from_flow` method.

In doing that you'll need to set the following parameters:

```python
name="spotify_deployment",
flow=extract_and_write,
version=1,
schedule=schedule,
is_schedule_active=True,
work_queue_name="default",
parameters=parameters,
entrypoint="./spotify_workflow.py:extract_and_write",
```

We'll let you assign the parameters to the appropriate Python variables.  For the schedule, create an instance of `IntervalSchedule` that runs every ten seconds.

Make sure to add the Python code to apply the deployment.

```python
deployment.apply(upload=True)
```
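Putting the pieces together, one possible shape for the deployment script is sketched below. The keyword arguments are the ones listed above; the function name `build_deployment`, the module-level `parameters` and `interval` variables, and the playlist id are assumptions for illustration. The Prefect imports sit inside the function only so that the sketch can be loaded without a working Prefect setup.

```python
from datetime import timedelta

# Assumption: the playlist id used in the sample data above
parameters = {"playlist_id": "37i9dQZEVXbLRQDuF5jeBp"}
interval = timedelta(seconds=10)  # run every ten seconds


def build_deployment(flow_fn):
    """Build (but do not apply) the scheduled deployment for flow_fn."""
    from prefect.deployments.deployments import Deployment
    from prefect.server.schemas.schedules import IntervalSchedule

    schedule = IntervalSchedule(interval=interval)
    return Deployment.build_from_flow(
        flow=flow_fn,
        name="spotify_deployment",
        version=1,
        schedule=schedule,
        is_schedule_active=True,
        work_queue_name="default",
        parameters=parameters,
        entrypoint="./spotify_workflow.py:extract_and_write",
    )
```

With this in place, `deployment = build_deployment(extract_and_write)` followed by `deployment.apply(upload=True)` would register the deployment.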

And then run the Python script that has this deployment to apply the deployment.

From there, you'll need to start an agent that polls the `default-agent-pool` work pool.

```bash
prefect agent start -p 'default-agent-pool'
```

Now if you navigate to the Prefect UI and click on deployments, you should see your new `spotify_deployment`.  From there, click on the deployment followed by `Runs`, and you should see some successful runs.

<img src="./success.png">

### Resources

[Jeff Hale - Deployments](https://medium.com/the-prefect-blog/deploy-prefect-pipelines-with-python-perfect-68c944a3a89f)

[Kevin Kho](https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a)

[Creating a Deployment](https://discourse.prefect.io/t/error-when-creating-a-deployment-with-the-cli-modulenotfound/2426/4)

[Sample deployment](https://github.com/PrefectHQ/prefect/issues/8710)