In [1]:
%matplotlib inline
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [2]:
import warnings
# Suppress every warning for the rest of the session.
# NOTE(review): this is a blanket filter, so warnings raised by pandas/dask in
# later cells are hidden as well — consider narrowing it to specific categories.
warnings.filterwarnings('ignore')

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import preprocessing
import seaborn as sns
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
# Show a progress bar for every dask computation in this notebook.
# register() returns None, so bind the bar first: `progress` then refers to the
# ProgressBar itself (and can later be switched off with progress.unregister())
# instead of silently being None.
progress = ProgressBar()
progress.register()

In [4]:
# Notebook-wide constants.
# IMG_DIR: presumably the folder exported figures go to — TODO confirm (not used in the cells shown here).
IMG_DIR = '../../analysis'
# DPI: presumably the resolution for saved figures — TODO confirm.
DPI = 120

## Loading data using Dask (loads lazily)
* https://www.youtube.com/watch?v=RA_2qdipVng&t=1s
* http://matthewrocklin.com/slides/scipy-2017.html

In [5]:
# Alternative dtype map that additionally pinned CRSElapsedTime to int (currently disabled):
# data_types = {'CRSElapsedTime': int, 'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}
# Columns that are forced to int when the CSV files are parsed.
data_types = {'CRSDepTime': int, 'Year': int, 'Month': int, 'DayOfWeek': int, 'DayofMonth': int}


# http://dask.pydata.org/en/latest/dataframe-overview.html
# Build one dask DataFrame over all CSV files matching ../../data/raw/*.csv.
# - encoding='iso-8859-1': the files are read as Latin-1 text.
# - blocksize=1024 * 1024: NOTE(review) presumably bytes per partition (1 MiB);
#   with this setting df.npartitions reports 1623 further down — confirm the
#   small block size is intentional.
# - assume_missing=True: NOTE(review) presumably makes integer-looking columns
#   not listed in data_types float so they can hold missing values (FlightNum
#   shows as 154.0 in the head() output) — confirm against the dask docs.
%time df = dd.read_csv('../../data/raw/*.csv', encoding='iso-8859-1', dtype=data_types, assume_missing=True, blocksize=1024 * 1024)

CPU times: user 2.58 s, sys: 219 ms, total: 2.8 s
Wall time: 2.84 s


In [6]:
%time df.head()

