In [2]:
# Copy the Dask configuration staged in /tmp into the user's Dask config directory.
# NOTE(review): presumably this file carries the worker settings KubeCluster relies on below — confirm.
cp /tmp/dask_config.yaml /home/jovyan/.config/dask/dask_config.yaml

In [1]:
from dask.distributed import Client
from dask_kubernetes import KubeCluster
import dask.dataframe as dd
from distributed import wait
import gcsfs
import toolz
import pandas as pd

In [2]:
# Start a Dask cluster on Kubernetes, requesting 100 workers.
# The bare `cluster` expression on the last line displays the cluster widget.
cluster = KubeCluster(n_workers=100)
cluster

distributed.scheduler - INFO - Clear task state
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
distributed.scheduler - INFO -   Scheduler at:   tcp://10.20.0.131:44829
distributed.scheduler - INFO -   dashboard at:                    :37809


VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [3]:
# Connect a distributed Client to the cluster created above; it is used below
# to submit the header-probing tasks.
client = Client(cluster)

distributed.scheduler - INFO - Receive client connection: Client-485d77d4-fa5d-11e9-8043-56d0ba869408
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.20.144.22:44787
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.20.144.22:44787
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.20.151.22:33057
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.20.151.22:33057
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.20.121.22:35141
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.20.121.22:35141
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.20.147.22:41751
distributed.scheduler - INFO - Register tcp://10.20.127.23:43699
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.20.14

In [4]:
def f(path):
    """Return ``(path, columns)`` for a single CSV object.

    The file is opened through an anonymous GCS filesystem created inside the
    function, and only the first five data rows are parsed to obtain the
    column index.
    """
    anon_fs = gcsfs.GCSFileSystem(token="anon")
    with anon_fs.open(path) as handle:
        header = pd.read_csv(handle, nrows=5).columns
    return path, header

In [5]:
# List the CSV objects under the bucket prefix (anonymous access), probe each
# file's header on the cluster with f, and collect the (path, columns) pairs.
fs = gcsfs.GCSFileSystem(token="anon")
futures = client.map(f, fs.ls("dask-nyc-taxi/csv/"))
# gather() retrieves every result in one call instead of blocking on
# .result() once per future; the returned list keeps the order of `futures`.
columns = client.gather(futures)

In [6]:
def _header_key(entry):
    """Build a grouping key by joining the column names of a (path, columns) pair."""
    return '-'.join(entry[1])


# Files that share the exact same header end up in the same group.
grouped = toolz.groupby(_header_key, columns)

In [8]:
# One list of gs:// URLs per distinct header layout, in the iteration order
# of `grouped`.
groups = [
    ['gs://' + entry[0] for entry in members]
    for members in grouped.values()
]

In [9]:
# Maps the column names used by some source files onto the common names
# listed in `keep`; both spellings of the rate-code column are covered.
renamer = dict(
    VendorID='vendor_id',
    tpep_dropoff_datetime='dropoff_datetime',
    tpep_pickup_datetime='pickup_datetime',
    RateCodeID='rate_code',
    RatecodeID='rate_code',
)

# Columns retained in every frame, in output order.
keep = [
    'vendor_id',
    'pickup_datetime',
    'dropoff_datetime',
    'passenger_count',
    'trip_distance',
    'pickup_longitude',
    'pickup_latitude',
    'rate_code',
    'dropoff_longitude',
    'dropoff_latitude',
    'payment_type',
    'fare_amount',
    'tip_amount',
    'tolls_amount',
    'total_amount',
]

# Integer-coded columns that are turned into categoricals further down.
cat_cols = [
    'vendor_id',
    'payment_type',
    'rate_code',
]

