In [13]:
from pyspark import SparkConf, SparkContext

In [2]:
# Problem 1: Word Count with Filtering (Key-Value RDD)
# Objective: Count the number of times each word appears, excluding stopwords.

# Steps:

# Load a list of sentences into an RDD.
# Split each sentence into words.
# Remove common stopwords like "is", "the", "a", "an", etc.
# Create key-value pairs (word, 1).
# Use reduceByKey to get the final word counts.

# Transformations: flatMap, filter, map, reduceByKey
# Actions: collect, take

# Reuse the running SparkContext when this cell is executed again. Calling the
# SparkContext constructor a second time in the same kernel fails with
# "ValueError: Cannot run multiple SparkContexts at once"; getOrCreate() only
# builds a new context (with this master/app name) when none is active.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("WordCountWithFilter")
)

sentences = [
    "This is a sample sentence",
    "The spark RDD is powerful",
    "A word count example using RDD"
]

# Stopwords are stored lowercase; words are lowercased before the lookup below.
stopwords = {"is", "the", "a", "an", "this", "using"}

rdd = sc.parallelize(sentences)

word_counts = (
    # One lowercase token per whitespace-separated word.
    rdd.flatMap(lambda line: line.lower().split())
       # Drop every token that appears in the stopword set.
       .filter(lambda word: word not in stopwords)
       # Pair each remaining word with an initial count of 1 ...
       .map(lambda word: (word, 1))
       # ... and add the counts up per word.
       .reduceByKey(lambda a, b: a + b)
)

print(word_counts.collect())


[('sample', 1), ('sentence', 1), ('spark', 1), ('rdd', 2), ('powerful', 1), ('word', 1), ('count', 1), ('example', 1)]


In [3]:
# Problem 2: Average Score per Student (Key-Value RDD)
# Objective: Calculate average score of each student from a list of (name, score).

# Input:

# data = [("Alice", 80), ("Bob", 90), ("Alice", 70), ("Bob", 85), ("Charlie", 60)]


# Steps:

# Map to key-value: (name, (score, 1))
# Use reduceByKey to sum scores and counts: (name, (total_score, count))
# Map to calculate average.

# Transformations: map, reduceByKey
# Actions: collect, takeOrdered

data = [("Alice", 80), ("Bob", 90), ("Alice", 70), ("Bob", 85), ("Charlie", 60)]
rdd = sc.parallelize(data)


def _with_count(record):
    """Turn (name, score) into (name, (score, 1)) so scores and counts can be summed together."""
    return (record[0], (record[1], 1))


def _merge_totals(left, right):
    """Add two (score_sum, count) pairs component-wise."""
    return (left[0] + right[0], left[1] + right[1])


def _to_average(entry):
    """Turn (name, (score_sum, count)) into (name, average rounded to 2 decimals)."""
    name, totals = entry
    return (name, round(totals[0] / totals[1], 2))


# Stage 1: attach a count of 1 to every score.
scores_with_counts = rdd.map(_with_count)
# Stage 2: per student, accumulate the total score and the number of scores.
score_totals = scores_with_counts.reduceByKey(_merge_totals)
# Stage 3: divide total by count to get each student's average.
average_scores = score_totals.map(_to_average)

print(average_scores.collect())


[('Alice', 75.0), ('Bob', 87.5), ('Charlie', 60.0)]


In [14]:
# Problem 3: Frequency of Each Number in List (List RDD)
# Objective: From a list of numbers, count frequency of each number and sort in descending order of frequency.

# Input:

# numbers = [5, 3, 4, 5, 2, 3, 5, 3, 4]


# Steps:

# Convert to (number, 1)
# Use reduceByKey to count occurrences
# Swap to (count, number) and sort descending
# Return top 3 frequent numbers

# Transformations: map, reduceByKey, map, sortByKey
# Actions: take, collect

numbers = [5, 3, 4, 5, 2, 3, 5, 3, 4]
rdd = sc.parallelize(numbers)

