In [68]:
from pyspark.sql import SparkSession
from sklearn import datasets
import time

#1.2 Initialize a SparkSession
spark = SparkSession.builder \
    .appName("SimModeExample") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext

In [69]:
#Printing Configs
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} = {v}")

spark.driver.extraJavaOptions = -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.app.id = local-1755073533927
spark.executor.id = driver
spark.sql.warehouse.dir = file:/content/spark-ware

In [70]:
sample_text = """Apache Spark is an open-source distributed general-purpose cluster-computing framework.
It provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.
Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries, and streaming."""

with open("sample_text.txt", "w") as f:
    f.write(sample_text)

In [76]:
#1.4 Create RDDs from lists or text files
text_rdd = sc.textFile("sample_text.txt")

In [77]:
#1.5	map(), filter(), flatMap() transformations
print(text_rdd.flatMap(lambda line: line.split(" ")).collect())
print(text_rdd.flatMap(lambda line: line.split(" ")).filter(lambda x: "a" in x).collect())

['Apache', 'Spark', 'is', 'an', 'open-source', 'distributed', 'general-purpose', 'cluster-computing', 'framework.', 'It', 'provides', 'an', 'interface', 'for', 'programming', 'entire', 'clusters', 'with', 'implicit', 'data', 'parallelism', 'and', 'fault-tolerance.', 'Spark', 'is', 'designed', 'to', 'cover', 'a', 'wide', 'range', 'of', 'workloads', 'such', 'as', 'batch', 'applications,', 'iterative', 'algorithms,', 'interactive', 'queries,', 'and', 'streaming.']
['Apache', 'Spark', 'an', 'general-purpose', 'framework.', 'an', 'interface', 'programming', 'data', 'parallelism', 'and', 'fault-tolerance.', 'Spark', 'a', 'range', 'workloads', 'as', 'batch', 'applications,', 'iterative', 'algorithms,', 'interactive', 'and', 'streaming.']


In [78]:
#1.6	collect(), count(), take() actions
print(text_rdd.flatMap(lambda line: line.split(" ")).filter(lambda x: "a" in x).count())
print(text_rdd.flatMap(lambda line: line.split(" ")).filter(lambda x: "a" in x).collect())
print(text_rdd.flatMap(lambda line: line.split(" ")).filter(lambda x: "a" in x).take(3))

24
['Apache', 'Spark', 'an', 'general-purpose', 'framework.', 'an', 'interface', 'programming', 'data', 'parallelism', 'and', 'fault-tolerance.', 'Spark', 'a', 'range', 'workloads', 'as', 'batch', 'applications,', 'iterative', 'algorithms,', 'interactive', 'and', 'streaming.']
['Apache', 'Spark', 'an']


In [79]:
#1.7	Create DataFrame from Python dictionary/list
data_frame_data = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "surname:":"theron","age": 35}
]

df = spark.createDataFrame(data_frame_data)
df.show()

+---+-------+--------+
|age|   name|surname:|
+---+-------+--------+
| 25|  Alice|    NULL|
| 30|    Bob|    NULL|
| 35|Charlie|  theron|
+---+-------+--------+



In [80]:
#1.8	Create DataFrame from CSV/JSON/Parquet
csv_data = """id,name,age,salary
1,Alice,30,100000
2,Bob,,85000
3,Charlie,25,70000
4,David,35,
5,Eve,29,90000
"""

with open("sample_employees.csv", "w") as f:
    f.write(csv_data)

df = spark.read.csv("sample_employees.csv", header=True, inferSchema=True)
df.show()

+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  30|100000|
|  2|    Bob|NULL| 85000|
|  3|Charlie|  25| 70000|
|  4|  David|  35|  NULL|
|  5|    Eve|  29| 90000|
+---+-------+----+------+



In [81]:
#1.9	Show schema and data (printSchema(), show())
print(df.printSchema())

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

None


In [82]:
#1.10	Select specific columns
df.select("name","age").show()

+-------+----+
|   name| age|
+-------+----+
|  Alice|  30|
|    Bob|NULL|
|Charlie|  25|
|  David|  35|
|    Eve|  29|
+-------+----+



In [83]:
#1.11	Filter rows using conditions
test_lambda_func = lambda x: x-2
df.select("name","age").filter(test_lambda_func(df.age)>25).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|David| 35|
|  Eve| 29|
+-----+---+



