## What is big data?

Big data primarily refers to data sets that are too large or complex to be dealt with by traditional data-processing application software. Data with many entries offer greater statistical power, while data with higher complexity may lead to a higher false discovery rate.

## Why Spark?

When you think of a “computer”, you probably picture one machine sitting on your desk at home or at work. This
machine works perfectly well for watching movies or working with spreadsheet software. However, as many users
likely experience at some point, there are some things that your computer is not powerful enough to perform. One
particularly challenging area is data processing. Single machines do not have enough power and resources to perform
computations on huge amounts of information (or the user may not have time to wait for the computation to finish).
A cluster, or group of machines, pools the resources of many machines together, allowing us to use all the cumulative
resources as if they were one. A group of machines alone is not powerful, though; you need a framework to coordinate
work across them. Spark is a tool for just that: managing and coordinating the execution of tasks on data across a
cluster of computers.

## What is Spark?

Spark is a tool for managing and coordinating the execution of tasks on data across a cluster of computers.
The cluster of machines that Spark uses to execute tasks is managed by a cluster manager such as Spark’s
Standalone cluster manager, YARN, or Mesos. We then submit Spark Applications to these cluster managers, which
grant resources to our application so that we can complete our work.
In addition to its cluster mode, Spark also has a local mode, in which the driver and executors run as threads on a single machine instead of across a cluster.

## Internals of Spark

Spark Core
At its base, Spark has one large component called Spark Core, which provides the basic engine (task scheduling, memory management, fault recovery, and the RDD API). Four higher-level components are built on top of Spark Core:
- Spark SQL
- Spark Streaming
- MLlib
- GraphX

Cluster Managers:
Spark can run on several pluggable cluster managers. Three common ones are described below.

Standalone:
A simple cluster manager included with Spark that makes it easy to set up a cluster and execute tasks on it. It is reliable, can recover from failures, and allocates resources based on the application's requirements.

Apache Mesos:
A general-purpose cluster manager from the Apache Software Foundation that can also run Hadoop MapReduce alongside Spark and other service applications. It provides APIs for most programming languages.

Hadoop YARN:
The resource manager introduced in Hadoop 2; the name stands for Yet Another Resource Negotiator. It is also a general-purpose cluster manager and can run both Hadoop and Spark workloads.

## High-level APIs of Spark

### Spark SQL and DataFrames

Spark SQL is Spark's module for processing structured data, using either SQL queries or the DataFrame API.

A DataFrame is a structured collection of rows that is distributed across the worker nodes (executors) of a Spark cluster. Fundamentally, DataFrames are like tables in a relational database, with their own schemas and column headers.

### SparkSession:

We control our Spark Application through a driver process. This driver
process manifests itself to the user as an object called the SparkSession.
The SparkSession is the entry point for running Spark code, and it is the way Spark executes user-defined manipulations across the cluster.
In Scala and Python, the session is available as the variable spark when you start up the console.
When using Spark from Python or R, the user never writes explicit JVM instructions; instead, they write Python or R code that Spark translates into code it can run on the executor JVMs.

### DataFrame:

In Spark, DataFrames are the distributed collections of data, organized into rows and columns. Each column in a DataFrame has a name and an associated type. DataFrames are similar to traditional database tables, which are structured and concise.

Let’s now perform the simple task of creating a range of numbers. This range of numbers is just like a named column
in a spreadsheet.

In [0]:
myRange = spark.range(1000).toDF('number')

### Partitions:

In order to allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions. A
partition is a collection of rows that sit on one physical machine in our cluster. A DataFrame’s partitions represent how
the data is physically distributed across your cluster of machines during execution. If you have one partition, Spark
will only have a parallelism of one, even if you have thousands of executors. If you have many partitions but only one
executor with a single core, Spark will still only have a parallelism of one, because there is only one computation resource.

### Transformation:

In Spark, the core data structures are immutable, meaning they cannot be changed once created. This might seem like
a strange concept at first: if you cannot change it, how are you supposed to use it? To “change” a DataFrame,
you instruct Spark how you would like to modify the DataFrame you have into the one that you want.
These instructions are called transformations.

Let’s perform a simple transformation to find all even numbers in our current DataFrame.

In [0]:
divisBy2 = myRange.where('number % 2 = 0')
divisBy2.show()

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows



### Actions:

Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action. An
action instructs Spark to compute a result from a series of transformations. The simplest action is count, which gives
us the total number of records in the DataFrame.

In [0]:
divisBy2.count()

Out[3]: 500

### Lazy evaluation
Lazy evaluation means that Spark waits until the very last moment to execute your transformations. In Spark,
instead of modifying the data immediately, we build up a plan of transformations that we would like to apply to our source
data. By waiting until the last minute to execute the code, Spark can compile this plan from your raw DataFrame
transformations into an efficient physical plan that runs as efficiently as possible across the cluster.

## DataFrames and SQL:

In Spark, DataFrames and SQL are the same thing under the hood. You can express your business logic in either language,
and Spark will compile that logic down to the same underlying plan (the one we see in the explain output) before
actually executing your code.

Any DataFrame can be made into a table or view with one simple method call.

In [0]:
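storage_account_name = "dbrickstraining"
# Avoid hard-coding the storage account key in a notebook; read it from a Databricks secret scope instead.
# The scope and key names below are placeholders.
storage_account_key = dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key>")

container = "mleazuretrainingcontainer"
mount_point = f"/mnt/{container}"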

In [0]:
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [0]:
dbutils.fs.ls(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/")

Out[6]: [FileInfo(path='wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.csv', name='2015-summary.csv', size=7337, modificationTime=1677839315000),
 FileInfo(path='wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json', name='2015-summary.json', size=21624, modificationTime=1678336030000)]

In [0]:
%fs ls wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/

path,name,size,modificationTime
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.csv,2015-summary.csv,7337,1677839315000
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json,2015-summary.json,21624,1678336030000


In [0]:
flightData2015 = spark\
.read\
.option('inferSchema', 'true')\
.option('header', 'true')\
.csv('wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.csv')


In [0]:
flightData2015.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
flightData2015.createOrReplaceTempView('flight_data_2015')


Now we can query our data in SQL. To execute a SQL query, we use the spark.sql function:

In [0]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy('DEST_COUNTRY_NAME')\
.count()
sqlWay.explain()
dataFrameWay.explain()



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#116], functions=[finalmerge_count(merge count#151L) AS count(1)#139L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#116, 200), ENSURE_REQUIREMENTS, [plan_id=144]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#116], functions=[partial_count(1) AS count#151L])
         +- FileScan csv [DEST_COUNTRY_NAME#116] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.ne..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#116], functions=[finalmerge_count(merge count#153L) AS count(1)#146L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#116, 200), ENSURE_REQUIREMENTS, [plan_id=165]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#116], functions=[partial_count(1)

## Datasets:

Datasets are Spark's type-safe structured API, and they are available only in Scala and Java (not in Python or R).
We can use Datasets only when we need or want to.
For instance, we can define our own class (for example, a Scala case class) and manipulate it via arbitrary map and filter functions. Once we’ve
performed our manipulations, Spark can automatically turn it back into a DataFrame, and we can manipulate it
further using the hundreds of functions that Spark includes.
This makes it easy to drop down to lower-level, type-safe coding when necessary, and to move back up to SQL for more rapid analysis.

## Basic Structured Operation

Spark is effectively a programming language of its own. Internally, Spark uses an engine called Catalyst that maintains
its own type information through the planning and processing of work. This may seem like overkill, but in doing so,
it opens up a wide variety of execution optimizations that make significant differences. Spark types map directly
to the different language APIs that Spark maintains, and there is a lookup table for each of these in Scala,
Java, Python, SQL, and R. Even if we use Spark’s Structured APIs from Python or R, the majority of our manipulations
will operate strictly on Spark types, not Python types. For example, the code below does not perform addition in Scala
or Python; it actually performs addition purely in Spark.


In [0]:
df = spark.range(500).toDF('number')
# The "+ 10" is evaluated by Spark as an expression, not by Python.
df.select(df['number'] + 10).show()

+-------------+
|(number + 10)|
+-------------+
|           10|
|           11|
|           12|
|           13|
|           14|
|           15|
|           16|
|           17|
|           18|
|           19|
|           20|
|           21|
|           22|
|           23|
|           24|
|           25|
|           26|
|           27|
|           28|
|           29|
+-------------+
only showing top 20 rows



### 1. Schemas:
A schema defines the column names and types of a DataFrame. Users can define schemas manually or users can
read a schema from a data source (often called schema on read).

In [0]:
%fs ls wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/


path,name,size,modificationTime
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.csv,2015-summary.csv,7337,1677839315000
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json,2015-summary.json,21624,1678336030000


In [0]:
spark.read.format('json')\
.load('wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json')\
.schema

Out[14]: StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

A schema is a StructType made up of a number of fields, StructFields, each of which has a name, a type, and a boolean
flag that specifies whether or not the column can contain missing or null values. Schemas can also contain other
StructTypes (Spark’s complex types). We will see this in the next chapter when we discuss working with complex
types.
Here’s how to create and enforce a specific schema on a DataFrame. If the types in the data do not match the schema
at runtime, Spark either throws an error or sets the offending values to null, depending on the reader's mode option (FAILFAST or the default PERMISSIVE).

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField('DEST_COUNTRY_NAME', StringType(), True),
StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
StructField('count', LongType(), False)
])
df = spark.read.format('json')\
.schema(myManualSchema)\
.load('wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json')

### 2. Columns and Expressions
To users, columns in Spark are similar to columns in a spreadsheet, an R data frame, or a pandas DataFrame. We can select,
manipulate, and remove columns from DataFrames, and these operations are represented as expressions.
To Spark, columns are logical constructions that simply represent a value computed on a per-record basis by means
of an expression. This means, in order to have a real value for a column, we need to have a row, and in order to
have a row we need to have a DataFrame. This means that we cannot manipulate an actual column outside of a
DataFrame, we can only manipulate a logical column’s expressions then perform that expression within the context of
a DataFrame.

### Columns:
There are a lot of different ways to construct and refer to columns, but the two simplest are the col and
column functions. To use either of these functions, we pass in a column name.

In [0]:
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

Out[16]: Column<'someColumnName'>

### Expression:
Columns provide a subset of expression functionality. If you use col() and wish to perform transformations on
that column, you must perform those transformations on that column reference. When using an expression, the expr function can
parse transformations and column references from a string, and the result can subsequently be passed into further
transformations. Let’s look at some examples.
expr("someCol - 5") is the same transformation as col("someCol") - 5 or even
expr("someCol") - 5. That’s because Spark compiles all of these to the same logical tree specifying the order of operations.

In [0]:
(((col('someCol') + 5) * 200) - 6) < col('otherCol') 

Out[17]: Column<'((((someCol + 5) * 200) - 6) < otherCol)'>

In [0]:
from pyspark.sql.functions import expr
expr('(((someCol + 5) * 200) - 6) < otherCol')

Out[18]: Column<'((((someCol + 5) * 200) - 6) < otherCol)'>

### 3. Records and Rows
In Spark, each row in a DataFrame is a single record, represented as an object of type Row. Row
objects are what column expressions operate on to produce some usable value. Internally, Row objects represent
physical byte arrays. The byte array interface is never shown to users because we only use column expressions to
manipulate them.

In [0]:
df.first()

Out[19]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

### 4. Create Rows
You can create rows by manually instantiating a Row object with the values that belong in each column. It’s important
to note that only DataFrames have schemas; Rows themselves do not. This means that if you create a Row manually, you must specify the values in the same order as the schema of the DataFrame they may be appended to.

A capitalized “Row” refers to the Row object. As in the previous example, we can see a row by calling first on our DataFrame.

In [0]:
from pyspark.sql import Row
myRow = Row('Hello', None, 1, False)
myRow[0]

Out[20]: 'Hello'

In [0]:
myRow[2]

Out[21]: 1

### 5. Dataframe Transformation
Now that we briefly defined the core parts of a DataFrame, we will move onto manipulating DataFrames. When
working with individual DataFrames there are some fundamental objectives. These break down into several core
operations.

- We can add rows or columns
- We can remove rows or columns
- We can transform a row into a column (or vice versa)
- We can change the order of rows based on the values in columns

Luckily we can translate all of these into simple transformations, the most common being those that take one column,
change it row by row, and then return our results.

### 6. Creating Dataframes


In [0]:
# We can create DataFrames from raw data sources.

df = spark.read.format('json')\
.load('wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json')
df.createOrReplaceTempView('dfTable')

In [0]:
# We can also create DataFrames on the fly by taking a set of rows and converting them to a DataFrame.

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType,\
 StringType, LongType
myManualSchema = StructType([
StructField('some', StringType(), True),
StructField('col', StringType(), True),
StructField('names', LongType(), False)
])
myRow = Row('Hello', None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



### 7. select and selectExpr
select and selectExpr allow us to do the DataFrame equivalent of SQL queries on a table of data.

In [0]:
df.select(
'DEST_COUNTRY_NAME',
'ORIGIN_COUNTRY_NAME' )\
.show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import expr, col, column
df.select(
expr('DEST_COUNTRY_NAME'),
col('DEST_COUNTRY_NAME'),
column('DEST_COUNTRY_NAME'))\
.show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



We can treat selectExpr as a simple way to build up complex expressions
that create new DataFrames. In fact, we can add any valid non-aggregating SQL statement, and as long as the columns
resolve, it will be valid! Here’s a simple example that adds a new column withinCountry to our DataFrame that
specifies whether or not the destination and origin are the same.

In [0]:
df.selectExpr(
'*', # all original columns
'(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry')\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [0]:
df.selectExpr('avg(count)', 'count(distinct(DEST_COUNTRY_NAME))').show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



### 8. Adding columns
There’s also a more formal way of adding a new column to a DataFrame: the withColumn method on our
DataFrame. For example, let’s add a column that flags whether the origin and destination countries are the same.

In [0]:
df.withColumn(
'withinCountry',
expr('ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME'))\
.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



### 9. Renaming columns
Although there are other ways to rename a column, it’s often much easier (and more readable) to use the withColumnRenamed method. It renames the column named in the first argument to the string given as the second argument.

In [0]:
df.withColumnRenamed('DEST_COUNTRY_NAME', 'dest').columns

Out[30]: ['dest', 'ORIGIN_COUNTRY_NAME', 'count']

### 10. Changing column's type
Sometimes we need to convert a column from one type to another, for example when a StringType column holds values that
should be integers. We do this by casting the column to the new type.
For instance, let’s convert our count column from a long to an integer type.

In [0]:
df.printSchema()
df.withColumn('count', col('count').cast('int')).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



### 11. Filtering rows
To filter rows, we create an expression that evaluates to true or false; rows for which the expression evaluates to false
are filtered out. The most common way to do this with DataFrames is to create either an expression as a String
or build an expression with a set of column manipulations. There are two methods that perform this operation, where
and filter; they do the same thing and accept the same argument types when
used with DataFrames.

In [0]:
colCondition = df.filter(col('count') < 2).take(2)
conditional = df.where('count < 2').take(2)
conditional

Out[34]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [0]:
df.where(col('count') < 2)\
.where(col('ORIGIN_COUNTRY_NAME') != 'Croatia')\
.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



### 12. Getting unique rows
A very common use case is to get the unique or distinct values in a DataFrame. These values can be in one or more
columns. We do this with the distinct method, which deduplicates the rows in a DataFrame. For instance, let’s count
the unique (origin, destination) pairs and then the unique origins in our dataset:

In [0]:
df.select('ORIGIN_COUNTRY_NAME', 'DEST_COUNTRY_NAME').distinct().count()

Out[36]: 256

In [0]:
df.select('ORIGIN_COUNTRY_NAME').distinct().count()


Out[37]: 125

### 13. Sorting
When we sort the values in a DataFrame, we usually want either the largest or the smallest values at the top of
the DataFrame. There are two equivalent operations for this, sort and orderBy, which work exactly the same way. They
accept both column expressions and strings, as well as multiple columns. The default is to sort in ascending order.

In [0]:
df.sort('count').show(5)
df.orderBy('count', 'DEST_COUNTRY_NAME').show(5)
df.orderBy(col('count'), col('DEST_COUNTRY_NAME')).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [0]:
from pyspark.sql.functions import desc, asc
df.orderBy(col('count').desc()).show(2)  # expr('count desc') would alias the column, not sort descending
df.orderBy(desc(col('count')), asc(col('DEST_COUNTRY_NAME'))).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



### 14. Limit
Often you may want only the first few rows of a DataFrame; for example, you might want to work with only the top
50 rows of some dataset. We do this with the limit method:

In [0]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [0]:
df.orderBy(col('count')).limit(6).show()  # ascending by count, so the six smallest counts are shown

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



### 15. Repartition and Coalesce
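Another important optimization opportunity is to partition the data according to some frequently filtered columns.
Partitioning controls the physical layout of data across the cluster, including the partitioning scheme and the number of
partitions. repartition performs a full shuffle of the data and can increase or decrease the number of partitions, while
coalesce only reduces the number of partitions and avoids a full shuffle by merging existing partitions.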

In [0]:
df.rdd.getNumPartitions()

Out[42]: 1

In [0]:
df.repartition(5)

Out[43]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

## Aggregation Functions
Aggregating is the act of collecting something together and is a cornerstone of big data analytics.
In an aggregation, you will specify a key or grouping and an aggregation function that specifies
how you should transform one or more columns. This function must produce one result for each
group, given multiple input values. Spark’s aggregation capabilities are sophisticated and mature,
with a variety of different use cases and possibilities.

In [0]:
%fs ls wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/

path,name,size,modificationTime
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.csv,2015-summary.csv,7337,1677839315000
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json,2015-summary.json,21624,1678336030000
wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/all/,all/,0,0


In [0]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/all/*.csv")\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

In [0]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

### 1. count
With count, we can do one of two things: count a specific column, as in the example below, or count all rows by using
count(*) or count(1), which counts every row as the literal one. Counting a specific column skips null values,
while count(*) counts every row:

In [0]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



### 2. countDistinct
Sometimes, the total number is not relevant; rather, it’s the number of unique groups that you
want. To get this number, you can use the countDistinct function. This is a bit more relevant
for individual columns:

In [0]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show() 

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



### 3. first and last
You can get the first and last values from a DataFrame by using these two obviously named
functions. They are based on the order of the rows in the DataFrame, not on the values, so without an explicit ordering the result can differ between runs:

In [0]:
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|           23182|          23510|
+----------------+---------------+



### 4. min and max
To extract the minimum and maximum values from a DataFrame, use the min and max functions:

In [0]:
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



### 5. sum
Another simple task is to add all the values in a column using the sum function:

In [0]:
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



### 7. sumDistinct
In addition to summing a total, you can also sum a distinct set of values by using the
sumDistinct function (available as sum_distinct in Spark 3.2 and later, where sumDistinct is deprecated):

In [0]:
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



### 8. avg
Although you can calculate average by dividing sum by count, Spark provides an easier way to
get that value via the avg or mean functions. In this example, we use alias in order to more
easily reuse these columns later:

In [0]:
from pyspark.sql.functions import sum, count, avg, expr
df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



### 9. grouping
A more common task is to
perform calculations based on groups in the data. This is typically done on categorical data: we
group our data on one or more columns and perform some calculations on the other columns
for the rows that end up in each group. Here we count the rows for each InvoiceNo and CustomerId pair:

In [0]:
df.groupBy("InvoiceNo", "CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   536596|      null|    6|
|   537252|      null|    1|
|   538041|      null|    1|
|   537159|     14527|   28|
|   537213|     12748|    6|
|   538191|     15061|   16|
|  C539301|     13496|    1|
+---------+----------+-----+
only showing top 20 rows



### 10. window functions
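Spark supports three kinds of window functions: ranking functions, analytic functions,
and aggregate functions. A window function calculates a return value for every input row
based on a group of related rows, called a frame.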

## Joins
A join brings together two sets of data, the left and the right, by comparing the value of one or
more keys of the left and right and evaluating the result of a join expression that determines
whether Spark should bring together the left set of data with the right set of data.

### 1. Examples of each type of join

In [0]:
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")


In [0]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

#### Inner Joins
Inner joins evaluate the keys in both of the DataFrames or tables and include (and join together)
only the rows that evaluate to true. In the following example, we join the graduateProgram
DataFrame with the person DataFrame to create a new DataFrame:

In [0]:
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression).show()  # "inner" is the default join type

#### Outer Joins
Outer joins evaluate the keys in both of the DataFrames or tables and include (and join
together) the rows that evaluate to true or false. If there is no equivalent row in either the left or
right DataFrame, Spark will insert null:

In [0]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Left Outer Joins
Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from
the left DataFrame as well as any rows in the right DataFrame that have a match in the left
DataFrame. If there is no equivalent row in the right DataFrame, Spark will insert null:

In [0]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()


+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



#### Right Outer Joins
Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows
from the right DataFrame as well as any rows in the left DataFrame that have a match in the right
DataFrame. If there is no equivalent row in the left DataFrame, Spark will insert null:

In [0]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Left Semi Joins
Semi joins are a bit of a departure from the other joins. They do not actually include any values
from the right DataFrame. They only compare values to see if the value exists in the second
DataFrame. If the value does exist, those rows will be kept in the result, even if there are
duplicate keys in the left DataFrame:

In [0]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



#### Left Anti Joins

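Left anti joins are the opposite of left semi joins. Like left semi joins, they do not actually
include any values from the right DataFrame; they only compare values to see if the value exists
in the second DataFrame. However, rather than keeping the rows that have a match, they keep
only the rows from the left DataFrame that have no corresponding key in the right DataFrame: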

In [0]:
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



#### Natural Joins
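Natural joins make implicit guesses at the columns on which you would like to join: they find
columns with matching names and join on them. Left, right, and outer natural joins are all supported.
Because the match is made purely on column names, a natural join can silently join on columns
that share a name but mean different things.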

#### Cross (Cartesian) Joins
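The last of our joins are cross joins, or Cartesian products. In simplest terms, cross joins are inner
joins that do not specify a predicate: they join every single row in the left DataFrame
to every single row in the right DataFrame. Note that the following example passes a join
expression together with the "cross" join type, so it returns the same three matching rows as an
inner join; a true Cartesian product has no predicate, such as the one produced by crossJoin.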

In [0]:
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|               1|[500, 250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+



### 2. Handling Duplicate column names

One of the tricky things that comes up in joins is dealing with duplicate column names in your
results DataFrame. In a DataFrame, each column has a unique ID within Spark’s SQL engine,
Catalyst. This unique ID is purely internal and not something that you can directly reference,
which makes it quite difficult to refer to a specific column when you have a DataFrame with
duplicate column names.
This can occur in two distinct situations:
- The join expression that you specify does not remove one key from one of the input
DataFrames, and the keys have the same column name.
- Two columns on which you are not performing the join have the same name.

##### Approach 1: Different join expression
When you have two keys that have the same name, probably the easiest fix is to change the join
expression from a Boolean expression to a string or sequence. This automatically removes one of
the columns for you during the join:
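gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")  # key now shares person's column name
person.join(gradProgramDupe, "graduate_program").select("graduate_program").show()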

##### Approach 2: Dropping the column after the join
Another approach is to drop the offending column after the join. When doing this, we need to
refer to the column via the original source DataFrame. We can do this if the join uses the same
key names or if the source DataFrames have columns that simply have the same name:
joinExpr = gradProgramDupe["graduate_program"] == person["graduate_program"]
person.join(gradProgramDupe, joinExpr).drop(person["graduate_program"])\
  .select("graduate_program").show()
joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinExpr).drop(graduateProgram["id"]).show()

##### Approach 3: Renaming a column before the join
We can avoid this issue altogether if we rename one of our columns before the join:
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
joinExpr = person["graduate_program"] == gradProgram3["grad_id"]
person.join(gradProgram3, joinExpr).show()

### 3. How spark performs joins
To understand how Spark performs joins, you need to understand the two core resources at play:
the node-to-node communication strategy and per node computation strategy. These internals are
likely irrelevant to your business problem. However, comprehending how Spark performs joins
can mean the difference between a job that completes quickly and one that never completes at
all.

## Datasources
Spark has six
“core” data sources and hundreds of external data sources written by the community. The ability
to read and write from all different kinds of data sources and for the community to create its own
contributions is arguably one of Spark’s greatest strengths. Following are Spark’s core data
sources:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files

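### 1. Basics of reading data

The foundation for reading data in Spark is the DataFrameReader. We access this through the
SparkSession via the read attribute:
spark.read
After we have a DataFrame reader, we specify several values:
- The format
- The schema
- The read mode
- A series of options

### 2. Basics of writing data

The foundation for writing data is the DataFrameWriter. We access this through the DataFrame
itself (not the SparkSession) via the write attribute:
dataframe.write
After we have a DataFrame writer, we specify the format, a series of options, and the save mode
(append, overwrite, errorIfExists, or ignore).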

### 3. CSV files - reading, writing
To read a CSV file, like any other format, we must first create a DataFrameReader for that
specific format. Here, we specify the format to be CSV:
spark.read.format("csv")

Just as with reading data, there are a variety of options (listed in Table 9-3) for writing data when
we write CSV files. This is a subset of the reading options because many do not apply when
writing data (like maxColumns and inferSchema).

### 4. Reading and writing JSON files

There are some catches when working with this kind of data
that are worth considering before we jump in. In Spark, when we refer to JSON files, we refer to
line-delimited JSON files. This contrasts with files that have a large JSON object or array per
file.
The line-delimited versus multiline trade-off is controlled by a single option: multiLine.

Writing JSON files is just as simple as reading them, and, as you might expect, the data source
does not matter. Therefore, we can reuse the CSV DataFrame that we created earlier to be the
source for our JSON file.

### 5. Parquet files - important

Parquet is an open source column-oriented data store that provides a variety of storage
optimizations, especially for analytics workloads. It provides columnar compression, which
saves storage space and allows for reading individual columns instead of entire files. It is a file
format that works exceptionally well with Apache Spark and is in fact the default file format.

### 6. Reading and Writing parquet files

Parquet has very few options because it enforces its own schema when storing data. Thus, all you
need to set is the format and you are good to go. We can set the schema if we have strict
requirements for what our DataFrame should look like:
spark.read.format("parquet")

Writing Parquet is as easy as reading it. We simply specify the location for the file.

### 7. orc - optional

ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is
optimized for large streaming reads, but with integrated support for finding required rows
quickly. ORC actually has no options for reading in data because Spark understands the file
format quite well.

### 8. Splittable File Types and Compression

Certain file formats are fundamentally “splittable.” This can improve speed because it makes it
possible for Spark to avoid reading an entire file, and access only the parts of the file necessary
to satisfy your query. Additionally if you’re using something like Hadoop Distributed File
System (HDFS), splitting a file can provide further optimization if that file spans multiple
blocks. In conjunction with this is a need to manage compression. Not all compression schemes
are splittable. How you store your data is of immense consequence when it comes to making
your Spark jobs run smoothly. We recommend Parquet with gzip compression.

### 9. Managing File size

Managing file sizes is an important factor not so much for writing data but reading it later on.
When you’re writing lots of small files, there’s a significant metadata overhead that you incur
managing all of those files. Spark especially does not do well with small files, and many file
systems (like HDFS) don’t handle lots of small files well, either. You might hear this referred to
as the “small file problem.” The opposite is also true: you don’t want files that are too large
either, because it becomes inefficient to have to read entire blocks of data when you need only a
few rows.

## Spark SQL 

Spark SQL is arguably one of the most important and powerful features in Spark.

### What Is SQL?
SQL or Structured Query Language is a domain-specific language for expressing relational
operations over data.

### Big Data and SQL: Apache Hive
Before Spark’s rise, Hive was the de facto big data SQL access layer. Originally developed at
Facebook, Hive became an incredibly popular tool across industry for performing SQL
operations on big data.

### Big Data and SQL: Spark SQL
With the release of Spark 2.0, its authors created a superset of Hive’s support, writing a native
SQL parser that supports both ANSI-SQL and HiveQL queries.

### Spark’s Relationship to Hive
Spark SQL has a great relationship with Hive because it can connect to Hive metastores. The
Hive metastore is the way in which Hive maintains table information for use across sessions.

### Catalog
The highest level abstraction in Spark SQL is the Catalog. The Catalog is an abstraction for the
storage of metadata about the data stored in your tables as well as other helpful things like
databases, tables, functions, and views. The catalog is the org.apache.spark.sql.catalog.Catalog
class (exposed in PySpark as spark.catalog) and contains a number of helpful functions
for doing things like listing tables, databases, and functions.

### Tables
To do anything useful with Spark SQL, you first need to define tables. Tables are logically
equivalent to a DataFrame in that they are a structure of data against which you run commands.
We can join tables, filter them, aggregate them, and perform different manipulations.

### Spark-Managed Tables
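One important note is the concept of managed versus unmanaged tables. Tables store two
important pieces of information: the data within the tables and the data about the tables,
that is, the metadata. Spark can manage the metadata for a set of files as well as the data itself.
When you define a table from files on disk, you are defining an unmanaged table. When you use
saveAsTable on a DataFrame, you are creating a managed table for which Spark tracks all of the
relevant information.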

### Creating Tables
You can create tables from a variety of sources. Something fairly unique to Spark is the
capability of reusing the entire Data Source API within SQL. This means that you do not need to
define a table and then load data into it; Spark lets you create one on the fly. You can even
specify all sorts of sophisticated options when you read in a file. For example,

CREATE TABLE flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path 'wasbs://mleazuretrainingcontainer@dbrickstraining.blob.core.windows.net/2015-summary.json')

### Creating External Tables

Hive was one of the first big data SQL
systems, and Spark SQL is completely compatible with Hive SQL (HiveQL) statements. One of
the use cases that you might encounter is to port your legacy Hive statements to Spark SQL.


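To create an unmanaged (external) table over files that already exist, use CREATE EXTERNAL TABLE and point LOCATION at those files. This Hive-format syntax (ROW FORMAT ...) requires a SparkSession with Hive support enabled: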

CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'

### Inserting into Tables
Insertions follow the standard SQL syntax:

INSERT INTO flights_from_select
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20

### Describing Table Metadata
If you add a comment to a table when you create it (with the COMMENT clause), you can view it by
describing the table metadata, which will show us the relevant comment:
    
DESCRIBE TABLE flights_csv

### Refreshing Table Metadata
Maintaining table metadata is an important task to ensure that you’re reading from the most
recent set of data. There are two commands to refresh table metadata. REFRESH TABLE refreshes
all cached entries (essentially, files) associated with the table. If the table were previously
cached, it would be cached lazily the next time it is scanned:

REFRESH TABLE partitioned_flights

The other command, REPAIR TABLE, refreshes the partitions maintained in the catalog for the
given table:

MSCK REPAIR TABLE partitioned_flights

### Dropping Tables
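You cannot delete tables: you can only “drop” them. You can drop a table by using the DROP
keyword. If you drop a managed table (e.g., flights_csv), both the data and the table definition
will be removed, whereas dropping an unmanaged table removes only the definition and leaves
the underlying files in place. Dropping flights_csv looks like this: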
    
DROP TABLE flights_csv;

### Caching Tables
Just like DataFrames, you can cache and uncache tables. You simply specify which table you
would like using the following syntax:

CACHE TABLE flights

To remove the table from the cache again:

UNCACHE TABLE flights

### Views
Now that you created a table, another thing that you can define is a view. A view specifies a set
of transformations on top of an existing table—basically just saved query plans, which can be
convenient for organizing or reusing your query logic.

### Creating Views
To an end user, views are displayed as tables, except rather than rewriting all of the data to a new
location, they simply perform a transformation on the source data at query time. This might be a
filter, select, or potentially an even larger GROUP BY or ROLLUP. For instance, in the
following example, we create a view in which the destination is United States in order to see
only those flights:
    
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'

### Dropping Views
You can drop views in the same way that you drop tables; you simply specify that what you
intend to drop is a view instead of a table. The main difference between dropping a view and
dropping a table is that with a view, no underlying data is removed, only the view definition
itself:

DROP VIEW IF EXISTS just_usa_view;

### Databases
Databases are a tool for organizing tables. As mentioned earlier, if you do not define one, Spark
will use the default database.

### Creating Databases
Creating databases follows the same patterns you’ve seen previously in this chapter; however,
here you use the CREATE DATABASE keywords:
    
CREATE DATABASE some_db

### Setting the Database
You might want to set a database to perform a certain query. To do this, use the USE keyword
followed by the database name:

USE some_db

### Dropping Databases
Dropping or removing databases is just as easy: you simply use the DROP DATABASE
keyword:
    
DROP DATABASE IF EXISTS some_db;

### Complex Types
Complex types are an incredibly powerful feature that does not exist in standard SQL.
Understanding how to manipulate them appropriately in SQL is essential. There are three core
complex types in Spark SQL: structs, lists, and maps.

#### Structs
Structs are more akin to maps. They provide a way of creating or querying nested data in Spark.
To create one, you simply need to wrap a set of columns (or expressions) in parentheses:

CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights

#### Lists
If you’re familiar with lists in programming languages, Spark SQL lists will feel familiar. There
are several ways to create an array or list of values. You can use the collect_list function,
which creates a list of values. You can also use the function collect_set, which creates an
array without duplicate values. These are both aggregation functions and therefore can be
specified only in aggregations:

SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME

### Functions
In addition to complex types, Spark SQL provides a variety of sophisticated functions. You can
find most of these functions in the DataFrames function reference; however, it is worth
understanding how to find these functions in SQL, as well. To see a list of functions in Spark
SQL, you use the SHOW FUNCTIONS statement:
    
SHOW FUNCTIONS

### Subqueries
With subqueries, you can specify queries within other queries. This makes it possible for you to
specify some sophisticated logic within your SQL. In Spark, there are two fundamental
subqueries. Correlated subqueries use some information from the outer scope of the query in
order to supplement information in the subquery. Uncorrelated subqueries include no
information from the outer scope. Each of these queries can return one (scalar subquery) or more
values. Spark also includes support for predicate subqueries, which allow for filtering based on
values.