In [1]:
import pyspark
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session that runs locally on a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("task")
    .getOrCreate()
)

In [2]:
# Define schema for the CSV file
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StringType
# All three CSV columns are read as plain strings; publish_date is kept as a
# string such as '20010102' rather than being parsed into a date type.
schema = (
    StructType()
    .add("publish_date", StringType())
    .add("headline_category", StringType())
    .add("headline_text", StringType())
)

In [3]:
# Load the headlines CSV from HDFS. The first line is treated as a header and
# the explicit schema is applied instead of letting Spark infer column types.
df = spark.read.csv(
    path="hdfs://localhost:9000/data/india-news-headlines.csv",
    schema=schema,
    header=True,
)

In [4]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL.
# The view name must be a plain SQL identifier, not a storage path: it is the
# name the SQL query refers to in its FROM clause (`india_news_headlines`).
df.createOrReplaceTempView("india_news_headlines")

In [5]:
import time
# Record a wall-clock start timestamp (time.time(), in seconds) for the
# Spark SQL benchmark; the elapsed time is derived from it as
# `execution_time_df`.
start_time_df = time.time()

In [6]:
# Top-10 headline categories by count, expressed as a Spark SQL query.
# The temp view named in the FROM clause must already be registered.
#
# The timer is (re)started here, in the same cell as the work being measured,
# so the reported duration covers only the query and not the time that passes
# between running separate notebook cells.
start_time_df = time.time()

result_df = spark.sql("""
    WITH temp_table AS (
        SELECT publish_date, headline_category, headline_text
        FROM india_news_headlines
        WHERE publish_date BETWEEN '20010102' AND '20230630'
    )
    SELECT headline_category, COUNT(*) AS num_headlines
    FROM temp_table
    GROUP BY headline_category
    ORDER BY num_headlines DESC
    LIMIT 10
""")

# spark.sql() only builds the query plan; show() is the action that runs it,
# so it has to sit inside the timed region.
result_df.show()

end_time_df = time.time()
execution_time_df = end_time_df - start_time_df


+--------------------+-------------+
|   headline_category|num_headlines|
+--------------------+-------------+
|               india|       307371|
|             unknown|       209583|
|         city.mumbai|       168086|
|          city.delhi|       148240|
|      city.hyderabad|       132872|
|business.india-bu...|       125900|
|     city.chandigarh|       121785|
|      city.bengaluru|       108363|
|      city.ahmedabad|       107307|
|        city.kolkata|       107052|
+--------------------+-------------+



In [7]:
import time
from pyspark import SparkContext

# Obtain the SparkContext used by the RDD examples below. getOrCreate()
# returns the context that is already running if there is one (a SparkSession
# was built earlier in this notebook) instead of starting a second one.
sc = SparkContext.getOrCreate()

In [8]:
# Read the CSV from HDFS as raw text lines and split each line on commas,
# giving one list of field strings per line. This is a plain split, not a CSV
# parser: the header line is kept and quote characters stay in the fields.
rdd = (
    sc.textFile("hdfs://localhost:9000/data/india-news-headlines.csv")
    .map(lambda text_line: text_line.split(","))
)

In [9]:
# Keep rows whose first field (publish_date) lies in the date range, then emit
# one (headline_category, 1) pair per surviving row for counting.
# The comparison is on strings; it also drops the header row, because the
# text 'publish_date' sorts after '20230630'.
filtered_rdd = (
    rdd
    .filter(lambda fields: '20010102' <= fields[0] and fields[0] <= '20230630')
    .map(lambda fields: (fields[1], 1))
)

In [10]:
# Record a wall-clock start timestamp (time.time(), in seconds) for the RDD
# benchmark; the elapsed time is derived from it as `execution_time_rdd`.
start_time_rdd = time.time()

In [11]:
# Sum the 1s emitted per category to get (headline_category, count) pairs.
aggregated_rdd = filtered_rdd.reduceByKey(
    lambda running_total, increment: running_total + increment
)

In [12]:
# Flip each (category, count) pair to (count, category) so that the count
# becomes the key and the RDD can be ordered with sortByKey.
swapped_rdd = aggregated_rdd.map(
    lambda category_count: (category_count[1], category_count[0])
)

In [13]:
# (Re)start the RDD timer in the same cell as the actions being measured.
# The RDD steps built earlier (textFile / map / filter / reduceByKey / map) are
# lazy transformations, so the actual computation happens below; starting the
# clock here keeps idle time between notebook cells out of the measurement.
start_time_rdd = time.time()

try:
    # Order by count (the key after the swap), largest first.
    sorted_rdd = swapped_rdd.sortByKey(ascending=False)

    # Bring only the first 10 (count, category) pairs back to the driver.
    result_rdd = sorted_rdd.take(10)

    # Display RDD result
    print("Top 10 records after sorting:")
    for item in result_rdd:
        print(item)

except Exception as e:
    # Best-effort reporting: the failure is printed rather than re-raised, so
    # the timing computed below is not meaningful when this branch is taken.
    print("Error occurred while sorting RDD:", e)

end_time_rdd = time.time()
execution_time_rdd = end_time_rdd - start_time_rdd

