Write to Parquet
Instead of CSV, we’ll store our data in Parquet, a columnar format that is more efficient for computers to read and write.

In [1]:
# Create a Dask distributed client; called with no arguments, so it uses
# Dask's defaults (the captured output shows a scheduler on 127.0.0.1).
from dask.distributed import Client
client = Client()
# Leaving `client` as the last expression makes the notebook render its summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:38229  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 16.69 GB


In [2]:
import dask.dataframe as dd
import numpy as np
from datetime import datetime
from datetime import timedelta
import pandas as pd
import time
import fastparquet 
import pyarrow

In [3]:
def time2Mins(time_):
    """Convert an ``"H:M"`` clock string into a count of 5-minute steps.

    The string is split on ``':'`` into exactly two integer fields (hours and
    minutes); the total number of minutes is divided by 5 and returned as a
    float. Any other number of ``':'``-separated fields raises ``ValueError``.
    """
    hour_part, minute_part = time_.split(':')
    total_minutes = int(hour_part) * 60 + int(minute_part)
    return total_minutes / 5

def date2Mins(date):
    """Convert an ``MM/DD/YYYY`` date into 5-minute steps relative to 07/31/2019.

    ``date`` is passed through ``str()`` before parsing. The whole-day
    difference to the fixed reference date 07/31/2019 is expressed in minutes
    and divided by 5; dates before the reference give a negative result.
    """
    date_format = "%m/%d/%Y"
    given_day = datetime.strptime(str(date), date_format)
    reference_day = datetime.strptime('07/31/2019', date_format)
    whole_days = (given_day - reference_day).days
    return int(whole_days * 24 * 60) / 5

In [None]:
"""
Partitions should fit comfortably in memory (smaller than a gigabyte) 
but also not be too many. Every operation on every partition takes the 
central scheduler a few hundred microseconds to process. If you have a 
few thousand tasks this is barely noticeable, but it is nice to reduce 
the number if possible.

A common situation is that you load lots of data into reasonably sized 
partitions (Dask’s defaults make decent choices), but then you filter
down your dataset to only a small fraction of the original. At this point,
it is wise to regroup your many small partitions into a few larger ones. 
You can do this by using the repartition method:
"""

In [90]:
start = time.time()
# Read every CSV matching the station's glob into one lazy Dask DataFrame.
station = '15799'
# assume_missing=True: integer columns without an explicit dtype are read as
# floats so that missing values can be represented.
ddf = dd.read_csv(f'./data/Targetted-GDOT/{station}/{station}_*.csv',assume_missing=True)
ddf = ddf.repartition(npartitions=4)

# Every column to the right of "lane_type" is collected as a data column.
lane_type_idx = list(ddf.columns).index("lane_type")
data_names = list(ddf.columns[(lane_type_idx+1):])
# Drop every row that has a missing value in any column.
ddf=ddf.dropna()
# for col in data_names:
#     ddf[col] = ddf[col].fillna(100000)
#     ddf[col] = ddf[col].replace('NA',10000)
#     ddf[col] = ddf[col].replace('na',10000)
#     ddf = ddf.replace('N/A',10000)

# selecting all numeric cols
# num_lanes = len(ddf["lane_type"].unique())
# print(num_lanes)
# col2 = ddf.columns
# lane_type_idx = list(ddf.columns).index("lane_type")
# data_names = list(ddf.columns[(lane_type_idx+1):])

# # map columns and create new column
# ddf.datestamp = ddf.datestamp.map(lambda x: date2Mins(x), meta=pd.Series([], dtype=str, name='x'))
# ddf.timestamp = ddf.timestamp.map(lambda x: time2Mins(x), meta=pd.Series([], dtype=str, name='x'))
# ddf = ddf.map_partitions(lambda df: df.assign(time_anchor=(df.datestamp + df.timestamp)))



# # df can fit in mem so use persist
# ddf = ddf.repartition(npartitions=1)
# ddf = ddf.persist()
# end = time.time()
# print(f'1st process took: {end-start}')

