[Source](https://www.machinelearningplus.com/pyspark/pyspark-exercises-101-pyspark-exercises-for-data-analysis/)

1. How to import PySpark and check the version?

Difficulty Level: L1

In [217]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Name").getOrCreate()
print(spark.version)

4.0.1


2. How to convert the index of a PySpark DataFrame into a column?

In [218]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+



In [219]:
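from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# DataFrames have no built-in index, so number the rows with row_number()
# over an ordered window and subtract 1 for a 0-based index
win = Window.orderBy("Name")
df_index = df.withColumn("index", row_number().over(win) - 1)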
df_index.show()

25/11/13 15:00:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



3. How to combine many lists to form a PySpark DataFrame?

In [220]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]


data = list(zip(list1, list2))
df_list = spark.createDataFrame(data, ["Letter", "Number"])
df_list.show()

+------+------+
|Letter|Number|
+------+------+
|     a|     1|
|     b|     2|
|     c|     3|
|     d|     4|
+------+------+



4. How to get the items of list A not present in list B?

In [221]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]


df_A = spark.createDataFrame([(x,) for x in list_A], ["value"])
df_B = spark.createDataFrame([(x,) for x in list_B], ["value"])
df_diff = df_A.join(df_B, on="value", how="left_anti")
df_diff.show()


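# Alternative with the RDD API. As written, this returns the items not common
# to both lists (the symmetric difference); rdd_A.subtract(rdd_B) on its own
# returns only the items of A that are not in B.
# sc = spark.sparkContext
# # Convert lists to RDDs
# rdd_A = sc.parallelize(list_A)
# rdd_B = sc.parallelize(list_B)
# # Subtract in both directions
# result_rdd_A = rdd_A.subtract(rdd_B)
# result_rdd_B = rdd_B.subtract(rdd_A)
# # Union the two RDDs
# result_rdd = result_rdd_A.union(result_rdd_B)
# print(result_rdd.collect())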

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+
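For small local lists no Spark job is needed. A plain-Python sketch (not Spark code) of the same result, keeping list A's order, plus the items not common to both lists, which is what the commented RDD code produces:

```python
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

# Items of list_A that are not in list_B, in list_A's order
set_B = set(list_B)                      # set gives O(1) membership checks
a_not_in_b = [x for x in list_A if x not in set_B]

# Items not common to both lists (symmetric difference), sorted for display
not_common = sorted(set(list_A) ^ set(list_B))

print(a_not_in_b)   # [1, 2, 3]
print(not_common)   # [1, 2, 3, 6, 7, 8]
```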



6. How to get the minimum, 25th percentile, median, 75th percentile, and maximum of a numeric column?


In [222]:
# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])


# relativeError=0 computes exact quantiles (can be expensive on large data)
quantile = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0)

print(f"min : {quantile[0]}")
print(f"25th percentile : {quantile[1]}")
print(f"median : {quantile[2]}")
print(f"75th percentile : {quantile[3]}")
print(f"max : {quantile[4]}")

min : 10.0
25th percentile : 20.0
median : 30.0
75th percentile : 50.0
max : 86.0
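With relativeError set to 0, approxQuantile returns values that actually occur in the column rather than interpolating between them. The nearest-rank rule sketched below (rank = ceil(p * n), 1-based) is an assumed model, not Spark's source code, but it reproduces the five values printed above. Python's statistics.median, by contrast, averages the two middle values and gives 35.0:

```python
import math
import statistics

ages = [10, 20, 30, 40, 50, 15, 28, 54, 41, 86]

def nearest_rank_quantile(values, p):
    """Return the element at rank ceil(p * n) (1-based) of the sorted values."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))   # p = 0 maps to the minimum
    return ordered[rank - 1]

probs = [0.0, 0.25, 0.5, 0.75, 1.0]
quantiles = [nearest_rank_quantile(ages, p) for p in probs]
print(quantiles)                 # [10, 20, 30, 50, 86]
print(statistics.median(ages))   # 35.0 (interpolated median)
```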


7. How to get frequency counts of unique items of a column?


In [223]:
from pyspark.sql import Row
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]
df = spark.createDataFrame(data)


count_df = df.groupBy("job").count().orderBy("count", ascending=False)
count_df.show()

+---------+-----+
|      job|count|
+---------+-----+
| Engineer|    4|
|Scientist|    2|
|   Doctor|    1|
+---------+-----+



8. How to keep only the top 2 most frequent values as they are and replace everything else with ‘Other’?

In [224]:
from pyspark.sql.functions import col, when

top_2 = count_df.select("job").limit(2).rdd.flatMap(lambda x: x).collect()
df_top_2 = df.withColumn('job', when(col('job').isin(top_2), col('job')).otherwise('Other'))
df_top_2.show()


+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



9. How to drop rows with NA values in a particular column?

In [225]:
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df_dropped = df.na.drop(subset=["Value"])
df_dropped.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B|    3| 456|
+----+-----+----+



10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

In [226]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])
# old column names
old_names = ["col1", "col2", "col3"]
# new column names
new_names = ["new_col1", "new_col2", "new_col3"]
df.show()

for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



11. How to bin a numeric list into 10 groups of (approximately) equal size?

In [227]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer
df = spark.range(100).select(rand(seed=42).alias("values"))


num_buckets = 10
list_quantiles = [i / num_buckets for i in range(num_buckets + 1)]
quantiles = df.stat.approxQuantile("values", list_quantiles, 0)
bucketizer = Bucketizer(splits=quantiles, inputCol="values", outputCol="buckets")
df_buck = bucketizer.transform(df)
df_buck.groupBy("buckets").count().orderBy("buckets").show()


+-------+-----+
|buckets|count|
+-------+-----+
|    0.0|    9|
|    1.0|   10|
|    2.0|   10|
|    3.0|   10|
|    4.0|   10|
|    5.0|   10|
|    6.0|   10|
|    7.0|   10|
|    8.0|   10|
|    9.0|   11|
+-------+-----+
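Bucketizer places a value in bucket i when it falls in [splits[i], splits[i+1]); only the last bucket also includes its upper bound, which is why the column maximum (the 1.0 quantile used as the last split) lands in bucket 9 instead of being rejected. Values outside the splits are treated as errors. A pure-Python sketch of that rule with bisect; bucketize is a hypothetical helper, not part of Spark:

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Return the bucket index of value for sorted split points (Bucketizer-style rule)."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    if value == splits[-1]:
        return len(splits) - 2          # the last bucket is closed on the right
    return bisect_right(splits, value) - 1

splits = [0.0, 0.25, 0.5, 0.75, 1.0]
print([bucketize(v, splits) for v in [0.0, 0.1, 0.25, 0.74, 1.0]])  # [0, 0, 1, 2, 3]
```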



12. How to create a contingency table?


In [228]:
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])

df.groupBy("category1", "category2").count().show()


df.crosstab('category1', 'category2').show()


+---------+---------+-----+
|category1|category2|count|
+---------+---------+-----+
|        A|        X|    2|
|        A|        Y|    1|
|        B|        Y|    1|
|        B|        X|    1|
|        C|        X|    2|
|        C|        Y|    1|
+---------+---------+-----+

+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  B|  1|  1|
|                  C|  2|  1|
|                  A|  2|  1|
+-------------------+---+---+



13. How to find the numbers that are multiples of 3 from a column?

In [229]:
from pyspark.sql.functions import rand

df = spark.range(10)
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))


df = df.withColumn("multiple_of_3", when(col("random") % 3 == 0, 1).otherwise(0))
df.show()

+---+------+-------------+
| id|random|multiple_of_3|
+---+------+-------------+
|  0|     7|            0|
|  1|     9|            1|
|  2|     8|            0|
|  3|     8|            0|
|  4|     3|            1|
|  5|     1|            0|
|  6|     7|            0|
|  7|     4|            0|
|  8|     5|            0|
|  9|     1|            0|
+---+------+-------------+



14. How to extract items at given positions from a column?

In [230]:
from pyspark.sql.functions import rand

df = spark.range(10)
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))
pos = [0, 4, 8, 5]


df_filtered = df.filter(col("id").isin(pos))
df_filtered.show()

+---+------+
| id|random|
+---+------+
|  0|     7|
|  4|     3|
|  5|     1|
|  8|     5|
+---+------+



15. How to stack two DataFrames vertically?

In [231]:
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])


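# union() matches columns by position, so df_B's Col_3 values are stacked under Col_2;
# df_A.unionByName(df_B, allowMissingColumns=True) aligns columns by name instead (Spark 3.1+).
# groupBy("Name").sum() then goes a step further and aggregates the stacked rows per Name.
df_A.union(df_B).groupBy("Name").sum().show()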

+------+----------+----------+
|  Name|sum(Col_1)|sum(Col_2)|
+------+----------+----------+
| apple|         6|        10|
|banana|         2|        25|
|orange|         2|         8|
| grape|         4|         6|
+------+----------+----------+



16. How to compute the mean squared error on a truth and predicted columns?

In [232]:
from pyspark.sql.functions import col, mean

data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])


# Calculate the squared differences (Column supports the ** operator)
df = df.withColumn("squared_error", (col("actual") - col("predicted")) ** 2)

# Calculate the mean squared error
df.agg(mean("squared_error")).show()


+------------------+
|avg(squared_error)|
+------------------+
|             116.8|
+------------------+
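A quick plain-Python check of the arithmetic: the squared errors are 0, 4, 36, 144 and 400, whose mean is 584 / 5 = 116.8, matching the Spark result.

```python
actual = [1, 2, 3, 4, 5]
predicted = [1, 4, 9, 16, 25]

# Square each error, then average
squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
mse = sum(squared_errors) / len(squared_errors)
print(squared_errors)  # [0, 4, 36, 144, 400]
print(mse)             # 116.8
```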



17. How to convert the first character of each element in a column to uppercase?

In [233]:
from pyspark.sql.functions import initcap

data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])


df = df.withColumn("name", initcap(df["name"]))
df.show()

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



18. How to compute summary statistics for all columns in a dataframe?

In [234]:
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]
df = spark.createDataFrame(data, ["name", "age" , "salary"])


df.summary().show()


+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835675|
|    min| James|               29|            55000|
|    25%|  NULL|               30|            60000|
|    50%|  NULL|               32|            65000|
|    75%|  NULL|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



19. How to calculate the number of characters in each word in a column?

In [235]:
from pyspark.sql.functions import length

data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df = df.withColumn("name_length", length(df["name"]))
df.show()

+-----+-----------+
| name|name_length|
+-----+-----------+
| john|          4|
|alice|          5|
|  bob|          3|
+-----+-----------+



20. How to compute difference of differences between consecutive numbers of a column?

In [236]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]
df = spark.createDataFrame(data, ["name", "age" , "salary"])


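win = Window.orderBy("salary")
# First difference: each salary minus the previous one in salary order
# (apply lag() again to salary_diff for the difference of differences)
df = df.withColumn("salary_diff", col("salary") - lag(col("salary"), 1).over(win))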
df.show()

+-------+---+------+-----------+
|   name|age|salary|salary_diff|
+-------+---+------+-----------+
|  James| 34| 55000|       NULL|
| Robert| 37| 60000|       5000|
|    Jen| 32| 65000|       5000|
|Michael| 30| 70000|       5000|
|  Maria| 29| 80000|      10000|
+-------+---+------+-----------+



25/11/13 15:00:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
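The salary_diff column above is a first difference. A difference of differences applies the same step again to that result (in Spark, another lag() over the same window, this time on salary_diff). The arithmetic, checked in plain Python on the salaries in window order:

```python
salaries = [55000, 60000, 65000, 70000, 80000]  # ordered by salary, as in the window

def diff(values):
    """Differences between consecutive values."""
    return [b - a for a, b in zip(values, values[1:])]

first_diff = diff(salaries)
second_diff = diff(first_diff)
print(first_diff)   # [5000, 5000, 5000, 10000]
print(second_diff)  # [0, 0, 5000]
```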


21. How to get the day of month, week number, day of year and day of week from date strings?

In [237]:
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, date_format

data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])


df = df.withColumn("date_1", to_date(col("date_str_1"), "yyyy-MM-dd")) \
    .withColumn("date_2", to_date(col("date_str_2"), "dd MMM yyyy"))

df = df.withColumn("day_of_month", dayofmonth(col("date_1"))) \
    .withColumn("week_of_year", weekofyear(col("date_1"))) \
    .withColumn("day_of_year", dayofyear(col("date_1"))) \
    .withColumn("day_of_week", date_format(col("date_1"), "EEEE"))
df.select("date_str_1", "day_of_month", "week_of_year", "day_of_year", "day_of_week","date_str_2","date_2").show()

+----------+------------+------------+-----------+-----------+-----------+----------+
|date_str_1|day_of_month|week_of_year|day_of_year|day_of_week| date_str_2|    date_2|
+----------+------------+------------+-----------+-----------+-----------+----------+
|2023-05-18|          18|          20|        138|   Thursday|01 Jan 2010|2010-01-01|
|2023-12-31|          31|          52|        365|     Sunday|01 Jan 2010|2010-01-01|
+----------+------------+------------+-----------+-----------+-----------+----------+
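Spark documents weekofyear as ISO 8601 week numbering (weeks start on Monday; week 1 is the first week with more than 3 days), which is also what Python's date.isocalendar() uses, so the values above can be cross-checked without Spark. The strptime format "%d %b %Y" plays the role of the pattern "dd MMM yyyy" for this input; %A and %b assume an English (C) locale.

```python
from datetime import date, datetime

rows = []
for s in ["2023-05-18", "2023-12-31"]:
    d = date.fromisoformat(s)
    rows.append((
        s,
        d.day,                   # day of month
        d.isocalendar()[1],      # ISO 8601 week number
        d.timetuple().tm_yday,   # day of year
        d.strftime("%A"),        # full weekday name
    ))

for r in rows:
    print(r)

date_2 = datetime.strptime("01 Jan 2010", "%d %b %Y").date()
print(date_2)  # 2010-01-01
```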



22. How to convert year-month string to dates corresponding to the 4th day of the month?

In [238]:
from pyspark.sql.functions import to_date
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])


# Parse each string to the 1st of its month, then add 3 days to reach the 4th
df = df.withColumn('Date', to_date("MonthYear", 'MMM yyyy') + 3)
df.show()

+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+



23. How to filter words that contain at least 2 vowels from a column?

In [239]:
from pyspark.sql.functions import translate, col, length
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',), ('Python',), ('Money',)], ['Word'])
df.show()

# Delete the vowels with translate(); the drop in length is the number of vowels
df_filt = df.withColumn("Wrd", translate(col('Word'), 'AEIOUaeiou', ''))\
    .withColumn("diff", length(col('Word')) - length(col('Wrd')))\
    .filter(col('diff') >= 2)\
    .drop('Wrd', 'diff')
df_filt.show()


+------+
|  Word|
+------+
| Apple|
|Orange|
|  Plan|
|Python|
| Money|
+------+

+------+
|  Word|
+------+
| Apple|
|Orange|
| Money|
+------+
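The same delete-and-compare-lengths trick in plain Python: str.maketrans with a third argument builds a table that deletes those characters, the counterpart of translate(col, 'AEIOUaeiou', ''). Only a, e, i, o and u count as vowels, so the y in Python and Money is ignored.

```python
words = ["Apple", "Orange", "Plan", "Python", "Money"]

# Translation table that deletes upper- and lower-case vowels
drop_vowels = str.maketrans("", "", "AEIOUaeiou")

def vowel_count(word):
    """Number of characters removed when the vowels are deleted."""
    return len(word) - len(word.translate(drop_vowels))

at_least_two = [w for w in words if vowel_count(w) >= 2]
print(at_least_two)  # ['Apple', 'Orange', 'Money']
```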



24. How to filter valid emails from a list?

In [272]:
data = [('buying books at amazom.com',),
        ('rameses@egypt.com',),
        ('matt@t.co',),
        ('narendra@modi.com',)]
df = spark.createDataFrame(data, ["value"])


# A loose syntactic check: local part, "@", domain, ".", suffix
pattern = r'^[A-Za-z0-9_.+\-]+@[A-Za-z0-9\-]+\.[A-Za-z0-9\-.]+$'
df.filter(col("value").rlike(pattern)).show(truncate=False)

+-----------------+
|value            |
+-----------------+
|rameses@egypt.com|
|matt@t.co        |
|narendra@modi.com|
+-----------------+
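The pattern can be tried in plain Python before running it in Spark. rlike uses Java regular expressions, but this pattern only uses character classes, quantifiers and anchors, which behave the same in Python's re module. It is a loose syntactic check rather than full address validation (for example, it accepts consecutive dots in the local part).

```python
import re

pattern = r'^[A-Za-z0-9_.+\-]+@[A-Za-z0-9\-]+\.[A-Za-z0-9\-.]+$'
values = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']

# The ^ and $ anchors make the search match the whole string, as with rlike above
valid = [v for v in values if re.search(pattern, v)]
print(valid)  # ['rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']
```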



25. How to pivot a PySpark DataFrame?

In [275]:
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()


# Keep the DataFrame in pivot_df; chaining .show() here would store None instead
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue").orderBy("year", "quarter")
pivot_df.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      1|    US|   5000|
|2021|      1|    EU|   4000|
|2021|      2|    US|   5500|
|2021|      2|    EU|   4500|
|2021|      3|    US|   6000|
|2021|      3|    EU|   5000|
|2021|      4|    US|   7000|
|2021|      4|    EU|   6000|
+----+-------+------+-------+

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      1|4000|5000|
|2021|      2|4500|5500|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+



26. How to get the mean of a variable grouped by another variable?

In [277]:
from pyspark.sql.functions import avg

data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)
df.show()


result = df.groupBy("Product").agg(avg("Price").alias("Avg_Price"))
result.show()

+-------+----------+-----+
|OrderID|   Product|Price|
+-------+----------+-----+
|   1001|    Laptop| 1000|
|   1002|     Mouse|   50|
|   1003|    Laptop| 1200|
|   1004|     Mouse|   30|
|   1005|Smartphone|  700|
+-------+----------+-----+

+----------+---------+
|   Product|Avg_Price|
+----------+---------+
|    Laptop|   1100.0|
|     Mouse|     40.0|
|Smartphone|    700.0|
+----------+---------+



27. How to compute the euclidean distance between two columns?

In [282]:
from pyspark.sql.functions import expr

data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]
df = spark.createDataFrame(data, ["series1", "series2"])


# Euclidean distance: square each row's difference, sum over all rows, take the square root
df = df.withColumn("squared_diff", expr("POW(series1 - series2, 2)"))
df.agg(expr("SQRT(SUM(squared_diff))").alias("euclidean_distance")).show()

+------------------+
|euclidean_distance|
+------------------+
| 18.16590212458495|
+------------------+
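The distance is the square root of 330: the row differences are ±1, ±3, ±5, ±7 and ±9, whose squares sum to 2 * (1 + 9 + 25 + 49 + 81) = 330. Python's math.dist (Python 3.8+) gives the same number as a local cross-check:

```python
import math

series1 = list(range(1, 11))       # 1, 2, ..., 10
series2 = list(range(10, 0, -1))   # 10, 9, ..., 1

# math.dist computes sqrt(sum((p - q) ** 2)) over paired coordinates
distance = math.dist(series1, series2)
print(distance)  # about 18.1659, i.e. sqrt(330)
```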



28. How to replace the spaces in a string with the least frequent character?

In [297]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from collections import Counter

df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])


def least_freq_char_replace_spaces(s):
    counter = Counter(s.replace(" ", ""))
    least_freq_char = min(counter, key = counter.get)
    return s.replace(' ', least_freq_char)

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())

df2 = df.withColumn('modified_string', udf_least_freq_char_replace_spaces(df['string']))
df2.show(truncate=False)

+-----------------+-----------------+
|string           |modified_string  |
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+
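In this string both 'c' and 'g' occur once, so the least frequent character is a tie. min() returns the first minimal key it meets and Counter keeps first-occurrence order, so 'c' wins. The plain-Python sketch below makes the tie visible and adds two guards the UDF above lacks; returning None for null input and the unchanged string when there is no non-space character are assumptions about the desired behaviour.

```python
from collections import Counter

def least_freq_char_replace_spaces(s):
    """Replace every space in s with its least frequent non-space character."""
    if s is None:                        # Spark passes SQL NULL to a Python UDF as None
        return None
    counter = Counter(s.replace(" ", ""))
    if not counter:                      # empty or all-space string: nothing to pick
        return s
    # min() keeps the first key with the smallest count (first-occurrence order)
    least_freq_char = min(counter, key=counter.get)
    return s.replace(" ", least_freq_char)

s = "dbc deb abed gade"
counts = Counter(s.replace(" ", ""))
ties = sorted(ch for ch, n in counts.items() if n == min(counts.values()))
print(ties)                                # ['c', 'g']
print(least_freq_char_replace_spaces(s))   # dbccdebcabedcgade
```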



29. How to create a time series of 10 weekends (Saturdays) starting ‘2000-01-01’, with random numbers as values?

In [313]:
from pyspark.sql.functions import col, dayofweek, explode, rand, sequence, to_date

data = [('2000-01-01','2000-04-01')]
df = (spark.createDataFrame(data, ["start_date","end_date"])
      .withColumn("start_date", to_date(col("start_date")))
      .withColumn("end_date", to_date(col("end_date"))))

# One row per day from start_date to end_date (inclusive)
df = df.withColumn("dates", explode(sequence(col("start_date"), col("end_date"))))

# dayofweek() returns 1 for Sunday through 7 for Saturday
df = df.filter(dayofweek(df.dates) == 7).limit(10)

df = df.withColumn("random_numbers", (rand(seed=42)*10+1).cast("int")).drop("start_date","end_date")
df.show(truncate=False)

+----------+--------------+
|dates     |random_numbers|
+----------+--------------+
|2000-01-01|7             |
|2000-01-08|6             |
|2000-01-15|9             |
|2000-01-22|3             |
|2000-01-29|7             |
|2000-02-05|6             |
|2000-02-12|10            |
|2000-02-19|1             |
|2000-02-26|10            |
|2000-03-04|8             |
+----------+--------------+
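Since 2000-01-01 is itself a Saturday, the ten dates are that day plus 0 to 9 weeks. A plain-Python sketch that finds the first Saturday on or after the start date and steps by a week. Note that Python's date.weekday() counts Monday as 0 (Saturday is 5), whereas Spark's dayofweek() counts Sunday as 1 (Saturday is 7).

```python
from datetime import date, timedelta

start = date(2000, 1, 1)
SATURDAY = 5                                  # date.weekday(): Monday == 0

# Days until the first Saturday on or after start (0 if start is a Saturday)
offset = (SATURDAY - start.weekday()) % 7
first_saturday = start + timedelta(days=offset)

saturdays = [first_saturday + timedelta(weeks=k) for k in range(10)]
print(saturdays[0], saturdays[-1])  # 2000-01-01 2000-03-04
```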



30. How to get the number of rows, number of columns, and data types of a dataframe?

In [320]:
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

print("Number of Rows: ", df.count())
print("Number of Columns: ", len(df.columns))
df.printSchema()

Number of Rows:  5
Number of Columns:  3
root
 |-- OrderID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: long (nullable = true)



31. How to rename specific columns in a dataframe?

In [319]:
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])
old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]


for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice|       1|      30|
|  Bob|       2|      35|
+-----+--------+--------+



32. How to check if a dataframe has any missing values and count of missing values in each column?

In [321]:
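# Alias Spark's sum() so Python's built-in sum() is not shadowed in later cells
from pyspark.sql.functions import col, sum as spark_sum, when

df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])


# For each column, count the rows where the value is null
missing_counts = df.select([spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])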
missing_counts.show()

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   0|    2|  2|
+----+-----+---+



33. How to replace missing values of multiple numeric columns with the mean?

In [329]:
from pyspark.sql.types import LongType
from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
("A", 1, None),
("B", None, 123 ),
("B", 3, 456),
("D", 6, None),
], ["Name", "var1", "var2"])


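# Impute every long (integer) column in place. The imputed values keep the input
# column's type, so var2's mean of 289.5 shows as 289 and var1's 3.33 as 3.
column_names = [field.name for field in df.schema.fields if isinstance(field.dataType, LongType)]
imputer = Imputer(inputCols=column_names, outputCols=column_names, strategy="mean")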
model = imputer.fit(df)
imputed_df = model.transform(df)
imputed_df.show(5)

+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1| 289|
|   B|   3| 123|
|   B|   3| 456|
|   D|   6| 289|
+----+----+----+



34. How to change the order of columns of a dataframe?

In [331]:
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])
df.show()


df = df.select("Age", "First_Name", "Last_Name")
df.show()

+----------+---------+---+
|First_Name|Last_Name|Age|
+----------+---------+---+
|      John|      Doe| 30|
|      Jane|      Doe| 25|
|     Alice|    Smith| 22|
+----------+---------+---+

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30|      John|      Doe|
| 25|      Jane|      Doe|
| 22|     Alice|    Smith|
+---+----------+---------+



35. How to format or suppress scientific notation in a PySpark DataFrame?


In [333]:
from pyspark.sql.functions import format_number

df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])


# format_number() rounds to 10 decimal places and returns a string column
df = df.withColumn("your_column", format_number("your_column", 10))
df.show()

+---+------------+
| id| your_column|
+---+------------+
|  1|0.0000001230|
|  2|0.0000234560|
|  3|0.0003456780|
+---+------------+



36. How to format all the values in a dataframe as percentages?

In [334]:
from pyspark.sql.functions import concat, col, lit

columns = ["numbers_1", "numbers_2"]
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, columns)


# Multiply by 100, round to 2 decimals with a decimal cast, then append "%"
for col_name in columns:
    df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))

df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|    8.00%|
|   20.00%|    6.00%|
|   33.00%|    2.00%|
+---------+---------+



37. How to filter every nth row in a dataframe?


In [335]:
from pyspark.sql.functions import monotonically_increasing_id

data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]
df = spark.createDataFrame(data, ["Name", "Number"])


# Number the rows in their original order, then keep every nth one
window = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("rn", row_number().over(window))
n = 5
df = df.filter((df.rn % n) == 0)
df.show()

25/11/13 16:59:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+------+---+
|Name|Number| rn|
+----+------+---+
| Eve|     5|  5|
|Jack|    10| 10|
+----+------+---+



38. How to get the row number of the nth largest value in a column?


In [344]:
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number

data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]
df = spark.createDataFrame(data)


win = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(win))

n = 3
row = df.filter(df.row_number == n)

row.show()

+---+-------+----------+
| id|column1|row_number|
+---+-------+----------+
|  2|      8|         3|
+---+-------+----------+



25/11/13 17:08:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


39. How to get the last n rows of a dataframe with row sum > 100?


In [349]:
from functools import reduce

data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])


n = 2
df = df.withColumn('row_sum', reduce(lambda a, b: a + b, [col(x) for x in df.columns]))
df = df.filter(col('row_sum') > 100)
# monotonically_increasing_id() follows the original row order (IDs are unique but not consecutive)
df = df.withColumn('id', monotonically_increasing_id())
df_last_n = df.sort(desc('id')).limit(n)
df_last_n.show()

+----+----+----+-------+-----------+
|col1|col2|col3|row_sum|         id|
+----+----+----+-------+-----------+
|  40|  50|  20|    110|60129542144|
|  70|  80| 100|    250|34359738368|
+----+----+----+-------+-----------+
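A plain-Python version of the same logic: the row sums are 105, 65, 250, 72 and 110, so three rows pass the filter and the last two are kept. Spark lists them newest-first because of the descending sort on id, while the slice below keeps the original order. The large id values come from monotonically_increasing_id(), which is documented to be unique and increasing but not consecutive.

```python
data = [(10, 25, 70), (40, 5, 20), (70, 80, 100), (10, 2, 60), (40, 50, 20)]
n = 2

with_sums = [(*row, sum(row)) for row in data]       # append each row's sum
over_100 = [r for r in with_sums if r[-1] > 100]     # keep rows with sum > 100
last_n = over_100[-n:]                               # last n of those, original order
print(last_n)  # [(70, 80, 100, 250), (40, 50, 20, 110)]
```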



40. How to create a column containing the minimum divided by the maximum of each row?


In [354]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])


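# Python UDF: smallest value in the row divided by the largest
def min_max_ratio(row):
    return float(min(row)) / max(row)

min_max_ratio_udf = udf(min_max_ratio, FloatType())
# A UDF-free alternative is least(*df.columns) / greatest(*df.columns), which returns a double
df = df.withColumn('min_by_max', min_max_ratio_udf(array(df.columns)))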
df.show()

+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
|   1|   2|   3|0.33333334|
|   4|   5|   6| 0.6666667|
|   7|   8|   9| 0.7777778|
|  10|  11|  12| 0.8333333|
+----+----+----+----------+



41. How to create a column that contains the penultimate (second-largest) value in each row?


In [355]:
from pyspark.sql.types import ArrayType, IntegerType

data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])


# Sort each row's values in descending order; index 1 is then the second-largest value
sort_array_desc = udf(lambda arr: sorted(arr, reverse=True), ArrayType(IntegerType()))

df = df.withColumn("row_as_array", sort_array_desc(array(df.columns)))
df = df.withColumn("Penultimate", df['row_as_array'].getItem(1))
df = df.drop('row_as_array')
df.show()

+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
|     10|     20|     30|         20|
|     40|     60|     50|         50|
|     80|     70|     90|         80|
+-------+-------+-------+-----------+



42. How to normalize all columns in a dataframe?


In [356]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

input_cols = ["Col1", "Col2", "Col3"]
data = [(1, 2, 3),
(2, 3, 4),
(3, 4, 5),
(4, 5, 6)]
df = spark.createDataFrame(data, input_cols)


assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
df_assembled = assembler.transform(df)
# withMean/withStd: subtract each feature's mean and divide by its standard deviation (z-scores)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scalerModel = scaler.fit(df_assembled)
df_normalized = scalerModel.transform(df_assembled)
df_normalized = df_normalized.drop('features')
df_normalized.show(truncate=False)

+----+----+----+-------------------------------------------------------------+
|Col1|Col2|Col3|scaled_features                                              |
+----+----+----+-------------------------------------------------------------+
|1   |2   |3   |[-1.161895003862225,-1.161895003862225,-1.161895003862225]   |
|2   |3   |4   |[-0.3872983346207417,-0.3872983346207417,-0.3872983346207417]|
|3   |4   |5   |[0.3872983346207417,0.3872983346207417,0.3872983346207417]   |
|4   |5   |6   |[1.161895003862225,1.161895003862225,1.161895003862225]      |
+----+----+----+-------------------------------------------------------------+
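StandardScaler with withMean=True and withStd=True produces z-scores, and Spark documents that the unit standard deviation uses the corrected sample standard deviation (n - 1 denominator). A plain-Python check on Col1; the other columns are Col1 shifted by a constant, hence the identical scaled values. With the population standard deviation the first value would be about -1.342 instead.

```python
import statistics

col1 = [1, 2, 3, 4]
mean = statistics.mean(col1)            # 2.5
std = statistics.stdev(col1)            # sample std, n - 1 denominator
z = [(x - mean) / std for x in col1]
print(z)  # [-1.1618..., -0.3872..., 0.3872..., 1.1618...]

# For comparison: population std (n denominator) gives a different scale
z_pop_first = (col1[0] - mean) / statistics.pstdev(col1)
print(z_pop_first)  # about -1.342
```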



43. How to get the positions where values of two columns match?


In [357]:
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])


df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))
df.show()

+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
|  Sam|  Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+



44. How to create lags and leads of a column by group in a dataframe?


In [358]:
from pyspark.sql.functions import lead

data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]
df = spark.createDataFrame(data, ["Date", "Store", "Sales"])


df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))
windowSpec = Window.partitionBy("Store").orderBy("Date")

df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()

+----------+------+-----+---------+----------+
|      Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1|  100|     NULL|       150|
|2023-01-02|Store1|  150|      100|       200|
|2023-01-03|Store1|  200|      150|       250|
|2023-01-04|Store1|  250|      200|       300|
|2023-01-05|Store1|  300|      250|      NULL|
|2023-01-01|Store2|   50|     NULL|        60|
|2023-01-02|Store2|   60|       50|        80|
|2023-01-03|Store2|   80|       60|        90|
|2023-01-04|Store2|   90|       80|       120|
|2023-01-05|Store2|  120|       90|      NULL|
+----------+------+-----+---------+----------+



45. How to get the frequency of unique values in the entire dataframe?


In [359]:
columns = ["Column1", "Column2", "Column3"]
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, columns)


df_single = None
for c in columns:
    if df_single is None:
        df_single = df.select(col(c).alias("single_column"))
    else:
        df_single = df_single.union(df.select(col(c).alias("single_column")))
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)
frequency_table.show()

+-------------+-----+
|single_column|count|
+-------------+-----+
|            2|    4|
|            3|    4|
|            4|    3|
|            1|    2|
|            5|    1|
|            6|    1|
+-------------+-----+
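The Spark cell stacks every column into one `single_column` with `union` and then groups by value. The plain-Python equivalent, sketched below without Spark, flattens every cell and counts it. Note that `orderBy('count', ascending=False)` sorts only by count. As a result, the relative order of tied values in the Spark output is not guaranteed, such as 2 and 3 here, which both appear 4 times.

```python
from collections import Counter

# Same 5x3 table as above; each tuple is one row
data = [(1, 2, 3), (2, 3, 4), (1, 2, 3), (4, 5, 6), (2, 3, 4)]

# Flatten every cell into one sequence (the role of the union loop) and count occurrences
frequency = Counter(value for row in data for value in row)
print(frequency.most_common())
```

Within Spark, `explode(array(*columns))` is a common single-pass alternative to the `union` loop.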



46. How to replace both diagonals of a dataframe with 0?


In [360]:
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]
df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])


from pyspark.sql.functions import col, when, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Number the rows 0..n-1 in their current order
w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("id", row_number().over(w) - 1)
n = df.count()

# Zero a cell if it is on the main diagonal (column i == id)
# or on the anti-diagonal (column i == n - 1 - id)
df_with_diag_zero = df.select([
    when((col("id") == i) | (col("id") == n - 1 - i), 0)
    .otherwise(col(f"col_{i+1}")).alias(f"col_{i+1}")
    for i in range(4)
])
df_with_diag_zero.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    0|    2|    3|    0|
|    2|    0|    0|    5|
|    1|    0|    0|    4|
|    0|    5|    6|    0|
+-----+-----+-----+-----+



25/11/13 17:41:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
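The index arithmetic behind the Spark cell is easier to see on a plain list of lists. The sketch below is not Spark code and assumes a square table. A cell (r, c) is on the main diagonal when c == r, and on the anti-diagonal when c == n - 1 - r.

```python
# Same 4x4 table as above, as a list of rows
data = [[1, 2, 3, 4],
        [2, 3, 4, 5],
        [1, 2, 3, 4],
        [4, 5, 6, 7]]

n = len(data)
# Copy the rows so the input is not modified in place
result = [row[:] for row in data]
for r in range(n):
    result[r][r] = 0          # main diagonal: column index == row index
    result[r][n - 1 - r] = 0  # anti-diagonal: column index == n - 1 - row index

for row in result:
    print(row)
```

For an odd n, the two diagonals share the centre cell, and the sketch simply zeroes it twice.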

47. How to reverse the rows of a dataframe?


In [361]:
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]
df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])


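from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Number the rows in their current order, then sort by that number in descending order
w = Window.orderBy(monotonically_increasing_id())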
df = df.withColumn("id", row_number().over(w) - 1)
df_2 = df.orderBy("id", ascending=False).drop("id")
df_2.show()

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    4|    5|    6|    7|
|    3|    4|    5|    6|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
+-----+-----+-----+-----+



25/11/13 17:42:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
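Exercises 46 and 47 both use `monotonically_increasing_id()` to capture the current row order. According to the Spark documentation, the generated IDs are increasing and unique but not consecutive. In the current implementation, the partition ID sits in the upper 31 bits and the record number within the partition sits in the lower 33 bits. The plain-Python sketch below mimics that layout with a hypothetical `mono_id` helper. It assumes the four rows are split across two partitions. It shows two things: sorting by the raw ID preserves order, and `row_number()` is still useful for turning the IDs into consecutive 0-based positions.

```python
# Hypothetical helper mimicking the documented ID layout:
# partition ID in the upper bits, record number within the partition in the lower 33 bits
def mono_id(partition_id, record_number):
    return (partition_id << 33) + record_number

# Assumed layout: the four rows are split across two partitions, two rows each
partitions = [[(1, 2, 3, 4), (2, 3, 4, 5)],
              [(3, 4, 5, 6), (4, 5, 6, 7)]]

rows_with_ids = [(mono_id(p, r), row)
                 for p, part in enumerate(partitions)
                 for r, row in enumerate(part)]
ids = [i for i, _ in rows_with_ids]
print(ids)  # [0, 1, 8589934592, 8589934593]: increasing but not consecutive

# row_number() over the IDs turns them into consecutive 0-based positions
positions = {i: pos for pos, i in enumerate(sorted(ids))}

# Sorting by position in descending order reverses the rows
reversed_rows = [row for i, row in sorted(rows_with_ids, key=lambda t: positions[t[0]], reverse=True)]
print(reversed_rows)
```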


48. How to create one-hot encodings of a categorical variable (dummy variables)?


In [362]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
columns = ["Categories", "Value"]
df = spark.createDataFrame(data, columns)


indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df)
indexed_df = indexerModel.transform(df)
encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
encoded_df = encoded_df.drop("Categories_Indexed")
encoded_df.show(truncate=False)


+----------+-----+-----------------+
|Categories|Value|Categories_onehot|
+----------+-----+-----------------+
|A         |10   |(3,[1],[1.0])    |
|A         |20   |(3,[1],[1.0])    |
|B         |30   |(3,[0],[1.0])    |
|B         |20   |(3,[0],[1.0])    |
|B         |30   |(3,[0],[1.0])    |
|C         |40   |(3,[2],[1.0])    |
|C         |10   |(3,[2],[1.0])    |
|D         |10   |(3,[],[])        |
+----------+-----+-----------------+
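The vector sizes in the output follow from two defaults. First, `StringIndexer` orders labels by descending frequency, and in recent Spark versions ties are broken alphabetically. Second, `OneHotEncoder` uses `dropLast=True`, so the last index (D) is encoded as the all-zero vector. The plain-Python sketch below reproduces that mapping. It illustrates the defaults only and is not Spark's implementation.

```python
from collections import Counter

categories = ["A", "A", "B", "B", "B", "C", "C", "D"]

# StringIndexer (default stringOrderType="frequencyDesc"): the most frequent label gets index 0;
# ties (A and C both appear twice) are broken alphabetically
counts = Counter(categories)
labels = sorted(counts, key=lambda label: (-counts[label], label))
index = {label: i for i, label in enumerate(labels)}

# OneHotEncoder with the default dropLast=True: vector size is (#labels - 1),
# so the last index (here D) becomes the all-zero vector
size = len(labels) - 1

def one_hot(label):
    i = index[label]
    # Sparse form (size, [indices], [values]), mirroring how Spark prints a SparseVector
    return (size, [i], [1.0]) if i < size else (size, [], [])

for c in categories:
    print(c, one_hot(c))
```

Passing `dropLast=False` to `OneHotEncoder` gives every category its own slot, which yields vectors of size 4 for this data.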



50. How to unpivot a dataframe (convert columns into rows)?


In [364]:
from pyspark.sql.functions import expr

data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)

# stack(2, 'EU', EU, 'US', US) emits two rows per input row: ('EU', EU) and ('US', US)
unpivotExpr = "stack(2, 'EU', EU, 'US', US) as (region, revenue)"
unPivotDF = df.select("year", "quarter", expr(unpivotExpr)).where("revenue is not null")
unPivotDF.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      2|    EU|   4500|
|2021|      2|    US|   5500|
|2021|      1|    EU|   4000|
|2021|      1|    US|   5000|
|2021|      3|    EU|   5000|
|2021|      3|    US|   6000|
|2021|      4|    EU|   6000|
|2021|      4|    US|   7000|
+----+-------+------+-------+
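The plain-Python sketch below shows what `stack` does here. It is not Spark code. Each input row produces one output row per value column. The region label comes from the column name and the revenue from that column's value. Null revenues are dropped, as the `where` clause does.

```python
# Same rows as above: (year, quarter, EU, US)
data = [(2021, 2, 4500, 5500),
        (2021, 1, 4000, 5000),
        (2021, 3, 5000, 6000),
        (2021, 4, 6000, 7000)]
value_columns = ["EU", "US"]

# stack(2, 'EU', EU, 'US', US): each input row yields one output row per value column
unpivoted = [(year, quarter, region, revenue)
             for year, quarter, *values in data
             for region, revenue in zip(value_columns, values)
             if revenue is not None]  # mirrors .where("revenue is not null")

for row in unpivoted:
    print(row)
```

Spark 3.4 and later also provide `DataFrame.unpivot` (alias `melt`), e.g. `df.unpivot(["year", "quarter"], ["EU", "US"], "region", "revenue")`. This avoids writing the `stack` expression by hand.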