In [84]:
#1.12	Rename columns
df = df.withColumnRenamed("name", "full_name")
df.printSchema()

new_column_names = ["user_id", "full_name","age", "salary net"]
df_renamed = df.toDF(*new_column_names)
df_renamed.printSchema()

root
 |-- id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary net: integer (nullable = true)



In [85]:
#1.13	Add new columns (withColumn)
df = df.withColumn("age2", df.age+2)
df.printSchema()
df.show()

root
 |-- id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age2: integer (nullable = true)

+---+---------+----+------+----+
| id|full_name| age|salary|age2|
+---+---------+----+------+----+
|  1|    Alice|  30|100000|  32|
|  2|      Bob|NULL| 85000|NULL|
|  3|  Charlie|  25| 70000|  27|
|  4|    David|  35|  NULL|  37|
|  5|      Eve|  29| 90000|  31|
+---+---------+----+------+----+



In [86]:
#1.14	Drop columns
df = df.drop("age2")
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [90]:
#1.15	Register DataFrame as a SQL temporary view
#1.16	Run simple SELECT queries with spark.sql()

df.createOrReplaceTempView("people")
result = spark.sql("SELECT full_name, age FROM people WHERE age > 26")
result.show()

# createOrReplaceTempView: Visible only within the current Spark session
# createGlobalTempView   : Visible across multiple Spark sessions and accessed with the prefix global_temp.

+---------+---+
|full_name|age|
+---------+---+
|    Alice| 30|
|    David| 35|
|      Eve| 29|
+---------+---+



In [123]:
from pyspark.sql.functions import concat, upper, lower, trim, col, lit
#New Sample DF

data = [(" Alice ", "Smith","01-01-1996"), (" Bob ", "Brown","01-01-1996"), (" Cathy ", "Johnson","01-01-1996")]
columns = ["first_name", "last_name", "dob"]

df = spark.createDataFrame(data, columns)
df.show()

#2.1	String functions (concat, upper, lower, trim)
df = df.withColumn("first_name", trim(col("first_name")))
df = df.withColumn("last_name", trim(col("last_name")))
df = df.withColumn("Fullname", concat(col("first_name"),lit(" "),"last_name"))
df.show()

+----------+---------+----------+
|first_name|last_name|       dob|
+----------+---------+----------+
|    Alice |    Smith|01-01-1996|
|      Bob |    Brown|01-01-1996|
|    Cathy |  Johnson|01-01-1996|
+----------+---------+----------+

+----------+---------+----------+-------------+
|first_name|last_name|       dob|     Fullname|
+----------+---------+----------+-------------+
|     Alice|    Smith|01-01-1996|  Alice Smith|
|       Bob|    Brown|01-01-1996|    Bob Brown|
|     Cathy|  Johnson|01-01-1996|Cathy Johnson|
+----------+---------+----------+-------------+



In [131]:
from pyspark.sql.functions import to_date, col, datediff, current_date, floor
#2.2	Date/time functions (current_date, datediff, date_format)
df = df.withColumn("dob", to_date(col("dob"), format= "dd-mm-yyyy"))
df = df.withColumn("age", floor(datediff(current_date(), col("dob"))/365))
df.show()

+----------+---------+----------+-------------+---+
|first_name|last_name|       dob|     Fullname|age|
+----------+---------+----------+-------------+---+
|     Alice|    Smith|1996-01-01|  Alice Smith| 29|
|       Bob|    Brown|1996-01-01|    Bob Brown| 29|
|     Cathy|  Johnson|1996-01-01|Cathy Johnson| 29|
+----------+---------+----------+-------------+---+



In [136]:
from pyspark.sql.functions import to_date, col, when
#2.3	Conditional logic with when() and otherwise()
df.withColumn("age2",when(col("age") > 25, "y").otherwise("n")).show()

+----------+---------+----------+-------------+---+----+
|first_name|last_name|       dob|     Fullname|age|age2|
+----------+---------+----------+-------------+---+----+
|     Alice|    Smith|1996-01-01|  Alice Smith| 29|   y|
|       Bob|    Brown|1996-01-01|    Bob Brown| 29|   y|
|     Cathy|  Johnson|1996-01-01|Cathy Johnson| 29|   y|
+----------+---------+----------+-------------+---+----+



