In [155]:
#https://www.machinelearningplus.com/pyspark/pyspark-exercises-101-pyspark-exercises-for-data-analysis/

#PySpark Exercises – 101 PySpark Exercises for Data Analysis 

from pyspark.sql.types import StructType,StructField, StringType
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp,current_timestamp
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
sc= spark.sparkContext
from pyspark.sql.functions import col, length, translate


In [27]:
#2. How to convert the index of a PySpark DataFrame into a column?

df = spark.createDataFrame([
("Alice", 10),
("Bob", 100),
("Charlie", 30),
], ["Name", "Value"])

df.show()
window_spec= Window.orderBy(monotonically_increasing_id())
df.withColumn("row_id",row_number().over(window_spec)-1).show()


+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|   10|
|    Bob|  100|
|Charlie|   30|
+-------+-----+

+-------+-----+------+
|   Name|Value|row_id|
+-------+-----+------+
|  Alice|   10|     0|
|    Bob|  100|     1|
|Charlie|   30|     2|
+-------+-----+------+



In [46]:
#3.  How to combine many lists to form a PySpark DataFrame?

list1 = ["a", "b", "c", "d","e","f"]
list2 = [1, 2, 3, 4,5,6]
list(zip(list1,list2))
sc= spark.sparkContext
df= sc.parallelize(list(zip(list1,list2))).toDF(["Col1","Col2"])
df.show()

+----+----+
|Col1|Col2|
+----+----+
|   a|   1|
|   b|   2|
|   c|   3|
|   d|   4|
|   e|   5|
|   f|   6|
+----+----+



In [55]:
#4. How to get the items of list A not present in list B?

list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
rdd1= sc.parallelize(list_A)
rdd2= sc.parallelize(list_B)
result_rdd=rdd1.subtract(rdd2)
result_rdd.collect()

#5. How to get the items not common to both list A and list B?
result_rddA=rdd1.subtract(rdd2)
result_rddB=rdd2.subtract(rdd1)
result_rdd2= result_rddA.union(result_rddB)
result_rdd2.collect()





[1, 2, 3, 8, 6, 7]

In [56]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+



In [61]:
#6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?
# Create a sample DataFrame
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()
# Calculate percentiles
quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)
print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+

Min:  10.0
25th percentile:  20.0
Median:  30.0
75th percentile:  50.0
Max:  86.0


In [89]:
#8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

#get top 2 jobs

top_2_jobs=df.groupBy("job").count().orderBy("count",ascending=False).limit(2)\
                .select("job").rdd.flatMap(lambda x : x).collect()
top_2_jobs
df= df.withColumn("job",when(col("job").isin(top_2_jobs),col("job")).otherwise("other"))
df.show()


+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    other|
+----+---------+



In [96]:
#9. How to Drop rows with NA values specific to a particular column?
# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()
df.dropna(subset=["Value"]).show()
df.dropna(subset=["Value","id"]).show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B| null| 123|
|   B|    3| 456|
|   D| null|null|
+----+-----+----+

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B|    3| 456|
+----+-----+----+

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   B|    3|456|
+----+-----+---+



In [101]:
#10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()

list(zip(old_names,new_names))

for old_names,new_names in zip(old_names,new_names):
   df=df.withColumnRenamed( old_names,new_names)
df.show()


+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



In [112]:
#14. How to extract items at given positions from a column?
df=spark.range(10).withColumn("value",when(col("id") % 3 == 0,1).otherwise(0))
df.show()
window_spec= Window.orderBy(monotonically_increasing_id())
df= df.withColumn("index",row_number().over(window_spec)-1)
pos=[9,3,1]
df.filter(col("index").isin(pos)).show()


+---+-----+
| id|value|
+---+-----+
|  0|    1|
|  1|    0|
|  2|    0|
|  3|    1|
|  4|    0|
|  5|    0|
|  6|    1|
|  7|    0|
|  8|    0|
|  9|    1|
+---+-----+

+---+-----+-----+
| id|value|index|
+---+-----+-----+
|  1|    0|    1|
|  3|    1|    3|
|  9|    1|    9|
+---+-----+-----+



