In [1]:
from dotenv import load_dotenv
import os
import sys
import warnings

# Pull settings from the local .env file into the process environment.
load_dotenv()

# Make the project's packages (imported later as `src....`) importable.
project_root = os.getenv("project_root")
if not project_root:
    # os.getenv returns None when the variable is missing. Appending None to
    # sys.path does not raise, so the problem would only surface later as a
    # confusing import error; say so up front instead.
    warnings.warn(
        "Environment variable 'project_root' is not set (checked .env); "
        "imports from the project's `src` package may fail."
    )
elif project_root not in sys.path:
    # Skip duplicates so re-running this cell leaves sys.path unchanged.
    sys.path.append(project_root)


In [None]:
# Sanity check: print the path of the Python interpreter backing this kernel
# to confirm the notebook runs inside the intended virtual environment.
import sys
print(sys.executable)


## Extract Data

In [None]:
import pandas as pd
import pyarrow.parquet as pq

In [None]:
# Read the raw fhvhv 2021-10 trip file as an Arrow table and convert it
# straight to a pandas DataFrame.
trips = pq.read_table('../data/raw/fhvhv_tripdata_2021-10.parquet').to_pandas()

## Explore Data

In [None]:
# Display pandas' default summary statistics for `trips`.
trips.describe()

In [None]:
# Show the dtype of every column as loaded from the Parquet file.
trips.dtypes

In [None]:
# Print a concise summary of the frame (columns, dtypes, memory usage).
trips.info()

In [None]:
# NOTE(review): DataFrame.value_counts() counts distinct *whole rows* (all
# columns combined) and, by default, skips rows containing a missing value.
# That is costly on a large frame and rarely informative; per-column counts
# (trips['col'].value_counts()) or trips.nunique() may be what was intended
# here - confirm.
trips.value_counts()

>hvfhs_license_num	Company (Uber/Lyft)

>dispatching_base_num	Base that dispatched the trip

>originating_base_num	Original base that got the request

>request_datetime	When the trip was requested

>on_scene_datetime	Driver arrives

>pickup_datetime	Trip starts

>dropoff_datetime	Trip ends

>PULocationID	Pickup zone

>DOLocationID	Dropoff zone

>trip_miles	Distance

>trip_time	Duration

>base_passenger_fare	Base fare

>tolls	Toll fees

>bcf	Black Car Fund surcharge

>sales_tax	Tax

>congestion_surcharge	NYC congestion fee

>airport_fee	Airport access fee

>tips	Tips

>driver_pay	Driver earnings

>shared_request_flag	Passenger asked for shared ride

>shared_match_flag	Shared ride matched

>access_a_ride_flag	Trip administered on behalf of the MTA Access-A-Ride paratransit program

>wav_request_flag	Wheelchair-accessible vehicle requested

>wav_match_flag	WAV matched

In [None]:
# List the column labels of `trips`.
trips.columns

In [None]:
# Number of distinct non-null originating bases. nunique() gives this
# directly; the previous value_counts().count() built a full frequency table
# only to count its entries.
trips['originating_base_num'].nunique()

>isna() and isnull() are aliases

Pandas created both names for convenience.

✔ Both detect missing values:

NaN

None

NaT

null values (note: empty strings `''` are *not* treated as missing)

In [None]:
# Count the missing values in each column.
trips.isnull().sum()

## Data Cleaning

### Drop Columns

In [None]:
# Columns removed from the working frame, dropped in a single step.
columns_to_drop = ['originating_base_num', 'on_scene_datetime', 'access_a_ride_flag']

# errors='ignore' keeps this cell re-runnable: on a second run the columns are
# already gone and drop() would otherwise raise KeyError. The result is
# rebound instead of using inplace=True, and the redundant `axis=1` is left
# out because `columns=` already names the axis.
trips = trips.drop(columns=columns_to_drop, errors='ignore')

In [None]:
# Preview the first rows to confirm the dropped columns are gone.
trips.head()

In [None]:
# Value distribution of every flag column in one table (one column per flag,
# one row per distinct value) instead of four copy-pasted cells.
flag_columns = ['wav_request_flag', 'wav_match_flag', 'shared_request_flag', 'shared_match_flag']
trips[flag_columns].apply(pd.Series.value_counts)


### Replace Y/N flag columns with 0/1

