<a href="https://colab.research.google.com/github/VarshithaCVasireddy/pyspark/blob/main/PySpark_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- This practice is from https://www.machinelearningplus.com/pyspark/pyspark-exercises-101-pyspark-exercises-for-data-analysis/

In [None]:
!pip install pyspark



In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("exercise").getOrCreate()

Q) 2

In [None]:
df = spark.createDataFrame([
("Alice", 1),
("Bob", 2),
("Charlie", 3),
], ["Name", "Value"])

df.show()


+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    Bob|    2|
|Charlie|    3|
+-------+-----+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.orderBy("Name")
df = df.withColumn("index",row_number().over(window_spec)-1)
df.show()

+-------+-----+-----+
|   Name|Value|index|
+-------+-----+-----+
|  Alice|    1|    0|
|    Bob|    2|    1|
|Charlie|    3|    2|
+-------+-----+-----+



Q) 3

In [None]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]

In [None]:
rdd = spark.sparkContext.parallelize(list(zip(list1,list2)))
df = rdd.toDF(["list1","list2"])

In [None]:
df = spark.createDataFrame(zip(list1,list2),["list1","list2"])
df.show()

+-----+-----+
|list1|list2|
+-----+-----+
|    a|    1|
|    b|    2|
|    c|    3|
|    d|    4|
+-----+-----+



Q) 4

In [None]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]

In [None]:
df_A = spark.createDataFrame([(x,) for x in list_A ],["Value"])
df_B = spark.createDataFrame([(x,) for x in list_B ],["Value"])

result = df_A.join(df_B,on = "Value",how="left_anti")
result.show()

+-----+
|Value|
+-----+
|    1|
|    2|
|    3|
+-----+



In [None]:
sc = spark.sparkContext

rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

result_rdd = rdd_A.subtract(rdd_B)

result_rdd = result_rdd.collect()
print(result_rdd)

[1, 2, 3]


Q) 5

In [None]:
sc = spark.sparkContext

rdd_A = sc.parallelize(list_A)
rdd_B = sc.parallelize(list_B)

result_rdd_A = rdd_A.subtract(rdd_B)
result_rdd_B = rdd_B.subtract(rdd_A)

result_rdd = result_rdd_A.union(result_rdd_B)

print(result_rdd.collect())

[1, 2, 3, 8, 6, 7]


Q) 6

In [None]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])

df.show()

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+



In [None]:
quantile = df.approxQuantile("Age",[0.0,0.25,0.5,0.75,1.0],0.01)
print(quantile)

[10.0, 20.0, 30.0, 50.0, 86.0]


Q) 7

In [None]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [None]:
df.groupBy("job","name").count().orderBy("count",ascending=False).show()

+---------+----+-----+
|      job|name|count|
+---------+----+-----+
| Engineer|John|    2|
| Engineer| Bob|    2|
|Scientist|Mary|    1|
|   Doctor| Sam|    1|
|Scientist| Bob|    1|
+---------+----+-----+



Q) 8

In [None]:
from pyspark.sql import Row

# Sample data
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]

# create DataFrame
df = spark.createDataFrame(data)

# show DataFrame
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|   Doctor|
+----+---------+



In [None]:
df_top_2 = df.groupBy("job","name").count().orderBy("count",ascending=False).limit(2)
df_top_2.show()

+--------+----+-----+
|     job|name|count|
+--------+----+-----+
|Engineer|John|    2|
|Engineer| Bob|    2|
+--------+----+-----+



Q) 10

In [None]:
# suppose you have the following DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])

# old column names
old_names = ["col1", "col2", "col3"]

# new column names
new_names = ["new_col1", "new_col2", "new_col3"]

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+



In [None]:
for old_name, new_name in zip(old_names, new_names):
  df = df.withColumnRenamed(old_name, new_name)

df.show()

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



Q) 13

In [None]:
from pyspark.sql.functions import rand

# Generate a DataFrame with a single column "id" with 10 rows
df = spark.range(10)

# Generate a random float between 0 and 1, scale and shift it to get a random integer between 1 and 10
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

# Show the DataFrame
df.show()

+---+------+
| id|random|
+---+------+
|  0|     7|
|  1|     6|
|  2|     9|
|  3|     3|
|  4|     7|
|  5|     9|
|  6|     7|
|  7|     3|
|  8|     3|
|  9|     7|
+---+------+



In [None]:
df.filter(df["random"]% 3 == 0).select("id","random").show()

+---+------+
| id|random|
+---+------+
|  1|     6|
|  2|     9|
|  3|     3|
|  5|     9|
|  7|     3|
|  8|     3|
+---+------+



In [None]:
from pyspark.sql.functions import when
df.withColumn("Multiple of 3",when(df["random"]% 3 == 0, 1).otherwise(0)).show()

+---+------+-------------+
| id|random|Multiple of 3|
+---+------+-------------+
|  0|     7|            0|
|  1|     6|            1|
|  2|     9|            1|
|  3|     3|            1|
|  4|     7|            0|
|  5|     9|            1|
|  6|     7|            0|
|  7|     3|            1|
|  8|     3|            1|
|  9|     7|            0|
+---+------+-------------+



Q) 14

In [None]:
pos = [0,4,8,5]

# To create Index
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window = Window.orderBy("id")
df = df.withColumn("index", row_number().over(window) -1)

df.filter(df["index"].isin(pos)).show()

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     7|    0|
|  4|     7|    4|
|  5|     9|    5|
|  8|     3|    8|
+---+------+-----+



Q) 15

In [None]:
# Create DataFrame for region A
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_A.show()

# Create DataFrame for region B
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_B.show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
+------+-----+-----+

+------+-----+-----+
|  Name|Col_1|Col_3|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



In [None]:
df_A.unionAll(df_B).show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



Q) 17

In [None]:
# Suppose you have the following DataFrame
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [None]:
from pyspark.sql.functions import initcap

df.withColumn("name",initcap("name")).show()

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



Q) 18

In [None]:
# For the sake of example, we'll create a sample DataFrame
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])

df.show()

+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  James| 34| 55000|
|Michael| 30| 70000|
| Robert| 37| 60000|
|  Maria| 29| 80000|
|    Jen| 32| 65000|
+-------+---+------+



In [None]:
df.summary().show()

+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835671|
|    min| James|               29|            55000|
|    25%|  NULL|               30|            60000|
|    50%|  NULL|               32|            65000|
|    75%|  NULL|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



In [None]:
df.describe().show()

+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835671|
|    min| James|               29|            55000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+



Q) 19

In [None]:
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df.show()

+-----+
| name|
+-----+
| john|
|alice|
|  bob|
+-----+



In [None]:
from pyspark.sql.functions import char_length
df = df.withColumn("character_present", char_length("name")).show()


+-----+-----------------+
| name|character_present|
+-----+-----------------+
| john|                4|
|alice|                5|
|  bob|                3|
+-----+-----------------+



Q) 20

In [None]:
df.max("character_present").show()

AttributeError: 'NoneType' object has no attribute 'max'