In [114]:
#15. How to stack two DataFrames vertically ?

# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()
df_A.union(df_B).show()


+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [116]:
#17. How to convert the first character of each element in a series to uppercase?
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()
df.withColumn("name",initcap(col("name"))).show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



In [120]:
#18. How to compute summary statistics for all columns in a dataframe
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()
summary=df.summary()
summary.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+

+-------+------+------------------+-----------------+
|summary|  name|               age|           salary|
+-------+------+------------------+-----------------+
|  count|     5|                 5|                5|
|   mean|  null|              32.4|          66000.0|
| stddev|  null|3.2093613071762426|9617.692030835675|
|    min| James|                29|            55000|
|    25%|  null|                30|            60000|
|    50%|  null|                32|            65000|
|    75%|  null|                34|            70000|
|    max|Robert|                37|            80000|
+-------+------+------------------+-----------------+



In [133]:
#20 How to compute difference of differences between consecutive numbers of a column?
# lag of salary difference of employee

data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])
df= df.withColumn("id",monotonically_increasing_id())
window_spec= Window.orderBy("id")

#calc prev value of salary
df= df.withColumn("prev_salary",lag("salary").over(window_spec))
df = df.withColumn("diff",
                  when(isnull(col("salary")-col("prev_salary")),1
                  ).otherwise(col("salary")-col("prev_salary"))).drop("id")
df.show()


+-------+---+------+-----------+------+
|   name|age|salary|prev_salary|  diff|
+-------+---+------+-----------+------+
|  James| 34| 55000|       null|     1|
|Michael| 30| 70000|      55000| 15000|
| Robert| 37| 60000|      70000|-10000|
|  Maria| 29| 80000|      60000| 20000|
|    Jen| 32| 65000|      80000|-15000|
+-------+---+------+-----------+------+



In [140]:
#21. How to get the day of month, week number, day of year and day of week from a date strings?
data = [("2023-05-18","01 Jan 2010",), ("2023-12-31", "01 Jan 2010",)]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df= df.withColumn("date_1",to_date("date_str_1","yyyy-MM-dd"))
df= df.withColumn("date_2",to_date("date_str_2","dd MMM yyyy"))
df.show()

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))
df.show()

+----------+-----------+----------+----------+
|date_str_1| date_str_2|    date_1|    date_2|
+----------+-----------+----------+----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01|
+----------+-----------+----------+----------+

+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2|    date_1|    date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01|          18|         20|        138|          5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01|          31|         52|        365|          1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+



In [153]:
#22. How to convert year-month string to dates corresponding to the 4th day of the month?

df = spark.createDataFrame([('May 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])

df.show()

df= df.withColumn("date",to_date("MonthYear","MMM yyyy"))
df.show()
df= df.withColumn("date",expr("date_add(date_sub(date,day(date)-1),3)"))
df.show()

+---------+
|MonthYear|
+---------+
| May 2010|
| Feb 2011|
| Mar 2012|
+---------+

+---------+----------+
|MonthYear|      date|
+---------+----------+
| May 2010|2010-05-01|
| Feb 2011|2011-02-01|
| Mar 2012|2012-03-01|
+---------+----------+

+---------+----------+
|MonthYear|      date|
+---------+----------+
| May 2010|2010-05-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+



In [178]:
#23 How to filter words that contain atleast 2 vowels from a series?
from pyspark.sql.functions import col, length, translate

df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])
df= df.withColumn("tsl",translate('Word','aeiouAEIOU','')).\
        withColumn("lngth",length(translate('Word','aeiouAEIOU','')))
df.show()
df.filter((length("Word") - col("lngth")) >=2).select("Word").show()


+------+-----+-----+
|  Word|  tsl|lngth|
+------+-----+-----+
| Apple|  ppl|    3|
|Orange|  rng|    3|
|  Plan|  Pln|    3|
|Python|Pythn|    5|
| Money|  Mny|    3|
+------+-----+-----+

+------+
|  Word|
+------+
| Apple|
|Orange|
| Money|
+------+



In [184]:
#24. How to filter valid emails from a list?

# Create a list
data = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']

# Convert the list to DataFrame
df = spark.createDataFrame(data, "string")
df.show(truncate =False)
pattern = "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
df_filtered = df.filter(col("value").rlike(pattern))
df_filtered.show()

+--------------------------+
|value                     |
+--------------------------+
|buying books at amazom.com|
|rameses@egypt.com         |
|matt@t.co                 |
|narendra@modi.com         |
+--------------------------+

+-----------------+
|            value|
+-----------------+
|rameses@egypt.com|
|        matt@t.co|
|narendra@modi.com|
+-----------------+



In [193]:
#25. How to Pivot PySpark DataFrame?


# Sample data
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]

