In [1]:
from pyspark.sql import SparkSession

In [2]:
# Entry point for the notebook: reuse a running SparkSession if one exists,
# otherwise start a new application named 'tutorial'.
spark_session = (
    SparkSession.builder
    .appName('tutorial')
    .getOrCreate()
)

In [3]:
# The SparkContext behind the session; the RDD examples below are built from it.
sc = spark_session.sparkContext

In [None]:
# Turn a small local Python list into an RDD.
rdd_1 = sc.parallelize([1, 2, 3, 4, 5])

In [None]:
# collect() is an action: it runs the job and returns every element to the
# driver as a Python list (fine here because the data is tiny).
rdd_1.collect()

In [None]:
# map() applies the function to each element independently.
squared_rdd_1 = rdd_1.map(lambda n: n ** 2)
squared_rdd_1.collect()

In [None]:
# filter() keeps only the elements for which the predicate is true —
# here, the even numbers.
mod_2_rdd = rdd_1.filter(lambda n: n % 2 == 0)
mod_2_rdd.collect()

In [None]:
# Pair RDD: each element is a (key, value) tuple, with keys repeated on purpose
# so the by-key operations below have something to combine.
rdd_2 = sc.parallelize([
    (1, 2),
    (2, 3),
    (1, 3),
    (2, 5),
])

In [None]:
# Sum the values that share a key: key 1 -> 2 + 3, key 2 -> 3 + 5.
reduced_rdd = rdd_2.reduceByKey(lambda left, right: left + right)
reduced_rdd.collect()

In [None]:
# Pair RDD of (key, fruit name); keys 1 and 2 each appear twice.
rdd_3 = sc.parallelize([
    (1, 'Apple'),
    (3, 'Apricot'),
    (1, 'Banana'),
    (2, 'Watermelon'),
    (2, 'Jackfruit'),
])

In [None]:
# groupByKey() gathers all values for each key into one iterable per key.
rdd_3_grouped = rdd_3.groupByKey()
# Bring the (key, iterable) pairs back to the driver; the iterables are
# materialised into lists in the next cell.
rdd_3_res = rdd_3_grouped.collect()

In [None]:
# Convert each group's iterable of values into a plain list so it displays.
[(key, list(values)) for key, values in rdd_3_res]

In [10]:
# Employee-style records as 5-tuples.
# NOTE(review): fields look like (id, name, age, income, job), judging by how
# the cells below index them (x[1] / x[3] feed `name_income_rdd`) — confirm.
emp_rdd = sc.parallelize([
    (1, 'Saurabh', 24, 90000, 'Job-1'),
    (2, 'axy', 14, 842, 'Job-1'),
    (3, 'uwb', 17, 98746, 'Job'),
    (1, 'Saurabh', 24, 151, 'Job-3'),
    (2, 'axy', 14, 218, 'Job-2'),
    (1, 'Saurabh', 24, 188, 'Job-2'),
])
emp_rdd.collect()

[(1, 'Saurabh', 24, 90000, 'Job-1'),
 (2, 'axy', 14, 842, 'Job-1'),
 (3, 'uwb', 17, 98746, 'Job'),
 (1, 'Saurabh', 24, 151, 'Job-3'),
 (2, 'axy', 14, 218, 'Job-2'),
 (1, 'Saurabh', 24, 188, 'Job-2')]

### Narrow Transformation

In [None]:
# filter() is a narrow transformation: each partition is processed on its own,
# with no shuffle. Keep the records whose third field (index 2) is at most 18.
filtered_emp_rdd = emp_rdd.filter(lambda emp: emp[2] <= 18)
filtered_emp_rdd.collect()

[(2, 'axy', 14, 842, 'Job-1'),
 (3, 'uwb', 17, 98746, 'Job'),
 (2, 'axy', 14, 218, 'Job-2')]

### Wide Transformation (the `map` below is still narrow; `reduceByKey` is the wide step)

In [None]:
# Project each record down to a (field 1, field 3) pair so it can be reduced
# by key in the next cell. map() itself is still a narrow transformation.
name_income_rdd = emp_rdd.map(lambda emp: (emp[1], emp[3]))
name_income_rdd.collect()

[('Saurabh', 90000),
 ('axy', 842),
 ('uwb', 98746),
 ('Saurabh', 151),
 ('axy', 218),
 ('Saurabh', 188)]