In [10]:
# First header layout: the date columns are addressed with a leading space in
# their names, and the rename below strips whitespace from every column name.
# NOTE(review): the dtype key 'rate_code' has no leading space, unlike the
# parse_dates names — confirm it actually matches a column at read time.
df0 = (
    dd.read_csv(groups[0], parse_dates=[' pickup_datetime', ' dropoff_datetime'],
                dtype={'rate_code': 'object'})
      .rename(columns=lambda x: x.strip())
)
# Replace the string vendor codes with integers.
df0['vendor_id'] = df0.vendor_id.replace({'CMT': 1, 'VTS': 2})
# String payment codes and the integer each one is replaced with.
payment_types = {
    'CRD': 1,
    'CSH': 2,
    'UNK': 5,
    'NOC': 3,
    'DIS': 4,
}
df0['payment_type'] = df0.payment_type.replace(payment_types)

# Keep only the shared columns and drop rows with any missing value before
# casting the two replaced columns to int.
df0 = df0[keep].dropna()
df0['vendor_id'] = df0['vendor_id'].astype(int)
df0['payment_type'] = df0['payment_type'].astype(int)

In [11]:
# Second header layout: "tpep_"-prefixed date columns and "RateCodeID".
df1 = dd.read_csv(
    groups[1],
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    dtype={"RateCodeID": object, "payment_type": float},
)
# Rename to the common names, keep the shared columns, drop incomplete rows.
df1 = df1.rename(columns=renamer)[keep].dropna()

# Cast the two code columns to int once the incomplete rows are gone.
for code_col in ('rate_code', 'payment_type'):
    df1[code_col] = df1[code_col].astype(int)

In [12]:
# Third header layout: "tpep_"-prefixed date columns and "RatecodeID".
df2 = dd.read_csv(
    groups[2],
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    dtype={"RatecodeID": object, "payment_type": float},
)
# Rename to the common names, keep the shared columns, drop incomplete rows.
df2 = df2.rename(columns=renamer)[keep].dropna()

# Cast the two code columns to int once the incomplete rows are gone.
for code_col in ('rate_code', 'payment_type'):
    df2[code_col] = df2[code_col].astype(int)

In [14]:
# Stack the three per-layout frames; each one is re-selected with `keep`
# before concatenation.
df = dd.concat([frame[keep] for frame in (df0, df1, df2)])

In [None]:
# Start computing the concatenated frame on the cluster and block until it is
# done; the trailing `;` suppresses the cell output.
df = df.persist()
wait(df);

In [None]:
import pandas as pd

# Categorical dtypes for the integer-coded columns. The position of a label is
# the integer code minus one, so the label order must not be changed.
vendor_id_dtype = pd.CategoricalDtype(categories=['CMT', 'VTS'])
# Labels match the string codes that the CSV cleaning step maps to 1..5
# ('UNK' is code 5), so string -> int -> category round-trips to the same label.
payment_type_dtype = pd.CategoricalDtype(categories=['CRD', 'CSH', 'NOC', 'DIS', 'UNK'])
rate_code_dtype = pd.CategoricalDtype(categories=['standard', 'JFK', 'NWK', 'nassau', 'negotiated', 'group'])

In [None]:
def f(x, categories):
    """Convert 1-based integer codes in ``x`` to a ``pd.Categorical``.

    Each value is shifted down by one and used as a positional code into
    ``categories``.

    Note: this reuses the name ``f`` of the header-probing helper defined
    earlier in the notebook, so that helper is no longer reachable once this
    cell has run.
    """
    zero_based = x - 1
    return pd.Categorical.from_codes(zero_based, categories=categories)

# Convert the two integer-coded columns to categoricals with f, partition by
# partition; `meta` declares the name and dtype of each resulting column.
for column, cat_dtype in (('vendor_id', vendor_id_dtype),
                          ('payment_type', payment_type_dtype)):
    df[column] = df[column].map_partitions(
        f,
        categories=cat_dtype.categories,
        meta=(column, cat_dtype),
    )


def g(x, categories):
    """Convert 1-based integer codes to a ``pd.Categorical``, tolerating high codes.

    Values are shifted down by one; any shifted value that is greater than or
    equal to ``len(categories)`` is replaced with -1, the code
    ``from_codes`` treats as missing.
    """
    shifted = x - 1
    codes = shifted.mask(shifted >= len(categories), -1)
    return pd.Categorical.from_codes(codes, categories=categories)

