# Yellow Taxi Data

In [1]:
import io
import gzip
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):
    """Download the 2019-2020 yellow taxi trip files and return them as one DataFrame.

    Every monthly gzip-compressed CSV is read directly from the release URL
    with explicit column dtypes and parsed pickup/drop-off timestamps, and the
    monthly frames are then concatenated row-wise.

    Args:
        *args: Ignored by this loader.
        **kwargs: Ignored by this loader.

    Returns:
        pandas.DataFrame: All monthly shards stacked in file-list order. Row
        labels are not reset by the concatenation, so they restart for every
        shard.
    """
    # One file per month, 2019-01 through 2020-12, in chronological order.
    file_list = [
        f'yellow_tripdata_{year}-{month:02d}.csv.gz'
        for year in (2019, 2020)
        for month in range(1, 13)
    ]
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'

    # pd.Int64Dtype() is pandas' nullable integer type, so the integer-coded
    # columns can still hold missing values.
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # Let read_csv parse the timestamp columns while loading.
    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    shards = []
    for f in file_list:
        print(f)  # progress marker: one line per downloaded file
        shards.append(
            pd.read_csv(
                f'{url}{f}',
                sep=',',
                compression='gzip',
                dtype=taxi_dtypes,
                parse_dates=parse_dates,
            )
        )

    return pd.concat(shards, axis=0)


@test
def test_output(output, *args) -> None:
    """Fail if the loader block returned ``None`` instead of a result."""
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [2]:
# Fetch and combine every yellow-taxi shard listed in load_data_from_api().
data = load_data_from_api()

yellow_tripdata_2019-01.csv.gz
yellow_tripdata_2019-02.csv.gz
yellow_tripdata_2019-03.csv.gz
yellow_tripdata_2019-04.csv.gz
yellow_tripdata_2019-05.csv.gz
yellow_tripdata_2019-06.csv.gz
yellow_tripdata_2019-07.csv.gz
yellow_tripdata_2019-08.csv.gz
yellow_tripdata_2019-09.csv.gz
yellow_tripdata_2019-10.csv.gz
yellow_tripdata_2019-11.csv.gz
yellow_tripdata_2019-12.csv.gz
yellow_tripdata_2020-01.csv.gz
yellow_tripdata_2020-02.csv.gz
yellow_tripdata_2020-03.csv.gz
yellow_tripdata_2020-04.csv.gz
yellow_tripdata_2020-05.csv.gz
yellow_tripdata_2020-06.csv.gz
yellow_tripdata_2020-07.csv.gz
yellow_tripdata_2020-08.csv.gz
yellow_tripdata_2020-09.csv.gz
yellow_tripdata_2020-10.csv.gz
yellow_tripdata_2020-11.csv.gz
yellow_tripdata_2020-12.csv.gz


In [3]:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """Rename the ID columns to snake_case and append a pickup-date column.

    The rename runs first because it returns a new DataFrame; the date column
    is then added to that new frame, so the frame passed in by the caller is
    not modified. No rows are filtered out.

    Args:
        data: DataFrame whose ``tpep_pickup_datetime`` column supports the
            ``.dt`` accessor (i.e. holds datetimes).
        *args: Ignored.
        **kwargs: Ignored.

    Returns:
        A new DataFrame with the renamed columns and ``tpep_pickup_date``
        (the date part of ``tpep_pickup_datetime``) as an additional column.
    """
    renamed = data.rename(columns={
        'VendorID': 'vendor_id',
        'RatecodeID': 'ratecode_id',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id',
    })
    # Added after the rename so the caller's frame is left untouched.
    renamed['tpep_pickup_date'] = renamed['tpep_pickup_datetime'].dt.date
    return renamed


@test
def test_output(output, *args) -> None:
    """Check only that the transformer returned something.

    No column-level checks are performed.
    """
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [4]:
# Rebind `data` to the transformed frame (pickup-date column added, ID columns renamed).
data = transform(data)

