Pyspark Basics

Working with DataFrames


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, udf, avg, count, max, broadcast

spark = SparkSession.builder.appName("Assignment").getOrCreate()

In [3]:
# 1. Create a PySpark DataFrame manually using a list of tuples and a defined schema
data = [
    (1, "Aliya", 23),
    (2, "Binda", 29),
    (3, "Rita", 26),
    (4, "David", 32),
    (5, "Sita", 34)
]

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

df = spark.createDataFrame(data, schema)

In [4]:
# 2. Display the schema and the first 5 rows of the DataFrame. Use .printSchema() and .show() to inspect the structure and data visually
df.printSchema() 
df.show(5)

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)

+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  1|Aliya| 23|
|  2|Binda| 29|
|  3| Rita| 26|
|  4|David| 32|
|  5| Sita| 34|
+---+-----+---+



Transformations and Actions


In [5]:
# 1. Perform transformations such as .select(), .filter(), .withColumn(), and .drop() on a sample DataFrame.
df.select("Name", "Age").show()
df.filter(col("Age") > 25).show()
df.withColumn("Age_plus_5", col("Age") + 5).show()
df.drop("Age").show()

+-----+---+
| Name|Age|
+-----+---+
|Aliya| 23|
|Binda| 29|
| Rita| 26|
|David| 32|
| Sita| 34|
+-----+---+

+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  2|Binda| 29|
|  3| Rita| 26|
|  4|David| 32|
|  5| Sita| 34|
+---+-----+---+

+---+-----+---+----------+
| ID| Name|Age|Age_plus_5|
+---+-----+---+----------+
|  1|Aliya| 23|        28|
|  2|Binda| 29|        34|
|  3| Rita| 26|        31|
|  4|David| 32|        37|
|  5| Sita| 34|        39|
+---+-----+---+----------+

+---+-----+
| ID| Name|
+---+-----+
|  1|Aliya|
|  2|Binda|
|  3| Rita|
|  4|David|
|  5| Sita|
+---+-----+



In [6]:
# 2. Use .collect(), .count(), .first() and .take(n) as actions.
print("Total rows:", df.count())
print("First row:", df.first())
print("Take 3 rows:", df.take(3))

Total rows: 5
First row: Row(ID=1, Name='Aliya', Age=23)
Take 3 rows: [Row(ID=1, Name='Aliya', Age=23), Row(ID=2, Name='Binda', Age=29), Row(ID=3, Name='Rita', Age=26)]


Schema Inference and Manual Schema

In [7]:
# 1. Load a CSV file with schema inference enabled. Print the inferred schema.
csv_path = "E:\Desktop\GRITFEAT\pandas\student.csv" 

# With inference
df_csv_infer = spark.read.csv(csv_path, header=True, inferSchema=True)
df_csv_infer.printSchema()

root
 |-- student_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- major: string (nullable = true)
 |-- GPA: double (nullable = true)
 |-- course_load: integer (nullable = true)
 |-- avg_course_grade: double (nullable = true)
 |-- attendance_rate: double (nullable = true)
 |-- enrollment_status: string (nullable = true)
 |-- lms_logins_past_month: integer (nullable = true)
 |-- avg_session_duration_minutes: integer (nullable = true)
 |-- assignment_submission_rate: double (nullable = true)
 |-- forum_participation_count: integer (nullable = true)
 |-- video_completion_rate: double (nullable = true)
 |-- risk_level: string (nullable = true)



In [8]:
# 2. Load the same CSV with a manually defined schema. Compare both results
manual_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df_csv_manual = spark.read.csv(csv_path, header=True, schema=manual_schema)
df_csv_manual.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



Reading/Writing Data (CSV, JSON, Parquet)

In [9]:
# 1. Read a CSV file into a DataFrame using spark.read.csv(). Include header and inferSchema options.

df_csv = spark.read.csv(csv_path, header=True, inferSchema=True)

In [10]:
# 2. Read a JSON file using spark.read.json().
df_json = spark.read.json("C:\\sample1.json") 

Filtering, Joins, Aggregations


In [12]:
# 1. Filter rows where a column value is greater than a given threshold.
df.filter(col("Age") > 26).show()

+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  2|Binda| 29|
|  4|David| 32|
|  5| Sita| 34|
+---+-----+---+



In [13]:
# 2. Perform an inner join between two DataFrames on a common column
df2 = spark.createDataFrame([
    (1, "HR"),
    (2, "IT"),
    (3, "Finance"),
    (4, "IT"),
    (5, "HR")
], ["ID", "Dept"])

