In [154]:
# create entry points to spark
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

## Aggregate functions
Two aggregate functions:

* `aggregate()`
* `aggregateByKey()`

### `aggregate(zeroValue, seqOp, combOp)`

* **zeroValue** is like a data container. Its structure should match with the data structure of the returned values from the seqOp function.
* **seqOp** is a function that takes two arguments: the first argument is the zeroValue and the second argument is an element from the RDD. The zeroValue gets updated with the returned value after every run.
* **combOp** is a function that takes two arguments: the first argument is the final zeroValue from one partition and the other is another final zeroValue from another partition.

The code below calculates the total sum of squares for **mpg** and **disp** in data set **mtcars**.

Step 1: get some data.

In [155]:
mtcars_df = spark.read.csv('data/SparkData/mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

Step 2: calculate averages of mgp and disp

In [156]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean = ', mpg_mean, '; ' 
      'disp mean = ', disp_mean)

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

mpg mean =  20.090625000000003 ; disp mean =  230.721875


Step 3: build **zeroValue, seqOp** and **combOp**

We are calculating two TSS. We create a tuple to store two values.

In [157]:
zeroValue = (0, 0) 

The **z** below refers to `zeroValue`. Its values get updated after every run. The **x** refers to an element in an RDD partition. In this case, both **z** and **x** have two values.

In [158]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)

The `combOp` function simply aggrate all `zeroValues` into one. 

In [159]:
combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )

Implement `aggregate()` function.

In [160]:
mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)

(1126.0471874999998, 476184.7946875)

## `aggregateByKey(zeroValue, seqOp, combOp)`

This function does similar things as `aggregate()`. The `aggregate()` aggregate all results to the very end, but aggregateByKey() merge results by key.

### Import data

In [8]:
x=['hello', 'world', 'good', 'hello']

In [9]:
zero_value = 0
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)

In [10]:
zz=sc.parallelize(x).map(lambda x: (x,1))

In [11]:
zz.collect()

[('hello', 1), ('world', 1), ('good', 1), ('hello', 1)]

In [12]:
zz.aggregateByKey(zero_value,seqOp,combOp).collect()

[('good', 1), ('hello', 2), ('world', 1)]

In [13]:
iris_rdd = sc.textFile('data/SparkData/iris.csv', use_unicode=True)
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

### Transform data to a tuple RDD

In [14]:
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))
iris_rdd_2.take(5)

[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2]),
 ('setosa', [5.0, 3.6, 1.4, 0.2])]

### Define initial values, seqOp and combOp

In [15]:
zero_value = (0, 0, 0, 0)
seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2, x[2] + (y[2])**2, x[3] + (y[3])**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1], x[2]+y[2], x[3] + y[3]))

### Implement `aggregateByKey()`

In [16]:
iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()

[('setosa',
  (1259.0899999999997,
   591.2500000000002,
   108.63999999999997,
   3.5400000000000005)),
 ('versicolor', (1774.8600000000001, 388.47, 918.2, 89.83)),
 ('virginica', (2189.9000000000005, 447.33, 1556.1599999999994, 208.93))]

In [17]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

In [140]:
rows = sc.parallelize([
                    Vectors.dense(0.0, 1.0, 2.0),
                    Vectors.dense(3.0, 4.0, 5.0),
])

In [141]:
mat = RowMatrix(rows)

In [142]:
svd = mat.computeSVD(2, computeU=True)



In [143]:
U = svd.U       # The U factor is a RowMatrix.

In [144]:
s = svd.s       # The singular values are stored in a local dense vector.

In [145]:
V = svd.V       # The V factor is a local dense matrix.

In [146]:
U.numRows()

2

In [147]:
U.numCols()

2

In [148]:
U.numRows()

2

In [149]:
uMatrix=[]
for i in U.rows.collect():
    uMatrix.append([i[0],i[1]])
uMatrix

[[-0.27472112789737796, -0.9615239476408239],
 [-0.9615239476408228, 0.27472112789737846]]

In [128]:
import numpy as np
u=np.matrix(uMatrix)
uT=u.transpose()
u




