# Homework06 - Rui Pinto

## Q1. Refactoring

Before we can start covering our code with tests, we need to 
refactor it. We'll start by getting rid of all the global variables. 

* Let's create a function `main` with two parameters: `year` and
`month`.
* Move all the code (except `read_data`) inside `main`
* Make `categorical` a parameter for `read_data` and pass it inside `main`

Now we need to create the "main" block from which we'll invoke
the main function. What does the `if` statement that we use for
this look like?


Hint: after refactoring, check that the code still works. Just run it, e.g., for March 2023, and see if it finishes successfully.

To make it easier to run, you can write the results to your local
filesystem, for example:

```python
output_file = f'taxi_type=yellow_year={year:04d}_month={month:02d}.parquet'
```
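A minimal skeleton of the refactored script could look like this. Only the `if __name__ == "__main__":` guard is what the question asks about; the elided body of `main` and the hard-coded `main(2023, 3)` call are assumptions for a quick local run.

```python
def main(year: int, month: int) -> str:
    """Run the batch job for one month and return the output path."""
    output_file = f"taxi_type=yellow_year={year:04d}_month={month:02d}.parquet"
    # ... read_data(input_file, categorical), predict, write df_result to output_file ...
    return output_file


# Runs only when the file is executed directly (python batch_refactoring.py),
# not when it is imported, e.g. by a test module
if __name__ == "__main__":
    print(main(2023, 3))
```

Because the guard skips the call on import, a test module can import `main` (or any helper) without triggering the whole batch job.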

In [None]:
import warnings
import os
from sklearn.exceptions import InconsistentVersionWarning
from homework06.batch_refactoring import main

warnings.filterwarnings("ignore", category=InconsistentVersionWarning)

# Use local paths for Q1 (no S3 needed): remove any environment variables
# that would override the default input/output paths or the S3 endpoint
for var in ("INPUT_FILE_PATTERN", "OUTPUT_FILE_PATTERN", "S3_ENDPOINT_URL"):
    os.environ.pop(var, None)

# Now run the main function which will use the default paths
output_path = main(2023, 3)
print(f"Output saved to: {output_path}")

In [None]:
print("=" * 50)
print("ANSWER TO Q1: The correct if statement for the 'main' block is:")
print('if __name__ == "__main__":')
print("=" * 50)

## Q2. Installing pytest

Now we need to install `pytest`:

```bash
uv add --dev pytest
```

Next, create a folder `tests` and create two files. One will be
the file with tests. We can name it `test_batch_refactoring.py`. 

What should be the other file? 

Hint: to be able to test `batch_refactoring.py`, we need to be able to
import it. Without this other file, we won't be able to do it.
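The `!touch` cell below can also be done in plain Python with `pathlib`, which works outside a notebook too. Why the empty `__init__.py` matters: under pytest's default `prepend` import mode, a test file that lives inside a package gets the first directory upward that has no `__init__.py` inserted into `sys.path`, and that is what makes `batch_refactoring` importable from the test. `create_test_layout` is a hypothetical helper name.

```python
from pathlib import Path
from typing import List


def create_test_layout(root: Path) -> List[Path]:
    """Create root/tests/ with an empty __init__.py and an empty test module."""
    tests_dir = root / "tests"
    tests_dir.mkdir(parents=True, exist_ok=True)
    created = []
    for name in ("__init__.py", "test_batch_refactoring.py"):
        path = tests_dir / name
        path.touch(exist_ok=True)  # like `touch`: create if missing, keep contents otherwise
        created.append(path)
    return created
```

For this project the call would be `create_test_layout(Path("homework06"))`.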

In [None]:
# install if not already installed
# !uv add --dev pytest

In [None]:
import os

os.makedirs("homework06/tests", exist_ok=True)

# create the files inside the tests folder
!touch homework06/tests/test_batch_refactoring.py
!touch homework06/tests/__init__.py

!ls -l homework06/tests

In [None]:
print("=" * 50)
print("ANSWER TO Q2: The other file needed in the tests directory is __init__.py")
print(
    "This file allows Python to recognize the tests directory as a package, enabling imports."
)
print("=" * 50)

## Q3. Writing first unit test

Now let's cover our code with unit tests.

We'll start with the pre-processing logic inside `read_data`.

It's difficult to test right now because it first reads
the file and then performs some transformations. We need to split this
code into two parts: reading (I/O) and transformation.

So let's create a function `prepare_data` that takes in a dataframe 
(and some other parameters too) and applies some transformation to it.

(That's basically the entire `read_data` function after reading 
the parquet file)
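A sketch of the split. The original `read_data` isn't shown here, so the transformation body is an assumption: the usual duration-based cleanup (trip duration in minutes, kept between 1 and 60, missing location IDs filled with -1 and cast to strings). Replace it with whatever your `read_data` did after `pd.read_parquet`.

```python
from typing import List

import pandas as pd


def prepare_data(df: pd.DataFrame, categorical: List[str]) -> pd.DataFrame:
    """Pure transformation (no I/O), so it can be unit-tested with an in-memory dataframe."""
    df = df.copy()  # don't mutate the caller's dataframe
    df["duration"] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df["duration"] = df.duration.dt.total_seconds() / 60

    # Keep only trips between 1 and 60 minutes (inclusive)
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    # Missing location IDs become -1, then the IDs are cast to strings
    df[categorical] = df[categorical].fillna(-1).astype("int").astype("str")
    return df


def read_data(filename: str, categorical: List[str]) -> pd.DataFrame:
    """I/O only: read the parquet file and delegate the transformation."""
    df = pd.read_parquet(filename)
    return prepare_data(df, categorical)
```

With this split, a test only needs to build a dataframe and call `prepare_data`; no file or network access is involved.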

Now create a test and use this as input:

```python
data = [
    (None, None, dt(1, 1), dt(1, 10)),
    (1, 1, dt(1, 2), dt(1, 10)),
    (1, None, dt(1, 2, 0), dt(1, 2, 59)),
    (3, 4, dt(1, 2, 0), dt(2, 2, 1)),      
]

columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
df = pd.DataFrame(data, columns=columns)
```

Where `dt` is a helper function:

```python
from datetime import datetime

def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)
```

Define the expected output and use `assert` to make sure
that the actual dataframe matches the expected one.

Tip: When you compare two Pandas DataFrames with `==`, the result is also a DataFrame (compared element by element).
The same is true for Pandas Series. Also, a DataFrame can be turned into a list of dictionaries.
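To illustrate the tip, here are three ways to compare dataframes in a test, using two small made-up frames:

```python
import pandas as pd

# Two small illustrative frames (made-up values)
actual = pd.DataFrame({"PULocationID": ["-1", "1"], "duration": [9.0, 8.0]})
expected = pd.DataFrame({"PULocationID": ["-1", "1"], "duration": [9.0, 8.0]})

# `==` works element by element and returns a DataFrame of booleans, not a single bool
# (both frames need identical row and column labels, otherwise pandas raises ValueError)
elementwise = actual == expected
all_equal = bool(elementwise.all().all())  # reduce each column, then the resulting Series

# A list of dicts is plain Python data, so a single `assert ==` works and pytest shows a readable diff
assert actual.to_dict(orient="records") == expected.to_dict(orient="records")

# pandas' dedicated helper, which additionally checks dtypes and the index
pd.testing.assert_frame_equal(actual, expected)
```

Writing `assert actual == expected` directly fails with "The truth value of a DataFrame is ambiguous", which is why the result has to be reduced or converted first.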

How many rows should be there in the expected dataframe?

* 1
* 2 ✅
* 3
* 4
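A sketch of the unit test itself. So that the block runs on its own, it inlines a stand-in `prepare_data` with an assumed transformation (duration in minutes kept between 1 and 60, categorical IDs filled with -1 and cast to strings); in the real test module you would import the function under test instead.

```python
from datetime import datetime
from typing import List

import pandas as pd


def prepare_data(df: pd.DataFrame, categorical: List[str]) -> pd.DataFrame:
    # Stand-in for the real prepare_data (assumed transformation, see lead-in)
    df = df.copy()
    df["duration"] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()
    df[categorical] = df[categorical].fillna(-1).astype("int").astype("str")
    return df


def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)


def test_prepare_data():
    data = [
        (None, None, dt(1, 1), dt(1, 10)),
        (1, 1, dt(1, 2), dt(1, 10)),
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),
    ]
    columns = ["PULocationID", "DOLocationID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]
    df = pd.DataFrame(data, columns=columns)

    actual = prepare_data(df, ["PULocationID", "DOLocationID"])

    # Row 3 lasts 59 s and row 4 lasts 60 min 1 s, so both fall outside the 1-60 minute window
    expected = [
        {"PULocationID": "-1", "DOLocationID": "-1", "tpep_pickup_datetime": dt(1, 1),
         "tpep_dropoff_datetime": dt(1, 10), "duration": 9.0},
        {"PULocationID": "1", "DOLocationID": "1", "tpep_pickup_datetime": dt(1, 2),
         "tpep_dropoff_datetime": dt(1, 10), "duration": 8.0},
    ]
    assert actual.to_dict(orient="records") == expected
```

In `tests/test_batch_refactoring.py`, drop the stand-in and import the real function (e.g. `from batch_refactoring import prepare_data`; the exact import path depends on your layout). Comparing `to_dict(orient="records")` lists follows the tip above.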

In [None]:
# Run pytest from the notebook
import io
from contextlib import redirect_stdout

import pytest


def run_test():
    """Run the unit tests and return pytest's console output as a string."""
    output = io.StringIO()
    # redirect_stdout swaps sys.stdout and restores it even if pytest raises
    with redirect_stdout(output):
        pytest.main(["-xvs", "homework06/tests/test_batch_refactoring.py"])
    return output.getvalue()


print("=" * 50)
print("RUNNING UNIT TEST FOR Q3")
print("=" * 50)
test_output = run_test()
print(test_output)

# Extract and display the answer explicitly
print("\n" + "=" * 50)
print("ANSWER TO Q3: The expected dataframe should have 2 rows ✅")
print("=" * 50)

## Q4. Mocking S3 with Localstack 

Now let's prepare for an integration test. In our script, we 
write data to S3. So we'll use Localstack to mimic S3.

First, let's run Localstack with Docker Compose. Let's create a
`docker-compose.yaml` file with just one service: localstack. Inside
localstack, we're only interested in running S3. 

Start the service and test it by creating a bucket where we'll
keep the output. Let's call it "nyc-duration".

With the AWS CLI, this is how we create a bucket:

```bash
aws s3 mb s3://nyc-duration
```

Then we need to check that the bucket was successfully created. With AWS, this is how we typically do it:

```bash
aws s3 ls
```

In both cases, we need to adjust the commands for Localstack. Which option do we need to use for this?

* `--backend-store-uri`
* `--profile`
* `--endpoint-url` ✅
* `--version`
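When these commands are scripted from Python, the `--endpoint-url` switch can be added in one place. A minimal sketch; `aws_cmd`, `run_aws` and `LOCALSTACK_ENDPOINT` are hypothetical names, and port 4566 is LocalStack's default edge port (adjust it if your `docker-compose.yaml` maps another one).

```python
import subprocess
from typing import List, Optional

# LocalStack's default edge port
LOCALSTACK_ENDPOINT = "http://localhost:4566"


def aws_cmd(*args: str, endpoint_url: Optional[str] = LOCALSTACK_ENDPOINT) -> List[str]:
    """Build an AWS CLI argv, adding --endpoint-url when an endpoint is given."""
    cmd = ["aws"]
    if endpoint_url:
        cmd += ["--endpoint-url", endpoint_url]
    return cmd + list(args)


def run_aws(*args: str, endpoint_url: Optional[str] = LOCALSTACK_ENDPOINT) -> str:
    """Run the command and return its stdout (raises CalledProcessError on failure)."""
    result = subprocess.run(
        aws_cmd(*args, endpoint_url=endpoint_url),
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

Usage against a running Localstack: `run_aws("s3", "mb", "s3://nyc-duration")`, then `print(run_aws("s3", "ls"))`. Passing `endpoint_url=None` falls back to the real AWS endpoint.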


## Make input and output paths configurable

Right now the input and output paths are hardcoded, but we want
to be able to change them for the tests.

One possible way is to specify `INPUT_FILE_PATTERN` and `OUTPUT_FILE_PATTERN` via environment
variables. Let's do that:


```bash
export INPUT_FILE_PATTERN="s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
export OUTPUT_FILE_PATTERN="s3://nyc-duration/out/{year:04d}-{month:02d}.parquet"
```

And this is how we can read them:

```python
import os


def get_input_path(year, month):
    default_input_pattern = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)


def get_output_path(year, month):
    default_output_pattern = 's3://nyc-duration-prediction-alexey/taxi_type=fhv/year={year:04d}/month={month:02d}/predictions.parquet'
    output_pattern = os.getenv('OUTPUT_FILE_PATTERN', default_output_pattern)
    return output_pattern.format(year=year, month=month)


def main(year, month):
    input_file = get_input_path(year, month)
    output_file = get_output_path(year, month)
    # rest of the main function ... 
```
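Because the paths now come from the environment, they can be tested without touching the real environment. A sketch using the standard library's `unittest.mock.patch.dict` (pytest's `monkeypatch.setenv` fixture does the same job inside pytest); `get_input_path` is copied from the snippet above and the test names are hypothetical.

```python
import os
from unittest import mock

DEFAULT_INPUT_PATTERN = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/"
    "yellow_tripdata_{year:04d}-{month:02d}.parquet"
)


def get_input_path(year, month):
    input_pattern = os.getenv("INPUT_FILE_PATTERN", DEFAULT_INPUT_PATTERN)
    return input_pattern.format(year=year, month=month)


def test_input_path_uses_env_pattern():
    pattern = "s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
    # patch.dict restores os.environ when the with-block exits
    with mock.patch.dict(os.environ, {"INPUT_FILE_PATTERN": pattern}):
        assert get_input_path(2023, 1) == "s3://nyc-duration/in/2023-01.parquet"


def test_input_path_falls_back_to_default():
    # clear=True temporarily empties os.environ, so the default pattern is used
    with mock.patch.dict(os.environ, clear=True):
        assert get_input_path(2023, 1).endswith("yellow_tripdata_2023-01.parquet")
```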


## Reading from Localstack S3 with Pandas

So far we've been reading parquet files from S3 using
pandas `read_parquet`, but this way we read them from the
actual S3 service. Now we need to switch to our Localstack
S3 instead.

For that, we need to specify the endpoint URL:

```python
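import os

import pandas as pd

# Localstack endpoint, e.g. http://localhost:4566
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")

options = {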
    'client_kwargs': {
        'endpoint_url': S3_ENDPOINT_URL
    }
}

df = pd.read_parquet('s3://bucket/file.parquet', storage_options=options)
```

Let's modify our `read_data` function:

- check if `S3_ENDPOINT_URL` is set, and if it is, use it for reading
- otherwise use the usual way
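One way to implement the two bullets, as a sketch: build the storage options only when `S3_ENDPOINT_URL` is set and pass `None` otherwise, which is `read_parquet`'s default behavior. `get_storage_options` is a hypothetical helper name, and the `prepare_data` call is left out so the block stands alone.

```python
import os
from typing import Optional

import pandas as pd


def get_storage_options() -> Optional[dict]:
    """Return s3fs options pointing at Localstack when S3_ENDPOINT_URL is set."""
    endpoint_url = os.getenv("S3_ENDPOINT_URL")
    if endpoint_url:
        return {"client_kwargs": {"endpoint_url": endpoint_url}}
    # None means "the usual way": real S3, HTTP(S), or the local filesystem
    return None


def read_data(filename: str) -> pd.DataFrame:
    # I/O only; transformations belong in a separate function
    return pd.read_parquet(filename, storage_options=get_storage_options())
```

The same helper can be reused for `to_parquet` when writing the predictions. Note that the options are meant for `s3://` paths; pandas rejects `storage_options` for plain local file paths.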

In [None]:
import os

# Configure paths for local testing with LocalStack
os.environ["INPUT_FILE_PATTERN"] = "s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
os.environ["OUTPUT_FILE_PATTERN"] = (
    "s3://nyc-duration/out/{year:04d}-{month:02d}.parquet"
)
os.environ["S3_ENDPOINT_URL"] = "http://localhost:4566"


print(f"S3_ENDPOINT_URL: {os.environ['S3_ENDPOINT_URL']}")
print(f"OUTPUT_FILE_PATTERN: {os.environ['OUTPUT_FILE_PATTERN']}")
print(f"INPUT_FILE_PATTERN: {os.environ['INPUT_FILE_PATTERN']}")

In [None]:
# Make sure the S3 bucket exists before running tests
!python homework06/tests/setup_s3.py

In [None]:
# Verify the LocalStack bucket exists
print("=" * 50)
print("CHECKING S3 FOR Q4")
print("=" * 50)

!aws --endpoint-url=http://localhost:4566 s3 ls

print("\n" + "=" * 50)
print(
    "ANSWER TO Q4: The correct option to use with AWS CLI for LocalStack is '--endpoint-url' ✅"
)
print("=" * 50)

## Q5. Creating test data

Now let's create `integration_test.py`.

We'll use the dataframe we created in Q3 (the dataframe for the unit test)
and save it to S3. You don't need to do anything else: just create a dataframe 
and save it.

We will pretend that this is data for January 2023.

Run the `integration_test.py` script. After that, use AWS CLI to verify that the 
file was created. 

Use this snippet for saving the file:

```python
df_input.to_parquet(
    input_file,
    engine='pyarrow',
    compression=None,
    index=False,
    storage_options=options
)
```

What's the size of the file (in bytes)?

* 3620 ✅
* 23620
* 43620
* 63620

Note: it's important to use the code from the snippet for saving
the file. Otherwise the size may be different depending on the OS,
engine and compression. Even if you use this exact snippet, the size
of your file may still be a bit off. Just select the closest option.
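A sketch of what `integration_test.py` could contain at this stage: the four Q3 rows plus the saving snippet as given. The function names `build_input_df` and `save_input` are hypothetical, and writing to an `s3://` path assumes `s3fs` and `pyarrow` are installed, Localstack is running, the `nyc-duration` bucket exists, and `INPUT_FILE_PATTERN` and `S3_ENDPOINT_URL` are exported.

```python
import os
from datetime import datetime

import pandas as pd


def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)


def build_input_df() -> pd.DataFrame:
    """The same four rows as the Q3 unit test, pretending to be January 2023 data."""
    data = [
        (None, None, dt(1, 1), dt(1, 10)),
        (1, 1, dt(1, 2), dt(1, 10)),
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),
    ]
    columns = ["PULocationID", "DOLocationID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]
    return pd.DataFrame(data, columns=columns)


def save_input(year: int = 2023, month: int = 1) -> str:
    """Write the fake input to the path built from INPUT_FILE_PATTERN and return that path."""
    input_file = os.environ["INPUT_FILE_PATTERN"].format(year=year, month=month)
    options = {"client_kwargs": {"endpoint_url": os.environ["S3_ENDPOINT_URL"]}}
    build_input_df().to_parquet(
        input_file,
        engine="pyarrow",
        compression=None,
        index=False,
        storage_options=options,
    )
    return input_file
```

To run it as a script, end the file with `if __name__ == "__main__": print(save_input())`, then check the size with `aws --endpoint-url=http://localhost:4566 s3 ls s3://nyc-duration/in/`.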

In [None]:
# Check the file size of the test data file
import os

# Configure the S3 endpoint and paths
os.environ["INPUT_FILE_PATTERN"] = "s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
os.environ["S3_ENDPOINT_URL"] = "http://localhost:4566"

# Define the path to the test data file
year, month = 2023, 1
input_file = os.environ["INPUT_FILE_PATTERN"].format(year=year, month=month)

# Use aws cli to get the file size
import subprocess

cmd = f"aws --endpoint-url={os.environ['S3_ENDPOINT_URL']} s3 ls {input_file}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

if result.returncode == 0:
    # Parse the output to get file size
    output = result.stdout.strip()
    if output:
        parts = output.split()
        if len(parts) >= 3:
            size = parts[2]
            print(f"File size of test data: {size} bytes")
            print("\nAnswer for Q5: The file size is closest to 3620 bytes ✅")
        else:
            print("Could not parse file size from output:", output)
    else:
        print("No output returned from aws command")
else:
    print(f"Error: {result.stderr}")
    print(
        "Make sure to run the integration_test.py script first to create the test data."
    )
    print("You can run it with: python homework06/tests/integration_test.py")

## Q6. Finish the integration test

We can read from our Localstack S3, but we also need to write to it.

Let's run the `batch_refactoring.py` script for January 2023 (the fake data
we created in Q5).

We can do that from our integration test in Python, for example with
`os.system` (there are other options too, such as `subprocess.run`).

With the environment variables from the previous sections set, the script now saves the result to Localstack.

The only thing we need to do now is to read this data and 
verify the result is correct. 
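As an alternative to `os.system`, here is a sketch with `subprocess.run` that passes the Localstack settings through an explicit `env`, so the test does not depend on what happens to be set in the current shell. It assumes the script's `__main__` block reads the year and month from `sys.argv` (hypothetical; if yours hard-codes them, drop the two extra arguments). `build_test_env` and `run_batch` are hypothetical names; the bucket and port match the ones used elsewhere in this notebook.

```python
import os
import subprocess
import sys
from typing import Dict


def build_test_env(endpoint_url: str = "http://localhost:4566", bucket: str = "nyc-duration") -> Dict[str, str]:
    """Copy the current environment and point input/output at the Localstack bucket."""
    env = dict(os.environ)
    # Double braces keep {year:04d}/{month:02d} as literal placeholders for .format() later
    env["INPUT_FILE_PATTERN"] = f"s3://{bucket}/in/{{year:04d}}-{{month:02d}}.parquet"
    env["OUTPUT_FILE_PATTERN"] = f"s3://{bucket}/out/{{year:04d}}-{{month:02d}}.parquet"
    env["S3_ENDPOINT_URL"] = endpoint_url
    return env


def run_batch(year: int, month: int, script: str = "batch_refactoring.py") -> None:
    """Run the batch script with the current interpreter (hypothetical CLI: script YEAR MONTH)."""
    cmd = [sys.executable, script, str(year), str(month)]
    subprocess.run(cmd, env=build_test_env(), check=True)
```

Unlike `os.system`, `check=True` raises `CalledProcessError` if the script exits with an error, so a failing batch run also fails the integration test.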

What's the sum of predicted durations for the test dataframe?

* 13.08
* 36.28 ✅
* 69.28
* 81.08
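Instead of picking the closest option by eye, the final check can be an assertion with a tolerance. A sketch; `check_prediction_sum` is a hypothetical helper, and the default `abs_tol=0.1` is an assumption, chosen because predictions can shift slightly across scikit-learn or model versions.

```python
import math

import pandas as pd


def check_prediction_sum(df_result: pd.DataFrame, expected: float, abs_tol: float = 0.1) -> float:
    """Assert that predicted durations sum to `expected` within `abs_tol` and return the sum."""
    actual = float(df_result["predicted_duration"].sum())
    assert math.isclose(actual, expected, abs_tol=abs_tol), f"sum {actual:.2f} != {expected}"
    return actual
```

Usage after the batch run: `df_result = pd.read_parquet(output_file, storage_options=options)`, then `check_prediction_sum(df_result, 36.28)`.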

In [None]:
import os
import pandas as pd
import sys
import subprocess


def verify_s3_bucket_exists(bucket_name="nyc-duration"):
    """Make sure the S3 bucket exists before running the test"""
    s3_endpoint_url = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")

    # Create the bucket if it doesn't exist
    cmd_check = f"aws --endpoint-url={s3_endpoint_url} s3 ls s3://{bucket_name} || aws --endpoint-url={s3_endpoint_url} s3 mb s3://{bucket_name}"
    print("Verifying S3 bucket exists...")
    subprocess.run(cmd_check, shell=True, check=True)

    # Create directories if needed
    cmd_dirs = f"aws --endpoint-url={s3_endpoint_url} s3api put-object --bucket {bucket_name} --key in/"
    subprocess.run(cmd_dirs, shell=True, check=True)
    cmd_dirs = f"aws --endpoint-url={s3_endpoint_url} s3api put-object --bucket {bucket_name} --key out/"
    subprocess.run(cmd_dirs, shell=True, check=True)

    # Verify test data file exists
    year, month = 2023, 1
    input_pattern = os.getenv(
        "INPUT_FILE_PATTERN", "s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
    )
    input_file = input_pattern.format(year=year, month=month)

    cmd_check_file = f"aws --endpoint-url={s3_endpoint_url} s3 ls {input_file}"
    result = subprocess.run(cmd_check_file, shell=True, capture_output=True)

    if result.returncode != 0:
        print(f"Test data file does not exist: {input_file}")
        print("Run the integration_test.py script first to create the test data.")
        print("Attempting to run it now...")
        # sys.executable runs the script with the notebook's own interpreter
        subprocess.run([sys.executable, "homework06/tests/integration_test.py"], check=True)

    return True


def test_integration():
    """Integration test for batch prediction

    1. First make sure the test data exists in S3
    2. Run batch_refactoring.py for January 2023
    3. Read the results and calculate sum of predictions
    """
    print("=" * 50)
    print("RUNNING INTEGRATION TEST FOR Q6")
    print("=" * 50)

    # Configure environment variables
    os.environ["INPUT_FILE_PATTERN"] = (
        "s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
    )
    os.environ["OUTPUT_FILE_PATTERN"] = (
        "s3://nyc-duration/out/{year:04d}-{month:02d}.parquet"
    )
    os.environ["S3_ENDPOINT_URL"] = "http://localhost:4566"

    # Make sure the bucket and test data exist
    verify_s3_bucket_exists()

    # Set parameters for January 2023
    year, month = 2023, 1
    s3_endpoint_url = os.environ["S3_ENDPOINT_URL"]

    # S3 options for reading/writing with LocalStack
    options = {"client_kwargs": {"endpoint_url": s3_endpoint_url}}

    # Import and run the batch prediction script
    from homework06.batch_refactoring import main

    output_path = main(year, month)
    print(f"Batch prediction completed. Output saved to: {output_path}")

    # Define the output path (should be the same as returned by main)
    output_pattern = os.environ["OUTPUT_FILE_PATTERN"]
    output_file = output_pattern.format(year=year, month=month)

    # Read the results
    try:
        df_result = pd.read_parquet(output_file, storage_options=options)
        print(f"Successfully read output file: {output_file}")
    except Exception as e:
        print(f"Error reading output file: {e}")
        return

    # Show the results
    print("\nOutput DataFrame:")
    print(df_result)

    # Calculate sum of predicted durations
    sum_pred = df_result["predicted_duration"].sum()
    print("\n" + "=" * 50)
    print(f"Sum of predicted durations: {sum_pred:.2f}")

    # Check which answer choice is closest to our result
    # (a separate name, so the S3 `options` dict above is not shadowed)
    answer_choices = [13.08, 36.28, 69.28, 81.08]
    closest = min(answer_choices, key=lambda x: abs(x - sum_pred))
    print("\n" + "=" * 50)
    print(f"ANSWER TO Q6: The sum of predicted durations is {sum_pred:.2f}")
    print(f"The closest option is: {closest} ✅")
    print("=" * 50)


# Run the integration test
test_integration()

## Running the test (ungraded)
Everything else is ready, but we need a shell script to run the tests.

Let's do that!
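A runner script typically starts Localstack (e.g. `docker compose up -d`), waits until it accepts requests, runs the tests, and shuts it down again. The waiting step is the one most often replaced by a fixed `sleep`; here is a Python sketch of a polling version. `wait_until` and `localstack_ready` are hypothetical names, and the `/_localstack/health` path is assumed from recent Localstack versions (older releases used a different path).

```python
import time
import urllib.request
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll `check` until it returns True or `timeout` seconds pass; return the final status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if check():
                return True
        except Exception:
            pass  # service not reachable yet, try again
        time.sleep(interval)
    return False


def localstack_ready(url: str = "http://localhost:4566/_localstack/health") -> bool:
    # Coarse check: the health endpoint answers with HTTP 200 (urlopen raises on errors)
    with urllib.request.urlopen(url, timeout=2) as resp:
        return resp.status == 200
```

Usage: `if not wait_until(localstack_ready, timeout=60): raise RuntimeError("Localstack did not start")`, followed by the pytest runs.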

In [None]:
# Run the unit tests, then the integration tests, via the helper scripts
!./run_tests.sh
!./run_integration_tests.sh