In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/amex-default-prediction/sample_submission.csv
/kaggle/input/amex-default-prediction/train_data.csv
/kaggle/input/amex-default-prediction/test_data.csv
/kaggle/input/amex-default-prediction/train_labels.csv
/kaggle/input/amex-train-pq/ex.parquet


## Library Imports


In [3]:
import gc
import scipy as sp
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq
import multiprocessing as mp
# from tqdm.auto import tqdm
# import itertools

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 200)
pd.set_option('display.width', 1000)

## The enviornment we are running this in is memory constrained.
To combat this, we will be doing a few things. Ultimately, we desire to reduce memory costs.
1. Iterate through file using a cursor
2. Convert file from CSV to Parquet
3. Check for and Reformat improperly stored data
  * Unique integers saved as strings, etc. 
  
*Iterate through file using a cursor*<br>
Iterating through the file, while more memory stable, does produce some inherit costs. <br>
1. open_csv() is the only *supported* way to instantiate a CSVStreamingReader in pyarrow.
2. open_csv() does not support multiprocessing, even when setting multiprocessing to true.
3. Setting a read_size in pyarrow will only change the chunking size. The open file may continue to read into memory, reducing resources for other processing.
**It may not enough to set read_size, CSV may need to be opened in a preallocated memory pool.**<br>
However CPU isn't an issue. So at this time, we will open both CSVStreamingReader and ParquetWriter simultaneously in a spawned process.

In [4]:
import resource
def mem():
    print('Memory usage         : % 2.2f MB' % round(
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024.0,1))

In [None]:
# Note: While pyarrow.csv.StreamingReader documentation says it has a close() method, it does not. MUST be opened by with statement
def convert_csv_to_parquet(origin_path, destination_path, pv_read_options):
    with pv.open_csv('../input/amex-default-prediction/train_data.csv', read_options=pv_read_options) as csv_cursor:
        with pq.ParquetWriter('ex.parquet', csv_cursor.schema) as writer:
            while True:
                try:
                    batch = csv_cursor.read_next_batch()
                    writer.write_batch(batch)
                except StopIteration:
                    print('Reached End')
                    mem()
                    batch = None
                    gc.collect()
                    break
                batch = None
                gc.collect()

In [None]:
csv_path = '../input/amex-default-prediction/train_data.csv'
pv_read_options = pv.ReadOptions(block_size=268435456)
parquet_dest = 'ex.parquet'

mem()
convert_process = mp.Process(target=convert_csv_to_parquet, args=(csv_path, parquet_dest, pv_read_options))
convert_process.start()
convert_process.join()
mem()

# Reduction in File Size
Here we see, the 16.39GB CSV file has been reduced to a 6.97GB Parquet file. <br>
Even in our memory constrained environment, we could read all of this to memory, so long as we clean up after ourselves.

## Limitations of Garbage Collection
While we have called gc.collect() several times, it is important to note; the gc module doesn't track all the files we created. The blinding large set of data is creating a 'free list'. This will pose an issue when we need to free up memory. There are several methods possible for garbage collection, however without a reference to the objects created, gc.collect() will leave the data in memory. When pyarrow reads in the data, it does so to the Arrow buffer. Even when dealing with the Arrow buffer directly, it is not guaranteed that the memory will be returned to the OS, this is a conundrum of large file sizes. In this scenario, given the formats the data can be read into memory and the need to preserve the resources for ensemble calculation, the function will have to occur in a subprocess to most efficiently return the memory.

In [None]:
def create_array():
    dataset = pq.ParquetDataset('../input/amex-train-pq/ex.parquet')
    pq_array = dataset.read()
    mem()
mem()
p = mp.Process(target=create_array, args=())
p.start()
p.join()
mem()

# Spawning Processes Returns Memory upon join()
While there are limitations in spawing processes to handle memory-hogging tasks, using means outside of the pyarrow library to multiprocess a document often requires Cython a boilerplate. Given the limitations of the interface, we aren't sacrificing much.<br>
## However, we should still be only accessing one column at a time. <h4>We will need to do two forms of iterative loading:</h4>
* Row Based
  * This will be the base access for ensamble algorithms
  * At current; ParquetDataset doesn't support split_row_groups, or any other form of row seperation/filtering, however ParquetFile module supports batch reading.
      * We will need to change the format we work with to a pyarrow Array in order to allow quicker processing and translation between endpoints.