top_frequencies = (
    # Count how many times each number occurs.
    rdd.map(lambda x: (x, 1))
       .reduceByKey(lambda a, b: a + b)
       # Sort on the composite key (count, number) instead of count alone.
       # Several numbers can share a count (5 and 3 both occur three times),
       # and sorting on count only leaves their relative order unspecified.
       # With the composite key, ties are ordered by the larger number first.
       .map(lambda x: ((x[1], x[0]), x[0]))
       .sortByKey(ascending=False)
       # Back to (number, count) for the final result.
       .map(lambda x: (x[1], x[0][0]))
       .take(3)
)

print(top_frequencies)


[(5, 3), (3, 3), (4, 2)]


In [7]:
# Colab-only step: files.upload() asks the user for a local file to upload
# (the recorded output of this cell shows titanic.parquet being saved).
from google.colab import files
# The return value is kept in `uploaded`; none of the cells shown here read it.
uploaded = files.upload()




Saving titanic.parquet to titanic.parquet


In [8]:
import shutil

# Put the uploaded file at the absolute path that the Spark cells read from.
# NOTE(review): if the notebook's working directory is already /content, the
# source and destination are the same file — confirm whether this move is needed.
uploaded_name = "titanic.parquet"
parquet_path = "/content/titanic.parquet"
shutil.move(uploaded_name, parquet_path)


'/content/titanic.parquet'

In [9]:
from pyspark.sql import SparkSession

# getOrCreate() returns the active session if there is one, otherwise it
# starts a new session with the app name configured on the builder.
spark = (
    SparkSession.builder
    .appName("Titanic Parquet Load")
    .getOrCreate()
)

# Read the Titanic dataset from the Parquet file.
titanic_path = "/content/titanic.parquet"
df = spark.read.parquet(titanic_path)

# Preview some rows, then print the column names and types.
df.show()
df.printSchema()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [10]:
# Problem 1: Survival Rate by Passenger Class and Gender
# Task:
# Calculate the survival rate grouped by Pclass and Sex. Sort the result by Pclass and descending survival rate.

# Expected Output Columns:

# Pclass
# Sex
# SurvivalRate (rounded to 2 decimal places)

# SQL:

# SELECT
#     Pclass,
#     Sex,
#     ROUND(AVG(Survived), 2) AS SurvivalRate
# FROM passengers
# GROUP BY Pclass, Sex
# ORDER BY Pclass, SurvivalRate DESC

from pyspark.sql import SparkSession

# getOrCreate() returns the active session if there is one, otherwise it
# starts a new session with the app name configured on the builder.
spark = (
    SparkSession.builder
    .appName("SurvivalRateSQL")
    .getOrCreate()
)

# Register the Titanic data under the table name the query refers to.
df = spark.read.parquet("/content/titanic.parquet")
df.createOrReplaceTempView("passengers")

# ROUND(AVG(Survived), 2) is reported as SurvivalRate for every
# (Pclass, Sex) group; rows are ordered by class, then by rate descending.
survival_rate_sql = """
    SELECT
        Pclass,
        Sex,
        ROUND(AVG(Survived), 2) AS SurvivalRate
    FROM passengers
    GROUP BY Pclass, Sex
    ORDER BY Pclass ASC, SurvivalRate DESC
"""
result_df = spark.sql(survival_rate_sql)
result_df.show()

+------+------+------------+
|Pclass|   Sex|SurvivalRate|
+------+------+------------+
|     1|female|        0.97|
|     1|  male|        0.37|
|     2|female|        0.92|
|     2|  male|        0.16|
|     3|female|         0.5|
|     3|  male|        0.14|
+------+------+------------+



In [15]:
# Problem 2: Average Fare and Age by Embarkation Port
# Task:
# Find the average Fare and Age of passengers grouped by Embarked. Exclude rows where Fare or Age is NULL. Order by average fare descending.