In [None]:
# Encode the Y/N flag columns as 0/1.
yes_no_encoding = {'N': 0, 'Y': 1}
yes_no_flag_columns = ['shared_request_flag', 'wav_match_flag', 'wav_request_flag', 'shared_match_flag']

for flag_column in yes_no_flag_columns:
    # replace() leaves any value other than 'N'/'Y' untouched, so re-running
    # the cell on already-encoded data changes nothing.
    # infer_objects() then turns an all-integer object column into int64
    # explicitly: pandas has deprecated the implicit downcast that replace()
    # used to perform, without which these columns would stay object dtype.
    trips[flag_column] = trips[flag_column].replace(yes_no_encoding).infer_objects()

In [None]:
# Preview the first rows to check the encoded flag columns.
trips.head()

### Convert data type

In [None]:
# Re-inspect the column dtypes before the conversion cells below.
trips.dtypes

In [None]:
# Boolean mask: True for columns whose dtype is exactly datetime64[us].
trips.dtypes == 'datetime64[us]'

In [None]:
# Boolean mask: True for columns whose dtype is exactly int64.
trips.dtypes == 'int64'

In [None]:
# Print every column label of `trips`, one per line.
for column_name in trips.columns:
    print(column_name)

In [None]:
# Pass every column whose dtype is exactly datetime64[us] through pd.to_datetime.
# NOTE(review): the matched columns are already datetime64, so this looks
# like a no-op - confirm the intent.
is_datetime_us = (trips.dtypes == 'datetime64[us]').to_numpy()
for column_name in trips.columns[is_datetime_us]:
    trips[column_name] = pd.to_datetime(trips[column_name])

In [None]:
# Normalise every int64 column to an explicit 64-bit integer dtype.
# The builtin `int` is not used as the target: with NumPy < 2 it maps to the
# platform C long, which is 32-bit on Windows, so astype(int) could silently
# narrow int64 data there. Naming 'int64' gives the same result everywhere.
is_int64 = (trips.dtypes == 'int64').to_numpy()
for column_name in trips.columns[is_int64]:
    trips[column_name] = trips[column_name].astype('int64')

In [None]:
# Cast every float64 column with astype(float).
# NOTE(review): the matched columns are already float64, so this looks like
# a no-op - confirm the intent.
is_float64 = (trips.dtypes == 'float64').to_numpy()
for column_name in trips.columns[is_float64]:
    trips[column_name] = trips[column_name].astype(float)

## Add Index

In [None]:
# Copy the current index values into a regular 'row_id' column.
trips['row_id'] = trips.index

# Test `src/benchmarks/benchmarks_app.py`

In [None]:
from src.mongo_import import connect_to_mongo

# Target database / collection for the benchmark queries.
DB_NAME = "trips_db"
COLLECTION_NAME = "fhvhv_trips_2021-10"

db = connect_to_mongo(DB_NAME)
collection = db[COLLECTION_NAME]

# Aggregation: per hvfhs_license_num, count the trips and average trip_time,
# then keep only the groups whose average trip_time is at least 300.
pipeline = [
    {
        "$group": {
            "_id": "$hvfhs_license_num",
            "total": {"$sum": 1},
            "avg_trip_time": {"$avg": "$trip_time"},
        }
    },
    {"$match": {"avg_trip_time": {"$gte": 300}}},
]



In [None]:
# Ask the server to explain how it handles `pipeline` on the collection.
# The wrapped aggregate command carries a "cursor" document so that it is a
# well-formed aggregate command; the empty document selects the defaults.
m = db.command("explain", {
    "aggregate": collection.name,
    "pipeline": pipeline,
    "cursor": {},
})

In [None]:
# Show the per-stage section of the explain output. That section is a list of
# stage documents, so the fallback for a missing key is an empty list rather
# than a dict (NOTE(review): the key may be absent for some plans - confirm).
m.get("stages", [])

In [2]:
from src.benchmarks.benchmarks_app import run_benchmark

In [4]:
# Benchmark a range filter on trip_time using a single-field ascending index
# on the same field.
q = {"trip_time": {"$gte": 300}}
trip_time_index = {"trip_time": 1}
run_benchmark(query=q, index_param=trip_time_index, index_name="simple_index")

{'trip_time': {'$gte': 300}}
<pymongo.synchronous.cursor.Cursor object at 0x00000289019E51D0>


_OperationCancelled: operation cancelled