* Column Based
  * This will be the base access for Feature Creation </ul></ul>
It is important to remember the shape of our table has Features in Columns, and Persons in rows. <br>
Iterative loading has several benefits, and with proper feature creation and tracking, would be the proper access of a dataset this large even outside of the constrains of the environment.

pyarrow has a very powerful API for 

In [5]:
pq_path = '../input/amex-train-pq/ex.parquet'
dataset = pq.ParquetDataset(pq_path, use_legacy_dataset=False)

## Convert Customer ID
<h4>customer_ID</h4> 
customer_ID is a string, if we take a subsection of customer ID and redefine it as a integer we save a great deal of space. This requires that we verify that all our base16 strings are converted to base10 without overlapping. We will be converting them uniformly, and checking the number of uniques before and after transformation. If these numbers are the same, we can infer that the transfer was successful. 

In [20]:
import sys
mem()
def convert_cust_id(dataset):
    schema = dataset.schema
    table = dataset.read(['customer_ID'])
    mem()
    df = table.to_pandas()
    display(df.customer_ID.nunique())
    print(sys.getsizeof(df.customer_ID))
    df.customer_ID = df.customer_ID.apply(lambda x: int(x[-16:], base=16)).astype('int64')
    display(df.customer_ID.nunique())
    print(sys.getsizeof(df.customer_ID))
#     We see we've reduced the ~669MB memory of the df.customer_ID object to ~44MB
#     We used a apply(lambda) to do this to each row and we are returned with the same number of unique items, indicating a successful transfer to base10

p = mp.Process(target=convert_cust_id, args=(dataset,))
p.start()
p.join()
mem()

Memory usage         :  256.70 MB
Memory usage         :  636.60 MB


458913

669305731


458913

44251768
Memory usage         :  256.70 MB


## Other Datatype Conversions

## Review Data Row by Row
We've iterated through our parquet columns and checked for unique counts, we've found the following  columns boil down to 8 unique values maximum. <br>
Knowing this we are able to recategorize these categoricals as integers.<br>
['D_63', 'D_64', 'D_66', 'D_68', 'B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126'] <br>
So here we need to:
*     create a function that maps the unique variables of the column to a integer of base=8
*     apply that function to every column
*     convert back to parquet
*     observe size changes

In [9]:
# Convert date string to datetime object
#     table = dataset.read(['D_63', 'D_64', 'D_66', 'D_68', 'B_30', B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126'])

import sys
mem()
def review_data(dataset):
    schema = dataset.schema
    n = len(schema)
    for i in range(n):
        col = schema[i].name
        table = dataset.read([col])
        print(table.group_by('col').aggregate([("count_distinct")]))
        nunique = pa.compute.count_distinct(table[col])
        pd_nunique = table.to_pandas().nunique()
    cols = ['D_63', 'D_64', 'D_66', 'D_68', 'B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126']
    table = dataset.read(cols)
    df = table.to_pandas()
    for i in cols:
        print(df[i].value_counts())
        df[i] = df[i].apply(lambda x: int(x, base=8)).astype('int8')
    display(df)
    df.info()

def int_map(df):
    for i in df.columns:
        n = df[i].nunique()
        for num in n:
            pass

p = mp.Process(target=review_data, args=(dataset,))
p.start()
p.join()
mem()

Memory usage         :  254.40 MB
CO    4119621
CR     930133
CL     438390
XZ      25786
XM      10556
XL       6965
Name: D_63, dtype: int64
O     2913244
U     1523448
R      840112
       217442
-1      37205
Name: D_64, dtype: int64
1.0    617066
0.0      6288
Name: D_66, dtype: int64
6.0    2782455
5.0    1201706
3.0     484442
4.0     477187
2.0     220111
1.0     133122
0.0      15925
Name: D_68, dtype: int64
0.0    4710663
1.0     763955
2.0      54817
Name: B_30, dtype: int64
2.0    1953232
3.0    1255315
1.0    1160047
5.0     444856
4.0     294917
7.0     259028
6.0     162040
Name: B_38, dtype: int64
1.0    3316478
0.0    2038257
Name: D_114, dtype: int64
0.0    5348109
1.0       6626
Name: D_116, dtype: int64
-1.0    1456084
 3.0    1166400
 4.0    1138666
 2.0     666808
 5.0     459290
 6.0     344520
 1.0     122967
