<a href="https://colab.research.google.com/github/bsrikanth24/Best-websites-a-programmer-should-visit/blob/master/Pyspark_Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. **Write a Spark code snippet to calculate the sum of a column in a DataFrame**

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

data = [("Srikanth", "john@example.com", 50000.0),
    ("Ram", "Ram@example.com", 60000.0),
    ("Venkat", "Venkat@example.com", 55000.0)]

schema="Name string,email string,salary double"
df=spark.createDataFrame(data,schema)
display(df)

DataFrame[Name: string, email: string, salary: double]

In [6]:
from pyspark.sql.functions import col,sum
df_final=df.agg(sum(col("salary")).alias("total_salary"))
# df_final=df.agg(sum(col("salary")).alias("total_salary")).first()[0]
# df_final

row_object = df_final.first()
print(row_object)

Row(total_salary=165000.0)


2. **Get all the Marks for individual in list**

In [16]:
# data=[(1,'Srikanth',34),(1,'Modi',40),(1,'Modi',34),(2,'Shah',45),(2,'Shah',50)]
# schema="ID int,Name string,Marks int"
# df=spark.createDataFrame(data,schema)

# from pyspark.sql.functions import collect_list,collect_set,col

# # df_final=df.groupBy(col("ID"),col("Name")).agg(collect_list(col('Marks')))
# df_final = df.groupBy(col("ID"), col("Name")).agg(collect_list(col('Marks')))
# display(df_final)

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, col

data = [(1,'Srikanth',29),(1,'Modi',40),(1,'Modi',34),(2,'Shah',45),(2,'Shah',50)]
schema = "ID int, Name string, Marks int"
df = spark.createDataFrame(data, schema)

df_final = (df.groupBy("ID", "Name")
             .agg(collect_list("Marks").alias("Marks_List")))

df_final.show(truncate=False)

+---+--------+----------+
|ID |Name    |Marks_List|
+---+--------+----------+
|1  |Modi    |[40, 34]  |
|1  |Srikanth|[29]      |
|2  |Shah    |[45, 50]  |
+---+--------+----------+



3.**Identify rows containing non-numeric values in the “Quantity” column, if any**

In [20]:
from pyspark.sql.types import *
schema = StructType([
  StructField("ProductCode", StringType(), True),
  StructField("Quantity", StringType(), True),
  StructField("UnitPrice", StringType(), True),
  StructField("CustomerID", StringType(), True),
])

data = [
  ("Q001", 5, 20.0, "C001"),
  ("Q002", 3, 15.5, "C002"),
  ("Q003", 10, 5.99, "C003"),
  ("Q004", 2, 50.0, "C001"),
  ("Q005", "nan", 12.75, "C002"),
]

df = spark.createDataFrame(data, schema=schema)
df.show()

+-----------+--------+---------+----------+
|ProductCode|Quantity|UnitPrice|CustomerID|
+-----------+--------+---------+----------+
|       Q001|       5|     20.0|      C001|
|       Q002|       3|     15.5|      C002|
|       Q003|      10|     5.99|      C003|
|       Q004|       2|     50.0|      C001|
|       Q005|     nan|    12.75|      C002|
+-----------+--------+---------+----------+



In [21]:
from pyspark.sql.functions import col
df_final=df.filter(col("Quantity").rlike('^[a-zA-Z]*$'))
df_final.show()

+-----------+--------+---------+----------+
|ProductCode|Quantity|UnitPrice|CustomerID|
+-----------+--------+---------+----------+
|       Q005|     nan|    12.75|      C002|
+-----------+--------+---------+----------+



**Write PySpark Code to get the below output**

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode_outer, trim

# Create the input DataFrame
data = [('India', 'Hockey, Cricket'), ('UAE', 'Soccer, Fight'), ('Sam', None)]
schema = "Name string, Sports string"
df = spark.createDataFrame(data, schema)

# df_final = df.filter(col("Hobbies").isNotNull()) \
#               .select("Name", explode(split(col("Hobbies"), ",")).alias("col"))

# df_final.show(truncate=False)

df_final = (
    df.withColumn("Sports", split(col("Sports"), ","))
    .select("Name", explode_outer(col("Sports")).alias("col"))
    .withColumn("col", trim(col("col"))))

