# Amadeus Flights Lab

### Introduction

In this lesson, we'll build an ETL pipeline that repeatedly extracts flight data using the Amadeus API.  There are various use cases for something like this.  For example, you can imagine setting up alerts for when certain trips drop below a certain price.  Or perhaps we want to collect data to reverse engineer our own pricing algorithm, and from there predict the price of flights in the future.


Ok, so for our ETL pipeline we'll use Python, the boto3 library, dockerized lambda functions, and airflow.  We'll use Python to hit our API, extract the data, and load it to S3 using boto.  Then we'll dockerize our tasks and deploy them to lambda functions.  From there, we'll use airflow to call each of these lambda functions with boto3.

Let's get started.

## I. Building the lambda functions 

### Getting Started

The first step is to register for the Amadeus API, which is how we'll pull down our flights data.  You can register for a new account [here](https://developers.amadeus.com/register).

Click on `Register` in the top right corner, and click on create an account.

> **Trouble registering?** If you have a problem registering, try changing your browser to incognito mode, and then registering again. 

> <img src="./register-amadeus.png" width="70%">

After registering, check your email to activate your account.  Then log in with your username and password and [create a new app](https://developers.amadeus.com/my-apps).

> You can also create a new app by going to `My Self-Service Workspace` in the dropdown to the right, and then clicking on `Create new app`.

> <img src="./self-service.png" width="90%">

After creating your application, if you scroll down, you'll see the API keys.

<img src="./amadeus-keys.png" width="60%">

From here, we'll need to use our `api_key` and `api_secret` to get our access token, and then we can use the access token for the api.
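
To make the key-for-token exchange concrete, here is a minimal sketch that builds (but does not send) the token request using only the standard library.  It assumes the Amadeus test environment's OAuth2 token endpoint and the client-credentials grant.  The helper names `build_token_request` and `bearer_header` are hypothetical and are not part of the lab's `AmadeusClient`.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Assumed token endpoint for the Amadeus test environment.
TOKEN_URL = 'https://test.api.amadeus.com/v1/security/oauth2/token'


def build_token_request(client_id, client_secret):
    """Build (but do not send) the POST request that asks for an access token."""
    body = urlencode({
        'grant_type': 'client_credentials',
        'client_id': client_id,          # the api_key from the Amadeus console
        'client_secret': client_secret,  # the api_secret from the Amadeus console
    }).encode('utf-8')
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    return Request(TOKEN_URL, data=body, headers=headers, method='POST')


def bearer_header(access_token):
    """Header to attach to later API calls once a token has been retrieved."""
    return {'Authorization': f'Bearer {access_token}'}


# Placeholder credentials, for illustration only.
token_request = build_token_request('my-api-key', 'my-api-secret')
```

Sending the request (for example with `urllib.request.urlopen(token_request)`) should return json that includes an `access_token`, which is the value to pass to `bearer_header`.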

## 1. Extract and Load - Making API requests 

For the following, write your code in the `extract_and_load` folder.

If you look at the `AmadeusClient`, you'll see how we can accomplish this.

* `get_access_token` - We filled in most of the `get_access_token` function for you, as it is pretty complicated.  Notice that we make a POST request to retrieve the access token, specifying the `client_id` and `client_secret`.  The `client_id` and `client_secret` come from the settings file, which imports environment variables from the `.env` file.  Fill in the `.env` file now, and then run the first test in `tests/test_amadeus_client` to ensure that an access token is retrieved.

* `search_flights`
    The API documentation for Amadeus is not great.  But you can get a sense of how it works with the following.

```python
import requests
import json
from requests.structures import CaseInsensitiveDict
from datetime import datetime

originLocationCode = 'NYC'
destinationLocationCode = 'CHI'
departureDate = datetime.today().strftime('%Y-%m-%d')
adults = 1

url = 'https://test.api.amadeus.com/v2/shopping/flight-offers?' \
        'originLocationCode=' + originLocationCode + \
        '&destinationLocationCode=' + destinationLocationCode + \
        '&departureDate=' + departureDate + '&adults=' + str(adults)

headers = CaseInsensitiveDict()
headers['Authorization'] = 'Bearer {{ACCESS_TOKEN}}'
response = requests.get(url, headers=headers)
response.json()
```

Ok, so after looking at the above, pass through the same query parameters, but use a `params` dictionary to do so.  When it works, the second test in `test_amadeus_client` should pass.
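
As a rough sketch of the `params` approach, the snippet below collects the same query parameters in a dictionary and uses the standard library's `urlencode` to show the query string an HTTP client would build from it.  The helper name `build_search_params` is hypothetical, and this is not the lab's `search_flights` implementation.

```python
from urllib.parse import urlencode

BASE_URL = 'https://test.api.amadeus.com/v2/shopping/flight-offers'


def build_search_params(origin, destination, departure_date, adults=1):
    """Collect the query parameters in one dictionary instead of gluing strings."""
    return {
        'originLocationCode': origin,
        'destinationLocationCode': destination,
        'departureDate': departure_date,
        'adults': adults,
    }


params = build_search_params('NYC', 'CHI', '2023-08-02')

# urlencode shows the query string that would be generated from the dictionary.
query_string = urlencode(params)
full_url = f'{BASE_URL}?{query_string}'
```

With the `requests` library, the encoding step is handled for us: passing `params=params` to `requests.get(BASE_URL, params=params, headers=headers)` produces the same url.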

### File Writer

Next up is to save the extensive json response from the above, into an S3 bucket.  So the first step is to create a bucket in s3.

After creating the bucket in s3, we'll want a function that generates the name of the file where we'll store our results.  Then we can create a new file in our bucket that holds this data.

* `generate_file_name`
    * This will take in arguments of origin, destination, and departure date, and return the proper string.  By default, it should create the file in a directory called `raw/`.
    * We want to ensure that the file name always has two digits for the month (eg. `'raw/NYC-CHI-2023-06-28.json'`) -- even if the input string only has one digit (eg. `'2023-6-28'`).  Accomplish this to get the second test to pass.

* `search_and_upload`
    * Next, we'll write the `search_and_upload` function.  This function should use the `AmadeusClient#search_flights` function to return the flight data for a provided `origin`, `destination`, and `departure_date`.  It should then upload the data to s3 (we provided a function in `file_writer` that should help), and then read the data back from that file (we provided a different function that should help).  Finally, it should return a dictionary with keys of `file` and `flight_data`, i.e. `{'file': '', 'flight_data': ''}`.  The dictionary's values should be the `file_name` we uploaded the data to, as well as the first three records of flight offers.
    
    * In addition to getting the test to pass, go to the s3 web console to confirm that the file is uploaded to the bucket under the `raw` folder.  If it isn't, please fix it. 
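
One possible way to get the zero padding described for `generate_file_name` is to parse the date string and format it again.  The sketch below is only an illustration: the `folder` parameter and the exact signature are assumptions, so follow whatever the tests in the lab expect.

```python
from datetime import datetime


def generate_file_name(origin, destination, departure_date_str, folder='raw'):
    """Return a key like 'raw/NYC-CHI-2023-06-28.json' with a zero-padded date."""
    # strptime accepts '2023-6-28' as well as '2023-06-28' for this format...
    departure_date = datetime.strptime(departure_date_str, '%Y-%m-%d')
    # ...and strftime always writes two digits for the month and the day.
    padded_date = departure_date.strftime('%Y-%m-%d')
    return f'{folder}/{origin}-{destination}-{padded_date}.json'


file_name = generate_file_name('NYC', 'CHI', '2023-6-28')
```

Parsing the date also means that a malformed date string raises a `ValueError` right away, rather than producing an oddly named file in the bucket.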
    
### Lambda Function

Ok, so now it's time to write our lambda function.  Essentially, we would like our lambda function to take an event argument that has `origin`, `destination`, and `departure_date_str` as its keys, search Amadeus, and upload the data to a corresponding file.

Get the corresponding test to pass.  To do so, you'll have to provide your bucket name in the test.

Ok, so at this point, we should have a lambda function that, given an event with keys of `origin`, `destination`, and `departure_date_str`, will search the amadeus api for corresponding flights and write the results to a file in s3.
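
To illustrate the shape of such a handler, here is a minimal sketch.  The `search_and_upload` below is only a placeholder so that the example runs on its own (the real one calls Amadeus and s3), and its signature and the bucket name are assumptions.

```python
def search_and_upload(origin, destination, departure_date_str, bucket_name):
    """Placeholder for the lab's function; the real one calls Amadeus and s3."""
    file_name = f'raw/{origin}-{destination}-{departure_date_str}.json'
    return {'file': file_name, 'flight_data': []}


def lambda_handler(event, context):
    """Read the search arguments from the event and hand them to search_and_upload."""
    required_keys = ('origin', 'destination', 'departure_date_str')
    missing = [key for key in required_keys if key not in event]
    if missing:
        # Fail loudly so that a malformed event is easy to spot in the logs.
        raise KeyError(f'event is missing keys: {missing}')
    return search_and_upload(
        event['origin'],
        event['destination'],
        event['departure_date_str'],
        bucket_name='my-flights-bucket',  # hypothetical bucket name
    )


event = {'origin': 'NYC', 'destination': 'CHI', 'departure_date_str': '2023-08-02'}
result = lambda_handler(event, {})
```

Checking for the keys up front is a design choice rather than a requirement.  It just makes a bad event fail with a readable message instead of somewhere deeper in the call.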

* Discussion

So the nice thing about the step above is that we have stored all of our returned data in S3.  So while we likely do not need all of that data initially, we can always retrieve it from our S3 buckets later on.

As the next step, we'll select just the data we want from our S3 bucket, and load that selected data to a separate file.

## 2. Transform and Load - Transforming our Data 

Ok, so now we can navigate to the `transform_and_load` folder.

As you can see, there we have two files: `flight_adapter` and `flights_file_reader`.

### Flight adapter

The flight adapter takes in json regarding a single flight, and then the `select_attributes` function returns a dictionary with just the `departure_time`, `arrival_time`, `departure_airport` and `arrival_airport` information.

Before getting there, write the `AmadeusFlightAdapter#segments` function.  A segment is each flight in a given (one way) trip.

* `Flight#segments`
    * Write a function called segments, which given the json regarding a single one way flight, returns a list of dictionaries of the segments.  You can see a sample flight in the `test_flight_adapter` file.  Get the corresponding test to pass.

* `Flight#select_attributes`
    * As mentioned, this will return the departure and arrival times, along with the departure and arrival airports.
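
To give a feel for the two functions above, here is a rough sketch written as plain functions rather than as the lab's adapter class.  It assumes the flight json nests its segments under `itineraries[0]['segments']`, with `departure` and `arrival` entries holding `iataCode` and `at` keys.  Taking the departure from the first segment and the arrival from the last is also an assumption, and the sample data is made up, so compare against the sample flight in `test_flight_adapter`.

```python
# Made-up sample with two segments (a connecting trip), trimmed to the keys we use.
sample_flight = {
    'itineraries': [{
        'segments': [
            {'departure': {'iataCode': 'JFK', 'at': '2023-08-02T07:00:00'},
             'arrival': {'iataCode': 'DTW', 'at': '2023-08-02T09:05:00'}},
            {'departure': {'iataCode': 'DTW', 'at': '2023-08-02T10:15:00'},
             'arrival': {'iataCode': 'ORD', 'at': '2023-08-02T10:40:00'}},
        ]
    }]
}


def segments(flight_json):
    """Return the list of segment dictionaries for the (one way) itinerary."""
    return flight_json['itineraries'][0]['segments']


def select_attributes(flight_json):
    """Departure details come from the first segment, arrival details from the last."""
    flight_segments = segments(flight_json)
    first, last = flight_segments[0], flight_segments[-1]
    return {
        'departure_time': first['departure']['at'],
        'arrival_time': last['arrival']['at'],
        'departure_airport': first['departure']['iataCode'],
        'arrival_airport': last['arrival']['iataCode'],
    }


selected = select_attributes(sample_flight)
```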

### Filereader

The file reader does two things.  It contains multiple functions which read data from our s3 bucket.  It also has a `select_attributes` function.

* `Filereader#select_attributes`  This function loops through our `flights_json` returning a dictionary for each corresponding flight.   

* `Filereader#return_flights_from_bucket`  This function reads the data from a corresponding file in our bucket, and then returns the list of dictionaries of selected attributes for the flights in that file.

Ok, so now we have a function, `Filereader#return_flights_from_bucket` that reads data from a file in our bucket and returns a list of dictionaries, where each dictionary has selected attributes.

The next step is to write a lambda function, where we will both pull and coerce our data, and then write the results to a new file.

### Lambda function

* `lambda_handler`
    * If you look at the `test_main.py` file, you can see how our `lambda_handler` works.  

> You'll have to change the test so that the `bucket_name` corresponds to your bucket, and the `file_name` refers to a file in your bucket.

So with the lambda function, we'll use our event to pass in our `file_name_read`, which is the file with our raw data.  Then the function should pull data from that file in the raw folder of the bucket, and write the transformed data to a file of the same name, in the same bucket, but in a folder called `transformed`.  

Go to s3 and confirm that the uploaded data is in the `transformed` folder.
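
The mapping from the raw file to the transformed file can be sketched in a few lines.  The helper name `transformed_file_name` is hypothetical, and the lab's own code may organize this step differently.

```python
def transformed_file_name(file_name_read):
    """Map 'raw/<name>.json' to 'transformed/<name>.json'."""
    folder, _, base_name = file_name_read.partition('/')
    if folder != 'raw' or not base_name:
        # Guard against accidentally reading from (or overwriting) another folder.
        raise ValueError(f'expected a key under raw/, got: {file_name_read}')
    return f'transformed/{base_name}'


file_name_write = transformed_file_name('raw/NYC-CHI-2023-08-02.json')
```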

## II. Moving to the Cloud

So now we have two lambda functions:

1. `extract_load` - Pulls data from our api and loads it to our bucket under the `raw` folder.
2. `transform_load` - Pulls data from the `raw` folder in s3, transforms and writes data to the `transformed` folder

Now it's time to dockerize our two lambda functions, and eventually we'll deploy these dockerized lambda functions using serverless.

### 1. Dockerizing Extract_load

To dockerize our extract and load function we'll need to copy the following files into our image:

* `requirements.txt`
* `src/` directory
* `main.py`
* `.env`
* `settings.py`

And we'll have to install the libraries in `requirements.txt`.  In the requirements.txt file we'll need the following libraries.

```txt
python-dotenv
requests
boto3
```

Remember that for our command argument we'll need the following: `CMD ["main.lambda_handler"]`.  This will specify that the `lambda_handler` should be triggered when our lambda function receives an event.  

Ok so we initially set `python:3.9` as our base image.  This is so we can debug and test out our docker image before we deploy it to aws.  We can do so by booting up our docker container, bashing into the container, and then calling our lambda function.

So begin by building the image.

`docker build -t extract .`

And then you can bash into a container running the image with something like the following:

```bash
docker run -v ~/.aws:/root/.aws -it extract bash
```

> Notice what the above does.  It not only allows us to connect to our container via bash, but it also bind mounts our aws credentials (located in the `~/.aws` directory) to the corresponding directory in our docker container.  This way we can use our boto3 library from there.

Ok, once we connect to the docker container, we can run our `main.py` file, and call our lambda function, passing through sample `event` and `context` dictionaries.  

> Below are a couple you can use.  Make sure the date is in the future.

```python
# context = {}
# event = {"origin": "NYC", "destination": "CHI", 'departure_date_str': "2023-08-02"}
```

Ok, once you call the lambda function, you should see the results, like we do below.

<img src="./results-lambda.png">

When you confirm that it's working, make sure to change the base image to the one for lambda.  It should be the following if you have a mac m1 or m2 chip.

```Dockerfile
FROM public.ecr.aws/lambda/python:3.9-arm64
```
Otherwise it should be:

```Dockerfile
FROM public.ecr.aws/lambda/python:3.9
```


### 2. Dockerizing Transform_load

Ok, next up is to dockerize transform_load.  For this, you'll follow similar steps as we did before.  For the `requirements.txt` file, you'll need the following pip packages.

```
requests==2.25.1
pandas
boto3
```

Build an image called `transform` -- again using python:3.9 as the base image.

```bash
docker build -t transform .
```

Then confirm that the lambda function works by calling the lambda.  We provided some sample arguments to pass into your lambda function.  

```python
# file_name = 'NYC-CHI-2023-07-06.json'
# event = {'file_name_read': f'raw/{file_name}'}
# context = {}
# lambda_handler(event, context)
```

> You'll have to update the file_name to match one in your bucket.

You should see a response like the following.

<img src="./response.png" width="100%">

Finally, change the base image so that it's for a lambda function (as before, drop the `-arm64` suffix if you are not on a mac m1 or m2 chip).

```dockerfile
FROM public.ecr.aws/lambda/python:3.9-arm64
```

### Deploying the lambda functions

Before moving on, confirm that the base images in each of the dockerfiles are using the lambda base image and not `python:3.9`.  Please just check.

Ok, so now, we'll use serverless to deploy our lambda functions.  You can review how we use serverless with Docker with the lesson Serverless Triggers located [here](https://colab.research.google.com/github/data-engineering-jigsaw/serverless-readings/blob/main/serverless-lesson/1-serverless-triggers.ipynb).

> Note, that we actually don't need to trigger a lambda function with an S3 file being uploaded, because we'll use airflow to trigger each of our lambda functions.

Ok, fill in the `serverless.yaml` file.  

> Review serverless and give it a shot, but if you get stuck for over half an hour or so, feel free to look at the serverless.yaml file provided in this `lesson` directory.

Remember that to deploy your lambda functions with serverless, you can navigate to the directory with `serverless.yaml` and then run `sls deploy`.

* Testing our deployed lambda functions

Once our lambda functions are deployed it's time to test each of them.

1. `extract_load`

First, go to the aws console, find the extract_load lambda function (it is likely preceded with the app name), and then let's test that it works by passing through a test event.

Fill in some event json.

> <img src="./event-json.png">

And then run the test event.

<img src="./success-fn.png" width="60%">

Finally, go to the bucket, download the file, and look at the data to confirm that the proper json was pulled and uploaded (a lot of things can go wrong in our pipeline, so it's good to see it for yourself).

<img src="./download-json.png" width="60%">

2. `transform_load`

Ok, let's do the same thing for our other lambda function. Go to the lambda function in the web console, and pass through a test event -- something like the following.

```json
{"file_name_read": "raw/NYC-CHI-2023-08-02.json"}
```

> <img src="./test-event.png" width="60%">

And then run the test.

<img src="./result.png" width="60%">

Finally, go to the corresponding file in the transformed folder and confirm the file is there, and that it has a transformed list of dictionaries.

## III. Deploying to Airflow

Ok, so now that we have our lambda functions deployed, and we have confirmed that they each individually work, our next step is to have airflow call each of our functions.

Navigate to the `airflow` folder.

We'll get airflow to call our functions in two steps.  First, we'll write functions that use boto to invoke our lambda functions.  Then, we'll have airflow use separate tasks to call these functions in sequence.

Ok, so first we'll set up boto to call our lambda functions.

### 1. Invoking our lambda functions from boto

There, you will notice a file called `dags/lambda_caller`.  

1. `extract_load`

Ok, so we have filled out the first function for you.  This function uses boto to invoke our lambda function.  

> You will just have to update the `function_name` -- to the name of your lambda function you want invoked.

```python
import json

import boto3

# Assumed to already be defined near the top of `lambda_caller.py`.
lambda_client = boto3.client('lambda')


def extract_load(origin, destination, departure_date_str):
    event = {"origin": origin,
             "destination": destination,
             "departure_date_str": departure_date_str}
    function_name = 'flights-app-dev-extract_load'
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='Event',  # asynchronous: returns before the lambda finishes
        Payload=json.dumps(event)
    )
    return response['ResponseMetadata']['RequestId']
```

If you look at the function above, the key part is with `lambda_client.invoke`.  There, we call the lambda function by passing through the `FunctionName` and we also pass through an event.  

**Remember**, you'll have to have the function name match the name of your lambda function.

If you look at the bottom of the file, you can see that we have code to pass through certain arguments to our `extract_load` function, and these arguments are used to fill in the event data that is then passed to our lambda function.  

So run the `lambda_caller.py` file:

```bash
python3 -i lambda_caller.py
```

And then confirm that the file was loaded in the s3 bucket and that the data is in there.

> **Warning**: This invocation is extremely picky, and the interaction with the amadeus api is not yet understood by man or machine, so please confirm that both the file was created, AND that the data is in there.  Also, try to keep the `extract_load` function as is, except for an update to the function name.



* Debugging

If there are issues, look at the cloudwatch logs of the lambda function to debug.  Notice that we return the `RequestId` from our function, which you should be able to use, along with the timestamp in the logs to debug from there.

> <img src="./cloudwatch-log.png">

> If you go to the lambda function, and then `monitor`, you'll see the `cloudwatch logs`.  Notice the `Timestamp`, and `RequestId` followed by a link to the related logs of that invocation.  If you do not see the requestid returned by the function, you may have to wait and refresh. 

2. `transform_load`

This function, we'll let *you* fill in.  Notice that at the bottom of the `lambda_caller` file, we have code to call the `extract_load` function.  In the second to last line, we call the `generate_file_name` function.  This way, we can pass that return value to our transform_load lambda function, which pulls the data and transforms it.

Fill out the function, taking in the correct argument, and then go to the s3 bucket `transformed` folder to confirm that it was invoked.  Remember the debugging techniques specified above.
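
If you get stuck, the rough shape of such a function is sketched below.  To keep the example runnable without AWS access, the client is passed in as an argument and a fake client stands in for `boto3.client('lambda')`.  The function name `flights-app-dev-transform_load` is a guess that mirrors the `extract_load` naming, so replace it with the name of your deployed function.

```python
import json


def transform_load(file_name_read, lambda_client,
                   function_name='flights-app-dev-transform_load'):
    """Invoke the transform lambda, telling it which raw file to read."""
    event = {'file_name_read': file_name_read}
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='Event',  # asynchronous, as in extract_load
        Payload=json.dumps(event),
    )
    return response['ResponseMetadata']['RequestId']


class FakeLambdaClient:
    """Stand-in for boto3.client('lambda') so the sketch runs without AWS access."""

    def __init__(self):
        self.calls = []

    def invoke(self, **kwargs):
        # Record the call and return a response shaped like the part we read.
        self.calls.append(kwargs)
        return {'ResponseMetadata': {'RequestId': 'fake-request-id'}}


fake_client = FakeLambdaClient()
request_id = transform_load('raw/NYC-CHI-2023-08-02.json', fake_client)
```

In the real `lambda_caller` file, you would pass (or use directly) the boto3 lambda client rather than the fake one.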

### 2. Setting up airflow to call our functions

Ok, so now we can use airflow to call our functions.  Before doing so, because airflow is using docker, we'll need to make sure that our docker containers have access to our aws credentials.

Remember how we did this previously with a single docker container -- we had to bind mount the .aws directory from our computer into our container.  Here, with docker compose, we have to do something similar.

Take a look at the volumes key in the image below.

<img src="./docker-compose.png">

1. Bindmount the credentials

With the last line, we are bindmounting the .aws directory in our laptop to the `/usr/local/airflow/.aws` directory in the container.  Normally, we would place this in the home directory (the place you go to with `cd ~`), but airflow does not allow us to write to this location.  So instead we bindmount to `/usr/local/airflow`.

Notice the `/Users/jeffreykatz` -- change that to match your home directory.  You can type `echo ~` in the terminal to see what that is.

2. Specify the location of our credentials

Ok, so AWS will typically look for credentials in the environment's home directory -- but as we mentioned above, this time we are writing to the `/usr/local/airflow` directory.  So we have to tell AWS to look for the credentials there. If you look at the last two key value pairs under `environment`, you can see that we are setting two environment variables, `AWS_CONFIG_FILE` and `AWS_SHARED_CREDENTIALS_FILE`, to tell AWS where to look for these files.  

Make sure you update your docker-compose file with the above changes.

Then you can ensure aws is reading your AWS credentials from airflow by sh-ing into the container, typing `python3` and then using boto to check the current user.  

Let's show you how.

First, from the airflow folder, boot up docker compose.

```bash
docker compose up
```

Then from a different tab, bash into the webserver.

```bash
docker compose exec airflow-webserver bash
```

And then type `python3` to open up a python console.  And from there, you can get the current user with the following.

```python
import boto3
iam_client = boto3.client('iam')
response = iam_client.get_user()
user_name = response['User']['UserName']
```

### 3. Writing our airflow code

Ok, now it's time to write our airflow code.  Essentially, we'll want a dag called `etl_dag` that calls two tasks -- `extract_load_task`, followed by `transform_load_task`. 

The dag should have the tasks defined nested inside of it like so.

```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}

@dag(schedule_interval='@once', default_args=default_args, catchup=False)
def etl_dag():

    @task
    def extract_load_task(origin, destination, departure_date_str):
        pass

    @task
    def transform_load_task(origin, destination, departure_date_str):
        pass
```

Ok, so you can see how we set this up.  At the end of our dag, we define a few arguments and pass them to each task individually.  

```python
origin = "NYC" # PHI
destination = "CHI"
departure_date_str = "2023-07-6"
data = extract_load_task(origin, destination, departure_date_str)
result = transform_load_task(origin, destination, departure_date_str)
```

You should try out different dates and origins and destinations -- just be careful to make sure you keep the same date format (two digits for the month).

Ok, now it's your turn.

For our airflow dag, we want the following to occur.

* `extract_load_task (origin, destination, departure_date_str)` 
    * This should invoke our `extract_load` function defined in the `lambda_caller` file.  In other words, it should invoke our lambda function, passing through the arguments.  
    * Add some logging so that we can confirm the origin destination and departure arguments.  You can do so, by placing the following line in the task `logging.info(f'extracting data from: {origin}, {destination}, departure: {departure_date_str}')`
    * Also log the lambda_invocation_id that is returned from our `extract_load` function. So you should log something like: `extract_load lambda id: 2a1dac9c-4e51-4388-ad7c`.

* `transform_load_task(origin, destination, departure_date_str)`
    * This also takes in arguments of origin, destination, and departure_date_str.  Here, we'll need to use these arguments to call our transform_load function, which will trigger our lambda.  But to do this, we'll need to specify the file name to read from.  So use the `generate_file_name` defined in the utils file to do this.  (You'll have to do some work to make sure you read the correct file).  
    * Log both the file you are reading from, and also log the result id from the `transform_load` invocation. 
    
Finally, in the dag, make sure the `extract_load_task` is triggered before the `transform_load_task`.

If you boot up the webserver, you should see the `etl_dag`.

<img src="./etl-dag.png" width="20%">

> If there is a red error above, click on it, read the error message and make the fix.

If you click on the dag, and go to graph view, you should be able to confirm that the `extract_load_task` comes before the `transform_load_task`.

> There should be an arrow between them.

<img src="./two-tasks.png" width="50%">

You can test out the dag by clicking on the `Trigger Dag` button.

<img src="./trigger-dag.png">

And then triggering the dag.

Then let's check the log of each of the tasks.  Go to the graph view, and click on the extract_load_task box, followed by logs.

<img src="./log-el-task.png" width="50%">

If you scroll down in the logs, you should see the logged information we specified in the task.

<img src="./logged-info.png" width="90%">

Now check the logs of the transform load task.  You should see both the file we are reading from and the invocation id of the lambda function.

<img src="./tl-log.png" width="90%">

Ok, it's not done until we confirm that we have our raw and transformed files in the bucket.  So go to s3 and confirm that the file is in our bucket, and click on the file to make sure that our json is in there, and that the flights are for the correct date.

Ok, once you do that you can be done.

* Discussion

Notice that our airflow code is very small.  All our airflow tasks do is trigger our lambda invocation functions.  By keeping our airflow functions small we get a number of benefits.

1. Testing -- We cannot really write tests for the tasks.  This is a big deal.  The more logic we have inside of our tasks, the more of that logic will be untested.

2. Debugging -- Related to that, it's pretty difficult to debug inside of our airflow containers.  For example, we can't really place a `breakpoint()` inside of our airflow docker container.  Also, if we do run into a bug, it would be difficult to know if it's related to airflow or specific to our code.

3. Lock-in -- What if we want to move to a different orchestrator, like prefect or dagster?  If we choose to, there's very little code to migrate.  

4. Breakdown - We'll need to run airflow on an EC2 instance.  If that instance goes down, or we have trouble debugging it, we can just use event scheduler to trigger our lambda functions.

So why use airflow at all?

With airflow, we still get the benefit of logging.  We can use features like retrying the function if the lambda breaks, or emailing someone if the task breaks with configuration like the following.

```python
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'start_date': datetime(2019, 2, 8),
    'email': ['you@work.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
```

So here, we get the benefit of airflow's logging, retries, and notifications (like sending an email), while our lambda functions stay independently hosted.

### What's next

1. Load to a database

* Ideally, we would complete the cycle by loading the transformed data from our `transformed` folder into postgres, redshift, or snowflake.  If you look at the `load_to_postgres` folder, you can see some sample code for doing this.  You can see how we could use a snowflake connector to accomplish something similar [here](https://docs.snowflake.com/en/developer-guide/python-connector/sqlalchemy).

* In addition, if we can deploy a database with serverless, we should update our file to do so.

2. Let's say we wanted to read a list of destinations from a database, and create a separate dag from each one.  You can get a sense of how we might do so with [Dag generation](https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html).

3. Deploying airflow to AWS.  Also remember that we will need this orchestrator to live somewhere on aws.  You can get a sense of how to do so [here](https://www.youtube.com/watch?v=o88LNQDH2uI&ab_channel=DatawithMarc), but remember you'll have to migrate the aws credentials.  We can also set the keys directly with environment variables, as shown [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html).
