1. How to import PySpark and check the version?

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark=SparkSession.builder.master("local[1]").appName("SparkAssessment.com").getOrCreate()

print(spark.version)

24/09/20 15:35:10 WARN Utils: Your hostname, AI-CJB-LAP-459 resolves to a loopback address: 127.0.1.1; using 192.168.1.164 instead (on interface wlp0s20f3)
24/09/20 15:35:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/20 15:35:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.5.2


In [47]:
from pyspark.sql.functions import monotonically_increasing_id,row_number,min,max
from pyspark.sql import Window,functions as F

2. How to convert the index of a PySpark DataFrame into a column?


In [19]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()
df_with_index=df.withColumn("index",monotonically_increasing_id())
df_with_index.show()

#or
window_spec=Window.orderBy("value")
df_index=df.withColumn("index",row_number().over(window_spec)-1)
df_index.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



24/09/19 15:24:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/19 15:24:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/19 15:24:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


3. How to combine many lists to form a PySpark DataFrame?

In [23]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
data=list(zip(list1,list2))
df=spark.createDataFrame(data,["c1","c2"])
df.show()

+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
|  d|  4|
+---+---+



4. How to get the items of list A not present in list B?

In [25]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

rdd1=spark.sparkContext.parallelize(list_A)
rdd2=spark.sparkContext.parallelize(list_B)

res=rdd1.subtract(rdd2)
res.collect()

[2, 1, 3]

5.How to get the items not common to both list A and list B?

In [26]:
res1=rdd2.subtract(rdd1)
res_union=res.union(res1)
res_union.collect()

[2, 1, 3, 6, 8, 7]

6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?

In [33]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()
min_val=df.agg(min("age")).collect()[0][0]
max_val=df.agg(max("age")).collect()[0][0]

quantiles=df.approxQuantile("age",[0.25,0.50,0.75],0.01)

print("min value:",min_val)
print("25th quartile:",quantiles[0])
print("median:",quantiles[1])
print("75th quartile:",quantiles[2])
print("max val",max_val)

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+

min value: 10
25th quartile: 20.0
median: 30.0
75th quartile: 50.0
max val 86


7. How to get frequency counts of unique items of a column?

In [51]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()
name_count=df.groupBy("job").count()
name_count.show()


+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+

+---------+-----+
|      job|count|
+---------+-----+
|Scientist|    2|
|   Doctor|    1|
| Engineer|    4|
+---------+-----+



8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

In [52]:
top_2_jobs = name_count.orderBy(F.desc("count")).limit(2).select("job").rdd.flatMap(lambda x: x).collect()

# Replace other jobs with 'Other'
result_df = df.withColumn("job", F.when(df["job"].isin(top_2_jobs), df["job"]).otherwise("Other"))

# Show the resulting DataFrame
result_df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



9. How to Drop rows with NA values specific to a particular column?

In [53]:
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+



In [55]:
df.dropna(subset="id").show()

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   B| NULL|123|
|   B|    3|456|
+----+-----+---+



10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

In [58]:

df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

old_names = ["col1", "col2", "col3"]

new_names = ["new_col1", "new_col2", "new_col3"]

df.show()

for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)
print("DataFrame with Renamed Columns:")
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

DataFrame with Renamed Columns:
+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



11. How to bin a numeric list to 10 groups of equal size?
CONFUSING.

In [8]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import Bucketizer

# Create a DataFrame with a single column "values" filled with random numbers
num_items = 100
df = spark.range(num_items).select(rand(seed=42).alias("values"))
bucketizer=Bucketizer(splits=[0,10,float("Inf")],inputCol="values",outputCol="Buckets")
df_buck=bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show(5)
df.show(5)

+-------------------+-------+
|             values|Buckets|
+-------------------+-------+
|  0.619189370225301|    0.0|
| 0.5096018842446481|    0.0|
| 0.8325259388871524|    0.0|
|0.26322809041172357|    0.0|
| 0.6702867696264135|    0.0|
+-------------------+-------+
only showing top 5 rows

+-------------------+
|             values|
+-------------------+
|  0.619189370225301|
| 0.5096018842446481|
| 0.8325259388871524|
|0.26322809041172357|
| 0.6702867696264135|
+-------------------+
only showing top 5 rows



12. How to create contigency table?

In [15]:
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
# df = spark.createDataFrame(data, ["category1", "category2"])
df.crosstab("category1","category2").sort("category1_category2").show()
# df.show()

+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  A|  2|  1|
|                  B|  1|  1|
|                  C|  2|  1|
+-------------------+---+---+



13. How to find the numbers that are multiples of 3 from a column?

In [18]:
from pyspark.sql.functions import rand,col

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

mul_3=df.filter(col("random")%3==0)
mul_3.show()
# Show the DataFrame
# df.show()

+---+------+
| id|random|
+---+------+
|  1|     6|
|  2|     9|
|  3|     3|
|  5|     6|
+---+------+



14. How to extract items at given positions from a column?

In [21]:
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

pos = [0, 4, 8, 5]

item=df.select("random").collect()
res=[item[i][0] for i in pos]
print(res)


+---+------+
| id|random|
+---+------+
|  0|     7|
|  1|     6|
|  2|     9|
|  3|     3|
|  4|     7|
|  5|     6|
|  6|    10|
|  7|     1|
|  8|    10|
|  9|     8|
+---+------+

[7, 7, 10, 6]


15. How to stack two DataFrames vertically ?

In [23]:
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()

res_df=df_A.union(df_B)

res_df.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



16. How to compute the mean squared error on a truth and predicted columns?

In [27]:
# Assume you have a DataFrame df with two columns "actual" and "predicted"
# For the sake of example, we'll create a sample DataFrame
from pyspark.sql.functions import mean, col


data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])

df.show()
squared_error=df.withColumn("mean-squared-error",(col("actual")-col("predicted"))**2)
mse=squared_error.select(mean("mean-squared-error")).collect()[0][0]
print(mse)

+------+---------+
|actual|predicted|
+------+---------+
|     1|        1|
|     2|        4|
|     3|        9|
|     4|       16|
|     5|       25|
+------+---------+

116.8