# start = time.time()
# items = list(ddf['lane_type'].unique())
# for lane in items:
#     ddf[ddf['lane_type'] == lane].to_csv(f'./data/{lane}_*.csv')
# end = time.time()
# print(f'2nd process took: {end-start}')

In [160]:
import os
from termcolor import colored

def skipfiles(start_at, folder=None, skip=True):
    """Tell whether ``folder`` should still be skipped while seeking ``start_at``.

    The previous implementation read the local name ``skip`` before assigning
    it (always raising ``UnboundLocalError``), relied on an undefined global
    ``folder`` and returned ``None``. The state is now passed in explicitly
    and the updated flag is returned.

    Parameters
    ----------
    start_at : str or None
        Name of the folder at which processing should resume.
    folder : str or None, optional
        Name of the folder currently being examined.
    skip : bool, optional
        Current skipping state. Once it is ``False`` it stays ``False``.

    Returns
    -------
    bool
        ``True`` while ``folder`` is still before the resume point, ``False``
        once ``folder == start_at`` (that folder itself is processed) or when
        skipping is already off.
    """
    if not skip:
        return False
    return folder != start_at

def sort_process_ave(folders_dir, start_at=None, skip=False,
                     exclude=('2941',), output_dir='./data'):
    """Average each station's CSV data per time step and write it back as CSV.

    For every sub-directory ``<folder>`` of ``folders_dir`` (in sorted order)
    the files ``<folder>/<folder>_*.csv`` are read with Dask, rows with missing
    values are dropped, ``datestamp``/``timestamp`` are converted to 5-minute
    steps and summed into ``time_anchor``, and all columns to the right of
    ``lane_type`` are averaged per ``time_anchor``. Columns whose
    ``_``-separated name contains none of ``spd``/``total``/``avg`` are then
    multiplied by the number of distinct ``lane_type`` values, as is
    ``det_avg_occ``. The result is written to ``<output_dir>/<name>_*.csv``.

    Parameters
    ----------
    folders_dir : str
        Directory containing one sub-directory per station.
    start_at : str or None, optional
        Folder at which to resume; only used when ``skip`` is true.
    skip : bool, optional
        When true (and ``start_at`` is given) every folder sorted before
        ``start_at`` is skipped; ``start_at`` itself is processed.
    exclude : collection of str, optional
        Folder names that are never processed.
    output_dir : str, optional
        Directory that receives the output CSV files.

    Returns
    -------
    list of str
        Folders that could not be processed: either no station name could be
        read from ``vName`` or that name did not match the folder name.
    """
    # Only directories can be station folders; stray files (checkpoints,
    # OS metadata, ...) would otherwise produce a glob that matches nothing.
    folders = sorted(
        entry for entry in os.listdir(folders_dir)
        if os.path.isdir(os.path.join(folders_dir, entry))
    )
    errors = []
    # Without a resume point there is nothing to skip towards.
    skipping = bool(skip) and start_at is not None

    for folder in folders:
        # if you need to process a segment at a time
        if skipping:
            if folder != start_at:
                continue
            skipping = False

        print(folder)
        if folder in exclude:
            continue

        # One timer per folder so the printed duration covers the whole
        # pipeline, not just the final write.
        start = time.time()

        # read all of the station's files into a single lazy df
        ddf = dd.read_csv(f'{folders_dir}/{folder}/{folder}_*.csv', assume_missing=True)
        ddf = ddf.repartition(npartitions=8)

        # every column to the right of "lane_type" is a data column
        lane_type_idx = list(ddf.columns).index("lane_type")
        data_names = list(ddf.columns[(lane_type_idx+1):])
        for col in data_names:
            if 'spd' in col.split('_'):
                # NOTE(review): the -10000 sentinel keeps rows with a missing
                # speed from being dropped below, but it is also included in
                # the per-time-step mean — confirm this is intended.
                ddf[col] = ddf[col].fillna(-10000)
        ddf = ddf.dropna()

        # get vName for csv label (second whitespace-separated token)
        try:
            name = ddf['vName'].head(1).tolist()[0].split()[1]
        except IndexError:
            errors.append(folder)
            print(colored('missing data at station file: ', None),colored(f'{folder}', 'red'))
            continue

        num_lanes = len(ddf["lane_type"].unique()) # get the number of lanes

        if folder != name:
            print(f'non-match! folder: {folder}')
            errors.append(folder)
            print(colored('Error at station file: ', None),colored(f'{folder}', 'red'))
            continue

        # map columns and create new column; the helpers return floats, so the
        # meta must say float64 (it previously claimed str under the name 'x')
        ddf['datestamp'] = ddf['datestamp'].map(date2Mins, meta=('datestamp', 'f8'))
        ddf['timestamp'] = ddf['timestamp'].map(time2Mins, meta=('timestamp', 'f8'))
        ddf = ddf.map_partitions(lambda df: df.assign(time_anchor=(df.datestamp + df.timestamp)))
        ddf = ddf.groupby('time_anchor')[data_names].mean()

        # Scale the averaged columns back up by the lane count, except for the
        # speed/total/average columns.
        for col in data_names:
            if not any(tag in col.split('_') for tag in ('spd', 'total', 'avg')):
                ddf[col] = ddf[col] * num_lanes
        # det_avg_occ is excluded above by its 'avg' token but is scaled too.
        ddf['det_avg_occ'] = ddf['det_avg_occ'] * num_lanes

        # df can fit in mem so use persist
        ddf = ddf.persist()

        ddf.to_csv(f'{output_dir}/{name}_*.csv')
        end = time.time()
        print(f'{folder} process took: {end-start}')

    return errors

