In [1]:
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

In [2]:
mtcarsfile="../../data/mtcars.csv"

In [3]:
mtcars_df = spark.read.csv(mtcarsfile, inferSchema=True, header=True)

In [4]:
mtcars_df.show(2)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 2 rows



### Aggregate functions
Two aggregate functions:

* aggregate()
* aggregateByKey()
#### aggregate(zeroValue, seqOp, combOp)
* **zeroValue** is like a data container. Its structure should match with the data structure of the returned values from the seqOp function.
* **seqOp** is a function that takes two arguments: the first argument is the zeroValue and the second argument is an element from the RDD. The zeroValue gets updated with the returned value after every run.
* **combOp** is a function that takes two arguments: the first argument is the final zeroValue from one partition and the other is another final zeroValue from another partition.
The code below calculates the total sum of squares for **mpg** and **disp** in data set mtcars.


In [22]:
# Step 1: get some data.
# Step 2: calculate averages of mgp and disp

mpg_mean  = mtcars_df.select('mpg').rdd.map(lambda x:x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x:x[0]).mean() 

print("""
   Mean of mpg  : {0}
   Mean of disp : {1}
""".format(mpg_mean,disp_mean) )


   Mean of mpg  : 20.090625000000003
   Mean of disp : 230.721875



In [23]:
# Step 3: build zeroValue, seqOp and combOp
zeroValue = (0, 0)

In [24]:
list(enumerate(mtcars_df.columns))

[(0, 'model'),
 (1, 'mpg'),
 (2, 'cyl'),
 (3, 'disp'),
 (4, 'hp'),
 (5, 'drat'),
 (6, 'wt'),
 (7, 'qsec'),
 (8, 'vs'),
 (9, 'am'),
 (10, 'gear'),
 (11, 'carb')]

In [29]:
mtcars_df_1 = mtcars_df.select(['mpg','disp'])
mtcars_df_1.show(2)
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)
combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )

+----+-----+
| mpg| disp|
+----+-----+
|21.0|160.0|
|21.0|160.0|
+----+-----+
only showing top 2 rows



In [30]:
mtcars_df_1.rdd.aggregate(zeroValue, seqOp, combOp)

(1126.0471874999998, 476184.7946875)

### aggregateByKey(zeroValue, seqOp, combOp)
This function does similar things as **aggregate()**. The **aggregate()** aggregate all results to the very end, but **aggregateByKey()** merge results by key.

In [31]:
iris_rdd = sc.textFile('../../data/iris.csv')
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

In [33]:
iris_rdd = sc.textFile('../../data/iris.csv',use_unicode=True)
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

In [37]:
header=iris_rdd.map(lambda x : x.split(',')).first()
header

['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

In [41]:
iris_row = iris_rdd.map(lambda x : x.split(',')).filter(lambda x: x != header)
iris_row.take(2)

[['5.1', '3.5', '1.4', '0.2', 'setosa'],
 ['4.9', '3.0', '1.4', '0.2', 'setosa']]

In [45]:
iris_row_tuple = iris_row.map(lambda x: (x[-1], [*map(float,x[:-1])]))

In [47]:
iris_row_tuple.take(4)

[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2])]

### Define initial values, seqOp and combOp

In [48]:
zero_value = (0, 0)
seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

### Implement aggregateByKey()¶

In [50]:
iris_row_tuple.aggregateByKey(zero_value, seqOp, combOp).collect()

[('setosa', (1259.0899999999997, 591.2500000000002)),
 ('versicolor', (1774.8600000000006, 388.4700000000001)),
 ('virginica', (2189.9000000000005, 447.33))]

### Map functions
These functions are probably the most commonly used functions when dealing with an RDD object.

    *map()
    *mapValues()
    *flatMap()
    *flatMapValues()
#### map
The map() method applies a function to each elements of the RDD. Each element has to be a valid input to the function. The returned RDD has the function outputs as its new elements.

Elements in the RDD object map_exp_rdd below are rows of the mtcars in string format. We are going to apply the map() function multiple times to convert each string elements as a list elements. Each list element has two values: the first value will be the auto model in string format; the second value will be a list of numeric values.

