<pre><b>Q1 Python </b>
The following code splits a given text into smaller texts based on
a desired width. Write a Python function that replicates the same
behavior without using the textwrap.wrap function. Run your code
with an example and show the output. Explain your code and logic
with comments.

Example of the desired action with the library `textwrap`:
Input:
import textwrap
txt = 'AaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaa'
textwrap.wrap(txt, 12)
Output:
['Aaaaaaaaaaaa', 'Aaaaaaaaaaaa', 'Aaaaaaaaaaaa',
'Aaaaaaaaaaaa', 'Aaaaaaaaaaaa']</pre>

In [0]:
import textwrap
txt = 'AaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaa'
textwrap.wrap(txt, 12)

In [0]:
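# Split `text` into consecutive pieces of at most `width` characters,
# which is what textwrap.wrap does for text that contains no whitespace.
def wrap(text, width):
    # range(0, len(text), width) yields the start index of every piece
    # (0, width, 2*width, ...); slicing never runs past the end of the string
    return [text[i:i + width] for i in range(0, len(text), width)]

my_str = 'AaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaa'

my_list = wrap(my_str, 12)

print(my_list)  # ['Aaaaaaaaaaaa', 'Aaaaaaaaaaaa', 'Aaaaaaaaaaaa', 'Aaaaaaaaaaaa', 'Aaaaaaaaaaaa']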
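textwrap.wrap also breaks lines at whitespace, which fixed-width slicing does not. A minimal greedy sketch for whitespace-separated words follows; `word_wrap` is a hypothetical name, hyphens are not treated as break points, and words longer than `width` are cut into width-sized pieces. Unlike textwrap, it starts an over-long word on a new line instead of filling the space left on the current line. textwrap is imported only to check the result.

```python
import textwrap


def word_wrap(text, width):
    """Greedy word wrap (hypothetical helper approximating textwrap.wrap).

    Assumptions: words are separated by whitespace, hyphens are not used
    as break points, and words longer than `width` are cut into pieces.
    """
    lines = []
    current = ""
    for word in text.split():
        # Cut an over-long word into full-width pieces, each on its own line
        while len(word) > width:
            if current:
                lines.append(current)
                current = ""
            lines.append(word[:width])
            word = word[width:]
        if not current:
            current = word                      # first word on a new line
        elif len(current) + 1 + len(word) <= width:
            current += " " + word               # word (plus a space) still fits
        else:
            lines.append(current)               # line is full: start a new one
            current = word
    if current:
        lines.append(current)
    return lines


txt = 'AaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaaAaaaaaaaaaaa'
print(word_wrap(txt, 12) == textwrap.wrap(txt, 12))  # True
print(word_wrap("the quick brown fox jumps over the lazy dog", 10))
# ['the quick', 'brown fox', 'jumps over', 'the lazy', 'dog']
```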


<pre><b>Q2 Knowledge </b>
Explain what Jobs, Stages and Tasks are and why Apache Spark splits up an application into Jobs, Stages and Tasks.
Explain what Transformations and Actions are. Give your reasoning of why these structures are built in this way. Give examples for both types.</pre>

<pre>
Overview of Spark Stages <br/>
Spark stages are the physical units of execution: a stage is a set of tasks that run the same computation on different partitions of the data. Stages are derived from the Directed Acyclic Graph (DAG) that records the transformations applied to resilient distributed datasets (RDDs). There are two types of stages: ShuffleMapStage and ResultStage. A ShuffleMapStage is an intermediate stage that prepares (shuffles) data for subsequent stages, whereas a ResultStage is the final stage of a Spark job, which computes the result of the action that triggered the job.

The meaning of DAG is as follows:

Directed: Each edge points in one direction, from a parent RDD to the RDD derived from it, so the graph encodes the order in which operations are applied.
Acyclic: There are no cycles; following the edges never leads back to an earlier RDD. Because RDDs are immutable, a transformation creates a new RDD instead of changing (or reverting) an existing one.
Graph: A graph is a set of vertices connected by edges. Here the vertices are the RDDs and the edges are the operations (transformations) applied to them.
The DAGScheduler, which runs in the driver, divides each job into stages at shuffle boundaries and submits each stage as a set of tasks (one per partition) to the TaskScheduler. The TaskScheduler launches these tasks on the executors, which are allocated by the cluster manager (e.g., YARN or Spark standalone). In this way, the Spark driver converts the logical plan into a physical execution plan. Spark pipelines consecutive narrow transformations, combining them into a single stage.
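The stage-splitting rule can be sketched as a toy model in Python. It is not Spark's DAGScheduler API: `split_into_stages` is a hypothetical helper, it assumes a linear chain of operations (real DAGs can branch and join), and it places a wide operation entirely in the new stage, whereas in real Spark the map side of reduceByKey runs in the earlier stage and writes the shuffle files.

```python
def split_into_stages(ops):
    """Group a linear chain of operations into stages (simplified model).

    `ops` is a list of (name, is_wide) pairs. Narrow operations are
    pipelined into the current stage; a wide operation needs a shuffle,
    so it starts a new stage.
    """
    stages = [[]]
    for name, is_wide in ops:
        if is_wide and stages[-1]:
            stages.append([])  # shuffle boundary: begin a new stage
        stages[-1].append(name)
    return stages


word_count = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),
]
print(split_into_stages(word_count))
# [['textFile', 'flatMap', 'map'], ['reduceByKey']]
```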

Transformations <br/>
There are 2 kinds of transformations:

1. Narrow Transformations: These are transformations that do not require shuffling. They can be pipelined and executed in a single stage.

Example: map() and filter()

2. Wide Transformations: These are transformations that require shuffling data across partitions. Hence Spark creates a new stage at each wide transformation, and the stages exchange data through the shuffle.

Example: reduceByKey()

Let’s take an example for a better understanding of how this works.

Example: In this example, we will see how a simple word count works using Spark DAGScheduler.

val data = sc.textFile("data.txt")
Result: data: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[46] at textFile at <console>:24

First, a textFile operation is performed to read the given input text file from the HDFS location.

data.flatMap(_.split(" ")).map(i=>(i,1)).reduceByKey(_ + _).collect
Result: res21: Array[(String, Int)] = Array()

Next, a flatMap operation splits the lines of the input file into words. Then a map operation forms a (key, value) pair like (word, 1) for each word, and the reduceByKey function sums the counts for each word. Finally, the collect action triggers the computation and returns the result to the driver.

<img src="https://github.com/SrushtiChauhan/DS-610_Data/blob/main/final/1.PNG?raw=true" alt="x">
  
Spark creates 2 stages for this program because reduceByKey is a wide transformation. To sum the counts for each word, records with the same key must be shuffled between 2 or more partitions, so the first stage (textFile, flatMap and map, pipelined together) ends at the shuffle, and a second stage performs the reduceByKey and returns the result through collect.
  
<img src="https://github.com/SrushtiChauhan/DS-610_Data/blob/main/final/2.PNG?raw=true" alt="x">
  
Internally, each stage is divided into tasks. In this example, each stage is divided into 2 tasks because the data has 2 partitions; each task processes one partition.

Types of Spark Stages <br/>
Here are the two types described in detail.

1. ShuffleMapStage
This is an intermediate stage in DAG execution. Its output is used as the input for further stage(s), in the form of map output files that are later read by the reduce tasks. A ShuffleMapStage is considered ready when all of its map outputs are available. Some output locations can be missing if partitions are lost or unavailable.

This stage may contain many pipelined operations, such as map() and filter(), before the shuffle. ShuffleMapStage uses the internal registries outputLocs and _numAvailableOutputs to track the shuffle map outputs. A single ShuffleMapStage can be shared across multiple jobs.

2. ResultStage
As the name itself suggests, this is the final stage in a Spark job which performs an operation on one or more partitions of an RDD to calculate its result. Initialization of internal registries and counters is done by the ResultStage.

The DAGScheduler submits the missing tasks, if any, for the ResultStage by creating one ResultTask for each partition that still needs to be computed. Each ResultTask requires several mandatory parameters: stageId, stageAttemptId, the broadcast variable holding the serialized task, the partition, the preferred TaskLocations, outputId, local properties, and the TaskMetrics of the stage. Optional parameters include the job ID, application ID, and application attempt ID.

Advantages of Spark Stages <br/>
Below are the different advantages of Spark Stages:

1. Dynamic allocation of executors <br/>
When dynamic allocation is enabled (spark.dynamicAllocation.enabled), the Spark Job Event Timeline shows executors being added and removed while the job runs. Executors are requested from the cluster depending on the workload during task execution and released back to the cluster when they become idle. This frees resources so that other applications running on the same cluster can reuse them, which improves overall cluster utilization.
  
2. Caching <br/>
RDDs that are marked with cache() or persist() are stored in memory after they are first computed in a stage. This saves computation time when later operations need the same RDD again, because Spark reads the cached partitions instead of re-reading the data from HDFS and recomputing them.

3. Parallel execution <br/>
Stages that do not depend on each other can run in parallel, and within each stage, tasks run in parallel on different partitions. A stage has to wait only when its input is the shuffle output of a previous stage.

4. DAG Visualization <br/>
This is very helpful for complex computations that involve many operations and dependencies. With the DAG visualization, one can easily trace the flow and identify performance bottlenecks. Clicking on a stage in the visualization shows the tasks it ran, and this expanded view shows the details of all the RDDs that belong to that stage.

5. Fault tolerance <br/>
The DAG records the lineage of every RDD, i.e., the transformations used to build it from its parent RDDs. If a partition of an RDD is lost (for example, because an executor fails), Spark uses the lineage to recompute only that partition from its parents, so the data can be recovered without replicating it.

Because of these benefits, Apache Spark is widely used in place of MapReduce. Spark generalizes the MapReduce model: MapReduce writes intermediate data to HDFS and reads it back between every map and reduce step, whereas Spark can keep intermediate data in memory.

Conclusion <br/>
In conclusion, Spark is more efficient than MapReduce because of its in-memory computation, which increases processing speed, especially for iterative processing.
</pre>

<pre><b>Q3 Knowledge </b>
Explain what narrow and wide Transformations are and how they differ from each other. Give examples for both types.</pre>

Transformation in Spark <br/>
A Spark transformation is a function that produces a new RDD from existing RDDs. Because RDDs are immutable, every transformation creates a new RDD, and the input RDDs are never changed.
Applying transformations builds an RDD lineage, which contains all the parent RDDs of the final RDD(s). The lineage is also known as the RDD operator graph or RDD dependency graph. It is a logical execution plan, i.e., a Directed Acyclic Graph (DAG) of all the parent RDDs of an RDD.

Transformations are lazy: they are not executed immediately, but only when an action (e.g., collect() or count()) is called. The two most basic transformations are map() and filter().

The resulting RDD is always a new RDD, distinct from its parent. It can be smaller (e.g., filter(), distinct(), sample()), bigger (e.g., flatMap(), union(), cartesian()) or the same size (e.g., map()).

There are two fundamental types of transformations:

1. Narrow transformation – <br/>
In a narrow transformation, all the elements required to compute the records in a single partition reside in a single partition of the parent RDD. Only a limited subset of the parent's partitions is needed to compute each result, so no shuffle is required. map() and filter() are narrow transformations.

2. Wide Transformations –  <br/>
In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD. Spark must therefore shuffle data across partitions (usually across the network), and each wide transformation starts a new stage. groupByKey() and reduceByKey() are wide transformations.
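The difference can be modeled in plain Python by treating a list of lists as an RDD's partitions. This is a toy model, not Spark code: the helper names are hypothetical, keys are assumed to be integers, and `key % num_out` stands in for Spark's hash partitioner. Each narrow output partition depends on one input partition, while a wide output partition can need records from every input partition.

```python
from collections import defaultdict
from functools import reduce
from operator import add


def narrow_map(partitions, fn):
    """Narrow: output partition i is built only from input partition i."""
    return [[fn(record) for record in part] for part in partitions]


def wide_reduce_by_key(partitions, fn, num_out):
    """Wide: a shuffle routes every (key, value) record by key, then reduces.

    Integer keys are assumed; key % num_out stands in for a hash partitioner.
    """
    buckets = [defaultdict(list) for _ in range(num_out)]
    for part in partitions:  # read every input partition
        for key, value in part:
            buckets[key % num_out][key].append(value)
    return [
        [(key, reduce(fn, bucket[key])) for key in sorted(bucket)]
        for bucket in buckets
    ]


parts = [[(1, 10), (2, 20)], [(1, 5), (3, 7)]]
print(narrow_map(parts, lambda kv: (kv[0], kv[1] * 2)))
# [[(1, 20), (2, 40)], [(1, 10), (3, 14)]]
print(wide_reduce_by_key(parts, add, 2))
# [[(2, 20)], [(1, 15), (3, 7)]]
```

Key 1 appears in both input partitions, so its output partition can only be computed after the shuffle has gathered records from both.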

<pre><b>Q4 Knowledge </b>
Which part of the Spark architecture is responsible for deciding how to subdivide the larger dataset into 128 MB chunks?</pre>

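The Spark Driver

The driver plans the job, and when it builds the physical plan for a file-based read, it decides how to split the input into partitions (for DataFrame file sources, the maximum partition size is set by spark.sql.files.maxPartitionBytes, which defaults to 128 MB). The executors do not make this decision: the core responsibility of a Spark executor is to take the assigned tasks, run them, and report back their success or failure state and results. Each Spark application has its own separate executor processes.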
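The 128 MB chunk arithmetic can be illustrated with a small calculation. This sketch assumes a single splittable file and uses only the default of spark.sql.files.maxPartitionBytes; `num_splits` is a hypothetical helper. Spark's actual split size is min(maxPartitionBytes, max(openCostInBytes, bytesPerCore)), so with many cores it may choose smaller splits.

```python
import math

# Default value of spark.sql.files.maxPartitionBytes: 128 MB
MAX_PARTITION_BYTES = 128 * 1024 * 1024


def num_splits(file_size_bytes, max_split_bytes=MAX_PARTITION_BYTES):
    """Number of chunks if every split is at most max_split_bytes.

    Simplification: ignores openCostInBytes and the bytes-per-core
    adjustment, so real jobs can end up with more (smaller) partitions.
    """
    return math.ceil(file_size_bytes / max_split_bytes)


one_gib = 1024 * 1024 * 1024
print(num_splits(one_gib))      # 8 chunks of 128 MB
print(num_splits(one_gib + 1))  # 9: the extra byte needs its own chunk
```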

<pre><b>Q5 Knowledge </b>
What term identifies the smallest unit of work in a Spark application?</pre>

The smallest unit of work in a Spark application is a task. The related terms are:

Application - A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster.

Job - A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save(), collect()). During interactive sessions with Spark shells, the driver converts your Spark application into one or more Spark jobs. It then transforms each job into a DAG. This, in essence, is Spark’s execution plan, where each node within a DAG could be a single or multiple Spark stages.

Stage - Each job gets divided into smaller sets of tasks called stages that depend on each other. As part of the DAG nodes, stages are created based on what operations can be performed serially or in parallel. Not all Spark operations can happen in a single stage, so they may be divided into multiple stages. Often stages are delineated on the operator’s computation boundaries, where they dictate data transfer among Spark executors.

Task - A single unit of work or execution that is sent to a Spark executor. Each stage is composed of Spark tasks, which are distributed across the Spark executors; each task maps to a single core and works on a single partition of data. As such, an executor with 16 cores can run 16 tasks on 16 partitions in parallel, which makes the execution of Spark's tasks highly parallel.

 <br/>
<img src="https://i.stack.imgur.com/zxbzi.png?raw=true" alt="x">

<pre><b>Q6 Knowledge </b>
What term identifies the environment in which a task is executed?</pre>

<pre>
1. Spark Context
SparkContext is the main entry point to Spark Core. It gives access to the rest of Spark's functionality and establishes the connection to the Spark execution environment, including clusters managed by a resource manager. SparkContext acts as the master of a Spark application.

It offers various functions, such as:

Getting the current status of a Spark application
Canceling a job
Canceling a stage
Running a job synchronously
Running a job asynchronously
Accessing persistent RDDs
Unpersisting RDDs
Programmable dynamic allocation

2. Spark Shell
Apache Spark provides an interactive Spark shell in which we can run applications. It helps in processing large amounts of data because it can read many types of data, and it lets us run and test our application code interactively.

3. Spark Application
A Spark application can have processes running on its behalf even when no job is running. It is a self-contained computation that runs user-supplied code to compute a result.

4. Task
It is a unit of work that is sent to an executor. Every stage consists of tasks, one task per partition.

5. Job
It is a parallel computation consisting of multiple tasks.

6. Stages
Each job is divided into small sets of tasks which are known as stages.
</pre>

<pre><b>Q7 Knowledge </b>
Which two parts of the Spark architecture are run inside of a Java
Virtual Machine (JVM)?</pre>

The Driver and the Executors. More generally, all Spark components, including the Driver, the Master (in standalone mode), and the Executor processes, run in Java virtual machines (JVMs).

<pre><b>Q8 Knowledge </b>
Explain the roles of the Spark Driver and why there is a need for such a component in the Spark architecture.</pre>

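The Spark Driver runs the application's main program and creates the SparkSession/SparkContext. It works in conjunction with the cluster manager, which allocates resources (executors) for the application. The driver converts the user code into jobs, breaks each job into stages and smaller tasks, distributes those tasks to the executors on the worker nodes, and monitors their execution. Such a component is needed because a distributed computation requires a single coordinator that holds the execution plan, schedules the work, and collects the results.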

<pre><b>Q9 Knowledge </b>
True or False, in a job with multiple stages, the execution of the secondary stages can be delayed by a single, slow-running task in the previous stage?</pre>

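True. A stage that depends on the shuffle output of a previous stage cannot start until every task of that previous stage has finished, because a ShuffleMapStage is ready only when all of its map outputs are available. A single slow-running task (a straggler) therefore delays the following stages.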

<pre><b>Q10 Knowledge </b>
Which part of the Spark architecture is responsible for deciding which task processes which piece of data?</pre>

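The Spark Driver <br/>
The scheduler in the driver (DAGScheduler and TaskScheduler) decides which task processes which piece of data: it creates one task per partition and assigns each task to an executor, preferring executors close to the data. The executors only run the tasks assigned to them and report back their success or failure state and results.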

<pre><b>Q11 Spark API </b>
Using sales.csv from the attached file:
Write a Spark code to return the sum, average, minimum, maximum value of sales amount by ordermonthyear & productcategory field.
Sort the results in descending order by ordermonthyear. Submit the screenshot of your code view showing query and the first 5 rows of the result.</pre>

In [0]:
# `spark` is the existing SparkSession (predefined in Databricks notebooks)
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "/FileStore/tables/sales.csv"

# inferSchema makes SaleAmount numeric, so min/max compare numbers, not strings
df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
df.select("SaleAmount", "OrderMonthYear", "ProductCategory").show(2)

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("Sales")

sqlDF = spark.sql("""
    SELECT OrderMonthYear, ProductCategory,
           sum(SaleAmount) AS TotalSales,
           avg(SaleAmount) AS AvgSales,
           min(SaleAmount) AS MinSale,
           max(SaleAmount) AS MaxSale
    FROM Sales
    GROUP BY OrderMonthYear, ProductCategory
    ORDER BY OrderMonthYear DESC
""")
# Show the first 5 rows of the result, as the question asks
sqlDF.show(5)

<pre><b>Q12 Spark API </b>
Using movies dataset from databricks samples:
=> /databricks-datasets/cs110x/ml-20m/data-001/movies.csv
=> /databricks-datasets/cs110x/ml-20m/data-001/ratings.csv
Rank movies by their popularity i.e., count the ratings given for each movie. 
Sort the movies by their number of ratings in ascending order. 
Submit Python script and the screenshot of terminal’s output.</pre>

In [0]:
# `spark` is the existing SparkSession (predefined in Databricks notebooks)
sc = spark.sparkContext
path1 = "/databricks-datasets/cs110x/ml-20m/data-001/movies.csv"
path2 = "/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv"

df1 = spark.read.option("header", "true").csv(path1)
df2 = spark.read.option("header", "true").csv(path2)

# Register both DataFrames as temporary views for SQL
df1.createOrReplaceTempView("Movies")
df2.createOrReplaceTempView("Ratings")

# Count the ratings per movie and sort ascending (least popular first)
spark.sql("""
    SELECT m.movieId, m.title, count(r.rating) AS numRatings
    FROM Movies m JOIN Ratings r ON m.movieId = r.movieId
    GROUP BY m.movieId, m.title
    ORDER BY numRatings
""").show()

<pre><b>Q13 Spark API </b>
For the ranking movies example, verify your result using SQL with SQLContext or SparkSession. Submit screenshots showing PySpark queries and the first few rows of the result set.</pre>

In [0]:
# Reuse the Movies and Ratings views; most-rated movies first (descending)
results = spark.sql("SELECT m.movieId, m.title, count(r.rating) AS numRatings "
                    "FROM Movies m JOIN Ratings r ON m.movieId = r.movieId "
                    "GROUP BY m.movieId, m.title ORDER BY numRatings DESC")
results.show()