matrix([[-2.60154843e-01, -1.90043939e-01,  8.80774787e-01,
          3.47042554e-01, -4.42378223e-09, -9.31322575e-09,
         -2.03726813e-09],
        [-4.87250158e-01, -1.62282993e-01,  1.53686688e-01,
         -8.44175287e-01, -9.31322575e-10,  1.49011612e-08,
         -2.32830644e-10],
        [-7.58264730e-01, -2.56908256e-01, -4.39763758e-01,
          4.06989662e-01, -3.16649675e-08,  1.49011612e-08,
          7.79982656e-09],
        [-3.46325484e-01,  9.33566109e-01,  8.49942012e-02,
          3.59021663e-02, -1.28056854e-09, -2.79396772e-09,
         -1.26892701e-08]])

In [129]:
uT

matrix([[-2.60154843e-01, -4.87250158e-01, -7.58264730e-01,
         -3.46325484e-01],
        [-1.90043939e-01, -1.62282993e-01, -2.56908256e-01,
          9.33566109e-01],
        [ 8.80774787e-01,  1.53686688e-01, -4.39763758e-01,
          8.49942012e-02],
        [ 3.47042554e-01, -8.44175287e-01,  4.06989662e-01,
          3.59021663e-02],
        [-4.42378223e-09, -9.31322575e-10, -3.16649675e-08,
         -1.28056854e-09],
        [-9.31322575e-09,  1.49011612e-08,  1.49011612e-08,
         -2.79396772e-09],
        [-2.03726813e-09, -2.32830644e-10,  7.79982656e-09,
         -1.26892701e-08]])

In [119]:
np.matmul(u,uT)

matrix([[ 0.10379724,  0.15760139,  0.2460901 , -0.08732033],
        [ 0.15760139,  0.26374849,  0.41115645,  0.01724524],
        [ 0.2460901 ,  0.41115645,  0.64096725,  0.02276556],
        [-0.08732033,  0.01724524,  0.02276556,  0.99148702]])

In [150]:
s.values

array([7.34846923, 1.        ])

In [59]:
help(mat.computeSVD)

Help on method computeSVD in module pyspark.mllib.linalg.distributed:

