# Dask DataFrame Experiment

## Part 2

Use pandas alone (no Dask) to convert a pandas DataFrame into a text file in 'fixed position' format, with custom encoding.

## Imports

In [9]:
import os
import time
import pickle
import pandas as pd

from collections import OrderedDict

# %config IPCompleter.greedy=True

## Custom data encoding helper

In [10]:
class EBCDIC:
    """Encode Python values into fixed-width byte fields for a given code page.

    Failures are reported through the ``logger`` callable and signalled to the
    caller with a ``None`` return value instead of an exception.
    """

    def __init__(self, logger, cp='cp037'):
        """Store the logger and code page.

        Args:
            logger: Callable accepting a single string; receives error messages.
            cp: Name of the Python codec used to encode text (default 'cp037').
        """
        self.logger = logger
        self.cp = cp
        # Encoded blank used to right-pad character fields
        self.cp_blank = ' '.encode(cp)

    def _log_error_msg1(self, msg: str):
        """Log a generic failure for the function named in ``msg``."""
        self.logger(f"ERROR: Function ::{msg}:: Returned error.")

    def _log_error_msg2(self, msg: tuple):
        """Log a failure with detail; ``msg`` is ``(function_name, detail)``."""
        self.logger(f"ERROR: Function ::{msg[0]}:: {msg[1]}.")

    def to_char(self, string, size) -> bytes:
        """Encode ``string`` and fit it to exactly ``size`` bytes.

        The encoded value is truncated to ``size`` bytes, or right-padded with
        the encoded blank when shorter.

        Returns:
            The encoded field, or ``None`` (after logging) if encoding fails.
        """
        try:
            s = string.encode(self.cp)[:size]
            if len(s) < size:
                s += self.cp_blank * (size - len(s))
            return s
        except Exception:
            self._log_error_msg1('to_char()')
            return None

    def to_pib(self, integer: int, size: int) -> bytes:
        """Encode a non-negative integer as ``size`` big-endian bytes.

        Args:
            integer: Value to encode; must be non-negative.
            size: Field width in bytes; must be 1, 2, 4 or 8.

        Returns:
            The ``size``-byte field, or ``None`` (after logging) when ``size``
            is invalid, the value is negative or does not fit, or the
            conversion fails.
        """
        if size not in [1, 2, 4, 8]:
            self._log_error_msg2(('to_pib()', "'size' must be one of: 1, 2, 4, or 8."))
            return None

        try:
            # hex() of a negative number starts with '-', which would corrupt
            # the digit string built below, so reject it explicitly.
            if integer < 0:
                self._log_error_msg2(('to_pib()', "'integer' must not be negative."))
                return None

            h = hex(integer)[2:]
            # bytes.fromhex() needs an even number of hex digits
            if len(h) % 2 == 1:
                h = '0' + h

            s = len(h) // 2
            if s > size:
                self._log_error_msg2(('to_pib()', "'size' not large enough for the 'integer'."))
                return None
            elif s < size:
                # Left-pad with zero bytes up to the requested width
                h = '0' * ((size - s) * 2) + h

            return bytes.fromhex(h)
        except Exception:
            self._log_error_msg1('to_pib()')
            return None

    # Truncated for brevity
    
# Shared encoder instance used by the functions below; errors are reported via print
E = EBCDIC(print)

## Functions

In [11]:
# This will speed up access to the data during multiple calls.
# Maps a record label to an OrderedDict of field name -> layout entry
# (a dict with 'start', 'length' and 'conversion'); filled in by convert_data().
records_d = OrderedDict()

# Convert a single 'cell' value with the given conversion type
def convert_to_ebcdic(value, size, conv, pre):
    """Encode one cell value with the given conversion type.

    Args:
        value: Cell value to encode.
        size: Width of the encoded field.
        conv: Conversion type; only 'CHAR' is handled in the code shown here.
        pre: Filler text to encode ahead of the value ('' for none).

    Returns:
        For 'CHAR', the encoded filler followed by the encoded value.
    """
    # Encode the filler only when there is one
    prefix_bytes = E.to_char(pre, len(pre)) if pre != '' else b''

    if conv == 'CHAR':
        # Non-string values are encoded via their string representation
        text = value if type(value) == str else str(value)
        return prefix_bytes + E.to_char(text, size)

    # Truncated 
    
# Map an entire record (row) to a bytes array
def map_rec_to_bytes(rec: pd.Series):
    """Encode one dataframe row as a fixed-position record with an RDW prefix.

    The row's 'rec_lbl' value selects the field layout from ``records_d``.
    Fields are encoded in layout order, with blank filler inserted wherever a
    field starts after the current position.

    Returns:
        The 4-byte RDW followed by the encoded fields.
    """
    # Field layout for this row's record type
    layout = records_d[rec['rec_lbl']]

    encoded = b''

    # 1-based position of the next unwritten byte in the record
    next_pos = 1

    for name, spec in layout.items():
        start = spec['start']
        length = spec['length']
        conversion = spec['conversion']

        # Blank filler covering any gap between the previous field and this one
        filler = ' ' * (start - next_pos) if next_pos < start else ''

        next_pos = start + length

        encoded += convert_to_ebcdic(rec[name], length, conversion, filler)

    # next_pos points one past the last byte, hence the -1; the RDW adds 4 bytes
    total_len = next_pos + 4 - 1

    # RDW: 2-byte record length followed by 2 zero bytes
    rdw = E.to_pib(total_len, 2) + E.to_pib(0, 2)

    return rdw + encoded

