In [108]:
import pandas as pd
import resource 
import pickle as pickle
import parquet
import pyarrow
import feather
import json
import re
import numpy as np
import os
%load_ext memory_profiler


The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


In [79]:

# Number of decimal digits kept when shrink_floats scales a float column
# by 10**PREC before casting it to int64.
PREC = 2

def convert_to_categorical(col, max_unique_ratio=0.2):
    """Convert a low-cardinality column to an ordered categorical dtype.

    The column is converted only when the ratio of distinct values
    (``col.nunique()``) to the number of rows is strictly below
    ``max_unique_ratio``; otherwise it is returned unchanged.

    Parameters
    ----------
    col : pandas.Series
        Column to inspect.
    max_unique_ratio : float, default 0.2
        Upper bound (exclusive) on ``nunique / row count`` for conversion.

    Returns
    -------
    pandas.Series
        Either ``col`` itself or a categorical copy of it.
    """
    n_rows = len(col.index)
    # An empty column has no ratio to compute (0 / 0 raises
    # ZeroDivisionError) and gains nothing from being converted.
    if n_rows == 0:
        return col
    if (col.nunique() / n_rows) < max_unique_ratio:
        cat_dtype = pd.api.types.CategoricalDtype(ordered=True)
        col = col.astype(cat_dtype)
    return col


def shrink_floats(col, prec=None):
    """Encode a float64 column as fixed-point int64 values.

    Each value is multiplied by ``10 ** prec``, rounded to the nearest
    integer and cast to int64. Columns of any other dtype, and float columns
    containing NaN or +/-inf, are returned unchanged.

    Parameters
    ----------
    col : pandas.Series
        Column to encode.
    prec : int, optional
        Number of decimal digits to keep. Defaults to the module-level
        ``PREC``.

    Returns
    -------
    pandas.Series
        The int64-encoded column, or ``col`` itself when it is left as is.
    """
    if col.dtypes != 'float64':
        return col
    if prec is None:
        prec = PREC
    # NaN and infinities have no integer representation and make the cast
    # raise, so such columns are left as floats.
    if not np.isfinite(col.to_numpy()).all():
        return col
    # Round before casting: astype('int64') truncates toward zero, so
    # 0.29 * 100 == 28.999999999999996 would otherwise be stored as 28.
    return (col * (10 ** prec)).round().astype('int64')

def shrink_integers(col):
    """Downcast an int64 column to the first narrower integer dtype that fits.

    Candidate dtypes are tried in a fixed order: the unsigned types
    (uint8, uint16, uint32) first, then the signed ones (int8, int16, int32).
    The first one whose range contains both the column minimum and maximum
    is used. If none fits, a message is printed and the column is returned
    unchanged. Columns whose dtype is not int64 are returned unchanged.
    """
    if col.dtypes != 'int64':
        return col

    smallest = col.min()
    largest = col.max()

    # (dtype name, lowest representable value, highest representable value)
    candidates = (
        ('uint8', 0, 255),
        ('uint16', 0, 65535),
        ('uint32', 0, 4294967295),
        ('int8', -128, 127),
        ('int16', -32768, 32767),
        ('int32', -2147483648, 2147483647),
    )
    for dtype_name, lower, upper in candidates:
        if largest <= upper and smallest >= lower:
            return col.astype(dtype_name, copy=False)

    print("not worth changing types")
    return col

def get_trailing_number(s):
    """Return the run of digits at the end of ``s`` as an int, or None if absent."""
    match = re.search(r'\d+$', s)
    if not match:
        return None
    return int(match.group())

def intersection(lst1, lst2):
    """Return the distinct elements present in both inputs, as a list (order unspecified)."""
    first = set(lst1)
    second = set(lst2)
    common = first & second
    return list(common)

def union(lst1, lst2):
    """Return the distinct elements present in either input, as a list (order unspecified)."""
    merged = set().union(lst1, lst2)
    return list(merged)