df_final.show(truncate=False)

+-----+-------+
|Name |col    |
+-----+-------+
|India|Hockey |
|India|Cricket|
|UAE  |Soccer |
|UAE  |Fight  |
|Sam  |NULL   |
+-----+-------+



**Find the year, start_week_date, end_week_date, week_num**

In [39]:
from pyspark.sql.types import *
data=[(2025,1,'2025-01-01'),
      (2025,1,'2025-01-02'),
      (2025,1,'2025-01-03'),
      (2025,1,'2025-01-04'),
      (2025,1,'2025-01-05'),
      (2025,1,'2025-01-06'),
      (2025,1,'2025-01-07'),
      (2025,2,'2025-01-08'),
      (2025,2,'2025-01-09'),
      (2025,2,'2025-01-10'),
      (2025,2,'2025-01-11'),
      (2025,2,'2025-01-12'),
      (2025,2,'2025-01-13'),
      (2025,2,'2025-01-14')]

schema=StructType([
    StructField('year',IntegerType(),True),
    StructField('week_num',IntegerType(),True),
    StructField('dates',StringType(),True)
    ])

df=spark.createDataFrame(data,schema)
df.show()

+----+--------+----------+
|year|week_num|     dates|
+----+--------+----------+
|2025|       1|2025-01-01|
|2025|       1|2025-01-02|
|2025|       1|2025-01-03|
|2025|       1|2025-01-04|
|2025|       1|2025-01-05|
|2025|       1|2025-01-06|
|2025|       1|2025-01-07|
|2025|       2|2025-01-08|
|2025|       2|2025-01-09|
|2025|       2|2025-01-10|
|2025|       2|2025-01-11|
|2025|       2|2025-01-12|
|2025|       2|2025-01-13|
|2025|       2|2025-01-14|
+----+--------+----------+



In [42]:
from pyspark.sql.functions import min, max

final_df = (
    df.groupBy("year", "week_num")
      .agg(
          min("dates").alias("start_week_date"),
          max("dates").alias("end_week_date")
      )
      .orderBy("year", "week_num")
)
final_df.show()

+----+--------+---------------+-------------+
|year|week_num|start_week_date|end_week_date|
+----+--------+---------------+-------------+
|2025|       1|     2025-01-01|   2025-01-07|
|2025|       2|     2025-01-08|   2025-01-14|
+----+--------+---------------+-------------+



**Find the top most department having highest salary**

In [43]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import *

