
# Introduction to Apache PySpark
In this demo we will see how to run PySpark in a Google Colaboratory notebook. We will also perform some basic exploratory data tasks common to data science problems.



## PySpark Install

The first step is to install pyspark with pip. The next step is to install the findspark library.

*Note: --ignore-install is accepted by pip as an abbreviation of its --ignore-installed option, which reinstalls the package even if a version is already present in the runtime.*


In [52]:
# install pyspark using pip
!pip install --ignore-install -q pyspark
# install findspark using pip
!pip install --ignore-install -q findspark


## Spark Session

We import SparkSession from the pyspark.sql module. In PySpark, a Spark Session is a unified entry point for reading data, configuring the system, and managing various Spark services.

Here's a breakdown of what the Spark Session does:

1. Unified Entry Point: It's the central point to access all Spark  functionalities, making it simpler and more intuitive to use Spark for development.
2. Data Reading and Writing: We use the Spark Session to read data from various sources (like HDFS, S3, JDBC, Hive, etc.) and write data to various sinks.
3. Configuration Management: It allows us to configure various aspects of the Spark application, such as setting configuration parameters.
4. Creating DataFrames and Datasets: The Spark Session provides methods to create DataFrames and Datasets, which are the core data structures in Spark.
5. Execution of SQL Queries: We can run SQL queries by using the Spark Session, especially when dealing with structured data.
6. Managing Spark Services: It gives access to underlying Spark services such as the SparkContext, which is available as spark.sparkContext.

In PySpark, a Spark Session is created through SparkSession.builder. Here's an example:

In [53]:
from pyspark.sql import SparkSession

# Start (or reuse) a session; master("local") runs Spark on this machine with one worker thread
spark = SparkSession.builder.master("local").appName("My App").getOrCreate()

In [54]:
spark.sparkContext

## Word Count
Count how many times each word occurs in a text file.

In [55]:
wc = (
    spark.sparkContext.textFile("demo.txt")
    .flatMap(lambda line: line.split(" "))   # split each line into words
    .map(lambda word: (word, 1))             # pair each word with a count of 1
    .reduceByKey(lambda a, b: a + b)         # add up the counts per word
)
print(wc.collect())

[('apache', 3), ('kafka', 1), ('spark', 1), ('tomcat', 1)]
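
To make the role of each stage concrete, the same pipeline can be modelled in plain Python, without Spark. This is only a sketch: the `lines` list is a hypothetical stand-in for the contents of demo.txt, and everything runs on a single machine.

```python
from itertools import chain

# Hypothetical stand-in for the lines of demo.txt (the real file is not shown here)
lines = ["apache spark", "apache kafka", "apache tomcat"]

# flatMap: split every line and flatten the pieces into one sequence of words
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: merge the counts of identical words with a + b
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n

print(sorted(counts.items()))
```

The difference in Spark is that each stage runs in parallel over the partitions of the RDD, and reduceByKey merges counts across partitions.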


In [56]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Line Count
Count the number of lines in a file.

In [57]:
from pyspark.sql import SparkSession

# Relative path: the file must be in the notebook's current working directory
someFile = "wordcount.txt"
# getOrCreate() returns the session created earlier if one is already running
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
print(spark.read.text(someFile).count())


20


## Mounting Google Drive
Connect Colab to Google Drive so that data files stored there can be read.

In [58]:
# To read data from a file, first upload it to your Google Drive, then mount the drive in Colab
from google.colab import drive
# force_remount=True remounts the drive even if it is already mounted
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [59]:
from pyspark.sql import SparkSession

# This file lives in the mounted Google Drive
someFile = "/content/drive/MyDrive/data/Customer.csv"
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# read.text treats every line as a row, so the count includes the CSV header line
print(spark.read.text(someFile).count())


51


## Data Creation

### Create a simple RDD with Spark
The examples below show how to create simple data structures using Spark core commands.

#### Example 1: From RDD
To create an RDD in PySpark, first initialize a SparkSession and then use its SparkContext to build the RDD. Here's a simple example that creates an RDD from a tuple of numbers.

In [60]:
# Create an RDD from a tuple of numbers
numbers = (1, 2, 3, 4, 5)
numbers_rdd = spark.sparkContext.parallelize(numbers)
# collect() returns the elements of the RDD to the driver as a Python list
print("using collect() function", numbers_rdd.collect())



using collect() function [1, 2, 3, 4, 5]


In [61]:
# Alternatively, convert the collected list to a tuple so it prints like the input
print("using pipeline of functions")
print(tuple(spark.sparkContext.parallelize(numbers).collect()))


using pipeline of functions
(1, 2, 3, 4, 5)


In [62]:
outfile = spark.read.text("/content/drive/MyDrive/data/guttenberg/AliceAdventuresInWonderland.txt")

Now let us count the lines in the file we just read.

In [63]:
print(outfile.count())

3760


#### Example 2: From Local File

To create an RDD in PySpark from a CSV file such as "Customer.csv", use the SparkSession to read the CSV into a DataFrame and then convert the DataFrame to an RDD. Here's a step-by-step example:

1. Initialize a SparkSession. (Done)
2. Read the "Customer.csv" file into a DataFrame.
3. Convert the DataFrame to an RDD.
4. Perform a simple action on the RDD, like counting the number of records.

Here's the code snippet for this process:

In [64]:
customers = spark.read.csv("/content/drive/MyDrive/data/Customer.csv", header=True, inferSchema=True).rdd

# Perform a simple action: count the number of records
record_count = customers.count()
print(f"Number of records: {record_count}")


Number of records: 50


#### Example 3: From Local TEXT File

To create an RDD in PySpark from a plain text file such as "airports.text", use the SparkSession to read the file into a DataFrame with one row per line and then convert the DataFrame to an RDD. The steps are:

1. Initialize a SparkSession. (Done)
2. Read the "airports.text" file into a DataFrame.
3. Convert the DataFrame to an RDD.
4. Perform simple actions on this RDD later.

The path passed to the reader can point to either a single text file or a directory of text files. The cell below performs step 2; steps 3 and 4 follow the same pattern as the CSV example.

In [65]:
airports = spark.read.text("/content/drive/MyDrive/data/airport-data/airports.text")

## Actions and Transformations

In PySpark, operations on RDDs can be broadly classified into two categories: transformations and actions. Transformations create a new RDD from an existing one, while actions return a value after running a computation on the RDD. Below are simple examples demonstrating the use of transformations and actions.
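
One consequence of this split is that transformations are lazy: Spark only records them, and the work happens when an action is called. Python's built-in `map` behaves in a similar way, so it can serve as a rough single-machine analogy. The sketch below illustrates the idea only and says nothing about how Spark implements it; the `calls` list is a helper introduced here to record when the function runs.

```python
calls = []

def square(x):
    calls.append(x)      # record that the function really ran
    return x * x

data = (1, 2, 3, 4, 5)

# "Transformation": builds a lazy pipeline, nothing is computed yet
squared = map(square, data)
ran_before_action = len(calls)

# "Action": consuming the pipeline forces the computation
result = list(squared)
ran_after_action = len(calls)

print(ran_before_action, ran_after_action, result)
```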

### Transformations

#### Map
Applies a function to each element and returns a new RDD.


In [66]:
rdd = spark.sparkContext.parallelize((1, 2, 3, 4, 5))
# Python's built-in map(function, collection) runs on one machine and is limited by its memory.
# rdd.map(function) is distributed across the cluster, so it scales to much larger data.
# It returns a new RDD; collect() brings the results back to the driver as a Python list.
print(tuple(rdd.map(lambda x: x * x).collect()))

(1, 4, 9, 16, 25)


#### Filter
Returns a new RDD containing only the elements that satisfy a condition.

In [67]:
print(rdd.filter(lambda x: x % 2 == 0).collect())  # Keeps even numbers


[2, 4]


#### FlatMap
Similar to map, but each input item can be mapped to 0 or more output items.

In [68]:
words = spark.sparkContext.parallelize(["hello world", "hi", "hello mars", "hello jupiter", "hello saturn"])
print(words.flatMap(lambda x: x.split(" ")).collect())



['hello', 'world', 'hi', 'hello', 'mars', 'hello', 'jupiter', 'hello', 'saturn']
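
The difference between map and flatMap is easiest to see side by side. The following plain-Python sketch (no Spark involved, using a shortened version of the list above) applies the same split function both ways.

```python
from itertools import chain

phrases = ["hello world", "hi", "hello mars"]

# map: exactly one output element per input element (here, a list of words)
mapped = [phrase.split(" ") for phrase in phrases]

# flatMap: the per-element results are flattened into one sequence
flat_mapped = list(chain.from_iterable(phrase.split(" ") for phrase in phrases))

print(mapped)
print(flat_mapped)
```

With map the result keeps one nested list per phrase, while flatMap yields a single flat list of words.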


#### Distinct
Returns a new RDD containing distinct elements from the original RDD.

In [69]:
print(words.flatMap(lambda x: x.split(" ")).distinct().collect())

['hello', 'world', 'hi', 'mars', 'jupiter', 'saturn']


In [70]:
tuples = spark.sparkContext.parallelize((1, 1, 2, 3, 3, 4))
print(tuples.distinct().collect())


[1, 2, 3, 4]


### Actions

#### Collect
Returns all the elements of the RDD to the driver program as a Python list.

In [71]:
print(tuples.distinct().collect())

[1, 2, 3, 4]


#### Count
Returns the number of elements in the RDD.

In [72]:
print(tuples.count())

6


#### Take
Returns a list with the first n elements of the RDD.

In [73]:
first_three = tuples.take(3)
print(first_three)


[1, 1, 2]


In [74]:
customers.take(2)

[Row(CustomerID=1000, CustomerName='Lou Anna Tan', MemberCategory='A', Age=29, Gender='F', AmountSpent=4.14, Address='Blk 26, Telok Blangah Crescent #22-87, Singapore 0409', City='Frankfurt', CountryCode='GER', ContactTitle='Ms', PhoneNumber=2732287),
 Row(CustomerID=1001, CustomerName='Wong Sook Huey', MemberCategory='A', Age=37, Gender='F', AmountSpent=67.1, Address='Blk 1007 Teresa Ville Lower Delta Road #06-02, Singapore 0410', City='Singapore', CountryCode='SIN', ContactTitle='Ms', PhoneNumber=2740975)]

#### Reduce
Aggregates the elements of the RDD using a function.

In [86]:
sum1 = tuples.reduce(lambda a, b: a + b)
print(sum1)


14
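
Spark reduces each partition separately and then combines the partial results, so the function passed to reduce() should be associative and commutative. The sketch below models this in plain Python with `functools.reduce`. The split into two partitions is a hypothetical example, since Spark decides the actual partitioning, and `partitioned_reduce` is a helper written for this illustration.

```python
from functools import reduce
from operator import add, sub

values = [1, 1, 2, 3, 3, 4]
# Hypothetical split of the same data into two partitions
partitions = [[1, 1, 2], [3, 3, 4]]

def partitioned_reduce(func, parts):
    # Reduce every partition on its own, then reduce the partial results
    partials = [reduce(func, part) for part in parts]
    return reduce(func, partials)

# Addition is associative and commutative: both routes agree
sum_all = reduce(add, values)
sum_parts = partitioned_reduce(add, partitions)

# Subtraction is neither: the result depends on how the data is split
diff_all = reduce(sub, values)
diff_parts = partitioned_reduce(sub, partitions)

print(sum_all, sum_parts)    # 14 14
print(diff_all, diff_parts)  # -12 2
```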


These examples illustrate basic RDD operations in PySpark. To run them, you need an active SparkSession (spark), whose SparkContext is available as spark.sparkContext.

### How to pretty print in PySpark?

Combining take() with a loop that prints one record per line mimics a pretty-print function. Use it wisely: take(n) brings n records to the driver, so keep n small.

In [76]:
# IPython's pprint magic toggles pretty printing of cell results; here the call turns it off
pprint()

Pretty printing has been turned OFF


In [77]:
print("First five records of customer data set", customers.take(5))
print("Not so pretty....")
print("Now let us pretty print:")
# To pretty print, you need to iterate
for element in customers.take(10):
    print(element)

First five records of customer data set [Row(CustomerID=1000, CustomerName='Lou Anna Tan', MemberCategory='A', Age=29, Gender='F', AmountSpent=4.14, Address='Blk 26, Telok Blangah Crescent #22-87, Singapore 0409', City='Frankfurt', CountryCode='GER', ContactTitle='Ms', PhoneNumber=2732287), Row(CustomerID=1001, CustomerName='Wong Sook Huey', MemberCategory='A', Age=37, Gender='F', AmountSpent=67.1, Address='Blk 1007 Teresa Ville Lower Delta Road #06-02, Singapore 0410', City='Singapore', CountryCode='SIN', ContactTitle='Ms', PhoneNumber=2740975), Row(CustomerID=1002, CustomerName='Ng Choon Seng', MemberCategory='C', Age=23, Gender='M', AmountSpent=63.18, Address='Blk 63 Bishan St 21 #06-01, Singapore 1057', City='Toronto', CountryCode='CAN', ContactTitle='Mr', PhoneNumber=2580742), Row(CustomerID=1003, CustomerName='Chew Teck Kuan', MemberCategory='C', Age=63, Gender='M', AmountSpent=64.49, Address='Blk 109 Bedok North Rd #06-2316, Singapore 1046', City='Singapore', CountryCode='SIN', 

### Key Operations

PySpark's key-based functions include groupByKey, reduceByKey, and sortByKey. They operate on RDDs of (key, value) pairs. Let us look at how they work.

#### groupByKey
This operation groups the values for each key in the RDD into a single sequence.

#### reduceByKey
This operation merges the values for each key using an associative reduce function.

#### sortByKey
This operation sorts the dataset by keys.

Let us put together an example to compare and contrast them.


In [78]:
rdd = spark.sparkContext.parallelize([(3, 6),(1, 2),(3, 4)])
grouped = rdd.groupByKey()
for key, values in grouped.collect():
    print(f"{key}: {tuple(values)}")
reduced = rdd.reduceByKey(lambda a, b: a + b)
print(reduced.collect())
sorted_rdd = rdd.sortByKey()
print(sorted_rdd.collect())

3: (6, 4)
1: (2,)
[(3, 10), (1, 2)]
[(1, 2), (3, 6), (3, 4)]


Please note that these operations are transformations and require an action like collect to retrieve the data. Also, keep in mind that groupByKey can cause a lot of data shuffling over the network, and it's generally more efficient to use reduceByKey where possible because it combines output values locally before sending data over the network.
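
The saving from local combining can be illustrated with a small plain-Python model. It is a sketch under stated assumptions: the placement of the three example pairs on two partitions is hypothetical, and `combine_locally` is a helper written here to stand in for the per-partition merge that reduceByKey performs before the shuffle.

```python
# Hypothetical placement of the example pairs on two partitions
partitions = [[(3, 6), (3, 4)], [(1, 2)]]

def combine_locally(partition):
    # Merge values that share a key inside one partition
    local = {}
    for key, value in partition:
        local[key] = local[key] + value if key in local else value
    return list(local.items())

# groupByKey: every pair crosses the network unchanged
shuffled_by_group = [pair for part in partitions for pair in part]
# reduceByKey: only one pre-combined pair per key and partition is sent
shuffled_by_reduce = [pair for part in partitions for pair in combine_locally(part)]

grouped = {}
for key, value in shuffled_by_group:
    grouped.setdefault(key, []).append(value)

reduced = {}
for key, value in shuffled_by_reduce:
    reduced[key] = reduced.get(key, 0) + value

print(len(shuffled_by_group), grouped)    # 3 {3: [6, 4], 1: [2]}
print(len(shuffled_by_reduce), reduced)   # 2 {3: 10, 1: 2}
```

In this toy case the saving is one record; with many repeated keys per partition the reduction in shuffled data can be much larger.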

### Sampling

The sample() transformation is used to sample a fraction of the data from an RDD. You can sample with or without replacement. Here's how you can use it:

#### Sampling without replacement


In [79]:
# Use sample() to take a random sample of about 20% of the customers, without replacement
sampled_customers_rdd = customers.sample(False, 0.2)

# Collect the results
sampled_customers = sampled_customers_rdd.collect()

# Print the sampled list of customers
for customer in sampled_customers:
    print(customer)

Row(CustomerID=1002, CustomerName='Ng Choon Seng', MemberCategory='C', Age=23, Gender='M', AmountSpent=63.18, Address='Blk 63 Bishan St 21 #06-01, Singapore 1057', City='Toronto', CountryCode='CAN', ContactTitle='Mr', PhoneNumber=2580742)
Row(CustomerID=2345, CustomerName='Ng Teck Kie Anthony', MemberCategory='A', Age=56, Gender='M', AmountSpent=73.93, Address='Blk 105, Gangsa Road, #02-103, Singapore 2367', City='Singapore', CountryCode='SIN', ContactTitle='Mr', PhoneNumber=7690237)
Row(CustomerID=2688, CustomerName='Kathleen Loh Swat Hong', MemberCategory='A', Age=38, Gender='F', AmountSpent=39.16, Address='Blk 56, #08-161 Telok Blangah Heights, Singapore 0410', City='Singapore', CountryCode='SIN', ContactTitle='Ms', PhoneNumber=2735765)
Row(CustomerID=2741, CustomerName='Goh Chee Eng', MemberCategory='C', Age=45, Gender='F', AmountSpent=25.91, Address='Blk 267 Sembawang Drive #08-349 Singapore 2369', City='Singapore', CountryCode='SIN', ContactTitle='Ms', PhoneNumber=5553849)
Row(Cu

In the sample method:

1. The first argument is withReplacement. Set it to False for sampling without replacement, meaning a particular customer can be chosen only once.
2. The second argument is the fraction of the data to sample, which is 0.2 in this case, meaning approximately 20% of the data. The fraction is an expected proportion, so the actual sample size varies from run to run.

This prints a random sample of the records in the customers RDD. The collect() action is used here for demonstration purposes, and it should be used with caution if the dataset is large, as it gathers all the sampled data on the driver node.

#### Sampling with replacement

The following example shows how to use sample() with replacement. This means an element can be included in the sample multiple times.

In [80]:
# Use sample() to take a random sample of about 20% of the customers, with replacement
sampled_customers_rdd = customers.sample(True, 0.2)

# Collect the results
sampled_customers = sampled_customers_rdd.collect()

# Print the sampled list of customers
for customer in sampled_customers:
    print(customer)

Row(CustomerID=1634, CustomerName='Sridharan Jayanthi', MemberCategory='A', Age=55, Gender='F', AmountSpent=61.51, Address='Blk 232, Jurong East Street 21 #02-436, Singapore 1234', City='Singapore', CountryCode='SIN', ContactTitle='Ms', PhoneNumber=6658037)
Row(CustomerID=2323, CustomerName='Richard Kwan', MemberCategory='A', Age=26, Gender='M', AmountSpent=89.52, Address='Blk 27, Marine Crescent, #05-05, Singapore 2345', City='Singapore', CountryCode='SIN', ContactTitle='Mr', PhoneNumber=2352345)
Row(CustomerID=7345, CustomerName='P Ravichandran', MemberCategory='B', Age=34, Gender='M', AmountSpent=78.73, Address='Blk 612, Clementi West Street 1, #09-290, Singapore 2612', City='Singapore', CountryCode='SIN', ContactTitle='Mr', PhoneNumber=7755113)
Row(CustomerID=8080, CustomerName='Chan Chin Fung', MemberCategory='B', Age=56, Gender='M', AmountSpent=24.95, Address='6 Dover Rise, #17-11, Singapore 1234', City='Singapore', CountryCode='SIN', ContactTitle='Mr', PhoneNumber=8738529)
Row(C

These examples will give you an array of customers sampled from the original RDD. The actual elements in the sample will vary each time you run the code due to the randomness of the sampling process.
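
The two modes can be modelled in plain Python to show why the sample size is only approximate. This is a simplified sketch, not Spark's code: it assumes a per-element coin flip for sampling without replacement and a Poisson-distributed repeat count for sampling with replacement. The function names and the list of IDs are hypothetical.

```python
import math
import random

def sample_without_replacement(items, fraction, rng):
    # One independent trial per element: each element is kept at most once
    return [x for x in items if rng.random() < fraction]

def poisson(mean, rng):
    # Knuth's method for drawing a Poisson-distributed count
    limit, k, p = math.exp(-mean), 0, 1.0
    while True:
        p *= rng.random()
        if p <= limit:
            return k
        k += 1

def sample_with_replacement(items, fraction, rng):
    # Each element is repeated 0, 1, 2, ... times, with `fraction` repeats on average
    out = []
    for x in items:
        out.extend([x] * poisson(fraction, rng))
    return out

rng = random.Random(42)                 # fixed seed so the run is repeatable
customer_ids = list(range(1000, 1050))  # hypothetical IDs standing in for 50 records

without = sample_without_replacement(customer_ids, 0.2, rng)
with_repl = sample_with_replacement(customer_ids, 0.2, rng)
print(len(without), len(with_repl))
```

Both sizes hover around 10 for 50 records, but neither is guaranteed to be exactly 10, and only the second sample can contain the same ID more than once.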

### More Transformations
In PySpark, you can perform various RDD operations such as union, join, and cartesian (cross) to combine data in different ways. Here are simple examples for each:
#### Union
The union operation combines two RDDs to form a new RDD that contains elements from both RDDs.

In [81]:
# Create two RDDs
rdd1 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])
rdd2 = spark.sparkContext.parallelize([("Charlie", 3), ("David", 4)])

# Perform the union operation
union_rdd = rdd1.union(rdd2)

# Collect and print the results
print(union_rdd.collect())



[('Alice', 1), ('Bob', 2), ('Charlie', 3), ('David', 4)]


#### Join
The join operation combines two RDDs based on their key.

In [82]:

# Create two RDDs with common keys
rdd3 = spark.sparkContext.parallelize([("Alice", "Apple"), ("Bob", "Banana")])
rdd4 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])

# Perform the join operation
join_rdd = rdd3.join(rdd4)

# Collect and print the results
print(join_rdd.collect())


[('Alice', ('Apple', 1)), ('Bob', ('Banana', 2))]
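
The join above is an inner join: a key appears in the result only if it exists in both RDDs, and a key with several matches produces one output pair per combination. The plain-Python sketch below models that behaviour; the "Carol" entry and the second "Bob" entry are hypothetical additions made to show both effects.

```python
left = [("Alice", "Apple"), ("Bob", "Banana"), ("Carol", "Cherry")]  # Carol has no match
right = [("Alice", 1), ("Bob", 2), ("Bob", 20)]                      # Bob matches twice

# Index the right side by key so matches can be looked up directly
right_index = {}
for key, value in right:
    right_index.setdefault(key, []).append(value)

# Emit one (key, (left_value, right_value)) pair per matching combination
joined = [
    (key, (left_value, right_value))
    for key, left_value in left
    for right_value in right_index.get(key, [])
]

print(joined)
```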


#### Cross or Cartesian
The cartesian operation returns all possible pairs of (a, b) where a is in the first RDD and b is in the second RDD.

In [83]:
# Create two RDDs
rdd5 = spark.sparkContext.parallelize([1, 2])
rdd6 = spark.sparkContext.parallelize(["a", "b"])

# Perform the cartesian operation
cross_rdd = rdd5.cartesian(rdd6)

# Collect and print the results
print(cross_rdd.collect())

[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


Please note that the cartesian operation can be very expensive in terms of computation and memory usage, especially with large datasets, because it forms all possible combinations of elements between the two RDDs.
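
The growth is easy to quantify: the result has as many pairs as the product of the two input sizes. The sketch below uses `itertools.product` as a single-machine stand-in for cartesian; `cartesian_size` is a helper introduced here for illustration.

```python
from itertools import product

numbers = [1, 2]
letters = ["a", "b"]

# Every element of the first collection is paired with every element of the second
pairs = list(product(numbers, letters))
print(pairs)

def cartesian_size(n_left, n_right):
    # Number of pairs produced for inputs of the given sizes
    return n_left * n_right

print(cartesian_size(len(numbers), len(letters)))
print(cartesian_size(1_000_000, 1_000_000))
```

Two inputs of one million elements each already produce a trillion pairs.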

#### mapValues
mapValues transformation is applied to RDD datasets that consist of key-value pairs, and it allows us to transform the value of each pair while keeping the key unchanged. Here's a simple example of how we can use mapValues in PySpark:



In [85]:
# Create a pair RDD with the student ID as the key and a list of grades as the value
studentdata = [(1, [88, 92, 96]), (2, [78, 81, 85]), (3, [68, 72, 74])]
studentrdd = spark.sparkContext.parallelize(studentdata)

# Function to calculate the average of a list of grades
def calculate_average(grades):
    return sum(grades) / len(grades)

# Use mapValues to apply calculate_average to each value; the keys stay unchanged.
# collect() returns the (student ID, average) pairs as a Python list that can be iterated.
average_grades = studentrdd.mapValues(calculate_average).collect()

# Print the results
for result in average_grades:
    print(f"Student ID: {result[0]}, Average Grade: {result[1]:.2f}")


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 93.0 failed 1 times, most recent failure: Lost task 0.0 in stage 93.0 (TID 96) (cae9914ebdff executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/rdd.py", line 4260, in map_values_fn
    return kv[0], f(kv[1])
                  ^^^^^^^^
  File "<ipython-input-85-271fa4f254c1>", line 7, in calculate_average
TypeError: 'int' object is not callable

The run recorded in this notebook failed inside calculate_average with TypeError: 'int' object is not callable. The message means that one of the built-ins called on that line, sum or len, had been rebound to an integer earlier in the notebook session. Restarting the runtime, or deleting the offending variable with del, restores the built-in.


This script will output the average grades for each student ID, maintaining the structure of the RDD with the student ID as the key.
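
For reference, the intended behaviour of mapValues can be modelled in plain Python without Spark. This is a sketch only: `map_values` is a helper written for this illustration, not a PySpark function.

```python
def map_values(pairs, func):
    # Apply func to the value of every (key, value) pair and leave the key untouched
    return [(key, func(value)) for key, value in pairs]

def calculate_average(grades):
    return sum(grades) / len(grades)

student_data = [(1, [88, 92, 96]), (2, [78, 81, 85]), (3, [68, 72, 74])]
average_grades = map_values(student_data, calculate_average)

for student_id, average in average_grades:
    print(f"Student ID: {student_id}, Average Grade: {average:.2f}")
```

The keys 1, 2 and 3 come out unchanged, paired with averages of 92.00, 81.33 and 71.33.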

#### cogroup
The cogroup transformation is used to group data from two or more RDDs based on their key. It returns an RDD consisting of pairs where the key is found in the original RDDs, and the value is a tuple containing Iterable collections of values for that key from each RDD.

In [50]:
# Shorthand for the SparkContext of the active session
sc = spark.sparkContext

# Create RDD with student ID and names
student_names = sc.parallelize([(1, "John"), (2, "Sally"), (3, "Bob")])

# Create RDD with student ID and courses
courses = sc.parallelize([(1, "Math"), (2, "History"), (1, "Biology"), (3, "Chemistry"), (2, "Physics")])

# CoGroup the RDDs
cogrouped_data = student_names.cogroup(courses)

# Collect and print results
results = cogrouped_data.collect()

for student_id, (names, course_list) in results:
    print(f"Student ID: {student_id}")
    print(f"Name: {list(names)}")
    print(f"Courses: {list(course_list)}")
    print("---")

Student ID: 2
Name: ['Sally']
Courses: ['History', 'Physics']
---
Student ID: 1
Name: ['John']
Courses: ['Math', 'Biology']
---
Student ID: 3
Name: ['Bob']
Courses: ['Chemistry']
---


The cogroup operation groups the values for each key in both RDDs into a single pair, where each value is an iterable collection.
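
Unlike join, cogroup keeps a key even when it appears in only one of the RDDs; the missing side is then an empty collection. The plain-Python sketch below models this. The `cogroup` function here is a helper written for illustration, and student 4 is a hypothetical entry that has a course but no name.

```python
def cogroup(left, right):
    # Collect, for every key seen in either input, the values from each side
    result = {}
    for key, value in left:
        result.setdefault(key, ([], []))[0].append(value)
    for key, value in right:
        result.setdefault(key, ([], []))[1].append(value)
    return result

student_names = [(1, "John"), (2, "Sally"), (3, "Bob")]
# Student 3 has no course and student 4 has no name in this hypothetical data
courses = [(1, "Math"), (2, "History"), (1, "Biology"), (4, "Art")]

grouped = cogroup(student_names, courses)
for student_id in sorted(grouped):
    names, course_list = grouped[student_id]
    print(student_id, names, course_list)
```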

### More Actions

#### save
An RDD can be saved in a variety of formats. Common choices include text files, sequence files, and other file-based data sources.

We will look at this action after the NoSQL lecture.

End of Demo

Thank you for listening patiently. 🙏🌞