In [3]:
!pip install pyspark



In [4]:
from pyspark.sql import SparkSession

In [5]:
# Obtain the SparkSession that every later cell uses through the name `spark`.
# getOrCreate() reuses an already-running session in this kernel if there is one.
spark = (
    SparkSession.builder
    .appName("PySpark_Intro")
    .getOrCreate()
)

# Confirmation message only: reaching this line means getOrCreate() returned.
print("Spark Session Created Successfully!")


Spark Session Created Successfully!


In [6]:
# Column names for the toy people table, in the same order as the tuple fields.
columns = ["Name", "Age", "City"]

# Toy rows as (Name, Age, City) tuples — small enough to check results by eye.
data = [
    ("Alice", 25, "New York"),
    ("Bob", 30, "Los Angeles"),
    ("Charlie", 22, "Chicago"),
    ("David", 28, "Houston"),
]

In [7]:
# Turn the Python rows into a Spark DataFrame; the second positional argument is
# the schema, here just the list of column names (types are inferred from values).
df = spark.createDataFrame(data, columns)

# Print the rows as a text table.
df.show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 30|Los Angeles|
|Charlie| 22|    Chicago|
|  David| 28|    Houston|
+-------+---+-----------+



✅ **Key Points to Understand:**
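*   A PySpark DataFrame is similar to a pandas DataFrame, but it is distributed across a cluster and evaluated lazily, which makes it suitable for big data.
*   `.show()` prints the first rows (20 by default) as a text table. It is similar in spirit to pandas' `DataFrame.head()`, except that it prints the rows instead of returning a DataFrame.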




In [8]:
# Project two columns; select() also accepts the column names as a single list.
df.select(["Name", "City"]).show()

+-------+-----------+
|   Name|       City|
+-------+-----------+
|  Alice|   New York|
|    Bob|Los Angeles|
|Charlie|    Chicago|
|  David|    Houston|
+-------+-----------+



In [9]:
# Pandas-style column indexing also works on a Spark DataFrame, but it returns a
# new DataFrame rather than data: the cell output below is only its schema repr.
# Call .show() on the result to actually see the rows.
df[['Name','City']]

DataFrame[Name: string, City: string]

In [10]:
# Column-expression predicate: keep rows whose Age is strictly greater than 25.
age_over_25 = df.Age > 25
df.filter(age_over_25).show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|  Bob| 30|Los Angeles|
|David| 28|    Houston|
+-----+---+-----------+



In [11]:
# Same filter as the Column version, written as a SQL expression string instead.
sql_condition = 'Age > 25'
df.filter(sql_condition).show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|  Bob| 30|Los Angeles|
|David| 28|    Houston|
+-----+---+-----------+



In [12]:
from pyspark.sql.functions import col

# Add a new column holding each person's age three years from now (Age + 3).
# withColumn returns a new DataFrame, which is re-bound to `df`.
df = df.withColumn("Age_after_3_years", col("Age") + 3)
df.show()

+-------+---+-----------+-----------------+
|   Name|Age|       City|Age_after_3_years|
+-------+---+-----------+-----------------+
|  Alice| 25|   New York|               28|
|    Bob| 30|Los Angeles|               33|
|Charlie| 22|    Chicago|               25|
|  David| 28|    Houston|               31|
+-------+---+-----------+-----------------+



In [13]:
# Rename City -> Location; all other columns are carried over unchanged.
df = df.withColumnRenamed(existing="City", new="Location")
df.show()

+-------+---+-----------+-----------------+
|   Name|Age|   Location|Age_after_3_years|
+-------+---+-----------+-----------------+
|  Alice| 25|   New York|               28|
|    Bob| 30|Los Angeles|               33|
|Charlie| 22|    Chicago|               25|
|  David| 28|    Houston|               31|
+-------+---+-----------+-----------------+



In [14]:
# Remove the helper column added earlier; drop() returns a new DataFrame,
# which is re-bound to `df`.
df = df.drop("Age_after_3_years")
df.show()

+-------+---+-----------+
|   Name|Age|   Location|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 30|Los Angeles|
|Charlie| 22|    Chicago|
|  David| 28|    Houston|
+-------+---+-----------+



In [15]:
# where() is an alias of filter(); the SQL-string predicate keeps only Age > 28.
df.where("Age>28").show()

+----+---+-----------+
|Name|Age|   Location|
+----+---+-----------+
| Bob| 30|Los Angeles|
+----+---+-----------+



In [16]:
# Filter first, then project: the names of everyone older than 25.
(
    df.filter(df.Age > 25)
      .select("Name")
      .show()
)

+-----+
| Name|
+-----+
|  Bob|
|David|
+-----+



In [17]:
# Column reference form 1: the column name as a plain string.
df.select("Location").show()

+-----------+
|   Location|
+-----------+
|   New York|
|Los Angeles|
|    Chicago|
|    Houston|
+-----------+



In [18]:
# Column reference form 2: attribute access on the DataFrame (only works when the
# column name is a valid Python identifier).
df.select(df.Location).show()

+-----------+
|   Location|
+-----------+
|   New York|
|Los Angeles|
|    Chicago|
|    Houston|
+-----------+



In [19]:
# Column reference form 3: col() from pyspark.sql.functions, which builds a Column
# from its name without needing a handle to the DataFrame.
df.select(col("Location")).show()

+-----------+
|   Location|
+-----------+
|   New York|
|Los Angeles|
|    Chicago|
|    Houston|
+-----------+



In [20]:
# Eight equivalent ways to keep only the Chicago rows: filter() and its alias
# where(), each combined with a Column (attribute, col(), or item access) or a
# SQL expression string. Every one prints the same single-row table.
chicago_queries = [
    df.filter(df.Location == "Chicago"),
    df.filter(col("Location") == "Chicago"),
    df.filter(df["Location"] == "Chicago"),
    df.filter("Location == 'Chicago'"),
    df.where("Location == 'Chicago'"),
    df.where(col("Location") == "Chicago"),
    df.where(df["Location"] == "Chicago"),
    df.where(df.Location == "Chicago"),
]
for query in chicago_queries:
    query.show()

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+

+-------+---+--------+
|   Name|Age|Location|
+-------+---+--------+
|Charlie| 22| Chicago|
+-------+---+--------+



In [21]:
# count() is an action: it runs a Spark job and returns the number of rows as an int.
# (Writing `df.count` without parentheses only displays the bound method.)
df.count()

In [22]:
# NOTE(review): importing min, max and sum from pyspark.sql.functions shadows the
# Python builtins of the same name for the rest of the notebook. Later cells rely
# on the Spark versions (e.g. min("Age")), so the names are kept as-is; prefer
# `from pyspark.sql import functions as F` in new code.
from pyspark.sql.functions import (
    count, avg, min, max, sum,
    stddev, variance, skewness, kurtosis,
    corr, covar_pop, covar_samp,
    collect_list, collect_set, array_contains,
)

# The same aggregate Column works both as a projection (select) and in agg().
name_count = count("Name")
df.select(name_count).show()

df.agg(name_count).show()

+-----------+
|count(Name)|
+-----------+
|          4|
+-----------+

+-----------+
|count(Name)|
+-----------+
|          4|
+-----------+



In [23]:
# Youngest and oldest Age in one pass; min/max here are the Spark aggregate
# functions imported above, not the Python builtins.
df.agg(min("Age"), max("Age")).show()

+--------+--------+
|min(Age)|max(Age)|
+--------+--------+
|      22|      30|
+--------+--------+



In [24]:
# Mean Age over the whole DataFrame (a global aggregation, no grouping).
df.agg(avg("Age")).show()

+--------+
|avg(Age)|
+--------+
|   26.25|
+--------+



In [25]:
# Mean Age per Location. The {column: aggregate-name} dict form is equivalent to
# GroupedData.avg("Age"); groupBy and groupby are the same method.
df.groupBy("Location").agg({"Age": "avg"}).show()

+-----------+--------+
|   Location|avg(Age)|
+-----------+--------+
|Los Angeles|    30.0|
|   New York|    25.0|
|    Chicago|    22.0|
|    Houston|    28.0|
+-----------+--------+



In [26]:
# Number of rows per Location (in this toy data each city appears exactly once).
by_location = df.groupBy("Location")
by_location.count().show()

+-----------+-----+
|   Location|count|
+-----------+-----+
|Los Angeles|    1|
|   New York|    1|
|    Chicago|    1|
|    Houston|    1|
+-----------+-----+



In [27]:
# sort() is an alias of orderBy(); the default order is ascending.
df.sort("Age").show()

+-------+---+-----------+
|   Name|Age|   Location|
+-------+---+-----------+
|Charlie| 22|    Chicago|
|  Alice| 25|   New York|
|  David| 28|    Houston|
|    Bob| 30|Los Angeles|
+-------+---+-----------+



In [28]:
# Register df as a temporary view named "people" so the following cells can query
# it with spark.sql(); an existing view with the same name is replaced.
df.createOrReplaceTempView("people")

In [29]:
# spark.sql() returns an ordinary DataFrame, so .show() works as with the DSL.
all_people = spark.sql("Select * from people")
all_people.show()

+-------+---+-----------+
|   Name|Age|   Location|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 30|Los Angeles|
|Charlie| 22|    Chicago|
|  David| 28|    Houston|
+-------+---+-----------+



In [30]:
# The SQL WHERE clause plays the role of df.filter(...) in the DataFrame API.
people_over_25 = spark.sql("Select * from people where Age > 25")
people_over_25.show()

+-----+---+-----------+
| Name|Age|   Location|
+-----+---+-----------+
|  Bob| 30|Los Angeles|
|David| 28|    Houston|
+-----+---+-----------+



In [31]:
# Global mean Age via SQL; AS gives the result column a readable name.
average_age = spark.sql("Select AVG(Age) AS Average_Age from people")
average_age.show()

+-----------+
|Average_Age|
+-----------+
|      26.25|
+-----------+



In [32]:
# SQL counterpart of df.groupBy("Location").count().
people_per_location = spark.sql("Select Location, Count(*) AS Count from people GROUP BY Location")
people_per_location.show()

+-----------+-----+
|   Location|Count|
+-----------+-----+
|Los Angeles|    1|
|   New York|    1|
|    Chicago|    1|
|    Houston|    1|
+-----------+-----+



In [33]:
from pyspark.sql import Row

In [34]:
# Employee rows as (Emp_ID, Name, Dept_ID). Charlie's department 103 has no match
# in the department table, and David has no department at all (None -> NULL).
emp_data = [
    Row(Emp_ID=emp_id, Name=name, Dept_ID=dept_id)
    for emp_id, name, dept_id in [
        (1, "Alice", 101),
        (2, "Bob", 102),
        (3, "Charlie", 103),
        (4, "David", None),
    ]
]
emp_df = spark.createDataFrame(emp_data)

# Department rows as (Dept_ID, Dept_Name); Finance (104) has no employees.
dept_data = [
    Row(Dept_ID=dept_id, Dept_Name=dept_name)
    for dept_id, dept_name in [(101, "HR"), (102, "IT"), (104, "Finance")]
]
dept_df = spark.createDataFrame(dept_data)

# Show both DataFrames, each preceded by its title.
for title, frame in [("Employee DataFrame:", emp_df), ("\nDepartment DataFrame:", dept_df)]:
    print(title)
    frame.show()

Employee DataFrame:
+------+-------+-------+
|Emp_ID|   Name|Dept_ID|
+------+-------+-------+
|     1|  Alice|    101|
|     2|    Bob|    102|
|     3|Charlie|    103|
|     4|  David|   NULL|
+------+-------+-------+


Department DataFrame:
+-------+---------+
|Dept_ID|Dept_Name|
+-------+---------+
|    101|       HR|
|    102|       IT|
|    104|  Finance|
+-------+---------+



In [35]:
# Inner join: only employees whose Dept_ID matches a department. Joining on a
# Column expression keeps both Dept_ID columns in the result.
inner_join = emp_df.join(dept_df, on=emp_df.Dept_ID == dept_df.Dept_ID, how="inner")
inner_join.show()

+------+-----+-------+-------+---------+
|Emp_ID| Name|Dept_ID|Dept_ID|Dept_Name|
+------+-----+-------+-------+---------+
|     1|Alice|    101|    101|       HR|
|     2|  Bob|    102|    102|       IT|
+------+-----+-------+-------+---------+



In [36]:
# Left join: every employee is kept; department columns are NULL when no match.
left_join = emp_df.join(dept_df, on=emp_df.Dept_ID == dept_df.Dept_ID, how="left")
left_join.show()

+------+-------+-------+-------+---------+
|Emp_ID|   Name|Dept_ID|Dept_ID|Dept_Name|
+------+-------+-------+-------+---------+
|     1|  Alice|    101|    101|       HR|
|     2|    Bob|    102|    102|       IT|
|     4|  David|   NULL|   NULL|     NULL|
|     3|Charlie|    103|   NULL|     NULL|
+------+-------+-------+-------+---------+



In [37]:
# Right join: every department is kept; employee columns are NULL when no match.
right_join = emp_df.join(dept_df, on=emp_df.Dept_ID == dept_df.Dept_ID, how="right")
right_join.show()

+------+-----+-------+-------+---------+
|Emp_ID| Name|Dept_ID|Dept_ID|Dept_Name|
+------+-----+-------+-------+---------+
|     1|Alice|    101|    101|       HR|
|  NULL| NULL|   NULL|    104|  Finance|
|     2|  Bob|    102|    102|       IT|
+------+-----+-------+-------+---------+



In [38]:
# Full outer join: rows from both sides are kept, with NULLs where either side
# has no match. Uses the documented lowercase join-type name, like the other joins.
full_outer_join = emp_df.join(dept_df, emp_df.Dept_ID == dept_df.Dept_ID, "full_outer")
full_outer_join.show()

+------+-------+-------+-------+---------+
|Emp_ID|   Name|Dept_ID|Dept_ID|Dept_Name|
+------+-------+-------+-------+---------+
|     4|  David|   NULL|   NULL|     NULL|
|     1|  Alice|    101|    101|       HR|
|     2|    Bob|    102|    102|       IT|
|     3|Charlie|    103|   NULL|     NULL|
|  NULL|   NULL|   NULL|    104|  Finance|
+------+-------+-------+-------+---------+



In [39]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg

# New toy dataset for the window-function examples. Note: this re-binds `data`,
# `columns` and `df`, replacing the people DataFrame used in the cells above.
columns = ["Employee", "Department", "Salary"]
data = [
    ("Alice", "HR", 5000),
    ("Bob", "HR", 4500),
    ("Charlie", "IT", 7000),
    ("David", "IT", 6000),
    ("Emma", "Finance", 5500),
    ("Frank", "Finance", 5200),
    ("Grace", "HR", 4800),
]
df = spark.createDataFrame(data, schema=columns)

df.show()

+--------+----------+------+
|Employee|Department|Salary|
+--------+----------+------+
|   Alice|        HR|  5000|
|     Bob|        HR|  4500|
| Charlie|        IT|  7000|
|   David|        IT|  6000|
|    Emma|   Finance|  5500|
|   Frank|   Finance|  5200|
|   Grace|        HR|  4800|
+--------+----------+------+



In [46]:
# Window definition: one partition per Department, rows ordered by ascending
# Salary inside each partition. Window functions below are evaluated over it.
window_spec = (
    Window
    .partitionBy("Department")
    .orderBy("Salary")
)
window_spec

<pyspark.sql.window.WindowSpec at 0x7eef53e4eb10>

In [47]:
# row_number() numbers rows 1, 2, ... within each partition of window_spec.
# The space in "Row Number" means SQL would need backticks to reference it.
with_row_number = df.withColumn("Row Number", row_number().over(window_spec))
with_row_number.show()

+--------+----------+------+----------+
|Employee|Department|Salary|Row Number|
+--------+----------+------+----------+
|   Frank|   Finance|  5200|         1|
|    Emma|   Finance|  5500|         2|
|     Bob|        HR|  4500|         1|
|   Grace|        HR|  4800|         2|
|   Alice|        HR|  5000|         3|
|   David|        IT|  6000|         1|
| Charlie|        IT|  7000|         2|
+--------+----------+------+----------+

