In [1]:
import pandas as pd
from sqlalchemy import create_engine
from pathlib import Path
from tqdm import tqdm
from rq import Queue
from redis import Redis
from time import sleep
from random import randint
import os
import dask.dataframe as dd
from src.helpers import get_dataframe

In [2]:
con_str = "postgresql://postgres:postgres@postgres"
data_location = Path("data/rq_results")
data_location.mkdir(exist_ok=True)
engine = create_engine(con_str)

In [3]:
pd.read_sql("SELECT COUNT(*) FROM yellow_taxi", engine)

Unnamed: 0,count
0,7696617


In [4]:
# Initalize redis
redis_client = Redis(host="redis")
Q = Queue(connection=redis_client)

In [5]:
jobs = []
for i in range(1,32):
    day = f"2019-01-{i:02}"
    job = Q.enqueue(get_dataframe, day, con_str, data_location)
    jobs.append(job)

In [6]:
while any([not(j.is_finished) for j in jobs]):
    print(f"Some jobs still running - {sum(not(j.is_finished) for j in jobs)} jobs are still running")
    sleep(15)

Some jobs still running - 31 jobs are still running
Some jobs still running - 31 jobs are still running
Some jobs still running - 25 jobs are still running
Some jobs still running - 22 jobs are still running
Some jobs still running - 18 jobs are still running
Some jobs still running - 14 jobs are still running
Some jobs still running - 11 jobs are still running
Some jobs still running - 7 jobs are still running
Some jobs still running - 3 jobs are still running


In [7]:
ddf = dd.read_parquet("data/rq_results/*.parquet")

In [8]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 4 entries, tpep_pickup_datetime to fare_amount
dtypes: datetime64[ns](1), float64(3)

In [9]:
ddf.head()

Unnamed: 0,tpep_pickup_datetime,passenger_count,trip_distance,fare_amount
0,2019-01-01 05:41:56,,13.6,48.49
1,2019-01-01 05:33:00,,18.12,64.55
2,2019-01-01 05:18:08,,5.03,29.45
3,2019-01-01 00:38:45,,2.21,12.24
4,2019-01-01 05:32:00,,13.11,47.99


In [10]:
ddf.describe().compute()

Unnamed: 0,passenger_count,trip_distance,fare_amount
count,7667408.0,7696080.0,7696080.0
mean,1.567009,2.830114,12.52952
std,1.224395,3.774527,261.5988
min,0.0,0.0,-362.0
25%,1.0,1.07,6.5
50%,1.0,1.9,9.5
75%,2.0,3.91,15.5
max,9.0,831.8,623259.9


In [11]:
ddf.head()

Unnamed: 0,tpep_pickup_datetime,passenger_count,trip_distance,fare_amount
0,2019-01-01 05:41:56,,13.6,48.49
1,2019-01-01 05:33:00,,18.12,64.55
2,2019-01-01 05:18:08,,5.03,29.45
3,2019-01-01 00:38:45,,2.21,12.24
4,2019-01-01 05:32:00,,13.11,47.99
