# SQL Incremental Load

This notebook describes how to take a set of records that represent the updates to a SQL table and compute what the table looks like at the current point in time.
Moreover, it defines named Python functions that are passed to `map` and `flatMap` instead of lambdas.

In [2]:
# Reference pyspark.sql.Row object
from pyspark.sql import Row
# Build the list of Row objects that represents the UserDataHistory table.
# Each Row is one change record: the action taken (Insert / Update / Delete),
# when it happened (modified_at, stored as a string), and the user's column
# values after that change. The keyword names become the column names of the
# resulting table, so they should mirror the table model in your RDBMS.
array = [Row(action="Insert", modified_at="2-15-01-01 1 pm", user_id="1", name="Bob", value=150),
         Row(action="Update", modified_at="2-15-02-01 2 pm", user_id="1", name="Robert", value=200),
         Row(action="Delete", modified_at="2-15-02-02 4 pm", user_id="1", name="Robert", value=200),
         Row(action="Insert", modified_at="2-15-01-01 1 pm", user_id="2", name="Liz", value=150),
         Row(action="Update", modified_at="2-15-01-01 2 pm", user_id="2", name="Elizabeth", value=160)]

# The next two lines perform three steps:
# 1. sc.parallelize turns the local list into an RDD.
# 2. sqlContext.createDataFrame converts that RDD into a DataFrame whose schema
#    is taken from the fields of the Row objects.
# 3. registerTempTable exposes the DataFrame as a temporary table named
#    "UserDataHistory", which SQL queries can then reference.
# `sc` and `sqlContext` are not defined in this notebook; they are presumably
# supplied by the notebook environment.
df = sqlContext.createDataFrame(sc.parallelize(array))
df.registerTempTable("UserDataHistory")
# Try out a few DataFrame API calls.
# NOTE(review): a notebook cell normally echoes only its last bare expression,
# so the values returned by count(), columns and collect() below are presumably
# computed but not displayed -- confirm, or wrap them in print if needed.
# Number of rows
df.count()
# Column names
df.columns
# Fetch all rows back to the driver as a list of Row objects
df.collect()
# Print the rows as a formatted table
df.show()
# Print the query plans (True requests the extended output)
df.explain(True)
# Print the schema
df.printSchema()

In [3]:
%sql describe UserDataHistory
-- Describe the table. As we can see, its columns match the fields specified in the Row objects above.

In [4]:
%sql select * from UserDataHistory order by user_id, modified_at asc
-- Select all rows from UserDataHistory, ordered by user_id and then by modified_at, ascending.
-- modified_at is a string column, so this ordering is lexicographic rather than chronological.

In [5]:
%sql select * from UserDataHistory order by value asc
-- Same query as before, but this time ordered by value, ascending.

In [6]:
def convertRowToPair(row):
  """Turn a record into a two-element key/value list keyed by user id.

  Args:
    row: any object exposing a ``user_id`` attribute (a Row in this notebook).

  Returns:
    A list ``[row.user_id, row]``: the key first, then the untouched record.
  """
  key = row.user_id
  return [key, row]
# Quick check on a single hand-built Row: the result is a [user_id, row] list.
convertRowToPair(Row(action="Insert", modified_at="2-15-01-01 1 pm", user_id="1", name="Bob", value=150))

In [7]:
def getFinalRow(pair):
  """Collapse one user's change history into that user's current row.

  Intended for use with ``flatMap``: the result is a list with zero or one
  element, so users with no surviving row simply vanish from the output.

  Args:
    pair: a two-element sequence ``(key, rows)`` where ``rows`` is an iterable
      of records exposing ``modified_at`` and ``action`` attributes. The key
      itself is not used here.

  Returns:
    ``[row]`` where ``row`` is the record with the greatest ``modified_at``,
    or ``[]`` when there are no records or that latest record's action is
    ``"Delete"``. If several records share the greatest ``modified_at``, the
    first one encountered wins.

  Note:
    ``modified_at`` values are compared with ``>`` as-is. In this notebook
    they are strings, so the comparison is lexicographic; the timestamp
    format must sort chronologically for the result to be right
    (e.g. "10 am" sorts before "2 pm" only by accident of the leading digit).
  """
  # Materialize once: the grouped values may be a one-shot iterable.
  rows = list(pair[1])
  if not rows:
    return []
  # Compare timestamps with timestamps. The previous code compared
  # r.modified_at against the whole row, which never selected a later row
  # on Python 2 and raises TypeError on Python 3.
  latest = max(rows, key=lambda r: r.modified_at)
  if latest.action == "Delete":
    return []
  return [latest]

In [8]:
## Transform every row into a [user_id, row] key/value pair
sqlContext.sql("select * from UserDataHistory").rdd.map(convertRowToPair).collect()

In [9]:
# Group the pairs by user_id. The identity lambda passed to flatMap flattens each
# (key, grouped_rows) pair, so the key and its group come back as separate elements.
sqlContext.sql("select * from UserDataHistory").rdd.map(convertRowToPair).groupByKey().flatMap(lambda p: p).collect()

In [10]:
# Same pipeline, but flatMap now applies getFinalRow to each (user_id, rows) group,
# which yields at most one row per user. No action is called here, so this only
# defines the RDD.
final_table = sqlContext.sql("select * from UserDataHistory").rdd.map(convertRowToPair).groupByKey().flatMap(getFinalRow)

In [11]:
# Function-call form of print works on both Python 2 and Python 3.
# This prints the RDD object's description rather than its rows.
print(final_table)

In [12]:
# Convert the reduced RDD back into a DataFrame and register it as the
# temporary table "UserData" so it can be queried with SQL.
sqlContext.createDataFrame(final_table).registerTempTable("UserData")

In [13]:
%sql select * from UserData
-- Show the contents of the UserData table built from the change history.