In [5]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """Export ``df`` to a fixed bucket/key in Google Cloud Storage.

    The storage client is built from the ``default`` profile of the
    ``io_config.yaml`` file found under ``get_repo_path()``.

    Args:
        df: Frame to upload.
        **kwargs: Ignored.
    """
    bucket_name = 'mage-zoomcamp-2024-alex-korga'
    object_key = 'yellow_taxi_data.parquet'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    storage = GoogleCloudStorage.with_config(io_config)
    storage.export(df, bucket_name, object_key)

In [6]:
# Upload the transformed frame to the bucket under a single object key.
export_data_to_google_cloud_storage(data)

GoogleCloudStorage initialized
└─ Exporting data frame to bucket 'mage-zoomcamp-2024-alex-korga' at key 'yellow_taxi_data.parquet'...DONE


In [7]:
import pyarrow as pa
import pyarrow.parquet as pq
from pandas import DataFrame
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Set the credentials path (relative to the working directory) in the environment.
# NOTE(review): presumably picked up by GcsFileSystem() in export_data() — confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './credentials/cred.json'

# Destination bucket for the partitioned dataset.
bucket_name = 'mage-zoomcamp-2024-alex-korga'
# NOTE(review): project_id is assigned but never read in the visible notebook.
project_id = 'dezoomcamp2024-412718'

# Folder name of the dataset inside the bucket.
table_name = 'yellow_taxi_data'

# "<bucket>/<table>" path that export_data() passes to pq.write_to_dataset.
root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(df: DataFrame, *args, **kwargs):
    """Write ``df`` as a Parquet dataset partitioned by ``tpep_pickup_date``.

    The frame is converted to an Arrow table and written through a
    ``GcsFileSystem`` to the module-level ``root_path``, which is looked up
    when the function is called.

    Args:
        df: Frame to write; must contain a ``tpep_pickup_date`` column.
        *args: Ignored.
        **kwargs: Ignored.
    """
    arrow_table = pa.Table.from_pandas(df)
    gcs_fs = pa.fs.GcsFileSystem()
    partition_columns = ['tpep_pickup_date']

    pq.write_to_dataset(
        arrow_table,
        root_path,
        partition_cols=partition_columns,
        filesystem=gcs_fs,
    )

In [8]:
# Write the same frame again, this time as a dataset partitioned by pickup date.
export_data(data)

In [9]:
# Drop the notebook-level reference to the yellow-taxi frame before the next dataset is loaded.
del data

# Green Taxi Data

In [10]:
import io
import gzip
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):
    """Download the 2019-2020 green taxi trip files and return them as one DataFrame.

    Every monthly gzip-compressed CSV is read directly from the release URL
    with explicit column dtypes and parsed pickup/drop-off timestamps, and the
    monthly frames are then concatenated row-wise.

    Args:
        *args: Ignored by this loader.
        **kwargs: Ignored by this loader.

    Returns:
        pandas.DataFrame: All monthly shards stacked in file-list order. Row
        labels are not reset by the concatenation, so they restart for every
        shard.
    """
    # One file per month, 2019-01 through 2020-12, in chronological order.
    file_list = [
        f'green_tripdata_{year}-{month:02d}.csv.gz'
        for year in (2019, 2020)
        for month in range(1, 13)
    ]
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/'

    # pd.Int64Dtype() is pandas' nullable integer type, so the integer-coded
    # columns can still hold missing values.
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # Let read_csv parse the timestamp columns while loading.
    parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    shards = []
    for f in file_list:
        print(f)  # progress marker: one line per downloaded file
        shards.append(
            pd.read_csv(
                f'{url}{f}',
                sep=',',
                compression='gzip',
                dtype=taxi_dtypes,
                parse_dates=parse_dates,
            )
        )

    return pd.concat(shards, axis=0)