Name: D_117, dtype: int64
0.0    4729723
1.0     625012
Name: D_120, dtype: int64
 1.0    4262414
 0.0     891323
-1.0     260898
Name: D_126, dtype: int64

Unnamed: 0,D_63,D_64,D_66,D_68,B_30,B_38,D_114,D_116,D_117,D_120,D_126
0,CR,O,,6.0,0.0,2.0,1.0,0.0,4.0,0.0,1.0
1,CR,O,,6.0,0.0,2.0,1.0,0.0,4.0,0.0,1.0
2,CR,O,,6.0,0.0,2.0,1.0,0.0,4.0,0.0,1.0
3,CR,O,,6.0,0.0,2.0,1.0,0.0,4.0,0.0,1.0
4,CR,O,,6.0,0.0,2.0,1.0,0.0,4.0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...
5531446,CL,O,,5.0,0.0,3.0,1.0,0.0,3.0,0.0,1.0
5531447,CL,O,,5.0,0.0,3.0,1.0,0.0,3.0,0.0,1.0
5531448,CL,O,,5.0,0.0,3.0,1.0,0.0,3.0,0.0,1.0
5531449,CL,O,,5.0,0.0,3.0,1.0,0.0,3.0,0.0,1.0


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5531451 entries, 0 to 5531450
Data columns (total 11 columns):
 #   Column  Dtype  
---  ------  -----  
 0   D_63    object 
 1   D_64    object 
 2   D_66    float64
 3   D_68    float64
 4   B_30    float64
 5   B_38    float64
 6   D_114   float64
 7   D_116   float64
 8   D_117   float64
 9   D_120   float64
 10  D_126   float64
dtypes: float64(9), object(2)
memory usage: 464.2+ MB
Memory usage         :  254.40 MB


In [168]:
mem()
def columnar_scan(dataset):
    schema = dataset.schema
    for i in schema:
        table = dataset.read([i.name])
        break
    mem()
    df = table.to_pandas()
    customer_ID_group = df.groupby(['customer_ID'])
    df = customer_ID_group['customer_ID'].count().reset_index(name="count")
    display(df.query('count < 13'))
def row_scan(dataset):
    pass

p = mp.Process(target=columnar_scan, args=(dataset,))
p.start()
p.join()
mem()

Memory usage         :  298.70 MB
Memory usage         :  677.80 MB


Unnamed: 0,customer_ID,count
10,0001337ded4e1c2539d1a78ff44a457bd4a95caa55ba17...,3
20,000391f219520dbca6c3c1c46e0fab569da163f79ee266...,4
27,0004860c260168fcaad0734a1dfedb7ceb1a83aaac54e2...,9
35,00057c2d8d887fa3f777d97dc939700731575772e6c990...,4
36,0005a6ae24fd274640a237ea56c43b1ef9e32077ad168a...,12
...,...,...
458895,fffe13e28dc3ceadf28249b596ba25df93e38ec53d38cf...,3
458896,fffe2bc02423407e33a607660caeed076d713d8a5ad323...,8
458899,fffe5008118592b867d89647fc840c45858860f596d98b...,2
458903,fffec7d7e1ca804c86f1ffdaac389c33f8039ed35bf412...,7


Memory usage         :  298.70 MB


In [112]:
mem()
def row_scan(path):
    pf = pq.ParquetFile(path)
    row = next(pf.iter_batches(batch_size = 1))
    df = pa.Table.from_batches([row]).to_pandas()
    mem()
    display(df)
    print(df.customer_ID[0])
    
    

p = mp.Process(target=row_scan, args=(pq_path,))
p.start()
p.join()
mem()

Memory usage         :  298.70 MB
Memory usage         :  598.10 MB


