In [15]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

In [18]:
## -------------------------------
## -- Use built-in Spark Server --
## -------------------------------
# Run Spark in-process with the "local" master (a single worker thread on this machine).
# `getOrCreate` returns the already-active SparkContext when one exists, instead of raising
# "Cannot run multiple SparkContexts at once". This makes the cell safe to re-execute.
# Note: if a context is already active, it is returned as-is and `conf` is ignored.
conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)
# Alternative local setup that pins the driver host to the loopback address:
# conf = SparkConf().setAppName('appName').setMaster('local').set('spark.driver.host', '127.0.0.1')
# sc = SparkContext.getOrCreate(conf=conf)
## -----------------------------
## -- Use Remote Spark Server --
## -----------------------------
## (The local PySpark version must be compatible with the cluster's Spark version,
##  or the following connection will fail!)
# conf = SparkConf().setAppName('appName').setMaster('spark://192.168.0.160:7077')
# sc = SparkContext.getOrCreate(conf=conf)

In [19]:
## Small (key, value) pair RDD. Keys repeat, so per-key reductions are meaningful.
inputRDD = sc.parallelize([("Z", 100), ("A", 26), ("B", 30), ("C", 40), ("B", 50), ("A", 80), ("B", 95), ("C", 88)])

print('\n---- 1.) Find Max / Min Tuples with the same key: ----')
## Per-key min / max of the values. The builtins min/max are valid reduceByKey
## functions because they are associative and commutative.
reduced_min = inputRDD.reduceByKey(min).collect()
print(reduced_min)
reduced_max = inputRDD.reduceByKey(max).collect()
print(reduced_max)

## Sum the values of each key ONCE and reuse this RDD in sections 2 and 3.
## Every separate reduceByKey call builds a new shuffle. Reusing a single RDD lets
## Spark skip the already-computed shuffle stage on later actions.
summed_by_key = inputRDD.reduceByKey(lambda a, b: a + b)

print('\n---- 2.) Aggregate Tuples with the same key and sort in ascending order: ---')
aggregated_tuples = summed_by_key.sortBy(lambda kv: kv[1]).collect()
print(aggregated_tuples)

print('\n---- 3.) Find Max / Min Tuples being aggregated with the same key: ----')
## -- 1.) Aggregate tuples with the same key (sum of that key's values).
## -- 2.) Pick the aggregated tuple with the smallest / largest sum.
aggregated_min = summed_by_key.min(key=lambda kv: kv[1])
print(aggregated_min)
aggregated_max = summed_by_key.max(key=lambda kv: kv[1])
print(aggregated_max)

print('\n---- 4.) Find the Max / Min Tuple (no aggregation for the same key): ----')
## Compare raw tuples by their value; keys are not combined.
the_min = inputRDD.min(key=lambda kv: kv[1])
print(the_min)
the_max = inputRDD.max(key=lambda kv: kv[1])
print(the_max)

print('\n---- 5.) Find the max / min value of all tuples: ----')
## Drop the keys and take the global min / max over the values only.
min_value = inputRDD.values().min()
print(min_value)
max_value = inputRDD.values().max()
print(max_value)



---- 1.) Find Max / Min Tuples with the same key: ----
[('Z', 100), ('A', 26), ('B', 30), ('C', 40)]
[('Z', 100), ('A', 80), ('B', 95), ('C', 88)]

---- 2.) Aggregate Tuples with the same key and sport in acending order: ---
[('Z', 100), ('A', 106), ('C', 128), ('B', 175)]

---- 3.) Find Max / Min Tuples being aggregated with the same key: ----
('Z', 100)
('B', 175)

---- 4.) Find the Max / Min Tuple (no aggregation for the same key: ----
('A', 26)
('Z', 100)

---- 5.) Find the max / min value of all tuples: ----
26
100


In [14]:
# Shut down the SparkContext `sc` created in the setup cell.
# Run this last: every RDD cell above depends on `sc`.
sc.stop()