schema = StructType([
StructField("id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=False),
StructField("department", StringType(), nullable=False),
StructField("salary", DoubleType(), nullable=False)
])
data = [
    Row(1, "John", 30, "Sales", 50000.0),
    Row(2, "Alice", 28, "Marketing", 60000.0),
    Row(3, "Bob", 32, "Finance", 55000.0),
    Row(4, "Sarah", 29, "Sales", 52000.0),
    Row(5, "Mike", 31, "Finance", 58000.0)
]
employeeDF = spark.createDataFrame(data, schema)
display(employeeDF)

DataFrame[id: int, name: string, age: int, department: string, salary: double]

In [48]:
from pyspark.sql.functions import sum, col

# Group by department to get the top department
top_dept_df = (
employeeDF.groupBy("department")
                        .agg(sum("salary").alias("total_salary"))
                        .orderBy(col("total_salary").desc())
                        .limit(1))

top_dept_df.show()

+----------+------------+
|department|total_salary|
+----------+------------+
|   Finance|    113000.0|
+----------+------------+



**Find the origin and the destination flights for each customer**.

In [8]:
from pyspark.sql import Window
from pyspark.sql.functions import col, first, last

# Create DataFrame
flights_data = [
    (1, 'Flight2', 'Los Angeles', 'London'),
    (1, 'Flight1', 'London', 'Vatican'),
    (1, 'Flight3', 'Vatican', 'Dubai'),
    (2, 'Flight1', 'Houston', 'Paris'),
    (2, 'Flight2', 'Paris', 'Hyderabad')
]

schema = "cust_id int, flight_id string, origin string, destination string"
flights_df = spark.createDataFrame(flights_data, schema)

# Show input
flights_df.show()

+-------+---------+-----------+-----------+
|cust_id|flight_id|     origin|destination|
+-------+---------+-----------+-----------+
|      1|  Flight2|Los Angeles|     London|
|      1|  Flight1|     London|    Vatican|
|      1|  Flight3|    Vatican|      Dubai|
|      2|  Flight1|    Houston|      Paris|
|      2|  Flight2|      Paris|  Hyderabad|
+-------+---------+-----------+-----------+



In [10]:
from pyspark.sql import Window
from pyspark.sql.functions import first, last

# Create a window partitioned by cust_id and ordered by flight_id
w = Window.partitionBy("cust_id").orderBy("flight_id").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Add columns for first origin and last destination
result_df = (
    flights_df.withColumn("origin_first", first("origin").over(w))
    .withColumn("destination_last", last("destination").over(w))
    .select("cust_id", "origin_first", "destination_last")
    .dropDuplicates()
)
result_df.show()

+-------+------------+----------------+
|cust_id|origin_first|destination_last|
+-------+------------+----------------+
|      1|      London|           Dubai|
|      2|     Houston|       Hyderabad|
+-------+------------+----------------+



**Write a PySpark program to select every 3rd (nth) row in the dataset**

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import *

schema = StructType([
 StructField("emp_id", IntegerType(), True),
 StructField("name", StringType(), True),
 StructField("salary", IntegerType(), True)
])

data = [
 (1001, "John Doe", 50000),
 (2001, "Jane Smith", 60000),
 (1003, "Michael Johnson", 75000),
 (4000, "Emily Davis", 55000),
 (1005, "Robert Brown", 70000),
 (6000, "Emma Wilson", 80000),
 (1700, "James Taylor", 65000),
 (8000, "Olivia Martinez", 72000),
 (2900, "William Anderson", 68000),
 (3310, "Sophia Garcia", 67000)
]

df = spark.createDataFrame(data, schema)
# df.display()
df.show(truncate=False)

+------+----------------+------+
|emp_id|name            |salary|
+------+----------------+------+
|1001  |John Doe        |50000 |
|2001  |Jane Smith      |60000 |
|1003  |Michael Johnson |75000 |
|4000  |Emily Davis     |55000 |
|1005  |Robert Brown    |70000 |
|6000  |Emma Wilson     |80000 |
|1700  |James Taylor    |65000 |
|8000  |Olivia Martinez |72000 |
|2900  |William Anderson|68000 |
|3310  |Sophia Garcia   |67000 |
+------+----------------+------+



In [14]:
# Define window ordered by emp_id (or any stable column)
window_spec = Window.orderBy("emp_id")

# Add row number and filter every 3rd row
df_with_rn = df.withColumn("row_num", row_number().over(window_spec))
result_df = df_with_rn.filter(col("row_num") % 3 == 0).drop("row_num")

# Show result
result_df.show()

+------+----------------+------+
|emp_id|            name|salary|
+------+----------------+------+
|  1005|    Robert Brown| 70000|
|  2900|William Anderson| 68000|
|  6000|     Emma Wilson| 80000|
+------+----------------+------+



**Handle Data that has Error**

In [18]:
data = [
 ('2025-01-01', '$100.50'),
 ('Jan 15, 2025', '€169.75'),
 ('03/20/25', '£150.20'),
 ('2025-05-13', '¥300.00')
]
columns = ['date', 'amount']

# Create DataFrame
df = spark.createDataFrame(data, columns)

df.show(truncate=False)

from pyspark.sql.functions import col, regexp_replace, udf, coalesce, to_date
from pyspark.sql.types import DoubleType

# Define possible date formats
date_formats = ["yyyy-MM-dd", "MMM dd, yyyy", "MM/dd/yy"]

# Parse date column with multiple formats
df_parsed = df.withColumn(
    "date_parsed",
    coalesce(
        *[to_date(col("date"), fmt) for fmt in date_formats]
    )
)

# Remove currency symbols from amount and convert to float
df_cleaned = df_parsed.withColumn(
    "amount_num",
    regexp_replace(col("amount"), r"[^0-9.]", "").cast(DoubleType())
)

df_cleaned.show()

+------------+-------+
|date        |amount |
+------------+-------+
|2025-01-01  |$100.50|
|Jan 15, 2025|€169.75|
|03/20/25    |£150.20|
|2025-05-13  |¥300.00|
+------------+-------+

+------------+-------+-----------+----------+
|        date| amount|date_parsed|amount_num|
+------------+-------+-----------+----------+
|  2025-01-01|$100.50| 2025-01-01|     100.5|
|Jan 15, 2025|€169.75| 2025-01-15|    169.75|
|    03/20/25|£150.20| 2025-03-20|     150.2|
|  2025-05-13|¥300.00| 2025-05-13|     300.0|
+------------+-------+-----------+----------+



**PIVOT in PySpark**

agg(first(col2)) is necessary because:

pivot() requires an aggregation function to combine values.
first() is a simple aggregation that returns the single value you want when there is exactly one value per group.
If you tried to do .pivot("col1") without an aggregation, PySpark would raise an error because it needs to know how to combine multiple values.

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import first

# Sample data
data = [
    (1, 'A', 10),
    (1, 'B', 20),
    (2, 'A', 30),
    (2, 'B', 40)
]
columns = ['ID', 'col1', 'col2']

df = spark.createDataFrame(data, columns)

# Pivot the DataFrame
pivot_df = df.groupBy("ID").pivot("col1").agg(first("col2"))
pivot_df.show()

+---+---+---+
| ID|  A|  B|
+---+---+---+
|  1| 10| 20|
|  2| 30| 40|
+---+---+---+



**Find the first non null phone number for the customer**

In [22]:
data=[(1, "90909090900", None, None),
 (2, None, "8893029324903", "79767679988"),
 (3, None, None, "9090897576"),
 (4, None, None, None)
]

columns = ["customer_id", "home_phone", "mobile_phone", "work_phone"]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)