Unnamed: 0,customer_ID,S_2,P_2,D_39,B_1,B_2,R_1,S_3,D_41,B_3,D_42,D_43,D_44,B_4,D_45,B_5,R_2,D_46,D_47,D_48,D_49,B_6,B_7,B_8,D_50,D_51,B_9,R_3,D_52,P_3,B_10,D_53,S_5,B_11,S_6,D_54,R_4,S_7,B_12,S_8,D_55,D_56,B_13,R_5,D_58,S_9,B_14,D_59,D_60,D_61,B_15,S_11,D_62,D_63,D_64,D_65,B_16,B_17,B_18,B_19,D_66,B_20,D_68,S_12,R_6,S_13,B_21,D_69,B_22,D_70,D_71,D_72,S_15,B_23,D_73,P_4,D_74,D_75,D_76,B_24,R_7,D_77,B_25,B_26,D_78,D_79,R_8,R_9,S_16,D_80,R_10,R_11,B_27,D_81,D_82,S_17,R_12,B_28,R_13,D_83,R_14,R_15,D_84,R_16,B_29,B_30,S_18,D_86,D_87,R_17,R_18,D_88,B_31,S_19,R_19,B_32,S_20,R_20,R_21,B_33,D_89,R_22,R_23,D_91,D_92,D_93,D_94,R_24,R_25,D_96,S_22,S_23,S_24,S_25,S_26,D_102,D_103,D_104,D_105,D_106,D_107,B_36,B_37,R_26,R_27,B_38,D_108,D_109,D_110,D_111,B_39,D_112,B_40,S_27,D_113,D_114,D_115,D_116,D_117,D_118,D_119,D_120,D_121,D_122,D_123,D_124,D_125,D_126,D_127,D_128,D_129,B_41,B_42,D_130,D_131,D_132,D_133,R_28,D_134,D_135,D_136,D_137,D_138,D_139,D_140,D_141,D_142,D_143,D_144,D_145
0,0000099d6bd597052cdcda90ffabf56573fe9d7c79be5f...,2017-03-09,0.938469,0.001733,0.008724,1.006838,0.009228,0.124035,0.008771,0.004709,,,0.00063,0.080986,0.708906,0.1706,0.006204,0.358587,0.525351,0.255736,,0.063902,0.059416,0.006466,0.148698,1.335856,0.008207,0.001423,0.207334,0.736463,0.096219,,0.023381,0.002768,0.008322,1.001519,0.008298,0.161345,0.148266,0.922998,0.354596,0.152025,0.118075,0.001882,0.158612,0.065728,0.018385,0.063646,0.199617,0.308233,0.016361,0.401619,0.091071,CR,O,0.007126,0.007665,,0.652984,0.00852,,0.00473,6.0,0.272008,0.008363,0.515222,0.002644,0.009013,0.004808,0.008342,0.119403,0.004802,0.108271,0.050882,,0.007554,0.080422,0.069067,,0.004327,0.007562,,0.007729,0.000272,0.001576,0.004239,0.001434,,0.002271,0.004061,0.007121,0.002456,0.00231,0.003532,0.506612,0.008033,1.009825,0.084683,0.00382,0.007043,0.000438,0.006452,0.00083,0.005055,,0.0,0.00572,0.007084,,0.000198,0.008907,,1,0.002537,0.005177,0.006626,0.009705,0.007782,0.00245,1.001101,0.002665,0.007479,0.006893,1.503673,1.006133,0.003569,0.008871,0.00395,0.003647,0.00495,0.89409,0.135561,0.911191,0.974539,0.001243,0.766688,1.008691,1.004587,0.893734,,0.670041,0.009968,0.004572,,1.008949,2.0,,0.004326,,,,1.007336,0.21006,0.676922,0.007871,1.0,0.23825,0.0,4.0,0.23212,0.236266,0.0,0.70228,0.434345,0.003057,0.686516,0.00874,1.0,1.003319,1.007819,1.00008,0.006805,,0.002052,0.005972,,0.004345,0.001535,,,,,,0.002427,0.003706,0.003818,,0.000569,0.00061,0.002674


0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a
Memory usage         :  298.70 MB


todo:
* Change handling to pyarrow arrays
* Change processing actions from pandas to pyarrow
