This short example illustrates how to create JSON objects (fake device information, using a list of words as device names), create a DataFrame from an RDD of those JSON strings, and register it as a temporary table. Once it is registered as a table, we can issue SQL queries against it. The table schema is inferred from the JSON records in the RDD. Though short, the example demonstrates the Spark API's high-level abstraction, DataFrames on top of RDDs, which lets developers manipulate and process data with relative ease and with a familiar query language.

Most importantly, it does so with brevity and simplicity.

That this is a draw of the Spark API is no surprise. Try it out for yourself!

In [2]:
# import the necessary modules
import urllib2
import time
import random
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
#
# function that returns batches of bn from the list of words
#
def getBatches(url, bn):
  webFile = urllib2.urlopen(url).read()
  words = webFile.split(",")
  # append one extra word when the word count is not a multiple of bn
  if len(words) % bn != 0:
    words = words + ['jules']
  batches = []
  for i in range(0, len(words), bn):
    batches.append(words[i:i + bn])
  return batches

def getLargeBatches(size):
  batches = []
  for i in range(1, size):
    batches.append(str(i))
  return batches

#
# get an IP
#
def get_ip_addr():
  n1 = random.randrange(60, 209)
  n2 = random.randrange(1, 127)
  n3 = random.randrange(1, 127)
  n4 = random.randrange(1, 127)

  ip = str(n1) + "." + str(n2) + "." + str(n3) + "." + str(n4)
  return ip

#
# get random letters
#
def get_random_word():
  word = ''
  for i in range(10):
    word += random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789')
  return word
#
# create a json object with attributes and values
#
def create_json(id, d):
  temp = random.randrange(10, 35)
  (x, y) = random.randrange(0, 100), random.randrange(0, 100)
  ts = time.time()
  ip = get_ip_addr()
  if id % 2 == 0:
    d = "sensor-pad-" + d + get_random_word()
  elif id % 3 == 0:
    d = "device-mac-" + d + get_random_word()
  elif id % 5 == 0:
    d = "therm-stick-" + d + get_random_word()
  else:
    d = "meter-gauge-" + d + get_random_word()
  zipcode = random.randrange(94538,97107)
  humidity = random.randrange(25, 100)
  return json.dumps({"device_id": id, "device_name": d, "ip": ip, "timestamp": ts, "temp": temp, "scale": "Celsius", "lat": x, "long": y, "zipcode": zipcode, "humidity": humidity}, sort_keys=True)
#
# create a list of devices
#
def create_devices(batches):
  id = 0
  devices = []
  for b in batches:
    for w in b:
      id = id + 1
      time.sleep(0.025)
      device_msg = create_json(id, w)
      devices.append(device_msg)
  return devices
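
The device-name prefix in create_json depends only on the device id, and the first matching branch of the if/elif chain wins. The sketch below isolates that rule so its effect on the generated names is easier to see. The helper name `device_prefix` is hypothetical and is not part of the notebook.

```python
def device_prefix(device_id):
    """Mirror the if/elif chain in create_json: the first matching rule wins."""
    if device_id % 2 == 0:
        return "sensor-pad-"
    elif device_id % 3 == 0:
        return "device-mac-"
    elif device_id % 5 == 0:
        return "therm-stick-"
    return "meter-gauge-"

# Count how often each prefix occurs for ids 1 through 30.
counts = {}
for device_id in range(1, 31):
    prefix = device_prefix(device_id)
    counts[prefix] = counts.get(prefix, 0) + 1
```

Because even ids and multiples of 3 are claimed first, only ids divisible by 5 but by neither 2 nor 3 (5, 25, 35, ...) get the "therm-stick-" prefix, so queries that filter on that prefix match relatively few rows.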
  

Build the batches, convert each entry into a fake JSON device record, and print the first ten records. The commented-out line fetches a comma-separated word list and splits it into batches of 10 words; the active line generates numeric strings instead.

In [4]:
#batches = getBatches('http://www.textfixer.com/resources/common-english-words.txt', 10)
batches = getLargeBatches(500)
devices = create_devices(batches)
print devices[0:10]
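
One detail of the cell above is easy to miss: getLargeBatches returns a flat list of strings, not a list of lists, so the inner loop of create_devices walks over the characters of each string. The sketch below reproduces that behavior in plain Python, without Spark and without the sleep call. The sample word list is made up for illustration.

```python
# Stand-in for getLargeBatches(500): the numeric strings "1" through "499".
batches = [str(i) for i in range(1, 500)]

# Iterating a string yields its characters, so "123" contributes three names.
names_from_123 = [w for w in "123"]

# Number of device records create_devices would build from these batches.
device_count = sum(len(b) for b in batches)

# The word-list path instead slices a list into fixed-size batches.
words = "a,able,about,across,after,all,almost".split(",")
word_batches = [words[i:i + 3] for i in range(0, len(words), 3)]
```

With 500 as the argument this comes to 1389 device records (9 one-digit, 90 two-digit, and 400 three-digit strings), and at 0.025 seconds of sleep per record the cell spends roughly 35 seconds sleeping.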

Create an RDD with five partitions

In [6]:
devicesRDD = sc.parallelize(devices, 5)
devicesRDD.take(2)
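
To get a feel for what "five partitions" means, the following plain-Python sketch cuts a list into a requested number of contiguous, near-equal slices. It is only an illustration of the idea. The function name `split_into_slices` is hypothetical, and the sketch does not claim to reproduce how Spark actually places elements.

```python
def split_into_slices(items, num_slices):
    """Cut items into num_slices contiguous slices of near-equal size."""
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

# Twelve fake records spread over five slices.
records = ["device-%d" % i for i in range(12)]
slices = split_into_slices(records, 5)
sizes = [len(s) for s in slices]
```

In the notebook itself, `devicesRDD.getNumPartitions()` reports the partition count, and `devicesRDD.glom().map(len).collect()` shows how many records each partition really holds.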

Create a DataFrame using jsonRDD() and print its schema

In [8]:

df = sqlContext.jsonRDD(devicesRDD)
df.printSchema()
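
Schema inference from JSON can be pictured as scanning every record and noting the type seen for each field. The sketch below does that in plain Python as a rough mental model. It is not Spark's implementation, the helper names are hypothetical, and the type labels (long, double, string, boolean) are chosen to resemble the names printSchema() reports.

```python
import json

def spark_like_type(value):
    # bool is tested before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"

def infer_schema(json_lines):
    """Collect, per field, the set of type labels seen across all records."""
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            schema.setdefault(field, set()).add(spark_like_type(value))
    return dict((field, sorted(types)) for field, types in schema.items())

# Two made-up records shaped like a subset of create_json's output.
sample = [
    json.dumps({"device_id": 1, "device_name": "meter-gauge-1abc",
                "temp": 21, "timestamp": 1437000000.5}),
    json.dumps({"device_id": 2, "device_name": "sensor-pad-2xyz",
                "temp": 30, "timestamp": 1437000001.0}),
]
schema = infer_schema(sample)
```

A field that collects more than one label here would be a case where a real inference engine has to pick a common type or fall back to string.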

Display the DataFrame and plot temperature vs. device_id

In [10]:
display(df)

Display the DataFrame as a table, or as selected columns of the table

In [12]:
display(df)

In [13]:
display(df)

Register it as a temporary table, against which we can issue SQL queries

In [15]:
df.registerTempTable("deviceTables")

In [16]:
%sql select count(*) from deviceTables

In [17]:
%sql select device_id, device_name, humidity, temp from deviceTables where temp > 20 and humidity < 50

In [18]:
display(df)

In [19]:
display(df)

In [20]:
%sql select device_id, device_name, humidity, temp, zipcode, timestamp from deviceTables where zipcode > 91000

In [21]:
%sql select device_id, device_name, humidity, temp from deviceTables where device_name like 'therm%' and humidity < 50
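
The queries above are ordinary SQL, so their filtering logic can be tried outside Spark. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL. The four rows are made up, and only the columns needed by two of the queries are included.

```python
import sqlite3

# In-memory database standing in for the registered temporary table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE deviceTables "
    "(device_id INTEGER, device_name TEXT, humidity INTEGER, temp INTEGER)"
)
rows = [
    (1, "meter-gauge-1a", 40, 25),
    (2, "sensor-pad-2b", 60, 30),
    (5, "therm-stick-5c", 45, 18),
    (25, "therm-stick-25d", 80, 22),
]
conn.executemany("INSERT INTO deviceTables VALUES (?, ?, ?, ?)", rows)

# Same predicates as the notebook queries, run against the toy table.
warm_and_dry = conn.execute(
    "select device_id from deviceTables where temp > 20 and humidity < 50"
).fetchall()
therm_and_dry = conn.execute(
    "select device_id, device_name from deviceTables "
    "where device_name like 'therm%' and humidity < 50"
).fetchall()
conn.close()
```

Note that create_json draws zipcode from the range 94538 to 97106, so the notebook's `zipcode > 91000` filter matches every generated row.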