# Overview (SCALA)
## How to Use SparkSession - A Unified Entry Point in Apache Spark 2.0


In Spark 2.0, SparkSession is a new entry point that subsumes SparkContext, SQLContext, StreamingContext, and HiveContext. For backward compatibility, the older entry points are preserved.
SparkSession has many features, and this notebook illustrates some of the more important ones. Although this notebook is written in Scala, similar functionality and APIs exist in Python and Java.
In DSX notebooks and the Spark REPL, the SparkSession is created for you and stored in a variable called spark.
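
Outside a notebook or REPL, you create the session yourself. The following is a minimal sketch, not the notebook's own setup: the application name and the local master are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Assumptions: the app name and the "local[*]" master are illustrative choices.
// In DSX or the Spark REPL, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder()
  .appName("SparkSessionExample")
  .master("local[*]")
  .getOrCreate()

// The older entry points remain reachable from the session.
val sc = spark.sparkContext
val sqlContext = spark.sqlContext

println(s"Spark version: ${spark.version}")
```

Declaring the session as a val matters later in this notebook, because import spark.implicits._ requires a stable identifier.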

The companion blog post and related resources:

- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/SparkSession.html
- https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
- http://www.slideshare.net/databricks/2016-spark-summit-east-keynote-matei-zaharia
- http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming-62871797


![Unified Apache Spark 2.0 API](https://databricks.com/wp-content/uploads/2016/06/Unified-Apache-Spark-2.0-API-1.png "Unified Apache Spark 2.0 API")
https://databricks.com/product/getting-started-guide/quick-start#rdds-datasets-and-dataframes

### APACHE SPARK: RDD, DATAFRAME OR DATASET?
http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/

# PART 1: Exploring SparkSession
For backward compatibility, you can still access SparkContext, SQLContext, and SparkConf through SparkSession.

In [3]:
spark

org.apache.spark.sql.SparkSession@8f2c0393

## SparkContext as part of SparkSession
Preserved as part of SparkSession for backward compatibility.

In [2]:
spark.sparkContext

org.apache.spark.SparkContext@4cf97f88

## sqlContext as part of SparkSession
Preserved as part of SparkSession for backward compatibility.

In [3]:
spark.sqlContext

org.apache.spark.sql.SQLContext@168e2b35

# Configuring Spark's runtime configuration parameters

## SparkConf as part of SparkSession
Through spark.conf, you can manipulate Spark's runtime configuration parameters. Note that all configuration options you set are automatically propagated to Spark and Hadoop during I/O.

In [4]:
spark.conf.set("spark.notebook.name", "SparkSessionSimpleZipExample")

In [5]:
spark.conf.get("spark.notebook.name")

SparkSessionSimpleZipExample

In [6]:
spark.conf.get("spark.sql.warehouse.dir")

file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse/
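
Calling spark.conf.get with a key that was never set throws an exception, so it can help to read optional settings defensively. The sketch below shows the variants that avoid this; the key spark.notebook.owner is hypothetical and is assumed not to be set.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.conf.set("spark.notebook.name", "SparkSessionSimpleZipExample")

// get(key) throws NoSuchElementException for an unset key; these two variants do not.
val withDefault = spark.conf.get("spark.notebook.owner", "unknown") // hypothetical key
val maybeName = spark.conf.getOption("spark.notebook.name")

// getAll returns the properties that are set as an immutable Map.
val notebookKeys = spark.conf.getAll.filter { case (key, _) => key.startsWith("spark.notebook.") }

println(withDefault)
println(maybeName)
println(notebookKeys)
```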

## Spark config variables can be accessed via SQL with variable substitution

In [7]:
spark.sql("select '${spark.notebook.name}', '${spark.sql.warehouse.dir}'")

[SparkSessionSimpleZipExample: string, ${spark.sql.warehouse.dir}: string]

## Creating DataFrames and Datasets
There are a number of ways to create DataFrames and Datasets using the SparkSession APIs. Once either a DataFrame or Dataset is created, you can manipulate your data. For example, for quick exploration of Datasets, you can use the spark.range method.

In [8]:
import org.apache.spark.sql.functions._

In [9]:
val numDS = spark.range(5, 100, 5)
numDS.show(5)

+---+
| id|
+---+
|  5|
| 10|
| 15|
| 20|
| 25|
+---+
only showing top 5 rows



In [10]:
numDS.describe().show()

+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|                19|
|   mean|              50.0|
| stddev|28.136571693556885|
|    min|                 5|
|    max|                95|
+-------+------------------+
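
The count of 19 and the mean of 50.0 in the summary follow from the fact that the end argument of spark.range is exclusive. A small check, written as a sketch with the same arguments as the cell above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, sum}

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// range(start, end, step): end is exclusive, so this yields 5, 10, ..., 95.
val numDS = spark.range(5, 100, 5)

// Aggregate the single "id" column: sum is a Long, avg is a Double.
val row = numDS.agg(sum("id"), avg("id")).first()
val total = row.getLong(0)
val mean = row.getDouble(1)

println(s"count=${numDS.count()} sum=$total mean=$mean")
```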



## Creating a DataFrame from a collection with SparkSession

In [11]:
val langPercentDF = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

In [12]:
val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")

In [13]:
lpDF.orderBy(desc("percent")).show()

+--------+-------+
|language|percent|
+--------+-------+
|   Scala|     35|
|  Python|     30|
|    Java|     20|
|       R|     15|
+--------+-------+
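
As an alternative to renaming _1 and _2 after the fact, the column names can be given when the collection is converted. This sketch assumes the implicits of the session are imported, which requires spark to be a val.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // brings toDF into scope for local collections

// Name the columns while converting the collection to a DataFrame.
val lpDF = List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20))
  .toDF("language", "percent")

lpDF.orderBy(desc("percent")).show()
```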



# PART 2: Exploring Zip codes data using SparkSession and Dataset APIs.

### Next, we are going to explore some zip code data fetched from MongoDB

In [12]:
// The code was removed by DSX for sharing.

Name: Unknown Error
Message: <console>:22: error: stable identifier required, but this.$line7$read.spark.implicits found.
       import spark.implicits._
                    ^
StackTrace: 

In [16]:
import sys.process._
"wget http://media.mongodb.org/zips.json" !

--2017-02-09 10:27:49--  http://media.mongodb.org/zips.json
Resolving media.mongodb.org (media.mongodb.org)... 52.85.113.82, 52.85.113.131, 52.85.113.150, ...
Connecting to media.mongodb.org (media.mongodb.org)|52.85.113.82|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3182409 (3.0M) [application/json]
Saving to: ‘zips.json.1’

     0K .......... .......... .......... .......... ..........  1%  154M 0s
    50K .......... .......... .......... .......... ..........  3%  450M 0s
   100K .......... .......... .......... .......... ..........  4%  407M 0s
   150K .......... .......... .......... .......... ..........  6%  461M 0s
   200K .......... .......... .......... .......... ..........  8% 1.13M 1s
   250K .......... .......... .......... .......... ..........  9% 2.22M 1s
   300K .......... .......... .......... .......... .......... 11% 2.24M 1s
   350K .......... .......... .......... .......... .......... 12% 2.24M 1s
   400K .......... .......... .....

##### The above command runs on a single node of your cluster, fetches the zip code file from the specified URL, and saves it in the directory listed below

In [1]:
import sys.process._ // provides the ! operator on String; without it this cell fails to compile
"pwd" !

In [18]:
"ls" !

spark-warehouse
zips.json
zips.json.1


# Reading the JSON file with SparkSession
### Read the JSON file, infer the schema and convert it into a Dataset dictated by the case class Zips

In [17]:
import org.apache.spark.sql.SparkSession
val spark = (SparkSession.
    builder().
    getOrCreate())
// For implicit conversions like converting RDDs to DataFrames
// This import is needed to use the $-notation
import spark.implicits._

In [20]:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// A case class for zips data
case class Zips(zip:String, city:String, loc:Array[Double], pop:Long, state:String)

In [2]:
val zipDF = spark.read.json("zips.json").withColumnRenamed("_id","zip")
//rename the _id to zip for readability
//convert to a dataset using the case class
//val zipDS = zipDF.withColumnRenamed("_id","zip").as[Zips]
val zipDS = zipDF.as[Zips]
// since we will be querying this dataset often, let's cache it
zipDS.cache()
//display(zipDS)
zipDS.show()

+-----+---------------+--------------------+-----+-----+
|  zip|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675
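
Schema inference requires a pass over the JSON data. When the structure is known in advance, a schema can be supplied instead. The sketch below is an assumption-laden illustration: it writes two records, copied from the outputs in this notebook, to a temporary local file rather than using zips.json, and the field types mirror the Zips case class.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: local master, so a local temp file is visible to the executors.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Two records in the same shape as zips.json (one JSON object per line).
val sample = Seq(
  """{ "_id" : "01001", "city" : "AGAWAM", "loc" : [ -72.622739, 42.070206 ], "pop" : 15338, "state" : "MA" }""",
  """{ "_id" : "01002", "city" : "CUSHMAN", "loc" : [ -72.51565, 42.377017 ], "pop" : 36963, "state" : "MA" }"""
).mkString("\n")
val samplePath = Files.createTempFile("zips-sample", ".json")
Files.write(samplePath, sample.getBytes(StandardCharsets.UTF_8))

// Explicit schema matching the fields of the Zips case class.
val zipSchema = StructType(Seq(
  StructField("_id", StringType),
  StructField("city", StringType),
  StructField("loc", ArrayType(DoubleType)),
  StructField("pop", LongType),
  StructField("state", StringType)))

val sampleDF = spark.read
  .schema(zipSchema)
  .json(samplePath.toUri.toString)
  .withColumnRenamed("_id", "zip")

sampleDF.printSchema()
```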

In [3]:
zipDS.take(5).foreach(println)

Zips(01001,AGAWAM,[D@57290dce,15338,MA)
Zips(01002,CUSHMAN,[D@fca59a77,36963,MA)
Zips(01005,BARRE,[D@ffcc0eca,4546,MA)
Zips(01007,BELCHERTOWN,[D@1172d4a5,10579,MA)
Zips(01008,BLANDFORD,[D@37d80f38,1240,MA)


### Q1: Can you display states, zips, and cities with a population greater than 40000, in descending order of population?

In [22]:
//display(zipDS.select("state", "city", "zip", "pop").filter("pop > 40000").orderBy(desc("pop")))
zipDS.select("state", "city", "zip", "pop").filter("pop > 40000").orderBy(desc("pop")).show()

+-----+---------------+-----+------+
|state|           city|  zip|   pop|
+-----+---------------+-----+------+
|   IL|        CHICAGO|60623|112047|
|   NY|       BROOKLYN|11226|111396|
|   NY|       NEW YORK|10021|106564|
|   NY|       NEW YORK|10025|100027|
|   CA|   BELL GARDENS|90201| 99568|
|   IL|        CHICAGO|60617| 98612|
|   CA|    LOS ANGELES|90011| 96074|
|   IL|        CHICAGO|60647| 95971|
|   IL|        CHICAGO|60628| 94317|
|   CA|        NORWALK|90650| 94188|
|   IL|        CHICAGO|60620| 92005|
|   IL|        CHICAGO|60629| 91814|
|   IL|        CHICAGO|60609| 89762|
|   IL|        CHICAGO|60618| 88377|
|   NY|JACKSON HEIGHTS|11373| 88241|
|   CA|         ARLETA|91331| 88114|
|   NY|       BROOKLYN|11212| 87079|
|   CA|     SOUTH GATE|90280| 87026|
|   NY|      RIDGEWOOD|11385| 85732|
|   NY|          BRONX|10467| 85710|
+-----+---------------+-----+------+
only showing top 20 rows
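
The filter above takes a SQL string; the same predicate can also be written as a Column expression. A sketch on a three-row sample copied from the outputs in this notebook (the sample itself is an assumption, standing in for zipDS):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // enables toDF and the $"column" notation

val sample = Seq(
  ("IL", "CHICAGO", "60623", 112047L),
  ("NY", "BROOKLYN", "11226", 111396L),
  ("MA", "AGAWAM", "01001", 15338L)
).toDF("state", "city", "zip", "pop")

// SQL-string predicate, as in the cell above.
val viaString = sample.filter("pop > 40000").orderBy(desc("pop"))

// The same query built from Column expressions.
val viaColumn = sample.filter($"pop" > 40000).orderBy($"pop".desc)

viaColumn.show()
```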



### Q2: Which cities and zips in the state of California are most populous?

In [23]:
//display(zipDS.select("city", "zip", "pop").filter('state === "CA").orderBy(desc("pop")))
zipDS.select("city", "zip", "pop").filter('state === "CA").orderBy(desc("pop")).show()

+----------------+-----+-----+
|            city|  zip|  pop|
+----------------+-----+-----+
|    BELL GARDENS|90201|99568|
|     LOS ANGELES|90011|96074|
|         NORWALK|90650|94188|
|          ARLETA|91331|88114|
|      SOUTH GATE|90280|87026|
|     LOS ANGELES|90044|83958|
|         FONTANA|92335|81255|
|      HOLLY PARK|90250|78511|
|     WESTMINSTER|92683|77965|
|       SANTA ANA|92704|77151|
|        INDUSTRY|91744|77114|
|COAST GUARD ISLA|94501|76110|
|          RIALTO|92376|75341|
|     LOS ANGELES|90026|74751|
|      LONG BEACH|90805|74011|
| HUNTINGTON PARK|90255|72139|
|   MORENO VALLEY|92553|71314|
|LAKE LOS ANGELES|93550|71024|
|   SAN FRANCISCO|94110|70770|
|       IRWINDALE|91706|69464|
+----------------+-----+-----+
only showing top 20 rows



### Q3: Can you sum up the population of all the states and order them in descending order?

In [24]:
//display(zipDS.select("state", "pop").groupBy("state").sum("pop").orderBy(desc("sum(pop)")))
zipDS.select("state", "pop").groupBy("state").sum("pop").orderBy(desc("sum(pop)")).show()

+-----+--------+
|state|sum(pop)|
+-----+--------+
|   CA|29754890|
|   NY|17990402|
|   TX|16984601|
|   FL|12686644|
|   PA|11881643|
|   IL|11427576|
|   OH|10846517|
|   MI| 9295297|
|   NJ| 7730188|
|   NC| 6628637|
|   GA| 6478216|
|   VA| 6181479|
|   MA| 6016425|
|   IN| 5544136|
|   MO| 5110648|
|   WI| 4891769|
|   TN| 4876457|
|   WA| 4866692|
|   MD| 4781379|
|   MN| 4372982|
+-----+--------+
only showing top 20 rows



# PART 3: Creating a Hive Table, registering a UDF, and querying it using SparkSession and Spark SQL APIs

### drop the table if one exists

In [70]:
spark.sql("DROP TABLE IF EXISTS hive_zips_table")

[]

In [71]:
import sys.process._

### Just ensure we don't have any lingering files in the directory because of eventual consistency.

In [72]:
// "ls /gpfs/global_fs01/sym_shared/YPProdSpark/user/sc07-a3c399a7caae2d-99fc3133bdbb/notebook/work/spark-warehouse/hive_zips_table" !
"ls /gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse/hive_zips_table" !

ls: cannot access /gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse/hive_zips_table: No such file or directory


In [73]:
// "rm -rf /gpfs/global_fs01/sym_shared/YPProdSpark/user/sc07-a3c399a7caae2d-99fc3133bdbb/notebook/work/spark-warehouse/hive_zips_table" !
// "rm -rf /gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse/hive_zips_table1" !

"rm -rf /gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse/hive_zips_table" !

In [74]:
zipDS.write.saveAsTable("hive_zips_table")
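
If the goal is simply to be able to re-run the write, a save mode is one option. This is a sketch under assumptions: demo_zips_table is a hypothetical table name, the two sample rows stand in for zipDS, and the session uses the default catalog without Hive support.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sample = Seq(("01001", "AGAWAM", 15338L), ("01002", "CUSHMAN", 36963L))
  .toDF("zip", "city", "pop")

spark.sql("DROP TABLE IF EXISTS demo_zips_table") // hypothetical table name

// The default mode fails if the table exists; Overwrite replaces it instead.
sample.write.mode(SaveMode.Overwrite).saveAsTable("demo_zips_table")
sample.write.mode(SaveMode.Overwrite).saveAsTable("demo_zips_table")

val rowCount = spark.table("demo_zips_table").count()
println(rowCount)

// Clean up the demo table.
spark.sql("DROP TABLE IF EXISTS demo_zips_table")
```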

# Working with and Accessing Catalog Metadata

In [75]:
//display(spark.catalog.listDatabases)
spark.catalog.listDatabases.show(false)

+-------+----------------+-----------------------------------------------------------------------------------------------------------------+
|name   |description     |locationUri                                                                                                      |
+-------+----------------+-----------------------------------------------------------------------------------------------------------------+
|default|default database|file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/s716-cc0c8609c35e27-396c42860ed9/notebook/work/spark-warehouse|
+-------+----------------+-----------------------------------------------------------------------------------------------------------------+



In [76]:
//display(spark.catalog.listTables)
spark.catalog.listTables.show(false)

+---------------+--------+-----------+---------+-----------+
|name           |database|description|tableType|isTemporary|
+---------------+--------+-----------+---------+-----------+
|hive_zips_table|default |null       |MANAGED  |false      |
+---------------+--------+-----------+---------+-----------+



## Cache table using SparkSession API

In [78]:
spark.catalog.cacheTable("hive_zips_table")
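
The catalog can also report whether a table is marked as cached and release it again. A sketch using a temporary view; the view name demo_zips_view is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sample = Seq(("01001", "AGAWAM"), ("01002", "CUSHMAN")).toDF("zip", "city")
sample.createOrReplaceTempView("demo_zips_view") // hypothetical view name

// cacheTable marks the table for caching; data is materialized on first use.
spark.catalog.cacheTable("demo_zips_view")
val cachedBefore = spark.catalog.isCached("demo_zips_view")

// uncacheTable releases the cached data.
spark.catalog.uncacheTable("demo_zips_view")
val cachedAfter = spark.catalog.isCached("demo_zips_view")

println(s"cached before: $cachedBefore, after: $cachedAfter")
```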

## Q1: Can you query the Hive table with a Spark SQL query identical to Q1 above?

In [79]:
//display(spark.sql("SELECT state, city, zip, pop FROM hive_zips_table WHERE pop > 40000 ORDER BY pop DESC"))
spark.sql("SELECT state, city, zip, pop FROM hive_zips_table WHERE pop > 40000 ORDER BY pop DESC").show()

+-----+---------------+-----+------+
|state|           city|  zip|   pop|
+-----+---------------+-----+------+
|   IL|        CHICAGO|60623|112047|
|   NY|       BROOKLYN|11226|111396|
|   NY|       NEW YORK|10021|106564|
|   NY|       NEW YORK|10025|100027|
|   CA|   BELL GARDENS|90201| 99568|
|   IL|        CHICAGO|60617| 98612|
|   CA|    LOS ANGELES|90011| 96074|
|   IL|        CHICAGO|60647| 95971|
|   IL|        CHICAGO|60628| 94317|
|   CA|        NORWALK|90650| 94188|
|   IL|        CHICAGO|60620| 92005|
|   IL|        CHICAGO|60629| 91814|
|   IL|        CHICAGO|60609| 89762|
|   IL|        CHICAGO|60618| 88377|
|   NY|JACKSON HEIGHTS|11373| 88241|
|   CA|         ARLETA|91331| 88114|
|   NY|       BROOKLYN|11212| 87079|
|   CA|     SOUTH GATE|90280| 87026|
|   NY|      RIDGEWOOD|11385| 85732|
|   NY|          BRONX|10467| 85710|
+-----+---------------+-----+------+
only showing top 20 rows



## Q2: Find the most populous cities in California, with their total number of zips, using the Hive table

In [80]:
// display(spark.sql("SELECT COUNT(zip), SUM(pop), city FROM hive_zips_table WHERE state = 'CA' GROUP BY city ORDER BY SUM(pop) DESC"))
spark.sql("SELECT COUNT(zip), SUM(pop), city FROM hive_zips_table WHERE state = 'CA' GROUP BY city ORDER BY SUM(pop) DESC").show()

+----------+--------+----------------+
|count(zip)|sum(pop)|            city|
+----------+--------+----------------+
|        56| 2102295|     LOS ANGELES|
|        34| 1049298|       SAN DIEGO|
|        29|  816653|        SAN JOSE|
|        26|  723993|   SAN FRANCISCO|
|        28|  628279|      SACRAMENTO|
|        12|  347905|          FRESNO|
|        12|  314487|         OAKLAND|
|         8|  299651|      LONG BEACH|
|         7|  272327|         ANAHEIM|
|         8|  271347|     BAKERSFIELD|
|        11|  267258|        STOCKTON|
|         7|  253478|       RIVERSIDE|
|         4|  234472|       SANTA ANA|
|         5|  216459|         MODESTO|
|         4|  183542|HUNTINGTON BEACH|
|         7|  177552|  SAN BERNARDINO|
|         4|  173374|         FREMONT|
|         8|  163666|        GLENDALE|
|         6|  158398|      SANTA ROSA|
|         6|  158183|        TORRANCE|
+----------+--------+----------------+
only showing top 20 rows



# Registering a UDF with SparkSession

## Q4: Can you register a simple UDF with SparkSession that converts zip into long (currently it's a string)?

In [81]:
spark.sql("describe hive_zips_table").show()

+--------+-------------+-------+
|col_name|    data_type|comment|
+--------+-------------+-------+
|     zip|       string|   null|
|    city|       string|   null|
|     loc|array<double>|   null|
|     pop|       bigint|   null|
|   state|       string|   null|
+--------+-------------+-------+



In [82]:
spark.udf.register("zipToLong", (z:String) => z.toLong)

UserDefinedFunction(<function1>,LongType,Some(List(StringType)))

In [83]:
spark.sql("SELECT city, zipToLong(zip) as zip_to_long FROM hive_zips_table ORDER BY zip_to_long DESC").show()

+-----------+-----------+
|       city|zip_to_long|
+-----------+-----------+
|  KETCHIKAN|      99950|
|   WRANGELL|      99929|
|POINT BAKER|      99927|
| METLAKATLA|      99926|
|    KLAWOCK|      99925|
|      HYDER|      99923|
|   HYDABURG|      99922|
|      CRAIG|      99921|
| THORNE BAY|      99919|
|  KETCHIKAN|      99901|
|    SKAGWAY|      99840|
|      SITKA|      99835|
| PETERSBURG|      99833|
|     HOONAH|      99829|
|     HAINES|      99827|
|   GUSTAVUS|      99826|
|    DOUGLAS|      99824|
|     ANGOON|      99820|
|     JUNEAU|      99801|
|    NUIQSUT|      99789|
+-----------+-----------+
only showing top 20 rows
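
A function registered with spark.udf.register is callable from SQL. For the DataFrame API, the same conversion can be wrapped with functions.udf. A sketch on two sample rows taken from the outputs in this notebook:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Same conversion as the registered zipToLong, usable on Columns.
val zipToLong = udf((z: String) => z.toLong)

val sample = Seq(("KETCHIKAN", "99950"), ("AGAWAM", "01001")).toDF("city", "zip")

val converted = sample
  .select($"city", zipToLong($"zip").as("zip_to_long"))
  .orderBy($"zip_to_long".desc)

// Leading zeros are lost in the numeric form: "01001" becomes 1001.
val values = converted.collect().map(_.getLong(1)).toSeq
println(values)
```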



### Register another UDF that calculates the length of city names

In [84]:
spark.udf.register("cityLength", (c:String) => c.length())

UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

### Using catalog data, get the list of your registered UDFs

In [85]:
val udfs = spark.catalog.listFunctions()

In [86]:
udfs.filter('name === "cityLength".toLowerCase || 'name === "zipToLong".toLowerCase).select("name", "database").show()

+----------+--------+
|      name|database|
+----------+--------+
|citylength|    null|
| ziptolong|    null|
+----------+--------+



In [87]:
spark.sql("SELECT city, cityLength(city) as city_length FROM hive_zips_table ORDER BY city_length DESC").show()

+----------------+-----------+
|            city|city_length|
+----------------+-----------+
|CHEBEAGUE ISLAND|         16|
|MONTGOMERY CENTE|         16|
|CUMBERLAND CENTE|         16|
|WEST BRIDGEWATER|         16|
|OLD ORCHARD BEAC|         16|
|WEST SPRINGFIELD|         16|
|CUMBERLAND FORES|         16|
|GREAT BARRINGTON|         16|
|NORTH WHITEFIELD|         16|
|NORTH CHELMSFORD|         16|
|EAST MILLINOCKET|         16|
|NEWTON UPPER FAL|         16|
|GREENVILLE JUNCT|         16|
|GILMANTON IRON W|         16|
|LITTLE DEER ISLE|         16|
|WOOD RIVER JUNCT|         16|
|SOUTH GOULDSBORO|         16|
|CENTER BARNSTEAD|         16|
|SOUTHWEST HARBOR|         16|
|WEST CHESTERFIEL|         16|
+----------------+-----------+
only showing top 20 rows



### Q5: Can you compose the same query as Q2 using the Dataset APIs?

In [88]:
(zipDS.filter('state === "CA")
  .select("zip", "pop", "city")
  .groupBy("city")
  .sum()
  .orderBy(desc("sum(pop)")).show())

+----------------+--------+
|            city|sum(pop)|
+----------------+--------+
|     LOS ANGELES| 2102295|
|       SAN DIEGO| 1049298|
|        SAN JOSE|  816653|
|   SAN FRANCISCO|  723993|
|      SACRAMENTO|  628279|
|          FRESNO|  347905|
|         OAKLAND|  314487|
|      LONG BEACH|  299651|
|         ANAHEIM|  272327|
|     BAKERSFIELD|  271347|
|        STOCKTON|  267258|
|       RIVERSIDE|  253478|
|       SANTA ANA|  234472|
|         MODESTO|  216459|
|HUNTINGTON BEACH|  183542|
|  SAN BERNARDINO|  177552|
|         FREMONT|  173374|
|        GLENDALE|  163666|
|      SANTA ROSA|  158398|
|        TORRANCE|  158183|
+----------------+--------+
only showing top 20 rows



In [92]:
(zipDS.filter('state === "CA")
  .select("zip", "pop", "city")
  .groupBy("city")
  .agg(sum("pop").alias("population"))
  .orderBy(desc("population"))).show()

+----------------+----------+
|            city|population|
+----------------+----------+
|     LOS ANGELES|   2102295|
|       SAN DIEGO|   1049298|
|        SAN JOSE|    816653|
|   SAN FRANCISCO|    723993|
|      SACRAMENTO|    628279|
|          FRESNO|    347905|
|         OAKLAND|    314487|
|      LONG BEACH|    299651|
|         ANAHEIM|    272327|
|     BAKERSFIELD|    271347|
|        STOCKTON|    267258|
|       RIVERSIDE|    253478|
|       SANTA ANA|    234472|
|         MODESTO|    216459|
|HUNTINGTON BEACH|    183542|
|  SAN BERNARDINO|    177552|
|         FREMONT|    173374|
|        GLENDALE|    163666|
|      SANTA ROSA|    158398|
|        TORRANCE|    158183|
+----------------+----------+
only showing top 20 rows



In [93]:
(zipDS.filter('state === "CA")
  .select("zip", "pop", "city")
  .groupBy("city")
  .agg(sum("pop").alias("population"), count("*").alias("cnt"))
  .orderBy(desc("population"))).show()

+----------------+----------+---+
|            city|population|cnt|
+----------------+----------+---+
|     LOS ANGELES|   2102295| 56|
|       SAN DIEGO|   1049298| 34|
|        SAN JOSE|    816653| 29|
|   SAN FRANCISCO|    723993| 26|
|      SACRAMENTO|    628279| 28|
|          FRESNO|    347905| 12|
|         OAKLAND|    314487| 12|
|      LONG BEACH|    299651|  8|
|         ANAHEIM|    272327|  7|
|     BAKERSFIELD|    271347|  8|
|        STOCKTON|    267258| 11|
|       RIVERSIDE|    253478|  7|
|       SANTA ANA|    234472|  4|
|         MODESTO|    216459|  5|
|HUNTINGTON BEACH|    183542|  4|
|  SAN BERNARDINO|    177552|  7|
|         FREMONT|    173374|  4|
|        GLENDALE|    163666|  8|
|      SANTA ROSA|    158398|  6|
|        TORRANCE|    158183|  6|
+----------------+----------+---+
only showing top 20 rows



In [94]:
(zipDS.filter('state === "CA")
  .select("zip", "pop", "city")
  .groupBy("city")
  .agg(sum("pop").alias("population"), count("zip").alias("zip"))
  .orderBy(desc("population"))).show()

+----------------+----------+---+
|            city|population|zip|
+----------------+----------+---+
|     LOS ANGELES|   2102295| 56|
|       SAN DIEGO|   1049298| 34|
|        SAN JOSE|    816653| 29|
|   SAN FRANCISCO|    723993| 26|
|      SACRAMENTO|    628279| 28|
|          FRESNO|    347905| 12|
|         OAKLAND|    314487| 12|
|      LONG BEACH|    299651|  8|
|         ANAHEIM|    272327|  7|
|     BAKERSFIELD|    271347|  8|
|        STOCKTON|    267258| 11|
|       RIVERSIDE|    253478|  7|
|       SANTA ANA|    234472|  4|
|         MODESTO|    216459|  5|
|HUNTINGTON BEACH|    183542|  4|
|  SAN BERNARDINO|    177552|  7|
|         FREMONT|    173374|  4|
|        GLENDALE|    163666|  8|
|      SANTA ROSA|    158398|  6|
|        TORRANCE|    158183|  6|
+----------------+----------+---+
only showing top 20 rows



# Parquet - Bluemix Object Storage

## Save as parquet file

In [96]:
zipDS.write.parquet("swift://Databricks." + name + "/zipDS.parquet")

Name: org.apache.spark.sql.AnalysisException
Message: path swift://Databricks.keystone/zipDS.parquet already exists.;
StackTrace: org.apache.spark.sql.AnalysisException: path swift://Databricks.keystone/zipDS.parquet already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:88)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withSc
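
The AnalysisException above is the default behavior when the target path already exists. A save mode changes that. The sketch below writes to a temporary local directory instead of Object Storage, which is an assumption made so that it runs without credentials.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: local master, so a local temp directory is a valid target.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sample = Seq(("01001", "AGAWAM", 15338L), ("01002", "CUSHMAN", 36963L))
  .toDF("zip", "city", "pop")

// A path that does not exist yet, inside a fresh temp directory.
val outPath = Files.createTempDirectory("zips-demo").resolve("zipDS.parquet").toUri.toString

// First write: the default mode (ErrorIfExists) succeeds because the path is new.
sample.write.parquet(outPath)

// Second write: Overwrite replaces the existing data instead of failing.
// Other modes are SaveMode.Append and SaveMode.Ignore.
sample.write.mode(SaveMode.Overwrite).parquet(outPath)

val readBack = spark.read.parquet(outPath)
readBack.show()
```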

## Read parquet file from Object Storage

In [97]:
val zipDSparquet = spark.read.parquet("swift://Databricks." + name + "/zipDS.parquet")

In [98]:
zipDSparquet.show()

+-----+---------------+--------------------+-----+-----+
|  zip|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675

In [99]:
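// registerTempTable is deprecated as of Spark 2.0; createOrReplaceTempView is its replacement
zipDSparquet.createOrReplaceTempView("zipDSparquet")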

In [100]:
spark.sql("select * from zipDSparquet").show()

+-----+---------------+--------------------+-----+-----+
|  zip|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675

# Amazon S3

### https://github.com/charles2588/bluemixsparknotebooks/blob/master/Python/read_write_S3.ipynb

## Amazon S3 - write parquet

In [101]:
zipDS.write.parquet("s3a://incoming5824/zipDS.parquet")

In [102]:
// This command also works; run either this one or the one above, not both
zipDS.write.save("s3a://incoming5824/zipDS.parquet")

Name: org.apache.spark.sql.AnalysisException
Message: path s3a://incoming5824/zipDS.parquet already exists.;
StackTrace:   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:88)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)


In [19]:
// Writes Parquet not CSV, ignore
//zipDS.write.save("s3a://incoming5824/zipDS.csv")

## Amazon S3 - read parquet

In [103]:
val zipDSparquet = spark.read.parquet("s3a://incoming5824/zipDS.parquet")
zipDSparquet.show()

+-----+---------------+--------------------+-----+-----+
|  zip|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675

In [109]:
// This form also works; run either this cell or the one above
// ("header" is a CSV option and has no effect on Parquet, so it is omitted)
val zipDSparquet = (spark.read
.format("parquet")
.load("s3a://incoming5824/zipDS.parquet"))
zipDSparquet.show()

+-----+---------------+--------------------+-----+-----+
|  zip|           city|                 loc|  pop|state|
+-----+---------------+--------------------+-----+-----+
|01001|         AGAWAM|[-72.622739, 42.0...|15338|   MA|
|01002|        CUSHMAN|[-72.51565, 42.37...|36963|   MA|
|01005|          BARRE|[-72.108354, 42.4...| 4546|   MA|
|01007|    BELCHERTOWN|[-72.410953, 42.2...|10579|   MA|
|01008|      BLANDFORD|[-72.936114, 42.1...| 1240|   MA|
|01010|      BRIMFIELD|[-72.188455, 42.1...| 3706|   MA|
|01011|        CHESTER|[-72.988761, 42.2...| 1688|   MA|
|01012|   CHESTERFIELD|[-72.833309, 42.3...|  177|   MA|
|01013|       CHICOPEE|[-72.607962, 42.1...|23396|   MA|
|01020|       CHICOPEE|[-72.576142, 42.1...|31495|   MA|
|01022|   WESTOVER AFB|[-72.558657, 42.1...| 1764|   MA|
|01026|     CUMMINGTON|[-72.905767, 42.4...| 1484|   MA|
|01027|      MOUNT TOM|[-72.679921, 42.2...|16864|   MA|
|01028|EAST LONGMEADOW|[-72.505565, 42.0...|13367|   MA|
|01030|  FEEDING HILLS|[-72.675

## Amazon S3 - "Query" up-to-the-minute data from Parquet Table

In [6]:
// Credentials are omitted here; supply them through the S3A Hadoop configuration rather than embedding them in the path
val parquetOutputPath1 = "s3a://incoming5824/zipDS.parquet/"
val sqlDF = spark.sql(s"SELECT * FROM parquet.`$parquetOutputPath1`")
sqlDF.show(false)

+-----+---------------+-----------------------+-----+-----+
|zip  |city           |loc                    |pop  |state|
+-----+---------------+-----------------------+-----+-----+
|01001|AGAWAM         |[-72.622739, 42.070206]|15338|MA   |
|01002|CUSHMAN        |[-72.51565, 42.377017] |36963|MA   |
|01005|BARRE          |[-72.108354, 42.409698]|4546 |MA   |
|01007|BELCHERTOWN    |[-72.410953, 42.275103]|10579|MA   |
|01008|BLANDFORD      |[-72.936114, 42.182949]|1240 |MA   |
|01010|BRIMFIELD      |[-72.188455, 42.116543]|3706 |MA   |
|01011|CHESTER        |[-72.988761, 42.279421]|1688 |MA   |
|01012|CHESTERFIELD   |[-72.833309, 42.38167] |177  |MA   |
|01013|CHICOPEE       |[-72.607962, 42.162046]|23396|MA   |
|01020|CHICOPEE       |[-72.576142, 42.176443]|31495|MA   |
|01022|WESTOVER AFB   |[-72.558657, 42.196672]|1764 |MA   |
|01026|CUMMINGTON     |[-72.905767, 42.435296]|1484 |MA   |
|01027|MOUNT TOM      |[-72.679921, 42.264319]|16864|MA   |
|01028|EAST LONGMEADOW|[-72.505565, 42.0
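
S3A credentials are usually supplied through the Hadoop configuration, which keeps them out of paths and notebook outputs. A sketch, assuming the keys are available in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (the variable names are an assumption; fs.s3a.access.key and fs.s3a.secret.key are the S3A property names):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master for a standalone run; in DSX or spark-shell
// getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration

// Set each property only if the corresponding environment variable exists.
sys.env.get("AWS_ACCESS_KEY_ID").foreach(key => hadoopConf.set("fs.s3a.access.key", key))
sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(key => hadoopConf.set("fs.s3a.secret.key", key))

// With credentials configured, the path needs only the bucket and the key.
val parquetPath = "s3a://incoming5824/zipDS.parquet/"
val query = s"SELECT * FROM parquet.`$parquetPath`"

// spark.sql(query).show(false) // requires access to the bucket, so left commented out
println(query)
```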

In [11]:
println(sqlDF.getClass)

class org.apache.spark.sql.Dataset


In [10]:
// This does not work; it was taken from a Databricks Scala notebook with streaming data
// (credentials are omitted from the path and should come from the S3A Hadoop configuration)
val parquetOutputPath1 = "s3a://cloudtrail5824/"
val sqlDF = spark.sql(s"SELECT * FROM parquet.`$parquetOutputPath1`")
//sqlDF.show(false)

## Reading a CSV file from Bluemix Object Storage, writing it to S3, and reading it back in DSX

### Read from Bluemix Object Storage

In [111]:
val dfData3 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load("swift://Databricks." + name + "/diamonds.csv")
dfData3.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



### Write to s3 Object store

In [112]:
// No "header" option is set on this write, so the file is saved without a header row
dfData3.write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("s3a://incoming5824/diamonds_out.csv")

### Read from s3 object store

In [113]:
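// Note: this path points at Bluemix Object Storage (swift), not the s3a location written above.
// Reading a file that has no header row with header=true turns the first data row into column names.
val dfData2 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load("swift://Databricks." + name + "/diamonds_out.csv")
dfData2.show(5)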

+----+---------+---+----+----+----+---+----+----+----+
|0.23|    Ideal|  E| SI2|61.5|55.0|326|3.95|3.98|2.43|
+----+---------+---+----+----+----+---+----+----+----+
|0.21|  Premium|  E| SI1|59.8|61.0|326|3.89|3.84|2.31|
|0.23|     Good|  E| VS1|56.9|65.0|327|4.05|4.07|2.31|
|0.29|  Premium|  I| VS2|62.4|58.0|334| 4.2|4.23|2.63|
|0.31|     Good|  J| SI2|63.3|58.0|335|4.34|4.35|2.75|
|0.24|Very Good|  J|VVS2|62.8|57.0|336|3.94|3.96|2.48|
+----+---------+---+----+----+----+---+----+----+----+
only showing top 5 rows
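
In the output above, the first data row appears where the column names should be. That is what happens when a CSV file written without a header row is read with header set to true. A sketch of a round trip that keeps the column names, using a temporary local directory and two rows with three of the diamonds columns (both are assumptions made to keep the example self-contained):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local master, so a local temp directory is a valid target.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val diamonds = Seq((0.23, "Ideal", 326), (0.21, "Premium", 326))
  .toDF("carat", "cut", "price")

val csvPath = Files.createTempDirectory("diamonds-demo").resolve("diamonds_out.csv").toUri.toString

// Write the header row explicitly; the CSV writer omits it by default.
diamonds.write.option("header", "true").csv(csvPath)

// Read it back, treating the first row of each file as column names.
val readBack = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvPath)

readBack.show()
```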



## Reading from Amazon S3 Bucket, a file that was written by DataConnect

In [17]:
val dfData1 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load("s3a://incoming5824/diamonds_dataconnect")
dfData1.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [18]:
val primitiveDS = Seq(1, 2, 3).toDS()

In [19]:
primitiveDS.getClass

class org.apache.spark.sql.Dataset

In [20]:
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

Array(2, 3, 4)