def read(feather_dir, file, preds, cols, num_chunks):
    """Load CSV ``file`` while skipping every row selected by any of ``preds``.

    For each column ``cols[i]`` the single-column feather files in
    ``feather_dir`` named ``<column><chunk_id>.f`` are read and ``preds[i]``
    is applied to them. The positions of the rows it selects are converted to
    CSV line numbers and unioned across all columns and chunks; those lines
    are then skipped when the CSV is parsed.

    Parameters
    ----------
    feather_dir : str
        Directory holding the per-column feather files.
    file : str or file-like
        CSV file to load (read twice: once for its header, once for data).
    preds : sequence of callables
        ``preds[i]`` takes the one-column DataFrame of ``cols[i]`` and
        returns a boolean mask of the rows to skip.
    cols : sequence of str
        Column names, parallel to ``preds``.
    num_chunks : int
        Unused; kept for a uniform signature with the other readers.

    Returns
    -------
    pandas.DataFrame
        The CSV rows that were not skipped.

    Notes
    -----
    Relies on the module-level ``CHUNK_SIZE`` to map chunk-local positions
    to whole-file positions.
    """
    skip_lines = set()
    for filename in os.listdir(feather_dir):
        for i, col in enumerate(cols):
            # Match "<col><digits>.f" exactly. A plain startswith(col) test
            # also picks up files of any column whose name merely begins
            # with ``col`` and then applies the wrong predicate to them.
            match = re.fullmatch(re.escape(col) + r'(\d+)\.f', filename)
            if match is None:
                continue
            pred = preds[i]
            e = feather.read_dataframe(os.path.join(feather_dir, filename))
            # +1 because skiprows counts file lines and line 0 is the header.
            offset = 1 + CHUNK_SIZE * int(match.group(1))
            skip_lines.update((e[pred].index + offset).tolist())
    df_labels = pd.read_csv(file, nrows=1)
    df = pd.read_csv(file, skiprows=sorted(skip_lines), header=0,
                     names=df_labels.columns, index_col=False)
    return df

def read_cardinality_many(feather_dir, file, preds, cols, num_chunks):
    """Count the rows selected by every predicate (logical AND).

    For each column ``cols[i]`` the single-column feather files in
    ``feather_dir`` named ``<column><chunk_id>.f`` are read, ``preds[i]`` is
    applied, and the selected positions are shifted by
    ``CHUNK_SIZE * chunk_id`` to whole-file positions. The per-column
    position sets are intersected and the size of the result is returned.

    Parameters
    ----------
    feather_dir : str
        Directory holding the per-column feather files.
    file : object
        Unused; kept for a uniform signature with the other readers.
    preds : sequence of callables
        ``preds[i]`` takes the one-column DataFrame of ``cols[i]`` and
        returns a boolean mask of the rows to count.
    cols : sequence of str
        Column names, parallel to ``preds``.
    num_chunks : int
        Unused.

    Returns
    -------
    int
        Number of row positions selected by all predicates; 0 when ``cols``
        is empty.
    """
    filenames = os.listdir(feather_dir)  # same listing for every column
    # None means "no column processed yet". Using emptiness for that (as a
    # len(...) > 0 test does) makes an empty intersection look uninitialised,
    # so the next column's rows would replace it and inflate the count.
    matching = None
    for i, col in enumerate(cols):
        pred = preds[i]
        # Match "<col><digits>.f" exactly so that columns sharing a name
        # prefix do not pick up each other's files.
        name_pattern = re.compile(re.escape(col) + r'(\d+)\.f')
        col_rows = set()
        for filename in filenames:
            match = name_pattern.fullmatch(filename)
            if match is None:
                continue
            e = feather.read_dataframe(os.path.join(feather_dir, filename))
            offset = CHUNK_SIZE * int(match.group(1))
            col_rows.update((e[pred].index + offset).tolist())
        matching = col_rows if matching is None else matching & col_rows
    return 0 if matching is None else len(matching)

def read_cardinality_one(feather_dir, file, preds, cols, num_chunks):
    """Sum, over all chunks, the number of rows each predicate selects.

    For each column ``cols[i]`` the single-column feather files in
    ``feather_dir`` named ``<column><chunk_id>.f`` are read and the number of
    rows selected by ``preds[i]`` is added to a running total. Counts of
    different columns are simply added, not combined row-wise.

    Parameters
    ----------
    feather_dir : str
        Directory holding the per-column feather files.
    file : object
        Unused; kept for a uniform signature with the other readers.
    preds : sequence of callables
        ``preds[i]`` takes the one-column DataFrame of ``cols[i]`` and
        returns a boolean mask.
    cols : sequence of str
        Column names, parallel to ``preds``.
    num_chunks : int
        Unused.

    Returns
    -------
    int
        Total number of selected rows.
    """
    total = 0
    for filename in os.listdir(feather_dir):
        for i, col in enumerate(cols):
            # Match "<col><digits>.f" exactly; a startswith(col) test would
            # also count files of columns whose name begins with ``col``.
            if re.fullmatch(re.escape(col) + r'\d+\.f', filename) is None:
                continue
            pred = preds[i]
            e = feather.read_dataframe(os.path.join(feather_dir, filename))
            total += len(e[pred])
    return total

