### grp

# Spark: The Definitive Guide

## PART 2: Structured APIs - DataFrames, SQL, and Datasets

## dataPaths

In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(master='spark://164.52.193.152:7077', appName='test')


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [5]:
flightDataJson2015 = 'data/flight-data/json/2015-summary.json'
flightDataJson = 'data/flight-data/json/*-summary.json'
retailData20101201 = 'data/retail-data/by-day/2010-12-01.csv'
retailDataAll = 'data/retail-data/all/*.csv'
flightDataCSV2010 = 'data/flight-data/csv/2010-summary.csv'
flightDataJson2010 = 'data/flight-data/json/2010-summary.json'
flightDataParquet2010 = 'data/flight-data/parquet/2010-summary.parquet'
flightDataORC2010 = 'data/flight-data/orc/2010-summary.orc'
sqliteJDBC = 'data/flight-data/jdbc/my-sqlite.db'

In [6]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

## _Chapter #4 - Structured API Overview_

-  Columns
-  Rows
-  Spark Types

### _How code is executed across the cluster_:
1. Write DataFrame/Dataset/SQL code
2. If the code is valid, Spark converts it to a **Logical Plan**
3. Spark transforms the **Logical Plan** into a **Physical Plan**, checking for optimizations along the way
4. Spark executes the **Physical Plan** (RDD manipulations) across the cluster

_Catalyst Optimizer analyzes and decides how the code should be executed via a plan_

### _Plan Execution_:
1.  The user's code is converted into an Unresolved Logical Plan
2.  The Unresolved Logical Plan is checked against the Catalog to confirm that the object (DF/DS/SQL) information is valid
3.  If valid, the Catalyst Optimizer applies a collection of rules that attempt to optimize the Logical Plan
4.  After the Logical Plan is created, Spark plans out the Physical Plan
5.  The Physical Plan specifies how the logic will be executed on the cluster

## _Chapter #5 - Basic Structured Operations_

### Definitions:
-  **Schema** (defines the column names and types of a DF):
    - schema on read is fine for ad hoc use cases; however, inferring the schema can be expensive and/or incorrect
    - defining the schema manually is recommended for production ETL use cases   
    <br>
-  **Structure**:
    - StructType [schema made up of a number of fields]
    - StructField [contains a name, a type, and a boolean flag specifying whether the column can contain missing or null values]
    - Metadata [way of storing information about this column]   
    <br>
-  **Expressions**:
    - operations that select, manipulate, and remove columns from DFs
    - set of transformations on one or more values in a record in a DF
    - DF code and SQL expressions achieve the same performance because both compile to the same logical plan   
    <br>
-  **Columns**:
    - label that represents a value computed on a per record basis by means of an expression
    - columns are resolved by comparing them with the column names maintained in the **catalog** during the **analyzer phase**   
    <br>
-  **Rows**:
    -  each row in a DF is a single record of type Row
    -  Row objects are represented internally as arrays of bytes   
    <br>
-  **Repartition**:
    -  repartition forces a full shuffle of the data
    -  typically use repartition only when the target # of partitions is > the current # of partitions
    -  also useful when partitioning by a set of columns (ex: frequent filters on the same column)   
    <br>
-  **Coalesce**:
    -  does not cause a full shuffle; instead it tries to combine existing partitions

### DataFrame Transformations:
-  add rows or columns
-  remove rows or columns
-  transform a row into a column
-  transform a column into a row
-  change order of rows based on the values in columns

### _Chapter #5 Exercises (DataFrames)_

### _Schema Example_

In [None]:
df = spark.read.format("json").load(flightDataJson2015).schema

In [None]:
df

### _Create Schema Example_

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [None]:
myManualSchema = StructType([\
                            StructField("DEST_COUNTRY_NAME", StringType(), True),\
                            StructField("ORIGIN_COUNTRY_NAME", StringType(), True),\
                            StructField("count", LongType(), False, metadata={"hello":"world"})\
                            ])
df = spark.read.format("json").schema(myManualSchema)\
.load(flightDataJson2015)

In [None]:
df.printSchema()

### _Columns Example_

In [8]:
from pyspark.sql.functions import col, column

In [9]:
print(col("someColumnName"))
print(column("someColumnName"))

Column<b'someColumnName'>
Column<b'someColumnName'>


In [10]:
spark.read.format("json").load(flightDataJson2015).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

### _Expressions Example_

In [11]:
from pyspark.sql.functions import expr

In [12]:
# df code
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

In [13]:
# sql code
expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

### _Row Example_

In [14]:
from pyspark.sql import Row

In [15]:
myRow = Row("Hello", None, 1, False)

In [16]:
# accessing row elements
print(myRow[0])
print(myRow[2])

Hello
1


### _Creating DF Example_

In [17]:
df = spark.read.format("json").load(flightDataJson2015)
df.createOrReplaceTempView("dfTable")

### _Convert Rows to DF Example_

In [18]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [19]:
myManualSchema = StructType([\
                            StructField("some", StringType(), True),\
                            StructField("col", StringType(), True),\
                            StructField("names", LongType(), False)\
                            ])
myRow = Row("Hello", None, 1)
myDF = spark.createDataFrame([myRow], myManualSchema)
myDF.first()

Row(some='Hello', col=None, names=1)

### _select & expr Example_

In [20]:
df.select("DEST_COUNTRY_NAME").show(3)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
+-----------------+
only showing top 3 rows



In [21]:
from pyspark.sql.functions import expr, col, column

In [22]:
df.select(\
         expr("DEST_COUNTRY_NAME"),\
         col("DEST_COUNTRY_NAME"),\
         column("DEST_COUNTRY_NAME"))\
.show(3)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 3 rows



In [23]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(3)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
|United States|
+-------------+
only showing top 3 rows



In [24]:
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME")).show(3)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
+-----------------+
only showing top 3 rows



### _selectExpr Example_

In [25]:
# shorthand to select(expr(...))
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(3)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 3 rows



In [26]:
df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(3)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows



In [27]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(3)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



### _literal [lit] & withColumn Example_

In [28]:
from pyspark.sql.functions import lit

In [29]:
df.select(expr("*"), lit(1).alias("One")).show(3)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
+-----------------+-------------------+-----+---+
only showing top 3 rows



In [30]:
df.withColumn("numberOne", lit(1)).show(3)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
|    United States|            Ireland|  344|        1|
+-----------------+-------------------+-----+---------+
only showing top 3 rows



In [31]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(3)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows



### _withColumnRenamed Example_

In [32]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

### _Columns w/ Escape Characters Example_

In [33]:
dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME"))

In [34]:
# backticks (`) are required when referencing a column whose name contains whitespace or reserved characters (e.g. dashes) in an expression [expr]
dfWithLongColName.selectExpr("`This Long Column-Name`",\
                            "`This Long Column-Name` as `new col`")\
.show(3)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
|              Ireland|Ireland|
+---------------------+-------+
only showing top 3 rows



### _Removing Columns [drop] Example_

In [35]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [36]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

DataFrame[count: bigint, This Long Column-Name: string]

### _Changing Column Types [cast] Example_

In [37]:
df.withColumn("count2", col("count").cast("long"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: bigint]

### _Filter Rows [filter/where] Example_

In [38]:
df.filter(col("count") < 2).show(3)
df.where("count < 2").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows



In [39]:
# chain where() calls for multiple "AND" filters; Spark performs all of the filter operations at the same time, regardless of their order
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows



### _Unique Rows [distinct] Example_

In [40]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

256

### _Random Samples [sample] Example_

In [41]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

### _Random Split [randomSplit] Example_

In [42]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
print(dataFrames[0].count() > dataFrames[1].count())
print(dataFrames[0].count())
print(dataFrames[1].count())

False
60
196


### _Union Example_

In [43]:
from pyspark.sql import Row

In [44]:
schema = df.schema
newRows = [\
          Row("New Country", "Other Country", 0),\
          Row("New Country 2", "Other Country 3", 999999999)\
          ]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [45]:
print(df.count())
print(newDF.count())

256
2


In [46]:
df.union(newDF).count()

258

In [47]:
df = df.union(newDF)

In [48]:
df.count()

258

### _Sorting Rows [sort & orderBy] Example_

In [49]:
from pyspark.sql.functions import desc, asc

In [50]:
# default sort is ascending order
df.sort("count").show(3)
df.orderBy("count", "DEST_COUNTRY_NAME").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    0|
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 3 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    0|
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows



In [51]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(5)

+-----------------+-------------------+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|    count|
+-----------------+-------------------+---------+
|    New Country 2|    Other Country 3|999999999|
|    United States|      United States|   370002|
|    United States|             Canada|     8483|
|           Canada|      United States|     8399|
|    United States|             Mexico|     7187|
+-----------------+-------------------+---------+
only showing top 5 rows



### _Partition Sorts [sortWithinPartitions] Example_

In [52]:
spark.read.format("json").load(flightDataJson).sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### _Limit [limit] Example_

In [53]:
df.limit(3).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+



In [54]:
df.orderBy(col("count").desc()).limit(3).show()

+-----------------+-------------------+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|    count|
+-----------------+-------------------+---------+
|    New Country 2|    Other Country 3|999999999|
|    United States|      United States|   370002|
|    United States|             Canada|     8483|
+-----------------+-------------------+---------+



### _Repartitioning [repartition & coalesce] Example_

In [55]:
df.rdd.getNumPartitions()

9

In [56]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [57]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [58]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [59]:
# ex: shuffle data into 5 partitions based on column then coalesce partitions into 2
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### _Collect Rows to Driver Example_

In [60]:
collectDF = df.limit(10)
for i in collectDF.take(5): print(i)
print("\n")
collectDF.show()
for i in collectDF.collect(): print(i)

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+---

In [61]:
collectDF.toLocalIterator()

<itertools.chain at 0x1111cf630>

## _Chapter #6 - Working with Different Types of Data_

### Package Documentation:
-  http://spark.apache.org/docs/latest/api/python/#package
-  http://spark.apache.org/docs/latest/api/scala/#package   

### **Data**:   
-  **Booleans**:
   -  filters
   -  conditionals
   -  elements (and, or, true, false)
   -  always chain "and" filters together as sequential filters   
   <br>
-  **Numbers**:
   -  the StatFunctions package (available through `df.stat`) provides statistical functions   
<br>
-  **Strings**:
    -  regular expressions   
    <br>
-  **Dates & Timestamps**:
    -  Spark 2.x follows the Java SimpleDateFormat standard - https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html (Spark 3.0+ defines its own datetime patterns)
    -  the recommended date format is 'yyyy-MM-dd'
    -  to_date function [converts a string to a date]
    -  when to_date cannot parse a string, e.g. because the date/timestamp format differs from the expected one, Spark returns **null** instead of throwing an error      
    <br>
-  **Handling Nulls**:
    -  Spark recommends using null for missing/empty data instead of empty strings or other placeholder values, because Spark can optimize around nulls
    -  options (1. drop nulls; 2. fill nulls with a new value)
    -  declaring a column as non-nullable in the schema (nullable = false) does not forbid nulls in that column; the flag only helps Spark SQL optimize its handling of the column, and nulls in a column declared nullable = false can lead to strange results and performance problems
    -  nulls can be ordered with the following functions (**asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last**)   
<br>
-  **Complex Types**:
    -  _structs_ ["DFs within DFs" w/ dot syntax]
    -  _arrays_ [list of elements]
    -  _maps_ [key-value pairs]
    - _JSON_ [semi-structured nested dictionary data]   
<br>
-  **UDFs**:
    -  let you write your own custom transformations
    -  Spark serializes the UDF on the driver and transfers it over the network to all executor processes; for Python UDFs this carries a performance penalty because:
        1. Spark starts a Python process on the worker
        2. Spark serializes all of the data to a format Python can understand
        3. the Python process executes the UDF row by row on the data
        4. the results of the row operations are returned to the JVM/Spark
    -  serializing the data to Python is expensive, and once the data is in the Python process Spark cannot manage the memory of the worker
    -  ***Scala/Java UDFs do not need a separate process like Python, so their logic is computed in the JVM***
    -  it is a best practice to define the return type of a UDF since Spark and Python data types can differ (_see UDF example_)

### _Chapter #6 Exercises (DataFrames)_

In [62]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(retailData20101201)
df.printSchema()
df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [63]:
df.limit(3).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+



### _Convert to Spark Types Example_

In [64]:
from pyspark.sql.functions import lit

In [65]:
df.select(lit(5), lit("five"), lit(5.0))

DataFrame[5: int, five: string, 5.0: double]

### _Boolean Example_

In [66]:
from pyspark.sql.functions import col

In [67]:
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [68]:
df.where("InvoiceNo = 536365").show(5, False)
df.where("InvoiceNo != 536365").show(5, False)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
+---------+-----

### _Boolean Substring [instr] Expression [isin] Filter Example_

In [69]:
from pyspark.sql.functions import instr

In [70]:
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show() # isin() tests membership in the given values / pipe (|) means "OR"

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [71]:
DOTCodeFilter = col("StockCode") == 'DOT'
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("Description", "unitPrice", "isExpensive").show(3)

+--------------+---------+-----------+
|   Description|unitPrice|isExpensive|
+--------------+---------+-----------+
|DOTCOM POSTAGE|   569.77|       true|
|DOTCOM POSTAGE|   607.49|       true|
+--------------+---------+-----------+



In [72]:
from pyspark.sql.functions import expr

In [73]:
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
.where("isExpensive")\
.select("Description", "UnitPrice", "isExpensive").show(3)

+--------------+---------+-----------+
|   Description|UnitPrice|isExpensive|
+--------------+---------+-----------+
|DOTCOM POSTAGE|   569.77|       true|
|DOTCOM POSTAGE|   607.49|       true|
+--------------+---------+-----------+



### _Boolean Expression [eqNullSafe] for NULLs Example_

In [74]:
df.where(col("Description").eqNullSafe(None)).show(3)
df.where(col("Description").eqNullSafe("null")).show(3)
df.where(col("Description").eqNullSafe("NULL")).show(3)

+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
|   536414|    22139|       null|      56|2010-12-01 11:52:00|      0.0|      null|United Kingdom|
|   536545|    21134|       null|       1|2010-12-01 14:32:00|      0.0|      null|United Kingdom|
|   536546|    22145|       null|       1|2010-12-01 14:33:00|      0.0|      null|United Kingdom|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
+---------+---------+---------

In [75]:
df.select(df["Description"].eqNullSafe(None)).distinct().show()

+----------------------+
|(Description <=> NULL)|
+----------------------+
|                 false|
|                  true|
+----------------------+



### _Power Expression [pow] Example_

In [76]:
from pyspark.sql.functions import expr, pow

In [77]:
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(3)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
|   17850.0|             489.0|
+----------+------------------+
only showing top 3 rows



In [78]:
# via select expression
df.selectExpr("CustomerId", "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(3)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
|   17850.0|             489.0|
+----------+------------------+
only showing top 3 rows



### _Round Half Up [round] & Round Half Even [bround] Example_

In [79]:
from pyspark.sql.functions import lit, round, bround

In [80]:
df.select(round(lit("2.5")), bround(lit("2.5"))).distinct().show()

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
+-------------+--------------+



### _Column Correlation [corr] Example_

In [81]:
from pyspark.sql.functions import corr

In [82]:
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



### _Summary Stats [describe] Example_

In [83]:
for i in df.describe().collect(): print(i)

Row(summary='count', InvoiceNo='3108', StockCode='3108', Description='3098', Quantity='3108', UnitPrice='3108', CustomerID='1968', Country='3108')
Row(summary='mean', InvoiceNo='536516.684944841', StockCode='27834.304044117645', Description=None, Quantity='8.627413127413128', UnitPrice='4.151946589446603', CustomerID='15661.388719512195', Country=None)
Row(summary='stddev', InvoiceNo='72.89447869788873', StockCode='17407.897548583845', Description=None, Quantity='26.371821677029203', UnitPrice='15.638659854603892', CustomerID='1854.4496996893627', Country=None)
Row(summary='min', InvoiceNo='536365', StockCode='10002', Description=' 4 PURPLE FLOCK DINNER CANDLES', Quantity='-24', UnitPrice='0.0', CustomerID='12431.0', Country='Australia')
Row(summary='max', InvoiceNo='C536548', StockCode='POST', Description='ZINC WILLIE WINKIE  CANDLE STICK', Quantity='600', UnitPrice='607.49', CustomerID='18229.0', Country='United Kingdom')


In [84]:
# manual import
from pyspark.sql.functions import count, mean, stddev_pop, min, max

### _Median Example_

In [85]:
colName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError)

[2.51]

### _View Frequent Column Value Occurrences [stat.freqItems] Example_

In [86]:
# be careful; output may be large
for i in df.stat.freqItems(["StockCode", "Quantity"]).collect(): print(i)

Row(StockCode_freqItems=['90214E', '20728', '20755', '21703', '22113', '22524', '22041', '72803A', '72798C', '90181B', '21756', '22694', '90206C', '20970', '21624', '90209C', '84744', '82494L', '22952', '20682', '22583', '21705', '20679', '22220', '90177E', '90214A', '22448', '90214S', '22121', '22802', '84970L', '72818', '90192', '90200C', '22910', '21380', '90211A', '21137', '35271S', '84926A', '20765', '22384', '21524', '22165', '22366', '21221', '21704', '22519', '85035C', '21967', '22114', '22909', '22900', '22447', '21577', '21877', '20726', '85034A', 'DOT', '84658', '21472', '22804', '22222', '72802C', '21739', '22467', '90214H', '22785', '22446', '22197', '20665', '21733', '22731', '21709', '22086', '40001', '85123A'], Quantity_freqItems=[200, 128, 23, 32, 50, 600, 8, 17, 80, -1, -10, 11, 56, 47, 20, -7, 2, 5, 480, -4, 14, 432, 100, 64, 40, 13, 4, -5, 22, 16, -2, 7, 70, 384, 25, 34, 10, 1, 288, 216, 28, 252, 19, 120, 192, 60, 96, 72, 144, 36, 27, 9, 18, 48, 21, 12, 3, -6, -24, 

### _Unique ID [monotonically...] Example_

In [87]:
from pyspark.sql.functions import monotonically_increasing_id

In [88]:
df.select(monotonically_increasing_id()).show(5)

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
|                            2|
|                            3|
|                            4|
+-----------------------------+
only showing top 5 rows



### _Capitalize per Whitespace [initcap] Example_

In [89]:
from pyspark.sql.functions import initcap

In [90]:
df.select("Description", initcap(col("Description"))).show(3, False)

+----------------------------------+----------------------------------+
|Description                       |initcap(Description)              |
+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|White Hanging Heart T-light Holder|
|WHITE METAL LANTERN               |White Metal Lantern               |
|CREAM CUPID HEARTS COAT HANGER    |Cream Cupid Hearts Coat Hanger    |
+----------------------------------+----------------------------------+
only showing top 3 rows



### _Uppercase & Lowercase [upper / lower] Example_

In [91]:
from pyspark.sql.functions import lower, upper

In [92]:
df.select(col("Description"),\
         lower(col("Description")),\
         upper(lower(col("Description")))).show(3, False)

+----------------------------------+----------------------------------+----------------------------------+
|Description                       |lower(Description)                |upper(lower(Description))         |
+----------------------------------+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|white hanging heart t-light holder|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |white metal lantern               |WHITE METAL LANTERN               |
|CREAM CUPID HEARTS COAT HANGER    |cream cupid hearts coat hanger    |CREAM CUPID HEARTS COAT HANGER    |
+----------------------------------+----------------------------------+----------------------------------+
only showing top 3 rows



### _Padding & Trimming Spaces [lpad, ltrim, rpad, rtrim, trim] Example_

In [93]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

In [94]:
df.select(\
         ltrim(lit("   HELLO   ")).alias("ltrim"),\
         rtrim(lit("   HELLO   ")).alias("rtrim"),\
         trim(lit("   HELLO   ")).alias("trim"),\
         lpad(lit("HELLO"), 3, " ").alias("lp"),\
         rpad(lit("HELLO"), 10, "x").alias("rp")).show(3)

+--------+--------+-----+---+----------+
|   ltrim|   rtrim| trim| lp|        rp|
+--------+--------+-----+---+----------+
|HELLO   |   HELLO|HELLO|HEL|HELLOxxxxx|
|HELLO   |   HELLO|HELLO|HEL|HELLOxxxxx|
|HELLO   |   HELLO|HELLO|HEL|HELLOxxxxx|
+--------+--------+-----+---+----------+
only showing top 3 rows
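
Note that `lpad` and `rpad` pad to a target length and truncate when the input is already longer, which is why `lpad("HELLO", 3, " ")` yields `HEL`. A plain-Python sketch of the behavior seen in the output (the `sketch_` helpers are illustrative, assume a non-empty pad string, and are not part of PySpark):

```python
def sketch_lpad(text, length, pad):
    if len(text) >= length:
        return text[:length]                      # too long: keep the first `length` characters
    fill = (pad * length)[: length - len(text)]   # repeat the pad, then cut to size
    return fill + text

def sketch_rpad(text, length, pad):
    if len(text) >= length:
        return text[:length]
    fill = (pad * length)[: length - len(text)]
    return text + fill

print(repr(sketch_lpad("HELLO", 3, " ")))    # 'HEL'
print(repr(sketch_rpad("HELLO", 10, "x")))   # 'HELLOxxxxx'
print(repr("   HELLO   ".lstrip(" ")))       # 'HELLO   '  (ltrim)
```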



### _RegEx Example_:
-  regexp_extract
-  regexp_replace
-  translate
-  instr
-  locate

In [95]:
from pyspark.sql.functions import regexp_replace, regexp_extract

In [96]:
regex_string = "BLACK|WHITE|RED|GREEN|BLUE" # replace any of these colors with the word "COLOR"
df.select(\
         regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),\
         col("Description")).show(5, False)

+-----------------------------------+-----------------------------------+
|color_clean                        |Description                        |
+-----------------------------------+-----------------------------------+
|COLOR HANGING HEART T-LIGHT HOLDER |WHITE HANGING HEART T-LIGHT HOLDER |
|COLOR METAL LANTERN                |WHITE METAL LANTERN                |
|CREAM CUPID HEARTS COAT HANGER     |CREAM CUPID HEARTS COAT HANGER     |
|KNITTED UNION FLAG HOT WATER BOTTLE|KNITTED UNION FLAG HOT WATER BOTTLE|
|COLOR WOOLLY HOTTIE COLOR HEART.   |RED WOOLLY HOTTIE WHITE HEART.     |
+-----------------------------------+-----------------------------------+
only showing top 5 rows



In [97]:
from pyspark.sql.functions import translate

In [98]:
df.select(translate(col("Description"), "LET", "137"), col("Description")).show(3, False) # character-by-character replacement: L -> 1, E -> 3, T -> 7

+----------------------------------+----------------------------------+
|translate(Description, LET, 137)  |Description                       |
+----------------------------------+----------------------------------+
|WHI73 HANGING H3AR7 7-1IGH7 HO1D3R|WHITE HANGING HEART T-LIGHT HOLDER|
|WHI73 M37A1 1AN73RN               |WHITE METAL LANTERN               |
|CR3AM CUPID H3AR7S COA7 HANG3R    |CREAM CUPID HEARTS COAT HANGER    |
+----------------------------------+----------------------------------+
only showing top 3 rows



In [99]:
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(\
         regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),\
         col("Description")).show(5, False)

+-----------+-----------------------------------+
|color_clean|Description                        |
+-----------+-----------------------------------+
|WHITE      |WHITE HANGING HEART T-LIGHT HOLDER |
|WHITE      |WHITE METAL LANTERN                |
|           |CREAM CUPID HEARTS COAT HANGER     |
|           |KNITTED UNION FLAG HOT WATER BOTTLE|
|RED        |RED WOOLLY HOTTIE WHITE HEART.     |
+-----------+-----------------------------------+
only showing top 5 rows
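
For this simple alternation pattern, Python's `re` module behaves like the Java regular expressions Spark uses, so both functions can be sketched locally. Note that `regexp_extract` returns an empty string, not null, when nothing matches. The helper names are hypothetical:

```python
import re

COLORS = "(BLACK|WHITE|RED|GREEN|BLUE)"

def sketch_regexp_replace(text, pattern, replacement):
    return re.sub(pattern, replacement, text)     # replaces every match

def sketch_regexp_extract(text, pattern, group):
    match = re.search(pattern, text)              # first match only
    return match.group(group) if match else ""    # empty string when no match

print(sketch_regexp_replace("RED WOOLLY HOTTIE WHITE HEART.", COLORS, "COLOR"))
print(repr(sketch_regexp_extract("CREAM CUPID HEARTS COAT HANGER", COLORS, 1)))
```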



In [100]:
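from pyspark.sql.functions import instr

# instr returns the 1-based position of the substring, or 0 if it is absent
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1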
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
.where("hasSimpleColor")\
.select("Description")\
.show(3, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



In [101]:
from pyspark.sql.functions import expr, locate

In [102]:
simpleColors = ["black", "white", "red", "green", "blue"]

def color_locator(column, color_string):
  return locate(color_string.upper(), column)\
          .cast("boolean")\
          .alias("is_" + color_string)

selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # has to be a Column type

df.select(*selectedColumns).where(expr("is_black or is_blue"))\
  .select("Description").show(3, False)

+---------------------------------+
|Description                      |
+---------------------------------+
|BLUE COAT RACK PARIS FASHION     |
|JUMBO  BAG BAROQUE BLACK WHITE   |
|BLUE 3 PIECE POLKADOT CUTLERY SET|
+---------------------------------+
only showing top 3 rows



In [103]:
selectedColumns

[Column<b'CAST(locate(BLACK, Description, 1) AS BOOLEAN) AS `is_black`'>,
 Column<b'CAST(locate(WHITE, Description, 1) AS BOOLEAN) AS `is_white`'>,
 Column<b'CAST(locate(RED, Description, 1) AS BOOLEAN) AS `is_red`'>,
 Column<b'CAST(locate(GREEN, Description, 1) AS BOOLEAN) AS `is_green`'>,
 Column<b'CAST(locate(BLUE, Description, 1) AS BOOLEAN) AS `is_blue`'>,
 Column<b'unresolvedstar()'>]
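
`instr` and `locate` both return the 1-based position of the first occurrence of the substring, or 0 when it is absent (their argument order differs), so casting the result to boolean gives a "contains" flag. In plain Python terms, as a rough sketch (`sketch_locate` is a made-up name):

```python
def sketch_locate(substr, text):
    return text.find(substr) + 1   # str.find is 0-based and returns -1 when absent

description = "RED WOOLLY HOTTIE WHITE HEART."
print(sketch_locate("WHITE", description))        # 19
print(bool(sketch_locate("BLACK", description)))  # False
```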

### _Dates & Timestamps Example_:
-  Dates [calendar dates]
-  Timestamps [date and time information]

In [104]:
from pyspark.sql.functions import current_date, current_timestamp

In [105]:
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [106]:
dateDF.limit(1).show(truncate=False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2018-10-28|2018-10-28 22:05:08.165|
+---+----------+-----------------------+



In [107]:
from pyspark.sql.functions import date_add, date_sub

In [108]:
dateDF.select(col("today"), date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

+----------+------------------+------------------+
|     today|date_sub(today, 5)|date_add(today, 5)|
+----------+------------------+------------------+
|2018-10-28|        2018-10-23|        2018-11-02|
+----------+------------------+------------------+
only showing top 1 row



### _Difference Between 2 Dates [datediff] Example_

In [109]:
from pyspark.sql.functions import datediff, months_between, to_date

In [110]:
# number of days between 2 dates
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [111]:
# number of months between 2 dates
dateDF.select(\
             to_date(lit("2000-01-01")).alias("start"),\
             to_date(lit("2007-01-07")).alias("end"))\
.select(months_between(col("start"), col("end")))\
.show(1)

+--------------------------+
|months_between(start, end)|
+--------------------------+
|              -84.19354839|
+--------------------------+
only showing top 1 row
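
The fractional result comes from how `months_between` is defined: a whole number when both dates fall on the same day of the month (or both on the last day), otherwise the month difference plus the day difference divided by 31, rounded to 8 digits. A plain-Python sketch for date-only inputs (the function name is hypothetical and time of day is ignored):

```python
import calendar
from datetime import date

def sketch_months_between(d1, d2):
    months = (d1.year - d2.year) * 12 + (d1.month - d2.month)
    last1 = d1.day == calendar.monthrange(d1.year, d1.month)[1]
    last2 = d2.day == calendar.monthrange(d2.year, d2.month)[1]
    if d1.day == d2.day or (last1 and last2):
        return float(months)
    return round(months + (d1.day - d2.day) / 31, 8)   # 31-day months assumed

print(sketch_months_between(date(2000, 1, 1), date(2007, 1, 7)))  # -84.19354839
print((date(2018, 10, 21) - date(2018, 10, 28)).days)             # -7, like datediff(week_ago, today)
```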



### _DateFormat Debugging Part 1 Example_

In [112]:
from pyspark.sql.functions import to_date, lit

In [113]:
spark.range(5).withColumn("date", lit("2017-01-01")).select(to_date(col("date"))).show(1)
dateDF.select(to_date(lit("2016-20-12")), to_date(lit("2017-12-11"))).show(1)
# default format is yyyy-MM-dd: '20' is not a valid month, so to_date returns null instead of raising an error
# '2017-12-11' parses, but Spark cannot know whether it was meant as Dec 11 or Nov 12

+---------------+
|to_date(`date`)|
+---------------+
|     2017-01-01|
+---------------+
only showing top 1 row

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row



### _DateFormat Debugging Part 2 Example_

In [114]:
from pyspark.sql.functions import to_date

In [115]:
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(\
                                   to_date(lit("2017-12-11"), dateFormat).alias("date"),\
                                   to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
cleanDateDF.show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+
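
The same parsing behavior can be reproduced locally with `datetime.strptime`, as a rough analogy: Spark's `yyyy-MM-dd` corresponds to `%Y-%m-%d` and `yyyy-dd-MM` to `%Y-%d-%m`. Spark returns null where Python raises, so the sketch maps the exception to `None` (`sketch_to_date` is a hypothetical helper):

```python
from datetime import datetime

def sketch_to_date(text, fmt="%Y-%m-%d"):
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None   # stands in for Spark's null

print(sketch_to_date("2016-20-12"))              # None: 20 is not a valid month
print(sketch_to_date("2017-12-11"))              # 2017-12-11: parses, but maybe not as intended
print(sketch_to_date("2017-12-11", "%Y-%d-%m"))  # 2017-11-12
print(sketch_to_date("2017-20-12", "%Y-%d-%m"))  # 2017-12-20
```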



In [116]:
from pyspark.sql.functions import to_timestamp

In [117]:
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

+----------------------------------+
|to_timestamp(`date`, 'yyyy-dd-MM')|
+----------------------------------+
|               2017-11-12 00:00:00|
+----------------------------------+



In [118]:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



### _Selecting non-null values [coalesce] Example_

In [119]:
from pyspark.sql.functions import coalesce

In [120]:
df.select(coalesce(col("Description"), col("Customerid"))).show(3, truncate=False)

+----------------------------------+
|coalesce(Description, Customerid) |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|CREAM CUPID HEARTS COAT HANGER    |
+----------------------------------+
only showing top 3 rows



### _Deleting Rows w/ Null [drop] Example_

In [121]:
print(df.count())
print(df.na.drop().count()) # drops rows if any of the values are NULL
print(df.na.drop("any").count()) # drops rows if any of the values are NULL
print(df.na.drop("all").count()) # drops rows only if all values are NULLs or NaNs
print(df.na.drop("all", subset=["StockCode", "InvoiceNo"]).count()) # drops rows only if all of the listed columns are NULL

3108
1968
1968
3108
3108
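
The difference between `"any"`, `"all"`, and `subset` can be sketched on plain Python rows, with `None` standing in for NULL. The rows and the `sketch_na_drop` helper are made up for illustration (NaN handling is left out):

```python
def sketch_na_drop(rows, how="any", subset=None):
    kept = []
    for row in rows:
        values = [row[c] for c in (subset or row.keys())]
        nulls = [v is None for v in values]
        drop = any(nulls) if how == "any" else all(nulls)
        if not drop:
            kept.append(row)
    return kept

rows = [
    {"InvoiceNo": "536365", "StockCode": "85123A", "CustomerID": 17850.0},
    {"InvoiceNo": "536544", "StockCode": "21773", "CustomerID": None},
    {"InvoiceNo": None, "StockCode": None, "CustomerID": None},
]
print(len(sketch_na_drop(rows, "any")))                                      # 1
print(len(sketch_na_drop(rows, "all")))                                      # 2
print(len(sketch_na_drop(rows, "all", subset=["StockCode", "CustomerID"])))  # 2
```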


### _Filling Rows w/ Null [fill] Example_

In [122]:
# a string value fills nulls in string-typed columns only
df.na.fill("All Null values become this string")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [123]:
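# numeric fill restricted to a subset; columns whose type does not match the value (both are strings here) are ignored
df.na.fill(9999, subset=["StockCode", "InvoiceNo"])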

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [124]:
# dict fill: key = column name, value = replacement for that column's nulls
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals)

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

### _Replace Values w/ Values [replace] Example_

In [125]:
df.show(3)
df.na.replace(["United Kingdom"], ["UK"], "Country").show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows

+---------+---------+--------------------+--------+-------------------+---------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|Country|
+---------+------

### _Ordering Nulls Example_:
-  asc_nulls_first
-  desc_nulls_first
-  asc_nulls_last
-  desc_nulls_last
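
These four orderings control where NULLs land relative to the sorted values. A plain-Python sketch, with `None` standing in for NULL; in PySpark (2.4+) the equivalents are `Column` methods such as `col("Description").asc_nulls_last()`, and the `sketch_order` helper below is only an illustration:

```python
def sketch_order(values, descending=False, nulls_first=True):
    present = sorted((v for v in values if v is not None), reverse=descending)
    nulls = [v for v in values if v is None]
    return nulls + present if nulls_first else present + nulls

data = [3, None, 1, 2]
print(sketch_order(data))                                      # asc_nulls_first:  [None, 1, 2, 3]
print(sketch_order(data, nulls_first=False))                   # asc_nulls_last:   [1, 2, 3, None]
print(sketch_order(data, descending=True))                     # desc_nulls_first: [None, 3, 2, 1]
print(sketch_order(data, descending=True, nulls_first=False))  # desc_nulls_last:  [3, 2, 1, None]
```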

### _Struct Example_

In [126]:
from pyspark.sql.functions import struct

In [127]:
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

DataFrame[complex: struct<Description:string,InvoiceNo:string>, InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [128]:
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
complexDF.printSchema()

root
 |-- complex: struct (nullable = false)
 |    |-- Description: string (nullable = true)
 |    |-- InvoiceNo: string (nullable = true)



In [129]:
complexDF.show(1, truncate=False)

+--------------------------------------------+
|complex                                     |
+--------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER, 536365]|
+--------------------------------------------+
only showing top 1 row



In [130]:
complexDF.select("complex.Description").show(1, False)
complexDF.select(col("complex").getField("Description")).show(1, False)
complexDF.select("complex.*").show(1, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
+----------------------------------+
only showing top 1 row

+----------------------------------+
|complex.Description               |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
+----------------------------------+
only showing top 1 row

+----------------------------------+---------+
|Description                       |InvoiceNo|
+----------------------------------+---------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |
+----------------------------------+---------+
only showing top 1 row



### _Array (Convert Every Description Token into Row) [explode] Example_

In [131]:
from pyspark.sql.functions import split

In [132]:
df.select(split(col("Description"), " ")).show(3, False)
df.select(split(col("Description"), " ").alias("array_col")).selectExpr("array_col[0]").show(3)

+----------------------------------------+
|split(Description,  )                   |
+----------------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|[WHITE, METAL, LANTERN]                 |
|[CREAM, CUPID, HEARTS, COAT, HANGER]    |
+----------------------------------------+
only showing top 3 rows

+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
|       CREAM|
+------------+
only showing top 3 rows



In [133]:
arrayDF = df.select(split(col("Description"), " ").alias("arrayType"))
arrayDF.printSchema()

root
 |-- arrayType: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [134]:
from pyspark.sql.functions import size

In [135]:
df.select(size(split(col("Description"), " "))).show(3) # shows each row's array length

+---------------------------+
|size(split(Description,  ))|
+---------------------------+
|                          5|
|                          3|
|                          5|
+---------------------------+
only showing top 3 rows



In [136]:
from pyspark.sql.functions import array_contains

In [137]:
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(3) # search within array

+--------------------------------------------+
|array_contains(split(Description,  ), WHITE)|
+--------------------------------------------+
|                                        true|
|                                        true|
|                                       false|
+--------------------------------------------+
only showing top 3 rows



In [138]:
from pyspark.sql.functions import split, explode

In [139]:
df.withColumn("splitted", split(col("Description"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Description", "InvoiceNo", "exploded")\
.show(5, False)

+----------------------------------+---------+--------+
|Description                       |InvoiceNo|exploded|
+----------------------------------+---------+--------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |WHITE   |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HANGING |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HEART   |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |T-LIGHT |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HOLDER  |
+----------------------------------+---------+--------+
only showing top 5 rows



### _Map Example_

In [140]:
from pyspark.sql.functions import create_map

In [141]:
mapDF = df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))
mapDF.show(3, False)
mapDF.printSchema()

+----------------------------------------------+
|complex_map                                   |
+----------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER -> 536365]|
|[WHITE METAL LANTERN -> 536365]               |
|[CREAM CUPID HEARTS COAT HANGER -> 536365]    |
+----------------------------------------------+
only showing top 3 rows

root
 |-- complex_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [142]:
mapDF.selectExpr("complex_map['WHITE METAL LANTERN']").show(3, False)

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|null                            |
|536365                          |
|null                            |
+--------------------------------+
only showing top 3 rows



In [143]:
mapDF.selectExpr("explode(complex_map)").show(3, False)

+----------------------------------+------+
|key                               |value |
+----------------------------------+------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365|
|WHITE METAL LANTERN               |536365|
|CREAM CUPID HEARTS COAT HANGER    |536365|
+----------------------------------+------+
only showing top 3 rows



### _JSON Example_

In [144]:
jsonDF = spark.range(1).selectExpr("""
  '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
jsonDF.show(1, False)
jsonDF.printSchema()

+-------------------------------------------+
|jsonString                                 |
+-------------------------------------------+
|{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}|
+-------------------------------------------+

root
 |-- jsonString: string (nullable = false)



In [145]:
from pyspark.sql.functions import get_json_object, json_tuple

In [146]:
jsonDF.select(\
              get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[0]").alias("col"),\
              json_tuple(col("jsonString"), "myJSONKey"))\
.show(3, False)

+---+-----------------------+
|col|c0                     |
+---+-----------------------+
|1  |{"myJSONValue":[1,2,3]}|
+---+-----------------------+



In [147]:
from pyspark.sql.functions import to_json

In [148]:
df.selectExpr("(InvoiceNo, Description) as myStruct").select(to_json(col("myStruct"))).show(3, False)

+-------------------------------------------------------------------------+
|structstojson(myStruct)                                                  |
+-------------------------------------------------------------------------+
|{"InvoiceNo":"536365","Description":"WHITE HANGING HEART T-LIGHT HOLDER"}|
|{"InvoiceNo":"536365","Description":"WHITE METAL LANTERN"}               |
|{"InvoiceNo":"536365","Description":"CREAM CUPID HEARTS COAT HANGER"}    |
+-------------------------------------------------------------------------+
only showing top 3 rows



In [149]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

In [150]:
parseSchema = StructType((
  StructField("InvoiceNo",StringType(),True),
  StructField("Description",StringType(),True)))

df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(3, False)

+--------------------------------------------+-------------------------------------------------------------------------+
|jsontostructs(newJSON)                      |newJSON                                                                  |
+--------------------------------------------+-------------------------------------------------------------------------+
|[536365, WHITE HANGING HEART T-LIGHT HOLDER]|{"InvoiceNo":"536365","Description":"WHITE HANGING HEART T-LIGHT HOLDER"}|
|[536365, WHITE METAL LANTERN]               |{"InvoiceNo":"536365","Description":"WHITE METAL LANTERN"}               |
|[536365, CREAM CUPID HEARTS COAT HANGER]    |{"InvoiceNo":"536365","Description":"CREAM CUPID HEARTS COAT HANGER"}    |
+--------------------------------------------+-------------------------------------------------------------------------+
only showing top 3 rows



### _UDF Example_

In [151]:
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
  return double_value ** 3
power3(2.0)

8.0

In [152]:
from pyspark.sql.functions import udf

In [153]:
power3udf = udf(power3)

In [154]:
from pyspark.sql.functions import col

In [155]:
udfExampleDF.select(power3udf(col("num"))).show(3)

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
+-----------+
only showing top 3 rows



In [156]:
from pyspark.sql.types import IntegerType, DoubleType

In [157]:
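# register the UDF for SQL with a DoubleType return; power3 returns Python ints for the long column, which do not match, so every value comes back null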
spark.udf.register("power3py", power3, DoubleType())
# via expression
udfExampleDF.selectExpr("power3py(num)").show(3)

+-------------+
|power3py(num)|
+-------------+
|         null|
|         null|
|         null|
+-------------+
only showing top 3 rows



In [158]:
# re-register with IntegerType so the declared return type matches the ints power3 produces
spark.udf.register("power3py", power3, IntegerType())
# via expression
udfExampleDF.selectExpr("power3py(num)").show(3)

+-------------+
|power3py(num)|
+-------------+
|            0|
|            1|
|            8|
+-------------+
only showing top 3 rows
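
The all-null column from the `DoubleType` registration is a type mismatch: `num` is a long column, so `power3` receives Python ints and returns ints, which Spark does not convert to the declared double type. The return types can be checked in plain Python; one possible fix (an assumption, not something the notebook does) is to return a float explicitly:

```python
def power3(value):
    return value ** 3

def power3_float(value):
    # hypothetical variant: always returns a float, matching a DoubleType return type
    return float(value) ** 3

print(type(power3(2)).__name__)    # int
print(type(power3(2.0)).__name__)  # float
print(power3_float(2))             # 8.0
```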



## _Chapter #7 - Aggregations_

-  specify _key_ or _grouping_
-  specify _aggregation function_ for column(s) transformation
-  a _"group by"_ splits the data so that every row falls into exactly one group
-  majority of Spark aggregation functions are in the **org.apache.spark.sql.functions (pyspark.sql.functions)** package
-  **pyspark.sql.functions** documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions   
<br>
-  **Grouping Types**:
    -  aggregation via select statement ex: (df.count()):
        - covers DF level aggregations   
        <br>
    - "group by" on 1 or more keys / 1 or more aggregation functions to transform column(s) ex: (df.groupBy(...)):
        - covers grouping data on column(s) and performing calculations (aggs) on other column(s) within each group   
        <br>
    -  "window" ex: (rolling averages with each row representing 1 day):
        -  computes aggregation of specific window of data
        -  returns value for every input row of a table based on a group of rows (frame)
        -  typically contains a "partition" specification (how data is broken up into groups)
        -  functions:
             - ranking (rank vs dense_rank)
             - analytic
             - aggregate   
         <br>
    -  "grouping set":
        -  covers aggregation across multiple groups ex: (total quantity by stock codes and customers)   
    <br>
    -  "rollup":
        -  multidimensional aggregation that performs a variety of group-by calculations ex: (aggregation across groups by time)
        -  "null" values indicate grand totals across columns (null in all columns specifies total aggregation across those cols)   
    <br>
    -  "cube":
        -  like rollup, but aggregates across every combination of the grouping columns   
        -  must filter out NULL values for aggregation levels on cubes, rollups, and grouping sets or else code will compute incorrect results   
<br>
    -  "pivot":
        -  converts row values into columns   
        <br>
    -  "UDAFs":
        -  user-defined aggregation functions are designed to define custom aggregation functions over groups of input data (as opposed to single rows)
        -  currently only available in Scala and Java
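
The difference between "rollup" and "cube" comes down to which combinations of grouping columns get aggregated: a rollup over (a, b) yields the groupings (a, b), (a), and the grand total (), while a cube yields every subset. A plain-Python sketch that enumerates those combinations (the helper names and example column names are illustrative only):

```python
from itertools import combinations

def sketch_rollup_groupings(columns):
    # hierarchical prefixes, from most detailed down to the grand total
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]

def sketch_cube_groupings(columns):
    # every subset of the grouping columns
    return [combo
            for size in range(len(columns), -1, -1)
            for combo in combinations(columns, size)]

print(sketch_rollup_groupings(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ()]
print(sketch_cube_groupings(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ('Country',), ()]
```

Columns left out of a grouping show up as null in the result, which is why nulls mark subtotal and grand-total rows.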


### _Chapter #7 Exercises (DataFrames)_

In [159]:
# coalesce into fewer partitions because the data volume is small but stored in many small files
# cache DF for in memory access
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(retailDataAll)\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [160]:
df.count()

541909

### _Count Example_:
- count
- countDistinct
- approx_count_distinct

In [161]:
from pyspark.sql.functions import count
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import approx_count_distinct

In [162]:
df.select(count("StockCode")).show() # counting a single column skips NULLs in that column
df.select(count("*")).show() # count(*) counts every row, including rows containing NULLs

df.select(countDistinct("StockCode")).show() # count unique values in col

df.select(approx_count_distinct("StockCode", 0.1)).show() # approximate distinct count; 0.1 is the maximum estimation error allowed

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+

+--------+
|count(1)|
+--------+
|  541909|
+--------+

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+


### _Boundary Example_:
- first and last
- min and max

In [163]:
from pyspark.sql.functions import first, last
from pyspark.sql.functions import min, max

In [164]:
df.select(first("StockCode"), last("StockCode")).show() # first and last StockCode values in the DF

df.select(min("Quantity"), max("Quantity")).show() # min and max Quantity in the DF

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+


### _Sum Example_:
-  sum
-  sumDistinct

In [165]:
from pyspark.sql.functions import sum
from pyspark.sql.functions import sumDistinct

In [166]:
df.select(sum("Quantity")).show() # sum values in col

df.select(sumDistinct("Quantity")).show() # sum distinct set of values in col

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+


### _Average Example_

In [167]:
from pyspark.sql.functions import sum, count, avg, expr

In [168]:
df.select(
    count("Quantity").alias("total_transactions_COUNT"),
    sum("Quantity").alias("total_purchases_SUM"),
    avg("Quantity").alias("avg_purchases_AVG"),
    expr("mean(Quantity)").alias("mean_purchases_MEAN"))\
  .selectExpr(
    "total_transactions_COUNT",
    "total_purchases_SUM",
    "total_purchases_SUM/total_transactions_COUNT",
    "avg_purchases_AVG",
    "mean_purchases_MEAN")\
    .take(1)

[Row(total_transactions_COUNT=541909, total_purchases_SUM=5176450, (total_purchases_SUM / total_transactions_COUNT)=9.55224954743324, avg_purchases_AVG=9.55224954743324, mean_purchases_MEAN=9.55224954743324)]

### _Stats Example_:
##### Var and Stddev measure the spread of the data around the mean
##### Skewness and Kurtosis describe the shape of a distribution (asymmetry and tail weight) when the data is modeled as the probability distribution of a random variable
##### Corr and Covar compare the interactions of the values in 2 different variables

-  variance (sample and population) __average of squared differences from the mean__
-  standard deviation (sample and population) __square root of the variance__
-  skewness __measure of asymmetry of values in data around the mean__
-  kurtosis __measure of how heavy the tails of the data are__
-  covariance (sample and population) __measures how 2 different variables vary together__
-  correlation __Pearson correlation coefficient, scaled between -1 and +1__

In [169]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

In [170]:
df.select(var_pop("Quantity"), var_samp("Quantity"), stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609056|47559.391409298754|  218.08095663447796|   218.08115785023418|
+------------------+------------------+--------------------+---------------------+
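
The population and sample versions differ only in the divisor (n versus n - 1), which is why the two variances above are nearly identical on 541,909 rows. A small sketch with Python's `statistics` module on made-up numbers, followed by the same relation applied to the `var_pop` value from the table:

```python
import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]   # made-up sample, mean = 5
n = len(data)

var_pop = statistics.pvariance(data)   # divides by n
var_samp = statistics.variance(data)   # divides by n - 1

print(float(var_pop))                                  # 4.0
print(abs(var_samp - var_pop * n / (n - 1)) < 1e-12)   # True

# same relation on the Quantity column: var_samp = var_pop * n / (n - 1)
print(47559.303646609056 * 541909 / 541908)            # close to the var_samp shown above
```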



In [171]:
from pyspark.sql.functions import skewness, kurtosis

In [172]:
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+



In [173]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

In [174]:
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"), covar_pop("InvoiceNo", "Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7260778741693|
+-------------------------+-------------------------------+------------------------------+



### _Complex Types Example_:
-  collect_list [collects list of present values in col]
-  collect_set [collects list of unique present values in col]

In [175]:
from pyspark.sql.functions import collect_set, collect_list

In [176]:
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



### _Group By Example_:
-  groupBy
-  agg
-  grouping with maps [key is the column; value is the aggregation function]
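
The "grouping with maps" form is not exercised in the cells that follow. As a rough plain-Python sketch of the idea (no Spark required), the map's keys name columns and its values name aggregation functions. The rows, the `AGGS` table, and `group_by_agg` are hypothetical names for illustration only.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows standing in for (InvoiceNo, Quantity) pairs
rows = [
    {"InvoiceNo": "A1", "Quantity": 6},
    {"InvoiceNo": "A1", "Quantity": 8},
    {"InvoiceNo": "B2", "Quantity": 3},
]

# Aggregation names mapped to plain Python functions (illustrative subset)
AGGS = {"avg": mean, "sum": sum, "count": len, "max": max}

def group_by_agg(rows, key, agg_map):
    """Group rows by `key`, then apply each {column: aggregation} pair."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result = {}
    for group_key, members in groups.items():
        result[group_key] = {
            f"{agg}({column})": AGGS[agg]([m[column] for m in members])
            for column, agg in agg_map.items()
        }
    return result

summary = group_by_agg(rows, "InvoiceNo", {"Quantity": "avg"})
print(summary)
```

In PySpark the equivalent call should look like `df.groupBy("InvoiceNo").agg({"Quantity": "avg"})`. Because the map is keyed by column, this form allows only one aggregation per column.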

In [177]:
df.groupBy("InvoiceNo", "CustomerId").count().show(3)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536366|     17850|    2|
|   536367|     13047|   12|
|   536369|     13047|    1|
+---------+----------+-----+
only showing top 3 rows



In [178]:
from pyspark.sql.functions import count

In [179]:
df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show(3)

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536370|  20|             20|
|   536380|   1|              1|
|   536384|  13|             13|
+---------+----+---------------+
only showing top 3 rows



In [180]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show(3)

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536370|             22.45|   8.935742834258381|
|   536380|              24.0|                 0.0|
|   536384|14.615384615384615|  15.750645708563392|
+---------+------------------+--------------------+
only showing top 3 rows



### _Window Example_

In [181]:
from pyspark.sql.functions import col, to_date

In [182]:
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [183]:
dfWithDate.show(1)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
only showing top 1 row



In [184]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

In [185]:
windowSpec = Window\
.partitionBy("CustomerId", "date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [186]:
from pyspark.sql.functions import max

In [187]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
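
The frame `rowsBetween(Window.unboundedPreceding, Window.currentRow)` means each row's aggregate only sees the rows from the start of its partition up to itself. A minimal plain-Python sketch of that running maximum for a single partition, using made-up quantities:

```python
from itertools import accumulate

# Hypothetical quantities for one (CustomerId, date) partition,
# already ordered by Quantity descending as in windowSpec
partition = [36, 30, 24, 12, 12]

# Running max: row i sees rows 0..i (unbounded preceding to current row)
running_max = list(accumulate(partition, max))
print(running_max)  # [36, 36, 36, 36, 36]

# With ascending order the same frame yields a growing maximum instead
running_max_asc = list(accumulate(sorted(partition), max))
print(running_max_asc)  # [12, 12, 24, 30, 36]
```

Because the window is ordered by Quantity descending, the first row of every partition already holds the maximum, which is why `maxPurchaseQuantity` is constant within each partition in the output further down.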

In [188]:
from pyspark.sql.functions import dense_rank, rank

In [189]:
purchaseDenseRank = dense_rank().over(windowSpec) # dense_rank gives tied rows the same rank and leaves no gaps after ties
purchaseRank = rank().over(windowSpec) # rank gives tied rows the same rank but skips the following positions (gaps)
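
The difference between the two ranking functions only shows up on the rows that come after a tie. A small plain-Python sketch, assuming the values are already in window order; `rank_and_dense_rank` and the sample list are hypothetical:

```python
def rank_and_dense_rank(sorted_values):
    """Return (rank, dense_rank) lists for values already in window order."""
    ranks, dense_ranks = [], []
    for position, value in enumerate(sorted_values, start=1):
        if position > 1 and value == sorted_values[position - 2]:
            # Tie with the previous row: both functions repeat the rank
            ranks.append(ranks[-1])
            dense_ranks.append(dense_ranks[-1])
        else:
            # rank jumps to the row position (leaving gaps after ties);
            # dense_rank advances by exactly one
            ranks.append(position)
            dense_ranks.append(dense_ranks[-1] + 1 if dense_ranks else 1)
    return ranks, dense_ranks

quantities = [36, 30, 30, 24, 12]  # hypothetical, ordered descending
ranks, dense_ranks = rank_and_dense_rank(quantities)
print(ranks)        # [1, 2, 2, 4, 5]
print(dense_ranks)  # [1, 2, 2, 3, 4]
```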

In [190]:
from pyspark.sql.functions import col

In [191]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(10)

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

In [192]:
spark.sql("""
SELECT CustomerId, date, Quantity,
  rank(Quantity) OVER (PARTITION BY CustomerId, date
                       ORDER BY Quantity DESC NULLS LAST
                       ROWS BETWEEN
                         UNBOUNDED PRECEDING AND
                         CURRENT ROW) as rank,

  dense_rank(Quantity) OVER (PARTITION BY CustomerId, date
                             ORDER BY Quantity DESC NULLS LAST
                             ROWS BETWEEN
                               UNBOUNDED PRECEDING AND
                               CURRENT ROW) as dRank,

  max(Quantity) OVER (PARTITION BY CustomerId, date
                      ORDER BY Quantity DESC NULLS LAST
                      ROWS BETWEEN
                        UNBOUNDED PRECEDING AND
                        CURRENT ROW) as maxPurchase
FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId
""").show(10)

+----------+----------+--------+----+-----+-----------+
|CustomerId|      date|Quantity|rank|dRank|maxPurchase|
+----------+----------+--------+----+-----+-----------+
|     12346|2011-01-18|   74215|   1|    1|      74215|
|     12346|2011-01-18|  -74215|   2|    2|      74215|
|     12347|2010-12-07|      36|   1|    1|         36|
|     12347|2010-12-07|      30|   2|    2|         36|
|     12347|2010-12-07|      12|   4|    4|         36|
|     12347|2010-12-07|      12|   4|    4|         36|
|     12347|2010-12-07|      24|   3|    3|         36|
|     12347|2010-12-07|      12|   4|    4|         36|
|     12347|2010-12-07|      12|   4|    4|         36|
|     12347|2010-12-07|      12|   4|    4|         36|
+----------+----------+--------+----+-----+-----------+
only showing top 10 rows



### _Grouping Set (via Spark SQL) Example_

In [193]:
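# note: drop() with no arguments removes nothing; dfWithDate.na.drop() would
# remove rows containing nulls (the outputs below were produced with drop())
dfNoNull = dfWithDate.drop()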
dfNoNull.createOrReplaceTempView("dfNoNull")

In [194]:
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
""").show(3)
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
""").show(3)

+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
+----------+---------+-------------+
only showing top 3 rows

+----------+---------+-------------+
|customerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
+----------+---------+-------------+
only showing top 3 rows



### _Rollup Example_

In [195]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
.orderBy("Date")
rolledUpDF.show(5)
rolledUpDF.where("Country IS NULL").show(3)
rolledUpDF.where("Date IS NULL").show(3)

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|          null|         26814|
|2010-12-01|          EIRE|           243|
|2010-12-01|        France|           449|
|2010-12-01|United Kingdom|         23949|
+----------+--------------+--------------+
only showing top 5 rows

+----------+-------+--------------+
|      Date|Country|total_quantity|
+----------+-------+--------------+
|      null|   null|       5176450|
|2010-12-01|   null|         26814|
|2010-12-02|   null|         21023|
+----------+-------+--------------+
only showing top 3 rows

+----+-------+--------------+
|Date|Country|total_quantity|
+----+-------+--------------+
|null|   null|       5176450|
+----+-------+--------------+



### _Cube Example_

In [196]:
from pyspark.sql.functions import sum

In [197]:
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity"))).select("Date", "Country", "sum(Quantity)").orderBy("Date").show(10)

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|         Switzerland|        30325|
|null|                null|      5176450|
|null|             Belgium|        23152|
|null|     Channel Islands|         9479|
|null|               Spain|        26824|
|null|             Bahrain|          260|
|null|           Singapore|         5234|
|null|United Arab Emirates|          982|
|null|        Saudi Arabia|           75|
|null|             Finland|        10666|
+----+--------------------+-------------+
only showing top 10 rows
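
Rollup and cube differ only in which grouping sets they aggregate over. A short plain-Python sketch that enumerates them (the helper names are hypothetical):

```python
from itertools import combinations

def rollup_sets(columns):
    """Grouping sets for rollup: every prefix of the column list."""
    return [tuple(columns[:n]) for n in range(len(columns), -1, -1)]

def cube_sets(columns):
    """Grouping sets for cube: every subset of the column list."""
    return [
        subset
        for size in range(len(columns), -1, -1)
        for subset in combinations(columns, size)
    ]

print(rollup_sets(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ()]
print(cube_sets(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ('Country',), ()]
```

The extra `('Country',)` set is what produces the cube rows above with a null Date and a specific Country. The rollup output never contains such rows.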



### _Pivot Example_

In [198]:
dfWithDate.show(1)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
only showing top 1 row



In [199]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [200]:
pivoted.printSchema()

root
 |-- date: date (nullable = true)
 |-- Australia_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Australia_sum(UnitPrice): double (nullable = true)
 |-- Australia_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Austria_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Austria_sum(UnitPrice): double (nullable = true)
 |-- Austria_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Bahrain_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Bahrain_sum(UnitPrice): double (nullable = true)
 |-- Bahrain_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Belgium_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Belgium_sum(UnitPrice): double (nullable = true)
 |-- Belgium_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Brazil_sum(CAST(Quantity AS BIGINT)): long (nullable = true)
 |-- Brazil_sum(UnitPrice): double (nullable = true)
 |-- Brazil_sum(CAST(CustomerID AS BIGINT)): long (nullable = true)
 |-- Can

In [201]:
pivoted.where("date > '2011-12-05'").select("date", "USA_sum(CAST(Quantity AS BIGINT))").show(3)

+----------+---------------------------------+
|      date|USA_sum(CAST(Quantity AS BIGINT))|
+----------+---------------------------------+
|2011-12-07|                             null|
|2011-12-06|                             null|
|2011-12-08|                             -196|
+----------+---------------------------------+
only showing top 3 rows
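
The nulls in the pivoted output mean there were no rows for that date and country combination. A plain-Python sketch of the same reshaping on made-up rows (only the -196 value is borrowed from the output above):

```python
from collections import defaultdict

# Hypothetical (date, Country, Quantity) rows
rows = [
    ("2011-12-08", "USA", -196),
    ("2011-12-08", "France", 10),
    ("2011-12-07", "France", 5),
]

countries = sorted({country for _, country, _ in rows})
sums = defaultdict(lambda: defaultdict(int))
for date, country, quantity in rows:
    sums[date][country] += quantity

# One output row per date, one column per country; missing combos become None
pivoted_rows = {
    date: {c: per_country.get(c) for c in countries}
    for date, per_country in sums.items()
}
print(pivoted_rows)
```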



## _Chapter #8 - Joins_

-  brings together 2 sets of data (left dataset and right dataset) by comparing the value of 1 or more keys of the left dataset and right dataset
-  equi-join (compares whether the specified keys in your left and right datasets are equal and discards rows that do not have matching keys)   
<br>
- **Types**:
    -  inner joins (keep rows with keys that exist in the left and right datasets)
    -  full outer joins (keep rows with keys in either the left or right datasets)
    -  left outer joins (keep rows with keys in the left dataset)
    -  right outer joins (keep rows with keys in the right dataset)
    -  left semi joins (keep rows in the left (only left) dataset where the key appears in the right dataset)
    -  left anti joins (keep rows in the left (only left) dataset where they DO NOT appear in the right dataset)
    -  natural joins (performs join that matches columns between the 2 datasets with the same names)
    -  cross _cartesian_ joins (match every row in the left dataset with every row in the right dataset)   
    <br>
-  **How Spark Performs Joins**:
    -  node-to-node communication strategy
    -  per node computation strategy   
    <br>
-  **Communication Strategies**:
    -  **shuffle join** [occurs on "big table" to "big table" joins where during shuffle every node talks to every other node and they share data according to which node has a certain key or set of keys being joined]
    -  **broadcast join** [can be useful for "small table" to "big table" joins because table is small enough to fit into memory of single worker node]
        -  plan is to replicate the small DF onto every worker node in the cluster to avoid "all to all" (node to node) communication during the entire join process, so each worker can perform its own work without communicating with other workers
        -  broadcasting tables that are too large can crash the driver
        -  Spark tries to optimize broadcast automatically, which can be confirmed via EXPLAIN PLAN; however, you can also specify a broadcast join explicitly
        -  it is recommended to let Spark optimize computation on "small table" to "small table" joins
        -  Additional Optimization Technique: ***consider partitioning your data "correctly" prior to a join as the execution performance can significantly improve; even if a shuffle is planned, Spark can avoid it if data from the DFs is already located on the same machine***
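
The broadcast strategy described above can be sketched in plain Python: every worker gets its own copy of the small table and probes it locally, so no partition has to talk to another one. This is only an illustration of the idea, not Spark's implementation; the data layout and `broadcast_hash_join` are hypothetical.

```python
# Hypothetical data: a small lookup table and a larger table split
# into partitions that would live on different worker nodes
small_table = {0: "Masters", 1: "Ph.D."}          # id -> degree
big_table_partitions = [
    [("Bill Chambers", 0), ("Matei Zaharia", 1)],  # partition on worker 1
    [("Michael Armbrust", 1)],                     # partition on worker 2
]

def broadcast_hash_join(partitions, lookup):
    """Join each partition locally against its own copy of the lookup."""
    joined = []
    for partition in partitions:
        local_copy = dict(lookup)  # stands in for the broadcast copy
        for name, program_id in partition:
            if program_id in local_copy:  # inner-join semantics
                joined.append((name, program_id, local_copy[program_id]))
    return joined

result = broadcast_hash_join(big_table_partitions, small_table)
print(result)
```

The cost is memory: each worker holds the whole small table, which is why the approach only makes sense when that table comfortably fits on a single node.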

### _Chapter #8 Exercises (DataFrames)_

In [202]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [203]:
import pandas as pd

In [204]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [205]:
print("person table:")
person.show()
print("graduateProgram table:")
graduateProgram.show()
print("sparkStatus table:")
sparkStatus.show()

person table:
+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+

graduateProgram table:
+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+

sparkStatus table:
+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



### _Inner Join Example_

In [206]:
# keep only the rows for which the join expression evaluates to true (key present in both datasets)
joinExpression = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinExpression).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school
0,0,Bill Chambers,0,[100],0,Masters,School of Information,UC Berkeley
1,1,Matei Zaharia,1,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
2,2,Michael Armbrust,1,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


In [207]:
# example of a join expression whose values never match (name vs. school), hence it returns no rows
wrongJoinExpression = person["name"] == graduateProgram["school"]
person.join(graduateProgram, wrongJoinExpression).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school


In [208]:
# via specifying join type
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school
0,0,Bill Chambers,0,[100],0,Masters,School of Information,UC Berkeley
1,1,Matei Zaharia,1,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
2,2,Michael Armbrust,1,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


### _Outer Join Example_

In [209]:
# keep all rows from both datasets; where the join expression finds no match, the other side's columns are null
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school
0,0.0,Bill Chambers,0.0,[100],0,Masters,School of Information,UC Berkeley
1,,,,,2,Masters,EECS,UC Berkeley
2,1.0,Matei Zaharia,1.0,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
3,2.0,Michael Armbrust,1.0,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


### _Left Outer Join Example_

In [210]:
# keep ALL rows from the left dataset
# plus the columns of any rows in the right dataset that match a left row
# left rows without a matching right row get nulls in the right-side columns
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).toPandas().head()

Unnamed: 0,id,degree,department,school,id.1,name,graduate_program,spark_status
0,0,Masters,School of Information,UC Berkeley,0.0,Bill Chambers,0.0,[100]
1,2,Masters,EECS,UC Berkeley,,,,
2,1,Ph.D.,EECS,UC Berkeley,1.0,Matei Zaharia,1.0,"[500, 250, 100]"
3,1,Ph.D.,EECS,UC Berkeley,2.0,Michael Armbrust,1.0,"[250, 100]"


### _Right Outer Join Example_

In [211]:
# keep ALL rows from the right dataset
# plus the columns of any rows in the left dataset that match a right row
# right rows without a matching left row get nulls in the left-side columns
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school
0,0.0,Bill Chambers,0.0,[100],0,Masters,School of Information,UC Berkeley
1,,,,,2,Masters,EECS,UC Berkeley
2,1.0,Matei Zaharia,1.0,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
3,2.0,Michael Armbrust,1.0,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


### _Left Semi Join Example_

In [212]:
# similar to filter on DF via key specified
# compare values (key) to see if the key value exists in the right dataset
# if the value exists, the left row is kept in the result, even when the key is duplicated in the left dataset
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [213]:
# how duplicates are treated example
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.show()
gradProgram2.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------------+
| id| degree|          department|           school|
+---+-------+--------------------+-----------------+
|  0|Masters|School of Informa...|      UC Berkeley|
|  2|Masters|                EECS|      UC Berkeley|
|  1|  Ph.D.|                EECS|      UC Berkeley|
|  0|Masters|      Duplicated Row|Duplicated School|
+---+-------+--------------------+-----------------+

+---+-------+--------------------+-----------------+
| id| degree|          department|           school|
+---+-------+--------------------+-----------------+
|  0|Masters|School of Informa...|      UC Berkeley|
|  0|Masters|      Duplicated Row|Duplicated School|
|  1|  Ph.D.|                EECS|      UC Berkeley|
+---+-------+--------------------+-----------------+



### _Left Anti Join Example_

In [214]:
# similar to NOT IN filter on DF via key specified
# compare values (key) to see if the key value exists in the right dataset
# opposite of semi joins ... only keep values that DO NOT have a corresponding key in the right dataset
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+
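
Semi and anti joins behave like filters on the left dataset. A plain-Python sketch over the same rows as the graduateProgram and person tables above (variable names are hypothetical):

```python
# Rows copied from the graduateProgram table shown earlier
graduate_program = [
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"),
]
# Distinct person.graduate_program values (0, 1, 1 collapses to {0, 1})
person_program_ids = {0, 1}

# Left semi: keep left rows whose key appears on the right
left_semi = [row for row in graduate_program if row[0] in person_program_ids]

# Left anti: keep left rows whose key does NOT appear on the right
left_anti = [row for row in graduate_program if row[0] not in person_program_ids]

print([row[0] for row in left_semi])  # [0, 1]
print([row[0] for row in left_anti])  # [2]
```

Neither result carries any column from the right dataset, and together the two results always add up to the full left dataset.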



### _Natural Join Example_

In [215]:
# finds matching columns and returns the results ... always use this join with CAUTION
spark.sql("select * from graduateProgram NATURAL JOIN person").toPandas().head()

Unnamed: 0,id,degree,department,school,name,graduate_program,spark_status
0,0,Masters,School of Information,UC Berkeley,Bill Chambers,0,[100]
1,2,Masters,EECS,UC Berkeley,Michael Armbrust,1,"[250, 100]"
2,1,Ph.D.,EECS,UC Berkeley,Matei Zaharia,1,"[500, 250, 100]"


### _Cross Cartesian Join Example_

In [216]:
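# join every row in the left dataset to every row in the right dataset
# 1,000 left rows * 1,000 right rows = 1,000,000 rows returned, so use this join with CAUTION
# note: because a join expression is supplied here, only matching rows come back (see output);
# crossJoin() in the next cell produces the full Cartesian product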
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).toPandas().head()

Unnamed: 0,id,degree,department,school,id.1,name,graduate_program,spark_status
0,0,Masters,School of Information,UC Berkeley,0,Bill Chambers,0,[100]
1,1,Ph.D.,EECS,UC Berkeley,1,Matei Zaharia,1,"[500, 250, 100]"
2,1,Ph.D.,EECS,UC Berkeley,2,Michael Armbrust,1,"[250, 100]"


In [217]:
person.crossJoin(graduateProgram).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,id.1,degree,department,school
0,0,Bill Chambers,0,[100],0,Masters,School of Information,UC Berkeley
1,0,Bill Chambers,0,[100],2,Masters,EECS,UC Berkeley
2,0,Bill Chambers,0,[100],1,Ph.D.,EECS,UC Berkeley
3,1,Matei Zaharia,1,"[500, 250, 100]",0,Masters,School of Information,UC Berkeley
4,1,Matei Zaharia,1,"[500, 250, 100]",2,Masters,EECS,UC Berkeley


### _Joins On Complex Types Example_

In [218]:
from pyspark.sql.functions import expr

In [219]:
person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show(3)

+--------+-------------+----------------+---------------+---+--------------+
|personId|         name|graduate_program|   spark_status| id|        status|
+--------+-------------+----------------+---------------+---+--------------+
|       0|Bill Chambers|               0|          [100]|100|   Contributor|
|       1|Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
+--------+-------------+----------------+---------------+---+--------------+
only showing top 3 rows



### _Handling Duplicate Column Names Example_:
1.  different join expression
2.  dropping column after join
3.  renaming column before join

In [220]:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
joinExpr = gradProgramDupe["graduate_program"] == person["graduate_program"]

In [221]:
person.join(gradProgramDupe, joinExpr).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,graduate_program.1,degree,department,school
0,0,Bill Chambers,0,[100],0,Masters,School of Information,UC Berkeley
1,1,Matei Zaharia,1,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
2,2,Michael Armbrust,1,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


In [222]:
# challenge arises when selecting the duplicate column
'''
person.join(gradProgramDupe, joinExpr).select("graduate_program").show() # triggers error
'''

'\nperson.join(gradProgramDupe, joinExpr).select("graduate_program").show() # triggers error\n'

In [223]:
#1
person.join(gradProgramDupe, "graduate_program").show()

+----------------+---+----------------+---------------+-------+--------------------+-----------+
|graduate_program| id|            name|   spark_status| degree|          department|     school|
+----------------+---+----------------+---------------+-------+--------------------+-----------+
|               0|  0|   Bill Chambers|          [100]|Masters|School of Informa...|UC Berkeley|
|               1|  1|   Matei Zaharia|[500, 250, 100]|  Ph.D.|                EECS|UC Berkeley|
|               1|  2|Michael Armbrust|     [250, 100]|  Ph.D.|                EECS|UC Berkeley|
+----------------+---+----------------+---------------+-------+--------------------+-----------+



In [224]:
#2
person.join(gradProgramDupe, joinExpr).drop(person["graduate_program"]).show()

+---+----------------+---------------+----------------+-------+--------------------+-----------+
| id|            name|   spark_status|graduate_program| degree|          department|     school|
+---+----------------+---------------+----------------+-------+--------------------+-----------+
|  0|   Bill Chambers|          [100]|               0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|[500, 250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|     [250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+----------------+-------+--------------------+-----------+



In [225]:
#3
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
joinExpr = person["graduate_program"] == gradProgram3["grad_id"]
person.join(gradProgram3, joinExpr).toPandas().head()

Unnamed: 0,id,name,graduate_program,spark_status,grad_id,degree,department,school
0,0,Bill Chambers,0,[100],0,Masters,School of Information,UC Berkeley
1,1,Matei Zaharia,1,"[500, 250, 100]",1,Ph.D.,EECS,UC Berkeley
2,2,Michael Armbrust,1,"[250, 100]",1,Ph.D.,EECS,UC Berkeley


### _Broadcast Join Example_

In [226]:
from pyspark.sql.functions import broadcast

In [227]:
joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(broadcast(graduateProgram), joinExpr).explain()

== Physical Plan ==
*(2) BroadcastHashJoin [graduate_program#7770L], [id#7784L], Inner, BuildRight
:- *(2) Project [_1#7760L AS id#7768L, _2#7761 AS name#7769, _3#7762L AS graduate_program#7770L, _4#7763 AS spark_status#7771]
:  +- *(2) Filter isnotnull(_3#7762L)
:     +- Scan ExistingRDD[_1#7760L,_2#7761,_3#7762L,_4#7763]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
   +- *(1) Project [_1#7776L AS id#7784L, _2#7777 AS degree#7785, _3#7778 AS department#7786, _4#7779 AS school#7787]
      +- *(1) Filter isnotnull(_1#7776L)
         +- Scan ExistingRDD[_1#7776L,_2#7777,_3#7778,_4#7779]


## _Chapter #9 - Data Sources_

-  **6 "Core" Data Sources**:
    -  CSV (parameters listed in table 9-3):
        -  _does not support complex types (ex: array, nested data)_
    -  JSON (parameters listed in table 9-4):
        -  multiLine (allows for reading in non-line-delimited JSON files)
    -  Parquet (parameters listed in table 9-5):
        -  columnar compression for saving storage space and optimized select querying
        -  default file format for Apache Spark
        -  supports complex types (ex: array)
        -  enforces own schema when storing data as schema is built into the file itself (so no inference needed)
    -  ORC (no parameter options):
        -  ORC is further optimized for Hive however works very well with Spark
        -  optimized for large streaming reads and finds rows quickly
        -  similar to parquet
    -  JDBC/ODBC connections (parameters listed in table 9-6):
        - JDBC read/write:
            -  need JDBC driver for particular database on spark classpath
            -  provide proper JAR for driver
    -  TXT:
        -  each line in file is a record in DF
        -  writing to TXT requires source to have ONLY 1 string column or else write will fail   
    <br>
-  **Community-Created Data Sources**:
    -  Cassandra / HBase / MongoDB / Redshift
    -  XML / Avro
    
#### Structure for READING data: DataFrameReader.format(...).option("key", "value").schema(...).load(...)

-  **Reading Data** (spark.read):
    -  format
    -  schema
    -  read mode:
        -  _permissive_ (sets all fields to null when encountering a corrupted record and places the corrupted record in a string column named _corrupt_record)
        -  _dropMalformed_ (drops rows containing malformed records)
        -  _failFast_ (fails immediately when encountering malformed records)
    -  options
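
The three read modes can be illustrated with a small plain-Python reader for JSON lines. This is a simplified sketch, not Spark's parser: `read_json_lines` is a hypothetical helper, and in permissive mode it only keeps the raw text rather than also nulling the other fields.

```python
import json

def read_json_lines(lines, mode="PERMISSIVE"):
    """Parse JSON lines, handling malformed records per the given mode."""
    records = []
    for line in lines:
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            if mode == "FAILFAST":
                raise  # fail immediately on the first malformed record
            if mode == "DROPMALFORMED":
                continue  # silently skip the malformed record
            # PERMISSIVE: keep the row, stash the raw text in a side column
            records.append({"_corrupt_record": line})
    return records

lines = ['{"count": 1}', '{"count": ', '{"count": 3}']  # second line is broken
print(read_json_lines(lines, "PERMISSIVE"))
print(read_json_lines(lines, "DROPMALFORMED"))
```

Permissive mode never loses a row, dropMalformed silently shrinks the data, and failFast is the strict choice used in the CSV and JSON read exercises below.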
    
#### Structure for WRITING data: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(...)

-  **Writing Data** (dataFrame.write):
    -  format
    -  options
    -  save mode:
        -  _append_ (appends output files to the list of files that already exist at that location path)
        -  _overwrite_ (completely overwrites files at location path)
        -  _errorIfExists_ (shows an error and fails to write if files already exist at that specified location path)
        -  _ignore_ (if files exist at the location path do nothing with current DF)

### Advanced I/O Concepts:
-  Spark Developers have the ability to control the parallelism of files written by controlling the partitions prior to writing:
    -  bucketing
    -  partitioning   

### Splittable File Types and Compression:
-  parquet w/ GZIP compression is recommended

### Reading Data in Parallel:
-  multiple executors cannot read from the same file at the same time; however, they can read different files at the same time
-  each file in a folder becomes a partition in a DF and will be read by available executors in parallel

### Writing Data in Parallel:
-  _number of files written (output) is dependent on the number of partitions the DF object has when you write out the data_
-  by default, 1 file is written per partition of the data

#### File Organization Approach (methods for controlling the data that is specifically written to each output file):
#### 1. Partitioning "Partition By":
-  encodes a column as a folder
-  output files contain data based on partitionBy predicate
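
"Encodes a column as a folder" can be sketched in plain Python: each distinct value of the partition column gets its own `column=value` directory, and the files inside no longer need to store that column. The rows, the `part-0.csv` file name, and `write_partitioned` are hypothetical.

```python
import csv
import os
import tempfile
from collections import defaultdict

# Hypothetical rows: (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)
rows = [
    ("United States", "Romania", 1),
    ("United States", "Ireland", 264),
    ("Egypt", "United States", 24),
]

def write_partitioned(rows, base_dir, column_index, column_name):
    """Write one CSV per distinct value, inside a column=value folder."""
    by_value = defaultdict(list)
    for row in rows:
        by_value[row[column_index]].append(row)
    for value, members in by_value.items():
        folder = os.path.join(base_dir, f"{column_name}={value}")
        os.makedirs(folder, exist_ok=True)
        with open(os.path.join(folder, "part-0.csv"), "w", newline="") as f:
            writer = csv.writer(f)
            # The partition column is left out of the file; the folder name carries it
            writer.writerows(
                [v for i, v in enumerate(m) if i != column_index] for m in members
            )

base_dir = tempfile.mkdtemp()
write_partitioned(rows, base_dir, 0, "DEST_COUNTRY_NAME")
print(sorted(os.listdir(base_dir)))
# ['DEST_COUNTRY_NAME=Egypt', 'DEST_COUNTRY_NAME=United States']
```

A reader that filters on the partition column can then skip whole folders instead of scanning every file.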

#### 2. Bucketing:
-  groups data by bucket ID
-  helps with avoiding shuffles when joining or aggregating
-  supported only for Spark managed tables
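
Bucketing avoids shuffles because equal keys always land in the same bucket number on both sides of a join. A rough plain-Python sketch of that property; CRC32 is only a stand-in for whatever hash Spark uses internally, and `NUM_BUCKETS` and `bucket_id` are hypothetical:

```python
import zlib

NUM_BUCKETS = 4  # hypothetical bucket count

def bucket_id(key, num_buckets=NUM_BUCKETS):
    """Deterministic bucket for a key; CRC32 stands in for Spark's own hash."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets

left_keys = [17850, 13047, 12346]
right_keys = [13047, 12346, 17850]

# Equal keys always hash to the same bucket on both sides, so a join
# only needs to compare bucket i of the left with bucket i of the right
left_buckets = {k: bucket_id(k) for k in left_keys}
right_buckets = {k: bucket_id(k) for k in right_keys}
print(left_buckets)
```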

### Writing Complex Types:
-  the best file format often depends on the type of data being read and processed

### Managing File Size:
-  try to avoid "lots of small files" aka "small file problem" because it requires a lot of metadata to manage
-  Spark does not work well with many small files, and overly large files are also inefficient when only a few rows are needed
-  "maxRecordsPerFile" helps with controlling the file size / # of records written to each file (ex: df.write.option("maxRecordsPerFile", 5000))
-  always be aware of the # of partitions at write time
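
The effect of a per-file record cap can be sketched in plain Python: a partition with more records than the cap is split into several files, each holding at most that many records. `split_into_files` is a hypothetical helper, not a Spark API.

```python
def split_into_files(records, max_records_per_file):
    """Chunk records so that no chunk exceeds max_records_per_file."""
    return [
        records[start:start + max_records_per_file]
        for start in range(0, len(records), max_records_per_file)
    ]

records = list(range(12))          # hypothetical partition with 12 records
files = split_into_files(records, 5)
print([len(f) for f in files])     # [5, 5, 2]
```

The cap only bounds files from above. It does not merge small partitions, so the number of partitions at write time still sets the minimum number of files.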

### _Chapter #9 Exercises (Data Sources)_

### _CSV Read Example_

In [228]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [229]:
myManualSchema = StructType([\
    StructField("DEST_COUNTRY_NAME", StringType(), True),\
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),\
    StructField("count", LongType(), False)])

In [230]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.schema(myManualSchema)\
.load(flightDataCSV2010)

In [231]:
csvFile.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _CSV Write Example_

In [232]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/csvWrite.csv")

In [233]:
# reflects # of files output to the target save directory
csvFile.rdd.getNumPartitions()

1

In [234]:
# 1 row per line tab delimited
!head /Users/grp/sparkTheDefinitiveGuide/tmp/csvWrite.csv/*.csv

United States	Romania	1
United States	Ireland	264
United States	India	69
Egypt	United States	24
Equatorial Guinea	United States	1
United States	Singapore	25
United States	Grenada	54
Costa Rica	United States	477
Senegal	United States	29
United States	Marshall Islands	44


### _JSON Read Example_

In [235]:
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load(flightDataJson2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _JSON Write Example_

In [236]:
csvFile.write.format("json").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/jsonWrite.json")

In [237]:
# 1 JSON object per line
!head /Users/grp/sparkTheDefinitiveGuide/tmp/jsonWrite.json/*.json

{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":1}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Ireland","count":264}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"India","count":69}
{"DEST_COUNTRY_NAME":"Egypt","ORIGIN_COUNTRY_NAME":"United States","count":24}
{"DEST_COUNTRY_NAME":"Equatorial Guinea","ORIGIN_COUNTRY_NAME":"United States","count":1}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Singapore","count":25}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Grenada","count":54}
{"DEST_COUNTRY_NAME":"Costa Rica","ORIGIN_COUNTRY_NAME":"United States","count":477}
{"DEST_COUNTRY_NAME":"Senegal","ORIGIN_COUNTRY_NAME":"United States","count":29}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Marshall Islands","count":44}


### _Parquet Read Example_

In [238]:
spark.read.format("parquet")\
.load(flightDataParquet2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _Parquet Write Example_

In [239]:
csvFile.write.format("parquet").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/parquetWrite.parquet")

In [240]:
# 1 parquet file
# !head /Users/grp/sparkTheDefinitiveGuide/tmp/parquetWrite.parquet/*.parquet

### _ORC Read Example_

In [241]:
spark.read.format("orc")\
.load(flightDataORC2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _ORC Write Example_

In [242]:
csvFile.write.format("orc").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/orcWrite.orc")

In [243]:
# 1 orc file
# !head /Users/grp/sparkTheDefinitiveGuide/tmp/orcWrite.orc/*.orc

### _SQLite SQL Database Example_

In [244]:
'''
run via shell: 
pyspark \
--master local[8] \
--driver-class-path /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar \
--jars /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar
'''

In [245]:
driver = "org.sqlite.JDBC"
path = sqliteJDBC
url = "jdbc:sqlite:" + path
tablename = "flight_info"

In [246]:
dbDataFrame = spark.read\
.format("jdbc").option("url", url).option("dbtable", tablename).option("driver",  driver).load()

In [247]:
dbDataFrame.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: decimal(20,0) (nullable = true)



In [248]:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show(5)

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|   Equatorial Guinea|
|             Bolivia|
|Turks and Caicos ...|
|            Pakistan|
|    Marshall Islands|
+--------------------+
only showing top 5 rows



### _Additional Options Example_

In [249]:
'''
pgDF = spark.read.format("jdbc")\
  .option("driver", "org.postgresql.Driver")\
  .option("url", "jdbc:postgresql://database_server")\
  .option("dbtable", "schema.tablename")\
  .option("user", "username").option("password", "my-secret-password").load()
'''

### _Query Pushdown  Example_

In [250]:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#8455], functions=[])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#8455, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#8455], functions=[])
      +- *(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8455] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


### _Query Pushdown Filter To DB Example_

In [251]:
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()

== Physical Plan ==
*(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8455,ORIGIN_COUNTRY_NAME#8456,count#8457] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>


### _Specify SQL Query (Query Result of Query) Example_

In [252]:
pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
dbDataFrame = spark.read\
.format("jdbc").option("url", url).option("dbtable", pushdownQuery).option("driver",  driver).load()

In [253]:
dbDataFrame.explain()

== Physical Plan ==
*(1) Scan JDBCRelation((SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8471] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
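
A hedged sketch of the two pushdown styles above, using the standard-library `sqlite3` module on an in-memory table instead of Spark. The `build_scan` helper is hypothetical and only approximates the shape of the SELECT a JDBC scan could send. The sample rows come from outputs in these notes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE flight_info '
    '(DEST_COUNTRY_NAME TEXT, ORIGIN_COUNTRY_NAME TEXT, "count" INTEGER)'
)
conn.executemany(
    "INSERT INTO flight_info VALUES (?, ?, ?)",
    [
        ("Sweden", "United States", 65),
        ("United States", "Sweden", 73),
        ("Anguilla", "United States", 21),
        ("United States", "Ireland", 264),
    ],
)


def build_scan(dbtable, columns, where=None):
    """Hypothetical helper: compose the kind of SELECT a JDBC scan could send."""
    sql = f"SELECT {', '.join(columns)} FROM {dbtable}"
    return f"{sql} WHERE {where}" if where else sql


# Filter pushdown: the IN filter travels inside the query, so the database
# returns only the matching rows.
pushed = build_scan(
    "flight_info",
    ["DEST_COUNTRY_NAME", '"count"'],
    "DEST_COUNTRY_NAME IN ('Anguilla', 'Sweden')",
)
filtered_rows = conn.execute(pushed).fetchall()

# Query as table: a parenthesized, aliased subquery takes the table's place.
subquery = "(SELECT DISTINCT DEST_COUNTRY_NAME FROM flight_info) AS flight_info"
distinct_rows = conn.execute(build_scan(subquery, ["DEST_COUNTRY_NAME"])).fetchall()

print(pushed)
print(sorted(filtered_rows))
print(sorted(distinct_rows))
```

In both cases the database does the work and only the reduced result crosses the connection.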


### _Parallel Reads From DB Example_

In [254]:
# numPartitions caps the number of partitions, which limits how much Spark reads and writes in parallel
# partitions = level of parallelism
dbDataFrame = spark.read.format("jdbc")\
.option("url", url).option("dbtable", tablename).option("driver",  driver).option("numPartitions", 10).load()

### _Parallel Predicate Pushdown To DB Example_

In [255]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
  "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()
spark.read.jdbc(url,tablename,predicates=predicates,properties=props)\
  .rdd.getNumPartitions() # 2

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Sweden|      United States|   65|
|    United States|             Sweden|   73|
|         Anguilla|      United States|   21|
|    United States|           Anguilla|   20|
+-----------------+-------------------+-----+



2

### _Parallel Predicate Pushdown To DB w/ Duplicate Rows Example_

In [256]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
  "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).count()

510

In [257]:
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).rdd.getNumPartitions() #2

2
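
A small `sqlite3` sketch of why the second predicate list doubles the row count (510 = 2 x 255 above). It assumes each predicate becomes its own query, whose result is one partition. The five sample rows and the `read_with_predicates` helper are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE flight_info '
    '(DEST_COUNTRY_NAME TEXT, ORIGIN_COUNTRY_NAME TEXT, "count" INTEGER)'
)
conn.executemany(
    "INSERT INTO flight_info VALUES (?, ?, ?)",
    [
        ("Sweden", "United States", 65),
        ("United States", "Sweden", 73),
        ("Anguilla", "United States", 21),
        ("United States", "Anguilla", 20),
        ("Egypt", "United States", 24),
    ],
)


def read_with_predicates(predicates):
    """One query per predicate; each result list stands in for one partition."""
    return [
        conn.execute(f"SELECT * FROM flight_info WHERE {p}").fetchall()
        for p in predicates
    ]


disjoint = [
    "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
    "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'",
]
overlapping = [
    "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
    "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'",
]

disjoint_parts = read_with_predicates(disjoint)
overlap_parts = read_with_predicates(overlapping)
disjoint_count = sum(len(p) for p in disjoint_parts)
overlap_count = sum(len(p) for p in overlap_parts)
print(disjoint_count, overlap_count)
```

Every row satisfies both `!=` predicates, so each row is read once per partition. Predicates need to be disjoint to avoid duplicates.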

### _Sliding Window Partition Example_

In [258]:
# lowerBound and upperBound set the partition stride on the column; they do not filter rows
colName = "count"
lowerBound = 0
upperBound = 348113
numPartitions = 10

spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).count() # 255

255

In [259]:
spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).rdd.getNumPartitions() #10

10
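
A simplified sketch of how bounds and a partition count could turn into per-partition WHERE clauses, checked against an in-memory `sqlite3` table. The `partition_clauses` helper is an assumption and Spark's real clause generation may differ in detail. The point is that the first and last ranges are open-ended, so rows outside the bounds are still read.

```python
import sqlite3


def partition_clauses(column, lower, upper, num_partitions):
    """Build one WHERE clause per partition from evenly spaced boundaries."""
    stride = (upper - lower) // num_partitions
    cuts = [lower + stride * i for i in range(1, num_partitions)]
    clauses = []
    for i in range(num_partitions):
        lo = cuts[i - 1] if i > 0 else None
        hi = cuts[i] if i < num_partitions - 1 else None
        if lo is None and hi is None:
            clauses.append("1 = 1")  # a single partition reads everything
        elif lo is None:
            # The first partition is open below and also picks up NULLs.
            clauses.append(f"{column} < {hi} OR {column} IS NULL")
        elif hi is None:
            # The last partition is open above.
            clauses.append(f"{column} >= {lo}")
        else:
            clauses.append(f"{column} >= {lo} AND {column} < {hi}")
    return clauses


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE flight_info ("count" INTEGER)')
conn.executemany(
    "INSERT INTO flight_info VALUES (?)",
    [(1,), (24,), (30,), (60,), (80,), (477,)],
)

clauses = partition_clauses('"count"', 0, 100, 4)
sizes = [
    conn.execute(f"SELECT count(*) FROM flight_info WHERE {c}").fetchone()[0]
    for c in clauses
]
print(clauses)
print(sizes, sum(sizes))
```

The row with count 477 lies above the upper bound of 100 yet still lands in the last partition. This matches the example above returning all 255 rows.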

### _Writing To SQL DB Example_

In [260]:
newPath = "jdbc:sqlite://Users/grp/sparkTheDefinitiveGuide/tmp/sqlWrite.db"
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)

In [261]:
# 1 db file
# !head /Users/grp/sparkTheDefinitiveGuide/tmp/sqlWrite.db

### _Reading Results Example_

In [262]:
spark.read.jdbc(newPath, tablename, properties=props).count()

255

### _Append Write/Re-read Example_

In [263]:
csvFile.write.jdbc(newPath, tablename, mode="append", properties=props)
spark.read.jdbc(newPath, tablename, properties=props).count()

510

### _TXT Read Example_

In [264]:
spark.read.text(flightDataCSV2010).selectExpr("split(value, ',') as rows").show(3, False)

+-----------------------------------------------+
|rows                                           |
+-----------------------------------------------+
|[DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count]|
|[United States, Romania, 1]                    |
|[United States, Ireland, 264]                  |
+-----------------------------------------------+
only showing top 3 rows



### _TXT Write Example_

In [265]:
csvFile.select("DEST_COUNTRY_NAME").write.mode("overwrite")\
.text("/Users/grp/sparkTheDefinitiveGuide/tmp/txtWrite.txt")

In [266]:
# 1 txt file
!head /Users/grp/sparkTheDefinitiveGuide/tmp/txtWrite.txt/*.txt

United States
United States
United States
Egypt
Equatorial Guinea
United States
United States
Costa Rica
Senegal
United States


### _TXT Write Partition By Example_

In [267]:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count").write.mode("overwrite").partitionBy("count")\
.text("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt")

In [268]:
import os
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt"): print(i)

count=69
._SUCCESS.crc
count=29
count=264
count=44
count=54
count=477
_SUCCESS
count=1
count=24
count=25


In [269]:
# files split by count column
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=1/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=24/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=25/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=264/*.txt

United States
Equatorial Guinea


Egypt


United States


United States


### _Writing Data in Parallel Example_

In [270]:
csvFile.repartition(5).write.mode("overwrite").format("csv")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv")

In [271]:
# files split into 5 partitions
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv/"): print(i)

.part-00004-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv.crc
._SUCCESS.crc
part-00004-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv
.part-00001-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv.crc
part-00003-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv
.part-00000-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv.crc
.part-00002-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv.crc
part-00002-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv
.part-00003-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv.crc
_SUCCESS
part-00001-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv
part-00000-b24e2d82-a563-4d78-807c-6d3915c5d806-c000.csv


In [272]:
! head /Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv/part-00001*.csv

Barbados,United States,130
United States,Fiji,51
United States,Senegal,46
New Zealand,United States,86
Kiribati,United States,17
Afghanistan,United States,11
Latvia,United States,12
United States,Luxembourg,90
United States,Angola,18
United States,Cyprus,1


### _Partitioning (partitionBy) Example_

In [273]:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionByWrite.parquet/")

In [274]:
# one folder per distinct DEST_COUNTRY_NAME value
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionByWrite.parquet/"): print(i)

DEST_COUNTRY_NAME=United States
DEST_COUNTRY_NAME=Costa Rica
._SUCCESS.crc
DEST_COUNTRY_NAME=Senegal
DEST_COUNTRY_NAME=Equatorial Guinea
_SUCCESS
DEST_COUNTRY_NAME=Egypt


### _Bucketing Example_

In [275]:
numberBuckets = 10
columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")\
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

In [276]:
# stored in spark-warehouse local dir or /user/hive/warehouse on a cluster
for i in os.listdir("/Users/grp/sparkNotebooks/spark-warehouse/bucketedfiles/"): print(i)

.part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00007.c000.snappy.parquet.crc
.part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00008.c000.snappy.parquet.crc
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00008.c000.snappy.parquet
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00009.c000.snappy.parquet
.part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00003.c000.snappy.parquet.crc
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00006.c000.snappy.parquet
._SUCCESS.crc
.part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00000.c000.snappy.parquet.crc
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00007.c000.snappy.parquet
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00004.c000.snappy.parquet
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00005.c000.snappy.parquet
.part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00004.c000.snappy.parquet.crc
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00003.c000.snappy.parquet
part-00000-417cbe3f-8759-4ed4-b863-80f242df250f_00002.c000.snappy.parquet

## _Chapter #10 - Spark SQL_

-  ability to run SQL queries against views or tables in databases
-  SQL or "Structured Query Language" is a language for expressing relational operations over data (manipulations, definitions, controls)
-  supports both ANSI-SQL and HiveQL queries
-  intended to operate as an OLAP DB, not an OLTP DB, so it is not meant for extremely low-latency queries
-  a Spark SQL CLI is available via ./bin/spark-sql
-  ad hoc Spark SQL via SparkSession as spark.sql(...)
-  includes a JDBC/ODBC server started via ./sbin/start-thriftserver.sh

### Spark's Relationship to Hive:
-  connects to Hive metastores (where Hive maintains table information)  
-  configure Hive with Spark by placing hive-site.xml, core-site.xml, and hdfs-site.xml files in conf/

### Catalog:
-  stores metadata about data stored in tables, databases, functions, views
-  available as the _org.apache.spark.sql.catalog.Catalog_ class (spark.catalog on a SparkSession)

### Tables:
-  structure of data
-  hold information - data and metadata (data about the tables)
-  tables are defined in a database, whereas DataFrames are defined within programming language code
-  tables will be assigned a database (_default_ is default database) ... "show tables in DATABASENAME"
-  Types:
    -  Managed:
        -  saveAsTable on DF
        -  writes to default Hive Warehouse location (/user/hive/warehouse)
    -  Unmanaged:
        - define a table structure from files on disk

### Dropping Tables:
-  dropping a managed table removes both the data and the table definition
-  "DROP TABLE IF EXISTS" deletes the table (and, for a managed table, its data) only if the table exists, instead of raising an error
-  dropping an unmanaged table removes only the table definition; the data still exists in the linked directory
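
A toy model of the managed vs. unmanaged difference, in plain Python. `ToyCatalog` is a made-up class for illustration and is not Spark's Catalog API. It only tracks a table name, a data directory, and whether the table owns that directory.

```python
import shutil
import tempfile
from pathlib import Path


class ToyCatalog:
    """Toy model of table metadata; not Spark's Catalog API."""

    def __init__(self):
        self.tables = {}  # name -> (data_dir, managed)

    def register(self, name, data_dir, managed):
        self.tables[name] = (Path(data_dir), managed)

    def drop_table(self, name, if_exists=False):
        if name not in self.tables:
            if if_exists:
                return  # IF EXISTS: a missing table is not an error
            raise KeyError(f"table not found: {name}")
        data_dir, managed = self.tables.pop(name)
        if managed:
            shutil.rmtree(data_dir)  # managed: the data goes with the definition


root = Path(tempfile.mkdtemp())
managed_dir = root / "warehouse" / "flights"
external_dir = root / "landing" / "hive_flights"
for d in (managed_dir, external_dir):
    d.mkdir(parents=True)
    (d / "part-00000.csv").write_text("United States,Romania,15\n")

catalog = ToyCatalog()
catalog.register("flights", managed_dir, managed=True)
catalog.register("hive_flights", external_dir, managed=False)
catalog.drop_table("flights")
catalog.drop_table("hive_flights")
catalog.drop_table("flights", if_exists=True)  # already gone, no error

managed_data_left = managed_dir.exists()
external_data_left = external_dir.exists()
shutil.rmtree(root)  # clean up the temporary directory
print(managed_data_left, external_data_left)
```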

### Metadata:
-  can describe the table's metadata
-  REFRESH TABLE refreshes all cached files linked to table
-  REPAIR TABLE refreshes partitions / collects new partitions maintained in the Catalog

### Views:
-  set of transformations (saved query) on top of an existing table
-  equivalent to creating a new DF from an existing DF
-  when dropping a VIEW the underlying data is not removed just the view definition itself
-  Types:
    -  global [viewable across the entire Spark application regardless of database; removed when the application ends] (GLOBAL TEMPORARY VIEW)
    -  standard [registered to a database] (VIEW)
    -  per session [only available during the current session and not registered to a database] (TEMPORARY VIEW)
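
A short sketch of view behavior using the standard-library `sqlite3` module as a stand-in for Spark SQL (SQLite has no global temporary views, so only the standard and temporary kinds appear). The table contents are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (dest_country_name TEXT, origin_country_name TEXT)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?)",
    [("United States", "Romania"), ("Egypt", "United States")],
)

# A view stores only the query text, not a copy of the rows.
conn.execute(
    "CREATE VIEW just_usa_view AS "
    "SELECT * FROM flights WHERE dest_country_name = 'United States'"
)
# A temporary view is visible only to this connection (session).
conn.execute(
    "CREATE TEMP VIEW just_usa_view_temp AS "
    "SELECT * FROM flights WHERE dest_country_name = 'United States'"
)

before = conn.execute("SELECT count(*) FROM just_usa_view").fetchone()[0]
conn.execute("INSERT INTO flights VALUES ('United States', 'Ireland')")
# The view re-runs its query, so it sees the newly inserted row.
after = conn.execute("SELECT count(*) FROM just_usa_view_temp").fetchone()[0]

conn.execute("DROP VIEW just_usa_view")
conn.execute("DROP VIEW just_usa_view_temp")
# Dropping the views leaves the underlying table untouched.
rows_left = conn.execute("SELECT count(*) FROM flights").fetchone()[0]
print(before, after, rows_left)
```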
    
### Databases:
-  organize tables

### Spark SQL Complex Types:
-  structs [provide a way of creating or querying nested data in Spark]
-  lists [array]
-  maps

### Functions:
-  system functions as well as user defined functions

### Subqueries:
-  queries within other queries
-  Kinds:
    -  correlated subqueries [use logic from the outer scope of the query in the inner query to supplement information in the subquery]
    -  uncorrelated subqueries [includes no information from the outer scope of the query]
    -  predicate subqueries [filtering based on values]
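
The three kinds can be sketched against a tiny in-memory table with the standard-library `sqlite3` module (a stand-in for Spark SQL; the five rows are invented for illustration).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE flights '
    '(dest_country_name TEXT, origin_country_name TEXT, "count" INTEGER)'
)
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [
        ("United States", "Romania", 15),
        ("United States", "Ireland", 344),
        ("Romania", "United States", 14),
        ("Egypt", "United States", 24),
        ("United States", "Senegal", 42),
    ],
)

# Uncorrelated predicate subquery: the inner query runs on its own and
# returns the two destinations with the most flights.
uncorrelated = conn.execute("""
    SELECT * FROM flights
    WHERE origin_country_name IN (
        SELECT dest_country_name FROM flights
        GROUP BY dest_country_name
        ORDER BY sum("count") DESC LIMIT 2)
""").fetchall()

# Correlated predicate subquery: the inner queries reference f1 from the
# outer scope. A row is kept when some flight leaves its destination and
# some flight arrives at its origin.
correlated = conn.execute("""
    SELECT * FROM flights f1
    WHERE EXISTS (SELECT 1 FROM flights f2
                  WHERE f1.dest_country_name = f2.origin_country_name)
      AND EXISTS (SELECT 1 FROM flights f2
                  WHERE f2.dest_country_name = f1.origin_country_name)
""").fetchall()

# Uncorrelated scalar subquery: a single value attached to every row.
scalar = conn.execute("""
    SELECT dest_country_name, (SELECT max("count") FROM flights) AS maximum
    FROM flights
""").fetchall()

print(sorted(uncorrelated))
print(sorted(correlated))
print(scalar)
```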

### _Chapter #10 Exercises (SQL)_

In [277]:
spark.sql("drop database if exists sparkSQL CASCADE")
spark.sql("create database sparkSQL")
spark.sql("use sparkSQL")

DataFrame[]

In [278]:
spark.sql("drop table if exists flights_from_select")
spark.sql("drop table if exists flights")
spark.sql("drop table if exists flights_csv")
spark.sql("drop table if exists hive_flights")
spark.sql("drop table if exists partitioned_flights")
spark.sql("drop view if exists nested_data")
spark.sql("drop view if exists just_usa_view")
spark.sql("drop view if exists just_usa_view_temp")

DataFrame[]

### _Spark DF and SQL Integration Example_

In [279]:
spark.read.json(flightDataJson2015).createOrReplaceTempView("some_sql_view") # DF => SQL

In [280]:
spark.sql\
(
"""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
"""
)\
.where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")\
.count() # SQL => DF

12

### _Creating Spark Managed Tables Example_:
-  USING syntax uses Spark Serialization whereas STORED AS uses Hive SerDe configuration which is much slower

In [281]:
spark.sql\
(
"""
CREATE TABLE flights (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/json/2015-summary.json')
"""
)

DataFrame[]

In [282]:
spark.sql("describe flights").show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|  DEST_COUNTRY_NAME|   string|   null|
|ORIGIN_COUNTRY_NAME|   string|   null|
|              count|   bigint|   null|
+-------------------+---------+-------+



In [283]:
spark.sql("select * from flights").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [284]:
spark.sql\
(
"""
CREATE TABLE flights_csv (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
count LONG) USING csv OPTIONS (header true, path '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/csv/2015-summary.csv')
"""
)

DataFrame[]

In [285]:
spark.sql("describe flights_csv").show(truncate=False)

+-------------------+---------+---------------------------------------+
|col_name           |data_type|comment                                |
+-------------------+---------+---------------------------------------+
|DEST_COUNTRY_NAME  |string   |null                                   |
|ORIGIN_COUNTRY_NAME|string   |remember, the US will be most prevalent|
|count              |bigint   |null                                   |
+-------------------+---------+---------------------------------------+



In [286]:
spark.sql("select * from flights_csv").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [287]:
spark.sql("CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights")
spark.sql("CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights")

DataFrame[]

### _Creating Spark Managed Partitioned Table Example_

In [288]:
spark.sql\
(
"""
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights
"""
)

DataFrame[]

In [289]:
spark.sql("select * from partitioned_flights").show(3)

+-------------------+-----+-----------------+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|
+-------------------+-----+-----------------+
|            Romania|   15|    United States|
|            Croatia|    1|    United States|
|            Ireland|  344|    United States|
+-------------------+-----+-----------------+
only showing top 3 rows



In [290]:
spark.sql("describe partitioned_flights").show(truncate=False)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|ORIGIN_COUNTRY_NAME    |string   |null   |
|count                  |bigint   |null   |
|DEST_COUNTRY_NAME      |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|DEST_COUNTRY_NAME      |string   |null   |
+-----------------------+---------+-------+



In [291]:
spark.sql("show PARTITIONS partitioned_flights").show(3, truncate=False)

+--------------------------+
|partition                 |
+--------------------------+
|DEST_COUNTRY_NAME=Algeria |
|DEST_COUNTRY_NAME=Angola  |
|DEST_COUNTRY_NAME=Anguilla|
+--------------------------+
only showing top 3 rows



### _Creating External Table Example_

In [292]:
spark.sql\
(
"""
CREATE EXTERNAL TABLE hive_flights (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/Users/grp/sparkTheDefinitiveGuide/data/flight-data-hive/'
"""
)

DataFrame[]

In [293]:
spark.sql("show tables").show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|sparksql|            flights|      false|
|sparksql|        flights_csv|      false|
|sparksql|flights_from_select|      false|
|sparksql|       hive_flights|      false|
|sparksql|partitioned_flights|      false|
|        |          complexdf|       true|
|        |         datatable2|       true|
|        |          datetable|       true|
|        |           dfnonull|       true|
|        |            dftable|       true|
|        |         dfwithdate|       true|
|        |    graduateprogram|       true|
|        |             person|       true|
|        |      some_sql_view|       true|
|        |        sparkstatus|       true|
+--------+-------------------+-----------+



### _Insert and Insert Partition Example_

In [294]:
spark.sql\
(
"""
INSERT INTO flights_from_select SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights
"""
)

DataFrame[]

In [295]:
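# INSERT INTO matches columns by position, so the SELECT follows the table's non-partition column order
# string comparison is case-sensitive: 'UNITED STATES' selects no rows when the data stores 'United States'
spark.sql\
(
"""
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT ORIGIN_COUNTRY_NAME, count FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES'
"""
)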

DataFrame[]

### _Metadata Example_

In [296]:
spark.sql("REFRESH table partitioned_flights")
spark.sql("MSCK REPAIR TABLE partitioned_flights")

DataFrame[]

### _Dropping Tables Example_

In [297]:
'''
spark.sql("DROP TABLE flights")
spark.sql("DROP TABLE IF EXISTS flights")
'''

### _Cache/UnCache Tables Example_

In [298]:
'''
spark.sql("CACHE TABLE flights")
spark.sql("UNCACHE TABLE flights")
'''

### _Views Example_

In [299]:
spark.sql("""
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

DataFrame[]

In [300]:
spark.sql("show tables").show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|sparksql|            flights|      false|
|sparksql|        flights_csv|      false|
|sparksql|flights_from_select|      false|
|sparksql|       hive_flights|      false|
|sparksql|      just_usa_view|      false|
|sparksql|partitioned_flights|      false|
|        |          complexdf|       true|
|        |         datatable2|       true|
|        |          datetable|       true|
|        |           dfnonull|       true|
|        |            dftable|       true|
|        |         dfwithdate|       true|
|        |    graduateprogram|       true|
|        | just_usa_view_temp|       true|
|        |             person|       true|
|        |      some_sql_view|       true|
|        |        sparkstatus|       true|
+--------+-------------------+-----------+



### _Database Example_

In [301]:
'''
spark.sql("show databases").show()
spark.sql("create database example")
spark.sql("use example")
spark.sql("select * from table").show()
spark.sql("select * from example.table").show()
spark.sql("select current_database()").show()
spark.sql("drop database if exists example")
'''

In [302]:
spark.sql("select current_database()").show()

+------------------+
|current_database()|
+------------------+
|          sparksql|
+------------------+



### _Select Statements Example_

In [303]:
'''
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
    FROM relation[, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]
    [LIMIT num_rows]

named_expression:
    : expression [AS alias]

relation:
    | join_relation
    | (table_name|query|relation) [sample] [AS alias]
    : VALUES (expressions)[, (expressions), ...]
          [AS (column_name[, column_name, ...])]

expressions:
    : expression[, expression, ...]

sort_expressions:
    : expression [ASC|DESC][, expression [ASC|DESC], ...]
'''

### _CASE ... WHEN ... THEN Statements Example_

In [304]:
spark.sql("""
SELECT
  CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
       WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
       ELSE -1 END
FROM partitioned_flights
""").show(3)

+--------------------------------------------------------------------------------------------------------+
|CASE WHEN (DEST_COUNTRY_NAME = UNITED STATES) THEN 1 WHEN (DEST_COUNTRY_NAME = Egypt) THEN 0 ELSE -1 END|
+--------------------------------------------------------------------------------------------------------+
|                                                                                                      -1|
|                                                                                                      -1|
|                                                                                                      -1|
+--------------------------------------------------------------------------------------------------------+
only showing top 3 rows
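
The all `-1` result above is consistent with a case-sensitive string comparison: the literal 'UNITED STATES' does not equal the stored 'United States'. A small `sqlite3` sketch (a stand-in for Spark SQL, with three invented rows) shows the same CASE expression next to a variant that normalizes case with `upper`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE partitioned_flights (DEST_COUNTRY_NAME TEXT)")
conn.executemany(
    "INSERT INTO partitioned_flights VALUES (?)",
    [("United States",), ("Egypt",), ("Algeria",)],
)

rows = conn.execute("""
    SELECT
      DEST_COUNTRY_NAME,
      CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
           WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
           ELSE -1 END AS exact_match,
      CASE WHEN upper(DEST_COUNTRY_NAME) = 'UNITED STATES' THEN 1
           WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
           ELSE -1 END AS upper_match
    FROM partitioned_flights
""").fetchall()

# Map each country to its (exact_match, upper_match) pair.
result = {name: (exact, upper) for name, exact, upper in rows}
print(result)
```

Only the `upper(...)` variant maps 'United States' to 1. Branches are evaluated in order and the first match wins.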



### _Structs Example_

In [305]:
spark\
.sql(\
    """
    CREATE VIEW IF NOT EXISTS nested_data AS
    SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
    """)
spark.sql("select * from nested_data").show(3, False)
spark.sql("select country.DEST_COUNTRY_NAME, count from nested_data").show(3, False)
spark.sql("select country.*, count from nested_data").show(3, False)

+------------------------+-----+
|country                 |count|
+------------------------+-----+
|[United States, Romania]|15   |
|[United States, Croatia]|1    |
|[United States, Ireland]|344  |
+------------------------+-----+
only showing top 3 rows

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|United States    |15   |
|United States    |1    |
|United States    |344  |
+-----------------+-----+
only showing top 3 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
+-----------------+-------------------+-----+
only showing top 3 rows



### _Lists Example_

In [306]:
spark\
.sql(\
    """
    SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
    collect_set(ORIGIN_COUNTRY_NAME) as origin_set
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)\
.show(3, False)

spark.sql("SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights").show(3, False)

spark\
.sql(\
    """
    SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)\
.show(3, False)

spark\
.sql(\
    """
    CREATE OR REPLACE TEMP VIEW flights_agg AS
    SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)

spark.sql("SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg").show(3, False)

+--------+-------------+---------------+
|new_name|flight_counts|origin_set     |
+--------+-------------+---------------+
|Algeria |[4]          |[United States]|
|Angola  |[15]         |[United States]|
|Austria |[62]         |[United States]|
+--------+-------------+---------------+
only showing top 3 rows

+-----------------+--------------+
|DEST_COUNTRY_NAME|array(1, 2, 3)|
+-----------------+--------------+
|United States    |[1, 2, 3]     |
|United States    |[1, 2, 3]     |
|United States    |[1, 2, 3]     |
+-----------------+--------------+
only showing top 3 rows

+--------+----------------------+
|new_name|collect_list(count)[0]|
+--------+----------------------+
|Algeria |4                     |
|Angola  |15                    |
|Austria |62                    |
+--------+----------------------+
only showing top 3 rows

+---+-----------------+
|col|DEST_COUNTRY_NAME|
+---+-----------------+
|4  |Algeria          |
|15 |Angola           |
|62 |Austria          |
+---+------
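
As a rough illustration of what `collect_list`, `collect_set`, array indexing, and `explode` compute, here is a plain-Python sketch. The sample rows are hypothetical (they are not taken from the flight data), and the sketch keeps input order, whereas Spark does not guarantee the order of collected elements after a shuffle.

```python
from collections import defaultdict

# Hypothetical sample rows: (dest, origin, count)
rows = [
    ("A", "X", 4),
    ("A", "Y", 4),
    ("A", "X", 7),
    ("B", "X", 15),
]

counts = defaultdict(list)   # like collect_list(count): keeps duplicates
origins = defaultdict(set)   # like collect_set(ORIGIN_COUNTRY_NAME): drops duplicates
for dest, origin, count in rows:
    counts[dest].append(count)
    origins[dest].add(origin)

# like collect_list(count)[0]: first element of each collected array
first_counts = {dest: values[0] for dest, values in counts.items()}

# like explode(collected_counts): one output row per array element
exploded = [(value, dest) for dest, values in counts.items() for value in values]

print(dict(counts))
print(first_counts)
print(exploded)
```

Note that `explode` undoes the grouping: the exploded result has one row per original input row again.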

### _Function Example_

In [307]:
spark.sql("SHOW FUNCTIONS").show(3) # all registered functions (built-in and user-defined)
spark.sql("SHOW SYSTEM FUNCTIONS").show(3) # Spark built-in functions only
spark.sql("SHOW USER FUNCTIONS").show(3) # user-defined functions only
spark.sql("SHOW FUNCTIONS 's*'").show(3) # functions whose names begin with 's'
spark.sql("SHOW FUNCTIONS LIKE 'collect*'").show(3) # functions whose names begin with 'collect'
spark.sql("DESCRIBE FUNCTION collect_list").collect() # describes a single function

+--------+
|function|
+--------+
|       !|
|       %|
|       &|
+--------+
only showing top 3 rows

+--------+
|function|
+--------+
|       !|
|       %|
|       &|
+--------+
only showing top 3 rows

+--------+
|function|
+--------+
|power3py|
+--------+

+---------+
| function|
+---------+
|   second|
|sentences|
|      sha|
+---------+
only showing top 3 rows

+------------+
|    function|
+------------+
|collect_list|
| collect_set|
+------------+



[Row(function_desc='Function: collect_list'),
 Row(function_desc='Class: org.apache.spark.sql.catalyst.expressions.aggregate.CollectList'),
 Row(function_desc='Usage: collect_list(expr) - Collects and returns a list of non-unique elements.')]
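
The `*` wildcard in the patterns above matches any run of characters, so `'collect*'` selects names that start with `collect`. The sketch below uses Python's `fnmatch` module as a stand-in to show that behaviour; it is an analogy only (Spark has its own pattern syntax), and the helper `show_functions_like` is hypothetical. The names are the ones that appear in the outputs above.

```python
from fnmatch import fnmatchcase

# Function names that appear in the outputs above.
names = ["collect_list", "collect_set", "power3py", "second", "sentences", "sha"]

def show_functions_like(pattern):
    """Return the names matching a pattern in which * stands for any run of characters."""
    return [n for n in names if fnmatchcase(n, pattern)]

print(show_functions_like("s*"))        # names that begin with 's'
print(show_functions_like("collect*"))  # names that begin with 'collect'
print(show_functions_like("*list"))     # names that end with 'list'
```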

### _Subquery Example_:
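-  correlated subqueries: the inner query references columns from the outer query
-  uncorrelated subqueries: the inner query uses nothing from the outer query
-  predicate subqueries: the subquery result is used to filter rows (`IN`, `EXISTS`)
-  scalar subqueries: the subquery returns a single value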

In [308]:
# uncorrelated predicate subquery
print("uncorrelated predicate subquery:")

spark\
.sql(\
    """
    SELECT dest_country_name FROM flights
        GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5
    """)\
.show(5, False)

spark\
.sql(\
    """
    SELECT * FROM flights WHERE origin_country_name IN 
        (SELECT dest_country_name FROM flights GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)
    """)\
.show(3, False)

# correlated predicate subquery
print("correlated predicate subquery:")

spark\
.sql(\
    """
    SELECT * FROM flights f1
    WHERE EXISTS (SELECT 1 FROM flights f2
                WHERE f1.dest_country_name = f2.origin_country_name)
    AND EXISTS (SELECT 1 FROM flights f2
                WHERE f2.dest_country_name = f1.origin_country_name)
    """)\
.show(3, False)

# uncorrelated scalar queries
print("uncorrelated scalar queries:")

spark.sql("SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights").show(3, False)

uncorrelated predicate subquery:
+-----------------+
|dest_country_name|
+-----------------+
|United States    |
|Canada           |
|Mexico           |
|United Kingdom   |
|Japan            |
+-----------------+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|Egypt            |United States      |15   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
+-----------------+-------------------+-----+
only showing top 3 rows

correlated predicate subquery:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
+-----------------+-------------------+-----+
only showing top 3 rows

uncorrelated scalar queries:
+-----------------+-------------
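
The same three subquery shapes can be tried without a cluster. The sketch below runs them against an in-memory SQLite database from Python's standard library; SQLite is a stand-in here, not Spark SQL, and the four rows are hypothetical. `LIMIT 2` replaces `LIMIT 5` only because the sample is tiny.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flights (dest_country_name TEXT, origin_country_name TEXT, count INTEGER)"
)
# Hypothetical rows, chosen so the queries return small, readable results.
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [
        ("United States", "Canada", 10),
        ("Canada", "United States", 8),
        ("Mexico", "United States", 5),
        ("Japan", "Mexico", 1),
    ],
)

# Uncorrelated predicate subquery: the inner query runs independently of the outer rows.
in_rows = conn.execute(
    """
    SELECT * FROM flights WHERE origin_country_name IN
        (SELECT dest_country_name FROM flights
         GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 2)
    ORDER BY count DESC
    """
).fetchall()

# Correlated predicate subquery: the inner queries reference the outer row f1.
exists_rows = conn.execute(
    """
    SELECT * FROM flights f1
    WHERE EXISTS (SELECT 1 FROM flights f2
                  WHERE f1.dest_country_name = f2.origin_country_name)
    AND EXISTS (SELECT 1 FROM flights f2
                WHERE f2.dest_country_name = f1.origin_country_name)
    ORDER BY count DESC
    """
).fetchall()

# Uncorrelated scalar subquery: a single value attached to every row.
scalar_rows = conn.execute(
    "SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights"
).fetchall()

print(in_rows)
print(exists_rows)
print(scalar_rows)
```

The Japan row drops out of the correlated query because no flight in the sample originates in Japan, so its first `EXISTS` clause is false.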

### _Spark SQL Configuration Example_

In [309]:
spark.sql("SET spark.sql.shuffle.partitions=8")

DataFrame[key: string, value: string]
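
The same setting can also be changed through the session's runtime config, as done with `spark.conf.set` at the top of this notebook. A minimal sketch, assuming an existing `SparkSession` is passed in; the helper name `set_shuffle_partitions` is hypothetical.

```python
def set_shuffle_partitions(spark, n):
    """Set spark.sql.shuffle.partitions via the runtime config and read it back.

    `spark` is assumed to be an existing SparkSession.
    """
    spark.conf.set("spark.sql.shuffle.partitions", str(n))
    # The value read back is a string, e.g. "8".
    return spark.conf.get("spark.sql.shuffle.partitions")
```

Calling `set_shuffle_partitions(spark, 8)` should have the same effect on later queries in the session as the `SET` statement above.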

## _Chapter #11 - Datasets_

-  a JVM-only feature: Datasets are available only in Scala and Java
-  ability to define the object that each row in your Dataset will consist of
-  Scala:
    -  define case class object that holds schema
    -  Case Class:
        -  immutable
        -  comparison by structure (field values) instead of by reference
-  Java:
    -  define Java Bean
-  Spark converts the Spark Row format to the object specified (Scala case class or Java class)
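
Datasets are not available in PySpark, but the two case class properties listed above have a rough Python analogue in a frozen dataclass. The sketch below is only that analogy: the field names follow the `Flight` case class used in the exercises, and the values come from the flight outputs earlier in these notes.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class Flight:
    DEST_COUNTRY_NAME: str
    ORIGIN_COUNTRY_NAME: str
    count: int

a = Flight("United States", "Romania", 15)
b = Flight("United States", "Romania", 15)

print(a == b)   # compared field by field (structure), so the two are equal
print(a is b)   # yet they are two distinct objects (different references)

try:
    a.count = 16          # assignment to a frozen instance is rejected
    immutable = False
except FrozenInstanceError:
    immutable = True
print(immutable)
```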

### _Chapter #11 Exercises (Scala/Java Datasets)_

In [310]:
'''
case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)


// COMMAND ----------

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]


// COMMAND ----------

flights.show(2)


// COMMAND ----------

flights.first.DEST_COUNTRY_NAME // United States


// COMMAND ----------

def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}


// COMMAND ----------

flights.filter(flight_row => originIsDestination(flight_row)).first()


// COMMAND ----------

flights.collect().filter(flight_row => originIsDestination(flight_row))


// COMMAND ----------

val destinations = flights.map(f => f.DEST_COUNTRY_NAME)


// COMMAND ----------

val localDestinations = destinations.take(5)


// COMMAND ----------

case class FlightMetadata(count: BigInt, randomData: BigInt)

val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
  .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
  .as[FlightMetadata]


// COMMAND ----------

val flights2 = flights
  .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))


// COMMAND ----------

flights2.selectExpr("_1.DEST_COUNTRY_NAME")


// COMMAND ----------

flights2.take(2)


// COMMAND ----------

val flights2 = flights.join(flightsMeta, Seq("count"))


// COMMAND ----------

val flights2 = flights.join(flightsMeta.toDF(), Seq("count"))


// COMMAND ----------

flights.groupBy("DEST_COUNTRY_NAME").count()


// COMMAND ----------

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()


// COMMAND ----------

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count().explain


// COMMAND ----------

def grpSum(countryName:String, values: Iterator[Flight]) = {
  values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)


// COMMAND ----------

def grpSum2(f:Flight):Integer = {
  1
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)


// COMMAND ----------

def sum2(left:Flight, right:Flight) = {
  Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
  .take(5)


// COMMAND ----------

flights.groupBy("DEST_COUNTRY_NAME").count().explain


// COMMAND ----------

'''

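
Since the Scala cell above cannot run in a Python kernel, here is a plain-Python sketch of what the typed `filter`, `groupByKey(...).count()`, `reduceGroups`, and `flatMapGroups` calls compute. It is an analogy, not the Dataset API: the rows are hypothetical, `group_by_key` is a hypothetical helper, and Spark distributes the groups and makes no ordering guarantee within a group.

```python
from dataclasses import dataclass
from functools import reduce
from itertools import dropwhile, groupby
from typing import Optional

@dataclass(frozen=True)
class Flight:
    DEST_COUNTRY_NAME: str
    ORIGIN_COUNTRY_NAME: Optional[str]
    count: int

# Hypothetical rows
flights = [
    Flight("United States", "Romania", 15),
    Flight("Canada", "United States", 3),
    Flight("United States", "Ireland", 344),
    Flight("Canada", "Canada", 8),
]

def origin_is_destination(f):
    return f.ORIGIN_COUNTRY_NAME == f.DEST_COUNTRY_NAME

# like flights.filter(flight_row => originIsDestination(flight_row))
same = [f for f in flights if origin_is_destination(f)]

def group_by_key(rows, key):
    """Group rows by key(row); the stable sort keeps input order within each group."""
    ordered = sorted(rows, key=key)
    return {k: list(g) for k, g in groupby(ordered, key=key)}

groups = group_by_key(flights, lambda f: f.DEST_COUNTRY_NAME)

# like groupByKey(x => x.DEST_COUNTRY_NAME).count()
counts = {k: len(v) for k, v in groups.items()}

def sum2(left, right):
    return Flight(left.DEST_COUNTRY_NAME, None, left.count + right.count)

# like reduceGroups((l, r) => sum2(l, r)): fold each group down to one Flight
reduced = {k: reduce(sum2, v) for k, v in groups.items()}

def grp_sum(country, values):
    # Drops leading flights with count < 5, then keeps everything after.
    return [(country, f) for f in dropwhile(lambda f: f.count < 5, values)]

# like flatMapGroups(grpSum): each group may emit any number of rows
flat = [pair for k, v in groups.items() for pair in grp_sum(k, iter(v))]

print(same)
print(counts)
print(reduced)
print(flat)
```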

### grp