# CODING ASSESSMENT - Spark and PySpark

## Transformations and Actions

In [1]:
# Start Spark session
import shutil

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, row_number
from pyspark.sql.window import Window

# getOrCreate() returns the already-running session if there is one, so
# re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("PySparkTransformations")
    .getOrCreate()
)

In [2]:
# Sample DataFrames

# Employees: one (id, name, salary, dept) tuple per person.
columns1 = ["id", "name", "salary", "dept"]
data1 = [
    (1, "Alice", 2000, "HR"),
    (2, "Bob", 3000, "Finance"),
    (3, "Cathy", 2500, "HR"),
    (4, "David", 4000, "IT"),
    (5, "Eva", 2800, "Finance"),
]
df1 = spark.createDataFrame(data1, schema=columns1)

# Countries keyed by id. Note that id 5 (present in df1) has no country and
# id 6 has no employee, so an inner join on "id" drops both.
columns2 = ["id", "country"]
data2 = [
    (1, "India"),
    (2, "USA"),
    (3, "UK"),
    (4, "Germany"),
    (6, "Canada"),
]
df2 = spark.createDataFrame(data2, schema=columns2)

In [3]:
# Transformation - Filter
# Keep employees whose salary is strictly greater than 2500. The filter is
# lazy; nothing is computed until show() is called below.
filtered_df = df1.where(col("salary") > 2500)
print("Filtered rows where salary > 2500")
filtered_df.show()

Filtered rows where salary > 2500
+---+-----+------+-------+
| id| name|salary|   dept|
+---+-----+------+-------+
|  2|  Bob|  3000|Finance|
|  4|David|  4000|     IT|
|  5|  Eva|  2800|Finance|
+---+-----+------+-------+



In [4]:
# Transformation - Join
# Inner join on the shared "id" column: only ids present in both df1 and df2
# survive, and the result carries a single "id" column.
joined_df = df1.join(other=df2, on=["id"], how="inner")
print("Joined DataFrame on 'id'")
joined_df.show()


Joined DataFrame on 'id'
+---+-----+------+-------+-------+
| id| name|salary|   dept|country|
+---+-----+------+-------+-------+
|  1|Alice|  2000|     HR|  India|
|  2|  Bob|  3000|Finance|    USA|
|  3|Cathy|  2500|     HR|     UK|
|  4|David|  4000|     IT|Germany|
+---+-----+------+-------+-------+



In [7]:
# Transformation - GroupBy and Aggregation
# One output row per dept with the mean and the total of salary.
# `avg` and `sum` are the pyspark.sql.functions versions imported at the top
# of the notebook (that import shadows Python's built-in sum).
grouped_df = (
    df1
    .groupBy("dept")
    .agg(
        avg(col("salary")).alias("avg_salary"),
        sum(col("salary")).alias("total_salary"),
    )
)
print("GroupBy dept and aggregate")
grouped_df.show()


GroupBy dept and aggregate
+-------+----------+------------+
|   dept|avg_salary|total_salary|
+-------+----------+------------+
|     HR|    2250.0|        4500|
|Finance|    2900.0|        5800|
|     IT|    4000.0|        4000|
+-------+----------+------------+



In [8]:
# Transformation - Window Function
# Number the employees inside each dept from highest to lowest salary.
# row_number() hands out 1, 2, 3, ... per partition, so two equal salaries
# would still receive different numbers (the sample data has no ties).
window_spec = (
    Window
    .partitionBy("dept")
    .orderBy(col("salary").desc())
)
ranked_df = df1.select("*", row_number().over(window_spec).alias("rank"))
print("Window function: Rank employees within each dept by salary")
ranked_df.show()

Window function: Rank employees within each dept by salary
+---+-----+------+-------+----+
| id| name|salary|   dept|rank|
+---+-----+------+-------+----+
|  2|  Bob|  3000|Finance|   1|
|  5|  Eva|  2800|Finance|   2|
|  3|Cathy|  2500|     HR|   1|
|  1|Alice|  2000|     HR|   2|
|  4|David|  4000|     IT|   1|
+---+-----+------+-------+----+



In [9]:
# Action - Collect
# collect() runs the job and returns every row of df1 to the driver as a
# Python list of Row objects. That is fine for this five-row sample, but the
# whole result must fit in driver memory.
print("Collecting all data as list of Rows")
collected = df1.collect()
# Each element is a Row; printing it shows the field names and values.
for row in collected:
    print(row)


Collecting all data as list of Rows
Row(id=1, name='Alice', salary=2000, dept='HR')
Row(id=2, name='Bob', salary=3000, dept='Finance')
Row(id=3, name='Cathy', salary=2500, dept='HR')
Row(id=4, name='David', salary=4000, dept='IT')
Row(id=5, name='Eva', salary=2800, dept='Finance')


In [10]:
# Action - Count
# count() triggers a job and returns the number of rows as a Python int.
record_count = df1.count()
print("Total number of records in df1:", record_count)

Total number of records in df1: 5


In [11]:
# Action - Show
# show() prints the DataFrame as a text table. The arguments below are its
# defaults written out: at most 20 rows, long cell values truncated.
print("Showing original df1")
df1.show(n=20, truncate=True)

Showing original df1
+---+-----+------+-------+
| id| name|salary|   dept|
+---+-----+------+-------+
|  1|Alice|  2000|     HR|
|  2|  Bob|  3000|Finance|
|  3|Cathy|  2500|     HR|
|  4|David|  4000|     IT|
|  5|  Eva|  2800|Finance|
+---+-----+------+-------+



In [12]:
# Action - first
# first() returns the first row of the DataFrame as a single Row object.
print("First row of DataFrame:")
first_row = df1.first()
print(first_row)

First row of DataFrame:
Row(id=1, name='Alice', salary=2000, dept='HR')


In [13]:
# Action - Take(n)
# take(3) returns at most three rows to the driver as a list of Row objects,
# without collecting the rest of the DataFrame.
print("First 3 rows using take():")
first_three = df1.take(3)
for row in first_three:
    print(row)


First 3 rows using take():
Row(id=1, name='Alice', salary=2000, dept='HR')
Row(id=2, name='Bob', salary=3000, dept='Finance')
Row(id=3, name='Cathy', salary=2500, dept='HR')


In [15]:
# ACTION - Reduce
# Drop down to the RDD API: turn each single-column Row into its plain salary
# value, then fold the values pairwise with reduce() to get the grand total.
salary_rdd = df1.select("salary").rdd.map(lambda record: record["salary"])
total_salary = salary_rdd.reduce(lambda running, nxt: running + nxt)
print("Total salary:", total_salary)

Total salary: 14300


In [16]:
# Action saveAsTextFile
# Convert df1 to an RDD of Row objects and write it out as a directory of
# text part-files (one line per Row, using the Row's string representation).
rdd_to_save = df1.rdd
output_path = "/content/df1_saved_output"
# saveAsTextFile refuses to write into a directory that already exists, so
# without this cleanup the cell fails the second time it is run (or on
# Restart & Run All). Remove any output left by a previous run first.
# NOTE(review): assumes output_path is on the driver's local filesystem, as
# in Colab with the default file:// scheme - confirm before pointing this at
# HDFS or object storage, where shutil cannot see the path.
shutil.rmtree(output_path, ignore_errors=True)
rdd_to_save.saveAsTextFile(output_path)
print(f"DataFrame RDD saved as text file in: {output_path}")


DataFrame RDD saved as text file in: /content/df1_saved_output