def read_fast(feather_dir, file, preds, cols, num_chunks):
    """Filter the full-table feather chunks and concatenate what remains.

    Reads ``<feather_dir>full<i>.f`` for ``i`` in ``0..num_chunks``
    (inclusive), keeps in each chunk only the rows for which every predicate
    in ``preds`` is true (applied one after another), and returns the
    surviving rows as one DataFrame with a fresh 0-based index.
    ``file`` and ``cols`` are not used.
    """
    def _load_filtered(chunk_id):
        # Each predicate narrows the frame produced by the previous one.
        part = feather.read_dataframe(f'{feather_dir}full{chunk_id}.f')
        for keep in preds:
            part = part[keep]
        return part

    filtered_parts = [_load_filtered(chunk_id) for chunk_id in range(num_chunks + 1)]
    return pd.concat(filtered_parts, ignore_index=True)


In [80]:
def write_chunk(df, chunk_id):
    """Shrink one CSV chunk and store it as feather files.

    Every column of ``df`` is passed through ``shrink_integers`` and
    ``convert_to_categorical`` (the results are assigned back into ``df``,
    so the caller's frame is modified) and written on its own to
    ``<FEATHER_DIR><column><chunk_id>.f``. The whole shrunk frame is then
    written to ``<FEATHER_DIR>full<chunk_id>.f``.

    Parameters
    ----------
    df : pandas.DataFrame
        One chunk of the source table.
    chunk_id : int
        Zero-based position of the chunk; becomes part of the file names.
    """
    # FEATHER_DIR is used as a plain string prefix, so create the directory
    # part of it (if any) rather than assuming it already exists.
    out_dir = os.path.dirname(FEATHER_DIR)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    for c in df:
#         df[c] = shrink_floats(df[c])
        df[c] = shrink_integers(df[c])
        df[c] = convert_to_categorical(df[c])
        # Chunks produced by read_csv(chunksize=...) carry an index that
        # continues across chunks, while the readers add
        # CHUNK_SIZE * chunk_id themselves. Store chunk-local positions
        # (0..n-1) so the result does not depend on whether the feather
        # writer keeps or drops the index.
        feather.write_dataframe(df[c].to_frame().reset_index(drop=True),  f'{FEATHER_DIR}{c}{chunk_id}.f')
    feather.write_dataframe(df.reset_index(drop=True), f'{FEATHER_DIR}full{chunk_id}.f')
def write_chunks(file, chunk_size):
    """Stream CSV ``file`` in ``chunk_size``-row pieces through ``write_chunk``.

    Returns the index of the last chunk written (0 when there is a single
    chunk, and also 0 when no chunk was produced) -- not the chunk count.
    """
    last_chunk_id = 0
    reader = pd.read_csv(file, iterator=True, chunksize=chunk_size)
    for chunk_id, chunk in enumerate(reader):
        write_chunk(chunk, chunk_id)
        last_chunk_id = chunk_id
    return last_chunk_id

In [130]:
# Column names of the active dataset. COLS, VALS, PREDS and POS_PREDS are
# parallel: position i in each of them refers to the same column.
COLS = [
    'raw_row_number',
    'date', 'county_name', 'district', 'subject_race',
    'subject_sex', 'department_name', 'type', 'violation', 'arrest_made',
    'citation_issued', 'warning_issued', 'outcome', 'contraband_found',
    'frisk_performed', 'search_conducted', 'search_person', 'search_basis',
    'reason_for_stop', 'raw_race', 'raw_search_basis',
]

