# dfply_streaming

# Main motivation

* This module will provide `dfply` functionality for a csv file read in with the `DictReader`

In [2]:
from csv import DictReader, Sniffer
with open('auto_sales.csv') as csvfile:
    dialect = Sniffer().sniff(csvfile.read(1024))
    csvfile.seek(0)
    reader = DictReader(csvfile, dialect=dialect)
    print(dir(reader.reader))
    columns = reader.fieldnames
    l = list(reader)
l, columns

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'dialect', 'line_num']


([OrderedDict([('Salesperson', 'Ann'),
               ('Compact', '22'),
               ('Sedan', '18'),
               ('SUV', '15'),
               ('Truck', '12')]),
  OrderedDict([('Salesperson', 'Bob'),
               ('Compact', '19'),
               ('Sedan', '12'),
               ('SUV', '17'),
               ('Truck', '20')]),
  OrderedDict([('Salesperson', 'Doug'),
               ('Compact', '20'),
               ('Sedan', '13'),
               ('SUV', ''),
               ('Truck', '20')]),
  OrderedDict([('Salesperson', 'Yolanda'),
               ('Compact', '19'),
               ('Sedan', '8'),
               ('SUV', '32'),
               ('Truck', '15')]),
  OrderedDict([('Salesperson', 'Xerxes'),
               ('Compact', '12'),
               ('Sedan', '23'),
               ('SUV', '18'),
               ('Truck', '9')])],
 ['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck'])

## Example setup

In [3]:
from dfply_streaming import merge

columns = ['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']
my_col_types = {'Salesperson':str, 
             'SUV':int, 
             'Truck':int}
my_col_types2 = merge(my_col_types, {'Sedan':int, 'Compact':int})

In [4]:
columns, my_col_types, my_col_types2

(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck'],
 {'Salesperson': str, 'SUV': int, 'Truck': int},
 {'Salesperson': str, 'SUV': int, 'Truck': int, 'Sedan': int, 'Compact': int})

## The `Row` class is a row wrapper

* Provides fields for each entry
* Holds the column and col_type information
* Converts to types and handles missing data (replacing with None)

In [11]:
#Compact is Int but below in output its shown as string
from dfply_streaming import make_row_class
Row = make_row_class(columns, my_col_types)
base_row = Row(**{'columns':columns, 'col_types':my_col_types})
rs = [base_row.set(**r) for r in l]
next(iter(rs)) #Not iterable?

Row(Salesperson = 'Ann', Compact = '22', Sedan = '18', SUV = 15, Truck = 12)

In [19]:
r1 = rs[0]
r1

Row(Salesperson = 'Ann', Compact = '22', Sedan = '18', SUV = 15, Truck = 12)

#### Rows contain the type information

In [20]:
r1.col_types

{'Salesperson': str, 'SUV': int, 'Truck': int}

#### Rows contain an instance of `Column` class

In [16]:
r1.columns

Column5(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), Compact=Columns(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), current_set=StrPSet(['Compact'])), Truck=Columns(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), current_set=StrPSet(['Truck'])), Sedan=Columns(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), current_set=StrPSet(['Sedan'])), current_set=StrPSet(['Compact', 'Salesperson', 'Truck', 'Sedan', 'SUV']), Salesperson=Columns(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), current_set=StrPSet(['Salesperson'])), SUV=Columns(all_columns=StrPVector(['Salesperson', 'Compact', 'Sedan', 'SUV', 'Truck']), current_set=StrPSet(['SUV'])))

#### Access data with fields

In [17]:
r1.Salesperson

'Ann'

In [12]:
[r.Truck for r in rs]

[12, 20, 20, 15, 9]

#### Missing data is automatically converted to `None`

In [53]:
rs[2]

Row(Salesperson = 'Doug', Compact = '20', Sedan = '13', SUV = None, Truck = 20)

## Example 2 - Convert all sales to `int`

In [27]:
base_row2 = Row(**{'columns':columns, 'col_types':my_col_types2})
rs = [base_row2.set(**r) for r in l]
rs

[Row(Salesperson = 'Ann', Compact = '22', Sedan = '18', SUV = 15, Truck = 12),
 Row(Salesperson = 'Bob', Compact = '19', Sedan = '12', SUV = 17, Truck = 20),
 Row(Salesperson = 'Doug', Compact = '20', Sedan = '13', SUV = None, Truck = 20),
 Row(Salesperson = 'Yolanda', Compact = '19', Sedan = '8', SUV = 32, Truck = 15),
 Row(Salesperson = 'Xerxes', Compact = '12', Sedan = '23', SUV = 18, Truck = 9)]