@test
def test_output(output, *args) -> None:
    """Fail if the loader block returned ``None`` instead of a result."""
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [11]:
# Fetch and combine every green-taxi shard listed in load_data_from_api().
data = load_data_from_api()

green_tripdata_2019-01.csv.gz
green_tripdata_2019-02.csv.gz
green_tripdata_2019-03.csv.gz
green_tripdata_2019-04.csv.gz
green_tripdata_2019-05.csv.gz
green_tripdata_2019-06.csv.gz
green_tripdata_2019-07.csv.gz
green_tripdata_2019-08.csv.gz
green_tripdata_2019-09.csv.gz
green_tripdata_2019-10.csv.gz
green_tripdata_2019-11.csv.gz
green_tripdata_2019-12.csv.gz
green_tripdata_2020-01.csv.gz
green_tripdata_2020-02.csv.gz
green_tripdata_2020-03.csv.gz
green_tripdata_2020-04.csv.gz
green_tripdata_2020-05.csv.gz
green_tripdata_2020-06.csv.gz
green_tripdata_2020-07.csv.gz
green_tripdata_2020-08.csv.gz
green_tripdata_2020-09.csv.gz
green_tripdata_2020-10.csv.gz
green_tripdata_2020-11.csv.gz
green_tripdata_2020-12.csv.gz


In [13]:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """Rename the ID columns to snake_case and append a pickup-date column.

    The rename runs first because it returns a new DataFrame; the date column
    is then added to that new frame, so the frame passed in by the caller is
    not modified. No rows are filtered out.

    Args:
        data: DataFrame whose ``lpep_pickup_datetime`` column supports the
            ``.dt`` accessor (i.e. holds datetimes).
        *args: Ignored.
        **kwargs: Ignored.

    Returns:
        A new DataFrame with the renamed columns and ``lpep_pickup_date``
        (the date part of ``lpep_pickup_datetime``) as an additional column.
    """
    renamed = data.rename(columns={
        'VendorID': 'vendor_id',
        'RatecodeID': 'ratecode_id',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id',
    })
    # Added after the rename so the caller's frame is left untouched.
    renamed['lpep_pickup_date'] = renamed['lpep_pickup_datetime'].dt.date
    return renamed


@test
def test_output(output, *args) -> None:
    """Check only that the transformer returned something.

    No column-level checks are performed.
    """
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [14]:
# Rebind `data` to the transformed frame (pickup-date column added, ID columns renamed).
data = transform(data)

In [15]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """Export ``df`` to a fixed bucket/key in Google Cloud Storage.

    The storage client is built from the ``default`` profile of the
    ``io_config.yaml`` file found under ``get_repo_path()``.

    Args:
        df: Frame to upload.
        **kwargs: Ignored.
    """
    bucket_name = 'mage-zoomcamp-2024-alex-korga'
    object_key = 'green_taxi_data.parquet'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    storage = GoogleCloudStorage.with_config(io_config)
    storage.export(df, bucket_name, object_key)

In [16]:
# Upload the transformed frame to the bucket under a single object key.
export_data_to_google_cloud_storage(data)

GoogleCloudStorage initialized
└─ Exporting data frame to bucket 'mage-zoomcamp-2024-alex-korga' at key 'green_taxi_data.parquet'...DONE


In [17]:
import pyarrow as pa
import pyarrow.parquet as pq
from pandas import DataFrame
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Set the credentials path (relative to the working directory) in the environment.
# NOTE(review): presumably picked up by GcsFileSystem() in export_data() — confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './credentials/cred.json'

# Destination bucket for the partitioned dataset.
bucket_name = 'mage-zoomcamp-2024-alex-korga'
# NOTE(review): project_id is assigned but never read in the visible notebook.
project_id = 'dezoomcamp2024-412718'

# Folder name of the dataset inside the bucket.
table_name = 'green_taxi_data'

