# Data Engineering in Python with databolt - Quickly Load Any Type of CSV (d6tlib/d6tstack)

Vendors often send large datasets split across multiple files. Columns are frequently missing or misaligned between files and have to be cleaned manually. With DataBolt File Stack you can easily stack the files together into one consistent dataset.

Features include:
* Quickly check column consistency across multiple files
* Fix added/missing columns
* Fix renamed columns
* Out of core functionality to process large files
* Export to pandas, CSV, SQL, parquet
    * Fast export to postgres and mysql with out of core support
    
In this workbook we will demonstrate the usage of the d6tstack library.

In [100]:
import pandas as pd
import glob

import d6tstack.combine_csv as d6tc
import d6tstack

## Get sample data

We've created some dummy sample data which you can download. 

In [78]:
import urllib.request
import zipfile

cfg_fname_sample = 'test-data.zip'
urllib.request.urlretrieve("https://github.com/d6t/d6tstack/raw/master/"+cfg_fname_sample, cfg_fname_sample)
# the context manager closes the archive even if extraction fails
with zipfile.ZipFile(cfg_fname_sample, 'r') as zip_ref:
    zip_ref.extractall('.')

## Use Case: Checking Column Consistency

Let's say you receive a bunch of CSV files that you want to ingest into, for example, pandas, dask, pyspark or a database.

In [79]:
cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-clean-*.csv'))
print(cfg_fnames)

['test-data/input/test-data-input-csv-clean-mar.csv', 'test-data/input/test-data-input-csv-clean-feb.csv', 'test-data/input/test-data-input-csv-clean-jan.csv']


### Check column consistency across all files

Even if you think the files have a consistent column layout, it is worthwhile using `d6tstack` to assert that this is actually the case. The check is very quick, even with many large files!

In [80]:
# get previews
c = d6tc.CombinerCSV(cfg_fnames) # all_strings=True makes reading faster
col_sniff = c.sniff_columns()

sniffing columns ok


In [81]:
print('all columns equal?', c.is_all_equal())
print('')
print('which columns are present in which files?')
print('')
print(c.is_column_present())
print('')
print('in what order do columns appear in the files?')
print('')
print(col_sniff['df_columns_order'].reset_index(drop=True))

all columns equal? True

which columns are present in which files?

                                                   date  sales  cost  profit
file_path                                                                   
test-data/input/test-data-input-csv-clean-feb.csv  True   True  True    True
test-data/input/test-data-input-csv-clean-jan.csv  True   True  True    True
test-data/input/test-data-input-csv-clean-mar.csv  True   True  True    True

in what order do columns appear in the files?

   date  sales  cost  profit
0     0      1     2       3
1     0      1     2       3
2     0      1     2       3
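
To make the idea behind the check concrete, here is a minimal standard-library sketch that compares only the header rows of several files. It is an illustration under stated assumptions, not how `d6tstack` is implemented: the function names `read_header` and `check_headers`, the result keys, and the in-memory sample data are all hypothetical.

```python
import csv
import io


def read_header(csv_text):
    """Return the column names found in the first row of a CSV string."""
    return next(csv.reader(io.StringIO(csv_text)))


def check_headers(files):
    """Compare headers across files given as {name: csv_text}.

    Hypothetical helper for illustration; only the first row of each
    file is parsed, which is why such a check is cheap.
    """
    headers = {name: read_header(text) for name, text in files.items()}

    # Union of all columns, kept in first-seen order.
    columns_all = []
    for cols in headers.values():
        for col in cols:
            if col not in columns_all:
                columns_all.append(col)

    # For every file, record which of the columns it contains.
    present = {
        name: {col: col in cols for col in columns_all}
        for name, cols in headers.items()
    }

    # Headers are "equal" only if names AND order match in every file.
    first = next(iter(headers.values()))
    is_all_equal = all(cols == first for cols in headers.values())

    return {
        "columns_all": columns_all,
        "present": present,
        "is_all_equal": is_all_equal,
    }


# Hypothetical in-memory stand-ins for three monthly files.
sample_files = {
    "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
    "feb.csv": "date,sales,cost,profit\n2011-02-01,200,-90,110\n",
    "mar.csv": "date,sales,cost,profit\n2011-03-01,300,-100,200\n",
}
header_report = check_headers(sample_files)
print("all columns equal?", header_report["is_all_equal"])
```

Because only one row per file is read, the cost of a check like this grows with the number of files rather than with their size.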


### Preview Combined Data

You can see a preview of what the combined data from all files will look like.

In [82]:
c.combine_preview()

Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
1,2011-02-02,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
2,2011-02-03,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
3,2011-01-01,100,-80,20,test-data/input/test-data-input-csv-clean-jan.csv,test-data-input-csv-clean-jan.csv
4,2011-01-02,100,-80,20,test-data/input/test-data-input-csv-clean-jan.csv,test-data-input-csv-clean-jan.csv
5,2011-01-03,100,-80,20,test-data/input/test-data-input-csv-clean-jan.csv,test-data-input-csv-clean-jan.csv
6,2011-03-01,300,-100,200,test-data/input/test-data-input-csv-clean-mar.csv,test-data-input-csv-clean-mar.csv
7,2011-03-02,300,-100,200,test-data/input/test-data-input-csv-clean-mar.csv,test-data-input-csv-clean-mar.csv
8,2011-03-03,300,-100,200,test-data/input/test-data-input-csv-clean-mar.csv,test-data-input-csv-clean-mar.csv


### Read All Files to Pandas

You can quickly load the combined data into a pandas dataframe with a single command. 

In [83]:
c.to_pandas().head()

Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
1,2011-02-02,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
2,2011-02-03,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
3,2011-02-04,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv
4,2011-02-05,200,-90,110,test-data/input/test-data-input-csv-clean-feb.csv,test-data-input-csv-clean-feb.csv


## Use Case: Identifying and fixing inconsistent columns

The first case was clean: all files had the same columns. Very often, though, the data schema changes over time, with columns being added or deleted. Let's look at a case where an extra column got added.

In [84]:
cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-colmismatch-*.csv'))
print(cfg_fnames)

['test-data/input/test-data-input-csv-colmismatch-mar.csv', 'test-data/input/test-data-input-csv-colmismatch-feb.csv', 'test-data/input/test-data-input-csv-colmismatch-jan.csv']


In [85]:
# get previews
c = d6tc.CombinerCSV(cfg_fnames) # all_strings=True makes reading faster
col_sniff = c.sniff_columns()

sniffing columns ok


In [86]:
print('all columns equal?', c.is_all_equal())
print('')
print('which columns are unique?', col_sniff['columns_unique'])
print('')
print('which files have unique columns?')
print('')
print(c.is_column_present_unique())

all columns equal? False

which columns are unique? ['profit2']

which files have unique columns?

                                                    profit2
file_path                                                  
test-data/input/test-data-input-csv-colmismatch...    False
test-data/input/test-data-input-csv-colmismatch...    False
test-data/input/test-data-input-csv-colmismatch...     True


In [87]:
c.to_pandas().head() # keep all columns

Unnamed: 0,date,sales,cost,profit,profit2,filepath,filename
0,2011-02-01,200,-90,110,,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
1,2011-02-02,200,-90,110,,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
2,2011-02-03,200,-90,110,,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
3,2011-02-04,200,-90,110,,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
4,2011-02-05,200,-90,110,,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv


In [88]:
d6tc.CombinerCSV(cfg_fnames, columns_select_common=True).to_pandas().head()

sniffing columns ok


Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
1,2011-02-02,200,-90,110,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
2,2011-02-03,200,-90,110,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
3,2011-02-04,200,-90,110,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
4,2011-02-05,200,-90,110,test-data/input/test-data-input-csv-colmismatc...,test-data-input-csv-colmismatch-feb.csv
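
The two behaviours above (keep every column and leave gaps empty, or keep only the columns shared by all files) can be sketched with the standard library alone. This is an illustrative sketch, not `d6tstack`'s code: `stack_rows`, its `common_only` flag and the sample values (including the `profit2` value) are hypothetical.

```python
import csv
import io


def stack_rows(files, common_only=False):
    """Stack rows from {name: csv_text}, aligning columns by name.

    Hypothetical helper. With common_only=False the result uses the
    union of all columns and fills gaps with an empty string; with
    common_only=True it keeps only columns present in every file.
    """
    readers = {name: csv.DictReader(io.StringIO(text)) for name, text in files.items()}
    # Accessing .fieldnames consumes just the header row of each reader.
    headers = {name: list(reader.fieldnames) for name, reader in readers.items()}

    columns = []
    for cols in headers.values():
        for col in cols:
            if col not in columns:
                columns.append(col)
    if common_only:
        columns = [c for c in columns if all(c in cols for cols in headers.values())]

    rows = []
    for name, reader in readers.items():
        for row in reader:
            aligned = {c: row.get(c, "") for c in columns}
            aligned["filename"] = name  # keep track of the source file
            rows.append(aligned)
    return columns, rows


# Hypothetical sample: the March file gained an extra column.
mismatch_files = {
    "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
    "mar.csv": "date,sales,cost,profit,profit2\n2011-03-01,300,-100,200,400\n",
}
cols_all, rows_all = stack_rows(mismatch_files)
cols_common, rows_common = stack_rows(mismatch_files, common_only=True)
print(cols_all)
print(cols_common)
```

Which of the two you want depends on the downstream consumer: the union preserves all information, while the common subset guarantees that no column contains gaps introduced by stacking.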


## Use Case: Aligning renamed columns and selecting a subset of columns

Say a column has been renamed, so its data no longer lines up with the data under the old column name. You can easily fix this by passing `columns_rename` to `CombinerCSV`, which renames the columns and automatically lines up the data. With `columns_select` you can also load data from just a subset of columns.

In [89]:
cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-renamed-*.csv'))
c = d6tc.CombinerCSV(cfg_fnames)
print(c.is_column_present_unique())

sniffing columns ok
                                                    revenue  sales
file_path                                                         
test-data/input/test-data-input-csv-renamed-feb...    False   True
test-data/input/test-data-input-csv-renamed-jan...    False   True
test-data/input/test-data-input-csv-renamed-mar...     True  False


The column `sales` got renamed to `revenue` in the March file, which would cause problems when reading the files.

In [90]:
col_sniff = c.sniff_columns()
c.combine_preview()[['filename']+col_sniff['columns_unique']]

sniffing columns ok


Unnamed: 0,filename,revenue,sales
0,test-data-input-csv-renamed-feb.csv,,200.0
1,test-data-input-csv-renamed-feb.csv,,200.0
2,test-data-input-csv-renamed-feb.csv,,200.0
3,test-data-input-csv-renamed-jan.csv,,100.0
4,test-data-input-csv-renamed-jan.csv,,100.0
5,test-data-input-csv-renamed-jan.csv,,100.0
6,test-data-input-csv-renamed-mar.csv,300.0,
7,test-data-input-csv-renamed-mar.csv,300.0,
8,test-data-input-csv-renamed-mar.csv,300.0,


You can pass the columns you want to rename to `columns_rename` and it will rename and align those columns.

In [91]:
# only select particular columns
cfg_col_sel = ['date','sales','cost','profit'] # don't select profit2
# rename columns
cfg_col_rename = {'sales':'revenue'} # rename all instances of sales to revenue

In [92]:
c = d6tc.CombinerCSV(cfg_fnames, columns_rename = cfg_col_rename, columns_select = cfg_col_sel) 
c.combine_preview() 


sniffing columns ok


Unnamed: 0,date,revenue,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,test-data/input/test-data-input-csv-renamed-fe...,test-data-input-csv-renamed-feb.csv
1,2011-02-02,200,-90,110,test-data/input/test-data-input-csv-renamed-fe...,test-data-input-csv-renamed-feb.csv
2,2011-02-03,200,-90,110,test-data/input/test-data-input-csv-renamed-fe...,test-data-input-csv-renamed-feb.csv
3,2011-01-01,100,-80,20,test-data/input/test-data-input-csv-renamed-ja...,test-data-input-csv-renamed-jan.csv
4,2011-01-02,100,-80,20,test-data/input/test-data-input-csv-renamed-ja...,test-data-input-csv-renamed-jan.csv
5,2011-01-03,100,-80,20,test-data/input/test-data-input-csv-renamed-ja...,test-data-input-csv-renamed-jan.csv
6,2011-03-01,300,-100,200,test-data/input/test-data-input-csv-renamed-ma...,test-data-input-csv-renamed-mar.csv
7,2011-03-02,300,-100,200,test-data/input/test-data-input-csv-renamed-ma...,test-data-input-csv-renamed-mar.csv
8,2011-03-03,300,-100,200,test-data/input/test-data-input-csv-renamed-ma...,test-data-input-csv-renamed-mar.csv
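
Conceptually, renaming and aligning comes down to mapping each file's header through a rename dictionary before stacking by name. The sketch below shows that idea with the standard library; it is not `d6tstack`'s implementation, and `stack_with_rename` as well as the sample data are hypothetical. One assumption to note: this sketch applies the column selection after renaming, so the selection uses the new name `revenue`.

```python
import csv
import io


def stack_with_rename(files, rename, select=None):
    """Rename header columns, then stack rows from {name: csv_text} by name.

    Hypothetical helper: `rename` maps old column names to new ones and
    `select` (optional) lists the renamed columns to keep.
    """
    stacked = []
    for name, text in files.items():
        reader = csv.reader(io.StringIO(text))
        # Map every header entry through the rename dictionary.
        header = [rename.get(col, col) for col in next(reader)]
        for values in reader:
            row = dict(zip(header, values))
            if select is not None:
                row = {col: row.get(col, "") for col in select}
            row["filename"] = name
            stacked.append(row)
    return stacked


# Hypothetical sample: `sales` became `revenue` in the March file.
renamed_files = {
    "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
    "mar.csv": "date,revenue,cost,profit\n2011-03-01,300,-100,200\n",
}
renamed_rows = stack_with_rename(
    renamed_files,
    rename={"sales": "revenue"},
    select=["date", "revenue", "cost", "profit"],
)
for row in renamed_rows:
    print(row)
```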


## Use Case: Identifying a change in column order

A change in column order is a real problem if you read your files into a database: the files look like they are all the same, whereas in fact they have changed. This is because programs like dask or SQL loaders assume the column order is the same in every file. With `d6tstack` you can easily identify and fix such a case.

In [93]:
cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-reorder-*.csv'))
print(cfg_fnames)

['test-data/input/test-data-input-csv-reorder-jan.csv', 'test-data/input/test-data-input-csv-reorder-mar.csv', 'test-data/input/test-data-input-csv-reorder-feb.csv']


In [94]:
# get previews
c = d6tc.CombinerCSV(cfg_fnames) # all_strings=True makes reading faster
col_sniff = c.sniff_columns()

sniffing columns ok


Here we can see that the columns are not equal across all files: in one file `cost` and `profit` swap positions.

In [95]:
print('all columns equal?', col_sniff['is_all_equal'])
print('')
print('in what order do columns appear in the files?')
print('')
print(col_sniff['df_columns_order'].reset_index(drop=True))

all columns equal? False

in what order do columns appear in the files?

   date  sales  cost  profit
0     0      1     2       3
1     0      1     2       3
2     0      1     3       2


In [96]:
c.combine_preview() # automatically puts it in the right order

Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,test-data/input/test-data-input-csv-reorder-fe...,test-data-input-csv-reorder-feb.csv
1,2011-02-02,200,-90,110,test-data/input/test-data-input-csv-reorder-fe...,test-data-input-csv-reorder-feb.csv
2,2011-02-03,200,-90,110,test-data/input/test-data-input-csv-reorder-fe...,test-data-input-csv-reorder-feb.csv
3,2011-01-01,100,-80,20,test-data/input/test-data-input-csv-reorder-ja...,test-data-input-csv-reorder-jan.csv
4,2011-01-02,100,-80,20,test-data/input/test-data-input-csv-reorder-ja...,test-data-input-csv-reorder-jan.csv
5,2011-01-03,100,-80,20,test-data/input/test-data-input-csv-reorder-ja...,test-data-input-csv-reorder-jan.csv
6,2011-03-01,300,-100,200,test-data/input/test-data-input-csv-reorder-ma...,test-data-input-csv-reorder-mar.csv
7,2011-03-02,300,-100,200,test-data/input/test-data-input-csv-reorder-ma...,test-data-input-csv-reorder-mar.csv
8,2011-03-03,300,-100,200,test-data/input/test-data-input-csv-reorder-ma...,test-data-input-csv-reorder-mar.csv
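
To see why column order matters, the following standard-library sketch contrasts stacking by position with stacking by name. It only illustrates the failure mode and the fix; the function names and sample data are hypothetical and this is not how `d6tstack` works internally.

```python
import csv
import io


def stack_by_position(files):
    """Naive stacking: use the first header and append raw rows as-is."""
    header, rows = None, []
    for text in files.values():
        reader = csv.reader(io.StringIO(text))
        file_header = next(reader)
        if header is None:
            header = file_header
        rows.extend(reader)  # column order of later files is ignored
    return header, rows


def stack_by_name(files):
    """Stack rows after reordering each one into the first file's column order."""
    header, rows = None, []
    for text in files.values():
        reader = csv.DictReader(io.StringIO(text))
        if header is None:
            header = list(reader.fieldnames)
        for row in reader:
            rows.append([row[col] for col in header])
    return header, rows


# Hypothetical sample: March has `profit` and `cost` swapped.
reorder_files = {
    "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
    "mar.csv": "date,sales,profit,cost\n2011-03-01,300,200,-100\n",
}
pos_header, pos_rows = stack_by_position(reorder_files)
name_header, name_rows = stack_by_name(reorder_files)
print(pos_rows[1])   # profit value sits in the cost column
print(name_rows[1])  # values line up with the header
```

The positional version raises no error at all, which is what makes this kind of schema drift easy to miss.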


## Customize separator and pass pd.read_csv() params

You can pass additional parameters such as separators and any params for `pd.read_csv()` to the combiner.

In [97]:
c = d6tc.CombinerCSV(cfg_fnames, sep=',', read_csv_params={'header': None})
col_sniff = c.sniff_columns()
print(col_sniff)

sniffing columns ok
{'files_columns': {'test-data/input/test-data-input-csv-reorder-feb.csv': ['date', 'sales', 'cost', 'profit'], 'test-data/input/test-data-input-csv-reorder-jan.csv': ['date', 'sales', 'cost', 'profit'], 'test-data/input/test-data-input-csv-reorder-mar.csv': ['date', 'sales', 'profit', 'cost']}, 'columns_all': ['date', 'sales', 'cost', 'profit'], 'columns_common': ['date', 'sales', 'cost', 'profit'], 'columns_unique': [], 'is_all_equal': False, 'df_columns_present':                                                     date  sales  cost  profit
file_path                                                                    
test-data/input/test-data-input-csv-reorder-feb...  True   True  True    True
test-data/input/test-data-input-csv-reorder-jan...  True   True  True    True
test-data/input/test-data-input-csv-reorder-mar...  True   True  True    True, 'df_columns_order':                                                     date  sales  cost  profit
test-data/input/test-

## CSV out of core functionality

If your files are large, you don't want to read them all into memory and then save them. Instead you can write directly to the output file.

In [98]:
c.to_csv_combine('test-data/output/test.csv')

'test-data/output/test.csv'
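
The general out-of-core pattern can be sketched with the standard library: first collect the headers, then stream rows one at a time into the output file so that memory use does not grow with file size. This is a minimal sketch under stated assumptions, not the implementation behind `to_csv_combine`; the function `combine_csv_streaming`, the added `filename` column and the temporary sample files are hypothetical.

```python
import csv
import os
import tempfile


def combine_csv_streaming(paths, out_path):
    """Combine CSV files row by row without loading them into memory.

    Hypothetical helper. Pass 1 reads only header rows to build the
    union of columns; pass 2 streams every row into the output file.
    """
    columns = []
    for path in paths:
        with open(path, newline="") as f:
            for col in next(csv.reader(f)):
                if col not in columns:
                    columns.append(col)

    n_rows = 0
    with open(out_path, "w", newline="") as out:
        # restval fills columns that a given input file does not have.
        writer = csv.DictWriter(out, fieldnames=columns + ["filename"], restval="")
        writer.writeheader()
        for path in paths:
            with open(path, newline="") as f:
                for row in csv.DictReader(f):
                    row["filename"] = os.path.basename(path)
                    writer.writerow(row)
                    n_rows += 1
    return n_rows


# Demo on small hypothetical files written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    inputs = {
        "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
        "feb.csv": "date,sales,cost,profit\n2011-02-01,200,-90,110\n",
    }
    paths = []
    for name, text in inputs.items():
        path = os.path.join(tmp, name)
        with open(path, "w", newline="") as f:
            f.write(text)
        paths.append(path)

    out_path = os.path.join(tmp, "combined.csv")
    rows_written = combine_csv_streaming(paths, out_path)
    with open(out_path, newline="") as f:
        combined_lines = f.read().splitlines()

print(rows_written)
print(combined_lines[0])
```

Only one row is held in memory at any time, so the same function works whether the inputs are a few kilobytes or many gigabytes.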

## Auto Detect pd.read_csv() settings

In [103]:
### Detect CSV settings across a single file

In [104]:
cfg_sniff = d6tstack.sniffer.sniff_settings_csv([cfg_fnames[0]])
print(cfg_sniff)


{'delim': ',', 'skiprows': 0, 'has_header': True, 'header': 0}


### Detect CSV settings across multiple files

In [105]:
# finds common CSV settings across all files
cfg_sniff = d6tstack.sniffer.sniff_settings_csv(cfg_fnames)
print(cfg_sniff)


{'delim': ',', 'skiprows': 0, 'has_header': True, 'header': 0}
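
For comparison, Python's standard library ships a similar heuristic in `csv.Sniffer`, which guesses the delimiter and whether a header row is present from a text sample. The sketch below wraps it in a hypothetical helper, `sniff_csv_text`; the result keys only loosely mirror the output above, and nothing here is meant to describe how `d6tstack.sniffer` works internally.

```python
import csv


def sniff_csv_text(sample, delimiters=",;\t|"):
    """Guess delimiter and header presence for a CSV text sample.

    Hypothetical wrapper around csv.Sniffer; the heuristics can fail
    on unusual files, so treat the result as a suggestion.
    """
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(sample, delimiters=delimiters)
    return {"delim": dialect.delimiter, "has_header": sniffer.has_header(sample)}


# Hypothetical samples with different delimiters.
comma_sample = (
    "date,sales,cost,profit\n"
    "2011-01-01,100,-80,20\n"
    "2011-01-02,100,-80,20\n"
    "2011-01-03,100,-80,20\n"
)
semicolon_sample = (
    "date;sales;cost\n"
    "2011-01-01;100;-80\n"
    "2011-01-02;100;-80\n"
)
comma_settings = sniff_csv_text(comma_sample)
semicolon_settings = sniff_csv_text(semicolon_sample)
print(comma_settings)
print(semicolon_settings["delim"])
```

In practice you would pass the first few kilobytes of each file as the sample and check that all files agree before settling on one set of read settings.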
