# PySpark Basic

# Part 1. RDD Programming

### What is an RDD?
RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark. It represents an immutable distributed collection of objects that can be processed in parallel across a cluster.

Key characteristics of RDDs include:

+ Resilient: RDDs can recover from node failures using lineage information.
+ Distributed: Data is split across multiple nodes in a cluster.
+ Dataset: RDDs hold data that can be operated on with functional transformations such as `map` and `filter`, and with actions such as `reduce` and `collect`.

RDDs are fault-tolerant and lazily evaluated, meaning transformations are only executed when an action is triggered.
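
Lazy evaluation can be illustrated without a cluster. The sketch below is a plain-Python analogy built on generators, not Spark code; the names `log`, `traced_double`, and `pipeline` are made up for illustration. Defining the pipeline does no work, and the function only runs once the result is consumed, much as RDD transformations wait for an action.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark code).
log = []

def traced_double(x):
    # Record every call so we can see when work actually happens.
    log.append(x)
    return x * 2

data = [1, 2, 3, 4]

# Building the pipeline is like chaining transformations: nothing runs yet.
pipeline = (traced_double(x) for x in data if x % 2 == 0)
calls_before = len(log)

# Consuming the generator plays the role of an action such as collect().
result = list(pipeline)
calls_after = len(log)

print(calls_before, calls_after, result)
```

Spark's scheduler does far more than a generator (it plans stages and distributes tasks), so treat this only as a mental model for "define now, compute later".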

## 1. RDD Creation
There are two different ways to create RDDs. 

1. You can create an RDD by reading data from an external file, such as a text file, using textFile().

2. You can create an RDD from a Python collection (like a list) using the parallelize() method.

Use the provided document (`uq.txt`) and collections to create RDDs.

In [1]:
# Create an RDD from a file
# Input your code here:
from pyspark import SparkContext

# Initialize a SparkContext
sc = SparkContext.getOrCreate()

# Load the text file into an RDD
rdd_from_file = sc.textFile("uq.txt")

# To verify the data is loaded, you can use the collect() action to print the contents
for line in rdd_from_file.collect():
    print(line)

The University of Queensland (UQ) is a public research university located primarily in Brisbane, the capital city of the Australian state of Queensland. Founded in 1909 by the state parliament, UQ is one of the six sandstone universities, an informal designation of the oldest university in each state. The University of Queensland is ranked second nationally by Excellence in Research for Australia and equal second in Australia based on the average of four major global university league tables. The University of Queensland is a founding member of edX, Australia's research-intensive Group of Eight, the international research network McDonnell International Scholars Academy, and the global Universitas 21 network.

The main St Lucia campus occupies much of the riverside inner suburb of St Lucia, southwest of the Brisbane central business district. Other UQ campuses and facilities are located throughout Queensland, the largest of which are the Gatton campus and the Mayne Medical School. UQ's

In [2]:
# Create an RDD from an existing data structure.
# Input your code here:

# open the source file
with open('uq.txt', 'r', encoding='utf-8') as file:
    lines = file.readlines()

# Drop blank lines, strip whitespace, and build a Python list
data = [line.strip() for line in lines if line.strip()]

rdd_from_data = sc.parallelize(data)

for line in rdd_from_data.collect():
    print(line)


The University of Queensland (UQ) is a public research university located primarily in Brisbane, the capital city of the Australian state of Queensland. Founded in 1909 by the state parliament, UQ is one of the six sandstone universities, an informal designation of the oldest university in each state. The University of Queensland is ranked second nationally by Excellence in Research for Australia and equal second in Australia based on the average of four major global university league tables. The University of Queensland is a founding member of edX, Australia's research-intensive Group of Eight, the international research network McDonnell International Scholars Academy, and the global Universitas 21 network.
The main St Lucia campus occupies much of the riverside inner suburb of St Lucia, southwest of the Brisbane central business district. Other UQ campuses and facilities are located throughout Queensland, the largest of which are the Gatton campus and the Mayne Medical School. UQ's 

## 2. RDD Transformations and Actions:
In Spark, transformations are operations that derive a new RDD from an existing one. They are lazily evaluated (i.e., they are not executed until an action is triggered).

###  Common RDD Transformations:
1. `map()`: Applies a function to each element of the RDD and returns a new RDD.
2. `filter()`: Filters elements based on a condition and returns a new RDD with elements that satisfy the condition.
3. `flatMap()`: Similar to map(), but each input item can be mapped to multiple output items (i.e., it flattens the result).
4. `distinct()`: Returns a new RDD containing only distinct elements.
5. `union()`: Combines two RDDs and returns a new RDD containing all elements from both RDDs.

More operations can be found in the [official documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations).
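
To make the semantics of the five transformations concrete, here is a single-machine sketch in plain Python. It is only an analogy: the list operations below mimic what each transformation returns, and the sample `lines` are invented for illustration.

```python
from itertools import chain

lines = ["spark is fast", "spark is lazy"]

# map(): exactly one output element per input element
lengths = [len(line) for line in lines]

# flatMap(): each input may yield several outputs, flattened into one sequence
words = list(chain.from_iterable(line.split() for line in lines))

# filter(): keep only the elements that satisfy the predicate
long_words = [w for w in words if len(w) > 2]

# distinct(): drop duplicates (sorted here only to get a stable order)
unique_words = sorted(set(words))

# union(): concatenate two collections, duplicates are kept
combined = words + ["spark"]

print(lengths, words, long_words, unique_words, combined, sep="\n")
```

Note the difference between `map()` and `flatMap()`: mapping `split` over the two lines would give two lists, while flat-mapping gives six individual words.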

In [3]:
# Transformation
# Case 1: word count
# Split the uq.txt document into individual words, and get the word count of the document.
# Input your code here:
import re
def tokenize_text(text):
    tokens = re.findall(r"\b(?:[A-Za-z]\.){2,}|\b[A-Za-z]+(?:['-][A-Za-z]+)*|\d+(?:-\d+)*\b", text.lower())
    # TODO: apply more advanced tokenizer methods here
    return tokens

# Split each line into words using flatMap
words_rdd = rdd_from_file.flatMap(tokenize_text)

# Map each word to (word, 1) for counting
word_pairs_rdd = words_rdd.map(lambda word: (word, 1))

# Reduce by key to count the occurrences of each word
word_count_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

# Sort the results in descending order of frequency
sorted_word_count_rdd = word_count_rdd.sortBy(lambda x: x[1], ascending=False)

# Collect and display the results
for word, count in sorted_word_count_rdd.collect():
    print(f"{word}: {count}")

# Stop the SparkContext created in cell In [1]
sc.stop()

the: 30
of: 26
and: 20
university: 9
in: 8
uq: 7
research: 6
a: 6
for: 6
queensland: 5
australia: 5
is: 4
state: 3
school: 3
include: 3
brisbane: 2
global: 2
network: 2
scholars: 2
st: 2
lucia: 2
campus: 2
are: 2
united: 2
college: 2
institute: 2
technology: 2
vaccine: 2
human: 2
alumni: 2
winner: 2
located: 2
australian: 2
by: 2
one: 2
six: 2
second: 2
member: 2
international: 2
uq's: 2
over: 2
hundred: 2
centre: 2
dow: 2
president: 2
former: 2
public: 1
primarily: 1
1909: 1
parliament: 1
sandstone: 1
an: 1
informal: 1
oldest: 1
ranked: 1
nationally: 1
excellence: 1
equal: 1
based: 1
four: 1
major: 1
league: 1
founding: 1
australia's: 1
group: 1
eight: 1
mcdonnell: 1
21: 1
occupies: 1
riverside: 1
central: 1
business: 1
other: 1
gatton: 1
medical: 1
overseas: 1
america: 1
uq-ochsner: 1
offers: 1
bachelor: 1
master: 1
doctoral: 1
higher: 1
doctorate: 1
degrees: 1
graduate: 1
faculties: 1
institutes: 1
centres: 1
as: 1
molecular: 1
nanotechnology: 1
sustainable: 1
engineering: 1
achieve
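
The tokenizer pattern used in the word-count cell can be checked on its own, without Spark. The sketch below copies the same regular expression and applies it to a sample sentence that is invented for illustration. It shows the three alternatives at work: dotted abbreviations, words with inner apostrophes or hyphens, and numbers.

```python
import re

# Same pattern as tokenize_text in the word-count cell.
TOKEN_PATTERN = r"\b(?:[A-Za-z]\.){2,}|\b[A-Za-z]+(?:['-][A-Za-z]+)*|\d+(?:-\d+)*\b"

def tokenize_text(text):
    # Lower-case first so that "UQ" and "uq" count as the same word.
    return re.findall(TOKEN_PATTERN, text.lower())

# Invented sample sentence, not taken from uq.txt.
sample = "UQ's St Lucia campus, founded in 1909, hosts UQ-Ochsner in the U.S."
tokens = tokenize_text(sample)
print(tokens)
```

This explains why entries such as `uq's` and `uq-ochsner` appear as single words in the output above, and why punctuation never shows up as a token.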

In [4]:
# Case 2: collection operation
# The code below generates a random integer collection
import random
collection = [random.randint(0, 10) for _ in range(20)]
print("original collection", collection)
# First, add 10 to each element of the collection; second, display the elements that are greater than 12.
# Your Code Here:


sc = SparkContext.getOrCreate()
rdd = sc.parallelize(collection)

# 1. Add 10 to each element using map
rdd_added = rdd.map(lambda x: x + 10)

# 2. Filter elements greater than 12
rdd_filtered = rdd_added.filter(lambda x: x > 12)

# Collect and show the results
result = rdd_filtered.collect()
print("final collection with add 10 and filter > 12",result)

# Stop the SparkContext
sc.stop()

original collection [6, 10, 9, 3, 10, 6, 9, 8, 6, 10, 4, 8, 10, 5, 10, 9, 4, 3, 0, 4]
final collection with add 10 and filter > 12 [16, 20, 19, 13, 20, 16, 19, 18, 16, 20, 14, 18, 20, 15, 20, 19, 14, 13, 14]


In [5]:
# Case 3: set operation
"""
 Given two different sets:
    s1 = ("infs3202", "infs7208", "infs3208", "infs7202", "infs3208")
    s2 = ("infs3208", "infs3208", "infs3204", "infs7204", "infs7208")
    1. Create two RDDs using parallelize method
    2. Display all distinct elements in all the RDDs.
    3. Display all common elements between two RDDs.
    4. Subtract the first RDD with the second RDD and display the result.
    5. Display all the cartesian products between the first RDD and the second RDD:
"""
# Input your code here:

sc = SparkContext.getOrCreate()
# Given sets
s1 = ("infs3202", "infs7208", "infs3208", "infs7202", "infs3208")
s2 = ("infs3208", "infs3208", "infs3204", "infs7204", "infs7208")

# 1. Create two RDDs using parallelize method
rdd1 = sc.parallelize(s1)
rdd2 = sc.parallelize(s2)

# 2. Display all distinct elements in all RDDs
distinct_rdd1 = rdd1.distinct()
distinct_rdd2 = rdd2.distinct()

print("Distinct elements in RDD1:")
print(distinct_rdd1.collect())

print("Distinct elements in RDD2:")
print(distinct_rdd2.collect())

# 3. Display all common elements between two RDDs
common_elements = rdd1.intersection(rdd2)

print("Common elements between RDD1 and RDD2:")
print(common_elements.collect())

# 4. Subtract the first RDD with the second RDD and display the result
rdd_subtracted = rdd1.subtract(rdd2)

print("Elements in RDD1 but not in RDD2:")
print(rdd_subtracted.collect())

# 5. Display all the Cartesian products between the first RDD and the second RDD
cartesian_rdd = rdd1.cartesian(rdd2)

print("Cartesian product between RDD1 and RDD2:")
print(cartesian_rdd.collect())

# Stop the SparkContext
sc.stop()

Distinct elements in RDD1:
['infs3208', 'infs3202', 'infs7208', 'infs7202']
Distinct elements in RDD2:
['infs3208', 'infs7204', 'infs3204', 'infs7208']
Common elements between RDD1 and RDD2:
['infs3208', 'infs7208']
Elements in RDD1 but not in RDD2:
['infs3202', 'infs7202']
Cartesian product between RDD1 and RDD2:
[('infs3202', 'infs3208'), ('infs3202', 'infs3208'), ('infs7208', 'infs3208'), ('infs7208', 'infs3208'), ('infs3202', 'infs3204'), ('infs3202', 'infs7204'), ('infs7208', 'infs3204'), ('infs7208', 'infs7204'), ('infs3202', 'infs7208'), ('infs7208', 'infs7208'), ('infs3208', 'infs3208'), ('infs3208', 'infs3208'), ('infs7202', 'infs3208'), ('infs7202', 'infs3208'), ('infs3208', 'infs3208'), ('infs3208', 'infs3208'), ('infs3208', 'infs3204'), ('infs3208', 'infs7204'), ('infs7202', 'infs3204'), ('infs7202', 'infs7204'), ('infs3208', 'infs7208'), ('infs7202', 'infs7208'), ('infs3208', 'infs3204'), ('infs3208', 'infs7204'), ('infs3208', 'infs7208')]
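
The results above can be cross-checked in plain Python. This is a single-machine sketch, not Spark code: it reproduces the same answers with sets and `itertools.product`, and it makes visible that the intersection drops duplicates while the Cartesian product keeps them (5 x 5 = 25 pairs).

```python
from itertools import product

s1 = ("infs3202", "infs7208", "infs3208", "infs7202", "infs3208")
s2 = ("infs3208", "infs3208", "infs3204", "infs7204", "infs7208")

# intersection(): common elements, with duplicates removed
common = sorted(set(s1) & set(s2))

# subtract(): elements of s1 that do not appear anywhere in s2
s2_values = set(s2)
only_in_s1 = [x for x in s1 if x not in s2_values]

# cartesian(): every (a, b) pairing, duplicates in the inputs included
pairs = list(product(s1, s2))

print(common)
print(only_in_s1)
print(len(pairs), pairs.count(("infs3208", "infs3208")))
```

Because `infs3208` occurs twice in each input, the pair `('infs3208', 'infs3208')` occurs four times in the product, which matches the Spark output above.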


### Basic RDD Actions
Actions are RDD operations that trigger the execution of pending transformations and either return a result to the driver or write it to storage.

1. `collect()`: Collects all elements of the RDD and returns them as a list to the driver program.
2. `count()`: Returns the number of elements in the RDD.
3. `first()`: Returns the first element in the RDD.
4. `reduce()`: Aggregates the elements of the RDD using a specified function.
5. `take(n)`: Returns the first n elements of the RDD.

More operations can be found [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
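
As a rough guide to what each action returns, the following plain-Python sketch applies the local equivalent of each one to a small list. It is an analogy only; the sample `elements` are invented for illustration.

```python
from functools import reduce

elements = [3, 1, 4, 1, 5]

collected = list(elements)                     # collect(): all elements as a list
n = len(elements)                              # count(): number of elements
head = elements[0]                             # first(): the first element
total = reduce(lambda a, b: a + b, elements)   # reduce(): fold with a binary function
first_three = elements[:3]                     # take(3): the first three elements

print(collected, n, head, total, first_three)
```

In Spark, the function passed to `reduce()` should be commutative and associative, because partial results are computed per partition and then merged.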

In [6]:
# Action operations
# The code below generates a random collection
import random
collection = [random.randint(1, 6) for _ in range(20)]

# 1. Compute the factorial of each element in the RDD
# 2. Count the elements in the RDD
# 3. Randomly select 4 elements from the RDD and print them.
# 4. Display the two largest elements in the RDD
# 5. Check the frequency of each element in the RDD
# Input your code here:

# Create RDD from the collection
sc = SparkContext.getOrCreate()
rdd = sc.parallelize(collection)
print("original collection", rdd.collect())

# 1. Get the factorial result of the RDD (factorial of each element)
from math import factorial
factorial_rdd = rdd.map(lambda x: factorial(x))

print("Factorial of each element in the RDD:")
print(factorial_rdd.collect())

# 2. Count the elements in the RDD
count = rdd.count()
print("Number of elements in the RDD:", count)


# 3. Randomly select 4 elements out of the RDD, and print them out
sampled_elements = rdd.takeSample(False, 4)
print("Randomly selected 4 elements:", sampled_elements)

# 4. Display the two largest elements in the RDD
largest_elements = rdd.takeOrdered(2, key=lambda x: -x)
print("Two largest elements in the RDD:", largest_elements)

# 5. Check the frequency of each element in the RDD
element_frequencies = rdd.countByValue()
print("Frequency of each element in the RDD:")
for element, frequency in element_frequencies.items():
    print(f"{element}: {frequency}")

sc.stop()

original collection [1, 5, 3, 3, 3, 6, 5, 3, 6, 4, 2, 4, 1, 4, 2, 1, 5, 5, 5, 1]
Factorial of each element in the RDD:
[1, 120, 6, 6, 6, 720, 120, 6, 720, 24, 2, 24, 1, 24, 2, 1, 120, 120, 120, 1]
Number of elements in the RDD: 20
Randomly selected 4 elements: [5, 6, 3, 3]
Two largest elements in the RDD: [6, 6]
Frequency of each element in the RDD:
1: 4
5: 5
3: 4
6: 2
4: 3
2: 2


## 3. Key/Value Pair Creation and Transformations

In [7]:
"""
Case 1:
Given a List of String:
["MapReduce is good","Spark is fast","Spark is better than MapReduce"]
 
create an RDD and count the word frequency using transformation operations, followed by displaying the results using action operations. 
"""
# Input your code here:


sc = SparkContext.getOrCreate()

data = ["MapReduce is good", "Spark is fast", "Spark is better than MapReduce"]

rdd = sc.parallelize(data)
words_rdd = rdd.flatMap(lambda line: line.split(" "))
word_pairs_rdd = words_rdd.map(lambda word: (word, 1))
word_count_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

word_count_result = word_count_rdd.collect()
print("Word frequencies:")
for word, count in word_count_result:
    print(f"{word}: {count}")
    
sc.stop()


Word frequencies:
MapReduce: 2
is: 3
good: 1
Spark: 2
than: 1
fast: 1
better: 1
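
What `reduceByKey()` computes can be sketched on a single machine with a dictionary. The helper `reduce_by_key` below is hypothetical (it is not part of PySpark) and ignores partitioning and shuffling; it only shows how values that share a key are merged pairwise with the supplied function.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (single-machine sketch)."""
    merged = {}
    for key, value in pairs:
        # First occurrence stores the value; later ones are folded in with func.
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

data = ["MapReduce is good", "Spark is fast", "Spark is better than MapReduce"]

# Equivalent of flatMap(split) followed by map(word -> (word, 1))
pairs = [(word, 1) for line in data for word in line.split(" ")]

counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```

The counts agree with the Spark output above, although Spark does not guarantee the order in which keys are returned.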


In [8]:
""" 
Case 2:
Given the scores of two students below:
    "s123456",78
    "s123456",80
    "s123456",65
    "s123456",90
    "s654321",80
    "s654321",40
    "s654321",50
    "s654321",90
    "s654321",80
 
 store the data into an RDD and calculate the average scores for each student.
"""
#Input your code here:

# Given scores for two students
data = [
    ("s123456", 78),
    ("s123456", 80),
    ("s123456", 65),
    ("s123456", 90),
    ("s654321", 80),
    ("s654321", 40),
    ("s654321", 50),
    ("s654321", 90),
    ("s654321", 80)
]

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(data)

# Map each student and score to (student_id, (score, 1)) pair to keep track of the sum and count
student_scores = rdd.map(lambda x: (x[0], (x[1], 1)))
# Reduce by key to sum the scores and the counts for each student
student_totals = student_scores.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
# Calculate the average by dividing total score by the count
student_averages = student_totals.map(lambda x: (x[0], x[1][0] / x[1][1]))
# Action: Collect the result and print the average scores
average_scores = student_averages.collect()
print("Average scores for each student:")
for student, avg in average_scores:
    print(f"{student}: {avg:.2f}")
sc.stop()

Average scores for each student:
s123456: 78.25
s654321: 68.00
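
The reason for mapping each score to a `(score, 1)` pair is that sums and counts can be merged across partitions, whereas averages cannot. The plain-Python sketch below uses the scores of `s654321` and a hypothetical split into two partitions (the real split is decided by Spark) to compare the two approaches.

```python
from functools import reduce

scores = [80, 40, 50, 90, 80]            # scores of s654321
partitions = [scores[:2], scores[2:]]    # hypothetical two-partition split

def merge(a, b):
    # Same merge function as in the reduceByKey call: add sums and add counts.
    return (a[0] + b[0], a[1] + b[1])

# Reduce inside each partition first, then merge the partial results.
partials = [reduce(merge, [(s, 1) for s in part]) for part in partitions]
total, count = reduce(merge, partials)
average = total / count

# Naive alternative: average the per-partition averages (gives a wrong answer).
naive = sum(sum(part) / len(part) for part in partitions) / len(partitions)

print(partials, average, round(naive, 2))
```

The `(sum, count)` approach gives 68.0 regardless of how the data is split, while the average of averages depends on the partition sizes.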


# Part 2. PySpark SQL DataFrame Programming

## What is a DataFrame?
A DataFrame in PySpark is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in Python’s pandas library, but optimized for large-scale distributed computing.

Key characteristics of DataFrames:

+ Schema: DataFrames have a schema, meaning each column has a name and a type, which makes it easy to work with structured data.
+ Distributed: Like RDDs, DataFrames are distributed across a cluster.
+ Optimized: DataFrames benefit from Spark’s Catalyst Optimizer, which optimizes query execution, and Tungsten engine, which improves performance.

In [9]:
# Import essential functions for next operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, desc
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql import functions as F

## 1. DataFrame Creation

There are two different ways to create DataFrames.

1. You can create a DataFrame from an RDD or a local Python collection by specifying a schema.

2. You can also create a DataFrame by reading data from files like CSV, JSON, or Parquet.

Use the provided document (`uq.txt`) and collections to create DataFrames.

In [10]:
# 1. Create from files

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("TextFileToDataFrame") \
    .getOrCreate()

# Read the text file into a DataFrame
df = spark.read.text("uq.txt")

# Show the DataFrame content
df.show(truncate=True)

spark.stop()

+--------------------+
|               value|
+--------------------+
|The University of...|
|                    |
|The main St Lucia...|
|                    |
|The university of...|
|                    |
|UQ counts two Nob...|
+--------------------+



In [11]:
# 2. Create from a Python list with an explicit column name
spark = SparkSession.builder \
    .appName("ListToDataFrame") \
    .getOrCreate()

# Read the text file and process it into a list
with open('uq.txt', 'r', encoding='utf-8') as file:
    lines = file.readlines()

# Drop blank lines and strip surrounding whitespace
data = [line.strip() for line in lines if line.strip()]

# Create a PySpark DataFrame from the list
df = spark.createDataFrame([(line,) for line in data], ["text"])

# Show the DataFrame content
df.show(truncate=True)

+--------------------+
|                text|
+--------------------+
|The University of...|
|The main St Lucia...|
|The university of...|
|UQ counts two Nob...|
+--------------------+



## 2. DataFrame Transformations and Actions:

PySpark DataFrames follow the same transformation-and-action model as RDDs.

Transformations in DataFrames are similar to SQL-like operations and are also lazily evaluated. They return a new DataFrame and are optimized by the Catalyst Optimizer.

### Common DataFrame Transformations:

1. `select()`: Selects specific columns from the DataFrame.
2. `filter() / where()`: Filters rows based on a condition.
3. `groupBy()`: Groups the DataFrame by a specific column(s) and can be used with aggregate functions.
4. `withColumn()`: Adds or modifies a column.
5. `join()`: Joins two DataFrames on a specified column.
6. `orderBy()`: Sorts the DataFrame by the specified column(s).

Actions in DataFrames trigger the execution of transformations and return results to the driver program.

### Common DataFrame Actions:

1. `show()`: Displays the content of the DataFrame in tabular format.
2. `collect()`: Returns all the data in the DataFrame as a list to the driver program.
3. `count()`: Returns the number of rows in the DataFrame.
4. `first()`: Returns the first row of the DataFrame.
5. `take(n)`: Returns the first n rows as a list.
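
Since DataFrame transformations mirror SQL clauses, the correspondence can be sketched with Python's built-in `sqlite3` module. This is an analogy that runs without Spark, not a PySpark example: it reuses the student scores from Part 1, and the threshold of 50 is an arbitrary value chosen for illustration. Each SQL clause is annotated with the DataFrame method that plays the same role.

```python
import sqlite3

rows = [
    ("s123456", 78), ("s123456", 80), ("s123456", 65), ("s123456", 90),
    ("s654321", 80), ("s654321", 40), ("s654321", 50), ("s654321", 90),
    ("s654321", 80),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (student_id TEXT, score INTEGER)")
conn.executemany("INSERT INTO scores VALUES (?, ?)", rows)

query = """
    SELECT student_id, AVG(score) AS average_score   -- select() and agg()
    FROM scores
    WHERE score >= 50                                -- filter() / where()
    GROUP BY student_id                              -- groupBy()
    ORDER BY average_score DESC                      -- orderBy()
"""
result = conn.execute(query).fetchall()              # plays the role of collect()
conn.close()

print(result)
```

As with a DataFrame, the query only describes the result; rows are produced when they are fetched.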

In [12]:
# Transformation in DataFrame
# Case 1: word count
# Split the uq.txt document into individual words, and get the word count of the document.
# Input your code here:
import re

# Define your custom tokenization function
def tokenize_text(text):
    tokens = re.findall(r"\b(?:[A-Za-z]\.){2,}|\b[A-Za-z]+(?:['-][A-Za-z]+)*|\d+(?:-\d+)*\b", text.lower())
    return tokens

# Wrap the tokenize_text function as a UDF that returns an array of strings
tokenize_udf = F.udf(tokenize_text, ArrayType(StringType()))


# Apply the UDF to split the text into tokens (words)
tokenized_df = df.withColumn("words", tokenize_udf(col("text")))

# Explode the array of words into individual rows
words_df = tokenized_df.select(explode(col("words")).alias("word"))

# Group by each word and count the occurrences
word_count_df = words_df.groupBy("word").count()

# Sort the results in descending order of frequency
sorted_word_count_df = word_count_df.orderBy(desc("count"))

# Show the result
sorted_word_count_df.show(truncate=False)

spark.stop()

+----------+-----+
|word      |count|
+----------+-----+
|the       |30   |
|of        |26   |
|and       |20   |
|university|9    |
|in        |8    |
|uq        |7    |
|research  |6    |
|for       |6    |
|a         |6    |
|australia |5    |
|queensland|5    |
|is        |4    |
|include   |3    |
|school    |3    |
|state     |3    |
|technology|2    |
|six       |2    |
|former    |2    |
|united    |2    |
|human     |2    |
+----------+-----+
only showing top 20 rows



In [13]:
# Case 2: collection operation with Dataframe
# The code below generates a random integer collection
import random
collection = [random.randint(0, 10) for _ in range(20)]
print("original collection", collection)

# The collection can also be processed with a Spark SQL DataFrame
# First, add 10 to each element of the collection; second, display the elements that are greater than 12.
# Input your code here:


# Initialize a Spark session
spark = SparkSession.builder.appName("CollectionTransformation").getOrCreate()

# Create a DataFrame from the random collection generated above
df = spark.createDataFrame([(i,) for i in collection], ["number"])

# Add 10 to each element
df_added = df.withColumn("added_number", df["number"] + 10)

# Filter elements that are greater than 12
df_filtered = df_added.filter(df_added["added_number"] > 12)

# Show the results
df_filtered.show()

# Stop the Spark session after operation
spark.stop()

original collection [3, 0, 10, 1, 2, 4, 5, 1, 6, 5, 9, 9, 6, 0, 6, 8, 9, 10, 0, 10]
+------+------------+
|number|added_number|
+------+------------+
|     3|          13|
|    10|          20|
|     4|          14|
|     5|          15|
|     6|          16|
|     5|          15|
|     9|          19|
|     9|          19|
|     6|          16|
|     6|          16|
|     8|          18|
|     9|          19|
|    10|          20|
|    10|          20|
+------+------------+



In [14]:
# Case 3: set operation with Dataframe
"""
 Given two different sets:
    s1 = ("infs3202", "infs7208", "infs3208", "infs7202", "infs3208")
    s2 = ("infs3208", "infs3208", "infs3204", "infs7204", "infs7208")
    1. Create two DataFrames
    2. Display all distinct elements in both the df.
    3. Display all common elements between two df.
    4. Subtract the first df with the second df and display the result.
    5. Display all the cartesian products between the first df and the second df:
"""
# Input your code here:

spark = SparkSession.builder \
    .appName("SetOperations") \
    .getOrCreate()

s1 = ("infs3202", "infs7208", "infs3208", "infs7202", "infs3208")
s2 = ("infs3208", "infs3208", "infs3204", "infs7204", "infs7208")

# 1: Create two DataFrames
df1 = spark.createDataFrame([(x,) for x in s1], ["subject"])
df2 = spark.createDataFrame([(x,) for x in s2], ["subject"])

# 2: Display all distinct elements in both DataFrames
distinct_df1 = df1.select("subject").distinct()
distinct_df2 = df2.select("subject").distinct()

print("Distinct elements in df1:")
distinct_df1.show()

print("Distinct elements in df2:")
distinct_df2.show()

# 3: Display all common elements between two DataFrames
common_elements_df = distinct_df1.intersect(distinct_df2)

print("Common elements between df1 and df2:")
common_elements_df.show()

# 4: Subtract the first DataFrame with the second DataFrame and display the result
subtract_df = distinct_df1.subtract(distinct_df2)

print("Elements in df1 but not in df2:")
subtract_df.show()

# 5: Display all the Cartesian products between the two DataFrames
cartesian_product_df = df1.crossJoin(df2)

print("Cartesian product of df1 and df2:")
cartesian_product_df.show(truncate=False)

spark.stop()

Distinct elements in df1:
+--------+
| subject|
+--------+
|infs3208|
|infs3202|
|infs7208|
|infs7202|
+--------+

Distinct elements in df2:
+--------+
| subject|
+--------+
|infs3208|
|infs7204|
|infs3204|
|infs7208|
+--------+

Common elements between df1 and df2:
+--------+
| subject|
+--------+
|infs3208|
|infs7208|
+--------+

Elements in df1 but not in df2:
+--------+
| subject|
+--------+
|infs3202|
|infs7202|
+--------+

Cartesian product of df1 and df2:
+--------+--------+
|subject |subject |
+--------+--------+
|infs3202|infs3208|
|infs3202|infs3208|
|infs7208|infs3208|
|infs7208|infs3208|
|infs3202|infs3204|
|infs3202|infs7204|
|infs3202|infs7208|
|infs7208|infs3204|
|infs7208|infs7204|
|infs7208|infs7208|
|infs3208|infs3208|
|infs3208|infs3208|
|infs7202|infs3208|
|infs7202|infs3208|
|infs3208|infs3208|
|infs3208|infs3208|
|infs3208|infs3204|
|infs3208|infs7204|
|infs3208|infs7208|
|infs7202|infs3204|
+--------+--------+
only showing top 20 rows



In [15]:
# Action operations with Dataframe (DF)
# The code below generates a random collection
import random
import math
collection = [random.randint(1, 6) for _ in range(20)]

# 1. Compute the factorial of each element in the DF
# 2. Count the elements in the DF
# 3. Randomly select 4 elements from the DF and print them.
# 4. Display the two largest elements in the DF
# 5. Check the frequency of each element in the DF

spark = SparkSession.builder \
    .appName("RandomCollectionOperations") \
    .getOrCreate()

# Create a DataFrame from the random collection generated above
df = spark.createDataFrame([(x,) for x in collection], ["number"])

# 1: Get the factorial result of the DF

def factorial(n):
    return math.factorial(n)

factorial_udf = F.udf(factorial, IntegerType())
df_with_factorial = df.withColumn("factorial", factorial_udf(col("number")))
print("Factorial result of each element:")
df_with_factorial.show(truncate=False)

# 2: Count the elements in the DataFrame
element_count = df.count()
print(f"Count of elements in the DataFrame: {element_count}")

# 3: Randomly select 4 elements from the DataFrame and print them
random_elements = df.orderBy(F.rand()).limit(4)
print("Randomly selected 4 elements:")
random_elements.show(truncate=False)

# 4: Display the two largest elements in the DataFrame
two_largest_elements = df.orderBy(col("number").desc()).limit(2)
print("Two largest elements:")
two_largest_elements.show()

# 5: Check the frequency of each element in the DataFrame
from pyspark.sql.functions import count as _count

frequency_df = df.groupBy("number").agg(_count("number").alias("frequency")).orderBy(desc("frequency"))
print("Frequency of each element:")
frequency_df.show(truncate=False)

spark.stop()

Factorial result of each element:
+------+---------+
|number|factorial|
+------+---------+
|2     |2        |
|2     |2        |
|6     |720      |
|4     |24       |
|1     |1        |
|1     |1        |
|2     |2        |
|5     |120      |
|5     |120      |
|5     |120      |
|3     |6        |
|3     |6        |
|1     |1        |
|4     |24       |
|3     |6        |
|4     |24       |
|5     |120      |
|5     |120      |
|4     |24       |
|3     |6        |
+------+---------+

Count of elements in the DataFrame: 20
Randomly selected 4 elements:
+------+
|number|
+------+
|5     |
|3     |
|1     |
|2     |
+------+

Two largest elements:
+------+
|number|
+------+
|     6|
|     5|
+------+

Frequency of each element:
+------+---------+
|number|frequency|
+------+---------+
|5     |5        |
|3     |4        |
|4     |4        |
|2     |3        |
|1     |3        |
|6     |1        |
+------+---------+



## 3. Key/Value Pair Creation and Transformations

In [16]:
"""
Given a List of String:
["MapReduce is good","Spark is fast","Spark is better than MapReduce"]
 
Create a DataFrame and count the word frequency using transformation operations, followed by displaying the results using action operations.
"""
# Input your code here:

spark = SparkSession.builder \
    .appName("WordFrequency") \
    .getOrCreate()

# Given list of strings
data = [("MapReduce is good",), 
        ("Spark is fast",), 
        ("Spark is better than MapReduce",)]

# Step 1: Create a DataFrame from the list of strings
df = spark.createDataFrame(data, ["sentence"])

from pyspark.sql.functions import split
# Step 2: Split each sentence into words (transformation operation)
# We use the split() function instead of UDF to split each sentence into an array of words
words_df = df.select(explode(split(col("sentence"), r"\s+")).alias("word"))

# Step 3: Group by word and count occurrences (transformation operation)
word_count_df = words_df.groupBy("word").count()

# Step 4: Sort the words by frequency in descending order (optional, transformation)
sorted_word_count_df = word_count_df.orderBy(F.desc("count"))

# Step 5: Display the results (action operation)
sorted_word_count_df.show(truncate=False)

spark.stop()

+---------+-----+
|word     |count|
+---------+-----+
|is       |3    |
|MapReduce|2    |
|Spark    |2    |
|than     |1    |
|better   |1    |
|fast     |1    |
|good     |1    |
+---------+-----+



In [17]:
""" 
Given the scores of two students below:
    "s123456",78
    "s123456",80
    "s123456",65
    "s123456",90
    "s654321",80
    "s654321",40
    "s654321",50
    "s654321",90
    "s654321",80
 
 store the data into a DataFrame and calculate the average scores for each student.
"""
# Input your code here:

spark = SparkSession.builder \
    .appName("StudentAverageScores") \
    .getOrCreate()

data = [
    ("s123456", 78),
    ("s123456", 80),
    ("s123456", 65),
    ("s123456", 90),
    ("s654321", 80),
    ("s654321", 40),
    ("s654321", 50),
    ("s654321", 90),
    ("s654321", 80)
]

# Create DataFrame with columns 'student_id' and 'score'
df = spark.createDataFrame(data, ["student_id", "score"])

# Group by student_id and calculate the average score for each student
average_scores_df = df.groupBy("student_id").agg(F.avg("score").alias("average_score"))

# Show the results
average_scores_df.show(truncate=False)

spark.stop()

+----------+-------------+
|student_id|average_score|
+----------+-------------+
|s123456   |78.25        |
|s654321   |68.0         |
+----------+-------------+



### Key differences between RDD and DataFrame in PySpark:

| **Aspect**              | **RDD (Resilient Distributed Dataset)**                      | **DataFrame**                                             |
|-------------------------|--------------------------------------------------------------|-----------------------------------------------------------|
| **Data Structure**       | Distributed collection of objects                            | Distributed collection of data organized into named columns |
| **Schema**               | No schema (unstructured)                                     | Has a schema (structured with column names and data types)  |
| **Optimization**         | No built-in optimizations, manual control over operations    | Optimized by Spark's Catalyst Optimizer and Tungsten engine |
| **Ease of Use**          | Lower-level API, more verbose                                | Higher-level API, more user-friendly, SQL-like syntax       |
| **Operations**           | Functional transformations (`map()`, `filter()`, etc.)       | SQL-like operations (`select()`, `groupBy()`, `agg()`, etc.) |
| **Performance**          | Typically slower in PySpark, no automatic optimization       | Typically faster, due to query optimization and better memory management |
| **Error Handling**       | More difficult to debug due to lack of schema                | Easier to debug with clear schema and error messages        |
| **Interoperability with SQL** | Cannot directly use SQL                                  | Can use SQL queries directly on DataFrames                  |
| **Use Cases**            | Ideal for low-level transformations, unstructured data       | Ideal for structured data, SQL-style analytics, and optimizations |
| **Memory Usage**         | Less efficient memory usage                                  | More efficient memory usage through Tungsten engine         |

### Key Points:
- **RDD** is better suited for complex transformations with fine-grained control, but requires more manual work and lacks automatic optimizations.
- **DataFrame** is more user-friendly, performs better due to optimizations, and is ideal for working with structured data and running SQL queries.