In [26]:
my_col_types2

{'Salesperson': str, 'SUV': int, 'Truck': int, 'Sedan': int, 'Compact': int}

In [24]:
r1 = rs[0]
r1

Row(Salesperson = 'Ann', Compact = '22', Sedan = '18', SUV = 15, Truck = 12)

In [51]:
r1.col_types

{'Salesperson': str, 'SUV': int, 'Truck': int, 'Sedan': int, 'Compact': int}

## Using `convert_to_row` on a `DictReader`

The easiest way to convert to `Rows`

In [14]:
from toolz import pipe as pipez
from dfply_streaming import convert_to_rows

pipez(l,
     convert_to_rows(col_types=my_col_types2),
    list)

[Row(Salesperson = 'Ann', Compact = 22, Sedan = 18, SUV = 15, Truck = 12),
 Row(Salesperson = 'Bob', Compact = 19, Sedan = 12, SUV = 17, Truck = 20),
 Row(Salesperson = 'Doug', Compact = 20, Sedan = 13, SUV = None, Truck = 20),
 Row(Salesperson = 'Yolanda', Compact = 19, Sedan = 8, SUV = 32, Truck = 15),
 Row(Salesperson = 'Xerxes', Compact = 12, Sedan = 23, SUV = 18, Truck = 9)]

## Using `maybe_apply` to deal with missing values

The `maybe_apply` decorator allows a function to automatically deal with missing values.

In [15]:
from dfply_streaming import maybe_apply
from toolz.curried import map

doubler = maybe_apply(lambda x: 2*x)
pipez([2,3, None, 4],
     map(doubler),
     list)

[4, 6, None, 8]

**Note:** `mutate` automatically decorates all Intentions and row functions with `maybe_apply`

## Processing a table with `select`, `mutate` and `filter_by`

Things to note:

1. Notice that computations involving `None` return `None`
2. New columns are added on the right.
3. `filter_by` removes missing values

In [16]:
from dfply_streaming import select, mutate, filter_by

pipez(l,
     convert_to_rows(col_types=my_col_types2),
     select(['Salesperson', 'Truck', 'SUV']),
     mutate(Utility = lambda r: r.Truck + r.SUV,
            Salesperson = lambda r: r.Salesperson.lower()),
     filter_by(lambda r: r.Truck > 10),
     list)

[Row(Salesperson = 'ann', SUV = 15, Truck = 12, Utility = 27),
 Row(Salesperson = 'bob', SUV = 17, Truck = 20, Utility = 37),
 Row(Salesperson = 'doug', SUV = None, Truck = 20, Utility = None),
 Row(Salesperson = 'yolanda', SUV = 32, Truck = 15, Utility = 47)]

## Using the `X` intension instead of `lambda` functions

In [17]:
from dfply_streaming import X

pipez(l,
     convert_to_rows(col_types=my_col_types2),
     select(X.Salesperson + X.Truck + X.SUV),
     mutate(Utility = X.Truck + X.SUV,
            Salesperson = X.Salesperson.lower()),
     filter_by(X.Truck > 10),
     list)

[Row(Salesperson = 'ann', SUV = 15, Truck = 12, Utility = 27),
 Row(Salesperson = 'bob', SUV = 17, Truck = 20, Utility = 37),
 Row(Salesperson = 'doug', SUV = None, Truck = 20, Utility = None),
 Row(Salesperson = 'yolanda', SUV = 32, Truck = 15, Utility = 47)]

## Set operators for column selection

Wrapping a column indicator (list of stings, intention, function) in `wrap_col_intection` 
allow easy set operations.  You can use

* `+` to build up a set
* `~` to compute the set complement
* `-`, `&` and `|` for set difference, intersection, and union

In [18]:
from dfply_streaming import wrap_col_intention

In [19]:
wrap_col_intention(r1.columns)(r1)

StrPSet(['Sedan', 'Truck', 'Salesperson', 'Compact', 'SUV'])

In [20]:
wrap_col_intention(['Truck', 'SUV'])(r1)

pset(['Truck', 'SUV'])

In [21]:
wrap_col_intention(X.Truck + X.SUV)(r1)

StrPSet(['Truck', 'SUV'])