In [137]:
#2.4	groupBy() with aggregation functions (count, avg, sum, max, min)
df.groupBy("age").agg({"age":"count"}).show()

+---+----------+
|age|count(age)|
+---+----------+
| 29|         3|
+---+----------+



In [139]:
#2.5	Multiple aggregations in one statement
df.groupBy("age").agg({"age":"count","age":"avg"}).show()

+---+--------+
|age|avg(age)|
+---+--------+
| 29|    29.0|
+---+--------+



In [141]:
#2.6	orderBy() ascending/descending
df.orderBy(col("first_name").desc()).show()

+----------+---------+----------+-------------+---+
|first_name|last_name|       dob|     Fullname|age|
+----------+---------+----------+-------------+---+
|     Cathy|  Johnson|1996-01-01|Cathy Johnson| 29|
|       Bob|    Brown|1996-01-01|    Bob Brown| 29|
|     Alice|    Smith|1996-01-01|  Alice Smith| 29|
+----------+---------+----------+-------------+---+



In [143]:
#2.7	Multi-column ordering
df.orderBy([col("last_name").desc(),col("first_name").desc()]).show()

+----------+---------+----------+-------------+---+
|first_name|last_name|       dob|     Fullname|age|
+----------+---------+----------+-------------+---+
|     Alice|    Smith|1996-01-01|  Alice Smith| 29|
|     Cathy|  Johnson|1996-01-01|Cathy Johnson| 29|
|       Bob|    Brown|1996-01-01|    Bob Brown| 29|
+----------+---------+----------+-------------+---+



In [151]:
#2.8	Inner, left, right, full joins
data1 = [
    (1, "Alice", "HR"),
    (2, "Bob", "IT"),
    (3, "Cathy", "Finance"),
    (4, "David", "IT")
]
columns1 = ["emp_id", "name", "dept"]

df1 = spark.createDataFrame(data1, columns1)
df1.show()

data2 = [
    (1, 5000),
    (2, 6000),
    (4, 7000),
    (5, 8000)
]
columns2 = ["emp_id", "salary"]

df2 = spark.createDataFrame(data2, columns2)
df2.show()

print("INNER JOIN\n")
inner_join_df = df1.join(df2, on = "emp_id", how = "inner")
inner_join_df.show()

print("LEFT JOIN\n")
df1.join(df2, on = "emp_id", how = "left").show()

print("RIGHT JOIN\n")
df1.join(df2, on = "emp_id", how = "right").show()

+------+-----+-------+
|emp_id| name|   dept|
+------+-----+-------+
|     1|Alice|     HR|
|     2|  Bob|     IT|
|     3|Cathy|Finance|
|     4|David|     IT|
+------+-----+-------+

+------+------+
|emp_id|salary|
+------+------+
|     1|  5000|
|     2|  6000|
|     4|  7000|
|     5|  8000|
+------+------+

INNER JOIN

+------+-----+----+------+
|emp_id| name|dept|salary|
+------+-----+----+------+
|     1|Alice|  HR|  5000|
|     2|  Bob|  IT|  6000|
|     4|David|  IT|  7000|
+------+-----+----+------+

LEFT JOIN

+------+-----+-------+------+
|emp_id| name|   dept|salary|
+------+-----+-------+------+
|     1|Alice|     HR|  5000|
|     2|  Bob|     IT|  6000|
|     3|Cathy|Finance|  NULL|
|     4|David|     IT|  7000|
+------+-----+-------+------+

RIGHT JOIN

+------+-----+----+------+
|emp_id| name|dept|salary|
+------+-----+----+------+
|     1|Alice|  HR|  5000|
|     2|  Bob|  IT|  6000|
|     5| NULL|NULL|  8000|
|     4|David|  IT|  7000|
+------+-----+----+------+



In [None]:
# 2.9	Semi and anti joins


In [None]:


# Load text file into RDD
text_rdd = sc.textFile("sample_text.txt")

def top_words_no_cache():
    word_counts = (
        text_rdd
        .flatMap(lambda line: line.split())
        .map(lambda w: (w.lower().strip(".,!?"), 1))
        .reduceByKey(lambda a, b: a + b)
    )
    top_10 = word_counts.takeOrdered(10, key=lambda x: -x[1])
    return top_10