In [51]:
mtcars_df.show(4)

+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
| Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 4 rows



In [53]:
header = mtcars_df.columns
header

['model',
 'mpg',
 'cyl',
 'disp',
 'hp',
 'drat',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

In [54]:
mtcars_rdd = sc.textFile(mtcarsfile)
mtcars_rdd.take(2)

['model,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4']

In [55]:
mtcars_rdd_map = mtcars_rdd.map(lambda x : x.split(','))

In [57]:
mtcars_rdd_map.take(2)

[['model',
  'mpg',
  'cyl',
  'disp',
  'hp',
  'drat',
  'wt',
  'qsec',
  'vs',
  'am',
  'gear',
  'carb'],
 ['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4']]

In [61]:
mtcars_rdd_map_header = mtcars_rdd_map.first()
print(mtcars_rdd_map_header)
mtcars_rdd_map_rows = mtcars_rdd_map.filter(lambda x : x!=mtcars_rdd_map_header)
print(mtcars_rdd_map_rows.take(2))

['model', 'mpg', 'cyl', 'disp', 'hp', 'drat', 'wt', 'qsec', 'vs', 'am', 'gear', 'carb']
[['Mazda RX4', '21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4'], ['Mazda RX4 Wag', '21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']]


In [64]:
# # split auto model from other feature values

# convert string values to numeric values

mtcars_rdd_map_rows_s_f = mtcars_rdd_map_rows.map(lambda x : (x[0],[*map(float,x[1:])]))

In [65]:
mtcars_rdd_map_rows_s_f.take(1)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0])]

### mapValues
The mapValues function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items, or a list of 2 items. The mapValues function applies a function to each of the element values. The element key will remain unchanged.

We can apply the mapValues function to the RDD object mapValues_exp_rdd below.

In [67]:
mtcars_rdd_map_rows_s_f.mapValues(lambda x:x).take(1)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0])]

In [70]:
import numpy as np

In [75]:
mtcars_rdd_map_rows_s_f.mapValues(lambda x:np.mean(x)).collect()[:4]
### When using mapValues(), the x in the above lambda function refers to the element value, not including the element key.

[('Mazda RX4', 29.90727272727273),
 ('Mazda RX4 Wag', 29.98136363636364),
 ('Datsun 710', 23.59818181818182),
 ('Hornet 4 Drive', 38.73954545454546)]

### flatMap
This function first applies a function to each elements of an RDD and then flatten the results. We can simply use this function to flatten elements of an RDD without extra operation on each elements.

In [81]:
x = [('a','b','c'),('d','e','e'),('f','g')]
x
flapMap_x = sc.parallelize(x)
flapMap_x.collect()

[('a', 'b', 'c'), ('d', 'e', 'e'), ('f', 'g')]

In [83]:
flapMap_x.flatMap(lambda x:x).collect()

['a', 'b', 'c', 'd', 'e', 'e', 'f', 'g']

In [84]:
y = np.arange(1,10)
y.shape = (3,3)
flapMap_y = sc.parallelize(y)
flapMap_y.collect()