In [27]:
wrap_col_intention(~(X.Truck + X.SUV))(r1)

StrPSet(['Salesperson', 'Compact', 'Sedan'])

## `select` automatically wraps the `columns` argument

You can pass in

* list of columns names (strings)
* Intension expression
* Selection function

Note that the order is given by the list

In [28]:
list(select(["Truck", "SUV"], rs))

[Row(Truck = 12, SUV = 15),
 Row(Truck = 20, SUV = 17),
 Row(Truck = 20, SUV = None),
 Row(Truck = 15, SUV = 32),
 Row(Truck = 9, SUV = 18)]

In [30]:
pipez(rs,
     select(["SUV", "Truck"]),
     list)

[Row(Truck = 12, SUV = 15),
 Row(Truck = 20, SUV = 17),
 Row(Truck = 20, SUV = None),
 Row(Truck = 15, SUV = 32),
 Row(Truck = 9, SUV = 18)]

## <font color='red'> TODO: Need to fix the order for adding intentions </font>

In [31]:
pipez(rs,
     select(X.Truck + X.SUV),
     list)

[Row(Truck = 12, SUV = 15),
 Row(Truck = 20, SUV = 17),
 Row(Truck = 20, SUV = None),
 Row(Truck = 15, SUV = 32),
 Row(Truck = 9, SUV = 18)]

In [34]:
pipez(rs,
     select(X.SUV + X.Truck),
     list)

[Row(Truck = 12, SUV = 15),
 Row(Truck = 20, SUV = 17),
 Row(Truck = 20, SUV = None),
 Row(Truck = 15, SUV = 32),
 Row(Truck = 9, SUV = 18)]

## Use inversion operator `~` to get the complement

In [35]:
pipez(rs,
     select(~X.Truck),
     list)

[Row(Salesperson = 'Ann', Compact = '22', SUV = 15, Sedan = '18'),
 Row(Salesperson = 'Bob', Compact = '19', SUV = 17, Sedan = '12'),
 Row(Salesperson = 'Doug', Compact = '20', SUV = None, Sedan = '13'),
 Row(Salesperson = 'Yolanda', Compact = '19', SUV = 32, Sedan = '8'),
 Row(Salesperson = 'Xerxes', Compact = '12', SUV = 18, Sedan = '23')]

In [36]:
pipez(rs,
     select(~(X.Truck + X.SUV)),
     list)

[Row(Salesperson = 'Ann', Compact = '22', Sedan = '18'),
 Row(Salesperson = 'Bob', Compact = '19', Sedan = '12'),
 Row(Salesperson = 'Doug', Compact = '20', Sedan = '13'),
 Row(Salesperson = 'Yolanda', Compact = '19', Sedan = '8'),
 Row(Salesperson = 'Xerxes', Compact = '12', Sedan = '23')]

## <font color='red'> TODO: Need to clean up this error message</font>

In [38]:
pipez(rs,
     select(~(X.Truck + X.SUV)),
     mutate(Utility = X.Truck + X.SUV),
     list)

AttributeError: Row21 has no attribute 'Truck'

## Using `and_`, `or_` and `not_` with `filter_by`

Currently, intensions don't work with `and`, `or`, or `not`.  Instead import and use `and_`, `or_`, and `not_`

In [39]:
from dfply_streaming import and_, or_, not_
pipez(rs,
     filter_by(not_(X.SUV > 15)),
      list)

[Row(Salesperson = 'Ann', Compact = '22', Sedan = '18', SUV = 15, Truck = 12)]

In [40]:
pipez(rs,
     filter_by(and_(X.SUV > 15, X.Truck > 15)),
      list)

[Row(Salesperson = 'Bob', Compact = '19', Sedan = '12', SUV = 17, Truck = 20)]

In [41]:
pipez(rs,
     filter_by(or_(X.SUV > 15, X.Truck > 15)),
      list)

[Row(Salesperson = 'Bob', Compact = '19', Sedan = '12', SUV = 17, Truck = 20),
 Row(Salesperson = 'Yolanda', Compact = '19', Sedan = '8', SUV = 32, Truck = 15),
 Row(Salesperson = 'Xerxes', Compact = '12', Sedan = '23', SUV = 18, Truck = 9)]

## <font color="red"> TODO: Need to try to fix the and an or operators </font>

In [66]:
pipez(rs,
     filter_by(X.SUV > 15 and X.Truck > 15),
      list)

TypeError: __index__ returned non-int (type Intention)