# Add python files to Spark cluster

The `SparkContext.addPyFile()` method adds a Python file to the Spark cluster. Objects and variables defined in the file then become available to the tasks that run on the cluster.

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()


In [None]:
from pyspark import SparkConf, SparkContext, SparkFiles
from pyspark.sql import SparkSession

sc = SparkContext(conf=SparkConf())

In [None]:
from google.colab import files
files.upload()

Saving my_module.py to my_module.py


{'my_module.py': b'def addPyFiles_is_successfull():\n    return(True)\n\ndef sum_two_variables(a, b):\n    return(sum([a,b]))\n    '}

In [None]:
sc.addPyFile('./my_module.py')

In [None]:
SparkFiles.get('my_module.py')

'/tmp/spark-2b33888c-fd73-46e0-9c28-bb07c72787d7/userFiles-53df3eb6-762f-467d-b63e-edaf91718bf0/my_module.py'

# Use **my_module.py**
We can import `my_module` as a Python module.

In [None]:
from my_module import *

In [None]:
addPyFiles_is_successfull()

True

In [None]:
!cat ./my_module.py

def addPyFiles_is_successfull():
    return(True)

def sum_two_variables(a, b):
    return(sum([a,b]))
    

In [None]:
sum_two_variables(4,5)

9

# Dense and sparse vectors

In [None]:
spark = SparkSession(sparkContext=sc)

In [None]:
from pyspark.ml.linalg import Vector, DenseVector, SparseVector

# Dense vector and sparse vector

A vector can be represented in dense or sparse format. A dense vector is a regular vector that stores every element explicitly. A sparse vector uses three components to represent the same vector and needs less memory when most elements are zero.

In [None]:
dv = DenseVector([1.0,0.,0.,0.,4.5,0])
dv

DenseVector([1.0, 0.0, 0.0, 0.0, 4.5, 0.0])

## Three components of a sparse vector

* vector size
* indices of active elements
* values of active elements

In the above dense vector:

* vector size = 6
* indices of active elements = [0, 4]
* values of active elements = [1.0, 4.5]
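
As a minimal sketch in plain Python (no Spark required), the three components can be read off an ordinary list, and the list can be rebuilt from them. The helper names `to_sparse_components` and `to_dense_list` are hypothetical and exist only for this illustration.

```python
def to_sparse_components(dense):
    """Return (size, indices, values) for the non-zero entries of a list."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense_list(size, indices, values):
    """Rebuild the full list from the three sparse components."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


size, indices, values = to_sparse_components([1.0, 0.0, 0.0, 0.0, 4.5, 0.0])
print(size, indices, values)  # 6 [0, 4] [1.0, 4.5]
print(to_dense_list(size, indices, values))
```

Only two of the six values are stored, which is where the memory saving comes from when most entries are zero.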

We can use the `SparseVector()` function to create a sparse vector. The first argument is the vector size, the second
argument is a dictionary. The keys are indices of active elements and the values are values of active elements.

In [None]:
sv = SparseVector(6, {0:1.0, 4:4.5})
sv

SparseVector(6, {0: 1.0, 4: 4.5})

## Convert sparse vector to dense vector

In [None]:
DenseVector(sv.toArray())

DenseVector([1.0, 0.0, 0.0, 0.0, 4.5, 0.0])

## Convert dense vector to sparse vector

In [None]:
active_elements_dict = {index: value for index, value in enumerate(dv) if value != 0}
active_elements_dict

{0: 1.0, 4: 4.5}

In [None]:
SparseVector(len(dv), active_elements_dict)

SparseVector(6, {0: 1.0, 4: 4.5})

In [None]:
# Quick exercise
sv2 = SparseVector(10, {1:1.3, 6: 5, 9: 2.1})
sv2

SparseVector(10, {1: 1.3, 6: 5.0, 9: 2.1})

In [None]:
dv2 = DenseVector(sv2.toArray())
dv2

DenseVector([0.0, 1.3, 0.0, 0.0, 0.0, 0.0, 5.0, 0.0, 0.0, 2.1])

# Pipeline

A pipeline is a sequence of stages, each of which is an **Estimator** or a **Transformer**. An **Estimator** has a **`fit`** method and a **Transformer** has a **`transform`** method, so we can say that **a pipeline is a sequence of `fit` and `transform` calls**. When a stage is an Estimator, its **`fit`** method is applied to the input data and returns a fitted model, which is itself a Transformer. That Transformer's **`transform`** method is then applied to the data and outputs the **transformed** data. **The transformed data output by one stage has to be an acceptable input to the next stage's `fit`/`transform` method**.
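
The fit-then-transform chaining can be sketched without Spark. The classes and functions below (`ToyIndexer`, `ToyIndexerModel`, `fit_pipeline`, `run_pipeline`) are hypothetical stand-ins written for this illustration; they are not PySpark's implementation and only mimic the overall control flow on a list of dictionaries.

```python
class ToyIndexer:
    """Hypothetical Estimator: learns a label -> index mapping in fit()."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        # Count labels, then index them by descending frequency.
        # Ties are broken alphabetically (a choice made for this toy).
        counts = {}
        for row in rows:
            label = row[self.input_col]
            counts[label] = counts.get(label, 0) + 1
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        mapping = {label: float(i) for i, label in enumerate(ordered)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyIndexerModel:
    """Hypothetical Transformer returned by ToyIndexer.fit()."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        # Return new rows with the index column appended.
        return [
            {**row, self.output_col: self.mapping[row[self.input_col]]}
            for row in rows
        ]


def fit_pipeline(stages, rows):
    """Fit each Estimator on the data produced by the stages before it."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)  # this output feeds the next stage
        fitted.append(model)
    return fitted


def run_pipeline(fitted, rows):
    """Apply the fitted Transformers in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


data = [{"x1": "a"}, {"x1": "b"}, {"x1": "b"}]
models = fit_pipeline([ToyIndexer("x1", "idx_x1")], data)
print(run_pipeline(models, data))
```

Because each stage only sees the output of the stages before it, a stage that reads a column must come after the stage that creates that column.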

## Example

In [None]:
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','b', 'b', 'c'],
        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],
        'x3': [1, 1, 2, 2, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],
        'y1': [1, 0, 1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
    })
df = spark.createDataFrame(pdf)
df.show()

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

## Example

We are going to use a pipeline to `StringIndex` columns x1, x2, y1, and y2. Then we `OneHotEncode` the resulting `StringIndexed` columns.

In [None]:
stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
stringindex_stages

[StringIndexer_c728dde5143f,
 StringIndexer_ae9f62ca2dcb,
 StringIndexer_fe861f4fa4fd,
 StringIndexer_b906d2cefdc1]

In [None]:
onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
onehotencode_stages

[OneHotEncoder_3eb7cec4feef,
 OneHotEncoder_f657ebd84d6d,
 OneHotEncoder_e7f87bb48b12,
 OneHotEncoder_b6228bebf1df]

Note that the **outputCol label** in StringIndex stages is the same as the **inputCol label** in the OneHotEncode stages.

## Elements in the stage list

In [None]:
all_stages = stringindex_stages + onehotencode_stages
[type(x) for x in all_stages]

[pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder]

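In the above list, both classes are **Estimators** (they have a `fit` method) in Spark 3.x, the version installed above. Fitting a **`pyspark.ml.feature.StringIndexer`** returns a `StringIndexerModel`, and fitting a **`pyspark.ml.feature.OneHotEncoder`** returns a `OneHotEncoderModel`. These fitted models are the **Transformers** (they have a `transform` method).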

## Build and run pipeline

In [None]:
Pipeline(stages=all_stages).fit(df).transform(df).show()

+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[]

## Reorder pipeline stages

In the example above, our strategy is to **StringIndex** all four columns and then **OneHotEncode** them. Since each **OneHotEncode** stage depends only on the output of its corresponding **StringIndex** stage, our stages list could equally be **`[stringindexer on x1, onehotencoder on x1, stringindexer on x2, onehotencoder on x2, stringindexer on y1, onehotencoder on y1, stringindexer on y2, onehotencoder on y2]`**.

### Old stages

In [None]:
all_stages

[StringIndexer_c728dde5143f,
 StringIndexer_ae9f62ca2dcb,
 StringIndexer_fe861f4fa4fd,
 StringIndexer_b906d2cefdc1,
 OneHotEncoder_3eb7cec4feef,
 OneHotEncoder_f657ebd84d6d,
 OneHotEncoder_e7f87bb48b12,
 OneHotEncoder_b6228bebf1df]

### New stages

In [None]:
new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]
new_all_stages

[StringIndexer_c728dde5143f,
 OneHotEncoder_3eb7cec4feef,
 StringIndexer_ae9f62ca2dcb,
 OneHotEncoder_f657ebd84d6d,
 StringIndexer_fe861f4fa4fd,
 OneHotEncoder_e7f87bb48b12,
 StringIndexer_b906d2cefdc1,
 OneHotEncoder_b6228bebf1df]

## Build and run pipeline

In [None]:
Pipeline(stages=new_all_stages).fit(df).transform(df).show()

+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.

# SQL functions

In [None]:
from google.colab import files
files.upload()

Saving prostate.csv to prostate.csv


{'prostate.csv': b'lcavol,lweight,age,lbph,svi,lcp,gleason,pgg45,lpsa\r-0.579818495,2.769458829,50,-1.386294361,0,-1.386294361,6,0,-0.430782916\r-0.994252273,3.319625728,58,-1.386294361,0,-1.386294361,6,0,-0.162518929\r-0.510825624,2.691243083,74,-1.386294361,0,-1.386294361,7,20,-0.162518929\r-1.203972804,3.282789151,58,-1.386294361,0,-1.386294361,6,0,-0.162518929\r0.751416089,3.432372999,62,-1.386294361,0,-1.386294361,6,0,0.371563556\r-1.049822124,3.228826156,50,-1.386294361,0,-1.386294361,6,0,0.765467842\r0.737164066,3.473518043,64,0.615185639,0,-1.386294361,6,0,0.765467842\r0.693147181,3.539508997,58,1.53686722,0,-1.386294361,6,0,0.854415328\r-0.776528789,3.539508997,47,-1.386294361,0,-1.386294361,6,0,1.047318994\r0.223143551,3.244543572,63,-1.386294361,0,-1.386294361,6,0,1.047318994\r0.254642218,3.604138226,65,-1.386294361,0,-1.386294361,6,0,1.266947603\r-1.347073648,3.598681186,63,1.266947603,0,-1.386294361,6,0,1.266947603\r1.613429934,3.022860941,63,-1.386294361,0,-0.597837001,7,

In [None]:
iris = spark.read.csv('./iris.csv', header=True, inferSchema=True)
iris.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [None]:
prostate = spark.read.csv('./prostate.csv', header=True, inferSchema=True)
prostate.show(5)

+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import pandas as pd

In [None]:
prostate.select('lpsa', abs(prostate.lpsa).alias('abs(lpsa)')).show(5)

+------------+-----------+
|        lpsa|  abs(lpsa)|
+------------+-----------+
|-0.430782916|0.430782916|
|-0.162518929|0.162518929|
|-0.162518929|0.162518929|
|-0.162518929|0.162518929|
| 0.371563556|0.371563556|
+------------+-----------+
only showing top 5 rows



## `acos`

In [None]:
pdf = pd.DataFrame({
    'x': list(-np.random.rand(5)) + list(np.random.rand(5))
})
df = spark.createDataFrame(pdf)
df.show(5)

+--------------------+
|                   x|
+--------------------+
|-0.02561841650378...|
| -0.9511210096302304|
|-0.30015015484867946|
| -0.7542980823549114|
|-0.04565449400795207|
+--------------------+
only showing top 5 rows



In [None]:
df.select('x', acos(df.x)).show(5)

+--------------------+------------------+
|                   x|           ACOS(x)|
+--------------------+------------------+
|-0.02561841650378...|1.5964175463683339|
| -0.9511210096302304| 2.827642160120233|
|-0.30015015484867946|1.8756463897580689|
| -0.7542980823549114| 2.425380659457947|
|-0.04565449400795207| 1.616466695557147|
+--------------------+------------------+
only showing top 5 rows



## `add_months`

In [None]:
import datetime

In [None]:
base = datetime.date.today()
date_list = [base + datetime.timedelta(days=x) for x in list(range(0, 10))*10]
pdf = pd.DataFrame({
    'dates': date_list
})
df = spark.createDataFrame(pdf)
df.show(5)

+----------+
|     dates|
+----------+
|2022-12-02|
|2022-12-03|
|2022-12-04|
|2022-12-05|
|2022-12-06|
+----------+
only showing top 5 rows



In [None]:
df.select('dates', add_months(df.dates, 2).alias('new_dates')).show(5)

+----------+----------+
|     dates| new_dates|
+----------+----------+
|2022-12-02|2023-02-02|
|2022-12-03|2023-02-03|
|2022-12-04|2023-02-04|
|2022-12-05|2023-02-05|
|2022-12-06|2023-02-06|
+----------+----------+
only showing top 5 rows



## `approx_count_distinct`

In [None]:
prostate.select(approx_count_distinct(prostate.gleason)).show(5)

+------------------------------+
|approx_count_distinct(gleason)|
+------------------------------+
|                             4|
+------------------------------+



In [None]:
iris.select(approx_count_distinct(iris.species)).show(5)

+------------------------------+
|approx_count_distinct(species)|
+------------------------------+
|                             3|
+------------------------------+



## `array`

In [None]:
iris.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [None]:
df_arr = iris.select('species', array(['sepal_length', 'sepal_width', 'petal_length', 'petal_width']).alias('features'))
df_arr.show(5)

+-------+--------------------+
|species|            features|
+-------+--------------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|
| setosa|[4.9, 3.0, 1.4, 0.2]|
| setosa|[4.7, 3.2, 1.3, 0.2]|
| setosa|[4.6, 3.1, 1.5, 0.2]|
| setosa|[5.0, 3.6, 1.4, 0.2]|
+-------+--------------------+
only showing top 5 rows



## `array_contains`

In [None]:
df = df_arr.select('species', 'features', array_contains(df_arr.features, 1.4).alias('new_features'))
df.show(5)

+-------+--------------------+------------+
|species|            features|new_features|
+-------+--------------------+------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|        true|
| setosa|[4.9, 3.0, 1.4, 0.2]|        true|
| setosa|[4.7, 3.2, 1.3, 0.2]|       false|
| setosa|[4.6, 3.1, 1.5, 0.2]|       false|
| setosa|[5.0, 3.6, 1.4, 0.2]|        true|
+-------+--------------------+------------+
only showing top 5 rows



In [None]:
df.filter(df.new_features).show(5)

+-------+--------------------+------------+
|species|            features|new_features|
+-------+--------------------+------------+
| setosa|[5.1, 3.5, 1.4, 0.2]|        true|
| setosa|[4.9, 3.0, 1.4, 0.2]|        true|
| setosa|[5.0, 3.6, 1.4, 0.2]|        true|
| setosa|[4.6, 3.4, 1.4, 0.3]|        true|
| setosa|[4.4, 2.9, 1.4, 0.2]|        true|
+-------+--------------------+------------+
only showing top 5 rows



## `asc`

`asc` returns a **sort expression**, which can be used as an argument to sort functions such as `pyspark.sql.DataFrame.sort` and `pyspark.sql.DataFrame.orderBy`.

In [None]:
prostate.sort(prostate.lpsa.asc()).show(5)

+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows



In [None]:
prostate.orderBy(prostate.lpsa.asc()).show(5)

+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows



Find more:

* `ascii`
* `asin`
* `atan`
* `atan2`

## `avg`

In [None]:
prostate.select(avg(prostate.lpsa)).show()

+------------------+
|         avg(lpsa)|
+------------------+
|2.4783868787422683|
+------------------+



In [None]:
# `cbrt`

prostate.select('lpsa', cbrt(prostate.lpsa)).show(5)

+------------+-------------------+
|        lpsa|         CBRT(lpsa)|
+------------+-------------------+
|-0.430782916|-0.7552420410177275|
|-0.162518929|-0.5457176294010901|
|-0.162518929|-0.5457176294010901|
|-0.162518929|-0.5457176294010901|
| 0.371563556| 0.7189152621521183|
+------------+-------------------+
only showing top 5 rows



In [None]:
# `ceil`
prostate.select('lpsa', ceil(prostate.lpsa)).show(5)

+------------+----------+
|        lpsa|CEIL(lpsa)|
+------------+----------+
|-0.430782916|         0|
|-0.162518929|         0|
|-0.162518929|         0|
|-0.162518929|         0|
| 0.371563556|         1|
+------------+----------+
only showing top 5 rows



## `coalesce`

Return the first column that is not null.

In [None]:
df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
df.show()

+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+



In [None]:
df.select(coalesce(df.a, df.b)).show()

+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+
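
The row-wise rule can be sketched in plain Python, with `None` playing the role of null. The helper `coalesce_row` is a hypothetical name used only here; it is not part of PySpark.

```python
def coalesce_row(*values):
    """Return the first value that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


rows = [(None, None), (1, None), (None, 2)]
print([coalesce_row(a, b) for a, b in rows])  # [None, 1, 2]
```

The test is "is not null" rather than "is truthy", so a value such as `0` still counts as present.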



## `col`

Returns a **Column** based on the given column name. It can save you some typing when the DataFrame's name is very long.

In [None]:
prostate.show(5)

+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows



In [None]:
prostate.select(col('lcavol'), col('age')).show(5)

+------------+---+
|      lcavol|age|
+------------+---+
|-0.579818495| 50|
|-0.994252273| 58|
|-0.510825624| 74|
|-1.203972804| 58|
| 0.751416089| 62|
+------------+---+
only showing top 5 rows



## `collect_list`

In [None]:
pdf = pd.DataFrame({
    'x':[1, 2, 2, 3, 4,4,4,4]
})
df = spark.createDataFrame(pdf)
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  2|
|  3|
|  4|
|  4|
|  4|
|  4|
+---+



In [None]:
df.select(collect_list(df.x)).show()

+--------------------+
|     collect_list(x)|
+--------------------+
|[1, 2, 2, 3, 4, 4...|
+--------------------+



In [None]:
# `collect_set`
df.select(collect_set(df.x)).show()

+--------------+
|collect_set(x)|
+--------------+
|  [1, 2, 3, 4]|
+--------------+



## `concat`

In [None]:
df = spark.createDataFrame([['a', '1'], ['b', '2']], ['x', 'v'])
df.show()

+---+---+
|  x|  v|
+---+---+
|  a|  1|
|  b|  2|
+---+---+



In [None]:
df.select('x', 'v', concat(df.x, df.v).alias('concate(x,v)')).show()

+---+---+------------+
|  x|  v|concate(x,v)|
+---+---+------------+
|  a|  1|          a1|
|  b|  2|          b2|
+---+---+------------+



## `concat_ws`

In [None]:
df.select('x', 'v', concat_ws('_', df.x, df.v).alias('concate(x,v)')).show()

+---+---+------------+
|  x|  v|concate(x,v)|
+---+---+------------+
|  a|  1|         a_1|
|  b|  2|         b_2|
+---+---+------------+



## `corr`

In [None]:
prostate.show(5)

+------------+-----------+---+------------+---+------------+-------+-----+------------+
|      lcavol|    lweight|age|        lbph|svi|         lcp|gleason|pgg45|        lpsa|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
|-0.579818495|2.769458829| 50|-1.386294361|  0|-1.386294361|      6|    0|-0.430782916|
|-0.994252273|3.319625728| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
|-0.510825624|2.691243083| 74|-1.386294361|  0|-1.386294361|      7|   20|-0.162518929|
|-1.203972804|3.282789151| 58|-1.386294361|  0|-1.386294361|      6|    0|-0.162518929|
| 0.751416089|3.432372999| 62|-1.386294361|  0|-1.386294361|      6|    0| 0.371563556|
+------------+-----------+---+------------+---+------------+-------+-----+------------+
only showing top 5 rows



In [None]:
prostate.select(corr(prostate.age, prostate.lpsa)).show(5)

+-------------------+
|    corr(age, lpsa)|
+-------------------+
|0.16959284228582772|
+-------------------+



In [None]:
# `count`
prostate.select(count(prostate.lpsa)).show()

+-----------+
|count(lpsa)|
+-----------+
|         97|
+-----------+



## `covar_pop`

**population covariance**: $\frac{1}{n}\sum_{i=1}^n(x_{i} - \bar{x})(y_{i} - \bar{y})$

In [None]:
prostate.select(covar_pop(prostate.age, prostate.lpsa)).show()

+--------------------+
|covar_pop(age, lpsa)|
+--------------------+
|  1.4424746293984458|
+--------------------+



## `covar_samp`
**sample covariance**: $\frac{1}{n-1}\sum_{i=1}^n(x_{i} - \bar{x})(y_{i} - \bar{y})$

In [None]:
prostate.select(covar_samp(prostate.age, prostate.lpsa)).show()

+---------------------+
|covar_samp(age, lpsa)|
+---------------------+
|   1.4575004067880128|
+---------------------+
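
The two formulas differ only in the divisor, which the following plain-Python sketch makes explicit. The function names `covar_pop_py` and `covar_samp_py` and the sample data are hypothetical, chosen so that they do not shadow the PySpark functions imported above.

```python
def covar_pop_py(xs, ys):
    """Population covariance: divide the sum of cross-deviations by n."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n


def covar_samp_py(xs, ys):
    """Sample covariance: same sum, divided by n - 1 instead of n."""
    n = len(xs)
    return covar_pop_py(xs, ys) * n / (n - 1)


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 5.0, 9.0]
print(covar_pop_py(xs, ys))   # 2.75
print(covar_samp_py(xs, ys))
```

The same relationship holds for the prostate data: with the 97 rows reported by `count`, multiplying the `covar_pop` result 1.4424746293984458 by 97/96 gives approximately 1.4575004, which agrees with the `covar_samp` result.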



## `create_map`

In [None]:
iris.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [None]:
df = iris.select(create_map('species', 'sepal_length'))
df.show(5)

+--------------------------+
|map(species, sepal_length)|
+--------------------------+
|           {setosa -> 5.1}|
|           {setosa -> 4.9}|
|           {setosa -> 4.7}|
|           {setosa -> 4.6}|
|           {setosa -> 5.0}|
+--------------------------+
only showing top 5 rows



In [None]:
df.dtypes

[('map(species, sepal_length)', 'map<string,double>')]

## `current_date`

In [None]:
df = spark.createDataFrame([[1],[2],[3],[4]], ['x'])
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
|  4|
+---+



In [None]:
df.select('x', current_date()).show()

+---+--------------+
|  x|current_date()|
+---+--------------+
|  1|    2022-12-02|
|  2|    2022-12-02|
|  3|    2022-12-02|
|  4|    2022-12-02|
+---+--------------+



## `current_timestamp`

In [None]:
df.select('x', current_timestamp()).show(truncate=False)

+---+-----------------------+
|x  |current_timestamp()    |
+---+-----------------------+
|1  |2022-12-02 04:54:16.886|
|2  |2022-12-02 04:54:16.886|
|3  |2022-12-02 04:54:16.886|
|4  |2022-12-02 04:54:16.886|
+---+-----------------------+



## `date_add`

In [None]:
df2 = df.select('x', current_date().alias('current_date'))
df2.show(5)

+---+------------+
|  x|current_date|
+---+------------+
|  1|  2022-12-02|
|  2|  2022-12-02|
|  3|  2022-12-02|
|  4|  2022-12-02|
+---+------------+



In [None]:
df2.select('x', 'current_date', date_add(df2.current_date, 10)).show()

+---+------------+--------------------------+
|  x|current_date|date_add(current_date, 10)|
+---+------------+--------------------------+
|  1|  2022-12-02|                2022-12-12|
|  2|  2022-12-02|                2022-12-12|
|  3|  2022-12-02|                2022-12-12|
|  4|  2022-12-02|                2022-12-12|
+---+------------+--------------------------+



## `date_format`

In [None]:
df2.select('x', 'current_date', date_format('current_date', 'MM/dd/yyyy').alias('new_date')).show()

+---+------------+----------+
|  x|current_date|  new_date|
+---+------------+----------+
|  1|  2022-12-02|12/02/2022|
|  2|  2022-12-02|12/02/2022|
|  3|  2022-12-02|12/02/2022|
|  4|  2022-12-02|12/02/2022|
+---+------------+----------+



# TF, IDF and TF-IDF

* TF is short for **Term Frequency**. It is simply the frequency of a term in a document. The higher the TF is for a specific term, the more important that term is to that document.

* IDF is short for **Inverse Document Frequency**. The Document Frequency (DF) is the fraction of documents that contain a specific term. If a term appears in every single document, its Document Frequency takes the largest possible value, 1, and its Inverse Document Frequency takes the smallest. In that situation, the term is non-informative for classifying the documents. The IDF is a measure of the relevance of a term: the higher the IDF is, the more relevant the term is.

* TF-IDF is the product of TF and IDF. A high TF-IDF is obtained when the Term Frequency is high and the Document Frequency is low (IDF is high).
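
The three quantities can be computed by hand on a small toy corpus. This is a sketch under one stated assumption: it uses the smoothed form `log((N + 1) / (DF + 1))` for the IDF, where `N` is the number of documents and `DF` is the number of documents containing the term. Other definitions of IDF exist and give different numbers. All function and variable names are hypothetical.

```python
import math
from collections import Counter

docs = [
    ["spark", "spark", "spark", "is", "awesome", "awesome"],
    ["I", "love", "spark", "very", "very", "much"],
    ["everyone", "should", "use", "spark"],
]


def document_frequencies(documents):
    """Number of documents that contain each term."""
    doc_freq = Counter()
    for doc in documents:
        doc_freq.update(set(doc))  # count each term once per document
    return doc_freq


def inverse_document_frequency(n_docs, doc_freq):
    """Smoothed IDF (an assumed variant): log((N + 1) / (DF + 1))."""
    return math.log((n_docs + 1) / (doc_freq + 1))


doc_freq_by_term = document_frequencies(docs)
tfidf_per_doc = []
for doc in docs:
    term_freq = Counter(doc)  # TF: raw count of each term in this document
    tfidf_per_doc.append({
        term: count * inverse_document_frequency(len(docs), doc_freq_by_term[term])
        for term, count in term_freq.items()
    })

for scores in tfidf_per_doc:
    print(scores)
```

Here `spark` appears in every document, so its IDF and therefore its TF-IDF are zero even though its term frequency in the first document is the highest of any term.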


# Term Frequency, HashingTF and CountVectorizer

PySpark has two classes for calculating term frequencies from documents: **`HashingTF()`** and **`CountVectorizer()`**. Both do two things:

1. Index terms: convert words to numbers.
2. Calculate term frequencies for each document.

The `HashingTF()` uses the MurmurHash3 function to map a raw feature (a term) to an index (a number). Hashing is the process of transforming data of arbitrary size into fixed-size, usually shorter, data. The term frequencies are calculated based on the generated indices. For `HashingTF()`, the mapping process is very cheap because each term-to-index mapping is independent of every other term-to-index mapping. The hash function is deterministic, so the same term always yields the same index. However, a **hashing collision** may occur, which means that different features (terms) may be hashed to the same index.
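
The hashing approach can be sketched in a few lines of plain Python. Note the assumptions: `zlib.crc32` is used here as a stand-in hash because it ships with Python, whereas `HashingTF()` uses MurmurHash3, so the indices printed by this sketch will not match Spark's. The function name `hashed_term_frequencies` is hypothetical.

```python
import zlib
from collections import Counter


def hashed_term_frequencies(terms, num_features):
    """Map each term to a bucket index and count the terms in each bucket."""
    counts = Counter()
    for term in terms:
        # Stand-in hash (crc32), reduced to a bucket with a modulo.
        index = zlib.crc32(term.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(sorted(counts.items()))


doc = ["spark", "spark", "spark", "is", "awesome", "awesome"]
print(hashed_term_frequencies(doc, 16))
print(hashed_term_frequencies(doc, 1024))
```

No vocabulary has to be built or stored, which is why the mapping is cheap. The price is that two different terms can land in the same bucket, in which case their counts are added together.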

The **`CountVectorizer()`** indexes terms by descending order of term frequencies in the entire corpus, NOT the term frequencies in the document. After the indexing process, the term frequencies are calculated by documents.
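
The two-step process (build a corpus-wide vocabulary, then count per document) might be sketched as follows in plain Python. The names `build_vocabulary` and `count_vector` are hypothetical, and the alphabetical tie-breaking is an assumption made only to keep the sketch deterministic; the indices Spark assigns to equally frequent terms may differ.

```python
from collections import Counter

docs = [
    ["spark", "spark", "spark", "is", "awesome", "awesome"],
    ["I", "love", "spark", "very", "very", "much"],
    ["everyone", "should", "use", "spark"],
]


def build_vocabulary(documents, vocab_size):
    """Index terms by descending corpus-wide count, keeping the top vocab_size."""
    corpus_counts = Counter(term for doc in documents for term in doc)
    # Ties are broken alphabetically here (an assumption of this sketch).
    ordered = sorted(corpus_counts, key=lambda t: (-corpus_counts[t], t))
    return {term: index for index, term in enumerate(ordered[:vocab_size])}


def count_vector(doc, vocabulary):
    """Per-document term counts keyed by vocabulary index."""
    counts = Counter(vocabulary[t] for t in doc if t in vocabulary)
    return dict(sorted(counts.items()))


vocabulary = build_vocabulary(docs, vocab_size=20)
print(vocabulary["spark"])  # 0, because spark is the most frequent term overall
print([count_vector(doc, vocabulary) for doc in docs])
```

Unlike hashing, this requires a full pass over the corpus to build the vocabulary, but every kept term gets its own index, so there are no collisions.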

In [None]:
import pandas as pd
pdf = pd.DataFrame({
        'terms': [
            ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],
            ['I', 'love', 'spark', 'very', 'very', 'much'],
            ['everyone', 'should', 'use', 'spark']
        ]
    })
df = spark.createDataFrame(pdf)
df.show(truncate=False)

+-------------------------------------------+
|terms                                      |
+-------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|
|[I, love, spark, very, very, much]         |
|[everyone, should, use, spark]             |
+-------------------------------------------+



## HashingTF

The **numFeatures** parameter takes an integer, which should be larger than the number of distinct terms in the corpus so that collisions stay rare. It should also be a power of two so that features are mapped evenly to columns.



In [None]:
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline

hashtf = HashingTF(numFeatures=2**4, inputCol='terms', outputCol='features(numFeatures), [index], [term frequency]')
stages = [hashtf]
pipeline = Pipeline(stages=stages)

In [None]:
pipeline.fit(df).transform(df).show(truncate=False)

+-------------------------------------------+------------------------------------------------+
|terms                                      |features(numFeatures), [index], [term frequency]|
+-------------------------------------------+------------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|(16,[6,9],[3.0,3.0])                            |
|[I, love, spark, very, very, much]         |(16,[0,6,8,12],[1.0,1.0,2.0,2.0])               |
|[everyone, should, use, spark]             |(16,[5,6,13],[1.0,1.0,2.0])                     |
+-------------------------------------------+------------------------------------------------+



You may note that the first document has three distinct terms, but only two term frequencies are obtained. This apparent discrepancy is due to a **hashing collision**: `spark` is hashed to index `6` (the index that appears in all three documents), while both `is` and `awesome` are hashed to index `9`. The term frequency for index `9` in the first document is `3.0`, corresponding to the one count of `is` and the two counts of `awesome`. The likelihood of a hashing collision can be reduced by increasing the `numFeatures` parameter passed to `HashingTF` (the default, for example, is $2^{18} = 262,144$).

In [None]:
hashtf = HashingTF(numFeatures=2**10, inputCol='terms', outputCol='features(numFeatures), [index], [term frequency]')
stages = [hashtf]
pipeline = Pipeline(stages=stages)
pipeline.fit(df).transform(df).show(truncate=False)

+-------------------------------------------+--------------------------------------------------+
|terms                                      |features(numFeatures), [index], [term frequency]  |
+-------------------------------------------+--------------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|(1024,[345,502,761],[1.0,3.0,2.0])                |
|[I, love, spark, very, very, much]         |(1024,[112,120,502,556,988],[1.0,2.0,1.0,1.0,1.0])|
|[everyone, should, use, spark]             |(1024,[237,413,502,885],[1.0,1.0,1.0,1.0])        |
+-------------------------------------------+--------------------------------------------------+



## CountVectorizer

The **`CountVectorizer()`** function has three parameters to control which terms will be kept as features.

* minTF: terms whose frequency within a document is less than `minTF` are ignored for that document. If `minTF=1`, no features are removed.
* minDF: terms that appear in fewer than `minDF` documents are removed from the vocabulary. If `minDF=1`, no features are removed.
* vocabSize: the maximum size of the vocabulary; only the `vocabSize` most frequent terms are kept.

In the example below, `minTF=1.0`, `minDF=1.0`, and `vocabSize=20`, which is larger than the number of distinct terms in the corpus (10). Therefore, all features (terms) are kept.
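To make the three parameters concrete, here is a minimal pure-Python sketch of the filtering they describe. It is not Spark's implementation: the helper names `build_vocab` and `vectorize` are hypothetical, and ties between equally frequent terms are kept in first-seen order, which is an arbitrary choice made for this illustration.

```python
from collections import Counter

docs = [
    ["spark", "spark", "spark", "is", "awesome", "awesome"],
    ["I", "love", "spark", "very", "very", "much"],
    ["everyone", "should", "use", "spark"],
]


def build_vocab(docs, min_df=1, vocab_size=20):
    """Keep terms found in at least min_df documents, most frequent first."""
    corpus_tf = Counter(t for doc in docs for t in doc)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    kept = [t for t in corpus_tf if doc_freq[t] >= min_df]
    # Stable sort: equally frequent terms stay in first-seen order (assumption).
    kept.sort(key=lambda t: -corpus_tf[t])
    return kept[:vocab_size]


def vectorize(doc, vocab, min_tf=1):
    """Map a document to {vocabulary index: count}, dropping counts below min_tf."""
    index = {term: i for i, term in enumerate(vocab)}
    counts = Counter(index[t] for t in doc if t in index)
    return {i: float(c) for i, c in sorted(counts.items()) if c >= min_tf}


vocab = build_vocab(docs)                  # all 10 distinct terms are kept
print(vocab)
print(vectorize(docs[0], vocab))           # every term of the first document
print(vectorize(docs[0], vocab, min_tf=2)) # 'is' (count 1) is dropped
print(build_vocab(docs, min_df=2))         # only 'spark' is in 2+ documents
print(build_vocab(docs, vocab_size=3))     # the three most frequent terms
```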

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline

countvectorizer = CountVectorizer(minTF=1.0, minDF=1.0, vocabSize=20, 
                                  inputCol='terms', outputCol='features(vocabSize), [index], [term frequency]')
stages = [countvectorizer]
pipeline = Pipeline(stages=stages)


In [None]:
pipeline.fit(df).transform(df).show(truncate=False)

+-------------------------------------------+----------------------------------------------+
|terms                                      |features(vocabSize), [index], [term frequency]|
+-------------------------------------------+----------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|(10,[0,1,6],[3.0,2.0,1.0])                    |
|[I, love, spark, very, very, much]         |(10,[0,2,3,7,9],[1.0,2.0,1.0,1.0,1.0])        |
|[everyone, should, use, spark]             |(10,[0,4,5,8],[1.0,1.0,1.0,1.0])              |
+-------------------------------------------+----------------------------------------------+



Now, let's use `StringIndexer()` to index the corpus and see whether the result is consistent with the `CountVectorizer()` output.

### `flatMap` documents so that each row has a single term.

In [None]:
from pyspark.sql.types import StringType
df_vocab = df.select('terms').rdd.\
            flatMap(lambda x: x[0]).\
            toDF(schema=StringType()).toDF('terms')
df_vocab.show()

+--------+
|   terms|
+--------+
|   spark|
|   spark|
|   spark|
|      is|
| awesome|
| awesome|
|       I|
|    love|
|   spark|
|    very|
|    very|
|    much|
|everyone|
|  should|
|     use|
|   spark|
+--------+



### Calculate term frequencies in the corpus

In [None]:
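import pandas as pd

# countByValue() returns a dict keyed by Row objects, so each term
# is displayed as a struct, e.g. {spark}, in the output below.
vocab_freq = df_vocab.rdd.countByValue()
pdf = pd.DataFrame({
        'term': list(vocab_freq.keys()),
        'frequency': list(vocab_freq.values())
    })
# Convert back to a Spark DataFrame and sort by descending frequency.
tf = spark.createDataFrame(pdf).orderBy('frequency', ascending=False)
tf.show()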

+----------+---------+
|      term|frequency|
+----------+---------+
|   {spark}|        5|
|    {very}|        2|
| {awesome}|        2|
|    {much}|        1|
|  {should}|        1|
|{everyone}|        1|
|     {use}|        1|
|      {is}|        1|
|       {I}|        1|
|    {love}|        1|
+----------+---------+



### Apply `StringIndexer()` to `df_vocab`

In [None]:
from pyspark.ml.feature import StringIndexer
stringindexer = StringIndexer(inputCol='terms', outputCol='StringIndexer(index)')

In [None]:
stringindexer.fit(df_vocab).transform(df_vocab).\
    distinct().\
    orderBy('StringIndexer(index)').show()

+--------+--------------------+
|   terms|StringIndexer(index)|
+--------+--------------------+
|   spark|                 0.0|
| awesome|                 1.0|
|    very|                 2.0|
|       I|                 3.0|
|everyone|                 4.0|
|      is|                 5.0|
|    love|                 6.0|
|    much|                 7.0|
|  should|                 8.0|
|     use|                 9.0|
+--------+--------------------+



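The indexing result is consistent for the first three terms (`spark`, `awesome`, and `very`). The remaining terms all have a frequency of 1, so they cannot be ordered by frequency alone. `StringIndexer()` appears to break these ties alphabetically, as the output above shows, whereas `CountVectorizer()` assigns them in a different order (for example, `is` gets index 5 here but index 6 in the `CountVectorizer()` output). This is likely why their indices don't match.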
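The ordering seen in the `StringIndexer()` output can be reproduced in plain Python. This is a sketch under an assumption rather than Spark's own code: it assumes the index is assigned by descending frequency with alphabetical order as the tie-breaker, which matches the table above.

```python
from collections import Counter

# The flattened corpus, one term per entry (same content as df_vocab).
terms = [
    "spark", "spark", "spark", "is", "awesome", "awesome",
    "I", "love", "spark", "very", "very", "much",
    "everyone", "should", "use", "spark",
]

freq = Counter(terms)

# Sort by descending frequency, then alphabetically (assumed tie-break rule).
ordered = sorted(freq, key=lambda t: (-freq[t], t))
index = {term: float(i) for i, term in enumerate(ordered)}

for term, i in index.items():
    print(f"{term:>8} {i}")
```

Because uppercase letters sort before lowercase ones in this comparison, `I` comes first among the terms with frequency 1, as it does in the `StringIndexer()` result.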