[array([1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]

In [85]:
flapMap_y.flatMap(lambda y:y).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

### flatMapValues
The flatMapValues function requires that each element in the RDD has a key/value pair structure. It applies a function to each element value of the RDD object and then flatten the results.

For example, my raw data looks like below. But I would like to transform the data so that it has three columns: the first column is the sample id; the second the column is the three types (A,B or C); the third column is the values.

 
|sample id|A    |B    |C    |
|---------|-----|-----|-----|
|1        |23   |18   |32   |
|2        |18   |29   |31   |
|3        |34   |21   |18   |   

In [90]:
my_data = [
    [1, (23, 28, 32)],
    [2, (18, 29, 31)],
    [3, (34, 21, 18)]
]

flatMapValues_rdd = sc.parallelize(my_data)
print(flatMapValues_rdd.collect())
flatMapValues_rdd_1 = flatMapValues_rdd.flatMapValues(lambda x: list(zip(list('ABC'), x)))
flatMapValues_rdd_1.collect()

[[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]


[(1, ('A', 23)),
 (1, ('B', 28)),
 (1, ('C', 32)),
 (2, ('A', 18)),
 (2, ('B', 29)),
 (2, ('C', 31)),
 (3, ('A', 34)),
 (3, ('B', 21)),
 (3, ('C', 18))]

In [103]:
def changefmv(x):
    v= [x[0]] + list(x[1])
    return v
print(flatMapValues_rdd_1.flatMapValues(lambda x: [x[0]]).collect())
print(flatMapValues_rdd_1.flatMapValues(lambda x: [x[1]]).collect())

[(1, 'A'), (1, 'B'), (1, 'C'), (2, 'A'), (2, 'B'), (2, 'C'), (3, 'A'), (3, 'B'), (3, 'C')]
[(1, 23), (1, 28), (1, 32), (2, 18), (2, 29), (2, 31), (3, 34), (3, 21), (3, 18)]


## First Data Check

In [111]:
def describe_columns(df):
    for i in df.columns:
        print('Column: ' + i)
        df.select(i).describe().show()
describe_columns(mtcars_df)

Column: model
+-------+-----------+
|summary|      model|
+-------+-----------+
|  count|         32|
|   mean|       null|
| stddev|       null|
|    min|AMC Javelin|
|    max| Volvo 142E|
+-------+-----------+

Column: mpg
+-------+------------------+
|summary|               mpg|
+-------+------------------+
|  count|                32|
|   mean|20.090624999999996|
| stddev| 6.026948052089103|
|    min|              10.4|
|    max|              33.9|
+-------+------------------+

Column: cyl
+-------+------------------+
|summary|               cyl|
+-------+------------------+
|  count|                32|
|   mean|            6.1875|
| stddev|1.7859216469465444|
|    min|                 4|
|    max|                 8|
+-------+------------------+

Column: disp
+-------+------------------+
|summary|              disp|
+-------+------------------+
|  count|                32|
|   mean|230.72187500000004|
| stddev|123.93869383138195|
|    min|              71.1|
|    max|             4


### Select Rows by index
First, we need to add index to each rows. The ***zipWithIndex*** function zips the RDD elements with their corresponding index and returns the result as a new element.

In [5]:
mtcars = mtcars_df

In [6]:
mtcars.rdd.zipWithIndex().take(3)

[(Row(model='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
  0),
 (Row(model='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4),
  1),
 (Row(model='Datsun 710', mpg=22.8, cyl=4, disp=108.0, hp=93, drat=3.85, wt=2.32, qsec=18.61, vs=1, am=1, gear=4, carb=1),
  2)]

In [7]:
mtcars.rdd.zipWithIndex().map(lambda x:[x[1]]+[*x[0]]).take(2)

[[0, 'Mazda RX4', 21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4],
 [1, 'Mazda RX4 Wag', 21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4]]

In [9]:
header = ['index']+mtcars.columns
mtcars_df = mtcars.rdd.zipWithIndex().map(lambda x:[x[1]]+[*x[0]]).toDF(header)

In [10]:
mtcars_df.show(2)

+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|        model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 2 rows



After we obtain the **index column**, we can apply the ***pyspark.sql.DataFrame.filter** function to select rows of the DataFrame. The filter function takes a **column of types.BooleanType** as input.


### Select specific rows

In [13]:
mtcars_df.filter(mtcars_df.index.isin([1,2,4,6,9])).show()

+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|    6|       Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|    9|         Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+



### Select rows between a range.count()
#### Select rows between a range

In [18]:
mtcars_df.filter(mtcars_df.index.between(5,10)).show()

+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|index|     model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|    5|   Valiant|18.1|  6|225.0|105|2.76|3.46|20.22|  1|  0|   3|   1|
|    6|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|    7| Merc 240D|24.4|  4|146.7| 62|3.69|3.19| 20.0|  1|  0|   4|   2|
|    8|  Merc 230|22.8|  4|140.8| 95|3.92|3.15| 22.9|  1|  0|   4|   2|
|    9|  Merc 280|19.2|  6|167.6|123|3.92|3.44| 18.3|  1|  0|   4|   4|
|   10| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+



### Select rows by a cutoff index

In [19]:
mtcars_df.filter(mtcars_df.index < 3).show()

+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|        model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-----+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+



### Select rows by logical criteria
Example 1: select rows when cyl = 4

In [20]:
mtcars_df.filter(mtcars_df.cyl == 4).show(2)

+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|index|     model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|    2|Datsun 710|22.8|  4|108.0| 93|3.85|2.32|18.61|  1|  1|   4|   1|
|    7| Merc 240D|24.4|  4|146.7| 62|3.69|3.19| 20.0|  1|  0|   4|   2|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 2 rows



Example 2: select rows when vs = 1 and am = 1

When the filtering is based on multiple conditions (e.g., vs = 1 and am = 1), we use the conditions to build a new boolean type column. And we filter the DataFrame by the new column.

In [21]:
from pyspark.sql import functions as F

Warning: when passing multiple conditions to the **`when()`** function, each condition has to be within a pair of parentheses

In [23]:
filtering_column = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1).name('filter_col')
filtering_column

Column<b'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>

Now we need to add the new column to the original DataFrame. This can be done by applying the select() function to select all original columns as well as the new filtering columns.

In [24]:
all_original_columns = [eval('mtcars_df.' + c) for c in mtcars_df.columns]
all_original_columns

[Column<b'index'>,
 Column<b'model'>,
 Column<b'mpg'>,
 Column<b'cyl'>,
 Column<b'disp'>,
 Column<b'hp'>,
 Column<b'drat'>,
 Column<b'wt'>,
 Column<b'qsec'>,
 Column<b'vs'>,
 Column<b'am'>,
 Column<b'gear'>,
 Column<b'carb'>]

In [25]:
all_columns = all_original_columns + [filtering_column]
all_columns

[Column<b'index'>,
 Column<b'model'>,
 Column<b'mpg'>,
 Column<b'cyl'>,
 Column<b'disp'>,
 Column<b'hp'>,
 Column<b'drat'>,
 Column<b'wt'>,
 Column<b'qsec'>,
 Column<b'vs'>,
 Column<b'am'>,
 Column<b'gear'>,
 Column<b'carb'>,
 Column<b'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>]

In [26]:
new_mtcars_df = mtcars_df.select(all_columns)
new_mtcars_df.show()

+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|index|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|filter_col|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|    0|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|      null|
|    1|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|      null|
|    2|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|         1|
|    3|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|      null|
|    4|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|      null|
|    5|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|      null|
|    6|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|      null|
|    7|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0

In [27]:
new_mtcars_df.filter(new_mtcars_df.filter_col == 1).drop('filter_col').show()

+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    2|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   17|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   25|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   27|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
|   31|    Volvo 142E|21.4|  4|121.0|109|4.11| 2.78| 18.6|  1|  1|   4|   2|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+



In [29]:
F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1)

Column<b'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END'>

In [32]:
mtcars_df.filter(F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1) == 1).show(2)

+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|index|     model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|    2|Datsun 710|22.8|  4|108.0| 93|3.85|2.32|18.61|  1|  1|   4|   1|
|   17|  Fiat 128|32.4|  4| 78.7| 66|4.08| 2.2|19.47|  1|  1|   4|   1|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 2 rows



### Select columns by name
We can simply use the **select()** function to select columns by name.

In [33]:
mtcars.select(['hp','disp']).show(5)

+---+-----+
| hp| disp|
+---+-----+
|110|160.0|
|110|160.0|
| 93|108.0|
|110|258.0|
|175|360.0|
+---+-----+
only showing top 5 rows



### Select columns by index
We can convert indices to corresponding column names and then select columns by name.

In [34]:
indices = [0,2,4,6]
selected_columns = [ mtcars.columns[index] for index in indices]
print(selected_columns)
mtcars.select(selected_columns).show(4)


['model', 'cyl', 'hp', 'wt']
+--------------+---+---+-----+
|         model|cyl| hp|   wt|
+--------------+---+---+-----+
|     Mazda RX4|  6|110| 2.62|
| Mazda RX4 Wag|  6|110|2.875|
|    Datsun 710|  4| 93| 2.32|
|Hornet 4 Drive|  6|110|3.215|
+--------------+---+---+-----+
only showing top 4 rows



### Select columns by pattern
Example: columns start with 'd'.

In [36]:
import re 

selected_columns = [ x for x in mtcars.columns if re.compile('^d').match(x) is not None]
print(selected_columns)
mtcars.select(selected_columns).show(4)

['disp', 'drat']
+-----+----+
| disp|drat|
+-----+----+
|160.0| 3.9|
|160.0| 3.9|
|108.0|3.85|
|258.0|3.08|
+-----+----+
only showing top 4 rows



### Column expression
A Spark column instance is NOT a column of values from the DataFrame: when you crate a column instance, it does not give you the actual values of that column in the DataFrame. I found it makes more sense to me if I consider a column instance as a column of expressions. These expressions are evaluated by other methods (e.g., the select(), groupby(), and orderby() from pyspark.sql.DataFrame)

The **pyspark.sql.Column** has many methods that acts on a column and returns a column instance.

In [38]:
mtcars.filter(mtcars.gear.isin([2,3])).show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|
|      Toyot

In [45]:
mtcars.select(mtcars.gear).distinct().show()

+----+
|gear|
+----+
|   3|
|   5|
|   4|
+----+



### between(): true/false if the column value is between a given range¶

In [46]:
mtcars.mpg.between(18,20)

Column<b'((mpg >= 18) AND (mpg <= 20))'>

In [50]:
mtcars.select(mtcars.mpg.between(18,20).alias('18 to 20')).show(5)

+--------+
|18 to 20|
+--------+
|   false|
|   false|
|   false|
|   false|
|    true|
+--------+
only showing top 5 rows



In [51]:
mtcars.filter(mtcars.mpg.between(18,20)).show()

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|          Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
| Pontiac Firebird|19.2|  8|400.0|175|3.08|3.845|17.05|  0|  0|   3|   2|
|     Ferrari Dino|19.7|  6|145.0|175|3.62| 2.77| 15.5|  0|  1|   5|   6|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+



### contains(): true/false if the column value contains a string

In [53]:
mtcars.model.contains('Ho')

Column<b'contains(model, Ho)'>

In [55]:
mtcars.filter(mtcars.model.contains('Ho')).show(4)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|      Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+



In [56]:
mtcars.filter(mtcars.model.contains('Ho') == False).show(4)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|      Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 4 rows



### endswith(): true/false if the column value ends with a string

In [62]:
mtcars.select(mtcars.model, mtcars.model.endswith('d').alias('end-d')).show(5)

+-----------------+-----+
|            model|end-d|
+-----------------+-----+
|        Mazda RX4|false|
|    Mazda RX4 Wag|false|
|       Datsun 710|false|
|   Hornet 4 Drive|false|
|Hornet Sportabout|false|
+-----------------+-----+
only showing top 5 rows



### startswith(): true/false if the column value starts with a string

In [65]:
mtcars.select(mtcars.model, mtcars.model.startswith('M').alias('start-M')).show(5)

+-----------------+-------+
|            model|start-M|
+-----------------+-------+
|        Mazda RX4|   true|
|    Mazda RX4 Wag|   true|
|       Datsun 710|  false|
|   Hornet 4 Drive|  false|
|Hornet Sportabout|  false|
+-----------------+-------+
only showing top 5 rows



### like(): true/false if the column value matches a pattern based on a SQL LIKE

In [72]:
mtcars.filter(mtcars.model.like('H%da%')).show()

+-----------+----+---+----+---+----+-----+-----+---+---+----+----+
|      model| mpg|cyl|disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------+----+---+----+---+----+-----+-----+---+---+----+----+
|Honda Civic|30.4|  4|75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------+----+---+----+---+----+-----+-----+---+---+----+----+



### rlike(): true/false if the column value matches a pattern based on a SQL RLIKE (LIKE with Regex)

In [74]:
mtcars.filter(mtcars.model.rlike('^H.*t$')).show()

+-----------------+----+---+-----+---+----+----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+----+-----+---+---+----+----+
|Hornet Sportabout|18.7|  8|360.0|175|3.15|3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+----+-----+---+---+----+----+



### pyspark.sql.functions functions
****pyspark.sql.functions**** is collection of built-in functions for creating column expressions. These functions largely increase methods that we can use to manipulate DataFrame and DataFrame columns.

There are many sql functions from the pyspark.sql.functions module. Here I only choose a few to show how these functions extend the ability to create column expressions.

In [75]:
from pyspark.sql import functions as F

In [76]:
from pyspark.sql import Row
df = sc.parallelize([Row(x=1), Row(x=-1), Row(x=-2)]).toDF()
df.show()

+---+
|  x|
+---+
|  1|
| -1|
| -2|
+---+



In [82]:
F.abs(df.x)

Column<b'abs(x)'>

In [83]:
df.select(df.x, F.abs(df.x)).show()

+---+------+
|  x|abs(x)|
+---+------+
|  1|     1|
| -1|     1|
| -2|     2|
+---+------+



### concat(): create column expression that concatenates multiple column values into one

In [84]:
from pyspark.sql import Row

In [85]:
names = [Row(firstname="Ashok",lastname="kumar"),
         Row(firstname="Rama",lastname="Krishnan"),
         Row(firstname="Darris",lastname="Smith")]

In [87]:
df=sc.parallelize(names).toDF()

In [89]:
df.show()

+---------+--------+
|firstname|lastname|
+---------+--------+
|    Ashok|   kumar|
|     Rama|Krishnan|
|   Darris|   Smith|
+---------+--------+



In [90]:
df.select('firstname','lastname',F.concat(df.firstname,df.lastname)).show()

+---------+--------+---------------------------+
|firstname|lastname|concat(firstname, lastname)|
+---------+--------+---------------------------+
|    Ashok|   kumar|                 Ashokkumar|
|     Rama|Krishnan|               RamaKrishnan|
|   Darris|   Smith|                DarrisSmith|
+---------+--------+---------------------------+



### corr(): create column expression that returns pearson correlation coefficient between two columns

In [91]:
mtcars.select(F.corr(mtcars.drat, mtcars.wt)).show()

+-------------------+
|     corr(drat, wt)|
+-------------------+
|-0.7124406466973717|
+-------------------+



In [95]:
eval('mtcars.'+'drat')

Column<b'drat'>

In [97]:
mtcars.select(F.array([eval('mtcars.'+z) for z in mtcars.columns[1:]])).show(truncate=False)

+-----------------------------------------------------------------+
|array(mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb)    |
+-----------------------------------------------------------------+
|[21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]  |
|[21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0] |
|[22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]  |
|[21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0]|
|[18.7, 8.0, 360.0, 175.0, 3.15, 3.44, 17.02, 0.0, 0.0, 3.0, 2.0] |
|[18.1, 6.0, 225.0, 105.0, 2.76, 3.46, 20.22, 1.0, 0.0, 3.0, 1.0] |
|[14.3, 8.0, 360.0, 245.0, 3.21, 3.57, 15.84, 0.0, 0.0, 3.0, 4.0] |
|[24.4, 4.0, 146.7, 62.0, 3.69, 3.19, 20.0, 1.0, 0.0, 4.0, 2.0]   |
|[22.8, 4.0, 140.8, 95.0, 3.92, 3.15, 22.9, 1.0, 0.0, 4.0, 2.0]   |
|[19.2, 6.0, 167.6, 123.0, 3.92, 3.44, 18.3, 1.0, 0.0, 4.0, 4.0]  |
|[17.8, 6.0, 167.6, 123.0, 3.92, 3.44, 18.9, 1.0, 0.0, 4.0, 4.0]  |
|[16.4, 8.0, 275.8, 180.0, 3.07, 4.07, 17.4, 0.0

### udf() function and sql types
The pyspark.sql.functions.udf() function is a very important function. It allows us to transfer a **user defined function** to a **pyspark.sql.functions** function which can act on columns of a DataFrame. It makes data framsformation much more flexible.

Using udf() could be tricky. The key is to understand how to define the returnType parameter.v

In [98]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [99]:
mtcars.show(3)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 3 rows



In [100]:
# define function
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])

# convert user defined function into an udf function (sql function)
array_merge_two_columns_udf = udf(merge_two_columns, returnType=ArrayType(FloatType()))

In [102]:
array_col = array_merge_two_columns_udf(mtcars.disp, mtcars.hp)

In [103]:
all_original_cols = [eval('mtcars.' + x) for x in mtcars.columns]
all_new_cols = all_original_cols + [array_col]
all_new_cols


[Column<b'model'>,
 Column<b'mpg'>,
 Column<b'cyl'>,
 Column<b'disp'>,
 Column<b'hp'>,
 Column<b'drat'>,
 Column<b'wt'>,
 Column<b'qsec'>,
 Column<b'vs'>,
 Column<b'am'>,
 Column<b'gear'>,
 Column<b'carb'>,
 Column<b'merge_two_columns(disp, hp)'>]

In [104]:
mtcars.select(all_new_cols).show(5, truncate=False)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
|model            |mpg |cyl|disp |hp |drat|wt   |qsec |vs |am |gear|carb|merge_two_columns(disp, hp)|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
|Mazda RX4        |21.0|6  |160.0|110|3.9 |2.62 |16.46|0  |1  |4   |4   |[160.0, 110.0]             |
|Mazda RX4 Wag    |21.0|6  |160.0|110|3.9 |2.875|17.02|0  |1  |4   |4   |[160.0, 110.0]             |
|Datsun 710       |22.8|4  |108.0|93 |3.85|2.32 |18.61|1  |1  |4   |1   |[108.0, 93.0]              |
|Hornet 4 Drive   |21.4|6  |258.0|110|3.08|3.215|19.44|1  |0  |3   |1   |[258.0, 110.0]             |
|Hornet Sportabout|18.7|8  |360.0|175|3.15|3.44 |17.02|0  |0  |3   |2   |[360.0, 175.0]             |
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
only showing top 5 rows



### ArrayType vs. StructType
Both ArrayType and StructType can be used to build returnType for a list. The difference is:

1. ArrayType requires all elements in the list have the same elementType, while StructType can have different elementTypes.
2. StructType represents a Row object.


### Define an ArrayType with elementType being FloatType.

In [105]:
# define function
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])
array_type = ArrayType(FloatType())
array_merge_two_columns_udf = udf(merge_two_columns, returnType=array_type)