# Convert rate_code with g rather than f, so codes beyond the known categories
# become missing values instead of being passed to from_codes as-is.
df['rate_code'] = df['rate_code'].map_partitions(g,
                                                   categories=rate_code_dtype.categories,
                                                   meta=('rate_code', rate_code_dtype))

In [None]:
# Drop rows with missing values, index by pickup time and repartition with
# freq="D", then compute on the cluster and wait for completion.
# NOTE: this rebinds `df2`, which earlier in the notebook named the frame read
# from groups[2]; from here on `df2` is the time-indexed combined frame.
df2 = df.dropna().set_index("pickup_datetime").repartition(freq="D")
df2 = df2.persist()
wait(df2);

In [None]:
# Row count of every partition of df2, computed on the cluster.
lens = df2.map_partitions(len).compute()

In [None]:
# Split the division boundaries into all-but-last (`divisions`, a list) and
# the last boundary (`final`).
*divisions, final = df2.divisions

In [None]:
# Sanity check: one leading boundary per partition length, so the two can be
# zipped together below.
assert len(divisions) == len(lens)

In [None]:
# Keep the starting boundary of every non-empty partition, then close the
# sequence with the original final boundary.
new_divisions = (
    *(start for start, n_rows in zip(divisions, lens) if n_rows > 0),
    final,
)

In [None]:
nobjs = len(new_divisions)

# One delayed object per partition of df2; keep only those whose partition
# has at least one row.
delayeds = df2.to_delayed()
new_delayed = [
    part
    for part, n_rows in zip(delayeds, lens)
    if n_rows > 0
]

In [None]:
# Rebuild a dask DataFrame from the non-empty partitions, reusing df2's
# metadata and the filtered division boundaries.
df3 = dd.from_delayed(new_delayed, meta=df2._meta, divisions=new_divisions)

In [None]:
# Compute the rebuilt frame on the cluster and block until it is done.
# Unlike the earlier wait(...) cells there is no trailing `;`, so the return
# value of wait() is shown as the cell output.
df3 = df3.persist()
wait(df3)

In [28]:
# Write the dataset to GCS as Parquet using the fastparquet engine.
# NOTE(review): the write uses token "anon" — confirm the bucket really accepts anonymous writes.
df3.to_parquet("gs://dask-nyc-taxi/yellowtrip.parquet/", engine="fastparquet", storage_options={"token": "anon"})

In [30]:
# Write the same frame to a second location using the pyarrow engine.
# NOTE(review): the dtypes read back from this dataset further down show the
# categorical columns as datetime64[ns] — verify this copy before relying on it.
df3.to_parquet("gs://dask-nyc-taxi/nyc-taxi.parquet/", engine="pyarrow", storage_options={"token": "anon"})



In [31]:
# Read the fastparquet-written dataset back and display its column dtypes as a
# round-trip check.
dd.read_parquet("gs://dask-nyc-taxi/yellowtrip.parquet/", engine="fastparquet").dtypes

vendor_id                  category
dropoff_datetime     datetime64[ns]
passenger_count               int64
trip_distance               float64
pickup_longitude            float64
pickup_latitude             float64
rate_code                  category
dropoff_longitude           float64
dropoff_latitude            float64
payment_type               category
fare_amount                 float64
tip_amount                  float64
tolls_amount                float64
total_amount                float64
dtype: object

In [32]:
# Read the pyarrow-written dataset back and display its column dtypes; compare
# against the fastparquet result to spot dtype differences between engines.
dd.read_parquet("gs://dask-nyc-taxi/nyc-taxi.parquet/", engine="pyarrow").dtypes

vendor_id            datetime64[ns]
dropoff_datetime     datetime64[ns]
passenger_count               int64
trip_distance               float64
pickup_longitude            float64
pickup_latitude             float64
rate_code            datetime64[ns]
dropoff_longitude           float64
dropoff_latitude            float64
payment_type         datetime64[ns]
fare_amount                 float64
tip_amount                  float64
tolls_amount                float64
total_amount                float64
dtype: object