Top 10 records after sorting:
(307371, 'india')
(209583, 'unknown')
(168086, 'city.mumbai')
(148240, 'city.delhi')
(132872, 'city.hyderabad')
(125900, 'business.india-business')
(121785, 'city.chandigarh')
(108363, 'city.bengaluru')
(107307, 'city.ahmedabad')
(107052, 'city.kolkata')


In [14]:
# Time the same top-10 aggregation expressed through DataFrame method calls on
# `df` (this notebook labels that path "DataSet").
start_time_ds = time.time()

result_ds = (
    df
    .filter(func.col("publish_date").between('20010102', '20230630'))
    .groupby("headline_category")
    .count()
    .orderBy(func.col("count").desc())
    .limit(10)
)

# show() is the action that actually runs the plan built above.
result_ds.show()

end_time_ds = time.time()
execution_time_ds = end_time_ds - start_time_ds

+--------------------+------+
|   headline_category| count|
+--------------------+------+
|               india|307371|
|             unknown|209583|
|         city.mumbai|168086|
|          city.delhi|148240|
|      city.hyderabad|132872|
|business.india-bu...|125900|
|     city.chandigarh|121785|
|      city.bengaluru|108363|
|      city.ahmedabad|107307|
|        city.kolkata|107052|
+--------------------+------+



In [15]:
# Report the wall-clock duration recorded for each of the three approaches.
print(f"Execution time using DataFrame: {execution_time_df} seconds")
print(f"Execution time using DataSet: {execution_time_ds} seconds")
print(f"Execution time using rdd: {execution_time_rdd} seconds")

Execution time using DataFrame: 6.738982677459717 seconds
Execution time using DataSet: 5.4899842739105225 seconds
Execution time using rdd: 20.501856088638306 seconds


In [16]:
# Re-read the CSV from HDFS as an RDD of raw text lines (one string per line).
# Note: this rebinds `rdd`, which earlier in the notebook held rows that had
# already been split on commas.
rdd = sc.textFile("hdfs://localhost:9000/data/india-news-headlines.csv")

In [17]:
# Define your processing functions
def process_map(line):
    """Split one comma-delimited text line into a list of field strings.

    Passed to ``RDD.map``, so each input line yields exactly one list.
    This is a plain ``str.split``: quote characters are kept and commas
    inside quoted fields are not treated specially.
    """
    fields = line.split(sep=',')
    return fields

def process_flatMap(line):
    """Split one comma-delimited text line into its individual fields.

    Passed to ``RDD.flatMap``, which flattens the returned list so every
    field becomes its own element of the resulting RDD.
    """
    tokens = line.split(sep=',')
    return tokens

def process_reduceByKey(line):
    """Turn an already-split row into a ``(key, 1)`` pair for counting.

    ``line`` is indexed, not split: the element at position 1 becomes the
    key. The pair is intended to be summed per key with ``reduceByKey``.
    Raises ``IndexError`` when ``line`` has fewer than two elements.
    """
    key = line[1]
    return key, 1

In [18]:
# Build the three demonstration RDDs from the raw text lines in `rdd`.
rdd_map = rdd.map(process_map)              # one list of fields per line
rdd_flatMap = rdd.flatMap(process_flatMap)  # every field as its own element
rdd_reduceByKey = (
    rdd_map
    .map(process_reduceByKey)               # (second field, 1) per row
    .reduceByKey(lambda total, increment: total + increment)
)

In [19]:
# Results: print a small sample from each demonstration RDD.
def _show_sample(heading, sample_rdd, sample_size, error_prefix):
    """Print `heading`, then the first `sample_size` elements of `sample_rdd`.

    Any exception raised while taking the sample is reported as
    `error_prefix` followed by the error text instead of being propagated.
    """
    print(heading)
    try:
        print(sample_rdd.take(sample_size))
    except Exception as e:
        print(error_prefix, str(e))


_show_sample(
    "RDD with map transformation:",
    rdd_map,
    5,
    "An error occurred while executing take(5) on RDD with map transformation:",
)
_show_sample(
    "\nRDD with flatMap transformation:",
    rdd_flatMap,
    10,
    "An error occurred while executing take(10) on RDD with flatMap transformation:",
)
_show_sample(
    "\nRDD with reduceByKey transformation:",
    rdd_reduceByKey,
    5,
    "An error occurred while executing take(5) on RDD with reduceByKey transformation:",
)


RDD with map transformation:
[['publish_date', 'headline_category', 'headline_text'], ['20010102', 'unknown', '"Status quo will not be disturbed at Ayodhya; says Vajpayee"'], ['20010102', 'unknown', '"Fissures in Hurriyat over Pak visit"'], ['20010102', 'unknown', '"America\'s unwanted heading for India?"'], ['20010102', 'unknown', '"For bigwigs; it is destination Goa"']]

RDD with flatMap transformation:
['publish_date', 'headline_category', 'headline_text', '20010102', 'unknown', '"Status quo will not be disturbed at Ayodhya; says Vajpayee"', '20010102', 'unknown', '"Fissures in Hurriyat over Pak visit"', '20010102']

RDD with reduceByKey transformation:
[('home.sunday-times.all-that-matters', 3501), ('delhi-times', 8449), ('life-style.beauty', 3051), ('all-colour-edition', 17), ('pm-on-europe-tour.news', 28)]
