<h1>Structured Data</h1>

<h2>Spark makes it easy to work with structured data.</h2>
<h2>Structured data is any data that has a schema, i.e., a known set of fields for each record.</h2>

<h2> We can work with structured data in two ways:</h2>
    <ul>
    <li><h3>via Structured Query Language (SQL).</h3></li>
    <li><h3>via DataFrames.</h3></li>
    </ul>
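This sketch compares the two styles side by side without a Spark cluster. Python's built-in `sqlite3` module stands in for Spark SQL, and a list comprehension stands in for DataFrame operations. The records are hypothetical toy data. In Spark, the corresponding calls would be `sqlContext.sql(...)` and `df.filter(...).select(...)`, both used later in this notebook.

```python
import sqlite3

# Hypothetical toy records with a known set of fields (a schema).
rows = [
    ("StarTAC", "Technology", 65.99),
    ("Xerox", "Office Supplies", 6.48),
    ("Canon", "Technology", 199.99),
]

# Style 1: declarative SQL (sqlite3 is only a stand-in for Spark SQL here).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (Brand TEXT, Category TEXT, SellPrice REAL)")
conn.executemany("INSERT INTO inventory VALUES (?, ?, ?)", rows)
sql_result = conn.execute(
    "SELECT Brand FROM inventory WHERE Category = 'Technology' ORDER BY Brand"
).fetchall()
conn.close()

# Style 2: expression style, analogous to
# df.filter(df['Category'] == 'Technology').select('Brand') in Spark.
api_result = sorted(
    (brand,) for brand, category, _ in rows if category == "Technology"
)

print(sql_result)  # [('Canon',), ('StarTAC',)]
print(api_result)  # [('Canon',), ('StarTAC',)]
```

In Spark, SQL strings and DataFrame calls are both handled by the same Spark SQL engine. Which style to use is mostly a matter of taste.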

<h2><u>Definition</u>: A <b>DataFrame</b> is a distributed collection of data organized into named columns.</h2>

![StarSchema](https://github.com/ahmetbulut/CS340withDSX/blob/master/static/Week6/StarSchema.png?raw=true)
![StarSchemaExample](https://github.com/ahmetbulut/CS340withDSX/blob/master/static/Week6/StarSchemaExample.png?raw=true)

# Create a DataFrame from JSON text data (JSON: JavaScript Object Notation).

In [1]:
# The code was removed by DSX for sharing.

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Before this step, make sure Spark is configured to access your Object Storage.
df = sqlContext.read.json("swift://CS340." + name + "/Intentory.json")

In [3]:
# Prints the contents of the DataFrame to stdout
df.show()

+--------+-----------+--------+---------------+--------+--------+----------------+--------------------+------------+---------+-----------+------------+--------------------+------------+-----------------+------+
|   Brand|BuyCurrency|BuyPrice|       Category|      Id|LeadTime|MinOrderQuantity|                Name|SellCurrency|SellPrice|StockOnHand|StockOnOrder|         SubCategory|    Supplier|TargetBatchVolume|Volume|
+--------+-----------+--------+---------------+--------+--------+----------------+--------------------+------------+---------+-----------+------------+--------------------+------------+-----------------+------+
| StarTAC|        GBP|   27.67|     Technology|16342939|      15|               1|      StarTAC Series|         USD|    65.99|         15|          50|Telephones and Co...|Office First|              0.0|   0.8|
|   Xerox|        CNY|    29.8|Office Supplies|16346727|       5|               1|          Xerox 1984|         USD|     6.48|         34|           1|     

In [4]:
df.printSchema()

root
 |-- Brand: string (nullable = true)
 |-- BuyCurrency: string (nullable = true)
 |-- BuyPrice: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- LeadTime: long (nullable = true)
 |-- MinOrderQuantity: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- SellCurrency: string (nullable = true)
 |-- SellPrice: double (nullable = true)
 |-- StockOnHand: long (nullable = true)
 |-- StockOnOrder: long (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- Supplier: string (nullable = true)
 |-- TargetBatchVolume: double (nullable = true)
 |-- Volume: double (nullable = true)
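`printSchema()` reports a type for every field Spark inferred from the JSON text, listed alphabetically as shown above. The sketch below imitates that inference for flat records in plain Python. Two parts of it are simplifying assumptions, not Spark's full algorithm:

- the type mapping (`int` to `long`, `float` to `double`);
- widening `long` to `double` when a field mixes both.

The sample lines reuse values from the first two rows of the `df.show()` output.

```python
import json

# Simplified mapping from Python JSON value types to Spark SQL type names.
TYPE_NAMES = {str: "string", int: "long", float: "double", bool: "boolean"}

def infer_flat_schema(json_lines):
    """Infer {field: type_name} from JSON Lines text (flat records only)."""
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            if value is None:
                continue  # nulls carry no type information
            type_name = TYPE_NAMES[type(value)]
            previous = schema.get(field)
            if previous is None or previous == type_name:
                pass
            elif {previous, type_name} == {"long", "double"}:
                type_name = "double"  # widen mixed integer/decimal fields
            else:
                type_name = "string"  # simplified fallback for conflicting types
            schema[field] = type_name
    # Report fields alphabetically, like the printSchema() output above.
    return dict(sorted(schema.items()))

lines = [
    '{"Id": 16342939, "Brand": "StarTAC", "BuyPrice": 27.67, "LeadTime": 15}',
    '{"Id": 16346727, "Brand": "Xerox", "BuyPrice": 29.8, "LeadTime": 5}',
]
schema = infer_flat_schema(lines)
for field, type_name in schema.items():
    print(" |-- %s: %s" % (field, type_name))
```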



In [5]:
df.select("Brand").show()

+--------+
|   Brand|
+--------+
| StarTAC|
|   Xerox|
|   Xerox|
|   Elite|
|  Wilson|
|MicroTAC|
|   V3682|
|   Xerox|
| Staples|
| StarTAC|
|   Avery|
|Anderson|
|   SAFCO|
|   Canon|
|It's Hot|
|Electrix|
|   Safco|
| Catalog|
|  Belkin|
|    Iris|
+--------+
only showing top 20 rows



In [6]:
df.select('Brand', 'Volume').show()

+--------+------+
|   Brand|Volume|
+--------+------+
| StarTAC|   0.8|
|   Xerox|  0.47|
|   Xerox|  0.55|
|   Elite|  0.85|
|  Wilson|  0.12|
|MicroTAC|  0.55|
|   V3682|  0.41|
|   Xerox|  0.84|
| Staples|  0.12|
| StarTAC|  0.97|
|   Avery|  0.13|
|Anderson|  0.84|
|   SAFCO|  0.55|
|   Canon|   0.9|
|It's Hot|  0.12|
|Electrix|  0.11|
|   Safco|  0.07|
| Catalog|  0.11|
|  Belkin|  0.99|
|    Iris|  0.11|
+--------+------+
only showing top 20 rows



In [11]:
df.select(df['Brand'], df['Volume']).show()

+--------+------+
|   Brand|Volume|
+--------+------+
| StarTAC|   0.8|
|   Xerox|  0.47|
|   Xerox|  0.55|
|   Elite|  0.85|
|  Wilson|  0.12|
|MicroTAC|  0.55|
|   V3682|  0.41|
|   Xerox|  0.84|
| Staples|  0.12|
| StarTAC|  0.97|
|   Avery|  0.13|
|Anderson|  0.84|
|   SAFCO|  0.55|
|   Canon|   0.9|
|It's Hot|  0.12|
|Electrix|  0.11|
|   Safco|  0.07|
| Catalog|  0.11|
|  Belkin|  0.99|
|    Iris|  0.11|
+--------+------+
only showing top 20 rows
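`df.select('Brand', 'Volume')` and `df.select(df['Brand'], df['Volume'])` are two spellings of the same projection, and their outputs above are identical. A projection keeps only the named columns of every row. Here is a plain-Python sketch, where `select` is a hypothetical helper and the rows are toy records built from values shown above:

```python
# Toy records; each dict is one row with named fields.
rows = [
    {"Brand": "StarTAC", "Volume": 0.8, "Category": "Technology"},
    {"Brand": "Xerox", "Volume": 0.47, "Category": "Office Supplies"},
]

def select(records, *columns):
    """Keep only the named columns of every record (a projection)."""
    return [{c: r[c] for c in columns} for r in records]

projected = select(rows, "Brand", "Volume")
print(projected)  # [{'Brand': 'StarTAC', 'Volume': 0.8}, {'Brand': 'Xerox', 'Volume': 0.47}]
```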



In [7]:
df.select(df['Brand'], df['Volume'] + 1).show()

+--------+------------------+
|   Brand|      (Volume + 1)|
+--------+------------------+
| StarTAC|               1.8|
|   Xerox|              1.47|
|   Xerox|              1.55|
|   Elite|              1.85|
|  Wilson|              1.12|
|MicroTAC|              1.55|
|   V3682|              1.41|
|   Xerox|1.8399999999999999|
| Staples|              1.12|
| StarTAC|              1.97|
|   Avery|              1.13|
|Anderson|1.8399999999999999|
|   SAFCO|              1.55|
|   Canon|               1.9|
|It's Hot|              1.12|
|Electrix|              1.11|
|   Safco|              1.07|
| Catalog|              1.11|
|  Belkin|              1.99|
|    Iris|              1.11|
+--------+------------------+
only showing top 20 rows
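The `1.8399999999999999` entries are not a Spark bug. Spark adds 1 to the double column using IEEE 754 double-precision arithmetic, which Python floats also use. The value 0.84 has no exact binary representation, so `0.84 + 1` lands one unit in the last place below the double closest to 1.84. A minimal check in plain Python:

```python
import math

# A few Volume values from the output above.
volumes = [0.8, 0.47, 0.55, 0.84]

# Same double-precision addition that Spark performs on the Volume column.
shifted = [v + 1 for v in volumes]

print(repr(0.84 + 1))     # 1.8399999999999999
print(0.84 + 1 == 1.84)   # False

# Compare with a tolerance instead of ==, or round for display.
print(math.isclose(0.84 + 1, 1.84))  # True
print([round(v, 2) for v in shifted])
```

In PySpark, `pyspark.sql.functions.round(df['Volume'] + 1, 2)` would be one way to round the computed column for display.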



In [8]:
df.filter(df['Category'] == 'Technology').select("Id","Category").show()

+--------+----------+
|      Id|  Category|
+--------+----------+
|16342939|Technology|
|16376266|Technology|
|16379125|Technology|
|16402632|Technology|
|16413246|Technology|
|16468556|Technology|
|16492459|Technology|
|16505852|Technology|
|16513635|Technology|
|16554562|Technology|
|16579736|Technology|
|16616835|Technology|
|16627009|Technology|
|16645340|Technology|
|16688388|Technology|
|16704002|Technology|
|16711687|Technology|
|16726527|Technology|
|16728666|Technology|
|16730738|Technology|
+--------+----------+
only showing top 20 rows
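The cell above chains two steps: `filter` keeps only the rows matching the condition, and `select` keeps only the listed columns. Spark evaluates both lazily, so nothing runs until an action such as `show()` is called. The plain-Python sketch below applies the same two steps eagerly to the two rows shown in the first `df.show()` output; `filter_rows` and `select` are hypothetical helpers.

```python
rows = [
    {"Id": 16342939, "Category": "Technology", "Brand": "StarTAC"},
    {"Id": 16346727, "Category": "Office Supplies", "Brand": "Xerox"},
]

def filter_rows(records, predicate):
    """Keep the records for which predicate(record) is True (like df.filter)."""
    return [r for r in records if predicate(r)]

def select(records, *columns):
    """Keep only the named columns (like df.select)."""
    return [{c: r[c] for c in columns} for r in records]

tech = select(filter_rows(rows, lambda r: r["Category"] == "Technology"), "Id", "Category")
print(tech)  # [{'Id': 16342939, 'Category': 'Technology'}]
```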



In [11]:
df.groupBy("Category").count().show()

+---------------+-----+
|       Category|count|
+---------------+-----+
|Office Supplies|   68|
|     Technology|   21|
|      Furniture|   11|
+---------------+-----+
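`groupBy("Category").count()` produces one output row per distinct key, together with the number of rows carrying that key. Spark keeps null keys as their own group, and none appears above. The three counts therefore cover every row: 68 + 21 + 11 = 100 inventory records. A plain-Python sketch of the grouping, using hypothetical toy values:

```python
from collections import Counter

# Hypothetical toy values of the Category field, one per row.
categories = ["Technology", "Office Supplies", "Office Supplies", "Furniture", "Technology"]

# One count per distinct key, like groupBy("Category").count().
counts = Counter(categories)
print(sorted(counts.items()))  # [('Furniture', 1), ('Office Supplies', 2), ('Technology', 2)]

# The group counts reported above add up to the total number of rows.
reported = {"Office Supplies": 68, "Technology": 21, "Furniture": 11}
total_rows = sum(reported.values())
print(total_rows)  # 100
```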



# Execute direct SQL queries on DataFrames

In [12]:
df.registerTempTable("inventory")
# Registers this DataFrame as a temporary table under the given name.
# The lifetime of this temporary table is tied to the SQLContext
# that was used to create this DataFrame.

sqlContext.sql("SELECT Brand, SellPrice FROM inventory ORDER BY SellPrice DESC LIMIT 5").collect()

[Row(Brand=u'Chromcraft', SellPrice=550.98),
 Row(Brand=u'Panasonic', SellPrice=517.48),
 Row(Brand=u'Sharp', SellPrice=499.99),
 Row(Brand=u'Fellowes', SellPrice=387.99),
 Row(Brand=u'Hoover', SellPrice=363.25)]
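`ORDER BY SellPrice DESC LIMIT 5` asks for the five most expensive rows, largest first. On a single machine, `heapq.nlargest` answers the same top-k question. The sketch below uses (Brand, SellPrice) pairs taken from outputs in this notebook.

```python
import heapq

# (Brand, SellPrice) pairs taken from outputs in this notebook.
items = [("Canon", 199.99), ("Chromcraft", 550.98), ("Xerox", 55.98),
         ("Panasonic", 517.48), ("Hoover", 363.25), ("Sharp", 499.99),
         ("Fellowes", 387.99)]

# The five highest prices, largest first.
top5 = heapq.nlargest(5, items, key=lambda item: item[1])
print([brand for brand, _ in top5])  # ['Chromcraft', 'Panasonic', 'Sharp', 'Fellowes', 'Hoover']
```

As a side note, from Spark 2.0 onward `registerTempTable` is deprecated in favor of `createOrReplaceTempView`, which registers the DataFrame for SQL queries in the same way.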

In [13]:
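topitems = sqlContext.sql("SELECT Brand, SellPrice FROM inventory")
# Use the underlying RDD (.rdd) for map/reduceByKey; PySpark DataFrames have no .map() from Spark 2.0 on.
topitems.rdd.map(lambda row: (row.Brand, row.SellPrice)).reduceByKey(lambda x, y: max(x, y)).collect()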

[(u'Sanford', 2.88),
 (u'Deflect-O', 7.7),
 (u'Hon', 113.98),
 (u'Canon', 199.99),
 (u'Wilson', 6.75),
 (u'Anderson', 15.23),
 (u'Master', 7.59),
 (u'MicroTAC', 65.99),
 (u'EcoTones', 4.0),
 (u'Verbatim', 22.24),
 (u'Avery', 40.98),
 (u'Catalog', 67.28),
 (u"O'Sullivan", 130.98),
 (u'Imation', 33.98),
 (u'Elite', 8.45),
 (u'Electrix', 13.4),
 (u'Rush', 160.98),
 (u'Acme', 8.34),
 (u'U.S.', 99.99),
 (u'Ampad', 4.48),
 (u"It's Hot", 7.4),
 (u'TDK', 14.48),
 (u'Acco', 29.74),
 (u'DAX', 5.77),
 (u'OIC', 3.58),
 (u'V3682', 125.99),
 (u'Presstex', 4.55),
 (u'GBC', 122.99),
 (u'XtraLife', 7.84),
 (u'Multimedia', 162.93),
 (u'Keytronic', 73.98),
 (u'Xerox', 55.98),
 (u'StarTAC', 125.99),
 (u'Fellowes', 387.99),
 (u'Hoover', 363.25),
 (u'Jet-Pak', 35.89),
 (u'Hewlett-Packard', 115.99),
 (u'Newell', 3.28),
 (u'Eldon', 3.34),
 (u'Logitech', 100.98),
 (u'AT&T', 15.99),
 (u'Iris', 20.89),
 (u'Staples', 35.44),
 (u'Safco', 70.98),
 (u'Speediset', 10.31),
 (u'Gyration', 100.97),
 (u'Soundgear', 204.1
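`reduceByKey` merges all values that share a key, two at a time, with the function you pass. Spark applies that function within each partition first and then across partitions, so the function should be associative and commutative; `max` is both. Below is a single-machine sketch of the same per-key fold, where `reduce_by_key` is a hypothetical helper and the pairs reuse prices shown in this notebook:

```python
# (Brand, SellPrice) pairs; a brand can appear several times.
pairs = [("Xerox", 6.48), ("StarTAC", 65.99), ("Xerox", 55.98), ("StarTAC", 125.99)]

def reduce_by_key(kv_pairs, func):
    """Combine the values that share a key with a binary function (like RDD.reduceByKey)."""
    result = {}
    for key, value in kv_pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

max_price = reduce_by_key(pairs, max)
print(max_price)  # {'Xerox': 55.98, 'StarTAC': 125.99}
```

With the DataFrame API, `topitems.groupBy("Brand").max("SellPrice")` should give the same per-brand maxima without dropping to the RDD level.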

In [21]:
sqlContext.cacheTable("inventory")

# Create DataFrames from "Row" RDDs. 

<strong>The pyspark.sql.Row class represents one record with named fields, i.e., data attributes; createDataFrame turns those field names into column names.</strong>

In [15]:
from pyspark.sql import Row
rowRdd = sc.parallelize([Row(name="Jack", favouritecoffee="Industrial Blend"), Row(name="Jane", favouritecoffee="Decaffeinated")])
df = sqlContext.createDataFrame(rowRdd)
df.printSchema()

root
 |-- favouritecoffee: string (nullable = true)
 |-- name: string (nullable = true)
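In the schema, `favouritecoffee` comes before `name` even though `name` was passed first. Before Spark 3.0, `Row` sorted keyword-argument field names alphabetically; from Spark 3.0, on Python 3.6 and later, the given order is kept. The sketch below mimics the older behavior with a `namedtuple`. `make_row` is a hypothetical helper, not Spark's implementation.

```python
from collections import namedtuple

def make_row(**fields):
    """Build an immutable record whose fields are sorted by name.

    Mimics (as an assumption) how pyspark.sql.Row ordered keyword
    arguments before Spark 3.0; this is not Spark's code.
    """
    names = sorted(fields)
    RowType = namedtuple("Row", names)
    return RowType(**{n: fields[n] for n in names})

jack = make_row(name="Jack", favouritecoffee="Industrial Blend")
print(jack)               # Row(favouritecoffee='Industrial Blend', name='Jack')
print(jack.name, jack[0])  # Jack Industrial Blend
```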



In [16]:
df.show()

+----------------+----+
| favouritecoffee|name|
+----------------+----+
|Industrial Blend|Jack|
|   Decaffeinated|Jane|
+----------------+----+



# Create a DataFrame from raw text data.

In [18]:
netflowdata = sc.textFile("swift://CS340." + name + "/NetflowData.txt")
netflowdata.first()

u'10.1.0.2,16.2.3.7,12,20K,http'

In [19]:
plainRdd = netflowdata.map(lambda l: l.split(","))
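# Each line looks like 'src,dst,duration,20K,protocol'; p[3][:-1] drops the trailing 'K' before int().
rddOfRowObjects = plainRdd.map(lambda p: Row(source=p[0], destination=p[1], duration=int(p[2]),
                                             bytes_sent=int(p[3][:-1]), protocol=p[4]))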
dfNetflow = sqlContext.createDataFrame(rddOfRowObjects)
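The parsing logic inside the `map` lambda can be tested on its own in plain Python before it is handed to Spark. In this sketch, `Flow` and `parse_flow` are hypothetical names. It assumes every line has exactly five comma-separated fields and a one-character unit suffix on the size field. That suffix is presumably `K` for kilobytes, so `bytes_sent` likely holds kilobytes rather than bytes.

```python
from collections import namedtuple

# Hypothetical record type mirroring the Row fields used above.
Flow = namedtuple("Flow", ["source", "destination", "duration", "bytes_sent", "protocol"])

def parse_flow(line):
    """Parse 'src,dst,duration,sizeK,protocol' into a typed Flow record."""
    # Unpacking raises ValueError if a line does not have exactly five fields.
    source, destination, duration, size, protocol = line.split(",")
    # Drop the one-character unit suffix (e.g. '20K' -> 20).
    return Flow(source, destination, int(duration), int(size[:-1]), protocol)

flow = parse_flow(u"10.1.0.2,16.2.3.7,12,20K,http")
print(flow)  # Flow(source='10.1.0.2', destination='16.2.3.7', duration=12, bytes_sent=20, protocol='http')
```

A malformed line makes `int()` or the unpacking raise `ValueError`. Inside Spark, that exception would fail the job when an action runs.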

In [20]:
dfNetflow.show()

+----------+-----------+--------+--------+----------+
|bytes_sent|destination|duration|protocol|    source|
+----------+-----------+--------+--------+----------+
|        20|   16.2.3.7|      12|    http|  10.1.0.2|
|        24|   12.4.0.3|      16|    http|  18.6.7.1|
|        20|   11.6.8.2|      15|    http|  13.9.4.3|
|        40|   17.1.2.1|      19|    http|  15.2.2.9|
|        58|   14.8.7.4|      26|    http|  12.4.3.8|
|       100|   13.0.0.1|      27|     ftp|  10.5.1.3|
|       300|   10.3.4.5|      32|     ftp|  11.1.0.6|
|        80|   16.5.5.8|      18|     ftp|  19.7.1.2|
|        20|  16.12.3.7|      12|    http|10.110.0.2|
|       124|  12.4.0.13|      16|    http| 182.6.7.1|
|        20|   11.6.8.2|      15|    http| 163.9.4.3|
|       140|  17.17.2.1|      19|    http| 15.2.12.9|
|        58| 14.8.71.42|      26|    http|  12.4.3.8|
|       200|   13.0.0.1|      27|    http| 10.45.1.3|
|       300|   10.3.4.5|      32|     ftp| 11.1.20.6|
|       180|   16.5.5.8|    

In [33]:
dfNetflow.select("protocol","bytes_sent").groupBy("protocol").sum().show()

+--------+---------------+
|protocol|sum(bytes_sent)|
+--------+---------------+
|     ftp|           1203|
|    http|           1810|
+--------+---------------+
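Called with no arguments, `GroupedData.sum()` sums every numeric column. That is why the cell first selects only `protocol` and `bytes_sent`; otherwise `duration` would be summed too. `dfNetflow.groupBy("protocol").sum("bytes_sent")` is an equivalent, more direct spelling. The plain-Python sketch below applies the same per-group addition to the first eight rows of the `dfNetflow.show()` output. Because it uses only those rows, its totals are smaller than Spark's.

```python
from collections import defaultdict

# The first eight rows shown above: (protocol, bytes_sent).
flows = [("http", 20), ("http", 24), ("http", 20), ("http", 40),
         ("http", 58), ("ftp", 100), ("ftp", 300), ("ftp", 80)]

totals = defaultdict(int)
for protocol, sent in flows:
    totals[protocol] += sent  # add each row's value to its group's running total

print(dict(totals))  # {'http': 162, 'ftp': 480}
```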



# Work with nested data using SQL.

In [22]:
# Prep work: save a small JSON dataset (one JSON object per line) so we can read it back below.
import json
products = [{'Product': 'Apple iMac', 'Features': {'Pros': ['Display Resolution', 'Aesthetics'],'Cons': ['Price Expensive', 'Cooling Problem']}},
              {'Product': 'Dell', 'Features': {'Pros': ['Affordable', 'Fast Customer Service'],'Cons': ['Standard Look & Feel', 'Heavy']}}]
rdd = sc.parallelize(products)
rdd.map(lambda x: json.dumps(x)).saveAsTextFile("ProductsDumpReal.json")

# Now continue as usual.
srdd = sqlContext.read.json("ProductsDumpReal.json")
srdd.printSchema()

root
 |-- Features: struct (nullable = true)
 |    |-- Cons: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Pros: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Product: string (nullable = true)
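By default, `sqlContext.read.json` expects JSON Lines: one complete JSON object per line. That is why the prep cell serializes each record separately with `json.dumps`. `saveAsTextFile` writes a directory of part files, which `read.json` reads as a whole. The sketch below performs the same round trip on a single machine, using an in-memory buffer instead of a file; it shows that the nesting survives.

```python
import io
import json

products = [
    {"Product": "Dell",
     "Features": {"Pros": ["Affordable", "Fast Customer Service"],
                  "Cons": ["Standard Look & Feel", "Heavy"]}},
]

# Write one JSON object per line (JSON Lines), the layout read.json expects by default.
buffer = io.StringIO()
for record in products:
    buffer.write(json.dumps(record) + "\n")

# Read it back line by line; nested objects and arrays come back intact.
buffer.seek(0)
loaded = [json.loads(line) for line in buffer if line.strip()]
print(loaded[0]["Features"]["Pros"])  # ['Affordable', 'Fast Customer Service']
```

Spark 2.2 and later also offer a `multiLine` option for JSON documents that span several lines.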



In [23]:
# We need to register a DataFrame as a temporary table before querying it with SQL.
srdd.registerTempTable("Products")
# Registers the given DataFrame as a temporary table in the SQLContext catalog.
# Temporary tables exist only during the lifetime of this instance of SQLContext.

sqlContext.sql("SELECT Features.Pros FROM Products").collect()

[Row(Pros=[u'Display Resolution', u'Aesthetics']),
 Row(Pros=[u'Affordable', u'Fast Customer Service'])]
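`Features.Pros` walks into the `Features` struct and returns its `Pros` field. The result column is named after the last path segment, which is why the rows above read `Row(Pros=...)`. Below is a plain-Python sketch of that dotted-path lookup, with `get_path` as a hypothetical helper:

```python
def get_path(record, dotted):
    """Follow a dotted path such as 'Features.Pros' through nested dicts."""
    value = record
    for part in dotted.split("."):
        value = value[part]
    return value

products = [
    {"Product": "Apple iMac",
     "Features": {"Pros": ["Display Resolution", "Aesthetics"],
                  "Cons": ["Price Expensive", "Cooling Problem"]}},
    {"Product": "Dell",
     "Features": {"Pros": ["Affordable", "Fast Customer Service"],
                  "Cons": ["Standard Look & Feel", "Heavy"]}},
]

pros = [get_path(p, "Features.Pros") for p in products]
print(pros)  # [['Display Resolution', 'Aesthetics'], ['Affordable', 'Fast Customer Service']]
```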

# Work with nested data using DataFrame API.

In [24]:
products = [{'Product': 'Apple iMac', 'Features': {'Pros': ['Display Resolution', 'Aesthetics'],'Cons': ['Price Expensive', 'Cooling Problem']}},
              {'Product': 'Dell', 'Features': {'Pros': ['Affordable', 'Fast Customer Service'],'Cons': ['Standard Look & Feel', 'Heavy']}}]

In [26]:
df = sqlContext.createDataFrame(products)

In [27]:
df.select("Features.Pros").show()

+--------------------+
|                Pros|
+--------------------+
|[Display Resoluti...|
|[Affordable, Fast...|
+--------------------+
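Cells such as `[Display Resoluti...` are shortened by `show()`, not by the data. By default, `show()` prints at most 20 rows and shortens long cell values to 20 characters. Judging from the outputs in this notebook, a long value keeps its first 17 characters followed by `...`. The sketch below reproduces that rule under that assumption; `show_cell` is a hypothetical name. Passing `truncate=False`, as in `df.select("Features.Pros").show(truncate=False)`, prints the full values.

```python
def show_cell(text, truncate=20):
    """Shorten a cell value the way show() appears to by default (sketch).

    Assumption based on the outputs above: a value longer than `truncate`
    characters keeps its first truncate - 3 characters plus '...'.
    Intended for truncate >= 4.
    """
    if truncate > 0 and len(text) > truncate:
        return text[: truncate - 3] + "..."
    return text

print(show_cell("[Display Resolution, Aesthetics]"))     # [Display Resoluti...
print(show_cell("[Affordable, Fast Customer Service]"))  # [Affordable, Fast...
print(show_cell("Pros"))                                 # Pros
```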