# "<bucket>/<table>" path that export_data() passes to pq.write_to_dataset.
root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(df: DataFrame, *args, **kwargs):
    """Write ``df`` as a Parquet dataset partitioned by ``lpep_pickup_date``.

    The frame is converted to an Arrow table and written through a
    ``GcsFileSystem`` to the module-level ``root_path``, which is looked up
    when the function is called.

    Args:
        df: Frame to write; must contain an ``lpep_pickup_date`` column.
        *args: Ignored.
        **kwargs: Ignored.
    """
    arrow_table = pa.Table.from_pandas(df)
    gcs_fs = pa.fs.GcsFileSystem()
    partition_columns = ['lpep_pickup_date']

    pq.write_to_dataset(
        arrow_table,
        root_path,
        partition_cols=partition_columns,
        filesystem=gcs_fs,
    )

In [18]:
# Write the same frame again, this time as a dataset partitioned by pickup date.
export_data(data)

In [19]:
# Drop the notebook-level reference to the green-taxi frame before the next dataset is loaded.
del data

# FHV Taxi Data

In [20]:
import io
import gzip
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):
    """Download the 2019 FHV trip files and return them as one DataFrame.

    Every monthly gzip-compressed CSV is read directly from the release URL
    with explicit column dtypes and parsed pickup/drop-off timestamps, and the
    monthly frames are then concatenated row-wise.

    Args:
        *args: Ignored by this loader.
        **kwargs: Ignored by this loader.

    Returns:
        pandas.DataFrame: All monthly shards stacked in file-list order. Row
        labels are not reset by the concatenation, so they restart for every
        shard.
    """
    # One file per month of 2019, in chronological order.
    file_list = [f'fhv_tripdata_2019-{month:02d}.csv.gz' for month in range(1, 13)]
    url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/'

    taxi_dtypes = {
        # FHV spellings (lower-case "l"), matching the column names the
        # transform block renames. pd.Int64Dtype() is pandas' nullable
        # integer type, so the location IDs can still hold missing values.
        'PUlocationID': pd.Int64Dtype(),
        'DOlocationID': pd.Int64Dtype(),
        'Affiliated_base_number': str,
        # NOTE(review): the keys below were inherited from the yellow/green
        # loaders and are kept unchanged; confirm which of them actually
        # occur in the FHV files.
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # Let read_csv parse the timestamp columns while loading.
    parse_dates = ['pickup_datetime', 'dropOff_datetime']

    shards = []
    for f in file_list:
        print(f)  # progress marker: one line per downloaded file
        shards.append(
            pd.read_csv(
                f'{url}{f}',
                sep=',',
                compression='gzip',
                dtype=taxi_dtypes,
                parse_dates=parse_dates,
            )
        )

    return pd.concat(shards, axis=0)


@test
def test_output(output, *args) -> None:
    """Fail if the loader block returned ``None`` instead of a result."""
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [21]:
# Fetch and combine every FHV shard listed in load_data_from_api().
data = load_data_from_api()

fhv_tripdata_2019-01.csv.gz
fhv_tripdata_2019-02.csv.gz
fhv_tripdata_2019-03.csv.gz
fhv_tripdata_2019-04.csv.gz
fhv_tripdata_2019-05.csv.gz
fhv_tripdata_2019-06.csv.gz
fhv_tripdata_2019-07.csv.gz
fhv_tripdata_2019-08.csv.gz
fhv_tripdata_2019-09.csv.gz
fhv_tripdata_2019-10.csv.gz
fhv_tripdata_2019-11.csv.gz
fhv_tripdata_2019-12.csv.gz


In [22]:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """Rename columns to snake_case, append a pickup-date column, drop ``SR_Flag``.

    The rename runs first because it returns a new DataFrame; all further
    changes are made on that new frame, so the frame passed in by the caller
    is not modified. No rows are filtered out.

    Args:
        data: DataFrame whose ``pickup_datetime`` column supports the ``.dt``
            accessor (i.e. holds datetimes).
        *args: Ignored.
        **kwargs: Ignored.

    Returns:
        A new DataFrame with the renamed columns, without ``SR_Flag``, and
        with ``pickup_date`` (the date part of ``pickup_datetime``) as an
        additional column.
    """
    # Both capitalisations of the location-ID columns are mapped; rename()
    # skips keys that are not present in the frame.
    renamed = data.rename(columns={
        'VendorID': 'vendor_id',
        'RatecodeID': 'ratecode_id',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id',
        'dropOff_datetime': 'drop_off_datetime',
        'PUlocationID': 'pu_location_id',
        'DOlocationID': 'do_location_id',
        'Affiliated_base_number': 'affiliated_base_number',
    })
    # Added after the rename so the caller's frame is left untouched.
    renamed['pickup_date'] = renamed['pickup_datetime'].dt.date

    # errors='ignore' keeps the block re-runnable: on an already-transformed
    # frame SR_Flag is gone and a plain drop would raise KeyError.
    return renamed.drop(labels='SR_Flag', axis=1, errors='ignore')


@test
def test_output(output, *args) -> None:
    """Check only that the transformer returned something.

    No column-level checks are performed.
    """
    has_output = output is not None
    assert has_output, 'The output is undefined'

In [23]:
# Rebind `data` to the transformed frame (pickup-date column added, columns renamed, SR_Flag dropped).
data = transform(data)

In [25]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """Export ``df`` to a fixed bucket/key in Google Cloud Storage.

    The storage client is built from the ``default`` profile of the
    ``io_config.yaml`` file found under ``get_repo_path()``.

    Args:
        df: Frame to upload.
        **kwargs: Ignored.
    """
    bucket_name = 'mage-zoomcamp-2024-alex-korga'
    object_key = 'fhv_taxi_data.parquet'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    storage = GoogleCloudStorage.with_config(io_config)
    storage.export(df, bucket_name, object_key)

In [26]:
# Upload the transformed frame to the bucket under a single object key.
export_data_to_google_cloud_storage(data)

GoogleCloudStorage initialized
└─ Exporting data frame to bucket 'mage-zoomcamp-2024-alex-korga' at key 'fhv_taxi_data.parquet'...DONE


In [27]:
import pyarrow as pa
import pyarrow.parquet as pq
from pandas import DataFrame
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Set the credentials path (relative to the working directory) in the environment.
# NOTE(review): presumably picked up by GcsFileSystem() in export_data() — confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './credentials/cred.json'

# Destination bucket for the partitioned dataset.
bucket_name = 'mage-zoomcamp-2024-alex-korga'
# NOTE(review): project_id is assigned but never read in the visible notebook.
project_id = 'dezoomcamp2024-412718'

# Folder name of the dataset inside the bucket.
table_name = 'fhv_taxi_data'

# "<bucket>/<table>" path that export_data() passes to pq.write_to_dataset.
root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(df: DataFrame, *args, **kwargs):
    """Write ``df`` as a Parquet dataset partitioned by ``pickup_date``.

    The frame is converted to an Arrow table and written through a
    ``GcsFileSystem`` to the module-level ``root_path``, which is looked up
    when the function is called.

    Args:
        df: Frame to write; must contain a ``pickup_date`` column.
        *args: Ignored.
        **kwargs: Ignored.
    """
    arrow_table = pa.Table.from_pandas(df)
    gcs_fs = pa.fs.GcsFileSystem()
    partition_columns = ['pickup_date']

    pq.write_to_dataset(
        arrow_table,
        root_path,
        partition_cols=partition_columns,
        filesystem=gcs_fs,
    )

In [28]:
# Write the same frame again, this time as a dataset partitioned by pickup date.
export_data(data)

In [29]:
# Drop the notebook-level reference to the FHV frame now that both exports are done.
del data