__RDD__

__Create an RDD from the given dataset and display its first 5 elements.__

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("assignment").getOrCreate()

rdd_data = [
"Alice 50000 HR",
"Bob 60000 IT",
"Charlie 55000 Finance",
"David 45000 HR",
"Eve 70000 IT"
]
rdd = spark.sparkContext.parallelize(rdd_data)
print(rdd.collect())
print(rdd.take(5))

25/03/10 11:02:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


['Alice 50000 HR', 'Bob 60000 IT', 'Charlie 55000 Finance', 'David 45000 HR', 'Eve 70000 IT']
['Alice 50000 HR', 'Bob 60000 IT', 'Charlie 55000 Finance', 'David 45000 HR', 'Eve 70000 IT']


__Split each line in an RDD into words using flatMap().__

In [2]:
words_rdd = rdd.flatMap(lambda line: line.split(" "))
print(words_rdd.collect())

['Alice', '50000', 'HR', 'Bob', '60000', 'IT', 'Charlie', '55000', 'Finance', 'David', '45000', 'HR', 'Eve', '70000', 'IT']


__Count the occurrences of each word in an RDD.__

In [3]:
num_rdd_data = [3, 6, 2, 9, 5]
rdd1 = spark.sparkContext.parallelize(num_rdd_data)
print(rdd1.collect())

[3, 6, 2, 9, 5]


In [4]:
num_count = rdd1.map(lambda no: (no, 1))
result = num_count.reduceByKey(lambda a,b: a+b)
print(result.collect())

[(9, 1), (5, 1), (6, 1), (2, 1), (3, 1)]


__Find the maximum value in an RDD of numbers.__

In [5]:
max_value = rdd1.reduce(lambda a,b:a if a>b else b)
print(max_value)

9


__Sort an RDD in descending order.__

In [6]:
sort_rdd = rdd1.sortBy(lambda x:x, ascending=False)
print(sort_rdd.collect())

[9, 6, 5, 3, 2]


__Dataframe__

In [31]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import upper

In [20]:
spark = SparkSession.builder.appName("PySparkAssignment").getOrCreate()

data = [
(1, "Alice", "HR", 50000.0, "2018-07-10"),
(2, "Bob", "IT", None, "2017-08-15"),
(3, "Charlie", "Finance", 55000.0, "2019-06-25"),
(4, "David", "HR", None, "2020-09-12"),
(5, "Eve", "IT", 70000.0, None),
(6, None, "Sales", 48000.0, "2019-03-22"),
(7, "Grace", None, 52000.0, "2021-01-10")
]

schema = StructType([
StructField("ID", IntegerType(), True),
StructField("Name", StringType(), True),
StructField("Department", StringType(), True),
StructField("Salary", DoubleType(), True),
StructField("Hire_Date", StringType(), True)
])

df = spark.createDataFrame(data, schema)
df.show()

+---+-------+----------+-------+----------+
| ID|   Name|Department| Salary| Hire_Date|
+---+-------+----------+-------+----------+
|  1|  Alice|        HR|50000.0|2018-07-10|
|  2|    Bob|        IT|   NULL|2017-08-15|
|  3|Charlie|   Finance|55000.0|2019-06-25|
|  4|  David|        HR|   NULL|2020-09-12|
|  5|    Eve|        IT|70000.0|      NULL|
|  6|   NULL|     Sales|48000.0|2019-03-22|
|  7|  Grace|      NULL|52000.0|2021-01-10|
+---+-------+----------+-------+----------+



__Find the distinct values of salary column in a DataFrame.__

In [21]:
df.select("Salary").distinct().show()

+-------+
| Salary|
+-------+
|50000.0|
|55000.0|
|   NULL|
|70000.0|
|48000.0|
|52000.0|
+-------+



__Replace null values in a DataFrame column salary with a default value 40000__

In [25]:
changed_df = df.fillna({"Salary":40000.0})
changed_df.show()

+---+-------+----------+-------+----------+
| ID|   Name|Department| Salary| Hire_Date|
+---+-------+----------+-------+----------+
|  1|  Alice|        HR|50000.0|2018-07-10|
|  2|    Bob|        IT|40000.0|2017-08-15|
|  3|Charlie|   Finance|55000.0|2019-06-25|
|  4|  David|        HR|40000.0|2020-09-12|
|  5|    Eve|        IT|70000.0|      NULL|
|  6|   NULL|     Sales|48000.0|2019-03-22|
|  7|  Grace|      NULL|52000.0|2021-01-10|
+---+-------+----------+-------+----------+



__Drop rows with missing values in a DataFrame.__

In [41]:
new_df = changed_df.dropna()
new_df.show()

+---+-------+----------+-------+----------+
| ID|   Name|Department| Salary| Hire_Date|
+---+-------+----------+-------+----------+
|  1|  Alice|        HR|50000.0|2018-07-10|
|  2|    Bob|        IT|40000.0|2017-08-15|
|  3|Charlie|   Finance|55000.0|2019-06-25|
|  4|  David|        HR|40000.0|2020-09-12|
+---+-------+----------+-------+----------+



