### Loading Spark

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Reuse an already-running context/session when there is one. A bare
# SparkContext() raises "Cannot run multiple SparkContexts at once" if this
# cell is executed a second time in the same kernel.
sc = SparkContext.getOrCreate()
# The builder attaches the session to the context obtained above.
spark = SparkSession.builder.getOrCreate()

### Aggregations in Spark

#### aggregate(zeroValue, seqOp, combOp) 

In [11]:
# Read the churn CSV (first line is the header, column types are inferred)
# and keep only the two columns used in the aggregations below.
spark_df = (
    spark.read
    .csv('BankChurners.csv', header=True, inferSchema=True)
    .select('Customer_Age', 'Credit_Limit')
)
spark_df.take(5)

[Row(Customer_Age=45, Credit_Limit=12691.0),
 Row(Customer_Age=49, Credit_Limit=8256.0),
 Row(Customer_Age=51, Credit_Limit=3418.0),
 Row(Customer_Age=40, Credit_Limit=3313.0),
 Row(Customer_Age=40, Credit_Limit=4716.0)]

In [12]:
# Per-column means, computed on the underlying RDD of each single-column
# projection; they are reused as the centre for the squared deviations later.
customer_age_mean = (
    spark_df.select('Customer_Age')
    .rdd
    .map(lambda row: row['Customer_Age'])
    .mean()
)
credit_limit_mean = (
    spark_df.select('Credit_Limit')
    .rdd
    .map(lambda row: row['Credit_Limit'])
    .mean()
)
print(
    'Customer Age Mean is: ', customer_age_mean,
    '\n',
    'Credit Limit Mean is: ', credit_limit_mean,
)

Customer Age Mean is:  46.325960304137524 
 Credit Limit Mean is:  8631.953698035026


#### Building zeroValue, seqOp, and combOp for the aggregate function

In [13]:
# Sum of squared deviations from the mean for both columns in one pass.
# zeroValue : starting accumulator (age total, credit-limit total).
# seqOp     : folds one row into a partition's accumulator.
# combOp    : merges the accumulators of two partitions.
zeroValue = (0, 0)
seqOp = lambda acc, row: (
    acc[0] + (row['Customer_Age'] - customer_age_mean) ** 2,
    acc[1] + (row['Credit_Limit'] - credit_limit_mean) ** 2,
)
combOp = lambda left, right: (
    left[0] + right[0],
    left[1] + right[1],
)
spark_df.rdd.aggregate(zeroValue, seqOp, combOp)

(650791.0050360541, 836466948462.1321)

In [25]:
# Load iris.csv as raw text lines; the header line is still present here.
# NOTE: despite its name, `spark_df` now refers to an RDD of strings and
# replaces the DataFrame bound to the same name in the earlier cell.
spark_df = sc.textFile(name='iris.csv', use_unicode=True)
spark_df.take(5)

['sepal.length,sepal.width,petal.length,petal.width,species',
 '5.1,3.5,1.4,.2,Setosa',
 '4.9,3,1.4,.2,Setosa',
 '4.7,3.2,1.3,.2,Setosa',
 '4.6,3.1,1.5,.2,Setosa']

In [26]:
# Turn each CSV line into a (species, [measurements as floats]) pair:
#   1. split the line on commas,
#   2. drop the header line (its first field is 'sepal.length'),
#   3. use the last field as the key and convert the remaining fields to float.
spark_df_rdd = (
    spark_df
    .map(lambda line: line.split(','))
    .filter(lambda fields: fields[0] != 'sepal.length')
    .map(lambda fields: (fields[-1], [float(value) for value in fields[:-1]]))
)
spark_df_rdd.take(5)

[('Setosa', [5.1, 3.5, 1.4, 0.2]),
 ('Setosa', [4.9, 3.0, 1.4, 0.2]),
 ('Setosa', [4.7, 3.2, 1.3, 0.2]),
 ('Setosa', [4.6, 3.1, 1.5, 0.2]),
 ('Setosa', [5.0, 3.6, 1.4, 0.2])]

In [30]:
# Per-species totals of the first two measurements in each value list.
# seqOp adds one record's values at indices 0 and 1 to the running pair;
# combOp adds two partial pairs belonging to the same key.
# NOTE(review): the values at indices 2 and 3 are not aggregated here --
# confirm that summing only the first two measurements is intended.
zeroValue = (0, 0)
seqOp = lambda totals, measurements: (
    totals[0] + measurements[0],
    totals[1] + measurements[1],
)
combOp = lambda left, right: (
    left[0] + right[0],
    left[1] + right[1],
)
spark_df_rdd.aggregateByKey(zeroValue, seqOp, combOp).collect()

[('Versicolor', (296.8, 138.5)),
 ('Virginica', (329.3999999999999, 148.7)),
 ('Setosa', (250.29999999999998, 171.40000000000003))]