- Name: Isaac Ndirangu Muturi
- Email: ndirangumuturi749@gmail.com

## Week 2 Homework

For the homework, we'll be working with the _green_ taxi dataset located here:

`https://github.com/DataTalksClub/nyc-tlc-data/releases/tag/green/download`

### Assignment

The goal will be to construct an ETL pipeline that loads the data, performs some transformations, and writes the data to a database (and Google Cloud!).

- Create a new pipeline, call it `green_taxi_etl`
- Add a data loader block and use Pandas to read data for the final quarter of 2020 (months `10`, `11`, `12`).
  - You can use the same datatypes and date parsing methods shown in the course.
  - `BONUS`: load the final three months using a for loop and `pd.concat`



In [None]:
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):
    base_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green'
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float
    }
    # native date parsing
    parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    dfs = []  # List to store DataFrames for each month

    # Load data for the final three months (10, 11, 12) using a for loop
    for month in range(10, 13):
        url = f'{base_url}/green_tripdata_2020-{month}.csv.gz'
        df = pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)
        dfs.append(df)

    # Concatenate DataFrames for the final quarter
    final_quarter_data = pd.concat(dfs, ignore_index=True)
    data = final_quarter_data
    return data

@test
def test_output(output, *args) -> None:
    assert output is not None, 'The output is undefined'


- Add a transformer block and perform the following:
  - Remove rows where the passenger count is equal to 0 _or_ the trip distance is equal to zero.
  - Create a new column `lpep_pickup_date` by converting `lpep_pickup_datetime` to a date.
  - Rename columns in Camel Case to Snake Case, e.g. `VendorID` to `vendor_id`.
  - Add three assertions:
    - `vendor_id` is one of the existing values in the column (currently)
    - `passenger_count` is greater than 0
    - `trip_distance` is greater than 0


In [None]:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@transformer
def transform_data(data, *args, **kwargs):
    # Remove rows where passenger count is equal to 0 or trip distance is equal to 0
    data = data[(data['passenger_count'] > 0) & (data['trip_distance'] > 0)]

    # Create a new column lpep_pickup_date by converting lpep_pickup_datetime to date
    data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

    # Rename columns from Camel Case to Snake Case
    data.rename(columns={
        'VendorID': 'vendor_id',
        'RatecodeID': 'ratecode_id',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id',
    }, inplace=True)

    return data

@test
def test_output(output, *args) -> None:
    # Add assertions
    # Assertion: vendor_id is one of the columns
    assert 'vendor_id' in data.columns, 'vendor_id column not found in the dataset'
    assert (data['passenger_count'] > 0).all(), 'Invalid passenger_count values'
    assert (data['trip_distance'] > 0).all(), 'Invalid trip_distance values'


- Using a Postgres data exporter (SQL or Python), write the dataset to a table called `green_taxi` in a schema `mage`. Replace the table if it already exists.


In [None]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'mage'  # Specify the name of the schema to export data to
    table_name = 'green_taxi'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )


- Write your data as Parquet files to a bucket in GCP, partioned by `lpep_pickup_date`. Use the `pyarrow` library!


In [None]:
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

import pyarrow as pa
import pyarrow.parquet as pq
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/src/direct-disk-412820-951f0087b486.json'

bucket_name = 'mage_data_engineering_zoomcamp'
project_id = 'direct-disk-412820'

table_name = "green_taxi_data"
root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(data, *args, **kwargs):
    table = pa.Table.from_pandas(data)
    gcs = pa.fs.GcsFileSystem()
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],
        filesystem=gcs
    )


- Schedule your pipeline to run daily at 5AM UTC.

In [1]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2021-07.csv.gz

--2024-02-04 21:46:49--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2021-07.csv.gz
Resolving github.com (github.com)... 20.87.245.0
Connecting to github.com (github.com)|20.87.245.0|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/a5bb4029-0fe6-4367-b2f6-eb6c8a9d9d86?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240204%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240204T184655Z&X-Amz-Expires=300&X-Amz-Signature=429c38c154bf63a67361c71da428f2e4979642d8ba41e5d4c5feba46668b2b29&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dgreen_tripdata_2021-07.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-02-04 21:46:52--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/a5bb4029-0

In [3]:
file_path = 'green_tripdata_2021-07.csv.gz'

import pandas as pd
pd.read_csv(
    file_path,
    sep=',',
    compression='gzip',
        ).head()

  pd.read_csv(


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,1.0,2021-07-01 00:30:52,2021-07-01 00:35:36,N,1.0,74,168,1.0,1.2,6.0,0.5,0.5,0.0,0.0,,0.3,7.3,2.0,1.0,0.0
1,2.0,2021-07-01 00:25:36,2021-07-01 01:01:31,N,1.0,116,265,2.0,13.69,42.0,0.5,0.5,0.0,0.0,,0.3,43.3,2.0,1.0,0.0
2,2.0,2021-07-01 00:05:58,2021-07-01 00:12:00,N,1.0,97,33,1.0,0.95,6.5,0.5,0.5,2.34,0.0,,0.3,10.14,1.0,1.0,0.0
3,2.0,2021-07-01 00:41:40,2021-07-01 00:47:23,N,1.0,74,42,1.0,1.24,6.5,0.5,0.5,0.0,0.0,,0.3,7.8,2.0,1.0,0.0
4,2.0,2021-07-01 00:51:32,2021-07-01 00:58:46,N,1.0,42,244,1.0,1.1,7.0,0.5,0.5,0.0,0.0,,0.3,8.3,2.0,1.0,0.0


### Questions

## Question 1. Data Loading

Once the dataset is loaded, what's the shape of the data?

* 266,855 rows x 20 columns
* 544,898 rows x 18 columns
* 544,898 rows x 20 columns
* 133,744 rows x 20 columns


In [None]:
# (266855, 20)


## Question 2. Data Transformation

Upon filtering the dataset where the passenger count is greater than 0 _and_ the trip distance is greater than zero, how many rows are left?

* 544,897 rows
* 266,855 rows
* 139,370 rows
* 266,856 rows


In [None]:
# (139370, 21)


## Question 3. Data Transformation

Which of the following creates a new column `lpep_pickup_date` by converting `lpep_pickup_datetime` to a date?

* `data = data['lpep_pickup_datetime'].date`
* `data('lpep_pickup_date') = data['lpep_pickup_datetime'].date`
* `data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date`
* `data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt().date()`


In [None]:
data['lpep_pickup_date'] = data['lpep_pickup_datetime'].dt.date

## Question 4. Data Transformation

What are the existing values of `VendorID` in the dataset?

* 1, 2, or 3
* 1 or 2
* 1, 2, 3, 4
* 1


In [None]:
SELECT DISTINCT vendor_id FROM mage.green_taxi;
# * 1 or 2

## Question 5. Data Transformation

How many columns need to be renamed to snake case?

* 3
* 6
* 2
* 4


In [None]:
# Rename columns from Camel Case to Snake Case
data.rename(columns={
    'VendorID': 'vendor_id',
    'RatecodeID': 'ratecode_id',
    'PULocationID': 'pu_location_id',
    'DOLocationID': 'do_location_id',
}, inplace=True)
# 4

## Question 6. Data Exporting

Once exported, how many partitions (folders) are present in Google Cloud?

* 96
* 56
* 67
* 108


In [None]:
gsutil ls -d gs://mage_data_engineering_zoomcamp/green_taxi_data/*/ | wc -l
# * 96