__Convert a DataFrame column Name to uppercase.__

In [42]:
upper_df = new_df.withColumn("Name", upper(new_df["Name"]))
upper_df.show()

+---+-------+----------+-------+----------+
| ID|   Name|Department| Salary| Hire_Date|
+---+-------+----------+-------+----------+
|  1|  ALICE|        HR|50000.0|2018-07-10|
|  2|    BOB|        IT|40000.0|2017-08-15|
|  3|CHARLIE|   Finance|55000.0|2019-06-25|
|  4|  DAVID|        HR|40000.0|2020-09-12|
+---+-------+----------+-------+----------+



__find the average salary per department__

In [34]:
from pyspark.sql.functions import avg

In [43]:
new_df.groupBy("Department").agg(avg("Salary").alias("Avg Salary")).show()

+----------+----------+
|Department|Avg Salary|
+----------+----------+
|        HR|   45000.0|
|   Finance|   55000.0|
|        IT|   40000.0|
+----------+----------+



__Extract the year from a date column in a DataFrame.__

In [36]:
from pyspark.sql.functions import year

In [44]:
convert_df = new_df.withColumn("Hire_Date", new_df["Hire_Date"].cast("date"))
year_df = convert_df.withColumn("Hire_Date", year(convert_df["Hire_Date"]))
year_df.show()

+---+-------+----------+-------+---------+
| ID|   Name|Department| Salary|Hire_Date|
+---+-------+----------+-------+---------+
|  1|  Alice|        HR|50000.0|     2018|
|  2|    Bob|        IT|40000.0|     2017|
|  3|Charlie|   Finance|55000.0|     2019|
|  4|  David|        HR|40000.0|     2020|
+---+-------+----------+-------+---------+



__Sort a DataFrame by multiple columns — sort the DataFrame in ascending order by the department column and in descending order by the salary column__

In [45]:
from pyspark.sql.functions import col

In [46]:
new_df.orderBy(col("Department"),col("Salary").desc()).show()

+---+-------+----------+-------+----------+
| ID|   Name|Department| Salary| Hire_Date|
+---+-------+----------+-------+----------+
|  3|Charlie|   Finance|55000.0|2019-06-25|
|  1|  Alice|        HR|50000.0|2018-07-10|
|  4|  David|        HR|40000.0|2020-09-12|
|  2|    Bob|        IT|40000.0|2017-08-15|
+---+-------+----------+-------+----------+



__Extract the first 3 characters from a Name column and display__

In [55]:
from pyspark.sql.functions import substr

In [58]:
new_df.withColumn("Name_3char", new_df["Name"].substr(1, 3)).show()

+---+-------+----------+-------+----------+----------+
| ID|   Name|Department| Salary| Hire_Date|Name_3char|
+---+-------+----------+-------+----------+----------+
|  1|  Alice|        HR|50000.0|2018-07-10|       Ali|
|  2|    Bob|        IT|40000.0|2017-08-15|       Bob|
|  3|Charlie|   Finance|55000.0|2019-06-25|       Cha|
|  4|  David|        HR|40000.0|2020-09-12|       Dav|
+---+-------+----------+-------+----------+----------+



__Check if Name column contains a substring ‘lic’__

In [59]:
new_df.filter(col("Name").rlike("(?i)lic")).show()

+---+-----+----------+-------+----------+
| ID| Name|Department| Salary| Hire_Date|
+---+-----+----------+-------+----------+
|  1|Alice|        HR|50000.0|2018-07-10|
+---+-----+----------+-------+----------+



__Get the row count of a DataFrame.__

In [60]:
row_count = new_df.count()
print("Number of rows:", row_count)

Number of rows: 4


__Write this DataFrame to a CSV file.__

In [61]:
new_df.write.csv("assignment_output", header=True, mode="overwrite")

__Apply a User-Defined Function (UDF) to categorize employees based on their salary. Introduce a new column, Salary_Category, where:
If the salary is less than 50,000, it should be categorized as "Low".
If the salary is greater than 50,000 but less than 100,000, it should be categorized as "Medium".
If the salary is greater than 100,000, it should be categorized as "High".I
f the salary is None, it should return "Unknown".
How would you implement this logic in PySpark using a UDF?__

In [62]:
from pyspark.sql.functions import udf

In [63]:
def categorize_salary(salary):
    if salary < 50000:
        return "Low"
    elif salary>=50000 and salary<100000:
        return "Medium"
    elif salary>=100000:
        return "High"
    elif salary == None:
        return "Unknown"

salary_udf = udf(categorize_salary, StringType())

salary_df = new_df.withColumn("Salary_Category", salary_udf(new_df["Salary"]))
salary_df.show()

