Hey, I'm Jobert Gutierrez. Below you'll find the logic and code I used to answer the second assignment of the Data Engineering Zoomcamp, a program offered by DataTalks.Club.

# __Module 2 Homework__

> In case you don't get one option exactly, select the closest one

For the homework, we'll be working with the green taxi dataset located here:

https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/green/download

## Assignment
The goal will be to construct an ETL pipeline that loads the data, performs some transformations, and writes the data to a database (and Google Cloud!).

- Create a new pipeline, call it _green_taxi_etl_
- Add a data loader block and use Pandas to read data for the final quarter of 2020 (months 10, 11, 12).
    - You can use the same datatypes and date parsing methods shown in the course.
    - `BONUS`: load the final three months using a for loop and `pd.concat`
- Add a transformer block and perform the following:
    - Remove rows where the passenger count is equal to 0 or the trip distance is equal to zero.
    - Create a new column `lpep_pickup_date` by converting `lpep_pickup_datetime` to a date.
    - Rename columns in Camel Case to Snake Case, e.g. `VendorID` to `vendor_id`.
    - Add three assertions:
        - `vendor_id` is one of the existing values in the column (currently)
        - `passenger_count` is greater than 0
        - `trip_distance` is greater than 0
- Using a Postgres data exporter (SQL or Python), write the dataset to a table called green_taxi in a schema mage. Replace the table if it already exists.
- Write your data as Parquet files to a bucket in GCP, partitioned by `lpep_pickup_date`. Use the pyarrow library!
- Schedule your pipeline to run daily at `5AM UTC`.

## Questions
## Question 1. Data Loading
Once the dataset is loaded, what's the shape of the data?

266,855 rows x 20 columns <br>
544,898 rows x 18 columns <br>
544,898 rows x 20 columns <br>
133,744 rows x 20 columns <br>

## Answer:
Using the code:

In [None]:
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):

    month_list = [10, 11, 12]
    data = list()
    
    for month in month_list:

        url = f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-{month}.csv.gz'
        
        
        # Declaring data types is good data engineering practice because it reduces memory consumption
        taxi_dtypes = {
            'VendorID': pd.Int64Dtype(),
            'store_and_fwd_flag': str,
            'RatecodeID': pd.Int64Dtype(),    
            'PULocationID': pd.Int64Dtype(),
            'DOLocationID': pd.Int64Dtype(),
            'passenger_count': pd.Int64Dtype(),
            'trip_distance': float,
            'fare_amount': float,
            'extra': float,
            'mta_tax': float,
            'tip_amount': float,
            'tolls_amount': float,        
            'improvement_surcharge': float,
            'total_amount': float,
            'payment_type': pd.Int64Dtype(),
            'trip_type':pd.Int64Dtype(),
            'congestion_surcharge': float,
        }
        parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

        df = pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates = parse_dates)

        data.append(df)

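    # ignore_index=True gives the combined frame one continuous index
    # instead of repeating each month's 0..n-1 labels
    dataframe = pd.concat(data, ignore_index=True)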
    
    return dataframe

We obtain the result of __266855 rows x 20 columns__.
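
The loop-and-concatenate pattern can be tried without downloading anything. The sketch below is only an illustration: `month_10` and `month_11` are hypothetical in-memory CSV snippets with made-up rows, standing in for the monthly files, and `dtypes` is a cut-down version of the mapping used in the loader. It shows that `pd.concat` adds up the rows while the column count stays fixed.

```python
import io

import pandas as pd

# Hypothetical stand-ins for the monthly CSV files (made-up rows, not real data).
month_10 = (
    "VendorID,lpep_pickup_datetime,trip_distance\n"
    "1,2020-10-01 08:00:00,1.5\n"
    "2,2020-10-02 09:30:00,0.0\n"
)
month_11 = (
    "VendorID,lpep_pickup_datetime,trip_distance\n"
    "2,2020-11-05 17:45:00,3.2\n"
)

dtypes = {"VendorID": pd.Int64Dtype(), "trip_distance": float}

frames = []
for raw in (month_10, month_11):
    # Same idea as the loader: explicit dtypes plus date parsing for each file.
    frames.append(
        pd.read_csv(io.StringIO(raw), dtype=dtypes, parse_dates=["lpep_pickup_datetime"])
    )

combined = pd.concat(frames, ignore_index=True)
print(combined.shape)  # rows add up across files, columns stay the same
```

Checking `combined.shape` this way is the same step that produces the row and column counts for the real data.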

## Question 2. Data Transformation
Upon filtering the dataset where the passenger count is greater than 0 and the trip distance is greater than zero, how many rows are left?

544,897 rows <br>
266,855 rows <br>
139,370 rows <br>
266,856 rows <br>

## Answer:

Using the code:

In [None]:
from mage_ai.data_cleaner.transformer_actions.base import BaseAction
from mage_ai.data_cleaner.transformer_actions.constants import ActionType, Axis
from mage_ai.data_cleaner.transformer_actions.utils import build_transformer_action
from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def execute_transformer_action(df: DataFrame, *args, **kwargs) -> DataFrame:
    
    # Count the rows with zero passengers, then keep only rides with passengers
    print('Preprocessing: rows with zero passenger count: ', df['passenger_count'].isin([0]).sum())
    data = df[df['passenger_count']>0]

    # Count the remaining rows with a trip distance of zero, then drop them.
    # .copy() makes `data` an independent frame, so the column assignment
    # below does not trigger pandas' SettingWithCopyWarning.
    print('Preprocessing: rows with trip distance of zero: ', data['trip_distance'].isin([0]).sum())
    data = data[data['trip_distance']>0].copy()

    # Creating the date 'lpep_pickup_date' column
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

    # Converting column names from Camel case to Snake case
    data.columns = (data.columns
                    .str.replace('(?<=[a-z])(?=[A-Z])', '_', regex=True)
                    .str.lower())

    return data


@test
def test_vendor_id(output, *args) -> None:
    assert "vendor_id" in output.columns, 'vendor_id is missing from the column names.'

@test
def test_passenger_count(output, *args) -> None:
    assert output['passenger_count'].isin([0]).sum() == 0, 'There are rides with zero passengers.'

@test
def test_trip_distance(output, *args) -> None:
    assert output['trip_distance'].isin([0]).sum() == 0, 'There are rides with trip distance of zero.'

We get the result __139370 rows x 21 columns__.
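
The vendor test in the block above only confirms that the column exists. A value-based check, closer to the assignment's wording, could look like the following sketch. It runs on a made-up four-row frame (`toy`), and the allowed set `{1, 2}` is taken from the answer to Question 4; both are assumptions for illustration, not part of the pipeline itself.

```python
import pandas as pd

# Made-up rows: one ride has zero passengers, another has zero distance.
toy = pd.DataFrame({
    "VendorID": pd.array([1, 2, 2, 1], dtype="Int64"),
    "passenger_count": pd.array([1, 0, 2, 3], dtype="Int64"),
    "trip_distance": [2.1, 1.0, 0.0, 4.4],
    "lpep_pickup_datetime": pd.to_datetime(
        ["2020-10-01 08:00", "2020-10-01 09:00", "2020-10-02 10:00", "2020-10-03 11:00"]
    ),
})

# Keep rides with at least one passenger and a positive distance.
mask = (toy["passenger_count"] > 0) & (toy["trip_distance"] > 0)
clean = toy[mask].copy()

clean["lpep_pickup_date"] = clean["lpep_pickup_datetime"].dt.date
clean.columns = (clean.columns
                 .str.replace("(?<=[a-z])(?=[A-Z])", "_", regex=True)
                 .str.lower())

allowed_vendors = {1, 2}  # assumption: the values reported for Question 4
assert clean["vendor_id"].isin(allowed_vendors).all(), "Unexpected vendor_id value."
assert (clean["passenger_count"] > 0).all(), "There are rides with zero passengers."
assert (clean["trip_distance"] > 0).all(), "There are rides with trip distance of zero."

print(clean.shape)
```

Writing the checks as `> 0` rather than `!= 0` would also catch negative values, if the data ever contained any.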

## Question 3. Data Transformation
Which of the following creates a new column `lpep_pickup_date` by converting `lpep_pickup_datetime` to a date?

data = data['lpep_pickup_datetime'].date <br>
data('lpep_pickup_date') = data['lpep_pickup_datetime'].date <br>
data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date <br>
data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt().date() <br>

## Answer:

As the code snippet in the previous question shows, the correct answer is <br>`data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date`
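
To see why the other options fail, the sketch below tries the expressions on a made-up two-row Series (`pickups`, hypothetical values). The second option is left out because assigning to a call, `data('lpep_pickup_date') = ...`, is not valid Python syntax in the first place.

```python
import datetime

import pandas as pd

pickups = pd.Series(pd.to_datetime(["2020-10-01 08:15:00", "2020-12-31 23:59:00"]))

# The .dt accessor exposes datetime components; .date is an attribute, not a method call.
dates = pickups.dt.date
print(dates.tolist())

# A Series has no .date attribute of its own.
try:
    pickups.date
    direct_access_error = None
except AttributeError as exc:
    direct_access_error = exc

# The .dt accessor is an object, not a function, so calling it fails.
try:
    pickups.dt().date()
    call_error = None
except TypeError as exc:
    call_error = exc

print(type(direct_access_error).__name__, type(call_error).__name__)
```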

## Question 4. Data Transformation
What are the existing values of VendorID in the dataset?

1, 2, or 3 <br>
1 or 2 <br>
1, 2, 3, 4 <br>
1 <br>

## Answer:

Using the code `print(data['vendor_id'].unique())`, we see that the __unique values in the vendor_id column are [1, 2]__.

## Question 5. Data Transformation
How many columns need to be renamed to snake case?

3 <br>
6 <br>
2 <br>
4 <br>

## Answer:

Since the dataset has only 20 columns, we can count them by visual inspection: __only 4 columns__ need to be renamed (VendorID, RatecodeID, PULocationID, DOLocationID).
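
The count can also be checked programmatically. The sketch below applies the same regular expression as the transformer to the 19 column names that appear in the loader's dtype and date-parsing settings; the one remaining column is not named in this write-up and is assumed to be in snake case already. The helper `to_snake` is a hypothetical name introduced only for this example.

```python
import re

# Columns named in the loader's dtype mapping and parse_dates list.
columns = [
    "VendorID", "lpep_pickup_datetime", "lpep_dropoff_datetime",
    "store_and_fwd_flag", "RatecodeID", "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
    "payment_type", "trip_type", "congestion_surcharge",
]


def to_snake(name: str) -> str:
    # Same rule as the transformer: insert an underscore wherever a
    # lowercase letter is followed by an uppercase one, then lowercase.
    return re.sub(r"(?<=[a-z])(?=[A-Z])", "_", name).lower()


# Keep only the names that actually change under the rule.
renamed = {name: to_snake(name) for name in columns if to_snake(name) != name}
print(len(renamed), renamed)
```

Note that this rule turns `PULocationID` into `pulocation_id`, because no underscore is inserted between consecutive capitals.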

## Question 6. Data Exporting
Once exported, how many partitions (folders) are present in Google Cloud?

96 <br>
56 <br>
67 <br>
108 <br>

## Answer:

Using the following code:

In [None]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
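import pyarrow as pa
import pyarrow.fs  # imported explicitly so pa.fs.GcsFileSystem does not rely on a transitive import
import pyarrow.parquet as pq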
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

os.environ['GOOGLE_APPLICATION_CREDENTIALS']= "/home/src/data-taxi-1-a1d4e91c10cd.json"

project_id = 'data-taxi-1'
bucket_name = 'terraform-taxi-data-1'
table_name = 'nyc_taxi_data'

root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    
    table = pa.Table.from_pandas(df)

    gcs = pa.fs.GcsFileSystem()

    pq.write_to_dataset(
        table,
        root_path = root_path,
        partition_cols = ['lpep_pickup_date'],
        filesystem = gcs,
    )

Partitioning on the `lpep_pickup_date` field, __I get 96 folders under nyc_taxi_data in Google Cloud Storage after the upload__.
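
Each distinct value of the partition column becomes one folder, so the folder count can be predicted before uploading. A minimal sketch, assuming a made-up frame (`sample`) in place of the transformed taxi data:

```python
import datetime

import pandas as pd

# Made-up rows: four rides spread over three distinct pickup dates.
sample = pd.DataFrame({
    "lpep_pickup_date": [
        datetime.date(2020, 10, 1),
        datetime.date(2020, 10, 1),
        datetime.date(2020, 10, 2),
        datetime.date(2020, 12, 31),
    ],
    "trip_distance": [1.2, 3.4, 0.8, 5.0],
})

# One folder named lpep_pickup_date=<value> is written per distinct date.
expected_folders = sample["lpep_pickup_date"].nunique()
print(expected_folders)
```

Run on the transformer's output, the same `nunique()` call should match the number of folders that appear in the bucket.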