### Define a StructType with one elementType being StringType and the other being FloatType.

In [110]:
# define function
def merge_two_columns(col1, col2):
    return([str(col1), float(col2)])
struct_type = StructType([
    StructField('f1', StringType()),
    StructField('f2', FloatType())
])
struct_merge_two_columns_udf = udf(merge_two_columns, returnType=struct_type)

In [111]:
array_col = array_merge_two_columns_udf(mtcars.hp, mtcars.disp)
array_col

Column<b'merge_two_columns(hp, disp)'>

In [112]:
struct_col = struct_merge_two_columns_udf(mtcars.model, mtcars.disp)
struct_col

Column<b'merge_two_columns(model, disp)'>

In [113]:
mtcars.select(array_col, struct_col).show(truncate=False)

+---------------------------+------------------------------+
|merge_two_columns(hp, disp)|merge_two_columns(model, disp)|
+---------------------------+------------------------------+
|[110.0, 160.0]             |[Mazda RX4, 160.0]            |
|[110.0, 160.0]             |[Mazda RX4 Wag, 160.0]        |
|[93.0, 108.0]              |[Datsun 710, 108.0]           |
|[110.0, 258.0]             |[Hornet 4 Drive, 258.0]       |
|[175.0, 360.0]             |[Hornet Sportabout, 360.0]    |
|[105.0, 225.0]             |[Valiant, 225.0]              |
|[245.0, 360.0]             |[Duster 360, 360.0]           |
|[62.0, 146.7]              |[Merc 240D, 146.7]            |
|[95.0, 140.8]              |[Merc 230, 140.8]             |
|[123.0, 167.6]             |[Merc 280, 167.6]             |
|[123.0, 167.6]             |[Merc 280C, 167.6]            |
|[180.0, 275.8]             |[Merc 450SE, 275.8]           |
|[180.0, 275.8]             |[Merc 450SL, 275.8]           |
|[180.0, 275.8]         