df.join(df2, "ID", "inner").show()

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+



In [14]:
# 3. Perform left outer, right outer, and full outer joins and observe differences.
df.join(df2, "ID", "left").show()
df.join(df2, "ID", "right").show()
df.join(df2, "ID", "outer").show()

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+



In [15]:
# 4. Group data by a category column and compute average, count, and max of numeric columns.
df.groupBy("Age").agg(
    avg("Age").alias("avg_age"),
    count("ID").alias("count_id"),
    max("Age").alias("max_age")
).show()

+---+-------+--------+-------+
|Age|avg_age|count_id|max_age|
+---+-------+--------+-------+
| 23|   23.0|       1|     23|
| 29|   29.0|       1|     29|
| 26|   26.0|       1|     26|
| 32|   32.0|       1|     32|
| 34|   34.0|       1|     34|
+---+-------+--------+-------+



In [16]:
# 5. Chain transformations like .filter() → .groupBy() → .agg() in a single statement.
df.filter(col("Age") > 24).groupBy("Age").agg(count("ID")).show()

+---+---------+
|Age|count(ID)|
+---+---------+
| 29|        1|
| 26|        1|
| 32|        1|
| 34|        1|
+---+---------+



User-Defined Functions (UDFs)


In [17]:
# 1. Create a simple Python function to categorize numerical columns into groups and convert it into a PySpark UDF.
def age_group(age):
    if age < 25:
        return "Young"
    elif age < 30:
        return "Mid"
    else:
        return "Senior"

In [18]:
# 2. Use your UDF in a .withColumn() transformation to apply it on a DataFrame column.
age_group_udf = udf(age_group, StringType())
df.withColumn("AgeGroup", age_group_udf(col("Age"))).show()

+---+-----+---+--------+
| ID| Name|Age|AgeGroup|
+---+-----+---+--------+
|  1|Aliya| 23|   Young|
|  2|Binda| 29|     Mid|
|  3| Rita| 26|     Mid|
|  4|David| 32|  Senior|
|  5| Sita| 34|  Senior|
+---+-----+---+--------+



Spark SQL (Querying DataFrames)

In [19]:
# 1. Register a DataFrame as a temporary view using .createOrReplaceTempView().
df.createOrReplaceTempView("people")

In [20]:
# 2. Use spark.sql() to run SQL queries on the registered view
spark.sql("SELECT COUNT(*) FROM people").show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [21]:
# 3. Write an SQL query to count rows, group by category, and sort the results.
spark.sql("SELECT Age, COUNT(*) as cnt FROM people GROUP BY Age ORDER BY cnt DESC").show()


+---+---+
|Age|cnt|
+---+---+
| 23|  1|
| 29|  1|
| 26|  1|
| 32|  1|
| 34|  1|
+---+---+



In [None]:
# 4. What are the advantages of using Spark SQL over native DataFrame operations?

# - Easier for people with SQL background
# - Integrates with BI tools
# - Can optimize queries internally (Catalyst optimizer)

Optimization and Performance Management

Partitioning and Bucketing


In [22]:
# 1. Use .repartition() and .coalesce() to change the number of partitions in a DataFrame. Observe the effect.
df_repart = df.repartition(4)
df_coalesce = df_repart.coalesce(2)

Caching and Performance Tuning

In [24]:
# 1. Cache a DataFrame using .cache().
df.cache()
df.show()

+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  1|Aliya| 23|
|  2|Binda| 29|
|  3| Rita| 26|
|  4|David| 32|
|  5| Sita| 34|
+---+-----+---+



In [25]:
# 2. Use .persist().
df.persist()
df.show()


+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  1|Aliya| 23|
|  2|Binda| 29|
|  3| Rita| 26|
|  4|David| 32|
|  5| Sita| 34|
+---+-----+---+



In [26]:
# 3. Perform a broadcast join and observe performance improvements when joining a large and small DataFrame.
from pyspark.sql.functions import broadcast
df_large = df.join(df2, "ID")
df_broadcast = df.join(broadcast(df2), "ID")
df_large.show()
df_broadcast.show()

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+

+---+-----+---+-------+
| ID| Name|Age|   Dept|
+---+-----+---+-------+
|  1|Aliya| 23|     HR|
|  2|Binda| 29|     IT|
|  3| Rita| 26|Finance|
|  4|David| 32|     IT|
|  5| Sita| 34|     HR|
+---+-----+---+-------+

