## **Word Count Example**

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .appName("Log Word Count") \
        .getOrCreate()

25/02/10 16:14:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
!hadoop fs -ls /log_data

Found 1 items
-rw-r--r--   2 lokesh hadoop   66622621 2025-02-10 16:10 /log_data/logfile.txt


In [4]:
hdfs_path = "/log_data/logfile.txt"

In [5]:
rdd = spark.sparkContext.textFile(hdfs_path)

In [7]:
rdd.take(5)

                                                                                

['2025-02-01 17:54:46 - CRITICAL - Connection timeout on server server3',
 '2025-02-01 17:54:46 - INFO - Error in module moduleC: Null pointer exception',
 '2025-02-01 17:54:46 - ERROR - Disk space running low: 46% remaining',
 '2025-02-01 17:54:46 - INFO - Connection timeout on server server2',
 '2025-02-01 17:54:46 - ERROR - User 36 logged in']

In [8]:
rdd_split = rdd.map(lambda line: line.split("-"))

In [10]:
rdd_split.take(5)

[['2025',
  '02',
  '01 17:54:46 ',
  ' CRITICAL ',
  ' Connection timeout on server server3'],
 ['2025',
  '02',
  '01 17:54:46 ',
  ' INFO ',
  ' Error in module moduleC: Null pointer exception'],
 ['2025',
  '02',
  '01 17:54:46 ',
  ' ERROR ',
  ' Disk space running low: 46% remaining'],
 ['2025',
  '02',
  '01 17:54:46 ',
  ' INFO ',
  ' Connection timeout on server server2'],
 ['2025', '02', '01 17:54:46 ', ' ERROR ', ' User 36 logged in']]

In [11]:
flattened_rdd = rdd_split.flatMap(lambda x: [item.strip() for item in x if item.strip() != ''])

In [12]:
flattened_rdd.take(5)

['2025',
 '02',
 '01 17:54:46',
 'CRITICAL',
 'Connection timeout on server server3']

In [13]:
word_count = flattened_rdd.map(lambda word : (word, 1))

In [14]:
word_count_reduced = word_count.reduceByKey(lambda a, b : a + b)

In [15]:
word_count_reduced.collect()

                                                                                