In [161]:
# Process every station folder under ./data/Targetted-GDOT; `errors` receives
# the list of folder names that sort_process_ave reported as failed.
errors = sort_process_ave('./data/Targetted-GDOT')

13414
13414 process took: 1.9792237281799316
13415
13415 process took: 3.982919931411743
13416
13416 process took: 3.381715774536133
13417
13417 process took: 2.2020857334136963
13419
13419 process took: 2.8936495780944824
13482
13482 process took: 2.7302143573760986
13508
13508 process took: 3.1634011268615723
13516
13516 process took: 1.8067774772644043
15799
15799 process took: 0.893892765045166
15801
15801 process took: 1.795670986175537
15802
15802 process took: 1.776519775390625
15805
15805 process took: 0.5167829990386963
15886
missing data at station file: [0m [31m15886[0m
15891
15891 process took: 1.737846851348877
15892
15892 process took: 1.7567920684814453
16019
16019 process took: 3.61160945892334
16020
16020 process took: 5.193772315979004
16043
16043 process took: 3.3169870376586914
202
missing data at station file: [0m [31m202[0m
2788
2788 process took: 2.202224016189575
2793
2793 process took: 3.0816895961761475
2797
2797 process took: 4.405994415283203
2798
2798

4903 process took: 3.234149217605591
4905
4905 process took: 2.9544413089752197
5597
5597 process took: 3.7223784923553467
5734
missing data at station file: [0m [31m5734[0m
5839
5839 process took: 3.3950858116149902
5845
5845 process took: 3.227142572402954
5846
5846 process took: 3.067761182785034
5854
5854 process took: 2.3334593772888184
5882
5882 process took: 2.993346929550171
5905
5905 process took: 2.603724718093872
5906
5906 process took: 3.8130764961242676
5908
5908 process took: 3.440537691116333
5917
5917 process took: 3.032792568206787
5918
5918 process took: 1.992537260055542
5929
5929 process took: 3.0197556018829346
5942
5942 process took: 2.9387855529785156
6067
6067 process took: 1.7825028896331787
6068
6068 process took: 1.7844629287719727
6070
6070 process took: 1.8305199146270752
6071
6071 process took: 1.7862768173217773
6154
missing data at station file: [0m [31m6154[0m
6170
missing data at station file: [0m [31m6170[0m
6173
6173 process took: 1.89507555

In [None]:
# NOTE(review): looks like a hand-annotated copy of the station folders that
# failed processing (cf. the `errors` list returned by sort_process_ave) —
# confirm the source. The trailing comments are the author's manual notes.
['15886',
 '202',
 '3157',
 '3232',
 '4177',  # offline
 '4178',
 '4210',
 '4211',
 '4328',
 '4329',
 '4354',
 '4728',
 '4729',  # offline
 '5734',  # offline
 '6154',  # offline / one week
 '6170',  # offline
 '6176',  # offline
]

In [None]:
# current version for single file
start = time.time()
# join 37 files into a single df
station = '2769'
ddf = dd.read_csv(f'./data/csv/{station}/{station}_*.csv')
ddf = ddf.repartition(npartitions=8)

# get vName for csv label
name = (ddf['vName'].head(1)[0].split()[1])

# selecting all numeric cols
lane_type_idx = list(ddf.columns).index("lane_type")
data_names = list(ddf.columns[(lane_type_idx+1):])

# map columns and create new column
ddf.datestamp = ddf.datestamp.map(lambda x: date2Mins(x), meta=pd.Series([], dtype=str, name='x'))
ddf.timestamp = ddf.timestamp.map(lambda x: time2Mins(x), meta=pd.Series([], dtype=str, name='x'))
ddf = ddf.map_partitions(lambda df: df.assign(time_anchor=(df.datestamp + df.timestamp)))
ddf = ddf.groupby('time_anchor')[data_names].mean()

# # df can fit in mem so use persist
# ddf = ddf.persist()
# end = time.time()
# print(f'1st process took: {end-start}')

# start = time.time()
# ddf.to_csv(f'./data/{name}_*.csv')
# end = time.time()
# print(f'2nd process took: {end-start}')

In [None]:
ddf

In [None]:
name

In [None]:
# List the station folders. The original cell iterated over an undefined name
# and called the misspelled `prnt`, so it could never run.
# NOTE(review): the directory is taken from the sort_process_ave call earlier
# in the notebook — confirm it is the listing that was intended.
folders = sorted(os.listdir('./data/Targetted-GDOT'))
for folder in folders:
    print(folder)

In [None]:
# single file processor for pandas
start = time.time()
stations_dir = './data/batched/errored2'
for station in os.listdir(stations_dir):
    file_path = f'{stations_dir}/{station}'
    # join 37 files into a single df
    dfs = []
    for file in os.listdir(file_path):
        df = pd.read_csv(f'{file_path}/{file}')
        dfs.append(df)
        
    df = pd.concat(dfs)
    # map columns and create new column
    try:
        df.datestamp = df.datestamp.map(lambda x: date2Mins(x))
        df.timestamp = df.timestamp.map(lambda x: time2Mins(x))
        df['time_anchor'] = df.datestamp + df.timestamp
        # find the number of lanes of the road and split the data
        items = list(df['lane_type'].unique())
        for lane in items:
            df[df['lane_type'] == lane].to_csv(f'./data/batched/dump_pd/{station}_{lane}.csv')
    except:
        print(f'error at: {station}_{lane}.csv')
end = time.time()
print(f'the process took: {end-start}')

In [None]:
# single file processor
import os
start = time.time()
stations_dir = './data/batched/errored2'
for station in os.listdir(stations_dir):
    file_path = f'{stations_dir}/{station}'
    # join 37 files into a single df
    ddf = dd.read_csv(f'{file_path}/*.csv')
    ddf = ddf.repartition(npartitions=4)
    # map columns and create new column
    try:
        ddf.datestamp = ddf.datestamp.map(lambda x: date2Mins(x), meta=pd.Series([], dtype=str, name='x'))
        ddf.timestamp = ddf.timestamp.map(lambda x: time2Mins(x), meta=pd.Series([], dtype=str, name='x'))
        ddf = ddf.map_partitions(lambda df: df.assign(time_anchor=(df.datestamp + df.timestamp)))
        # df can fit in mem so use persist
    except:
        print(f'error with file: {station}')
        continue
    ddf = ddf.persist()
    end = time.time()
    print(f'1st process took: {end-start}')
    items = list(ddf['lane_type'].unique())
    for lane in items:
        ddf[ddf['lane_type'] == lane].to_csv(f'./data/batched/dump_dask/{station}_{lane}_part*.csv')
end = time.time()
print(f'2nd process took: {end-start}')