# Reference value for each column. The 'NA' entries sit at the positions
# whose predicates test for null instead of comparing against the value.
VALS = [
    32, '2009-07-01', 'San Diego County', 'San Onofre Inspection Facility',
    'hispanic', 'male', 'California Highway Patrol', 'vehicular', 'Inspection / Scale Facility',
    'NA', 'NA', 'NA', 'NA', 'NA', 'NA', False, False, 'NA', 'Inspection / Scale Facility',
    'Hispanic', 'Parole / Probation / Warrant',
]

# Positions in COLS whose predicates are null checks.
_NULL_CHECKED = frozenset({9, 10, 11, 12, 13, 14, 17})


def _make_pos_pred(i):
    """Build the predicate selecting rows where column i equals VALS[i] (or is null)."""
    # COLS / VALS are looked up when the predicate is called, not here.
    if i in _NULL_CHECKED:
        return lambda x: (x[COLS[i]].isnull())
    return lambda x: (x[COLS[i]] == VALS[i])


def _make_neg_pred(i):
    """Build the predicate selecting rows where column i differs from VALS[i] (or is not null)."""
    if i in _NULL_CHECKED:
        return lambda x: (~x[COLS[i]].isnull())
    return lambda x: (x[COLS[i]] != VALS[i])


# Negated predicates: one per column, true where the row does NOT match.
PREDS = [_make_neg_pred(i) for i in range(len(COLS))]

# Positive predicates: one per column, true where the row matches.
POS_PREDS = [_make_pos_pred(i) for i in range(len(COLS))]

# COLS = ['issue_url','issue_title','body']
# VALS = ['https://github.com/DungeonKeepers/DnDInventoryManager/issues/2', 'api', 'documentation']


# POS_PREDS = [
#              lambda x: (x[COLS[0]].str.contains(VALS[0])),
#              lambda x: (x[COLS[1]].str.contains(VALS[1])),
#              lambda x: (x[COLS[2]].str.contains(VALS[2])),
#             ]
                    
# PREDS = [
#          lambda x: (~x[COLS[0]].str.contains(VALS[0])),
#          lambda x: (~x[COLS[1]].str.contains(VALS[1])),
#          lambda x: (~x[COLS[2]].str.contains(VALS[2])),
#         ]

# COLS = ['LCLid','tstp','energy']
# VALS = ['MAC000002','2012-10-12 00:30:00.0000000', 0]


# POS_PREDS = [
#              lambda x: (x[COLS[0]].str.contains(VALS[0])),
#              lambda x: (x[COLS[1]].str.contains(VALS[1])),
#              lambda x: (x[COLS[2]] == VALS[2]),
#             ]
                    
# PREDS = [
#          lambda x: (~x[COLS[0]].str.contains(VALS[0])),
#          lambda x: (~x[COLS[1]].str.contains(VALS[1])),
#          lambda x: (x[COLS[2]] != VALS[2]),
#         ]


# Number of CSV rows per chunk. The readers also multiply it by a chunk's
# number to turn chunk-local row positions into whole-file positions.
CHUNK_SIZE = 5_000_000
# Source CSV, and the path prefix for the per-chunk feather files. The prefix
# is concatenated directly with file names, hence the trailing slash.
# NOTE(review): these paths name the github_issues dataset while the active
# COLS/VALS look like police-stop columns -- confirm which dataset is intended.
FILE = 'datasets/github_issues.csv'
FEATHER_DIR = 'datasets/github_issues/'

In [131]:
%%time

# Split FILE into CHUNK_SIZE-row chunks and write the feather files.
# num_chunks is the index of the last chunk written, not the chunk count.
num_chunks = write_chunks(FILE, CHUNK_SIZE)


CPU times: user 1min 2s, sys: 55.7 s, total: 1min 58s
Wall time: 2min 52s


In [127]:
%%time

# Timed: count rows using only the per-column feather files, combining the
# predicates for columns 8..12.
print(read_cardinality_many(FEATHER_DIR, FILE, POS_PREDS[8:13], COLS[8:13], num_chunks))
# Single-column variant of the same measurement:
# print(read_cardinality_one(FEATHER_DIR, FILE, POS_PREDS[8:9], COLS[8:9], num_chunks))

1043872
CPU times: user 10 s, sys: 1.5 s, total: 11.5 s
Wall time: 12.1 s


In [115]:
%%time