# Map an entire pandas dataframe with the function
def map_pandas_df(df: pd.DataFrame) -> pd.DataFrame:
    """Encode every row of ``df`` and pack the result into a one-row frame.

    Returns:
        A dataframe with a single row: 'Binary Data' holds all encoded records
        concatenated in row order, 'Rec Length' holds the longest record length.
    """
    # One encoded record per input row
    encoded_rows = df.apply(map_rec_to_bytes, axis=1)

    longest = encoded_rows.str.len().max()
    payload = b''.join(encoded_rows.tolist())

    return pd.DataFrame(
        [[payload, longest]],
        columns=['Binary Data', 'Rec Length'],
    )

## Convert the data using Pandas

In [12]:
# Data location (directory name, resolved against the current working directory)
DATA = 'data'

def convert_data(count=5000):
    """Load the pickled inputs for ``count``, build the layout lookup and encode the data.

    Reads ``<count>_data.pickle``, ``<count>_layout.pickle`` and
    ``<count>_records.pickle`` from the ``DATA`` directory under the current
    working directory, fills the module-level ``records_d``, labels every data
    row with its record label, then runs and times ``map_pandas_df``.

    Args:
        count: Prefix of the pickle file names to load.

    Returns:
        The dataframe produced by ``map_pandas_df``.
    """
    file_path_data = os.path.join(os.getcwd(), DATA, str(count)+'_data.pickle')
    file_path_layout = os.path.join(os.getcwd(), DATA, str(count)+'_layout.pickle')
    file_path_records = os.path.join(os.getcwd(), DATA, str(count)+'_records.pickle')

    # NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
    with open(file_path_data, 'rb') as f:
        data_df = pickle.load(f)
    with open(file_path_layout, 'rb') as f:
        layout_df = pickle.load(f)
    with open(file_path_records, 'rb') as f:
        records_df = pickle.load(f)
        
    # Convert the layout dataframe to dict
    # Iterate over each unique record
    for record in layout_df['rec'].unique():
        fields_d = OrderedDict()
        # Iterate over each unique field 'name' in the record
        for field in layout_df[layout_df['rec']==record]['name'].unique():
            # Keep the first layout row matching this record/field pair
            fields_d[field] = layout_df[
                (layout_df['rec'] == record) & (layout_df['name'] == field)
            ][['start', 'length', 'conversion']].to_dict('records')[0]
        # records_d is module-level: entries from earlier calls stay unless overwritten here
        records_d[record] = fields_d
    
    # Add record label to data dataframe
    records_l = records_df.values.tolist()
    
    # Every row starts unlabelled
    data_df['rec_lbl'] = ''
    
    # Each rec is a row of records_df: rec[0] is the label, rec[1] a string that
    # is evaluated to obtain the row selector passed to .loc.
    # NOTE(review): eval() runs that string as Python with this function's locals
    # in scope -- only safe for trusted input files; renaming locals here could
    # change what the evaluated strings resolve to.
    for rec in records_l:
        data_df.loc[eval(rec[1]), ['rec_lbl']] = rec[0]

    # Start measuring time (loading and layout preparation above are not timed)
    pandas_start = time.time()
    
    # Map the data
    pandas_result = map_pandas_df(data_df)
    
    pandas_secs = time.time() - pandas_start
    print(f"Total time: {pandas_secs:.3f} seconds")
    
    return pandas_result

In [8]:
%%time
# Pick one dataset size; count selects the matching '<count>_*.pickle' input files.
# df = convert_data(5000)
# df = convert_data(50000)
df = convert_data(500000)
# df = convert_data(5000000)

# The result is a one-row frame: the concatenated bytes plus the longest record length
print(df.shape)
df.head()

Total time: 193.637 seconds
(1, 2)
CPU times: user 3min 16s, sys: 2.11 s, total: 3min 18s
Wall time: 3min 18s


Unnamed: 0,Binary Data,Rec Length
0,b'\x00^\x00\x00\xd7\xd9\xe2\xd5@@@@@@\xf5\xf8\...,189


### Pandas performance

Using TR1950
- for ~35,000 records --> 2.11 secs
- for ~350,000 records --> 19.4 secs
- for ~3,500,000 records --> 3 mins 18 secs
- for ~35,000,000 records --> Unknown

Notes - top shows 100% CPU but low RAM usage

Using i7-7700HQ @ 3.54-3.60 GHz

- for ~35,000 records --> 3.45 secs
- for ~350,000 records --> 35.4 secs
- for ~3,500,000 records --> 6 mins 36 secs
- for ~35,000,000 records --> 73 mins 29 secs

Notes - Task Manager CPU pane shows heavy load on one thread. Other threads show medium to low utilization. Total utilization shows as between 25-29%.