[('2025', 1000000),
 ('INFO', 199567),
 ('User 36 logged in', 1971),
 ('User 100 logged in', 2019),
 ('DEBUG', 200535),
 ('User 48 logged in', 2031),
 ('Disk space running low: 6% remaining', 3974),
 ('User 58 logged in', 2060),
 ('User 34 logged in', 1966),
 ('User 22 logged in', 1982),
 ('User 8 logged in', 2002),
 ('Disk space running low: 50% remaining', 3963),
 ('Disk space running low: 32% remaining', 4045),
 ('Disk space running low: 29% remaining', 3961),
 ("File 'file1.txt' uploaded successfully", 66484),
 ('Disk space running low: 20% remaining', 3954),
 ('User 84 logged in', 2027),
 ('User 42 logged in', 1974),
 ('User 73 logged in', 1962),
 ('Disk space running low: 30% remaining', 4066),
 ('Disk space running low: 48% remaining', 4111),
 ('Disk space running low: 5% remaining', 3967),
 ('Disk space running low: 31% remaining', 3984),
 ('User 78 logged in', 1955),
 ('Disk space running low: 12% remaining', 4041),
 ('User 80 logged in', 1992),
 ('Error in module moduleB: Arr

In [16]:
spark.stop()

### **Partition Demo**

In [18]:
spark = SparkSession.builder \
        .appName("Partition Demo") \
        .getOrCreate()

25/02/10 16:45:40 INFO SparkEnv: Registering MapOutputTracker
25/02/10 16:45:40 INFO SparkEnv: Registering BlockManagerMaster
25/02/10 16:45:40 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/02/10 16:45:40 INFO SparkEnv: Registering OutputCommitCoordinator


In [19]:
spark

In [21]:
log_rdd = spark.sparkContext.parallelize(hdfs_path)

In [24]:
print(f"Default Partition {log_rdd.getNumPartitions()}")

Default Partition 2


##### Set partition to 200

In [25]:
log_rdd = log_rdd.repartition(200)

In [28]:
print(f"Partition {log_rdd.getNumPartitions()}")

Partition 200


In [29]:
spark.stop()

### **RDD Operations**

In [30]:
spark = SparkSession.builder \
        .appName("RDD Operation") \
        .getOrCreate()

25/02/10 16:51:18 INFO SparkEnv: Registering MapOutputTracker
25/02/10 16:51:18 INFO SparkEnv: Registering BlockManagerMaster
25/02/10 16:51:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/02/10 16:51:18 INFO SparkEnv: Registering OutputCommitCoordinator


In [32]:
spark

In [33]:
!hadoop fs -ls /ecommerce_data/ecommerce_data/500MB

Found 5 items
-rw-r--r--   2 lokesh hadoop  570783961 2025-02-10 16:00 /ecommerce_data/ecommerce_data/500MB/customers.csv
-rw-r--r--   2 lokesh hadoop  480952071 2025-02-10 16:00 /ecommerce_data/ecommerce_data/500MB/items.csv
-rw-r--r--   2 lokesh hadoop  472632078 2025-02-10 16:00 /ecommerce_data/ecommerce_data/500MB/orders.csv
-rw-r--r--   2 lokesh hadoop  468231725 2025-02-10 16:01 /ecommerce_data/ecommerce_data/500MB/payments.csv
-rw-r--r--   2 lokesh hadoop  448185359 2025-02-10 16:01 /ecommerce_data/ecommerce_data/500MB/shippings.csv


In [34]:
hdfs_path = "/ecommerce_data/ecommerce_data/500MB/customers.csv"

In [37]:
customer_rdd = spark.sparkContext.textFile(hdfs_path)

In [38]:
customer_rdd.take(5)

                                                                                

['customer_id,name,city,state,country,registration_date,is_active',
 '0,Customer_0,Mumbai,Telangana,India,2023-03-21,True',
 '1,Customer_1,Chennai,West Bengal,India,2023-05-27,False',
 '2,Customer_2,Pune,Karnataka,India,2023-10-11,False',
 '3,Customer_3,Hyderabad,Gujarat,India,2023-11-11,False']

In [39]:
header = customer_rdd.first()

In [40]:
header

'customer_id,name,city,state,country,registration_date,is_active'

In [43]:
customer_filter_rdd = customer_rdd.filter(lambda row : row != header)

customer_filter_rdd.take(3)

['0,Customer_0,Mumbai,Telangana,India,2023-03-21,True',
 '1,Customer_1,Chennai,West Bengal,India,2023-05-27,False',
 '2,Customer_2,Pune,Karnataka,India,2023-10-11,False']

### **Map**

#### Parse data

In [44]:
def parse_data(row):
    fields = row.split(",")
    
    return (
        int(fields[0]),
        fields[1],
        fields[2],
        fields[3],
        fields[4],
        fields[5],
        fields[6] == 'true'
    )

In [47]:
customer_parsed_rdd = customer_filter_rdd.map(parse_data)

customer_parsed_rdd.take(5)

[(0, 'Customer_0', 'Mumbai', 'Telangana', 'India', '2023-03-21', False),
 (1, 'Customer_1', 'Chennai', 'West Bengal', 'India', '2023-05-27', False),
 (2, 'Customer_2', 'Pune', 'Karnataka', 'India', '2023-10-11', False),
 (3, 'Customer_3', 'Hyderabad', 'Gujarat', 'India', '2023-11-11', False),
 (4, 'Customer_4', 'Mumbai', 'Karnataka', 'India', '2023-05-09', False)]

In [48]:
## Reduce by key with map
customer_parsed_rdd.map(lambda row : (row[2], 1)).reduceByKey(lambda x, y : x + y).collect()

                                                                                

[('Kolkata', 1096777),
 ('Pune', 1095748),
 ('Chennai', 1095052),
 ('Hyderabad', 1096426),
 ('Ahmedabad', 1097162),
 ('Mumbai', 1095815),
 ('Delhi', 1096183),
 ('Bangalore', 1094195)]

In [49]:
spark.stop()

### **Wide V/S Narrow Transformation**

In [50]:
spark = SparkSession.builder \
        .appName("Wide-Transformation") \
        .master('yarn') \
        .getOrCreate()

25/02/10 17:06:49 INFO SparkEnv: Registering MapOutputTracker
25/02/10 17:06:49 INFO SparkEnv: Registering BlockManagerMaster
25/02/10 17:06:49 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/02/10 17:06:49 INFO SparkEnv: Registering OutputCommitCoordinator


In [52]:
hdfs_path = "/ecommerce_data/ecommerce_data/10MB/customers.csv"

In [53]:
customer_rdd = spark.sparkContext.textFile(hdfs_path)

In [55]:
header = customer_rdd.first()

customer_rdd = customer_rdd.filter(lambda row : row != header)

                                                                                

In [57]:
parsed_rdd = customer_rdd.map(parse_data)

In [58]:
active_customers = parsed_rdd.filter(lambda row:row[6])

In [59]:
grouped_by_city_rdd = parsed_rdd.map(lambda row: (row[2], 1)).reduceByKey(lambda x, y: x + y)

In [60]:
grouped_by_city_rdd.collect()

                                                                                

[('Chennai', 21046),
 ('Mumbai', 21041),
 ('Pune', 21481),
 ('Bangalore', 21272),
 ('Hyderabad', 21174),
 ('Ahmedabad', 21272),
 ('Delhi', 21123),
 ('Kolkata', 21264)]

In [61]:
spark.stop()