computeSVD(k: int, computeU: bool = False, rCond: float = 1e-09) -> 'SingularValueDecomposition[RowMatrix, Matrix]' method of pyspark.mllib.linalg.distributed.RowMatrix instance
    Computes the singular value decomposition of the RowMatrix.
    
    The given row matrix A of dimension (m X n) is decomposed into
    U * s * V'T where
    
    - U: (m X k) (left singular vectors) is a RowMatrix whose
      columns are the eigenvectors of (A X A')
    - s: DenseVector consisting of square root of the eigenvalues
      (singular values) in descending order.
    - v: (n X k) (right singular vectors) is a Matrix whose columns
      are the eigenvectors of (A' X A)
    
    For more specific details on implementation, please refer
    the Scala documentation.
    
    .. versionadded:: 2.2.0
    
    Parameters
    ----------
    k : int
        Number of leading singular values to keep (`0 < k <= n`).
        It might re

In [151]:
V.values

array([-0.39254051, -0.56077215, -0.7290038 ,  0.82416338,  0.13736056,
       -0.54944226])

In [152]:
V.numRows

3

In [153]:
V.numCols

2

In [164]:
df=spark.read.csv('data/SparkData/mtcars.csv', inferSchema=True, header=True)

In [165]:
df.show(5)

23/04/15 10:37:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
 Schema: _c0, mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
Expected: _c0 but found: 
CSV file: file:///Users/user/BerkeleySpark/data/SparkData/mtcars.csv
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 r

In [225]:
car_df=df.select(['cyl', 'mpg', 'disp'])

In [226]:
car_df.show(5)

+---+----+-----+
|cyl| mpg| disp|
+---+----+-----+
|  6|21.0|160.0|
|  6|21.0|160.0|
|  4|22.8|108.0|
|  6|21.4|258.0|
|  8|18.7|360.0|
+---+----+-----+
only showing top 5 rows



In [227]:
car_df.createOrReplaceTempView('car')

In [235]:
car_mean_df=spark.sql("select cyl as cylinder, avg(mpg) as mean_mpg, avg(disp) as mean_disp from car group by 1")

In [236]:
car_mean_df.show(5)

+--------+------------------+------------------+
|cylinder|          mean_mpg|         mean_disp|
+--------+------------------+------------------+
|       6| 19.74285714285714|183.31428571428572|
|       4|26.663636363636364|105.13636363636364|
|       8|15.100000000000003|353.09999999999997|
+--------+------------------+------------------+



In [237]:
car_df=car_df.withColumnRenamed("cyl","cylinder")
car_df.show(5)
car_mean_df.show(5)

+--------+----+-----+
|cylinder| mpg| disp|
+--------+----+-----+
|       6|21.0|160.0|
|       6|21.0|160.0|
|       4|22.8|108.0|
|       6|21.4|258.0|
|       8|18.7|360.0|
+--------+----+-----+
only showing top 5 rows

+--------+------------------+------------------+
|cylinder|          mean_mpg|         mean_disp|
+--------+------------------+------------------+
|       6| 19.74285714285714|183.31428571428572|
|       4|26.663636363636364|105.13636363636364|
|       8|15.100000000000003|353.09999999999997|
+--------+------------------+------------------+



In [239]:
car_df.createOrReplaceTempView("car_df")
car_mean_df.createOrReplaceTempView("car_mean_df")
car_mean_combined_df=spark.sql("select a.cylinder, a.mpg, b.mean_mpg, a.disp, b.mean_disp from car_df a inner join \
           car_mean_df b on a.cylinder = b.cylinder")
car_mean_combined_df.show(5)





+--------+----+------------------+-----+------------------+
|cylinder| mpg|          mean_mpg| disp|         mean_disp|
+--------+----+------------------+-----+------------------+
|       6|21.0| 19.74285714285714|160.0|183.31428571428572|
|       6|21.0| 19.74285714285714|160.0|183.31428571428572|
|       4|22.8|26.663636363636364|108.0|105.13636363636364|
|       6|21.4| 19.74285714285714|258.0|183.31428571428572|
|       8|18.7|15.100000000000003|360.0|353.09999999999997|
+--------+----+------------------+-----+------------------+
only showing top 5 rows



In [240]:
car_mean_combined_rdd=car_mean_combined_df.rdd.map(lambda x: (x[0], (x[1], x[2], x[3], x[4])))

In [241]:
car_mean_combined_rdd.take(4)
    
    

[(6, (21.0, 19.74285714285714, 160.0, 183.31428571428572)),
 (6, (21.0, 19.74285714285714, 160.0, 183.31428571428572)),
 (4, (22.8, 26.663636363636364, 108.0, 105.13636363636364)),
 (6, (21.4, 19.74285714285714, 258.0, 183.31428571428572))]

In [245]:
car_mean_square_rdd=car_mean_combined_rdd.map(lambda x: [x[0], ((x[1][0]-x[1][1])**2, (x[1][2]-x[1][3])**2)])
car_mean_square_rdd.take(5)

[[6, (1.5804081632653129, 543.5559183673471)],
 [6, (1.5804081632653129, 543.5559183673471)],
 [4, (14.92768595041322, 8.200413223140474)],
 [6, (2.746122448979596, 5577.955918367346)],
 [8, (12.959999999999972, 47.61000000000047)]]

Let's compute the squqre mean error between sum((mpg-mean_mpg)^2) and sum((disp-mean_disp)^) based on same key
which is model, and use aggregateByKey API

In [246]:
zero_value = (0, 0) #zero_value[0] is for mpg, zero_value[1] is for disp
seqOp = (lambda x, y: (x[0]+y[0], x[1]+y[1])) 
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [247]:
car_mean_square_rdd.aggregateByKey(zero_value, seqOp, combOp).collect()

[(6, (12.677142857142847, 10364.628571428573)),
 (4, (203.38545454545448, 7220.825454545454)),
 (8, (85.19999999999999, 59708.38))]