## Export data

In [115]:
from pyspark.sql import DataFrameWriter

In [116]:
mtcars = mtcars.coalesce(numPartitions=3)

In [117]:
mtcars.write.csv('data/saved-mtcars', header=True)

## SQL TEMP View Table

In [118]:
mtcars_df.createOrReplaceTempView("mtcars")

In [122]:
spark.sql("select * from mtcars where model like 'H%da%'").show()

+-----+-----------+----+---+----+---+----+-----+-----+---+---+----+----+
|index|      model| mpg|cyl|disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------+----+---+----+---+----+-----+-----+---+---+----+----+
|   18|Honda Civic|30.4|  4|75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----+-----------+----+---+----+---+----+-----+-----+---+---+----+----+



In [123]:
spark.sql("select mpg,cyl from mtcars").show()

+----+---+
| mpg|cyl|
+----+---+
|21.0|  6|
|21.0|  6|
|22.8|  4|
|21.4|  6|
|18.7|  8|
|18.1|  6|
|14.3|  8|
|24.4|  4|
|22.8|  4|
|19.2|  6|
|17.8|  6|
|16.4|  8|
|17.3|  8|
|15.2|  8|
|10.4|  8|
|10.4|  8|
|14.7|  8|
|32.4|  4|
|30.4|  4|
|33.9|  4|
+----+---+
only showing top 20 rows



