![Spark Image](https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1200px-Apache_Spark_logo.svg.png)

# Structured APIs

## RDDs vs DataFrames and Datasets

![](https://databricks.com/wp-content/uploads/2018/05/rdd-1024x595.png)

### Resilient Distributed Dataset (RDD)
The RDD has been the primary user-facing API in Spark since its inception. At its core, an RDD is an immutable distributed collection of elements of your data, partitioned across the nodes in your cluster, that can be operated on in parallel with a low-level API that offers transformations and actions.

### When to use RDDs?
Consider using RDDs when:
- you want low-level transformations and actions, and control over your dataset;
- your data is unstructured, such as media streams or streams of text;
- you want to manipulate your data with functional programming constructs rather than domain-specific expressions;
- you don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column; and
- you can forgo some optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.

### What happens to RDDs in Apache Spark 2.0?
You may ask: Are RDDs being relegated to second-class citizens? Are they being deprecated?

**The answer is a resounding NO!**

What’s more, as you will note below, you can move seamlessly between DataFrames or Datasets and RDDs at will through simple API method calls, and DataFrames and Datasets are built on top of RDDs.

### DataFrames

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, data is organized into named columns, like a table in a relational database. Designed to make processing large data sets even easier, the DataFrame allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction; it provides a domain-specific language API to manipulate your distributed data; and it makes Spark accessible to a wider audience, beyond specialized data engineers.

In Spark 2.0, the DataFrame APIs were merged with the Dataset APIs, unifying data processing capabilities across libraries. Because of this unification, developers have fewer concepts to learn or remember, and can work with a single high-level and type-safe API called `Dataset`.

![Spark](https://databricks.com/wp-content/uploads/2016/06/Unified-Apache-Spark-2.0-API-1.png)

### Datasets
Starting in Spark 2.0, Dataset takes on two distinct API characteristics: a strongly-typed API and an untyped API, as shown in the table below. Conceptually, consider DataFrame as an alias for a collection of generic objects, `Dataset[Row]`, where a `Row` is a generic untyped JVM object. Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.

### Typed and Un-typed APIs

<table class="table">
<thead>
<tr>
<th>Language</th>
<th>Main Abstraction</th>
</tr>
</thead>
<tbody>
<tr>
<td>Scala</td>
<td>Dataset[T] &amp; DataFrame (alias for Dataset[Row])</td>
</tr>
<tr>
<td>Java</td>
<td>Dataset[T]</td>
</tr>
<tr>
<td>Python*</td>
<td>DataFrame</td>
</tr>
<tr>
<td>R*</td>
<td>DataFrame</td>
</tr>
</tbody>
</table>

> **Note:** *Since Python and R have no compile-time type-safety, we only have untyped APIs, namely DataFrames.*

### Benefits of Dataset APIs

### 1. Static-typing and runtime type-safety

Consider static typing and runtime safety as a spectrum, from SQL, the least restrictive, to Dataset, the most restrictive. For instance, in your Spark SQL string queries, you won’t know about a syntax error until runtime (which could be costly), whereas in DataFrames and Datasets you can catch such errors at compile time (which saves developer time and costs). That is, if you invoke a function in DataFrame that is not part of the API, the compiler will catch it. However, it won’t detect a non-existent column name until runtime.

At the far end of the spectrum is Dataset, the most restrictive. Since Dataset APIs are all expressed as lambda functions and JVM typed objects, any mismatch of typed parameters will be detected at compile time. Analysis errors can also be detected at compile time when using Datasets, again saving developer time and costs.

All this translates to a spectrum of type safety along syntax and analysis errors in your Spark code, with Datasets as the most restrictive yet most productive option for a developer.

![](https://databricks.com/wp-content/uploads/2016/07/sql-vs-dataframes-vs-datasets-type-safety-spectrum.png)

### 2. High-level abstraction and custom view into structured and semi-structured data
DataFrames, as collections of `Dataset[Row]`, render a structured custom view into your semi-structured data.

### 3. Ease-of-use of APIs with structure

Although structure may limit control over what your Spark program can do with data, it introduces rich semantics and an easy set of domain-specific operations that can be expressed as high-level constructs. Most computations, however, can be accomplished with Dataset’s high-level APIs. For example, it’s much simpler to perform `agg`, `select`, `sum`, `avg`, `map`, `filter`, or `groupBy` operations.

### 4. Performance and Optimization
Along with all the above benefits, you cannot overlook the space efficiency and performance gains in using DataFrames and Dataset APIs for two reasons.

First, because the DataFrame and Dataset APIs are built on top of the Spark SQL engine, they use Catalyst to generate optimized logical and physical query plans. Across the R, Java, Scala, and Python DataFrame/Dataset APIs, all relational queries go through the same optimizer, which provides the space and speed efficiency. Whereas the typed Dataset[T] API is optimized for data engineering tasks, the untyped Dataset[Row] (an alias of DataFrame) is even faster and suitable for interactive analysis.

![](https://databricks.com/wp-content/uploads/2016/07/memory-usage-when-caching-datasets-vs-rdds.png)

Second, since Spark as a compiler understands your Dataset type JVM object, it maps your type-specific JVM object to Tungsten’s internal memory representation using Encoders. As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that can execute at superior speeds.

### When should I use DataFrames or Datasets?
- If you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame or Dataset.
- If your processing demands high-level expressions, filters, maps, aggregation, averages, sums, SQL queries, columnar access, and use of lambda functions on semi-structured data, use DataFrame or Dataset.
- If you want a higher degree of type safety at compile time, want typed JVM objects, and want to take advantage of Catalyst optimization and Tungsten’s efficient code generation, use Dataset.
- If you want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset.
- If you are an R user, use DataFrames.
- If you are a Python user, use DataFrames and fall back to RDDs if you need more control.

## Starting a Spark Session

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
# May take a little while on a local computer
spark = SparkSession.builder.appName("Structured API").getOrCreate()

In [3]:
!wget https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/json/2015-summary.json -P data/

--2020-07-03 19:43:09--  https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/json/2015-summary.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.152.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.152.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21368 (21K) [text/plain]
Saving to: ‘data/2015-summary.json.1’


2020-07-03 19:43:11 (110 KB/s) - ‘data/2015-summary.json.1’ saved [21368/21368]



In [None]:
df = spark.read.format("json").load("data/2015-summary.json")

In [None]:
df.printSchema()

In [None]:
spark.read.format("json").load("data/2015-summary.json").schema

*A schema is a StructType made up of a number of fields, StructFields, that have a name, type, a Boolean flag which specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated metadata with that column. The metadata is a way of storing information about this column.*

*If the types in the data (at runtime) do not match the schema, Spark will throw an error or, depending on the data source and read mode, replace the mismatched values with null. The example that follows shows how to create and enforce a specific schema on a DataFrame.*

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [None]:
myManualSchema = StructType([StructField("DEST_COUNTRY_NAME", StringType(), True),
                             StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
                             StructField("count", LongType(), False, metadata={"hello":"world"}) 
                            ])
df = spark.read.format("json").schema(myManualSchema).load("data/2015-summary.json")

In [None]:
df.printSchema()

**Spark Types**

<table class="table">
<tbody><tr>
  <th style="width:20%">Data type</th>
  <th style="width:40%">Value type in Python</th>
  <th>API to access or create a data type</th></tr>
<tr>
  <td> <b>ByteType</b> </td>
  <td>
  int or long <br>
  <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime.
  Please make sure that numbers are within the range of -128 to 127.
  </td>
  <td>
  ByteType()
  </td>
</tr>
<tr>
  <td> <b>ShortType</b> </td>
  <td>
  int or long <br>
  <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime.
  Please make sure that numbers are within the range of -32768 to 32767.
  </td>
  <td>
  ShortType()
  </td>
</tr>
<tr>
  <td> <b>IntegerType</b> </td>
  <td> int or long </td>
  <td>
  IntegerType()
  </td>
</tr>
<tr>
  <td> <b>LongType</b> </td>
  <td>
  long <br>
  <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime.
  Please make sure that numbers are within the range of
  -9223372036854775808 to 9223372036854775807.
  Otherwise, please convert data to decimal.Decimal and use DecimalType.
  </td>
  <td>
  LongType()
  </td>
</tr>
<tr>
  <td> <b>FloatType</b> </td>
  <td>
  float <br>
  <b>Note:</b> Numbers will be converted to 4-byte single-precision floating
  point numbers at runtime.
  </td>
  <td>
  FloatType()
  </td>
</tr>
<tr>
  <td> <b>DoubleType</b> </td>
  <td> float </td>
  <td>
  DoubleType()
  </td>
</tr>
<tr>
  <td> <b>DecimalType</b> </td>
  <td> decimal.Decimal </td>
  <td>
  DecimalType()
  </td>
</tr>
<tr>
  <td> <b>StringType</b> </td>
  <td> string </td>
  <td>
  StringType()
  </td>
</tr>
<tr>
  <td> <b>BinaryType</b> </td>
  <td> bytearray </td>
  <td>
  BinaryType()
  </td>
</tr>
<tr>
  <td> <b>BooleanType</b> </td>
  <td> bool </td>
  <td>
  BooleanType()
  </td>
</tr>
<tr>
  <td> <b>TimestampType</b> </td>
  <td> datetime.datetime </td>
  <td>
  TimestampType()
  </td>
</tr>
<tr>
  <td> <b>DateType</b> </td>
  <td> datetime.date </td>
  <td>
  DateType()
  </td>
</tr>
<tr>
  <td> <b>ArrayType</b> </td>
  <td> list, tuple, or array </td>
  <td>
  ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br>
  <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>.
  </td>
</tr>
<tr>
  <td> <b>MapType</b> </td>
  <td> dict </td>
  <td>
  MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br>
  <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>.
  </td>
</tr>
<tr>
  <td> <b>StructType</b> </td>
  <td> list or tuple </td>
  <td>
  StructType(<i>fields</i>)<br>
  <b>Note:</b> <i>fields</i> is a list of StructFields. Also, two fields with the same
  name are not allowed.
  </td>
</tr>
<tr>
  <td> <b>StructField</b> </td>
  <td> The value type in Python of the data type of this field
  (for example, int for a StructField with the data type IntegerType) </td>
  <td>
  StructField(<i>name</i>, <i>dataType</i>, [<i>nullable</i>])<br>
  <b>Note:</b> The default value of <i>nullable</i> is <i>True</i>.
  </td>
</tr>
</tbody></table>

In [None]:
df.show()

In [None]:
df.first()

*This is a `Row` object in a Spark DataFrame.*

In [None]:
df.first().asDict()['DEST_COUNTRY_NAME']

In [None]:
df.take(3)

### Columns and Expressions

There are a lot of different ways to construct and refer to columns but the two simplest ways are
by using the `col` or `column` functions.

In [None]:
from pyspark.sql.functions import col, column

In [None]:
df.select(col("DEST_COUNTRY_NAME")).show(2)

In [None]:
df.select(column("DEST_COUNTRY_NAME")).show(2)

### Expressions

In [None]:
from pyspark.sql.functions import expr
df.select(expr("DEST_COUNTRY_NAME as Destination")).show()

## select and selectExpr


In [None]:
df.select("DEST_COUNTRY_NAME").show(2)

In [None]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)


In [None]:
df.select(expr("DEST_COUNTRY_NAME"),
                col("DEST_COUNTRY_NAME"),
                column("DEST_COUNTRY_NAME")).show(2)

In [None]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

In [None]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)

In [None]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

In [None]:
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

In [None]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

## Adding Columns

In [None]:
# Adding a column with literal 1
from pyspark.sql.functions import lit
df.withColumn("numberOne", lit(1)).show(2)

In [None]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

In [None]:
# Renaming Columns
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

In [None]:
# Removing Columns
df.drop("ORIGIN_COUNTRY_NAME").columns

In [None]:
# Changing a Column’s Type (cast)
df.withColumn("count2", col("count").cast("long")).schema

## Filtering Rows


In [None]:
df.filter(col("count") < 2).show(2)

In [None]:
df.where("count < 2").show(2)

In [None]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

In [None]:
# Getting Unique Rows
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

In [None]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

## Random Samples


In [None]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

## Concatenating and Appending Rows (Union)


In [None]:
from pyspark.sql import Row
schema = df.schema
newRows = [Row("New Country", "Other Country", 5),Row("New Country 2", "Other Country 3", 1)]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [None]:
df.union(newDF).where("count = 1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()

## Sorting Rows

In [None]:
df.sort("count").show(5)

In [None]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

In [None]:
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

*By default, it sorts in ascending order. To define the order explicitly, use `desc` and `asc`.*

In [None]:
from pyspark.sql.functions import desc, asc
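# expr("count desc") is parsed as the column `count` aliased to `desc`,
# which sorts ascending; use the desc() function to sort descending.
df.orderBy(desc("count")).show(2)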

In [None]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

## Limit

In [None]:
# Sort descending with Column.desc(); expr("count desc") would alias the column instead.
df.orderBy(col("count").desc()).limit(6).show()

## Repartition and Coalesce

In [None]:
df.rdd.getNumPartitions()

In [None]:
df.repartition(5)

In [None]:
df.repartition(5).rdd.getNumPartitions()

In [None]:
# If you know that you’re going to be filtering by a certain column often, 
# it can be worth repartitioning based on that column
df.repartition(col("DEST_COUNTRY_NAME"))

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

*Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions.*

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)