+---+-------+----------+-------+----------+---------------+
| ID|   Name|Department| Salary| Hire_Date|Salary_Category|
+---+-------+----------+-------+----------+---------------+
|  1|  Alice|        HR|50000.0|2018-07-10|         Medium|
|  2|    Bob|        IT|40000.0|2017-08-15|            Low|
|  3|Charlie|   Finance|55000.0|2019-06-25|         Medium|
|  4|  David|        HR|40000.0|2020-09-12|            Low|
+---+-------+----------+-------+----------+---------------+



__Convert all columns to string Datatype__

In [64]:
new_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: double (nullable = false)
 |-- Hire_Date: string (nullable = true)



In [65]:
schema_change_df = new_df.select([col(column).cast(StringType()).alias(column) for column in df.columns])
schema_change_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: string (nullable = false)
 |-- Hire_Date: string (nullable = true)



__Convert all column names to lowercase.__

In [66]:
new_df.select([col(column).alias(column.lower()) for column in df.columns]).show()

+---+-------+----------+-------+----------+
| id|   name|department| salary| hire_date|
+---+-------+----------+-------+----------+
|  1|  Alice|        HR|50000.0|2018-07-10|
|  2|    Bob|        IT|40000.0|2017-08-15|
|  3|Charlie|   Finance|55000.0|2019-06-25|
|  4|  David|        HR|40000.0|2020-09-12|
+---+-------+----------+-------+----------+



__Find duplicate rows and display__

In [69]:
data = [
(1, "Alice", 50000),
(2, "Bob", 60000),
(3, "Charlie", 55000),
(4, "David", 45000),
(5, "Eve", 70000)
]
columns = ["ID", "Name", "Salary"]
df2 = spark.createDataFrame(data, columns)
df2.show()

+---+-------+------+
| ID|   Name|Salary|
+---+-------+------+
|  1|  Alice| 50000|
|  2|    Bob| 60000|
|  3|Charlie| 55000|
|  4|  David| 45000|
|  5|    Eve| 70000|
+---+-------+------+



In [70]:
df2.groupBy(df2.columns).count().filter("count > 1").show()

+---+----+------+-----+
| ID|Name|Salary|count|
+---+----+------+-----+
+---+----+------+-----+



__How to filter valid emails from a list?__

In [71]:
data = [
    (1, "alice@example.com"),
    (2, "bob@company"),
    (3, "charlie@gmail..com"),
    (4, "david@domain.org"),
    (5, "invalid-email.com")
]

df3 = spark.createDataFrame(data, ["ID", "Email"])
df3.show()

+---+------------------+
| ID|             Email|
+---+------------------+
|  1| alice@example.com|
|  2|       bob@company|
|  3|charlie@gmail..com|
|  4|  david@domain.org|
|  5| invalid-email.com|
+---+------------------+



In [80]:
email_regex = r'^[a-zA-Z0-9_.±]+@[a-zA-Z0-9-]+\.[a-zA-Z]{2,}$'

valid_emails_df = df3.filter(col("Email").rlike(email_regex))

valid_emails_df.show()

+---+-----------------+
| ID|            Email|
+---+-----------------+
|  1|alice@example.com|
|  4| david@domain.org|
+---+-----------------+



__How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’__

In [81]:
data = [
    (1, "Alice"), (2, "Bob"), (3, "Alice"), (4, "Charlie"), 
    (5, "Bob"), (6, "Alice"), (7, "David"), (8, "Charlie"), 
    (9, "Eve"), (10, "Bob"), (11, "Charlie"), (12, "Eve")
]

columns = ["ID", "Name"]
df4 = spark.createDataFrame(data, columns)
df4.show()

+---+-------+
| ID|   Name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|  Alice|
|  4|Charlie|
|  5|    Bob|
|  6|  Alice|
|  7|  David|
|  8|Charlie|
|  9|    Eve|
| 10|    Bob|
| 11|Charlie|
| 12|    Eve|
+---+-------+



In [83]:
from pyspark.sql.functions import col, count, when

In [84]:
name_counts = df4.groupBy("Name").agg(count("*").alias("count"))
top_2_names = name_counts.orderBy(col("count").desc()).limit(2)
top_2_names_list = [row["Name"] for row in top_2_names.collect()] 
print(top_2_names_list)

['Bob', 'Alice']


In [86]:
df_transformed = df4.withColumn(
    "Updated_Name",
    when(col("Name").isin(top_2_names_list), col("Name")).otherwise("Other")
)
df_transformed.show()

+---+-------+------------+
| ID|   Name|Updated_Name|
+---+-------+------------+
|  1|  Alice|       Alice|
|  2|    Bob|         Bob|
|  3|  Alice|       Alice|
|  4|Charlie|       Other|
|  5|    Bob|         Bob|
|  6|  Alice|       Alice|
|  7|  David|       Other|
|  8|Charlie|       Other|
|  9|    Eve|       Other|
| 10|    Bob|         Bob|
| 11|Charlie|       Other|
| 12|    Eve|       Other|
+---+-------+------------+