[########################################] | 100% Completed |  0.1s
CPU times: user 109 ms, sys: 109 ms, total: 219 ms
Wall time: 186 ms


Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2000,1,28,5,1647.0,1647,1906.0,1859.0,HP,154.0,...,15.0,11.0,0.0,,0.0,,,,,
1,2000,1,29,6,1648.0,1647,1939.0,1859.0,HP,154.0,...,5.0,47.0,0.0,,0.0,,,,,
2,2000,1,30,7,,1647,,1859.0,HP,154.0,...,0.0,0.0,1.0,,0.0,,,,,
3,2000,1,31,1,1645.0,1647,1852.0,1859.0,HP,154.0,...,7.0,14.0,0.0,,0.0,,,,,
4,2000,1,1,6,842.0,846,1057.0,1101.0,HP,609.0,...,3.0,8.0,0.0,,0.0,,,,,


In [7]:
# Replace every missing value with the sentinel -1 and rebind `df` to the result.
# NOTE(review): the fill is applied to all columns, including code-like ones
# such as CancellationCode (it shows as -1.0 in the later `head` output) —
# confirm that a numeric sentinel is wanted there.
%time df = df.fillna(-1)

CPU times: user 15.6 ms, sys: 0 ns, total: 15.6 ms
Wall time: 30.3 ms


In [None]:
# Takes a while
# %time df.count().compute()

In [None]:
# Takes a while, but should be doable
# %time unique_origins = df['Origin'].unique().compute()

In [None]:
# once you compute you get a real pandas series
# type(unique_origins)

In [None]:
# unique_origins

In [8]:
# 2400 is not a valid time of day, so clamp it to the last valid minute (2359).
# Series.mask replaces the values where the condition is True and leaves all
# other rows untouched. It is a vectorised column operation, so no Python
# lambda has to run per row and dask does not have to guess the result type
# of a row-wise apply.
df['CRSDepTime'] = df['CRSDepTime'].mask(df['CRSDepTime'] == 2400, 2359)

In [9]:
# df.apply?

In [10]:
# pd.Timestamp?

In [11]:
# Materialise the first rows of the dask frame; the result is an ordinary
# in-memory pandas DataFrame (confirmed by type(head) below) that can be used
# to try out row-wise transformations cheaply.
head = df.head()

[########################################] | 100% Completed |  0.5s


In [12]:
def create_timestamp(row):
    """Build the scheduled-departure timestamp for one flight record.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas Series or a dict)
        Must provide 'Year', 'Month', 'DayofMonth' and 'CRSDepTime', where
        'CRSDepTime' is a clock time encoded as the number HHMM
        (e.g. 846 -> 08:46, 1647 -> 16:47).

    Returns
    -------
    pandas.Timestamp
        The date from Year/Month/DayofMonth combined with the HHMM time.

    Raises
    ------
    ValueError
        If the components do not form a valid date/time (e.g. minute > 59).
    """
    # Cast explicitly so float-typed values (2000.0, 1647.0) work as well;
    # the previous string formatting produced '2000.0-1.0-...' for those.
    hhmm = int(row['CRSDepTime'])
    # 2400 is not a valid time of day; treat it as the last minute of the day,
    # matching how the notebook cleans the CRSDepTime column.
    if hhmm == 2400:
        hhmm = 2359
    hour, minute = divmod(hhmm, 100)
    # Construct from components instead of relying on free-form parsing of a
    # non-standard 'Y-M-D;HHMM' string.
    return pd.Timestamp(year=int(row['Year']),
                        month=int(row['Month']),
                        day=int(row['DayofMonth']),
                        hour=hour,
                        minute=minute)

In [13]:
type(head)

pandas.core.frame.DataFrame

In [14]:
head

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2000,1,28,5,1647.0,1647,1906.0,1859.0,HP,154.0,...,15.0,11.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0
1,2000,1,29,6,1648.0,1647,1939.0,1859.0,HP,154.0,...,5.0,47.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0
2,2000,1,30,7,-1.0,1647,-1.0,1859.0,HP,154.0,...,0.0,0.0,1.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0
3,2000,1,31,1,1645.0,1647,1852.0,1859.0,HP,154.0,...,7.0,14.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0
4,2000,1,1,6,842.0,846,1057.0,1101.0,HP,609.0,...,3.0,8.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0


In [15]:
# create a sample for dask to figure out the data types:
# run create_timestamp row by row on the small in-memory `head` frame; the
# resulting pandas Series is handed to dask as `meta` further down.
transformation_sample = head.apply(create_timestamp, axis='columns')

In [16]:
type(transformation_sample)

pandas.core.series.Series

In [17]:
transformation_sample[0]

Timestamp('2000-01-28 16:47:00')

In [18]:
# Describe the result of the row-wise apply to dask with the pandas sample
# computed on `head`, so it knows the new column holds timestamps.
meta_information = transformation_sample

# Add the scheduled-departure timestamp as a new '@timestamp' column.
# The shared create_timestamp helper is used instead of repeating its body in
# a lambda, so the sample above and the full frame are guaranteed to be
# transformed by the same code.
df['@timestamp'] = df.apply(create_timestamp,
                            axis='columns',
                            meta=meta_information)

In [19]:
df.head()

[########################################] | 100% Completed |  2.8s


Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,@timestamp
0,2000,1,28,5,1647.0,1647,1906.0,1859.0,HP,154.0,...,11.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,2000-01-28 16:47:00
1,2000,1,29,6,1648.0,1647,1939.0,1859.0,HP,154.0,...,47.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,2000-01-29 16:47:00
2,2000,1,30,7,-1.0,1647,-1.0,1859.0,HP,154.0,...,0.0,1.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,2000-01-30 16:47:00
3,2000,1,31,1,1645.0,1647,1852.0,1859.0,HP,154.0,...,14.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,2000-01-31 16:47:00
4,2000,1,1,6,842.0,846,1057.0,1101.0,HP,609.0,...,8.0,0.0,-1.0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,2000-01-01 08:46:00


In [20]:
df.npartitions

1623

In [21]:
# Pick the first of the dask partitions; p0 is not materialised here — the
# next cell calls .compute() on it.
p0 = df.get_partition(0)

In [22]:
# Materialise partition 0 and turn it into a list with one dict per row.
records = (
    p0
    .compute()
    .to_dict(orient='records')
)
# Sanity check of the container type.
print(type(records))

[########################################] | 100% Completed |  2.8s
<class 'list'>


In [None]:
# Walk over every partition and materialise it the same way as the
# single-partition cell above: compute it and convert it to a list with one
# dict per row. The previous partition.to_records() call was never computed,
# so `records` ended up as a lazy dask object rather than the actual rows.
# NOTE: this reads the complete data set, one partition at a time.
for partition_nr in range(df.npartitions):
    partition = df.get_partition(partition_nr)
    records = partition.compute().to_dict(orient='records')
    print(type(records))

In [None]:
df.get_partition(28)

In [None]:
df.npartitions

In [None]:
# df.to_delayed()

In [None]:
# df.map_partitions?

In [None]:
df.get_partition?