# Timed: rewrite the feather chunks, then filter the full-table chunks with
# every predicate in PREDS (the timing therefore includes the write step).
num_chunks = write_chunks(FILE, CHUNK_SIZE)
df_read = read_fast(FEATHER_DIR, FILE, PREDS, COLS, num_chunks)

print(df_read.shape)

(528503, 21)
CPU times: user 6.14 s, sys: 1.05 s, total: 7.19 s
Wall time: 5.3 s


In [100]:
%%time
# Timed: use the per-column feather files of the first three columns to work
# out which CSV rows to skip, then load the remaining rows from FILE.
df_read = read(FEATHER_DIR, FILE, PREDS[0:3], COLS[0:3], num_chunks)
print(df_read.shape)

(1, 3)
CPU times: user 52.5 s, sys: 15.2 s, total: 1min 7s
Wall time: 1min 13s


In [101]:
%%time

# Baseline: load the whole CSV into memory, then for each predicate drop the
# rows it selects.
indices = []  # placeholder; overwritten on the first loop iteration
df = pd.read_csv(FILE)

for pred in PREDS[0:3]:
    # Index labels of the rows selected by this predicate ...
    indices = df[pred].index
    # ... which are removed in place before the next predicate is applied.
    df.drop(indices, inplace=True)

print(df.shape)

(1, 3)
CPU times: user 23.9 s, sys: 1.71 s, total: 25.6 s
Wall time: 29.6 s


In [102]:
%%time

# Chunked baseline: stream the CSV in CHUNK_SIZE-row pieces, drop the rows
# selected by each predicate from every piece, and concatenate what remains.
df_all = []

for i, df_p in enumerate(pd.read_csv(FILE, iterator=True, chunksize=CHUNK_SIZE)):
    for pred in PREDS[8:9]:
        # Rows of this chunk selected by the predicate are removed in place.
        indices = df_p[pred].index
        df_p.drop(indices, inplace=True)
    df_all.append(df_p)

# ignore_index=True gives the combined frame a fresh 0-based index.
df = pd.concat(df_all, ignore_index=True)
print(df.shape)

(1, 3)
CPU times: user 23.6 s, sys: 1.4 s, total: 25 s
Wall time: 26.3 s


TODO: test across all columns (to find out when it is worth running in this system).

TODO: try shrinking encodings of different types (approximate queries).

ca_police 6.7GB
full_files 802MB
col_files 802 MB
parse time 3min 36s



PREDS[5:6] 22161713

read_fast 8.66 s
read 1min 24s
read_chunks 2min 20s
read_csv 2min 55s

PREDS[4:10] 528503

read_fast  6.58s
read 2 min 7s
read_chunks 2min 12s
read_csv 3min 21s

PREDS[0:20] 1

read_fast 2.66 s
read 6min 38s
read_chunks 1min 59s
read_csv 2min 58s


github_issues 1min 27s

PREDS[0:3] 1

read_fast 51.9 s
read 2min 56s
read_chunks 49.9 s
read_csv 1min 9s

PREDS[0:1] 102077

read_fast 2min 3s
read 4min 43s
read_chunks 1min 23s
read_csv 2min 26s


block_total 22.6 s

PREDS [2:3] 183368

read_fast 795ms
read 29.5 s
read_chunks 21.8 s
read_csv 17.5 s

PREDS [0:3] 1

read_fast 1.92 s
read 1min 13s
read_chunks 26.3 s
read_csv 29.6 s



Queries

ca_police[8] 82260698
cardinality_one 234 ms
cardinality_many 447 ms
normal 2-3 min

ca_police[8:10]
cardinality_many 4.1s
read_fast ~10 s
normal ~2 min

ca_police[8:11]
cardinality_many 6.57s
read_fast ~10 s
normal ~2 min

ca_police[8:13]
cardinality_many 12.1s
read_fast ~10 s
normal ~2 min

In [420]:
indices = []

# Read the full-table feather file of every chunk (ids 0..num_chunks,
# inclusive) and stack them into one frame with a fresh 0-based index.
df_all = []
for i in range(num_chunks + 1):
    df_all += [feather.read_dataframe(f'{FEATHER_DIR}full{i}.f')]
df = pd.concat(df_all, ignore_index=True)
print('concatted')
print(df.index)

concatted
RangeIndex(start=0, stop=4999999, step=1)
