## Data Pipelines these days

From @wrobstory: the SIMPLE pipelines:

![](https://dl.dropboxusercontent.com/u/75194/simplepipe.png)

### Why learn databases

- you shouldn't really implement one yourself
- they are very hard to get right, with many edge cases, e.g. crashes or the filesystem running out of space
- but understanding how one works is valuable, because data storage and munging are not just database concerns. Examples: columnar storage in Apache Parquet, and Arrow, the language-agnostic dataframe format
- you do need to choose a storage engine that suits your program, and tuning a database requires deep knowledge of how the output of `explain` corresponds to the structure of the db
- in particular, architectures that work for transaction processing are not optimal for analytics
- you will often use database libraries as opposed to daemons: leveldb (Google Chrome), lmdb, bdb, sqlite (uses btrees for implementation)
- a lot of what you have learnt comes together here.
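
To make the point about `explain` concrete, here is a minimal sketch using the `sqlite3` module from the Python standard library. The table `users` and the index `idx_users_name` are made up for illustration, and the exact wording of the plan text depends on the SQLite version.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users (name, age) VALUES (?, ?)",
    [("user{}".format(i), i % 90) for i in range(1000)],
)

def plan(sql):
    """Return the human-readable detail column of SQLite's query plan."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[-1] for row in rows)

query = "SELECT * FROM users WHERE name = 'user42'"
plan_before = plan(query)   # no index on name yet: a full table scan
conn.execute("CREATE INDEX idx_users_name ON users (name)")
plan_after = plan(query)    # the optimizer can now search the index

print(plan_before)
print(plan_after)
```

The query text is identical both times; only the physical structure of the db changed, and the plan changed with it.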

### Relational vs Document

- today both are highly used: we have *polyglot persistence*


- Mongo/Couch/etc are document oriented, store JSON documents (hierarchical structure - hard to map to rows and columns)


- these have a higher locality of data: it's like a really wide row with hierarchy
- JSON document - very local - everything specific to a person is in one place
- a standard relational db will have tables for everything, in separate places


- normalization vs denormalization (denormalizing flattens all the data into one long, wide row)
- normalization: never repeat anything
- if you want transactional updates, you want to have to update in only one place


- the last 5-6 years have seen an explosion of databases, built by people for whom the standard relational model wasn't working



### Relational Model

- a relation (table) is a collection of tuples
- SQL is a declarative model: a query optimizer decides how to execute the query (e.g. if a field range covers 80% of values, should we use the index or scan the table?). Declarative queries also lend themselves to parallel execution
- *shredding* splits a document into multiple tables due to normalization
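
As a rough illustration of shredding, the sketch below splits one nested document into rows for two normalized tables. The document, its field names, and the `shred` helper are hypothetical.

```python
# A hypothetical customer document, as a document store might hold it.
document = {
    "id": 1,
    "name": "rahul",
    "orders": [
        {"order_id": 10, "product": "book", "qty": 2},
        {"order_id": 11, "product": "pen", "qty": 5},
    ],
}

def shred(doc):
    """Split one nested document into rows for two normalized tables."""
    customers = [(doc["id"], doc["name"])]
    # each order row carries the customer id as a foreign key
    orders = [(o["order_id"], doc["id"], o["product"], o["qty"])
              for o in doc["orders"]]
    return customers, orders

customers, orders = shred(document)
print(customers)
print(orders)
```

The customer's name now lives in exactly one row, at the cost of a join to put the document back together.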

In [1]:
# SQL and pandas - notions of tables
# samples in rows 
# fields in columns

### Document Model

- stores nested records
- bad for many-to-many relationships, e.g. many customers buying many products (documents all over the place: one doc for each transaction, one for each person, one for each product, which makes relationships really complicated)
- storage locality good for access, bad for writing
- couch, mongo, etc

## Components to a database

![](https://dl.dropboxusercontent.com/u/75194/dbmscomponents.png) (DBMS components from Hellerstein et al.: Architecture of a Database System, circa 2007)

- client connection manager: decides what to do with incoming connections

- transactional storage
    - storage data structures
    - transactions and ACID: atomicity, consistency, isolation, durability
    - atomicity: all of a transaction's writes happen or none do; a money transfer must both withdraw from one account and deposit into the other
    - isolation: when multiple readers and writers run at the same time, each transaction behaves as if it ran alone
    - durability: once a transaction commits, its data survives a crash
    - consistency: application invariants hold before and after the transaction; this is largely the application's job, so it matters less here
- process model: coroutines, threads, processes
- query model and language: query optimization

### What's the history of a database query

From Hellerstein et al:
>At the base of the gate agent’s query plan, one or more operators exist to request data from the database. These operators make calls to fetch data from the DBMS’ Transactional Storage Manager, which manages all data access (read) and manipulation (create, update, delete) calls. The storage system includes algorithms and data structures for organizing and accessing data on disk (“access methods”), including basic structures like tables and indexes. It also includes a buffer management module that decides when and what data to transfer between disk and memory buffers. Returning to our example, in the course of accessing data in the access methods, the gate agent’s query must invoke the transaction management code to ensure the well-known “ACID” properties of transactions. Before accessing data, locks are acquired from a lock manager to ensure correct execution in the face of other concurrent queries.

>At this point in the example query’s life, it has begun to access data records, and is ready to use them to compute results for the client. This is done by “unwinding the stack” of activities we described up to this point. The access methods return control to the query executor’s operators, which orchestrate the computation of result tuples from database data; as result tuples are generated, they are placed in a buffer for the client communications manager, which ships the results back to the caller. For large result sets, the client typically will make additional calls to fetch more data incrementally from the query, resulting in multiple iterations through the communications manager, query executor, and storage manager. In our simple example, at the end of the query the transaction is completed and the connection closed; this results in the transaction manager cleaning up state for the transaction, the process manager freeing any control structures for the query, and the communications manager cleaning up communication state for the connection.


### Transaction Processing or Analytics?

- Also known as OLTP vs OLAP/Warehousing
- small query size vs aggregates over large ones
- random writes from user input vs ordered ETL/stream
- end user (amazon site) vs analyst (you)
- GB to TB vs TB to PB

![](https://dl.dropboxusercontent.com/u/75194/ETL.png)

(from designing data intensive applications)

### Handling the different workloads

- for smaller sizes any relational db will do
- currently vendors focus on one or the other, not both
- MS SQL Server and SAP HANA support both but with different storage engines
- OLTP systems need to be highly available and low latency
- SQL (or any Pandasish syntax) is good for drilling down
- warehousing: star schema with very wide fact table, but typically you focus on few columns at a time
- btree indexes for oltp, bitmaps + btree for warehouse

### Column oriented storage

- store values from each column together in separate storage
- lends itself to compression with bitmap indexes and run-length encoding
- this involves choosing an appropriate sort order
- the index then can be the data (great for IN and AND queries): there are no pointers to "elsewhere"
- compressed indexes can fit into cache and are usable by iterators
- bitwise AND/OR can be done with vector processing
- several different sort orders can be redundantly stored
- writing is harder: updating a row touches many column files
- but you can write an in-memory front sorted store (row or column), and eventually merge onto the disk
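
The compression and bitmap ideas above can be sketched in a few lines. The two columns and the helper names are invented for illustration, and Python ints stand in for real bit vectors.

```python
from itertools import groupby

# A hypothetical column, already sorted so equal values sit next to each other.
country = ["DE", "DE", "DE", "US", "US", "IN", "IN", "IN", "IN"]
product = ["pen", "book", "pen", "pen", "book", "pen", "pen", "book", "pen"]

def run_length_encode(column):
    """Collapse runs of equal values into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(column)]

def bitmap_index(column):
    """Map each distinct value to an int whose bit i is set if row i matches."""
    bitmaps = {}
    for row, value in enumerate(column):
        bitmaps[value] = bitmaps.get(value, 0) | (1 << row)
    return bitmaps

def rows_of(bitmap):
    """Expand a bitmap back into the list of matching row numbers."""
    return [row for row in range(bitmap.bit_length()) if bitmap >> row & 1]

by_country = bitmap_index(country)
by_product = bitmap_index(product)

rle = run_length_encode(country)
# WHERE country IN ('DE', 'US') AND product = 'pen'
matches = rows_of((by_country["DE"] | by_country["US"]) & by_product["pen"])
```

The sort order matters: `country` compresses to three runs because it is sorted, while `product`, stored in the same row order, would not compress nearly as well.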

### Data cubes

- Basically a histogram of counts in bins for multiple fields
- can give you fast marginals and conditionals in any combination of dimensions
- expensive to update so only used for warehousing
- such histograms are used by query optimizers as well
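
A toy two-dimensional cube, assuming made-up `(country, product)` fact rows, shows how marginals and conditionals fall out of the precomputed counts:

```python
from collections import Counter

# Hypothetical fact rows: (country, product)
facts = [("DE", "pen"), ("DE", "book"), ("US", "pen"),
         ("US", "pen"), ("IN", "book"), ("IN", "pen")]

cube = Counter(facts)   # counts per (country, product) cell

def marginal(cube, axis):
    """Sum the cube over every dimension except `axis`."""
    totals = Counter()
    for cell, count in cube.items():
        totals[cell[axis]] += count
    return totals

by_country = marginal(cube, 0)
by_product = marginal(cube, 1)
# conditional: share of pens among sales in the US
p_pen_given_us = cube[("US", "pen")] / by_country["US"]
```

Queries only touch the cube, never the fact rows, which is why they are fast and why every new fact row makes the cube stale until it is updated.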

## Indexing and Databases

- an additional structure derived from the primary data
- however a clustered index may actually store the data
- there is overhead on writes: indexes speed up queries but slow down writes

### Simple start

- start with index for key-value data
- aka dictionary
- in memory you are done. the index IS the database
- hash tables are no good for range queries
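
The last point can be seen in a small sketch: with only a hash table every key has to be inspected, while a sorted list of keys lets two binary searches bound the range. The helper names are made up; the sorted list is an assumed extra structure kept next to the dict.

```python
from bisect import bisect_left, bisect_right

database = {"kobe": "stillyoung", "pavlos": "ancient", "rahul": "aged"}

def range_scan_hash(db, lo, hi):
    """With only a hash table, every key must be inspected: O(n)."""
    return sorted(k for k in db if lo <= k <= hi)

sorted_keys = sorted(database)   # a sorted structure kept alongside the dict

def range_scan_sorted(keys, lo, hi):
    """With sorted keys, two binary searches bound the range: O(log n + m)."""
    return keys[bisect_left(keys, lo):bisect_right(keys, hi)]

print(range_scan_sorted(sorted_keys, "k", "q"))
```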

In [2]:
# simplest in memory database
# also an INDEX 

database=dict()
database['rahul']="aged"
database['pavlos']="ancient"
database['kobe']="stillyoung"

In [3]:
database['kobe']

'stillyoung'

### Doing it on disk

- in the hashmap(dict) in memory, store a file offset instead
- this file is an append only file.
- if you update, simply append a new entry and change the offset in the hashmap
- this is what bitcask in Riak does
- break the file into segments. Once a segment is written, its kv pairs are never changed. Maintain a hashmap per segment and search these in order, most recent first
- run compaction to throw away dupes from segments and merge them; delete old files
- deletion is done by writing a tombstone record
- only one writer thread. 
- bitcask will store hashmap snapshots
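
Compaction and tombstones can be sketched on in-memory lists standing in for segment files. This is an illustration of the idea, not bitcask's actual merge code; using `None` as the tombstone is an assumption of the sketch.

```python
TOMBSTONE = None   # assumption: None marks a deleted key in this sketch

def compact(segments):
    """Merge segments (oldest first) into one, keeping only the latest value."""
    latest = {}
    for segment in segments:            # later segments overwrite earlier ones
        for key, value in segment:      # within a segment, later records win
            latest[key] = value
    # tombstoned keys can be dropped entirely once all segments are merged
    return [(k, v) for k, v in latest.items() if v is not TOMBSTONE]

old_segment = [("rahul", "aged"), ("pavlos", "aged"), ("kobe", "stillyoung")]
new_segment = [("rahul", "young"), ("kobe", "retired"), ("pavlos", TOMBSTONE)]

merged = compact([old_segment, new_segment])
```

Six records shrink to two: superseded values and the deleted key disappear, after which the old segment files could be removed.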

In [4]:
# record layout: after value_sz, read that many bytes to get the value
# keep appending records at the end of the file
# but the file will grow huge over time
# so keep the files time-bound, e.g. one per day
# then a merge process has to run over all these files

# the in-memory index stores value_pos, the offset of the value

# with several files it must also store which file the value went into:
# this is what bitcask (a nosql store) does, keeping the file id and position of each value

# the index is a memory-based structure

# if it only lives in memory and is never written to a file, periodically
# or on every write, something will get lost on a crash
# so make sure you persist it


![](https://dl.dropboxusercontent.com/u/75194/riak1.png)

![](https://dl.dropboxusercontent.com/u/75194/riak2.png)

(from riak bitcask intro at http://basho.com/wp-content/uploads/2015/05/bitcask-intro.pdf )

In [5]:
import os.path
import sys
class Database():
    
    def __init__(self, file):
        self.file = file
        self.byteorder=sys.byteorder
        if not os.path.exists(file):
            self.fd = open(file, "xb+", buffering=0)
            self.index={}
        else:
            self.fd = open(file, "r+b", buffering=0)
            with open(file+".idx") as fdi:
                items = [l.strip().split(':') for l in fdi.readlines()]
                self.index = {k:int(v) for k,v in items}
        self.readptr = self.fd.tell()
        self.fd.seek(0,2)
        self.writeptr = self.fd.tell()
        
        
    def set(self, x, v):
        if not isinstance(x, str):
            raise ValueError("Key must be a string")
        bin_x = x.encode('utf-8')
        sz_x=len(bin_x).to_bytes(1, byteorder=self.byteorder)
        if not isinstance(v, str):
            raise ValueError("Value must be a string")
        bin_v = v.encode('utf-8')
        sz_v=len(bin_v).to_bytes(1, byteorder=self.byteorder)
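        # append the record first, then point the index at it
        self.fd.seek(self.writeptr)
        print("currently", self.fd.tell())
        self.fd.write(sz_x+sz_v+bin_x+bin_v)
        # only now record the offset: if the write raised, the index still
        # holds the previous offset for x instead of losing the key
        self.index[x]=self.writeptr
        self.writeptr=self.fd.tell()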
            
    def get(self, x):
        try:
            offset = self.index[x]
        except KeyError:
            raise ValueError("{} is not in index".format(x))
        print("offset is", offset)
        self.readptr=offset
        self.fd.seek(self.readptr)
        sz_k = int.from_bytes(self.fd.read(1), byteorder=self.byteorder)
        sz_v = int.from_bytes(self.fd.read(1), byteorder=self.byteorder)
        self.fd.seek(sz_k,1)
        readit=self.fd.read(sz_v).decode('utf-8')
        print("now", self.fd.tell())
        return readit
        
    def close(self):
        fdi=open(self.file+".idx","w")
        fdi.write("\n".join([k+":"+str(v) for k,v in self.index.items()]))
        fdi.close()
        self.fd.close()
        
    def __del__(self):
        self.fd.close()

In [6]:
!rm /tmp/test.db

rm: /tmp/test.db: No such file or directory


In [7]:
# initialize database with a .db file in the tmp directory
db = Database("/tmp/test.db")

In [30]:
# view the /tmp directory
!ls /tmp

KSOutOfProcessFetcher.0.ppfIhqX0vjaTSb8AJYobDV7Cu68=   esets.gui.501.fifo
KSOutOfProcessFetcher.501.ppfIhqX0vjaTSb8AJYobDV7Cu68= esets.sock
com.apple.launchd.3Rh2hNC6ja                           pcwifioccupy.filelock
com.apple.launchd.8mX9YGkA8A                           test.db
com.apple.launchd.Z1SYxwMDYP                           test.db.idx


In [8]:
# check that the index attribute of db is an empty dictionary
print(db.index)

{}


In [9]:
# set rahul as key and aged as value
db.set("rahul", "aged")
db.set("pavlos", "aged")
db.set("kobe", "stillyoung")

currently 0
currently 11
currently 23


In [10]:
# get the index, which should have the keys and their file locations
print(db.index)

{'pavlos': 11, 'rahul': 0, 'kobe': 23}


In [11]:
# get pavlos
db.get("pavlos")

offset is 11
now 23


'aged'

In [12]:
# update rahul with a new word, just append it to the end (the old stuff is still there)
db.set("rahul","young")

currently 39


In [13]:
# get the updated index
print(db.index)

{'pavlos': 11, 'rahul': 39, 'kobe': 23}


In [14]:
# reading kobe seeks to offset 23 and stops at 39, where the new rahul record starts
db.get("kobe")

offset is 23
now 39


'stillyoung'

In [15]:
# the read pointer moves around; reading the last record leaves it at the end of the file
db.get("rahul")

offset is 39
now 51


'young'

In [16]:
db.get("pavlos")

offset is 11
now 23


'aged'

In [17]:
db.index

{'kobe': 23, 'pavlos': 11, 'rahul': 39}

In [18]:
db.set("kobe", "retired")

currently 51


In [19]:
db.index

{'kobe': 51, 'pavlos': 11, 'rahul': 39}

In [20]:
# move around the file to get items
print(db.get("rahul"))
print(db.get("pavlos"))
print(db.get("kobe"))

offset is 39
now 51
young
offset is 11
now 23
aged
offset is 51
now 64
retired


In [21]:
# write new key-value at the last location left off
db.set("obama","president")
db.index

currently 64


{'kobe': 51, 'obama': 64, 'pavlos': 11, 'rahul': 39}

In [22]:
print(db.get("rahul"))
print(db.get("pavlos"))
print(db.get("kobe"))
print(db.get("obama"))

offset is 39
now 51
young
offset is 11
now 23
aged
offset is 51
now 64
retired
offset is 64
now 80
president


In [23]:
db.close()

In [24]:
db=Database("/tmp/test.db")
print(db.get("rahul"))
print(db.get("pavlos"))
print(db.get("kobe"))
print(db.get("obama"))

offset is 39
now 51
young
offset is 11
now 23
aged
offset is 51
now 64
retired
offset is 64
now 80
president


In [25]:
db.set("pavlos", "ancient")
db.index

currently 80


{'kobe': 51, 'obama': 64, 'pavlos': 80, 'rahul': 39}

In [26]:
print(db.get("rahul"))
print(db.get("pavlos"))
print(db.get("kobe"))
print(db.get("obama"))

offset is 39
now 51
young
offset is 80
now 95
ancient
offset is 51
now 64
retired
offset is 64
now 80
president


In [27]:
# close the database
db.close()

In [28]:
# print out what's in the idx file
!cat /tmp/test.db.idx

pavlos:80
rahul:39
kobe:51
obama:64

In [29]:
# print the printable strings in the db file
!strings /tmp/test.db

rahulaged
pavlosaged
kobestillyoung
rahulyoung
koberetired
obamapresident
pavlosancient
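
Because every record carries its own length header, the index does not strictly need the `.idx` file: it can be rebuilt by scanning the data file. The sketch below assumes the same layout as the class above (1-byte key size, 1-byte value size, key, value). The helpers `write_record` and `rebuild_index` and the temporary file are made up for illustration, and torn records at the end of the file are not handled.

```python
import os
import tempfile

def write_record(fd, key, value):
    """Append one record in the layout used above: sizes, then key, then value."""
    k, v = key.encode("utf-8"), value.encode("utf-8")
    fd.write(bytes([len(k), len(v)]) + k + v)

def rebuild_index(path):
    """Scan the data file front to back; the last offset seen for a key wins."""
    index = {}
    with open(path, "rb") as fd:
        while True:
            offset = fd.tell()
            header = fd.read(2)
            if len(header) < 2:          # end of file
                break
            sz_k, sz_v = header[0], header[1]
            key = fd.read(sz_k).decode("utf-8")
            fd.seek(sz_v, os.SEEK_CUR)   # skip the value bytes
            index[key] = offset
    return index

path = os.path.join(tempfile.mkdtemp(), "rebuild.db")   # hypothetical file
with open(path, "wb") as fd:
    for key, value in [("rahul", "aged"), ("pavlos", "aged"),
                       ("kobe", "stillyoung"), ("rahul", "young")]:
        write_record(fd, key, value)

index = rebuild_index(path)
```

A full scan is slow on a large file, which is one reason to store snapshots of the hashmap and only replay the tail.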


In [None]:
# range search
# use a tree
# or use a sorted array, if the array can be kept sorted at reasonable cost

# hash tables don't keep keys sorted, so use a tree
# in memory, insert keys into a tree as they come in: n inserts cost O(n log n)
# and the structure is sorted automatically
# then write this structure to a file every so often, e.g. every 10,000 transactions

# but now the temporal order within a file is lost
# earlier, the stored file offsets told me which entry was the latest,
# and I could go through the entries one by one
# now each file is sorted a-z, so to find a key I need the latest file
# that contains it, and an in-order traversal within that file

# so keep the files in time order as chunks, each sorted internally
# track a segment for each entry instead of a file id

# use a bloom filter:
# a memory structure in front to check whether a key is in there or not

## Getting more sophisticated

### SSTables and LSM trees

- keep the segments from last time
- now add the requirement that these are sorted by key
- merging segments is like mergesort; track recentness 
- now all keys need not be in memory, only some
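
The mergesort-like merge can be sketched with `heapq.merge` from the standard library. The segments here are in-memory lists and the function name `merge_sstables` is invented. Recentness is tracked by tagging each record with the age of its segment.

```python
import heapq
from itertools import groupby

def merge_sstables(segments):
    """Merge key-sorted segments; `segments` is ordered newest first."""
    # tag each record with its segment's age so ties on key sort newest first
    tagged = [[(key, age, value) for key, value in segment]
              for age, segment in enumerate(segments)]
    merged = heapq.merge(*tagged)
    # for each key keep the first record, which comes from the newest segment
    return [(key, next(group)[2])
            for key, group in groupby(merged, key=lambda rec: rec[0])]

newest = [("kobe", "retired"), ("rahul", "young")]
older = [("kobe", "stillyoung"), ("obama", "president"), ("pavlos", "aged")]

sstable = merge_sstables([newest, older])
```

The merge reads each input sequentially and never needs a whole segment in memory, which is what makes it practical for files larger than RAM.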

![](https://dl.dropboxusercontent.com/u/75194/sstable.png)

(from designing data intensive applications)

- how to maintain the sort?
- maintain it in memory using a balanced binary tree; this in-memory structure is called a memtable
- once the in-memory structure exceeds a certain size, flush it to disk as an sstable
- for crashes, keep a log per memtable, to be discarded when the memtable is written to an sstable. Each write is immediately appended, so this is like a WAL.
- again do lookups most recent first, and do compaction and merging. A high throughput on writes will affect compactions and vice versa

- called a LSM tree
- popularized by bigtable
- used in cassandra, riak, hbase, leveldb, rocksdb
- indeed in Lucene too, where the key is a term and the value is the list of documents containing it
- you could use it for vocabularies in your NLP.
- leveldb uses bloom filters to prevent multiple searches if not there
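
A bloom filter can be sketched in a few lines. This is a toy version, not leveldb's implementation: the sizes, the salted SHA-256 hashing, and the class and method names are all assumptions chosen for readability.

```python
import hashlib

class BloomFilter:
    """Set membership with no false negatives and a tunable false-positive rate."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0                      # a Python int used as a bit array

    def _positions(self, key):
        # derive several bit positions by salting one hash function
        for salt in range(self.num_hashes):
            data = "{}:{}".format(salt, key).encode("utf-8")
            digest = hashlib.sha256(data).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all(self.bits >> pos & 1 for pos in self._positions(key))

bloom = BloomFilter()
for key in ["rahul", "pavlos", "kobe"]:
    bloom.add(key)
```

If `might_contain` returns `False` the key is definitely absent and no segment has to be searched; a `True` only means "possibly there", so the segments still have to be checked.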

In [None]:
# To deal with crashes, keep a log, so can regenerate memtable from log
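
The log-and-replay idea can be sketched as follows. For brevity the memtable here is a plain dict that is sorted only at flush time, rather than a balanced tree; the tab-separated log format, the class name `MemTable`, and the file path are assumptions of the sketch.

```python
import os
import tempfile

class MemTable:
    """In-memory table whose writes are first appended to a log file."""

    def __init__(self, log_path):
        self.log_path = log_path
        self.data = {}
        if os.path.exists(log_path):       # crash recovery: replay the log
            with open(log_path, encoding="utf-8") as log:
                for line in log:
                    key, value = line.rstrip("\n").split("\t", 1)
                    self.data[key] = value
        self.log = open(log_path, "a", encoding="utf-8")

    def set(self, key, value):
        # assumption: keys and values contain no tabs or newlines
        self.log.write("{}\t{}\n".format(key, value))
        self.log.flush()
        os.fsync(self.log.fileno())        # make the entry durable first
        self.data[key] = value

    def flush_to_sstable(self):
        """Return the sorted contents and discard the log."""
        sstable = sorted(self.data.items())
        self.log.close()
        os.remove(self.log_path)
        self.data = {}
        self.log = open(self.log_path, "a", encoding="utf-8")
        return sstable

log_path = os.path.join(tempfile.mkdtemp(), "memtable.log")   # hypothetical path
table = MemTable(log_path)
table.set("rahul", "aged")
table.set("kobe", "stillyoung")
table.log.close()                  # simulate a crash: the in-memory dict is lost

recovered = MemTable(log_path)     # replaying the log rebuilds the memtable
sstable = recovered.flush_to_sstable()
recovered.log.close()
```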

## The most common indexing structure, btree

![](https://dl.dropboxusercontent.com/u/75194/btree1q.png)

(from https://loveforprogramming.quora.com/Memory-locality-the-magic-of-B-Trees)

- "A linked sorted distributed range array with predefined sub array size which allows searches, sequential access, insertions, and deletions in logarithmic time. "
- it is a generalization of a binary search tree
- but the branching factor is much higher, and the depth thus smaller
- B-trees break the database into pages, and read or write one page at a time
- leaf pages contain all the values and may represent a clustered index

![](https://dl.dropboxusercontent.com/u/75194/btree1.png)
(from designing data intensive applications)

When we add a key to a page that is already full, a split can happen

![](https://dl.dropboxusercontent.com/u/75194/btree2.png)

(from designing data intensive applications)

This is an in-place modification, unlike what we had earlier. The data structure is mutable. This can cause issues for transactions, and must be dealt with. One can create immutable b-trees, and lmdb, the database inside of openldap, does this. It uses a copy-on-write scheme which writes new pages elsewhere.

Both splits and writing in-place are dangerous, so it's normal for b-tree implementations to have a WAL, or write ahead log (such a log can also be used to manage transactions). Every operation on the btree is appended to this log file.

In B+ trees, pointers amongst the leaf nodes make for an easier linear scan.

![](https://dl.dropboxusercontent.com/u/75194/btree2q.png)

(from https://loveforprogramming.quora.com/Memory-locality-the-magic-of-B-Trees)

### Immutable BST

![](https://dl.dropboxusercontent.com/u/75194/immutablebst.png)

(from purely functional data structures)
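
The figure's idea, often called path copying, can be sketched for an unbalanced binary search tree. The `Node` tuple and the two functions are illustrative names, and this is not how lmdb lays out its pages.

```python
from collections import namedtuple

Node = namedtuple("Node", ["key", "value", "left", "right"])

def insert(node, key, value):
    """Return a new tree; only the nodes on the search path are copied."""
    if node is None:
        return Node(key, value, None, None)
    if key < node.key:
        return node._replace(left=insert(node.left, key, value))
    if key > node.key:
        return node._replace(right=insert(node.right, key, value))
    return node._replace(value=value)

def lookup(node, key):
    while node is not None:
        if key == node.key:
            return node.value
        node = node.left if key < node.key else node.right
    return None

v1 = None
for key, value in [("pavlos", "aged"), ("kobe", "stillyoung"), ("rahul", "aged")]:
    v1 = insert(v1, key, value)
v2 = insert(v1, "kobe", "retired")   # v1 is untouched
```

Readers holding `v1` keep seeing a consistent old version while `v2` shares every subtree that did not change, which is the property copy-on-write b-trees rely on.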

### B-tree vs LSM tree

- comparable on random reads
- LSM tree good on random writes as it makes the writes sequential
- B-tree good for transactions: each key exists in exactly one place

### Other indexes

- key-value indexes suppose unique keys and are thus like primary keys in a relational model
- you can also have secondary keys or multi-column keys, which can be created by some kind of concatenation or with a multi-dimensional r-tree
- in a k-v database, the value is stored in the index and we are done. Usually in rdbms, the rows are stored in a heap file to avoid duplication
- in mysql's innodb engine, the pk is a clustered index, while the secondary keys point to the pk.
- a covering index stores certain columns in the index. Of course, in a columnar situation, the column is the index when we choose that column's ordering.

## in-memory databases

- are fast as byte serialization is not needed
- in the anti-caching pattern, evict LRU data to disk, like virtual memory or swapping but managed by the db. This means that you can now have an in-memory database which can handle out-of-core data.
- h-store (now voltdb) uses this in-memory idea with single threading to achieve high throughput in an OLTP scenario, relying on many partitions for ACID.
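
Anti-caching can be sketched with an `OrderedDict` as the LRU structure. The class name `AntiCache` is made up and the `disk` dict is only a stand-in for a real on-disk store.

```python
from collections import OrderedDict

class AntiCache:
    """Keep the hottest `capacity` keys in memory; evict the coldest to `disk`."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = OrderedDict()   # ordered from least to most recently used
        self.disk = {}                # stand-in for an on-disk store

    def get(self, key):
        if key not in self.memory:
            # fetch evicted data back into memory (raises KeyError if absent)
            self.set(key, self.disk.pop(key))
        self.memory.move_to_end(key)
        return self.memory[key]

    def set(self, key, value):
        self.disk.pop(key, None)      # memory now holds the only live copy
        self.memory[key] = value
        self.memory.move_to_end(key)
        while len(self.memory) > self.capacity:
            cold_key, cold_value = self.memory.popitem(last=False)
            self.disk[cold_key] = cold_value

cache = AntiCache(capacity=2)
cache.set("rahul", "young")
cache.set("pavlos", "ancient")
cache.set("kobe", "retired")      # evicts "rahul", the least recently used
```

Unlike an ordinary cache, memory is the primary home of the data here and disk only holds what was pushed out, so a key lives in exactly one of the two places.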

## Today's exercises

1. Implement deletion (to submit next Monday)
2. Think about concurrency issues in our little database

## Amy's Lab Solution

In [None]:
# TODO copy to lab 

# this is a slow implementation
# right and left pointer are the same
# ordinarily, would want 2 pointers - read and write
# append-only database
# keeps index in memory
# appends this to disk as persistent storage
# writes that pipeline dictionary to disk
# has offsets in memory
# run this 
# add a delete to this 
# understand how it works
# make sure you understand how it works 

# deletion of labs

In [75]:
# Whole idea is that this is an append only database! 
# keep file pointer stuck at the end of the file

import os.path
import sys
class Database():
    
    def __init__(self, file):
        # create a file
        self.file = file
        
        # sys.byteorder is the native byte order of this machine: 'big' on big-endian
        # (most-significant byte first) platforms, 'little' on little-endian ones.
        # It is used below to pack the 1-byte length headers.
        self.byteorder=sys.byteorder
        # if file doesn't exist, then open it and set up new index
        if not os.path.exists(file):
            self.fd = open(file, "xb+", buffering=0)
            self.index={}
        # if the file does exist, reopen it and load the saved index
        else:
            # file handle for the data file
            self.fd = open(file, "r+b", buffering=0)
            with open(file+".idx") as fdi:
                items = [l.strip().split(':') for l in fdi.readlines()]
                self.index = {k:int(v) for k,v in items}
                
        # read pointer
        # tell() returns the current position of the file pointer;
        # the file was just opened, so this is the beginning of the file
        self.readptr = self.fd.tell()
        
        # seek(offset, whence) moves the file pointer.
        # whence defaults to 0 (offset is absolute); 1 means relative to the
        # current position and 2 means relative to the end of the file,
        # so seek(0,2) jumps to the end of the file
        self.fd.seek(0,2)
        
        # set write pointer to the end of the file
        self.writeptr = self.fd.tell()
        
        
    def set(self, x, v):
        # check key is a string
        if not isinstance(x, str):
            raise ValueError("Key must be a string")
        # encode the key to utf-8
        bin_x = x.encode('utf-8')
        # store the size in bytes of encoded key
        sz_x=len(bin_x).to_bytes(1, byteorder=self.byteorder)
        
        # ensure value must be a string
        if not isinstance(v, str):
            raise ValueError("Value must be a string")
        # encode value to utf-8
        bin_v = v.encode('utf-8')
        # store the size of the bytes of encoded value
        sz_v=len(bin_v).to_bytes(1, byteorder=self.byteorder)

        # append the record first, then point the index at it
        # go to the writeptr location in the file (using seek)
        self.fd.seek(self.writeptr)
        # print where the pointer currently is in the file
        print("currently", self.fd.tell())
        # write len of key + len of value + encoded key + encoded value
        self.fd.write(sz_x+sz_v+bin_x+bin_v)
        # only now record the offset: if the write raised, the index still
        # holds the previous offset for x instead of losing the key
        self.index[x]=self.writeptr
        # update writeptr to new location
        self.writeptr=self.fd.tell()
            
    def get(self, x):
        # get the offset in the file
        try:
            offset = self.index[x]
        except KeyError:
            raise ValueError("{} is not in index".format(x))
        print("offset is", offset)
        
        # set the readptr to the offset 
        self.readptr=offset
        # go to that location in the file 
        self.fd.seek(self.readptr)
        
        # each record starts with two 1-byte headers:
        # the length of the key and the length of the value
        sz_k = int.from_bytes(self.fd.read(1), byteorder=self.byteorder)
        sz_v = int.from_bytes(self.fd.read(1), byteorder=self.byteorder)
        
        # skip over the key bytes (whence=1 seeks relative to the current position)
        self.fd.seek(sz_k,1)
        
        # read the value bytes and decode them
        readit=self.fd.read(sz_v).decode('utf-8')
        print("now", self.fd.tell())
        # a tombstone means the key was deleted, so treat it as missing
        if readit == '-999':
            raise ValueError("{} has been deleted".format(x))
        return readit
        
    def delete(self, x):
        """
        For my deletion flag I use the code '-999'.
        Caveat: a real value of '-999' cannot be told apart from a tombstone.
        """
        # get() raises ValueError if the key is missing or already deleted
        self.get(x)
        # append a tombstone record; the previous writes stay in the file
        self.set(x, '-999')
        # no need to delete from index: it now points at the tombstone
    
    def close_db(self):
        # write the index to the .idx file, one key:offset pair per line
        with open(self.file+".idx","w") as fdi:
            fdi.write("\n".join([k+":"+str(v) for k,v in self.index.items()]))
        # close the data file itself
        self.fd.close()
        
    # this is the database object deletion... not key-value deletion!
    def __del__(self):
        self.fd.close()

In [76]:
!rm /tmp/amy.db

In [77]:
# initialize database with a .db file in the tmp directory
db = Database("/tmp/amy.db")

In [78]:
print(db.index)

{}


In [79]:
db.set("rahul", "aged")
db.set("pavlos", "aged")
db.set("kobe", "stillyoung")

currently 0
currently 11
currently 23


In [80]:
print(db.index)

{'pavlos': 11, 'rahul': 0, 'kobe': 23}


In [81]:
db.get('rahul')

offset is 0
now 11


'aged'

In [82]:
db.get('amy')

ValueError: amy is not in index

In [83]:
db.delete('rahul')

offset is 0
now 11
currently 39


In [84]:
print(db.index)

{'pavlos': 11, 'rahul': 39, 'kobe': 23}


In [85]:
db.close_db()

In [86]:
!cat /tmp/amy.db.idx

pavlos:11
rahul:39
kobe:23

In [87]:
!strings /tmp/amy.db

rahulaged
pavlosaged
kobestillyoung
rahul-999
