# Problem - solution approach

# Spark Architecture and the Resilient Distributed Dataset
The main components of the Spark architecture are the driver (accessed through the SparkContext object in PySpark) and the executors. Each PySpark application has one driver program and one or more executors running on the cluster's worker (slave) machines. What is an application in the context of PySpark? It is the complete body of code written to solve a problem.

The driver is the process that coordinates with many executors running on various slave machines. Spark follows a master/slave architecture. The SparkContext object is created by the driver. SparkContext is the main entry point to a PySpark application.

**We can perform two types of operations on an RDD:** transformations and actions. A transformation on an RDD returns another RDD: RDDs are immutable, so an existing RDD cannot be changed in place. Transformations are lazy, whereas actions are evaluated eagerly. A transformation is lazy in the sense that applying it to an RDD does not run the operation on the data at that moment. Instead, PySpark records the requested operation, and the pending transformations are applied when the first action is called.
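The lazy behaviour can be mimicked in plain Python, without Spark, using the built-in `map`, which also defers work until its result is consumed. This is only an analogy under that assumption, not a description of how PySpark is implemented; the names `to_celsius` and `calls` are hypothetical.

```python
calls = []  # records every value the function is actually applied to

def to_celsius(temp_f):
    calls.append(temp_f)
    return (temp_f - 32) * 5 / 9

temps_f = [59, 57.2, 53.6]

# "Transformation": builds a lazy iterator, nothing is computed yet.
lazy_celsius = map(to_celsius, temps_f)
calls_before = len(calls)

# "Action": consuming the iterator forces the computation.
result = list(lazy_celsius)
calls_after = len(calls)

print(calls_before, calls_after)  # prints: 0 3
```

In the same way, calling `map` or `filter` on an RDD only describes the work, and an action such as `collect()` triggers it.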

## Problem 1: Create an RDD

In [2]:
# a Python list of floats
plist = [1.2 , 2.3 , 3.4 , 4.5 , 2.4 , 2.3, 4.0 ]

In [5]:
display(plist)

[1.2, 2.3, 3.4, 4.5, 2.4, 2.3, 4.0]

In [7]:
# initialize findspark so that pyspark can be imported
import findspark
findspark.init('/opt/spark/spark-3.0.1-bin-hadoop2.7')

In [8]:
#create the session and sparkcontext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD").master("local[*]").getOrCreate()
sc = spark.sparkContext
print(spark)
print(spark.sparkContext)

<pyspark.sql.session.SparkSession object at 0x0000018F492E7610>
<SparkContext master=local[*] appName=RDD>


In [10]:
sc = spark.sparkContext
parPythonData = sc.parallelize(plist,2)

In [12]:
# a lazy definition
parPythonData

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [11]:
# now we evaluate
parPythonData.collect()

[1.2, 2.3, 3.4, 4.5, 2.4, 2.3, 4.0]

In [13]:
parPythonData.first()

1.2

In [14]:
parPythonData.take(3)

[1.2, 2.3, 3.4]

In [15]:
# the number of partitions
parPythonData.getNumPartitions()

2

## Problem 2: You are given daily temperatures in Fahrenheit and want to analyze them, but your new software takes input in Celsius.

$$^\circ C = (^\circ F - 32) \times 5/9$$

In [16]:
tempData = [59,57.2,53.6,55.4,51.8,53.6,55.4]
parTempData = sc.parallelize(tempData,4)

In [18]:
ftoc = lambda tempf: (tempf -32) *5/9

In [20]:
ftoc(80)

26.666666666666668

In [21]:
parCentigradeData = parTempData.map(ftoc)

In [22]:
parCentigradeData.collect()

[15.0, 14.000000000000002, 12.0, 13.0, 10.999999999999998, 12.0, 13.0]
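The values 14.000000000000002 and 10.999999999999998 in this output are ordinary binary floating-point rounding effects, not Spark errors. A minimal plain-Python sketch of one way to tidy them, assuming two decimal places are enough for the analysis (the helper name `ftoc_rounded` is hypothetical):

```python
def ftoc(temp_f):
    """Convert Fahrenheit to Celsius."""
    return (temp_f - 32) * 5 / 9

def ftoc_rounded(temp_f, ndigits=2):
    """Convert to Celsius and round away floating-point noise."""
    return round(ftoc(temp_f), ndigits)

temp_data = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
rounded = [ftoc_rounded(t) for t in temp_data]
print(rounded)  # [15.0, 14.0, 12.0, 13.0, 11.0, 12.0, 13.0]
```

With an RDD, the same function could be passed to `map`, for example `parTempData.map(ftoc_rounded)`.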

### Filtering temperatures of at least $13^\circ$C

In [23]:
tmorethan13 = lambda t: t >=13

In [24]:
#more general
tmorethan = lambda threshold: lambda t: t >= threshold

In [25]:
tmorethan(13)

<function __main__.<lambda>.<locals>.<lambda>(t)>

In [26]:
filteredTemperature = parCentigradeData.filter(tmorethan13)

In [27]:
filteredTemperature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

In [28]:
parCentigradeData.filter(tmorethan(14)).collect()

[15.0, 14.000000000000002]
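The nested lambda `tmorethan` is a function that returns a predicate with the threshold fixed inside it (a closure). The same idea can be written with named functions, which some readers find easier to follow. This is a plain-Python sketch with the hypothetical name `at_least`, applied to the Celsius values collected above:

```python
def at_least(threshold):
    """Return a predicate that is True for values >= threshold."""
    def predicate(t):
        return t >= threshold
    return predicate

celsius = [15.0, 14.000000000000002, 12.0, 13.0,
           10.999999999999998, 12.0, 13.0]

at_least_13 = list(filter(at_least(13), celsius))
at_least_14 = list(filter(at_least(14), celsius))
print(at_least_13)  # [15.0, 14.000000000000002, 13.0, 13.0]
print(at_least_14)  # [15.0, 14.000000000000002]
```

A predicate built this way can be passed to an RDD's `filter` just like the lambda version.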

## Problem 3: Data manipulation and aggregation operations (averages, sums, ...)

We are given data with two grades per semester for each student, over two semesters. Eight students are enrolled in this course.
Calculate the following:
* Average grade per semester for each student
* Top three students with the highest average grades in the second semester
* Bottom three students with the lowest average grades in the second semester
* All students who earned an average of more than 80% in the second semester

In [33]:
rawstudentsfile = sc.textFile('students.txt')

In [44]:
rawstudentsfile.getNumPartitions()

2

In [34]:
rawstudentsfile.take(3)

['Student\tSemester\tGrade1\tGarde2', 'st1\ts1\t65\t75', 'st1\ts2\t74\t90']

In [37]:
cleanstudentsfiles = rawstudentsfile.filter(lambda line: line.startswith('st'))

In [38]:
cleanstudentsfiles.collect()

['st1\ts1\t65\t75',
 'st1\ts2\t74\t90',
 'st2\ts1\t75\t85',
 'st2\ts2\t70\t80',
 'st3\ts1\t78\t75',
 'st3\ts2\t81\t85',
 'st4\ts1\t20\t50',
 'st4\ts2\t60\t65',
 'st5\ts1\t65\t75',
 'st5\ts2\t74\t90',
 'st6\ts1\t75\t85',
 'st6\ts2\t77\t88',
 'st7\ts1\t76\t91',
 'st7\ts2\t65\t45',
 'st8\ts1\t70\t70',
 'st8\ts2\t80\t75']

In [42]:
studentMarksData = cleanstudentsfiles.map(lambda line: line.split('\t'))

In [45]:
studentMarksData.getNumPartitions()

2

In [43]:
studentMarksData.collect()

[['st1', 's1', '65', '75'],
 ['st1', 's2', '74', '90'],
 ['st2', 's1', '75', '85'],
 ['st2', 's2', '70', '80'],
 ['st3', 's1', '78', '75'],
 ['st3', 's2', '81', '85'],
 ['st4', 's1', '20', '50'],
 ['st4', 's2', '60', '65'],
 ['st5', 's1', '65', '75'],
 ['st5', 's2', '74', '90'],
 ['st6', 's1', '75', '85'],
 ['st6', 's2', '77', '88'],
 ['st7', 's1', '76', '91'],
 ['st7', 's2', '65', '45'],
 ['st8', 's1', '70', '70'],
 ['st8', 's2', '80', '75']]

In [46]:
studentMarksDataRDD = studentMarksData.repartition(4)

In [47]:
studentMarksDataRDD.getNumPartitions()

4

In [48]:
studentMarksDataRDD.collect()

[['st4', 's2', '60', '65'],
 ['st5', 's1', '65', '75'],
 ['st5', 's2', '74', '90'],
 ['st6', 's1', '75', '85'],
 ['st6', 's2', '77', '88'],
 ['st7', 's1', '76', '91'],
 ['st7', 's2', '65', '45'],
 ['st8', 's1', '70', '70'],
 ['st8', 's2', '80', '75'],
 ['st1', 's1', '65', '75'],
 ['st1', 's2', '74', '90'],
 ['st2', 's1', '75', '85'],
 ['st2', 's2', '70', '80'],
 ['st3', 's1', '78', '75'],
 ['st3', 's2', '81', '85'],
 ['st4', 's1', '20', '50']]

### Calculating Average Session Grades

In [52]:
studentMarksMean = studentMarksDataRDD.map(lambda x : [x[0],x[1],(int(x[2])+int(x[3]))/2])

$$[x_0,x_1,x_2,x_3] \mapsto [x_0,x_1,(x_2+x_3)/2]$$

In [53]:
studentMarksMean.collect()

[['st4', 's2', 62.5],
 ['st5', 's1', 70.0],
 ['st5', 's2', 82.0],
 ['st6', 's1', 80.0],
 ['st6', 's2', 82.5],
 ['st7', 's1', 83.5],
 ['st7', 's2', 55.0],
 ['st8', 's1', 70.0],
 ['st8', 's2', 77.5],
 ['st1', 's1', 70.0],
 ['st1', 's2', 82.0],
 ['st2', 's1', 80.0],
 ['st2', 's2', 75.0],
 ['st3', 's1', 76.5],
 ['st3', 's2', 83.0],
 ['st4', 's1', 35.0]]
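The cleaning, splitting and averaging steps can be checked on a few lines without Spark. The sketch below uses the first three lines returned by `take(3)` earlier; the function name `parse_line` is hypothetical. Note that the header is dropped only because `startswith` is case-sensitive, so `'Student'` does not match `'st'`.

```python
def parse_line(line):
    """Split one tab-separated record and average its two grades."""
    student, semester, grade1, grade2 = line.split('\t')
    return [student, semester, (int(grade1) + int(grade2)) / 2]

lines = ['Student\tSemester\tGrade1\tGarde2',
         'st1\ts1\t65\t75',
         'st1\ts2\t74\t90']

# Keep only data rows, then parse them.
rows = [parse_line(line) for line in lines if line.startswith('st')]
print(rows)  # [['st1', 's1', 70.0], ['st1', 's2', 82.0]]
```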

In [54]:
secondSemMarks = studentMarksMean.filter(lambda x : "s2" in x)

In [55]:
secondSemMarks.collect()

[['st4', 's2', 62.5],
 ['st5', 's2', 82.0],
 ['st6', 's2', 82.5],
 ['st7', 's2', 55.0],
 ['st8', 's2', 77.5],
 ['st1', 's2', 82.0],
 ['st2', 's2', 75.0],
 ['st3', 's2', 83.0]]

In [56]:
sortedMarksData = secondSemMarks.sortBy(keyfunc = lambda x : -x[2])

In [57]:
sortedMarksData.collect()

[['st3', 's2', 83.0],
 ['st6', 's2', 82.5],
 ['st5', 's2', 82.0],
 ['st1', 's2', 82.0],
 ['st8', 's2', 77.5],
 ['st2', 's2', 75.0],
 ['st4', 's2', 62.5],
 ['st7', 's2', 55.0]]

In [58]:
secondSemMarks.sortBy(keyfunc = lambda x : x[2]).collect()

[['st7', 's2', 55.0],
 ['st4', 's2', 62.5],
 ['st2', 's2', 75.0],
 ['st8', 's2', 77.5],
 ['st5', 's2', 82.0],
 ['st1', 's2', 82.0],
 ['st6', 's2', 82.5],
 ['st3', 's2', 83.0]]

In [59]:
# the top 3
sortedMarksData.take(3)

[['st3', 's2', 83.0], ['st6', 's2', 82.5], ['st5', 's2', 82.0]]

We have our answer, but can we optimize it further? To get the top three rows, we sorted the whole RDD. We can avoid the full sort by using the takeOrdered() function. It takes two arguments: num, the number of elements we require, and key, a function that computes the value by which the elements are ordered. takeOrdered() returns the num smallest elements according to key, so negating the average puts the highest averages first.

In [60]:
topThreeStudents = secondSemMarks.takeOrdered(num=3, key = lambda x :-x[2])

In [61]:
topThreeStudents

[['st3', 's2', 83.0], ['st6', 's2', 82.5], ['st5', 's2', 82.0]]
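The saving comes from keeping only the best `num` candidates instead of ordering everything. The idea can be sketched in plain Python with `heapq.nsmallest`. The split into two partitions below is an assumption chosen for illustration, and the sketch is not presented as PySpark's actual implementation.

```python
import heapq

# Second-semester averages, in the order collected above.
second_sem = [['st4', 's2', 62.5], ['st5', 's2', 82.0],
              ['st6', 's2', 82.5], ['st7', 's2', 55.0],
              ['st8', 's2', 77.5], ['st1', 's2', 82.0],
              ['st2', 's2', 75.0], ['st3', 's2', 83.0]]

def by_highest_average(row):
    # Negate so that the "smallest" key is the highest average.
    return -row[2]

# Assumed split of the data into two partitions.
partitions = [second_sem[:4], second_sem[4:]]

# Each partition keeps only its own three best candidates.
candidates = [row
              for part in partitions
              for row in heapq.nsmallest(3, part, key=by_highest_average)]

# The final answer is chosen among at most 3 * len(partitions) rows.
top_three = heapq.nsmallest(3, candidates, key=by_highest_average)
print(top_three)
```

Two students (st5 and st1) share the average 82.0, so which of them takes third place depends on how ties are broken.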

Note that we did not call collect() to print the result. A transformation creates another RDD, so collect() is needed to bring its data to the driver. An action returns its result directly to the driver, so collect() is not required. We can therefore conclude that takeOrdered() is an action.
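Two of the questions listed for this problem, the bottom three students and the students with an average above 80% in the second semester, are not worked out above. With the RDD they could presumably be answered by `secondSemMarks.takeOrdered(3, key=lambda x: x[2])` and `secondSemMarks.filter(lambda x: x[2] > 80).collect()`. The plain-Python sketch below assumes the second-semester averages printed earlier and uses hypothetical variable names to show the results to expect:

```python
import heapq

# Second-semester averages, in the order collected above.
second_sem = [['st4', 's2', 62.5], ['st5', 's2', 82.0],
              ['st6', 's2', 82.5], ['st7', 's2', 55.0],
              ['st8', 's2', 77.5], ['st1', 's2', 82.0],
              ['st2', 's2', 75.0], ['st3', 's2', 83.0]]

# Bottom three: the three smallest averages, in ascending order.
bottom_three = heapq.nsmallest(3, second_sem, key=lambda row: row[2])
print(bottom_three)
# [['st7', 's2', 55.0], ['st4', 's2', 62.5], ['st2', 's2', 75.0]]

# Students with an average strictly above 80.
above_80 = [row for row in second_sem if row[2] > 80]
print([row[0] for row in above_80])
# ['st5', 'st6', 'st1', 'st3']
```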