In [None]:
# 1. Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# 2. Download Spark 3.5.0 with Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# 3. Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [None]:
# 4. Install findspark
!pip install -q findspark
import findspark
findspark.init()

In [2]:
# 5. Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("RDD Example").getOrCreate()

In [None]:
spark

In [None]:
#create a dataframe
df = spark.read.csv("/content/sample_data/sample_census_data.csv",header=True,inferSchema=True)

In [None]:
#convert dataframe to rdd
census_rdd=df.rdd

In [None]:
#show the rdd's contents using collect()
census_rdd.collect()

[Row(State='California', Population=39538223, Median_Age=36.5, Households=13000000, Average_Income=78000, Year=2020),
 Row(State='Texas', Population=29145505, Median_Age=34.8, Households=9800000, Average_Income=67000, Year=2020),
 Row(State='Florida', Population=21538187, Median_Age=42.0, Households=8000000, Average_Income=61000, Year=2020),
 Row(State='New York', Population=20201249, Median_Age=39.0, Households=7500000, Average_Income=75000, Year=2020),
 Row(State='Illinois', Population=12812508, Median_Age=38.5, Households=4900000, Average_Income=68000, Year=2020)]

DataFrames   vs RDDs
| Feature | RDD | DataFrame |
|----------|-----|------------|
| **Abstraction Level** | Low-level; requires manual operations | High-level; easy to use with SQL-like syntax |
| **Optimization** | No automatic optimization | Optimized by Catalyst engine |
| **Schema Information** | Does not store schema | Has schema (structured data) |
| **Use Case** | Complex, custom transformations | Structured data analysis and SQL queries |


#SPARK SQL
Module in Apache Spark for processing structured and semi-structured data using SQL Syntax.

**Creating Temporary Tables**-Register data frame as temporary view


In [None]:
# Sample df
data = [("Alice","HR",30),("Bob","IT",40),("Cathy","HR",27)]
columns=["Name","Department","Age"]
df1=spark.createDataFrame(data,schema=columns)

# Register df1 as a temporary view
df1.createOrReplaceTempView("People")

In [None]:
#query using SQL
result = spark.sql("SELECT Name,Age FROM People WHERE Age>30")
result.show()

+----+---+
|Name|Age|
+----+---+
| Bob| 40|
+----+---+



**READING FILES as temporary views**

In [10]:
df2=spark.read.csv("/content/sample_data/employee_table.csv",header=True,inferSchema=True)

In [11]:
df2.show()

+-------+----------+---+------+
|   name|department|age|salary|
+-------+----------+---+------+
|  Alice|        HR| 28| 48000|
|    Bob|   Finance| 34| 56000|
|Charlie|        IT| 30| 72000|
|  Diana| Marketing| 27| 50000|
|  Ethan|        IT| 40| 85000|
|  Fiona|        HR| 29| 47000|
+-------+----------+---+------+



In [None]:
#register to a temporary view
df2.createOrReplaceTempView("employees")

In [None]:
#Combining SQL and Dataframe Operation
query_result=spark.sql("SELECT name,salary FROM employees WHERE salary >50000")

In [None]:
#Dataframe transformation
high_earners=query_result.withColumn("bonus",query_result.salary * 0.1)

In [None]:
high_earners.show()

+-------+------+------+
|   name|salary| bonus|
+-------+------+------+
|    Bob| 56000|5600.0|
|Charlie| 72000|7200.0|
|  Ethan| 85000|8500.0|
+-------+------+------+



#Pyspark SQL Aggregations

In [None]:
spark.sql(""" SELECT department,sum(salary) AS Total_Salary,Avg(salary) AS Average FROM employees Group BY department """).show()

+----------+------------+-------+
|department|Total_Salary|Average|
+----------+------------+-------+
|        HR|       95000|47500.0|
|   Finance|       56000|56000.0|
| Marketing|       50000|50000.0|
|        IT|      157000|78500.0|
+----------+------------+-------+



*Filtered views*

In [None]:
#filter salaries over 50000
filtered_df2=df2.filter(df2.salary>50000)

#register filtered dataframe as view
filtered_df2.createOrReplaceTempView("filtered_employees")

In [None]:
#aggregate using SQL
spark.sql(""" SELECT department,count(*) AS Employee_Count FROM filtered_employees Group By department """).show()

+----------+--------------+
|department|Employee_Count|
+----------+--------------+
|   Finance|             1|
|        IT|             2|
+----------+--------------+



#Type Casting

In [3]:
data1 = [("HR","3000"),("IT","4000"),("Finance","3500")]
columns = ["Department", "Salary"]
df3= spark.createDataFrame(data1, schema=columns)

In [4]:
df3.show()

+----------+------+
|Department|Salary|
+----------+------+
|        HR|  3000|
|        IT|  4000|
|   Finance|  3500|
+----------+------+



In [None]:
#Convert Salary column to integer
df3 = df3.withColumn("Salary", df3["Salary"].cast("int"))

In [None]:
#perform aggregation
df3.groupBy("Department").sum("Salary").show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|        HR|       3000|
|   Finance|       3500|
|        IT|       4000|
+----------+-----------+



**RDDs for aggregations**

In [None]:
rdd=df3.rdd.map(lambda row:(row["Department"],row["Salary"]))

In [None]:
rdd_agg=rdd.reduceByKey(lambda x,y:x+y)

In [None]:
print(rdd_agg.collect())

[('HR', 3000), ('IT', 4000), ('Finance', 3500)]


**Execution Plans**

In [None]:
df3.filter(df3.Salary>3500).select("Department").explain()

== Physical Plan ==
*(1) Project [Department#119]
+- *(1) Filter (isnotnull(Salary#120) AND (cast(Salary#120 as int) > 3500))
   +- *(1) Scan ExistingRDD[Department#119,Salary#120]




**Caching and Persisting DataFrames**


*   Caching: Stores data in memory for faster access of smaller datasets.
*   Persisting:Stores data in different storage level for large datasets.



In [5]:
df3.cache()

DataFrame[Department: string, Salary: string]

In [6]:
df3.filter(df3["Salary"]>3000).show()

+----------+------+
|Department|Salary|
+----------+------+
|        IT|  4000|
|   Finance|  3500|
+----------+------+



In [7]:
df3.groupBy(["Department"]).count().show()

+----------+-----+
|Department|count|
+----------+-----+
|        HR|    1|
|   Finance|    1|
|        IT|    1|
+----------+-----+



**Persisting the DataFrame**

In [8]:
from pyspark import StorageLevel

In [12]:
#persist a dataframe to storage level
df2.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[name: string, department: string, age: int, salary: int]

In [13]:
#perform transformations
result1= df2.groupBy("department").agg({"Salary":"sum"})

In [14]:
result1.show()

+----------+-----------+
|department|sum(Salary)|
+----------+-----------+
|        HR|      95000|
|   Finance|      56000|
| Marketing|      50000|
|        IT|     157000|
+----------+-----------+



In [15]:
#unpersist after use
df2.unpersist()

DataFrame[name: string, department: string, age: int, salary: int]

**Optimizing PySpark**


*   Small subsections: eg-pick tools like map() over groupBy().
*   Broadcast Joins: It uses all compute even on smaller datasets, also avoid shuffles.


*   Avoid repeated actions: Costs time and compute if its same data.i.e Avoid using count(),show()