def top_words_with_cache():
    cached_rdd = (
        text_rdd
        .flatMap(lambda line: line.split())
        .map(lambda w: (w.lower().strip(".,!?"), 1))
        .cache()  # Cache the intermediate RDD
    )
    word_counts = cached_rdd.reduceByKey(lambda a, b: a + b)
    top_10 = word_counts.takeOrdered(10, key=lambda x: -x[1])
    return top_10

# Measure time without caching
start = time.time()
result_no_cache = top_words_no_cache()
time_no_cache = time.time() - start

# Measure time with caching
start = time.time()
result_with_cache = top_words_with_cache()
time_with_cache = time.time() - start

# Print results
print("\nTop 10 words without caching:")
for word, count in result_no_cache:
    print(f"{word}: {count}")

print(f"\nTime taken without caching: {time_no_cache:.4f} seconds")

print("\nTop 10 words with caching:")
for word, count in result_with_cache:
    print(f"{word}: {count}")

print(f"\nTime taken with caching: {time_with_cache:.4f} seconds")



Top 10 words without caching:
an: 2
and: 2
spark: 2
is: 2
apache: 1
open-source: 1
distributed: 1
cluster-computing: 1
framework: 1
it: 1

Time taken without caching: 1.1633 seconds

Top 10 words with caching:
an: 2
and: 2
spark: 2
is: 2
apache: 1
open-source: 1
distributed: 1
cluster-computing: 1
framework: 1
it: 1

Time taken with caching: 1.3649 seconds


In [None]:
#Map Reduce Word Frequency
text_rdd = spark.sparkContext.parallelize([
    "PySpark is great",
    "PySpark runs locally",
    "Word count is a classic example"
])

# Word count
word_counts = (
    text_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

print("\n=== Word Count ===")
for word, count in word_counts.collect():
    print(f"{word}: {count}")


=== Word Count ===
PySpark: 2
runs: 1
Word: 1
is: 2
great: 1
locally: 1
count: 1
a: 1
classic: 1
example: 1


In [None]:
text_rdd = sc.textFile("sample_text.txt")
text_rdd.map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b).collect()

[('It provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.',
  1),
 ('Apache Spark is an open-source distributed general-purpose cluster-computing framework.',
  1),
 ('Spark is designed to cover a wide range of workloads such as batch applications, iterative algorithms, interactive queries, and streaming.',
  1)]

In [None]:
text_rdd.flatMap(lambda sent: sent.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b:a+b).collect()

In [None]:
text_rdd.flatMap(lambda sent: sent.split(" ")).filter(lambda x: "a" in x).collect()

['Apache',
 'Spark',
 'an',
 'general-purpose',
 'framework.',
 'an',
 'interface',
 'programming',
 'data',
 'parallelism',
 'and',
 'fault-tolerance.',
 'Spark',
 'a',
 'range',
 'workloads',
 'as',
 'batch',
 'applications,',
 'iterative',
 'algorithms,',
 'interactive',
 'and',
 'streaming.']

In [None]:
df = spark.read.csv("sample_employees.csv", header=True, inferSchema=True)

print("Original Data:")
df.show()

Original Data:
+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  30|100000|
|  2|    Bob|NULL| 85000|
|  3|Charlie|  25| 70000|
|  4|  David|  35|  NULL|
|  5|    Eve|  29| 90000|
+---+-------+----+------+



In [None]:
dataset_iris = datasets.load_iris()
spdf = spark.createDataFrame(dataset_iris.data, schema = dataset_iris.feature_names)

In [None]:
spdf.head(5)

[Row(sepal length (cm)=5.1, sepal width (cm)=3.5, petal length (cm)=1.4, petal width (cm)=0.2),
 Row(sepal length (cm)=4.9, sepal width (cm)=3.0, petal length (cm)=1.4, petal width (cm)=0.2),
 Row(sepal length (cm)=4.7, sepal width (cm)=3.2, petal length (cm)=1.3, petal width (cm)=0.2),
 Row(sepal length (cm)=4.6, sepal width (cm)=3.1, petal length (cm)=1.5, petal width (cm)=0.2),
 Row(sepal length (cm)=5.0, sepal width (cm)=3.6, petal length (cm)=1.4, petal width (cm)=0.2)]