# Expected Output Columns:

# Embarked
# AvgFare
# AvgAge

# SQL:

# SELECT
#     Embarked,
#     ROUND(AVG(Fare), 2) AS AvgFare,
#     ROUND(AVG(Age), 2) AS AvgAge
# FROM passengers
# WHERE Fare IS NOT NULL AND Age IS NOT NULL
# GROUP BY Embarked
# ORDER BY AvgFare DESC


from pyspark.sql import SparkSession

# getOrCreate() returns the active session if there is one, otherwise it
# starts a new session with the app name configured on the builder.
spark = (
    SparkSession.builder
    .appName("AverageFareAgeByPort")
    .getOrCreate()
)

# Register the Titanic data under the table name the query refers to.
df = spark.read.parquet("/content/titanic.parquet")
df.createOrReplaceTempView("passengers")

# Rows with a NULL Fare or Age are filtered out before averaging.
# Embarked itself is not filtered, so a NULL port forms its own group.
avg_fare_age_sql = """
    SELECT
        Embarked,
        ROUND(AVG(Fare), 2) AS AvgFare,
        ROUND(AVG(Age), 2) AS AvgAge
    FROM passengers
    WHERE Fare IS NOT NULL AND Age IS NOT NULL
    GROUP BY Embarked
    ORDER BY AvgFare DESC
"""
result_df = spark.sql(avg_fare_age_sql)

result_df.show()


+--------+-------+------+
|Embarked|AvgFare|AvgAge|
+--------+-------+------+
|    NULL|   80.0|  50.0|
|       C|   68.3| 30.81|
|       S|  27.48| 29.45|
|       Q|  18.27| 28.09|
+--------+-------+------+



In [16]:
# Problem 3: Top 5 Paying Passengers Who Survived
# Task:
# Find the top 5 passengers (by Fare) who survived. Display their Name, Pclass, Sex, Fare, and Cabin.

# Expected Output Columns:

# Name
# Pclass
# Sex
# Fare
# Cabin

# SQL:

# SELECT
#     Name, Pclass, Sex, Fare, Cabin
# FROM passengers
# WHERE Survived = 1
# ORDER BY Fare DESC
# LIMIT 5

from pyspark.sql import SparkSession

# getOrCreate() returns the active session if there is one, otherwise it
# starts a new session with the app name configured on the builder.
spark = SparkSession.builder.appName("Top5PayingSurvivors").getOrCreate()

# Register the Titanic data under the table name the query refers to.
df = spark.read.parquet("/content/titanic.parquet")
df.createOrReplaceTempView("passengers")

# Several survivors share the same Fare (three rows at 512.3292), and
# ORDER BY Fare DESC alone leaves the order of tied rows unspecified, so the
# rows kept by LIMIT 5 and their order could differ between runs.
# PassengerId is added as a tie-breaker to make the result deterministic;
# the selected output columns are unchanged.
result_df = spark.sql("""
    SELECT
        Name, Pclass, Sex, Fare, Cabin
    FROM passengers
    WHERE Survived = 1
    ORDER BY Fare DESC, PassengerId ASC
    LIMIT 5
""")

# truncate=False prints full column values instead of shortening long names.
result_df.show(truncate=False)

+----------------------------------+------+------+--------+-----------+
|Name                              |Pclass|Sex   |Fare    |Cabin      |
+----------------------------------+------+------+--------+-----------+
|Ward, Miss. Anna                  |1     |female|512.3292|NULL       |
|Cardeza, Mr. Thomas Drake Martinez|1     |male  |512.3292|B51 B53 B55|
|Lesurer, Mr. Gustave J            |1     |male  |512.3292|B101       |
|Fortune, Miss. Mabel Helen        |1     |female|263.0   |C23 C25 C27|
|Fortune, Miss. Alice Elizabeth    |1     |female|263.0   |C23 C25 C27|
+----------------------------------+------+------+--------+-----------+

