# PySpark Interview Review
The goal of this notebook is to review basic Pyspark synthax for Data Engineering interview questions.

# Creating SparkSession

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('PySparkReview').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 16:14:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
print(spark.version)

3.5.5


# Exercises

## How to convert the index of a PySpark DataFrame into a column?

In [4]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+



In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

In [13]:
df = df.withColumn('index', row_number().over(w) - 1)
df.show()

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



25/04/26 16:18:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/26 16:18:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/26 16:18:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


## How to combine many lists to form a PySpark DataFrame?

In [14]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]

In [15]:
list(zip(list1, list2))

[('a', 1), ('b', 2), ('c', 3), ('d', 4)]

In [16]:
rdd = spark.sparkContext.parallelize(list(zip(list1, list2)))

In [17]:
df = rdd.toDF(['Col1', 'Col2'])
df.show()

+----+----+
|Col1|Col2|
+----+----+
|   a|   1|
|   b|   2|
|   c|   3|
|   d|   4|
+----+----+



## How to get the items of list A not present in list B?

In [19]:
list_a = [1, 2, 3, 4, 5]
list_b = [4, 5, 6, 7, 8]

In [20]:
sc = spark.sparkContext
rdd_a = sc.parallelize(list_a)
rdd_b = sc.parallelize(list_b)

In [21]:
rdd_a_minus_b = rdd_a.subtract(rdd_b)

In [23]:
rdd_a_minus_b.collect()

[1, 2, 3]

## How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?

In [24]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+



In [25]:
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

In [26]:
quantiles

[10.0, 20.0, 30.0, 50.0, 86.0]

## How to get frequency counts of unique items of a column?

In [27]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [29]:
df.groupBy('job').count().show()

+---------+-----+
|      job|count|
+---------+-----+
| Engineer|    4|
|Scientist|    2|
|   Doctor|    1|
+---------+-----+



## How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

In [36]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [38]:
from pyspark.sql.functions import col, when

top2jobs =df.groupBy('job').count().orderBy('count', ascending=False).limit(2).select('job').rdd.flatMap(lambda x: x).collect()
top2jobs

['Engineer', 'Scientist']

In [42]:
df = df.withColumn('job', when(col('job').isin(top2jobs), col('job')).otherwise('Other'))

In [43]:
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



## How to Drop rows with NA values specific to a particular column?

In [44]:
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+



In [48]:
df2 = df.dropna(subset=['Value'])
df2.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B|    3| 456|
+----+-----+----+



## How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

In [49]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [51]:
list(zip(old_names, new_names))

[('col1', 'new_col1'), ('col2', 'new_col2'), ('col3', 'new_col3')]

In [52]:
for rename_pair in list(zip(old_names, new_names)):
    df = df.withColumnRenamed(rename_pair[0], rename_pair[1])

In [53]:
df.show()

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



## How to create contigency table?

In [54]:
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.show()

+---------+---------+
|category1|category2|
+---------+---------+
|        A|        X|
|        A|        Y|
|        A|        X|
|        B|        Y|
|        B|        X|
|        C|        X|
|        C|        X|
|        C|        Y|
+---------+---------+



In [55]:
df.cube('category1').count().show()

+---------+-----+
|category1|count|
+---------+-----+
|     NULL|    8|
|        A|    3|
|        B|    2|
|        C|    3|
+---------+-----+



In [56]:
df.crosstab('category1', 'category2').show()

+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  B|  1|  1|
|                  C|  2|  1|
|                  A|  2|  1|
+-------------------+---+---+



## How to stack two DataFrames vertically ?

In [57]:
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [58]:
df_A.union(df_B).show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



## How to convert the first character of each element in a series to uppercase?

In [63]:
from pyspark.sql.functions import col, initcap

In [59]:
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [64]:
df = df.withColumn('capitalized_name', initcap(col('name')))

In [66]:
df.show()

+-----+----------------+
| name|capitalized_name|
+-----+----------------+
| john|            John|
|alice|           Alice|
|  bob|             Bob|
+-----+----------------+



## How to compute summary statistics for all columns in a dataframe

In [67]:
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+



In [69]:
df.summary().show()

25/04/26 16:46:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835675|
|    min| James|               29|            55000|
|    25%|  NULL|               30|            60000|
|    50%|  NULL|               32|            65000|
|    75%|  NULL|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



## How to calculate the number of characters in each word in a column?

In [71]:
from pyspark.sql.functions import length
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [73]:
df = df.withColumn('word_len', length(col('name')))

In [75]:
df.show()

+-----+--------+
| name|word_len|
+-----+--------+
| john|       4|
|alice|       5|
|  bob|       3|
+-----+--------+



## How to get the day of month, week number, day of year and day of week from a date strings?

In [76]:
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df.show()

+----------+-----------+
|date_str_1| date_str_2|
+----------+-----------+
|2023-05-18|01 Jan 2010|
|2023-12-31|01 Jan 2010|
+----------+-----------+



In [77]:
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek

In [85]:
df = df.withColumn('date_str_1', to_date(col('date_str_1'), 'yyyy-MM-dd'))
df = df.withColumn('date_str_2', to_date(col('date_str_2'), 'dd MMM yyyy'))

In [86]:
df.schema

StructType([StructField('date_str_1', DateType(), True), StructField('date_str_2', DateType(), True)])

In [87]:
df.show()

+----------+----------+
|date_str_1|date_str_2|
+----------+----------+
|2023-05-18|2010-01-01|
|2023-12-31|2010-01-01|
+----------+----------+