In [14]:
# reduceByKey() is a wide transformation: rows with the same key must be
# brought together across partitions (a shuffle) before they are summed.
group_by_name = name_income_rdd.reduceByKey(
    lambda total, income: total + income
)
group_by_name.collect()

[('Saurabh', 90339), ('axy', 1060), ('uwb', 98746)]

In [None]:
# Sample website session records.
# Columns: session_id, user_id, page, duration_seconds, timestamp
website_session_data = sc.parallelize([
    ('sess_3959', 10, 'checkout', 133, '2025-06-07 18:06:19'),
    ('sess_8297', 15, 'contact', 17, '2025-06-07 12:59:19'),
    ('sess_8211', 13, 'contact', 138, '2025-06-08 05:32:19'),
    ('sess_3900', 8, 'about', 114, '2025-06-08 01:54:19'),
    ('sess_8141', 4, 'contact', 41, '2025-06-08 03:38:19'),
    ('sess_2352', 18, 'checkout', 263, '2025-06-07 19:52:19'),
    ('sess_9936', 9, 'checkout', 89, '2025-06-08 02:15:19'),
    ('sess_7090', 11, 'checkout', 299, '2025-06-07 16:54:19'),
    ('sess_5099', 6, 'product', 127, '2025-06-07 15:25:19'),
    ('sess_3155', 6, 'contact', 64, '2025-06-07 21:08:19'),
    ('sess_6126', 12, 'about', 58, '2025-06-07 21:46:19'),
    ('sess_2292', 2, 'home', 40, '2025-06-08 04:06:19'),
    ('sess_4633', 2, 'product', 34, '2025-06-07 16:34:19'),
    ('sess_6737', 9, 'product', 249, '2025-06-07 13:56:19'),
    ('sess_3199', 12, 'checkout', 282, '2025-06-07 23:22:19'),
    ('sess_9647', 10, 'home', 51, '2025-06-08 02:11:19'),
    ('sess_6170', 6, 'about', 157, '2025-06-07 15:02:19'),
    ('sess_1295', 14, 'checkout', 132, '2025-06-08 04:47:19'),
    ('sess_4190', 2, 'about', 148, '2025-06-08 01:27:19'),
    ('sess_1949', 2, 'contact', 137, '2025-06-07 17:37:19'),
    ('sess_5837', 13, 'checkout', 116, '2025-06-07 21:35:19'),
    ('sess_2626', 1, 'cart', 184, '2025-06-07 13:15:19'),
    ('sess_3479', 16, 'cart', 78, '2025-06-07 23:59:19'),
    ('sess_5949', 5, 'contact', 225, '2025-06-07 21:28:19'),
    ('sess_9303', 3, 'checkout', 72, '2025-06-07 22:25:19'),
])

### application → job → stage → task

In [15]:
# getOrCreate() returns an already-running session when there is one. An
# earlier cell in this notebook starts a session named 'tutorial', so without
# the stop below the appName requested here would not reach the running
# application and the Spark UI would keep showing the old name.
# Stop any active session first so that a fresh application named
# 'JobStageTaskExample' is actually started for the job/stage/task walkthrough.
# NOTE: stopping the session also stops its SparkContext, so RDDs created from
# the earlier context cannot be used after this cell.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

spark = (
    SparkSession.builder
    .appName('JobStageTaskExample')
    .getOrCreate()
)

In [17]:
# Small in-memory dataset of (name, age) rows for the DataFrame examples.
data = [('xwq', 24), ('avc', 24), ('yui', 28), ('ops', 50), ('ps', 50)]

# Column names are supplied explicitly; column types are inferred from the data.
df = spark.createDataFrame(data, schema=['name', 'age'])

In [18]:
# count() is an action: it triggers a Spark job and returns the number of rows.
df.count()

5

In [19]:
# Keep rows matching the SQL-style predicate, then print them; show() is the
# action that triggers execution.
(
    df
    .filter("age > 25")
    .show()
)

+----+---+
|name|age|
+----+---+
| yui| 28|
| ops| 50|
|  ps| 50|
+----+---+



In [20]:
# Shut down the session and its underlying SparkContext to release resources.
# NOTE(review): if `spark` is the same session object that getOrCreate()
# returned for `spark_session`, then `spark_session` and `sc` are unusable
# after this call as well — confirm before re-running earlier cells.
spark.stop()