### PySpark

PySpark is the Python API for Apache Spark, a distributed engine for processing very large datasets (terabytes to petabytes) across a cluster. For data that no longer fits on a single machine it is typically much faster than Pandas, it supports SQL, machine learning, and streaming workloads, and it scales horizontally by adding nodes. We use it when data outgrows single-machine tools.

### Bonus Keywords:
- RDD (Resilient Distributed Dataset): Spark’s low-level core data structure, on which DataFrames are built.
- DataFrame API: SQL-like operations (similar to Pandas).
- Spark SQL: Run SQL queries directly on big data.

### Rule of Thumb:
- Use Pandas for small data (<1GB, quick analysis).
- Use PySpark for big data (or when you need scaling).

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()
print("Spark session created:", spark.version)


### Understand What a DataFrame Is

In [0]:

# Create a Spark DataFrame

data = [('Gourav', 50000), ('Priya', 67000), ('Ravi', 45000), ('SDC', 90000), ('Ankur', 40000)]

column = ['Name', 'Salary']

spark_df = spark.createDataFrame(data, column)
display(spark_df)

In [0]:
# Pandas DataFrame
import pandas as pd
data=[('Gourav',50000),('Priya',67000),('Ravi',45000),('SDC',90000),('Ankur',40000)]

column=['Name','Salary']

pd_df=pd.DataFrame(data, columns=column)
display(pd_df)

In [0]:
spark_df.printSchema()

In [0]:
print("Row count of the DataFrame:", spark_df.count())

### Real-Life Use Case
These DataFrames are used for:
- Ingesting data from files
- Transforming at scale
- Writing to Delta Tables or Parquet files

In [0]:

# Add explicit data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Salary", IntegerType(), True)
])

df_typed = spark.createDataFrame(data, schema=schema)
df_typed.show()
df_typed.printSchema()

**Q: What's the difference between SparkSession and SparkContext?**

✅ Answer:

- SparkSession is the unified entry point from Spark 2.0 onwards.
- It wraps a SparkContext and replaces the older SQLContext and HiveContext.

**SparkContext is the original entry point for RDD operations, while SparkSession (introduced in Spark 2.0) unifies access to DataFrames, SQL, and streaming. Today we mostly use SparkSession, which creates and manages a SparkContext internally (available as `spark.sparkContext`).**

In [0]:
json_data = [
    {"Name": "David", "Salary": 70000},
    {"Name": "Eva", "Salary": 65000}
]

df_json=spark.createDataFrame(json_data)
display(df_json)

In [0]:
from pyspark.sql import SparkSession
# getOrCreate() returns the already-running session if one exists
spark = SparkSession.builder.appName("DataX_Bootcamp").getOrCreate()



In [0]:

# Create a realistic employee DataFrame

# Use a list (not a set) so the row order is deterministic
data = [
    ("Alice", "IT", 50000),
    ("Gourav", "Data Engineer", 100000),
    ("Bob", "HR", 40000),
    ("SDC", "Recruiter", 150000),
    ("Ayushi", "HR", 30000),
    ("Ankur", "IT", 510000),
    ("Shruti", "Finance", 650000),
    ("Shalini", "Finance", 850000),
]
column = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, column)
display(df)

In [0]:
# Select a single column from the DataFrame
display(df.select("Name"))

In [0]:
display(df.select("Name", "Salary"))


In [0]:
# Filter rows with a salary above 50,000
display(df.filter(df.Salary>50000))

In [0]:
# Filter rows with a salary above 100,000
display(df.filter(df.Salary > 100000))

In [0]:
#Apply multiple filters
display(df.filter((df["Department"] == "Finance") & (df["Salary"] > 60000)))
     

In [0]:
#Create a Derived Column

from pyspark.sql.functions import col

#Add 10% Bonus to everyone 
df=df.withColumn("Bonus",col("Salary")*0.1)
display(df)
     

In [0]:
# Group by and aggregation
# Average salary by department (shortcut method, no import needed)
df.groupBy("Department").avg("Salary").show()

In [0]:
# Group by and aggregation
# Average, minimum, and maximum salary by department
# Import the module as F so min and max do not shadow Python's built-ins
from pyspark.sql import functions as F
display(df.groupBy("Department").agg(
    F.avg("Salary").alias("avg_salary"),
    F.min("Salary").alias("min_salary"),
    F.max("Salary").alias("max_salary")
))

In [0]:
# Join: combine two DataFrames
# Create a department-code mapping
dept_data = [("IT", 101), ("HR", 202), ("Finance", 303)]
dep_columns = ["Department", "Dept_Code"]
dept_df = spark.createDataFrame(data=dept_data, schema=dep_columns)

# Inner join: keeps only departments present in both DataFrames,
# so the "Data Engineer" and "Recruiter" rows are dropped
joined_df = df.join(dept_df, on="Department", how="inner")
display(joined_df)

In [0]:
# Transformations vs. Actions
# Transformations
# Transformations are lazy: they are not executed immediately,
# only when an action is called.
# Each transformation returns a new DataFrame derived from an existing one.
filtered = df.filter(df.Salary > 50000)

# Action (triggers execution of the filter)
display(filtered)

## Student Practice Assignment

📂 Build a DataFrame with the following schema: Name, Department, Salary, Location

In [0]:
# Sample data
data = [
    ("Gourav", "IT", 70000, "Bangalore"),
    ("John", "HR", 55000, "Mumbai"),
    ("Ravi", "Finance", 65000, "Delhi"),
    ("Sneha", "IT", 60000, "Hyderabad")
]
columns = ["Name", "Department", "Salary", "Location"]
df_task = spark.createDataFrame(data, columns)

# Task 1: Filter IT employees with salary > 60K
df_task.filter((col("Department") == "IT") & (col("Salary") > 60000)).show()

# Task 2: Add column "Hike_Amount" as 15% of salary
df_task = df_task.withColumn("Hike_Amount", col("Salary") * 0.15)
df_task.show()

# Task 3: Group by Department and show average salary
df_task.groupBy("Department").avg("Salary").show()




In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DeltaVersioningDemo").getOrCreate()

In [0]:
data = [("Gourav", "IT", 50000), ("SDC", "HR", 45000), ("Vaishnav", "IT", 60000)]
cols = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data,cols)
display(df)

In [0]:
df.write.format("delta").mode("overwrite").saveAsTable("employee_delta")

In [0]:
%sql
update employee_delta set Salary = 10000000 where Name = 'SDC'


In [0]:
%sql
describe history employee_delta

In [0]:
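# MERGE INTO: upsert (update matching rows, insert the rest)
from pyspark.sql import Row
# "Alice" is not in employee_delta yet, so this MERGE inserts her row;
# use an existing name such as "SDC" to see the UPDATE branch fire
update_data = [Row(Name="Alice", Department="IT", Salary=60000)]
df_update = spark.createDataFrame(update_data)
# A session-scoped temp view can be referenced by its bare name;
# a global temp view would need the global_temp. prefix
df_update.createOrReplaceTempView("updates_view")

spark.sql("""
MERGE INTO employee_delta AS target
USING updates_view AS source
ON target.Name = source.Name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")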


In [0]:
%sql
describe history employee_delta

In [0]:
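# Schema evolution (add a new column)
# Note: this cell only builds the DataFrame; the table schema changes only once
# it is appended to employee_delta with the mergeSchema write option enabled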
new_data = [("David", "Finance", 70000, 5)] # 5 yrs experience
cols_new = ["Name", "Department", "Salary", "Experience"]
df_new = spark.createDataFrame(new_data,cols_new)
display(df_new)


In [0]:
%sql
describe extended employee_delta