In [130]:
spark.sql("SELECT ROW_NUMBER() OVER (PARTITION BY mpg ORDER BY mpg ) row_num,mpg,cyl FROM mtcars ").show()

+-------+----+---+
|row_num| mpg|cyl|
+-------+----+---+
|      1|15.5|  8|
|      1|17.3|  8|
|      1|13.3|  8|
|      1|19.7|  6|
|      1|21.4|  6|
|      2|21.4|  4|
|      1|15.8|  8|
|      1|27.3|  4|
|      1|19.2|  6|
|      2|19.2|  8|
|      1|24.4|  4|
|      1|33.9|  4|
|      1|18.1|  6|
|      1|15.2|  8|
|      2|15.2|  8|
|      1|14.7|  8|
|      1|21.0|  6|
|      2|21.0|  6|
|      1|14.3|  8|
|      1|22.8|  4|
+-------+----+---+
only showing top 20 rows



In [131]:
sql_with = "WITH cars AS (     SELECT         ROW_NUMBER() OVER(PARTITION BY mpg              ORDER BY                 mpg         ) row_num,         mpg,         cyl      FROM         mtcars ) SELECT     row_num,     mpg,     cyl FROM     cars WHERE row_num > 1"

In [132]:
spark.sql(sql_with).show()

+-------+----+---+
|row_num| mpg|cyl|
+-------+----+---+
|      2|21.4|  4|
|      2|19.2|  8|
|      2|15.2|  8|
|      2|21.0|  6|
|      2|22.8|  4|
|      2|10.4|  8|
|      2|30.4|  4|
+-------+----+---+



In [134]:
sql_with_1 = """
WITH cars AS (
    SELECT 
        ROW_NUMBER() OVER(PARTITION BY mpg
             ORDER BY 
                mpg
        ) row_num, 
        mpg, 
        cyl
     FROM 
        mtcars
) SELECT 
    row_num, 
    mpg, 
    cyl
FROM 
    cars
WHERE row_num = 1
"""

spark.sql(sql_with_1).show()

+-------+----+---+
|row_num| mpg|cyl|
+-------+----+---+
|      1|15.5|  8|
|      1|17.3|  8|
|      1|13.3|  8|
|      1|19.7|  6|
|      1|21.4|  6|
|      1|15.8|  8|
|      1|27.3|  4|
|      1|19.2|  6|
|      1|24.4|  4|
|      1|33.9|  4|
|      1|18.1|  6|
|      1|15.2|  8|
|      1|14.7|  8|
|      1|21.0|  6|
|      1|14.3|  8|
|      1|22.8|  4|
|      1|18.7|  8|
|      1|21.5|  4|
|      1|17.8|  6|
|      1|10.4|  8|
+-------+----+---+
only showing top 20 rows

