# Spark Learning Note - Data Types
Jia Geng | gjia0214@gmail.com

<a id='directory'></a>

## Directory

- [Data Source](https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/)
- [1. Boolean & Filtering](#sec1)
- [2.1 Numbers](#sec2-1)
- [2.2 More Useful Functions](#sec2-2)
- [3. Strings](#sec3)
- [4. Date and Timestamp](#sec4)
- [5. Nulls](#sec5)
- [6. Complex Types](#sec6)


## API Locations

**Column Methods**
- `pyspark.sql.functions`

**DataFrame methods**: Since a DataFrame is just a Dataset of Row objects, many useful methods are documented under the `Dataset` class. Two helper classes are worth knowing:
- `DataFrameStatFunctions` (reached via `df.stat`) for statistical methods
- `DataFrameNaFunctions` (reached via `df.na`) for handling null data

In [2]:
# check java version 
# use sudo update-alternatives --config java 
# to switch java version if needed.
!java -version

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~19.10-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)


In [3]:
from pyspark.sql import SparkSession
data_example_path = '/home/jgeng/Documents/Git/SparkLearning/data/retail.csv'

In [4]:
# build a spark session locally
spark = SparkSession.builder.appName('Spark Example').getOrCreate()

# display the session info
spark

In [5]:
df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(data_example_path)

In [6]:
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



## 1. Boolean / Filtering <a id='sec1'></a>

Booleans are mostly used for filtering, and sometimes we need to create a boolean column to filter the data.

Some helpers for boolean selection/filtering.
The functions below return a Column object, i.e. an expression that is evaluated for each row.
Not every Column is boolean. A non-boolean Column has to be turned into a boolean one (e.g. with a comparison), because `filter` and `where` only accept boolean conditions.
- `df.col_name.isin(s1, s2)` or `df.col_name.isin([s1, s2])`: returns a boolean Column that can be used directly for filtering.
- `instr(col_name, s)` from `pyspark.sql.functions`: returns the 1-based position of the first occurrence of the substring s (0 if it does not occur); compare it with a number to get a boolean for filtering.
- when filter expressions are stored in variables, combine them with `&` and `|` instead of `and` and `or`
- **Be careful about null data when creating booleans**. Use `.eqNullSafe()` (more later)
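
To make the `instr` convention and the null warning concrete, here is a plain-Python sketch (not Spark code) of the same semantics: a 1-based position, 0 when the substring is absent, and null propagating through the comparison. The helper name `instr_model` and the sample list are made up for illustration.

```python
from typing import Optional

def instr_model(value: Optional[str], substring: str) -> Optional[int]:
    """Plain-Python model of SQL instr: 1-based position, 0 if absent, None for null input."""
    if value is None:
        return None  # null in -> null out
    # str.find returns -1 when the substring is absent, so -1 + 1 == 0
    return value.find(substring) + 1

descriptions = ["DOTCOM POSTAGE", "POSTAGE", "WHITE METAL LANTERN", None]
positions = [instr_model(d, "POSTAGE") for d in descriptions]
print(positions)

# A comparison on a null position is itself null, and where/filter only keeps
# rows whose condition is true, so null rows silently disappear.
contains = [None if p is None else p >= 1 for p in positions]
kept = [d for d, c in zip(descriptions, contains) if c is True]
print(kept)
```

The null row is dropped by both `contains` and its negation, which is why null-aware comparisons matter when building boolean columns.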

[back to top](#directory)

In [7]:
# filtering by value in a specific column
df.where(df.StockCode.isin(['DOT', '71053'])).show()
df.where(df.StockCode.isin('DOT', '71053'))  # equivalent

+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|   536365|    71053|WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536373|    71053|WHITE METAL LANTERN|       6|2010-12-01 09:02:00|     3.39|   17850.0|United Kingdom|
|   536375|    71053|WHITE METAL LANTERN|       6|2010-12-01 09:32:00|     3.39|   17850.0|United Kingdom|
|   536396|    71053|WHITE METAL LANTERN|       6|2010-12-01 10:51:00|     3.39|   17850.0|United Kingdom|
|   536406|    71053|WHITE METAL LANTERN|       8|2010-12-01 11:33:00|     3.39|   17850.0|United Kingdom|
|   536544|    71053|WHITE METAL LANTERN|       1|2010-12-01 14:32:00|     8.47|      null|United Kingdom|
|   536544|      DOT|     DOTCOM POST

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [8]:
from pyspark.sql.functions import instr

# filter by the position of a substring in a column
df.where(instr(df.Description, 'POSTAGE')==8).show(2)  # POSTAGE first appears at position 8 in DOTCOM POSTAGE
df.where(instr(df.Description, 'POSTAGE')==1).show(2)  # rows whose Description starts with POSTAGE
df.where(instr(df.Description, 'POSTAGE')==0).show(2)  # rows whose Description does not contain POSTAGE

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+

+---------+---------+-----------+--------+-------------------+---------+----------+-----------+
|InvoiceNo|StockCode|Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|    Country|
+---------+---------+-----------+--------+-------------------+---------+----------+-----------+
|   536370|     POST|    POSTAGE|       3|2010-12-01 08:45:00|     18.0|   12583.0|     France|
|  

In [9]:
# combined filters
filter1 = instr(df.Description, 'POSTAGE')==0
filter2 = df.StockCode.isin(['DOT', '71053'])
df.where(filter1 | filter2).show(2)
df.where(filter1 & filter2).show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|   53

In [10]:
from pyspark.sql.functions import col

# we can also use withColumn() to do the filtering and selection together
# Adds a boolean column -> filter on the boolean column -> select
df.withColumn('isExpensive', col('UnitPrice')>200).where('isExpensive').select('StockCode', 'Quantity', 'UnitPrice').show(3)

+---------+--------+---------+
|StockCode|Quantity|UnitPrice|
+---------+--------+---------+
|      DOT|       1|   569.77|
|      DOT|       1|   607.49|
+---------+--------+---------+



## 2.1 Numbers <a id='sec2-1'></a>

Often we need to transform columns with some custom function. We can do this by taking the numeric column (`df.col_name`), applying regular arithmetic to it, and adding the result to the table (use `select` or `withColumn`).
- operations between two numeric columns are supported.
- `selectExpr(expressions)` or `select(expr())` can also do this
- use `round(col, digits)` to round numbers, or `ROUND` in an expression
- `round` rounds half up (2.5 -> 3). `bround` rounds half to even (2.5 -> 2)
- `corr` computes the correlation between two columns (e.g. with `selectExpr`).
    - the correlation between two columns is a single aggregate value, so don't `select` it together with regular columns
    - use `df.stat.corr(col1, col2)` to get a scalar value

**A very handy method: `df.describe().show()` shows some basic statistics about the data.**
To get a specific metric, use functions from `pyspark.sql.functions`: `count()`, `mean()`, `stddev_pop()`, `min()`, `max()`
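
The difference between `round` and `bround` is only the tie-breaking rule. A minimal sketch of the two rules with the standard `decimal` module (plain Python, not Spark; the sample values are made up):

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP

values = ["0.5", "1.5", "2.5", "3.5"]

# half up: ties always go away from zero (the rule Spark's round applies)
half_up = [int(Decimal(v).quantize(Decimal("1"), rounding=ROUND_HALF_UP)) for v in values]

# half even: ties go to the nearest even number (the rule Spark's bround applies)
half_even = [int(Decimal(v).quantize(Decimal("1"), rounding=ROUND_HALF_EVEN)) for v in values]

print(half_up)
print(half_even)

# Python's built-in round() also uses half-even, so it behaves like bround, not like Spark's round
print(round(2.5))
```

Half-even rounding avoids the systematic upward drift that half-up introduces when many ties are summed.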

[back to top](#directory)

In [11]:
# modified price
correctPrice = pow((df.UnitPrice * 2 + 2), 2)

# do NOT drop UnitPrice before the select:
# the column is still needed to compute correctPrice
df.select('*', correctPrice.alias('TrueUnitPrice')).drop('UnitPrice').show(2)  

+---------+---------+--------------------+--------+-------------------+----------+--------------+-----------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|CustomerID|       Country|    TrueUnitPrice|
+---------+---------+--------------------+--------+-------------------+----------+--------------+-----------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|   17850.0|United Kingdom|            50.41|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|   17850.0|United Kingdom|77.08840000000002|
+---------+---------+--------------------+--------+-------------------+----------+--------------+-----------------+
only showing top 2 rows



In [12]:
# multiplying two numeric columns is supported
totalPrice = df.Quantity * df.UnitPrice
df.select('StockCode', totalPrice.alias('TotalPrice')).show(2)

+---------+------------------+
|StockCode|        TotalPrice|
+---------+------------------+
|   85123A|15.299999999999999|
|    71053|             20.34|
+---------+------------------+
only showing top 2 rows



In [13]:
# a similar computation can be written as a SQL expression with selectExpr
df.selectExpr('StockCode', 'ROUND(SQRT(POWER(Quantity * UnitPrice, 2)), 2) as TotalPrice').show(2)

+---------+----------+
|StockCode|TotalPrice|
+---------+----------+
|   85123A|      15.3|
|    71053|     20.34|
+---------+----------+
only showing top 2 rows



In [14]:
# correlation between two columns
df.selectExpr('corr(Quantity, UnitPrice)').show()

# use corr from df.stat
df.stat.corr('Quantity', 'UnitPrice')

+-----------------------------------------+
|corr(CAST(Quantity AS DOUBLE), UnitPrice)|
+-----------------------------------------+
|                     -0.04112314436835551|
+-----------------------------------------+



-0.04112314436835551

In [15]:
# super handy method  
df.select('CustomerID', 'Quantity', 'UnitPrice').describe().show()  # be careful about the mean

+-------+------------------+------------------+------------------+
|summary|        CustomerID|          Quantity|         UnitPrice|
+-------+------------------+------------------+------------------+
|  count|              1968|              3108|              3108|
|   mean|15661.388719512195| 8.627413127413128| 4.151946589446603|
| stddev|1854.4496996893627|26.371821677029203|15.638659854603892|
|    min|           12431.0|               -24|               0.0|
|    max|           18229.0|               600|            607.49|
+-------+------------------+------------------+------------------+



In [16]:
from pyspark.sql.functions import stddev_pop, mean

df.select(mean('UnitPrice'), stddev_pop('UnitPrice')).show()


+-----------------+---------------------+
|   avg(UnitPrice)|stddev_pop(UnitPrice)|
+-----------------+---------------------+
|4.151946589446603|   15.636143780280698|
+-----------------+---------------------+
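
Note that the `stddev` row printed by `describe()` and the `stddev_pop` value above are slightly different: the former is the sample standard deviation (divides by n - 1), the latter the population one (divides by n). A small plain-Python sketch of the relation, using the five `UnitPrice` values from the first rows as toy data:

```python
import math
import statistics

prices = [2.55, 3.39, 2.75, 3.39, 3.39]  # toy data: UnitPrice of the first five rows

sample_sd = statistics.stdev(prices)       # divides by n - 1, like stddev / stddev_samp
population_sd = statistics.pstdev(prices)  # divides by n, like stddev_pop
n = len(prices)

# the two only differ by the factor sqrt(n / (n - 1))
print(sample_sd, population_sd * math.sqrt(n / (n - 1)))

# the two figures printed in this note are consistent with that factor (3108 rows)
reported_sample = 15.638659854603892  # stddev from describe()
reported_pop = 15.636143780280698     # stddev_pop
rows = 3108
print(math.isclose(reported_pop * math.sqrt(rows / (rows - 1)), reported_sample, rel_tol=1e-6))
```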



## 2.2 Some Other Useful functions <a id='sec2-2'></a>

- `df.stat.crosstab(col1, col2)` returns a frequency table of paired feature values.
- `df.stat.freqItems(cols)` finds frequent items for the given columns, possibly with false positives.
- `df.stat.approxQuantile(colname, [quantiles], relError)` returns the approximate quantiles
- `monotonically_increasing_id` from `pyspark.sql.functions` adds an id to the data frame (using `select`)
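
What `crosstab` computes can be sketched in plain Python as a count of value pairs laid out as a table. This is only a model of the idea, not how Spark implements it, and the `pairs` list is made-up toy data:

```python
from collections import Counter

# toy (CustomerID, UnitPrice) pairs
pairs = [
    ("17850.0", 2.55),
    ("17850.0", 3.39),
    ("17850.0", 2.75),
    ("17850.0", 3.39),
    ("13047.0", 1.69),
]

counts = Counter(pairs)                   # how often each pair occurs
row_keys = sorted({c for c, _ in pairs})  # distinct values of the first column
col_keys = sorted({p for _, p in pairs})  # distinct values of the second column

# one row per CustomerID, one count per distinct UnitPrice
table = {r: [counts[(r, c)] for c in col_keys] for r in row_keys}

print(col_keys)
for key, row in table.items():
    print(key, row)
```

Since the table gets one column per distinct value of the second feature, it is only practical when that feature has few distinct values.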

[back to top](#directory)

In [17]:
# cross tab
df.limit(10).stat.crosstab('CustomerID', 'UnitPrice').show()


+--------------------+----+----+----+----+----+----+----+
|CustomerID_UnitPrice|1.69|1.85|2.55|2.75|3.39|4.25|7.65|
+--------------------+----+----+----+----+----+----+----+
|             13047.0|   1|   0|   0|   0|   0|   0|   0|
|             17850.0|   0|   2|   1|   1|   3|   1|   1|
+--------------------+----+----+----+----+----+----+----+



In [18]:
# freq items
df.stat.freqItems(['CustomerID']).show()
print(df.where('CustomerID = 12662').count())
print(df.where('CustomerID = 12868').count())

+--------------------+
|CustomerID_freqItems|
+--------------------+
|[12662.0, 12868.0...|
+--------------------+

15
12


In [19]:
from pyspark.sql.functions import monotonically_increasing_id

# add id to a dataframe
df.select(monotonically_increasing_id().alias('id'), '*').show(5)

+---+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
| id|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|  0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|  1|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|  2|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|  3|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|  4|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---+---------+---------+--------------------+--------+-------------------+---------+----------+--------

In [20]:
quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]  # must be all float!!!
relError = 0.01
df.select(mean('Quantity')).show()
df.selectExpr('min(Quantity)').show()
df.selectExpr('max(Quantity)').show()

# approximated quantiles

df.stat.approxQuantile('Quantity', quantiles, relError)

+-----------------+
|    avg(Quantity)|
+-----------------+
|8.627413127413128|
+-----------------+

+-------------+
|min(Quantity)|
+-------------+
|          -24|
+-------------+

+-------------+
|max(Quantity)|
+-------------+
|          600|
+-------------+



[-24.0, 1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 6.0, 10.0, 12.0, 600.0]

## 3. Strings <a id='sec3'></a>

Case, trimming, padding (under `pyspark.sql.functions`):
- `initcap(col(col_name))` to capitalize the first letter of each word
    - similarly, `lower`, `upper`
- `ltrim`, `rtrim`, `trim` to trim spaces
- `lpad(col(col_name), n, s)` to left-pad the column with the string s up to a total length of n (longer values are truncated to n characters). `rpad` for right padding

Often we need to do some fine-grained feature engineering on string data. `pyspark.sql.functions` has some string manipulation functions that support regular expressions (regex). Note: all of them return a Column object
- replace regex matches with a given string: `regexp_replace(col(col_name), regex_string, s)`
- translate via a character mapping: `translate(col(col_name), 'ABC', 'abc')` maps A -> a, B -> b, C -> c.
- extract using regex: `regexp_extract(col(col_name), regex_string, i)`, where i is the index of the capture group to return

Check whether a column contains a string
- use `instr(col(col_name), substring) >= 1` to create a boolean column
    - `instr` returns a column of integers: the 1-based position of the first match (0 if there is no match)
    - use `>= 1` to convert it to a boolean column
    - use withColumn -> where -> select to do the filtering
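
For readers who know Python's string tools better than Spark's, the sketch below mirrors the functions listed above with the standard library. It is only a model of the semantics on a single string; `lpad_model` is a made-up helper name.

```python
import re

description = "WHITE HANGING HEART T-LIGHT HOLDER"
color_regex = "BLACK|WHITE|GRAY|RED|GREEN|BLUE"

# regexp_replace: every regex match is replaced by the given string
replaced = re.sub(color_regex, "COLOR", description)

# translate: character-by-character mapping, W -> 1 and H -> 2
translated = description.translate(str.maketrans("WH", "12"))

# regexp_extract with group index 1: first match of the first capture group,
# or an empty string when nothing matches
match = re.search("(" + color_regex + ")", description)
extracted = match.group(1) if match else ""

def lpad_model(value: str, n: int, pad: str) -> str:
    """Model of lpad: pad on the left up to length n, truncate if the value is longer."""
    missing = max(n - len(value), 0)
    padding = (pad * missing)[:missing]
    return (padding + value)[:n]

print(replaced)
print(translated)
print(extracted)
print(lpad_model("hello", 10, "x"), lpad_model("hello", 3, "x"))
```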
    
[back to top](#directory)

In [21]:
df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(data_example_path)
df.show(3)
df.printSchema()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |

In [22]:
from pyspark.sql.functions import initcap, lower, upper

# init capitalize
df.select(initcap(col('Description')).alias('Description')).show(1)
# lower
df.select(lower(col('Description')).alias('Description')).show(1)
# upper
df.select(upper(col('Description')).alias('Description')).show(1)

+--------------------+
|         Description|
+--------------------+
|White Hanging Hea...|
+--------------------+
only showing top 1 row

+--------------------+
|         Description|
+--------------------+
|white hanging hea...|
+--------------------+
only showing top 1 row

+--------------------+
|         Description|
+--------------------+
|WHITE HANGING HEA...|
+--------------------+
only showing top 1 row



In [23]:
from pyspark.sql.functions import trim, lpad, lit

# trim    
df.select(lit('     hello  ').alias('Not Trimed')).show(1)
df.select(trim(lit('     hello  ')).alias('Trimed')).show(1)

# pad
df.select(lpad(lit('hello'), 10, "x").alias('Padded')).show(1)


+------------+
|  Not Trimed|
+------------+
|     hello  |
+------------+
only showing top 1 row

+------+
|Trimed|
+------+
| hello|
+------+
only showing top 1 row

+----------+
|    Padded|
+----------+
|xxxxxhello|
+----------+
only showing top 1 row



In [24]:
from pyspark.sql.functions import regexp_extract, regexp_replace, translate

regex_string = 'BLACK|WHITE|GRAY|RED|GREEN|BLUE'  # any regex style string, here | is regex or

# replace
df.select(regexp_replace(col('DESCRIPTION'), regex_string, 'COLOR').alias('DESCRIPTION')).show(2)  # replace all colors by COLOR


# translate
df.select(translate(col('DESCRIPTION'), 'WH', '12').alias('DESCRIPTION')).show(2)


# extract 
regex_string = '(BLACK|WHITE|GRAY|RED|GREEN|BLUE)'
df.select(regexp_extract(col('DESCRIPTION'), regex_string, 1).alias('Extracted')).show(2)

+--------------------+
|         DESCRIPTION|
+--------------------+
|COLOR HANGING HEA...|
| COLOR METAL LANTERN|
+--------------------+
only showing top 2 rows

+--------------------+
|         DESCRIPTION|
+--------------------+
|12ITE 2ANGING 2EA...|
| 12ITE METAL LANTERN|
+--------------------+
only showing top 2 rows

+---------+
|Extracted|
+---------+
|    WHITE|
|    WHITE|
+---------+
only showing top 2 rows



In [25]:
from pyspark.sql.functions import instr

# boolean column of whether contain WHITE
containWHITE = instr(df.Description, 'WHITE') >= 1

# use the withColumn, where, select trick to filter 
df.withColumn('containWhite', containWHITE).where('containWhite').select('Description').show(3, False)  # False to print all info

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



In [26]:
from pyspark.sql.functions import expr, locate

# A good compact color keyword filtering
colors = ['white', 'black', 'red', 'green', 'blue']

def color_locator(column, color_string):
    """
    Return a boolean column telling whether the column contains the color.
    """
    # name the column is_{color} so that we can filter on a designated color in an expression
    # cast the location (1-based) to boolean: 0 -> False, >=1 -> True
    return locate(color_string.upper(), column).cast('boolean').alias('is_{}'.format(color_string))

# get all color boolean columns!
all_color_location = [color_locator(df.Description, color) for color in colors]

# what it looks like
df.select(*all_color_location).show(3)

# select all descriptions that contain both white and black
# use where('col_name1 and col_name2') to filter, since these are boolean columns
# the first select needs to include the Description column!
df.select(*all_color_location, 'Description').where('is_white and is_black').select('Description').show(3)

+--------+--------+------+--------+-------+
|is_white|is_black|is_red|is_green|is_blue|
+--------+--------+------+--------+-------+
|    true|   false| false|   false|  false|
|    true|   false| false|   false|  false|
|   false|   false| false|   false|  false|
+--------+--------+------+--------+-------+
only showing top 3 rows

+--------------------+
|         Description|
+--------------------+
|JUMBO  BAG BAROQU...|
|WOOD BLACK BOARD ...|
|JUMBO  BAG BAROQU...|
+--------------------+
only showing top 3 rows



## 4. Dates and Timestamps <a id='sec4'></a>

Most of the time, we do:
- convert strings to date/timestamps after loading the string data
- extract additional features from date/timestamps - holidays, i-th day of week/month, is weekend, etc
- encode date/timestamps for the machine learning models (not discussed here but later)

Spark uses the `TimestampType` class

Some potential GOTCHAs:
- Spark might not be able to recognize the date/timestamp in the data when it is strangely formatted
- Spark's `TimestampType` stores values with microsecond precision (the `current_timestamp()` output in this section shows fractional seconds). If we need a finer resolution, such as nanoseconds, we need to handle the values as `long`s!

**Some Essential Functions for timestamp (from `pyspark.sql.functions`)**
- create date/time: `current_date()`, `current_timestamp()`, `to_date(lit(...))`, `to_timestamp(lit(...))`
    - `to_date()`, `to_timestamp()` convert a string column (e.g. `lit(string)`) to a date/timestamp. **If the input cannot be converted, the result is null!**
    - this can be fixed by specifying the date format, e.g. `to_date(lit(...), 'yyyy-MM-dd')`
    - **MM for month, mm for minutes!!!!**
- operations: `date_sub(col, n)`, `date_add(col, n)`
- compute gap: `datediff(col1, col2)`, `months_between(col1, col2)`
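
The "null on failure" behaviour and the `months_between` arithmetic can be modelled in plain Python. This is a sketch, not Spark code: `to_date_model` is a made-up helper, Python's format codes (`%m`, `%d`) differ from Spark's (`MM`, `dd`), and the month formula is simplified (it ignores the time of day and the special case for month ends, and assumes 31-day months for the fractional part).

```python
from datetime import date, datetime
from typing import Optional

def to_date_model(text: str, fmt: str = "%Y-%m-%d") -> Optional[date]:
    """Return a date, or None when the text does not fit the format (Spark returns null)."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        return None

print(to_date_model("1992-02-14"))              # fits the default year-month-day format
print(to_date_model("1992-20-02"))              # there is no month 20, so no date
print(to_date_model("1992-20-02", "%Y-%d-%m"))  # fine once the format says year-day-month

# gap between two dates that are 15 days apart
start, end = date(2020, 4, 9), date(2020, 4, 24)
gap_days = (end - start).days  # what datediff(end, start) counts
months = (end.year - start.year) * 12 + (end.month - start.month) + (end.day - start.day) / 31
print(gap_days, round(months, 8))
```

A failed parse produces no error, only a null, so it is worth counting nulls after every conversion.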


**One big GOTCHA**
- The input data might not always follow the expected format. It is possible that some rows do and some don't.

[back to top](#directory)

In [27]:
df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(data_example_path)
df.show(3)
df.printSchema()  # when the data is in a standard date-time format, Spark is able to infer the type!

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |

In [28]:
from pyspark.sql.functions import current_date, current_timestamp, to_date, lit, to_timestamp

# create a datetime dataframe
# spark.range automatically names its column: id!!

datetimeDF = spark.range(3).withColumn('date', current_date()).withColumn('time', current_timestamp())
datetimeDF.show(4, False)
datetimeDF.printSchema()

# to_date converts a string to a date (a literal string needs to be wrapped in lit())
datetimeDF.select(to_date(lit('1992-02-14'), 'yyyy-MM-dd').alias('bday')).show()  # MM for month, mm for minutes!
datetimeDF.select(to_timestamp(lit('1992-02-14'), 'yyyy-MM-dd').alias('bday')).show()  # MM for month, mm for minutes!
datetimeDF.select(to_date(lit('1992-20-02')).alias('incorrect')).show()  # there is no month 20 -> null

+---+----------+-----------------------+
|id |date      |time                   |
+---+----------+-----------------------+
|0  |2020-04-24|2020-04-24 14:37:40.607|
|1  |2020-04-24|2020-04-24 14:37:40.607|
|2  |2020-04-24|2020-04-24 14:37:40.607|
+---+----------+-----------------------+

root
 |-- id: long (nullable = false)
 |-- date: date (nullable = false)
 |-- time: timestamp (nullable = false)

+----------+
|      bday|
+----------+
|1992-02-14|
|1992-02-14|
|1992-02-14|
+----------+

+-------------------+
|               bday|
+-------------------+
|1992-02-14 00:00:00|
|1992-02-14 00:00:00|
|1992-02-14 00:00:00|
+-------------------+

+---------+
|incorrect|
+---------+
|     null|
|     null|
|     null|
+---------+



In [29]:
from pyspark.sql.functions import date_add, date_sub, datediff, months_between, col

# add two columns
datetimeDF = datetimeDF.withColumn('week before', date_sub(col('date'), 7))
# note: despite its name, 'month before' is only 15 days earlier
datetimeDF = datetimeDF.withColumn('month before', date_sub(col('date'), 15))
datetimeDF.show(20, False)

# compute gap
datetimeDF.select(datediff(col('date'), col('week before')).alias('day diff')).show()
datetimeDF.select(months_between(col('date'), col('month before')).alias('month diff')).show()

+---+----------+-----------------------+-----------+------------+
|id |date      |time                   |week before|month before|
+---+----------+-----------------------+-----------+------------+
|0  |2020-04-24|2020-04-24 14:37:40.867|2020-04-17 |2020-04-09  |
|1  |2020-04-24|2020-04-24 14:37:40.867|2020-04-17 |2020-04-09  |
|2  |2020-04-24|2020-04-24 14:37:40.867|2020-04-17 |2020-04-09  |
+---+----------+-----------------------+-----------+------------+

+--------+
|day diff|
+--------+
|       7|
|       7|
|       7|
+--------+

+----------+
|month diff|
+----------+
|0.48387097|
|0.48387097|
|0.48387097|
+----------+



In [30]:
# use date for filtering!
df.where(col('invoiceDate') < lit('2010-12-01 08:35:00')).where(col('invoiceDate') > lit('2010-12-01 08:33:00')).show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|   13047.0|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



## 5. Nulls <a id='sec5'></a>

In Spark, it is better to use null to represent missing data than an empty string.

Two basic operations on null values:
- Drop nulls
- Fill nulls (globally or on a per-column basis)

Use `coalesce()` to fill nulls:
- `df.select(coalesce(col(col_name), lit(0)))`
    - `coalesce` returns, for each row, the first of its arguments that is not null
    - `lit(0)` acts as a dummy column of 0s
    - so the nulls in the column are filled with 0s
    
A more common way to deal with nulls is through the `df.na` attribute
- `df.na.drop(how[, subset=])`
    - `.drop('any')` drops every row that contains a null
    - `.drop('all')` drops only the rows in which every value is null (columns are never dropped)
    - `.drop(how, subset=[feature1, ...])` only checks a subset of the features
- `df.na.fill(fill_val[, subset=])`
    - if the type of `fill_val` does not match the column type, the null stays
    - `fill_val` can be a dictionary: `{col: val}`
- `df.na.replace(to_replace=[...], value=[...], subset=[cols])` is an alias of `df.replace()`


Ordering with null values in `df.orderBy()`
- by default nulls come first in ascending order (and last in descending order)
- use `asc_nulls_first(col_name)`, `asc_nulls_last(col_name)`, `desc_nulls_first(col_name)`, `desc_nulls_last(col_name)` to control the position of the null values (under `pyspark.sql.functions`)
- e.g. `df.orderBy(desc_nulls_last(col_name))`
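
The null rules above are easy to mix up, so here is a plain-Python model of them on the same three rows used in the cells of this section (`None` plays the role of null). It is a sketch of the semantics only; `coalesce_model` is a made-up helper name.

```python
rows = [(None, None), (1, None), (None, 2)]  # columns a and b

def coalesce_model(*values):
    """Return the first value that is not None, or None if there is none."""
    return next((v for v in values if v is not None), None)

first_non_null = [coalesce_model(a, b) for a, b in rows]  # like coalesce(a, b)
filled = [coalesce_model(a, 0) for a, _ in rows]          # like coalesce(a, lit(0))

# na.drop only ever removes rows
drop_any = [r for r in rows if all(v is not None for v in r)]  # no null anywhere in the row
drop_all = [r for r in rows if any(v is not None for v in r)]  # at least one non-null value
drop_any_b = [r for r in rows if r[1] is not None]             # subset=['b']

# ascending order with nulls last: sort on "is null" first, then on the value
nulls_last = sorted(rows, key=lambda r: (r[0] is None, 0 if r[0] is None else r[0]))

print(first_non_null, filled)
print(drop_any, drop_all, drop_any_b)
print(nulls_last)
```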

[back to top](#directory)

In [61]:
from pyspark.sql.functions import coalesce, lit

# use coalesce fill null (might not be the best way to fill null)

# coalesce - for each row, return the first column that is not null
cDF = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
cDF.show()

# coalesce two columns: take a, fall back to b
cDF.select(coalesce(cDF["a"], cDF["b"])).show()
# coalesce with a literal column ==> fill the nulls in a with 0.0
cDF.select('*', coalesce(cDF["a"], lit(0.0))).show()

+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+

+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+

+----+----+----------------+
|   a|   b|coalesce(a, 0.0)|
+----+----+----------------+
|null|null|             0.0|
|   1|null|             1.0|
|null|   2|             0.0|
+----+----+----------------+



In [66]:
# managing nulls via the df.na functions

# drop any - the default drops every row that contains at least one null
cDF.na.drop().show(3)
cDF.na.drop('any').show(3)  # equivalent

# drop all - drop a row only if all of its values are null (columns are never dropped)
cDF.na.drop('all').show(3)

# restrict the null check to specific columns with subset=[cols]
cDF.na.drop('any', subset=['b']).show(3)  # only nulls in column b count



+---+---+
|  a|  b|
+---+---+
+---+---+

+---+---+
|  a|  b|
+---+---+
+---+---+

+----+----+
|   a|   b|
+----+----+
|   1|null|
|null|   2|
+----+----+

+----+---+
|   a|  b|
+----+---+
|null|  2|
+----+---+



In [86]:
from pyspark.sql.functions import mean

# fill - by default fill all nulls with the given val
# if type does not match, stay null
cDF.na.fill('22').show(3)  
cDF.na.fill(22).show(3)  

# fill - subset
cDF.na.fill(2, subset=['b']).show()

# fill - using dict
fill_dict = {'b': 2}
cDF.na.fill(fill_dict).show(3)

+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+

+---+---+
|  a|  b|
+---+---+
| 22| 22|
|  1| 22|
| 22|  2|
+---+---+

+----+---+
|   a|  b|
+----+---+
|null|  2|
|   1|  2|
|null|  2|
+----+---+

+----+---+
|   a|  b|
+----+---+
|null|  2|
|   1|  2|
|null|  2|
+----+---+



In [101]:
# replace

# cDF.na.replace and cDF.replace are aliases of each other!
cDF.replace([1, 2], [100, 200], subset=['a', 'b']).show()

+----+----+
|   a|   b|
+----+----+
|null|null|
| 100|null|
|null| 200|
+----+----+



In [121]:
from pyspark.sql.functions import asc_nulls_last, asc, col

# ordering - in ascending order nulls come first by default
# use asc_nulls_last to put the nulls at the end
# the asc functions receive the column name rather than the column object!

cDF.orderBy(asc('a')).show()
cDF.orderBy(asc_nulls_last('a')).show()

+----+----+
|   a|   b|
+----+----+
|null|null|
|null|   2|
|   1|null|
+----+----+

+----+----+
|   a|   b|
+----+----+
|   1|null|
|null|   2|
|null|null|
+----+----+



## 6. Complex Types <a id='sec6'></a>
 
Spark has three kinds of complex types:
- `Struct`: DataFrames within DataFrames
    - `struct()` under `pyspark.sql.functions`
    - use `df.select(struct(cols))` to create a struct column
    - **a struct column supports the `getField()` method to get a sub-column!**
    - use `df.select('complex_col_name.*')` to expand the struct into its sub-columns
    
- `Array`: Features that need to be represented by a list of items, e.g. a list of words
    - use `split(col(col_name), delimiter)` to create an array column
    - an array column supports index selection! `df.select(col(arr_col_name)[i])`
    - arrays support the `size()` function: `df.select(size(col(arr_col_name)))`
    - arrays support the `array_contains()` function
    - some more array functions are under `pyspark.sql.functions`
    - **Often we want to convert the array of items into rows such that each row contains one item**
        - this can be achieved with the `explode(arr_col)` function, which takes the array column
    - from `array<string>` back to string: `concat_ws(sep, col)`
- `Map`: key: val pairs of columns
    - `create_map(col1, col2)` from `pyspark.sql.functions` uses the values of col1 as keys and the values of col2 as values
    - the Map column supports lookups by key, e.g. `col(map_col_name)[key]`
   
- the `explode(col)` function can:
    - break an array column down, so that each item becomes a row
    - break a map column down, so that each entry becomes a row with separate `key` and `value` columns

[back to top](#directory)

In [123]:
df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(data_example_path)
df.show(3)
df.printSchema()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |

In [134]:
from pyspark.sql.functions import struct

# create a table with a struct column
complexDF = df.select('InvoiceNo', struct(col('StockCode'), col('Description'), col('UnitPrice')).alias('Complex_Struct'))
complexDF.show(3, False)

+---------+--------------------------------------------------+
|InvoiceNo|Complex_Struct                                    |
+---------+--------------------------------------------------+
|536365   |[85123A, WHITE HANGING HEART T-LIGHT HOLDER, 2.55]|
|536365   |[71053, WHITE METAL LANTERN, 3.39]                |
|536365   |[84406B, CREAM CUPID HEARTS COAT HANGER, 2.75]    |
+---------+--------------------------------------------------+
only showing top 3 rows



In [146]:
# A struct column is a sub DataFrame

# It has sub-columns under the hood, which can be accessed with the getField() method
complexDF.select(col('Complex_Struct').getField('Description')).show(3, False)

# Dot (attribute) access also works!
complexDF.select(col('Complex_Struct').Description).show(3, False)

# use * to get the sub DataFrame from the complex struct!
complexDF.select('Complex_Struct.*').show(3)

+----------------------------------+
|Complex_Struct.Description        |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|CREAM CUPID HEARTS COAT HANGER    |
+----------------------------------+
only showing top 3 rows

+----------------------------------+
|Complex_Struct.Description        |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|CREAM CUPID HEARTS COAT HANGER    |
+----------------------------------+
only showing top 3 rows

+---------+--------------------+---------+
|StockCode|         Description|UnitPrice|
+---------+--------------------+---------+
|   85123A|WHITE HANGING HEA...|     2.55|
|    71053| WHITE METAL LANTERN|     3.39|
|   84406B|CREAM CUPID HEART...|     2.75|
+---------+--------------------+---------+
only showing top 3 rows



In [169]:
from pyspark.sql.functions import split, size, array_contains, monotonically_increasing_id

# create an array column by splitting the description column with delimiter ' '
arrDF = df.select(monotonically_increasing_id().alias('id'), split(col('Description'), ' ').alias('arr'))
arrDF.show(3)

# an array column supports index selection!
arrDF.select(col('arr')[4]).show(3)  # null if the array has no element at that index

+---+--------------------+
| id|                 arr|
+---+--------------------+
|  0|[WHITE, HANGING, ...|
|  1|[WHITE, METAL, LA...|
|  2|[CREAM, CUPID, HE...|
+---+--------------------+
only showing top 3 rows

+------+
|arr[4]|
+------+
|HOLDER|
|  null|
|HANGER|
+------+
only showing top 3 rows



In [177]:
# we can get the length of the array using the size function
arrDF.select('*', size(col('arr'))).show(3)  # size only works on array (or map) columns, not on regular columns!

+---+--------------------+---------+
| id|                 arr|size(arr)|
+---+--------------------+---------+
|  0|[WHITE, HANGING, ...|        5|
|  1|[WHITE, METAL, LA...|        3|
|  2|[CREAM, CUPID, HE...|        5|
+---+--------------------+---------+
only showing top 3 rows



In [176]:
# array_contains checks whether each array holds the given value
splitted_col = split(col('Description'), ' ')
df.select(array_contains(splitted_col, 'WHITE').alias('contain_WHITE')).show(3)

+-------------+
|contain_WHITE|
+-------------+
|         true|
|         true|
|        false|
+-------------+
only showing top 3 rows



In [174]:
from pyspark.sql.functions import explode

# explode the array column
arrDF.select('*', explode(col('arr'))).show(9)  # explode breaks the array down into one row per item!

+---+--------------------+-------+
| id|                 arr|    col|
+---+--------------------+-------+
|  0|[WHITE, HANGING, ...|  WHITE|
|  0|[WHITE, HANGING, ...|HANGING|
|  0|[WHITE, HANGING, ...|  HEART|
|  0|[WHITE, HANGING, ...|T-LIGHT|
|  0|[WHITE, HANGING, ...| HOLDER|
|  1|[WHITE, METAL, LA...|  WHITE|
|  1|[WHITE, METAL, LA...|  METAL|
|  1|[WHITE, METAL, LA...|LANTERN|
|  2|[CREAM, CUPID, HE...|  CREAM|
+---+--------------------+-------+
only showing top 9 rows



In [193]:
from pyspark.sql.functions import create_map

# create a map column so that values can be accessed by key
mapDF = arrDF.select(create_map(col('id'), col('arr')).alias('map'))
mapDF.show(3)

# access by key - this gives you a new column
# rows whose map does not contain the key get null
mapDF.select('*', col('map')[0]).show(5)
mapDF.select('*', col('map')[0]).na.drop().show()

+--------------------+
|                 map|
+--------------------+
|[0 -> [WHITE, HAN...|
|[1 -> [WHITE, MET...|
|[2 -> [CREAM, CUP...|
+--------------------+
only showing top 3 rows

+--------------------+----------------------+
|                 map|map[CAST(0 AS BIGINT)]|
+--------------------+----------------------+
|[0 -> [WHITE, HAN...|  [WHITE, HANGING, ...|
|[1 -> [WHITE, MET...|                  null|
|[2 -> [CREAM, CUP...|                  null|
|[3 -> [KNITTED, U...|                  null|
|[4 -> [RED, WOOLL...|                  null|
+--------------------+----------------------+
only showing top 5 rows

+--------------------+----------------------+
|                 map|map[CAST(0 AS BIGINT)]|
+--------------------+----------------------+
|[0 -> [WHITE, HAN...|  [WHITE, HANGING, ...|
+--------------------+----------------------+



In [199]:
# another create_map example, this time on non-complex columns
mapDF = df.select('Description', 'Quantity', create_map(col('Description'), col('Quantity')).alias('map'))
mapDF.show(3, False)

+----------------------------------+--------+-----------------------------------------+
|Description                       |Quantity|map                                      |
+----------------------------------+--------+-----------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|6       |[WHITE HANGING HEART T-LIGHT HOLDER -> 6]|
|WHITE METAL LANTERN               |6       |[WHITE METAL LANTERN -> 6]               |
|CREAM CUPID HEARTS COAT HANGER    |8       |[CREAM CUPID HEARTS COAT HANGER -> 8]    |
+----------------------------------+--------+-----------------------------------------+
only showing top 3 rows



In [203]:
mapDF.select('*', col('map')['WHITE METAL LANTERN']).show(3)

+--------------------+--------+--------------------+------------------------+
|         Description|Quantity|                 map|map[WHITE METAL LANTERN]|
+--------------------+--------+--------------------+------------------------+
|WHITE HANGING HEA...|       6|[WHITE HANGING HE...|                    null|
| WHITE METAL LANTERN|       6|[WHITE METAL LANT...|                       6|
|CREAM CUPID HEART...|       8|[CREAM CUPID HEAR...|                    null|
+--------------------+--------+--------------------+------------------------+
only showing top 3 rows



In [207]:
# use explode on map column
# explode converts the key-val column into two columns
mapDF.select(explode(col('map'))).show(5)

+--------------------+-----+
|                 key|value|
+--------------------+-----+
|WHITE HANGING HEA...|    6|
| WHITE METAL LANTERN|    6|
|CREAM CUPID HEART...|    8|
|KNITTED UNION FLA...|    6|
|RED WOOLLY HOTTIE...|    6|
+--------------------+-----+
only showing top 5 rows



In [None]:
# iterate through all schemas