In [90]:
df = df.withColumn(
    'day_of_month', dayofmonth(col('date_str_1'))
).withColumn(
    'week_numer', weekofyear(col('date_str_1'))
).withColumn(
    'day_of_year', dayofyear(col('date_str_1'))
).withColumn(
    'day_of_week', dayofweek(col('date_str_1'))
)

In [92]:
df.show()

+----------+----------+------------+----------+-----------+-----------+
|date_str_1|date_str_2|day_of_month|week_numer|day_of_year|day_of_week|
+----------+----------+------------+----------+-----------+-----------+
|2023-05-18|2010-01-01|          18|        20|        138|          5|
|2023-12-31|2010-01-01|          31|        52|        365|          1|
+----------+----------+------------+----------+-----------+-----------+



# Practicing on a real world Dataset

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType, BooleanType
)

In [18]:
spark = SparkSession.builder.appName('PySparkReview').getOrCreate()

In [19]:
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("sex", StringType(), True),          
    StructField("dataset", StringType(), True), 
    StructField("cp", StringType(), True),       
    StructField("trestbps", IntegerType(), True),
    StructField("chol", IntegerType(), True),
    StructField("fbs", StringType(), True),         
    StructField("restecg", StringType(), True),     
    StructField("thalch", IntegerType(), True),
    StructField("exang", StringType(), True),   
    StructField("oldpeak", DoubleType(), True),
    StructField("slope", StringType(), True),    
    StructField("ca", IntegerType(), True),
    StructField("thal", StringType(), True),   
    StructField("num", IntegerType(), True)
])


In [20]:
df_heartrate = spark.read.options(
    header='True', delimiter=','
).csv(
    '/home/gabrielsgoncalves/Documents/Repositories/databricks_apache_spark_associate_certificate_study/datasets/heart_disease_uci.csv',
    header=True,
    schema=schema

)

## Getting an overview of the Dataset

In [25]:
df_heartrate.select(['age', 'sex', 'dataset', 'cp', 'trestbps']).summary().show()

+-------+-----------------+------+-------------+--------------+------------------+
|summary|              age|   sex|      dataset|            cp|          trestbps|
+-------+-----------------+------+-------------+--------------+------------------+
|  count|              920|   920|          920|           920|               861|
|   mean|53.51086956521739|  NULL|         NULL|          NULL|132.13240418118468|
| stddev|9.424685209576863|  NULL|         NULL|          NULL|19.066069518587465|
|    min|               28|Female|    Cleveland|  asymptomatic|                 0|
|    25%|               47|  NULL|         NULL|          NULL|               120|
|    50%|               54|  NULL|         NULL|          NULL|               130|
|    75%|               60|  NULL|         NULL|          NULL|               140|
|    max|               77|  Male|VA Long Beach|typical angina|               200|
+-------+-----------------+------+-------------+--------------+------------------+



## Saving file as parquet

In [26]:
df_heartrate.write.parquet('/home/gabrielsgoncalves/Documents/Repositories/databricks_apache_spark_associate_certificate_study/datasets/heart_disease_uci.parquet')

## Reading the parquet file

In [27]:
df_heartrate = spark.read.parquet(
    '/home/gabrielsgoncalves/Documents/Repositories/databricks_apache_spark_associate_certificate_study/datasets/heart_disease_uci.parquet/'
)

In [37]:
df_heartrate.show(5)

+---+---+------+---------+---------------+--------+----+-----+--------------+------+-----+-------+-----------+---+-----------------+---+
| id|age|   sex|  dataset|             cp|trestbps|chol|  fbs|       restecg|thalch|exang|oldpeak|      slope| ca|             thal|num|
+---+---+------+---------+---------------+--------+----+-----+--------------+------+-----+-------+-----------+---+-----------------+---+
|  1| 63|  Male|Cleveland| typical angina|     145| 233| TRUE|lv hypertrophy|   150|FALSE|    2.3|downsloping|  0|     fixed defect|  0|
|  2| 67|  Male|Cleveland|   asymptomatic|     160| 286|FALSE|lv hypertrophy|   108| TRUE|    1.5|       flat|  3|           normal|  2|
|  3| 67|  Male|Cleveland|   asymptomatic|     120| 229|FALSE|lv hypertrophy|   129| TRUE|    2.6|       flat|  2|reversable defect|  1|
|  4| 37|  Male|Cleveland|    non-anginal|     130| 250|FALSE|        normal|   187|FALSE|    3.5|downsloping|  0|           normal|  0|
|  5| 41|Female|Cleveland|atypical angina

In [38]:
df_heartrate.count()

920

## Get the average age for male and female

In [34]:
from pyspark.sql.functions import avg, round

In [35]:
df_heartrate.groupBy('sex').agg(round(avg('age'), 2).alias('avg_age')).show()

+------+-------+
|   sex|avg_age|
+------+-------+
|Female|  52.47|
|  Male|  53.79|
+------+-------+



In [14]:
df_heartrate.schema

StructType([StructField('id', StringType(), True), StructField('age', StringType(), True), StructField('sex', StringType(), True), StructField('dataset', StringType(), True), StructField('cp', StringType(), True), StructField('trestbps', StringType(), True), StructField('chol', StringType(), True), StructField('fbs', StringType(), True), StructField('restecg', StringType(), True), StructField('thalch', StringType(), True), StructField('exang', StringType(), True), StructField('oldpeak', StringType(), True), StructField('slope', StringType(), True), StructField('ca', StringType(), True), StructField('thal', StringType(), True), StructField('num', StringType(), True)])

In [None]:
# Practicing the 