# Create DataFrame
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
df.show()

df.groupBy("year","quarter","region").sum("revenue").show()

pivot_df = df.groupBy("year","quarter").pivot("region").sum("revenue")
pivot_df.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      1|    US|   5000|
|2021|      1|    EU|   4000|
|2021|      2|    US|   5500|
|2021|      2|    EU|   4500|
|2021|      3|    US|   6000|
|2021|      3|    EU|   5000|
|2021|      4|    US|   7000|
|2021|      4|    EU|   6000|
+----+-------+------+-------+

+----+-------+------+------------+
|year|quarter|region|sum(revenue)|
+----+-------+------+------------+
|2021|      2|    EU|        4500|
|2021|      2|    US|        5500|
|2021|      4|    US|        7000|
|2021|      4|    EU|        6000|
|2021|      1|    EU|        4000|
|2021|      1|    US|        5000|
|2021|      3|    EU|        5000|
|2021|      3|    US|        6000|
+----+-------+------+------------+

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      3|5000|6000|
|2021|      4|6000|7000|
|2021|      1|4000|5000|
|2021|      2|4500|5500|
+----+-------+----+----+



In [196]:
#26. How to get the mean of a variable grouped by another variable?

# Sample data
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]

# Create DataFrame
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)

df.show()
df.groupBy("Product").agg(mean("Price").alias("total_price")).show()

+-------+----------+-----+
|OrderID|   Product|Price|
+-------+----------+-----+
|   1001|    Laptop| 1000|
|   1002|     Mouse|   50|
|   1003|    Laptop| 1200|
|   1004|     Mouse|   30|
|   1005|Smartphone|  700|
+-------+----------+-----+

+----------+-----------+
|   Product|total_price|
+----------+-----------+
|    Laptop|     1100.0|
|     Mouse|       40.0|
|Smartphone|      700.0|
+----------+-----------+



In [290]:
#28. How to replace missing spaces in a string with the least frequent character?
#Sample DataFrame
df = spark.createDataFrame([('dbc deb abed gade',),], ["string"])
df.show()

def least_freq_char_replace_spaces(str1):
    final_str= str1.replace(" ","")
    #print(final_str)
    dict1={}
    for ele in final_str:
        for j in ele:
            if j in dict1.keys():
                dict1[j]+=1
            else:
                dict1[j]=1
    #print(dict1)
    #sort dict on values
    res=dict(sorted(dict1.items(), key=lambda item: item[1]))
    least_occur_element= list(res.keys())[0]
    str2=str1.replace(" ",least_occur_element)
    return str2

udf_least_freq_char_replace_spaces = udf(least_freq_char_replace_spaces, StringType())
final_df= df.withColumn("modified_string",udf_least_freq_char_replace_spaces(df.string))
final_df.show()

+-----------------+
|           string|
+-----------------+
|dbc deb abed gade|
+-----------------+

+-----------------+-----------------+
|           string|  modified_string|
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+



In [319]:
#32. How to check if a dataframe has any missing values and count of missing values in each column?

# Assuming df is your DataFrame
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])

df.show()

#missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))

missing.show()
#missing.collect()[0].asDict()
has_missing = any(row.asDict().values() for row in missing.collect())
print(has_missing)

missing_count = missing.collect()[0].asDict()
print(missing_count)

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|null|
|   B| null| 123|
|   B|    3| 456|
|   D| null|null|
+----+-----+----+

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   0|    2|  2|
+----+-----+---+

True
{'Name': 0, 'Value': 2, 'id': 2}
