# PySpark Installation

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:6 https://cli.github.com/packages stable InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,596 kB]
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 http://archive.ubuntu.com/ubuntu j

# Agenda



1. groupByKey
2. aggregateByKey
3. reduceByKey
4. distinct
5. filter

Use Case :- 1

Use Case :- 2

Use Case :- 3

# ReduceByKey

reduceByKey is a powerful transformation in PySpark that operates on key-value pair RDDs (Resilient Distributed Datasets).

Input pairs:

("python",1)
("python",1)
("python",1)
("python",1)
("java",1)
("java",1)

Output of reduceByKey(lambda x, y: x + y):

("python",4)
("java",2)


The main purpose of reduceByKey is to aggregate the values of each key using a specified associative and commutative binary function, such as summation or multiplication.

This transformation groups the values associated with each key and applies the specified reduction function to combine them into a single value per key.

How reduceByKey Works

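When you use reduceByKey, PySpark first applies the reduction function to the values of each key within each partition (a local, map-side combine). It then shuffles these partial results so that all partial values for a key meet in one partition, where the function is applied again to produce a single value per key. This makes it well suited to tasks like counting, summing, or aggregating data by a key.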

Key Points:

Key-Value Pairs: reduceByKey operates on RDDs where each element is a tuple (key, value).

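Reduction Function: The function provided must be associative and commutative, meaning that neither the grouping nor the order in which values are combined changes the result. Spark relies on this because it combines values in whatever order they arrive from different partitions.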

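Shuffling: reduceByKey triggers a shuffle because all values for a given key must end up in the same partition. Because values are already combined within each partition before the shuffle, it usually moves much less data than groupByKey.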
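The two-phase behaviour described above can be sketched in plain Python, without Spark. This is a simplified model, not PySpark's actual implementation: the two partitions are hard-coded lists, and `reduce_by_key_sim` is a hypothetical helper that combines values inside each partition first and then merges the partial results per key. The subtraction lines show why the function must be associative and commutative.

```python
from functools import reduce


def reduce_by_key_sim(partitions, func):
    """Simplified model of reduceByKey: combine within each partition,
    then merge the per-partition results (the 'shuffle' step)."""
    # Phase 1: map-side combine, one partial value per key per partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Phase 2: merge the partial values for the same key across partitions
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Same transactions as the store example, split into two hypothetical partitions
partitions = [
    [("store1", 100), ("store2", 200), ("store1", 150)],
    [("store2", 300), ("store3", 120), ("store1", 200)],
]

print(reduce_by_key_sim(partitions, lambda x, y: x + y))
# {'store1': 450, 'store2': 500, 'store3': 120}

# Subtraction is neither associative nor commutative, so the result
# depends on how the values happen to be grouped across partitions
values = [100, 150, 200]
print(reduce(lambda x, y: x - y, values))             # (100 - 150) - 200 = -250
print(100 - reduce(lambda x, y: x - y, [150, 200]))   # 100 - (150 - 200) = 150
```

Addition gives the same totals no matter how the records are split, which is exactly the property reduceByKey needs.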

In [None]:
# Reuse the SparkContext from the existing SparkSession
sparkContext = spark.sparkContext

# List of transactions (store, sales)
transactions = [("store1", 100), ("store2", 200), ("store1", 150), ("store2", 300), ("store3", 120),("store1",200)]

# Create an RDD from the transactions list
transactions_rdd = sparkContext.parallelize(transactions)

# Use reduceByKey to sum the sales for each store
total_sales_rdd = transactions_rdd.reduceByKey(lambda x,y: x+y)



# Collect the results
total_sales = total_sales_rdd.collect()

# Print the results
for store, total in total_sales:
  print(f"{store}: {total}")

# How the values are grouped per key before reduction:
#   store1 -> [100, 150, 200]
#   store2 -> [200, 300]
#   store3 -> [120]
#
#   store1 -> 100 + 150 + 200 = 450
#   store2 -> 200 + 300 = 500
#   store3 -> 120

store1: 450
store2: 500
store3: 120




# GroupByKey


Description:

The groupByKey transformation in PySpark is used to group the values of a dataset (RDD) by a common key.

When applied, it returns an RDD of pairs where the key is associated with an iterable of all the values that have the same key. Unlike reduceByKey, which performs aggregation, groupByKey simply groups the values and allows you to perform any operation on the grouped data later on.

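This transformation is useful when you need all the values for a key at once, for example to build a list of them. For simple aggregations such as sums or counts, reduceByKey or aggregateByKey is usually the better choice: groupByKey sends every individual value across the network during the shuffle, whereas those transformations combine values within each partition first.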
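To make that cost difference concrete, here is a rough plain-Python count of how many records each approach would shuffle. It assumes a simplified model in which groupByKey ships every record as-is, while reduceByKey ships one pre-combined record per key per partition; the partitions and the `shuffled_records` helper are hypothetical.

```python
def shuffled_records(partitions):
    """Return (groupByKey_count, reduceByKey_count): records that would
    cross the network under a simplified model of each transformation."""
    # groupByKey: every (key, value) pair is shuffled unchanged
    group_count = sum(len(part) for part in partitions)
    # reduceByKey: each partition combines values first, so it emits
    # one record per distinct key that it holds
    reduce_count = sum(len({key for key, _ in part}) for part in partitions)
    return group_count, reduce_count


# Hypothetical partitions of (user, 1) click records, e.g. for counting visits
partitions = [
    [("user1", 1), ("user1", 1), ("user1", 1), ("user2", 1)],
    [("user1", 1), ("user2", 1), ("user2", 1)],
]

print(shuffled_records(partitions))  # (7, 4)
```

The gap grows with the number of records per key, which is why groupByKey is best reserved for cases where you really need every value.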

Use Case:
Imagine you are working with log data from a web server. The data consists of user IDs and the pages they visited. You want to analyze the data to find out which pages were visited by each user. Here, groupByKey can be used to group the pages by user ID.

Example:-01

# Python list of (user, page) visits

data = [("user1", "page1"), ("user2", "page1"), ("user1", "page2"), ("user2", "page2"), ("user3", "page1")]

# Create an RDD using the parallelize API

sc = spark.sparkContext
rdd = sc.parallelize(data)

# Apply groupByKey to group the pages visited by each user

grouped_rdd = rdd.groupByKey()

result = grouped_rdd.mapValues(list).collect()

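# mapValues(list)

mapValues(list) is applied after groupByKey to convert each group of values from a pyspark.resultiterable.ResultIterable object into a plain Python list. Without it, collect() shows ResultIterable objects instead of the values themselves.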



In [None]:
# Python list of (user, page) visits

data = [("user1", "page1"), ("user2", "page1"), ("user1", "page2"), ("user2", "page2"), ("user3", "page1")]

# Create an RDD using the parallelize API
sparkContext = spark.sparkContext

rdd = sparkContext.parallelize(data)

# Apply groupByKey to group the pages visited by each user

grouped_rdd = rdd.groupByKey()

# Convert each group to a list so the values are readable
result = grouped_rdd.mapValues(list).collect()
print(result)




[('user1', ['page1', 'page2']), ('user2', ['page1', 'page2']), ('user3', ['page1'])]


In [None]:
# Python list of (user, page) visits

data = [("user1", "page1"), ("user2", "page1"), ("user1", "page2"), ("user2", "page2"), ("user3", "page1")]

# Create an RDD using the parallelize API
sparkContext = spark.sparkContext

rdd = sparkContext.parallelize(data)

# Apply groupByKey to group the pages visited by each user

grouped_rdd = rdd.groupByKey()

# Without mapValues(list), each group is printed as a ResultIterable object
result = grouped_rdd.collect()
print(result)


[('user1', <pyspark.resultiterable.ResultIterable object at 0x7fa623033740>), ('user2', <pyspark.resultiterable.ResultIterable object at 0x7fa623033770>), ('user3', <pyspark.resultiterable.ResultIterable object at 0x7fa623033860>)]


In [None]:
# Without mapValues: each group is a ResultIterable object
grouped_rdd = rdd.groupByKey()
grouped_rdd.collect()

[('user1', <pyspark.resultiterable.ResultIterable at 0x7fa622f03b30>),
 ('user2', <pyspark.resultiterable.ResultIterable at 0x7fa622f03bc0>),
 ('user3', <pyspark.resultiterable.ResultIterable at 0x7fa622e861e0>)]

Example 2:

Grouping Sales Data by Product

Consider an RDD containing sales data where each entry consists of a product ID and the amount sold:

sales_data = [("product1", 100), ("product2", 200), ("product1", 300), ("product3", 400), ("product2", 500)]

rdd = spark.sparkContext.parallelize(sales_data)

grouped_sales = rdd.groupByKey()

result = grouped_sales.mapValues(list).collect()



In [None]:
sales_data = [("product1", 100), ("product2", 200), ("product1", 300), ("product3", 400), ("product2", 500)]

rdd = sparkContext.parallelize(sales_data)

grouped_sales = rdd.groupByKey()

result = grouped_sales.mapValues(list).collect()

print(result)

[('product1', [100, 300]), ('product2', [200, 500]), ('product3', [400])]



In [None]:
# Example 3: maximum score per student (reduceByKey)

# List of scores (student, score)
scores = [("Alice", 85), ("Bob", 78), ("Alice", 92), ("Bob", 88), ("Charlie", 79)]

# Create an RDD from the scores list
scores_rdd = sparkContext.parallelize(scores)

# Use reduceByKey to find the maximum score for each student
max_scores_rdd = scores_rdd.reduceByKey(lambda x, y: max(x, y))

# Collect the results
max_scores = max_scores_rdd.collect()

# Print the results
for student, max_score in max_scores:
    print(f"{student}: {max_score}")



Alice: 92
Bob: 88
Charlie: 79


# AggregateByKey

Description:

The aggregateByKey transformation in PySpark aggregates the values for each key, starting from an initial (zero) value and applying two user-supplied functions.

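This transformation is more flexible than reduceByKey: the accumulator can have a different type from the input values, and you supply separate functions for combining values within a partition and for combining results across partitions.

The general syntax for aggregateByKey in PySpark is:

rdd.aggregateByKey(zeroValue, seqOp, combOp)

(The curried form aggregateByKey(zeroValue)(seqOp, combOp) is Scala syntax; in Python it fails with a TypeError.)

zeroValue: The initial accumulator value for each key, used in every partition where that key appears.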

seqOp: The function that combines a value from the RDD with the accumulator within a partition.

combOp: The function that combines accumulators from different partitions.



Use Case:

Consider a scenario where you want to calculate both the sum and count of values associated with each key in your dataset. This might be useful, for example, when calculating the average of values per key.



Example 1: Calculating the Sum and Count of Values by Key

Suppose you have the following RDD representing user scores in different games:


data = [("user1", 10), ("user2", 20), ("user1", 30), ("user2", 40), ("user3", 50)]

rdd = spark.sparkContext.parallelize(data)

You can use aggregateByKey to calculate the sum and count of scores for each user:


zero_value = (0, 0)  # (sum, count)

seqOp = lambda acc, value: (acc[0] + value, acc[1] + 1)

combOp = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

result = rdd.aggregateByKey(zero_value, seqOp, combOp).collect()

Output:-

[('user1', (40, 2)), ('user2', (60, 2)), ('user3', (50, 1))]

Explanation of the Functions:

zero_value = (0, 0): This initializes the sum and count for each key as (0, 0).

seqOp = lambda acc, value: (acc[0] + value, acc[1] + 1): This function adds the current value to the running sum and increments the count by 1 for values within a partition.

combOp = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]): This function combines the results from different partitions by summing the sums and counts.

In [None]:
# Inside seqOp, acc is the (sum, count) accumulator for one key:
#   acc[0] -> current sum so far
#   acc[1] -> current count so far
# For user1: (10 + 30, 1 + 1) -> (40, 2)

In [None]:
data = [("user1", 10), ("user2", 20), ("user1", 30), ("user2", 40), ("user3", 50)]

rdd = spark.sparkContext.parallelize(data)

# You can use aggregateByKey to calculate the sum and count of scores for each user:

zero_value = (0, 0)  # (sum, count)

seqOp = lambda acc, value: (acc[0] + value, acc[1] + 1)

combOp = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

result = rdd.aggregateByKey(zero_value, seqOp, combOp).collect()
print(result)

[('user1', (40, 2)), ('user2', (60, 2)), ('user3', (50, 1))]


Steps in the Program:
Initialization:

zero_value = (0, 0): This is the initial value for each key (sum = 0, count = 0).
Sequential Operation (seqOp):

seqOp = lambda acc, value: (acc[0] + value, acc[1] + 1)

This operation adds the current value to the accumulated sum and increments the count by 1 within each partition.

Combining Operation (combOp):

combOp = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
This operation merges the results from different partitions by adding the sums and counts.

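Processing (assuming all values for a key are in the same partition; otherwise seqOp builds a partial (sum, count) in each partition and combOp adds those partials together):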
For user1:

The first value is 10, so (0 + 10, 0 + 1) → (10, 1).
The next value is 30, so (10 + 30, 1 + 1) → (40, 2).
For user2:

The first value is 20, so (0 + 20, 0 + 1) → (20, 1).
The next value is 40, so (20 + 40, 1 + 1) → (60, 2).
For user3:

The only value is 50, so (0 + 50, 0 + 1) → (50, 1).
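The walkthrough above keeps each user's values together. A plain-Python sketch can show what happens when they are split, and how the (sum, count) pairs turn into averages, which is the use case mentioned earlier. `aggregate_by_key_sim` is a hypothetical, simplified model of aggregateByKey, and the partition split is an assumption chosen so that user1's two scores land in different partitions.

```python
def aggregate_by_key_sim(partitions, zero_value, seq_op, comb_op):
    """Simplified model of aggregateByKey: seq_op folds values into a
    per-partition accumulator starting from zero_value, then comb_op
    merges the accumulators for the same key across partitions."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            # Each key starts from zero_value in every partition where it appears
            local[key] = seq_op(local.get(key, zero_value), value)
        partials.append(local)

    merged = {}
    for local in partials:
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged


seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Hypothetical split: user1's scores (10 and 30) are in different partitions
partitions = [
    [("user1", 10), ("user2", 20), ("user2", 40)],
    [("user1", 30), ("user3", 50)],
]

sums_counts = aggregate_by_key_sim(partitions, (0, 0), seq_op, comb_op)
print(sums_counts)  # {'user1': (40, 2), 'user2': (60, 2), 'user3': (50, 1)}

# Average per key from the (sum, count) accumulator
averages = {key: total / count for key, (total, count) in sums_counts.items()}
print(averages)  # {'user1': 20.0, 'user2': 30.0, 'user3': 50.0}
```

Here user1 gets (10, 1) in the first partition and (30, 1) in the second, and combOp merges them into (40, 2). On a real RDD, the averaging step would typically be `.mapValues(lambda p: p[0] / p[1])` applied to the aggregateByKey result.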

# Distinct and Filter


The filter transformation in PySpark is used to create a new RDD (Resilient Distributed Dataset) or DataFrame containing only the elements that satisfy a given condition or predicate.

It is a simple yet powerful operation that allows you to refine and process data by selecting only the rows or records that meet specific criteria.

How filter Works

RDDs: When applied to an RDD, filter takes a function as an argument. This function is applied to each element in the RDD, and only those elements for which the function returns True are included in the resulting RDD.

DataFrames: In the context of DataFrames, filter (or where, which is an alias for filter) is used to apply conditions on columns. It returns a new DataFrame with rows that match the given condition.

Syntax :-

filtered_rdd = rdd.filter(lambda x: condition)

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(numbers)

# Keep only the even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)

print(filtered_rdd.collect())  # Output: [2, 4, 6, 8, 10]

Common Use Cases

Data Cleaning: Filtering out invalid or missing data (e.g., filter(lambda x: x is not None)).

Data Analysis: Selecting specific subsets of data for analysis (e.g., keeping only customers whose purchases exceed a certain threshold).

Optimization: Reducing the size of data by filtering irrelevant information, which can lead to more efficient processing.

Distinct Operation in PySpark
The distinct transformation in PySpark is used to remove duplicate elements from an RDD (Resilient Distributed Dataset) or rows from a DataFrame, resulting in a dataset that contains only unique elements or rows. This operation is particularly useful when you need to ensure that your data contains no duplicates, which is a common requirement in data processing and analysis tasks.

How distinct Works

RDDs: When applied to an RDD, distinct will remove all duplicate elements, returning a new RDD containing only unique elements.

Syntax:-

distinct_rdd = rdd.distinct()

Example -01

numbers = [1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2]
rdd = spark.sparkContext.parallelize(numbers)

# Remove duplicates
distinct_rdd = rdd.distinct()

# distinct() does not guarantee order, so sort for a predictable display
print(sorted(distinct_rdd.collect()))  # Output: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(numbers)

filtered_rdd = rdd.filter(lambda x: x % 2 == 0)

print(filtered_rdd.collect()) # Output: [2, 4, 6, 8, 10]



[2, 4, 6, 8, 10]


In [None]:

numbers1 = [1, 2, 3, 4, 5,1,2]
rdd1 = spark.sparkContext.parallelize(numbers1)

distinct_rdd1 = rdd1.distinct()
print(distinct_rdd1.collect())

[2, 4, 1, 3, 5]
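Notice that distinct does not preserve the input order. The output [2, 4, 1, 3, 5] is consistent with the values being hash-partitioned before duplicates are removed. The sketch below models that idea in plain Python under two assumptions: the RDD has 2 partitions (common on a small Colab runtime, but not guaranteed), and small integers hash to themselves. `distinct_sim` is a hypothetical illustration, not PySpark's code.

```python
def distinct_sim(values, num_partitions=2):
    """Simplified model of RDD.distinct(): hash-partition the values,
    then keep the first occurrence of each value within its partition."""
    partitions = [[] for _ in range(num_partitions)]
    for v in values:
        # For small non-negative ints, hash(v) == v, so with 2 partitions
        # even numbers go to partition 0 and odd numbers to partition 1
        partitions[hash(v) % num_partitions].append(v)

    result = []
    for part in partitions:
        # dict.fromkeys drops duplicates while keeping first-seen order
        result.extend(dict.fromkeys(part))
    return result


print(distinct_sim([1, 2, 3, 4, 5, 1, 2]))  # [2, 4, 1, 3, 5]
```

If you need sorted results from the RDD itself, you can chain `.sortBy(lambda x: x)` after `distinct()`, or call `sorted()` on the collected list.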


Summary ❎

-> create RDDs
-> map
-> flatMap
-> sortByKey
-> sortBy
-> reduceByKey
-> groupByKey
-> mapValues(list)
-> aggregateByKey
-> collect
-> take
-> distinct
-> filter


# USE CASE - 01

Website Log Data Analysis

Objective:

Identify the top 2 users with the most unique page visits.


logs = [
    ("U001", "/home", "2024-08-25 10:00:00"),
    ("U002", "/about", "2024-08-25 10:05:00"),
    ("U001", "/products", "2024-08-25 10:10:00"),
    ("U002", "/home", "2024-08-25 10:15:00"),
    ("U003", "/products", "2024-08-25 10:20:00"),
    ("U001", "/home", "2024-08-25 10:25:00")
]


Expected Output:-

[('U001', ['/home', '/products']), ('U002', ['/about', '/home'])]

Operations to Perform:-

✅Map

✅Distinct

✅GroupByKey and mapValues

✅SortBy

✅take


rdd = spark.sparkContext.parallelize(logs)

user_pages_rdd = rdd.map(lambda x: (x[0], x[1]))

distinct_user_pages = user_pages_rdd.distinct()

grouped_pages_by_user = distinct_user_pages.groupByKey().mapValues(list)

sorted_users_by_activity = grouped_pages_by_user.sortBy(lambda x: len(x[1]), ascending=False)

top_active_users = sorted_users_by_activity.take(2)
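U001 and U002 both visited two unique pages, so their relative order in take(2), and the order of pages inside each list after groupByKey, depend on partitioning rather than on the data. The plain-Python version below, useful for checking the expected output locally, makes both explicit by sorting the pages and breaking ties on user ID. This tie-break is an addition for reproducibility, not part of the original exercise; on the RDD, the equivalent sort key would be `lambda x: (-len(x[1]), x[0])`.

```python
from collections import defaultdict

logs = [
    ("U001", "/home", "2024-08-25 10:00:00"),
    ("U002", "/about", "2024-08-25 10:05:00"),
    ("U001", "/products", "2024-08-25 10:10:00"),
    ("U002", "/home", "2024-08-25 10:15:00"),
    ("U003", "/products", "2024-08-25 10:20:00"),
    ("U001", "/home", "2024-08-25 10:25:00"),
]

# map + distinct + groupByKey: the set of unique pages per user
pages_by_user = defaultdict(set)
for user, page, _timestamp in logs:
    pages_by_user[user].add(page)

# sortBy: most unique pages first, ties broken by user ID
ranked = sorted(
    ((user, sorted(pages)) for user, pages in pages_by_user.items()),
    key=lambda item: (-len(item[1]), item[0]),
)

# take(2)
print(ranked[:2])  # [('U001', ['/home', '/products']), ('U002', ['/about', '/home'])]
```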



In [None]:
logs = [ ("U001", "/home", "2024-08-25 10:00:00"), ("U002", "/about", "2024-08-25 10:05:00"), ("U001", "/products", "2024-08-25 10:10:00"), ("U002", "/home", "2024-08-25 10:15:00"), ("U003", "/products", "2024-08-25 10:20:00"), ("U001", "/home", "2024-08-25 10:25:00") ]

rdd= spark.sparkContext.parallelize(logs)
rdd.collect()

users_pages_rdd = rdd.map(lambda x: (x[0],x[1]))

# users_pages_rdd.collect()

distinct_rdd = users_pages_rdd.distinct()
distinct_rdd.collect()
grouped_rdd = distinct_rdd.groupByKey().mapValues(list)
grouped_rdd.collect()
sorted_rdd = grouped_rdd.sortBy(lambda x: len(x[1]),ascending=False)
#sorted_rdd.collect()
sorted_rdd.take(2)



[('U001', ['/home', '/products']), ('U002', ['/about', '/home'])]

# USE CASE - 02

Objective:

Identify machines with abnormal sensor readings (reading > 90), count the abnormal readings per machine, and sort the machines by that count in descending order.


sensor_data = [
    ("M001", 85, "2024-08-25 10:00:00"),
    ("M002", 95, "2024-08-25 10:05:00"),
    ("M001", 75, "2024-08-25 10:10:00"),
    ("M002", 105, "2024-08-25 10:15:00"),
    ("M003", 65, "2024-08-25 10:20:00"),
    ("M001", 95, "2024-08-25 10:25:00"),
    ("M002", 100, "2024-08-25 10:30:00")
]


Expected Output:

[('M002', 3), ('M001', 1)]


Operations to Perform:-

✅Filter

✅Map

✅ReduceByKey

✅SortBy


rdd = spark.sparkContext.parallelize(sensor_data)

abnormal_readings = rdd.filter(lambda x: x[1] > 90)

machine_abnormal_counts = abnormal_readings.map(lambda x: (x[0], 1))

total_abnormal_counts = machine_abnormal_counts.reduceByKey(lambda x, y: x + y)

sorted_abnormal_counts = total_abnormal_counts.sortBy(lambda x: x[1], ascending=False)
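For a quick local check of the expected output, the same filter, map, reduceByKey, and sortBy steps collapse into a `collections.Counter` in plain Python. This is a sketch for verification only; `THRESHOLD` is a name introduced here for the 90 cut-off.

```python
from collections import Counter

sensor_data = [
    ("M001", 85, "2024-08-25 10:00:00"),
    ("M002", 95, "2024-08-25 10:05:00"),
    ("M001", 75, "2024-08-25 10:10:00"),
    ("M002", 105, "2024-08-25 10:15:00"),
    ("M003", 65, "2024-08-25 10:20:00"),
    ("M001", 95, "2024-08-25 10:25:00"),
    ("M002", 100, "2024-08-25 10:30:00"),
]

THRESHOLD = 90  # readings above this value count as abnormal

# filter + map + reduceByKey: count abnormal readings per machine
abnormal_counts = Counter(
    machine for machine, reading, _timestamp in sensor_data if reading > THRESHOLD
)

# sortBy count, descending
print(abnormal_counts.most_common())  # [('M002', 3), ('M001', 1)]
```

The expected output keeps M001, which has a single abnormal reading. To keep only machines with more than one, you would add `.filter(lambda x: x[1] > 1)` after reduceByKey on the RDD.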



In [None]:
sensor_data = [ ("M001", 85, "2024-08-25 10:00:00"), ("M002", 95, "2024-08-25 10:05:00"), ("M001", 75, "2024-08-25 10:10:00"), ("M002", 105, "2024-08-25 10:15:00"), ("M003", 65, "2024-08-25 10:20:00"), ("M001", 95, "2024-08-25 10:25:00"), ("M002", 100, "2024-08-25 10:30:00") ]
rdd = spark.sparkContext.parallelize(sensor_data)
rdd.collect()
rdd2= rdd.filter(lambda x: x[1] > 90)
rdd3 = rdd2.map(lambda x : (x[0],1))
rdd4 = rdd3.reduceByKey(lambda x,y: x+y)
rdd5 = rdd4.sortBy(lambda x: x[1], ascending=False)
rdd5.collect()


[('M002', 3), ('M001', 1)]

# USE CASE - 03

E-commerce Transactions

Objective:
Calculate the total amount spent by each customer and sort the customers by their IDs.


transactions = [
    ("C001", "Item1", 100),
    ("C002", "Item2", 150),
    ("C001", "Item3", 200),
    ("C002", "Item1", 300),
    ("C003", "Item2", 250),
    ("C003", "Item3", 100)
]

Expected Output

[('C001', 300), ('C002', 450), ('C003', 350)]

Operations to perform ->

✅ Map

✅ ReduceByKey

✅ SortByKey

rdd = spark.sparkContext.parallelize(transactions)

mapped_rdd = rdd.map(lambda x: (x[0], x[2]))

total_spent_per_customer = mapped_rdd.reduceByKey(lambda x, y: x + y)

sorted_customers = total_spent_per_customer.sortByKey()



In [None]:
transactions = [ ("C001", "Item1", 100), ("C002", "Item2", 150), ("C001", "Item3", 200), ("C002", "Item1", 300), ("C003", "Item2", 250), ("C003", "Item3", 100) ]
rdd = spark.sparkContext.parallelize(transactions)
rdd2 = rdd.map(lambda x:(x[0],x[2]))
rdd3 = rdd2.reduceByKey(lambda x, y: x + y)
rdd4 = rdd3.sortByKey()
rdd4.collect()

[('C001', 300), ('C002', 450), ('C003', 350)]

# Assignment

In [None]:

# In this assignment, students will perform a word count on the provided dataset containing
# airline customer feedback. The goal is to count the number of occurrences of each word,
# ensuring that the word count is case-insensitive (i.e., "Flight" and "flight" should be
# treated as the same word). After counting the words, students should sort the results
# by the word counts in descending order.

import os
import sys
import findspark
import pyspark
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkContext

spark = SparkSession.builder.appName("WordCount").getOrCreate()
sc = spark.sparkContext

# Sample airline feedback data
feedback_data = [
    "The flight was delayed and the customer service was terrible.",
    "The food was good but the seats were uncomfortable.",
    "The flight was smooth and the crew was friendly.",
    "I had a great experience on this flight.",
    "The entertainment system was not working properly."
]

# Create an RDD from the feedback data
rdd = sc.parallelize(feedback_data)
stopwords = ["the", "and", "was", "but", "on", "this", "had", "a", "in", "it", "i"]

# Broadcast the stopwords list to all nodes
stopwords_broadcast = sc.broadcast(stopwords)
# Split the sentences into words and convert to lowercase
words = rdd.flatMap(lambda line: line.lower().split()) \
           .filter(lambda word: word not in stopwords_broadcast.value)

# Map each word to a key-value pair (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# Reduce by key to count word occurrences
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# Sort the word counts in descending order
sorted_word_counts = word_counts.sortBy(lambda x: x[1], ascending=False)

# Collect and print the results
for word, count in sorted_word_counts.collect():
    print(f"{word}: {count}")


flight: 2
delayed: 1
customer: 1
service: 1
terrible.: 1
good: 1
seats: 1
uncomfortable.: 1
smooth: 1
crew: 1
flight.: 1
working: 1
properly.: 1
food: 1
were: 1
friendly.: 1
great: 1
experience: 1
entertainment: 1
system: 1
not: 1
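The output above counts "flight" and "flight." as different words, because `split()` leaves punctuation attached. One way to fix that is a regex-based tokenizer. The sketch below assumes that only letters, digits, and apostrophes belong to words; `tokenize` and `TOKEN_PATTERN` are hypothetical names. On the RDD, `rdd.flatMap(tokenize)` would replace `rdd.flatMap(lambda line: line.lower().split())`, and the stopword filter stays the same.

```python
import re
from collections import Counter

# Assumption: a word is a run of lowercase letters, digits, or apostrophes,
# so trailing punctuation such as "." is dropped
TOKEN_PATTERN = re.compile(r"[a-z0-9']+")


def tokenize(line):
    """Lowercase a line and return its words without punctuation."""
    return TOKEN_PATTERN.findall(line.lower())


feedback_data = [
    "The flight was delayed and the customer service was terrible.",
    "The food was good but the seats were uncomfortable.",
    "The flight was smooth and the crew was friendly.",
    "I had a great experience on this flight.",
    "The entertainment system was not working properly.",
]

print(tokenize(feedback_data[3]))
# ['i', 'had', 'a', 'great', 'experience', 'on', 'this', 'flight']

counts = Counter(word for line in feedback_data for word in tokenize(line))
print(counts["flight"])  # 3
```

With this tokenizer, "flight" is counted 3 times instead of being split into "flight" (2) and "flight." (1).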


Assignment Description:

In this assignment, students will perform a word count on the provided dataset containing airline customer feedback. The goal is to count the number of occurrences of each word, ensuring that the word count is case-insensitive (i.e., "Flight" and "flight" should be treated as the same word). After counting the words, students should sort the results by the word counts in descending order.

Dataset: provided by the trainer.

Assignment Tasks:

Mount the Dataset:

Load the dataset (airline_feedback_short.txt) into a PySpark RDD.

Convert all the text to lowercase to ensure the word count is case-insensitive.

Split the text into individual words.

Task :- 1

Word Count:

Count how many times each word appears in the dataset.

Task :- 2

Sort the Word Count:

Sort the words by their count in descending order.

Task :- 3

Capitalize Words:

Apply capitalization to each word in the final output (i.e., make the first letter of each word uppercase).

Task :- 4

Save and Display Results:

Save the final sorted word counts to a file. Display the top 20 most frequent words with their counts.
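Tasks 2 to 4 operate on the (word, count) pairs produced by reduceByKey. The plain-Python sketch below shows the intended transformations on a small, hypothetical list of counts (not taken from the real dataset). It breaks ties alphabetically so the output is stable. On the RDD, the same steps would roughly be `sortBy(lambda x: (-x[1], x[0]))`, `map(lambda x: (x[0].capitalize(), x[1]))`, `take(20)` for display, and `saveAsTextFile(path)` for saving, which writes a directory of part files rather than a single file.

```python
# Hypothetical word counts, as produced by the reduceByKey step
word_counts = [("flight", 3), ("delayed", 1), ("crew", 1), ("seats", 1)]

# Task 2: sort by count descending; ties broken alphabetically
sorted_counts = sorted(word_counts, key=lambda wc: (-wc[1], wc[0]))

# Task 3: make the first letter of each word uppercase
capitalized = [(word.capitalize(), count) for word, count in sorted_counts]

# Task 4: keep the top 20 most frequent words for display
top_n = capitalized[:20]
for word, count in top_n:
    print(f"{word}: {count}")
```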