# d6tstack with pyspark

Pyspark is a great library for out-of-core computing, but it quickly runs into trouble when input files are not consistently organized. For example:

1) if columns are different between files: [unlike dask](https://github.com/d6t/d6tstack/blob/master/examples-dask.ipynb) pyspark actually handles that

2) if column order is rearranged between files, it will read the data into the wrong columns without any warning, and you won't notice it

3) if columns are renamed between files, you'll have to fix the situation manually

Pyspark can't easily handle the last two scenarios. With d6tstack you can fix them with just a few lines of code!

For more instructions, examples and documentation see https://github.com/d6t/d6tstack

In [1]:
import findspark
findspark.init(r'E:\progs.install\spark-2.2.0-bin-hadoop2.7')

import pyspark
sc = pyspark.SparkContext(appName="myAppName")
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

## Base Case: Columns are same between all files
As a base case, we have input files with consistent columns, which can be read easily with pyspark.

In [2]:
sdf = sqlc.read.csv('test-data/input/test-data-input-csv-clean-*.csv', inferSchema=False, header=True)
sdf.toPandas()

Unnamed: 0,cost,date,profit,sales
0,-100,2011-03-01,200,300
1,-100,2011-03-02,200,300
2,-100,2011-03-03,200,300
3,-100,2011-03-04,200,300
4,-100,2011-03-05,200,300
5,-100,2011-03-06,200,300
6,-100,2011-03-07,200,300
7,-100,2011-03-08,200,300
8,-100,2011-03-09,200,300
9,-100,2011-03-10,200,300


## Problem Case 1: Columns are different between files
That worked well. But what happens if your input files have inconsistent columns across files? Say for example one file has a new column that the other files don't have.

[unlike dask](https://github.com/d6t/d6tstack/blob/master/examples-dask.ipynb) pyspark actually handles that. The new column got correctly added.

In [3]:
sdf = sqlc.read.csv('test-data/input/test-data-input-csv-colmismatch-*.csv', inferSchema=False, header=True)
sdf.toPandas()


Unnamed: 0,cost,date,profit,sales,profit2
0,-100,2011-03-01,200,300,400.0
1,-100,2011-03-02,200,300,400.0
2,-100,2011-03-03,200,300,400.0
3,-100,2011-03-04,200,300,400.0
4,-100,2011-03-05,200,300,400.0
5,-100,2011-03-06,200,300,400.0
6,-100,2011-03-07,200,300,400.0
7,-100,2011-03-08,200,300,400.0
8,-100,2011-03-09,200,300,400.0
9,-100,2011-03-10,200,300,400.0


## Problem Case 2: Columns are reordered between files
This is a sneaky case. The columns are the same but the order is different! Pyspark will read everything just fine without a warning, but the values end up in the wrong columns. You won't notice it right away: you'll start using the data and only later realize that something weird is going on.

In the example below, the "profit" column contains data from the "cost" column!

In [4]:
sdf = sqlc.read.csv('test-data/input/test-data-input-csv-reorder-*.csv', inferSchema=False, header=True)
sdf.toPandas()

Unnamed: 0,date,sales,profit,cost
0,2011-03-01,300,200,-100
1,2011-03-02,300,200,-100
2,2011-03-03,300,200,-100
3,2011-03-04,300,200,-100
4,2011-03-05,300,200,-100
5,2011-03-06,300,200,-100
6,2011-03-07,300,200,-100
7,2011-03-08,300,200,-100
8,2011-03-09,300,200,-100
9,2011-03-10,300,200,-100
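
To make the failure mode concrete, here is a minimal sketch using only the Python standard library. It does not show how pyspark works internally; it only mimics the symptom by applying the first file's header to every file, and compares that with matching columns by name. The file contents and the helper names `read_positional` and `read_by_name` are hypothetical.

```python
import csv
import io

# Two hypothetical in-memory "files": same columns, different order.
jan = "date,sales,cost,profit\n2011-01-01,100,-80,20\n"
mar = "date,sales,profit,cost\n2011-03-01,300,200,-100\n"


def read_positional(texts):
    """Apply the first file's header to every file, ignoring later headers."""
    header = None
    rows = []
    for text in texts:
        reader = csv.reader(io.StringIO(text))
        file_header = next(reader)
        if header is None:
            header = file_header
        rows.extend(dict(zip(header, values)) for values in reader)
    return rows


def read_by_name(texts):
    """Match values to columns using each file's own header."""
    rows = []
    for text in texts:
        rows.extend(dict(row) for row in csv.DictReader(io.StringIO(text)))
    return rows


positional = read_positional([jan, mar])
by_name = read_by_name([jan, mar])

# The March row is silently misassigned when read by position.
print("positional profit:", positional[1]["profit"])
print("by-name profit:   ", by_name[1]["profit"])
```

Reading by position raises no error, which is exactly why the problem is easy to miss.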


## Fixing the problem with d6tstack
After a while you'll get to the root of the problem, and then you can either manually process those files or use d6tstack to easily check for such a situation and fix it with a few lines of code - no manual processing required. Let's take a look!

In [1]:
import glob
import d6tstack.combine_csv

cfg_fnames = list(glob.glob('test-data/input/test-data-input-csv-reorder-*.csv'))
c = d6tstack.combine_csv.CombinerCSV(cfg_fnames, all_strings=True)

# check columns
col_sniff = c.sniff_columns()
print('all columns equal?' , col_sniff['is_all_equal'])
print('')
print('in what order do columns appear in the files?')
print('')
col_sniff['df_columns_order'].reset_index(drop=True)


sniffing columns ok
all columns equal? False

in what order do columns appear in the files?



Unnamed: 0,date,sales,cost,profit
0,0,1,2,3
1,0,1,2,3
2,0,1,3,2


This is a useful check to run before loading data into pyspark: you can see right away that the columns don't line up. It is very fast because it only reads the headers, so from a QA perspective there is no reason not to run it.
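
The idea of a header-only check can be sketched with the standard library alone. This is not d6tstack's implementation, just an illustration of why such a check is cheap: only the first line of each file is read. The function name `sniff_headers` and the temporary file names are hypothetical; the `is_all_equal` key simply mirrors the name used in the output above.

```python
import csv
import os
import tempfile


def sniff_headers(paths):
    """Read only the first line of each CSV file and compare the headers."""
    headers = {}
    for path in paths:
        with open(path, newline="") as f:
            headers[path] = next(csv.reader(f), [])
    distinct = {tuple(h) for h in headers.values()}
    return {"is_all_equal": len(distinct) <= 1, "headers": headers}


# Demo on two small hypothetical files written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    samples = {
        "jan.csv": "date,sales,cost,profit\n2011-01-01,100,-80,20\n",
        "mar.csv": "date,sales,profit,cost\n2011-03-01,300,200,-100\n",
    }
    paths = []
    for name, text in samples.items():
        path = os.path.join(tmp, name)
        with open(path, "w", newline="") as f:
            f.write(text)
        paths.append(path)
    result = sniff_headers(paths)

print("all columns equal?", result["is_all_equal"])
```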

The fix takes just a few lines of code with d6tstack.

In [6]:
# out-of-core combining
c.to_csv_align(output_dir='test-data/output/')

True

In [7]:
# read the aligned files written for the reordered inputs
sdf = sqlc.read.csv('test-data/output/d6tstack-test-data-input-csv-reorder-*.csv', inferSchema=False, header=True)
sdf.toPandas()


Unnamed: 0,profit,date,cost,sales,filename
0,110,2011-02-01,-90,200,test-data-input-csv-reorder-feb.csv
1,110,2011-02-02,-90,200,test-data-input-csv-reorder-feb.csv
2,110,2011-02-03,-90,200,test-data-input-csv-reorder-feb.csv
3,110,2011-02-04,-90,200,test-data-input-csv-reorder-feb.csv
4,110,2011-02-05,-90,200,test-data-input-csv-reorder-feb.csv
5,110,2011-02-06,-90,200,test-data-input-csv-reorder-feb.csv
6,110,2011-02-07,-90,200,test-data-input-csv-reorder-feb.csv
7,110,2011-02-08,-90,200,test-data-input-csv-reorder-feb.csv
8,110,2011-02-09,-90,200,test-data-input-csv-reorder-feb.csv
9,110,2011-02-10,-90,200,test-data-input-csv-reorder-feb.csv
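
Conceptually, aligning files means matching values by column name rather than by position and writing every file out with one shared header. The sketch below shows that idea with the standard library; it is not how d6tstack implements `to_csv_align`, and the function name `align_csv` and the sample data are hypothetical. It also appends a `filename` column, similar to the output above.

```python
import csv
import io


def align_csv(named_texts):
    """Combine CSV texts by column name into one CSV string with a shared header."""
    # First pass: collect the union of column names in first-seen order.
    columns = []
    for _, text in named_texts:
        for col in next(csv.reader(io.StringIO(text)), []):
            if col not in columns:
                columns.append(col)

    # Second pass: write each row under the shared header, filling gaps with "".
    out = io.StringIO()
    writer = csv.DictWriter(
        out, fieldnames=columns + ["filename"], restval="", lineterminator="\n"
    )
    writer.writeheader()
    for name, text in named_texts:
        for row in csv.DictReader(io.StringIO(text)):
            row["filename"] = name
            writer.writerow(row)
    return out.getvalue()


inputs = [
    ("feb.csv", "date,sales,cost,profit\n2011-02-01,200,-90,110\n"),
    ("mar.csv", "date,sales,profit,cost\n2011-03-01,300,200,-100\n"),
]
aligned = align_csv(inputs)
print(aligned)
```

Once every file shares the same header in the same order, reading them with a glob pattern is safe again.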


## Problem Case 3: Columns are renamed between files
In this case a column gets renamed between files, so you end up with two columns, each partly filled with NaNs, that should really be the same column. You would have to inspect the data for NaNs to find the affected columns and then fix them by hand.

Instead you can use d6tstack to make your input files consistent.

In [8]:
# see examples-csv.ipynb
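
As a rough illustration of what making renamed columns consistent involves, the sketch below rewrites only the header row using a rename mapping. It uses the standard library rather than d6tstack, and the column name `revenue`, the mapping, and the helper `rename_columns` are all hypothetical; see examples-csv.ipynb for the d6tstack approach.

```python
import csv
import io


def rename_columns(text, rename):
    """Return the CSV text with header names replaced according to `rename`."""
    reader = csv.reader(io.StringIO(text))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    header = next(reader)
    writer.writerow([rename.get(col, col) for col in header])
    writer.writerows(reader)  # data rows are copied unchanged
    return out.getvalue()


# Hypothetical inputs: the second file calls the "sales" column "revenue".
old_style = "date,sales,cost,profit\n2011-02-01,200,-90,110\n"
new_style = "date,revenue,cost,profit\n2011-03-01,300,-100,200\n"
rename = {"revenue": "sales"}

fixed = [rename_columns(text, rename) for text in (old_style, new_style)]
headers = [text.splitlines()[0] for text in fixed]
print(headers)
```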