+-----------+-----------+-------------+-----------+
|customer_id|home_phone |mobile_phone |work_phone |
+-----------+-----------+-------------+-----------+
|1          |90909090900|NULL         |NULL       |
|2          |NULL       |8893029324903|79767679988|
|3          |NULL       |NULL         |9090897576 |
|4          |NULL       |NULL         |NULL       |
+-----------+-----------+-------------+-----------+



In [24]:
from pyspark.sql.functions import coalesce,col
df_final=df.withColumn("first_not_null",coalesce(col("home_phone"),col("mobile_phone"),col("work_phone"))).drop("home_phone","mobile_phone","work_phone")
# display(df_final)
df_final.show()

+-----------+--------------+
|customer_id|first_not_null|
+-----------+--------------+
|          1|   90909090900|
|          2| 8893029324903|
|          3|    9090897576|
|          4|          NULL|
+-----------+--------------+



**Group the data by department and find the employee with the highest salary in each department**

In [25]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import *

schema = StructType([
StructField("id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=False),
StructField("department", StringType(), nullable=False),
StructField("salary", DoubleType(), nullable=False)
])
data = [
    Row(1, "Jonathan", 30, "Sales", 50000.0),
    Row(2, "Jan", 28, "Marketing", 60000.0),
    Row(3, "Steve", 32, "Finance", 55000.0),
    Row(4, "Michelle", 29, "Sales", 52000.0),
]
employeeDF = spark.createDataFrame(data, schema)
employeeDF.show(truncate=False)
# display(employeeDF)

+---+--------+---+----------+-------+
|id |name    |age|department|salary |
+---+--------+---+----------+-------+
|1  |Jonathan|30 |Sales     |50000.0|
|2  |Jan     |28 |Marketing |60000.0|
|3  |Steve   |32 |Finance   |55000.0|
|4  |Michelle|29 |Sales     |52000.0|
+---+--------+---+----------+-------+



In [26]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Define the window specification
windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())

# Add a row number within each department partition
employee_with_rank = employeeDF.withColumn("rank", row_number().over(windowSpec))

# Filter to keep only the top salary per department
top_earners = employee_with_rank.filter(col("rank") == 1).drop("rank")
top_earners.show()

+---+--------+---+----------+-------+
| id|    name|age|department| salary|
+---+--------+---+----------+-------+
|  3|   Steve| 32|   Finance|55000.0|
|  2|     Jan| 28| Marketing|60000.0|
|  4|Michelle| 29|     Sales|52000.0|
+---+--------+---+----------+-------+

