# Hadoop Ecosystem Assignment

## Question 1: Explain the core components of the Hadoop ecosystem and their respective roles in processing and storing big data. Provide a brief overview of HDFS, MapReduce, and YARN.

**HDFS (Hadoop Distributed File System):**
- **Role:** HDFS stores very large datasets reliably across a cluster and streams them at high bandwidth to user applications.
- **Key Features:** It is highly fault-tolerant and is designed to run on low-cost commodity hardware. It provides high-throughput access to application data and is suited to applications with large datasets.

**MapReduce:**
- **Role:** MapReduce is a programming model for processing large datasets in parallel across a Hadoop cluster.
- **Key Features:** It breaks down the processing into map and reduce tasks. The map tasks process input data and produce intermediate results, while reduce tasks aggregate these intermediate results to produce the final output.

**YARN (Yet Another Resource Negotiator):**
- **Role:** YARN manages the resources of a Hadoop cluster and schedules applications onto them. It allows multiple data processing engines, for batch, interactive, and real-time processing, to run on the same cluster and process data stored in HDFS.
- **Key Features:** YARN decouples resource management and job scheduling from the data processing component (MapReduce), which performed both roles in Hadoop 1.x.

## Question 2: Discuss the Hadoop Distributed File System (HDFS) in detail. Explain how it stores and manages data in a distributed environment. Describe the key concepts of HDFS, such as NameNode, DataNode, and blocks, and how they contribute to data reliability and fault tolerance.

**HDFS (Hadoop Distributed File System):**
- **Storage and Management:** HDFS divides files into large blocks (default size is 128 MB) and distributes them across multiple nodes in a cluster. It replicates each block across multiple DataNodes to ensure fault tolerance.
- **NameNode:** The master server that manages the file system namespace and controls access to files by clients. It keeps track of the metadata, such as which blocks make up a file and where those blocks are located.
- **DataNode:** Nodes where the actual data blocks are stored. DataNodes serve read and write requests from the file system’s clients and create, delete, and replicate blocks on instruction from the NameNode.
- **Blocks:** The basic unit of storage in HDFS. Files are split into blocks and stored across DataNodes. Blocks are replicated to handle hardware failures.
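
The block arithmetic can be sketched in a few lines of plain Python. This is a toy calculation, not HDFS code; it assumes the defaults of 128 MB blocks (`dfs.blocksize`) and 3 replicas (`dfs.replication`), and the 500 MB file size is an arbitrary example.

```python
BLOCK_SIZE_MB = 128      # assumed default HDFS block size (Hadoop 2.x and later)
REPLICATION_FACTOR = 3   # assumed default replication factor


def split_into_blocks(file_size_mb: int, block_size_mb: int = BLOCK_SIZE_MB) -> list[int]:
    """Return the size of each block that a file of file_size_mb would occupy."""
    full_blocks, remainder = divmod(file_size_mb, block_size_mb)
    sizes = [block_size_mb] * full_blocks
    if remainder:
        # The last block holds only the leftover data; it is not padded to full size.
        sizes.append(remainder)
    return sizes


blocks = split_into_blocks(500)
raw_storage_mb = sum(blocks) * REPLICATION_FACTOR  # every block is stored 3 times
print(blocks)          # [128, 128, 128, 116]
print(raw_storage_mb)  # 1500
```

The 500 MB file becomes four blocks that can be spread over different DataNodes, and replication triples the raw disk space it consumes.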

**Reliability and Fault Tolerance:**
- HDFS ensures data reliability by replicating each block on multiple DataNodes (default replication factor is 3).
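- If a DataNode fails, clients read the affected blocks from replicas stored on other DataNodes.
- Each DataNode sends periodic heartbeats and block reports to the NameNode. When heartbeats from a DataNode stop, the NameNode marks it as dead and schedules new copies of its blocks from the surviving replicas, restoring the replication factor.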
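
The re-replication logic can be illustrated with a toy model of the NameNode's block map. This is a simplified sketch, not the HDFS implementation: the DataNode names and block IDs are hypothetical, replicas are placed randomly (real HDFS uses a rack-aware placement policy), and copying is modeled as a set update rather than DataNode-to-DataNode transfers.

```python
import random


class ToyNameNode:
    """Toy NameNode bookkeeping: block ID -> set of DataNodes holding a replica."""

    def __init__(self, datanodes, replication=3, seed=0):
        self.live = set(datanodes)
        self.replication = replication
        self.block_map = {}
        self.rng = random.Random(seed)

    def add_block(self, block_id):
        # Place replicas on distinct, randomly chosen live DataNodes.
        self.block_map[block_id] = set(self.rng.sample(sorted(self.live), self.replication))

    def datanode_failed(self, node):
        # In HDFS this is triggered when heartbeats from the node stop arriving.
        self.live.discard(node)
        for replicas in self.block_map.values():
            replicas.discard(node)
        self.re_replicate()

    def re_replicate(self):
        # Copy each under-replicated block from a surviving replica to other live nodes.
        for replicas in self.block_map.values():
            missing = self.replication - len(replicas)
            candidates = sorted(self.live - replicas)
            if replicas and missing > 0:  # no surviving replica means the block is lost
                replicas.update(self.rng.sample(candidates, min(missing, len(candidates))))


nn = ToyNameNode(["dn1", "dn2", "dn3", "dn4", "dn5"])
for block_id in ["blk_1", "blk_2", "blk_3"]:
    nn.add_block(block_id)
nn.datanode_failed("dn1")
print(all(len(r) == 3 and "dn1" not in r for r in nn.block_map.values()))  # True
```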

## Question 3: Write a step-by-step explanation of how the MapReduce framework works. Use a real-world example to illustrate the Map and Reduce phases. Discuss the advantages and limitations of MapReduce for processing large datasets.

**MapReduce Framework:**
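1. **Input Data Splitting:** The input is divided into input splits. By default each split corresponds to one HDFS block (128 MB by default), so a large file yields many splits that can be processed in parallel.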
2. **Map Phase:** Each split is processed by a Map task. The Map function takes input key-value pairs and produces a set of intermediate key-value pairs.
   - **Example:** Counting the frequency of words in a document.
     - Input: A large text document.
     - Map Function: Takes a line of text as input and emits each word with a count of 1.
     - Intermediate Output: ("word1", 1), ("word2", 1), ("word1", 1), ...
3. **Shuffle and Sort:** The intermediate key-value pairs are partitioned by key, transferred to the reducers, and sorted by key so that all values associated with the same key are grouped together.
4. **Reduce Phase:** Each Reduce task receives one partition of the keys and calls the Reduce function once per key with all of that key's values, merging them into a (usually smaller) set of output values.
   - **Example:** Summing the word counts.
     - Reduce Function: Takes ("word1", [1, 1, 1, ...]) and sums the counts.
     - Output: ("word1", total_count), ("word2", total_count), ...
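
The three phases of the word-count example can be simulated in plain Python on a single machine. This sketch only mirrors the data flow (map, shuffle and sort, reduce); it does not model input splits, parallel tasks, or Hadoop's actual APIs, and the input lines are made up.

```python
from collections import defaultdict
from itertools import chain


def map_fn(line):
    # Map: emit (word, 1) for every word in the line.
    return [(word, 1) for word in line.split()]


def shuffle_and_sort(pairs):
    # Shuffle and sort: group all values by key, then order the keys.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_fn(key, values):
    # Reduce: sum the counts for one word.
    return key, sum(values)


lines = ["the quick brown fox", "the lazy dog", "the fox"]
intermediate = list(chain.from_iterable(map_fn(line) for line in lines))
result = [reduce_fn(key, values) for key, values in shuffle_and_sort(intermediate)]
print(result)
# [('brown', 1), ('dog', 1), ('fox', 2), ('lazy', 1), ('quick', 1), ('the', 3)]
```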

**Advantages:**
- Scalability: Scales horizontally to petabytes of data by adding commodity nodes to the cluster.
- Fault Tolerance: Failed tasks are automatically re-executed on other nodes.
- Flexibility: Can process structured and unstructured data.

**Limitations:**
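- Latency: Jobs are batch-oriented with high start-up and end-to-end latency, which makes MapReduce unsuitable for real-time or interactive processing.
- Complexity: Requires writing custom Map and Reduce functions, even for common operations such as joins.
- Overhead: Map outputs are written to local disk and shuffled across the network, and multi-step or iterative workloads must chain separate jobs that each read from and write to HDFS.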

## Question 4: Explore the role of YARN in Hadoop. Explain how it manages cluster resources and schedules applications. Compare YARN with the earlier Hadoop 1.x architecture and highlight the benefits of YARN.

**YARN (Yet Another Resource Negotiator):**
- **Role:** YARN is the resource management layer of Hadoop. It allocates resources to various applications running in a Hadoop cluster.
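- **Resource Management:** YARN's core daemons are the ResourceManager and the NodeManagers, working with a per-application ApplicationMaster.
  - **ResourceManager:** The central authority that arbitrates resources among all the applications in the cluster. Its Scheduler allocates resources, and its ApplicationsManager accepts job submissions and starts each application's ApplicationMaster.
  - **NodeManager:** Runs on each node, launches and monitors containers (allocations of memory and CPU on that node), and reports resource usage to the ResourceManager.
  - **ApplicationMaster:** Runs once per application, negotiates containers from the ResourceManager, and works with the NodeManagers to run and monitor the application's tasks.
- **Scheduling:** The ResourceManager's scheduler is pluggable. Hadoop ships with the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler, selected with the `yarn.resourcemanager.scheduler.class` property.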
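
The central idea of the Fair Scheduler, giving each queue an equal share and handing capacity that a queue does not need to the others, can be illustrated with a max-min fair-share calculation. This is a simplified sketch, not YARN code: the queue names and container counts are hypothetical, and the real Fair Scheduler also accounts for weights, minimum shares, and preemption.

```python
from fractions import Fraction


def max_min_fair_share(capacity, demands):
    """Split `capacity` among queues so that no queue gets more than it demands and
    capacity left over by satisfied queues is redistributed (max-min fairness)."""
    allocation = {queue: Fraction(0) for queue in demands}
    unsatisfied = {queue: Fraction(d) for queue, d in demands.items()}
    free = Fraction(capacity)
    while unsatisfied and free > 0:
        share = free / len(unsatisfied)  # equal share of what is still free
        for queue, demand in list(unsatisfied.items()):
            grant = min(share, demand - allocation[queue])
            allocation[queue] += grant
            free -= grant
            if allocation[queue] == demand:
                del unsatisfied[queue]  # satisfied; its unused share goes back to the pool
    return {queue: float(amount) for queue, amount in allocation.items()}


print(max_min_fair_share(100, {"etl": 20, "adhoc": 50, "ml": 60}))
# {'etl': 20.0, 'adhoc': 40.0, 'ml': 40.0}
```

The `etl` queue only needs 20 containers, so the 13 or so containers of its equal share that it does not use are split between the other two queues. Exact fractions are used so the arithmetic has no rounding drift.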

**Comparison with Hadoop 1.x:**
- **Hadoop 1.x:** A single JobTracker handled both cluster resource management and the scheduling and monitoring of every job, while a TaskTracker on each node ran tasks in a fixed number of map and reduce slots. The JobTracker was a single point of failure and a scalability bottleneck, and the cluster could run only MapReduce.
- **YARN:** Splits these responsibilities: the global ResourceManager handles resource management, while a per-application ApplicationMaster handles the scheduling and monitoring of that application's tasks. Resources are allocated as generic containers rather than fixed map and reduce slots, which improves scalability and utilization, and processing engines other than MapReduce (such as Spark or Tez) can run on the same cluster.

**Benefits of YARN:**
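- Scalability: Because per-application task management moves out of the central ResourceManager into ApplicationMasters, a YARN cluster can grow larger than a JobTracker-based one.
- Flexibility: Supports different processing models (batch, interactive, streaming).
- Resource Utilization: Generic containers replace fixed map and reduce slots, so capacity left idle by one application or task type can be used by others.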

## Question 5: Provide an overview of some popular components within the Hadoop ecosystem, such as HBase, Hive, Pig, and Spark. Describe the use cases and differences between these components. Choose one component and explain how it can be integrated into a Hadoop ecosystem for specific data processing tasks.

**HBase:**
- **Overview:** A distributed, scalable, column-family (NoSQL) data store that runs on top of HDFS. It is modeled after Google's Bigtable and provides random, real-time read/write access to large datasets.
- **Use Cases:** Real-time querying of large datasets, storing versioned data.

**Hive:**
- **Overview:** A data warehouse infrastructure built on top of Hadoop. It provides tools to enable easy data summarization, querying, and analysis of large datasets stored in HDFS.
- **Use Cases:** SQL-like querying on large datasets, data summarization.

**Pig:**
- **Overview:** A high-level platform for creating programs that run on Hadoop. Its dataflow language, Pig Latin, is compiled into MapReduce jobs, so users can describe transformations without writing MapReduce code by hand.
- **Use Cases:** Complex data transformations, ETL (Extract, Transform, Load) processes.

**Spark:**
- **Overview:** An open-source unified analytics engine for large-scale data processing. It provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.
- **Use Cases:** Near-real-time stream processing, machine learning, graph processing, and fast batch and interactive analytics.

**Integration Example: Hive in Hadoop Ecosystem:**
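- Hive can be integrated into a Hadoop ecosystem to run SQL-like queries (HiveQL) on large datasets stored in HDFS. Table definitions, including schemas and the HDFS locations of the underlying files, are kept in the Hive metastore, and each query is compiled into MapReduce jobs (or, in newer Hive versions, Tez or Spark jobs) that run on the cluster under YARN. This lets users who are familiar with SQL query big data without writing MapReduce code directly.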
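
To make the SQL-to-MapReduce translation concrete, the sketch below runs a `GROUP BY` query of the kind a Hive user would write and then computes the same answer with an explicit map step and reduce step. Python's built-in `sqlite3` stands in for the SQL engine only so that the example runs locally; the `page_views` table and its rows are hypothetical, and Hive's real query plans involve more machinery (multiple stages, combiners, and so on).

```python
import sqlite3
from collections import defaultdict

rows = [("home", 3), ("cart", 1), ("home", 2), ("checkout", 1), ("cart", 4)]

# The declarative query a Hive user would write (executed here by sqlite3).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE page_views (page TEXT, views INTEGER)")
conn.executemany("INSERT INTO page_views VALUES (?, ?)", rows)
sql_result = conn.execute(
    "SELECT page, SUM(views) FROM page_views GROUP BY page ORDER BY page"
).fetchall()
conn.close()

# Roughly how GROUP BY ... SUM(...) maps onto MapReduce.
mapped = [(page, views) for page, views in rows]        # map: emit (group key, value)
grouped = defaultdict(list)
for page, views in mapped:                              # shuffle: group values by key
    grouped[page].append(views)
mr_result = [(page, sum(vs)) for page, vs in sorted(grouped.items())]  # reduce: sum

print(sql_result)  # [('cart', 5), ('checkout', 1), ('home', 5)]
print(mr_result)   # [('cart', 5), ('checkout', 1), ('home', 5)]
```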

## Question 6: Explain the key differences between Apache Spark and Hadoop MapReduce. How does Spark overcome some of the limitations of MapReduce for big data processing tasks?

**Differences between Apache Spark and Hadoop MapReduce:**
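- **Data Processing:** MapReduce processes data only in batch mode, while Spark supports batch processing and near-real-time stream processing (by default as a series of small micro-batches).
- **Intermediate Data:** MapReduce writes map outputs to local disk and the output of every job to HDFS, so multi-step pipelines pay for repeated disk I/O and replication. Spark keeps data in memory across the steps of a job where possible (shuffle data is still written to local disk), which greatly reduces I/O and increases performance.
- **Ease of Use:** Spark provides concise high-level APIs in Scala, Java, Python, and R, as well as SQL. MapReduce requires users to express every computation as low-level map and reduce functions, usually written in Java.
- **Performance:** Spark is generally faster than MapReduce, especially for iterative algorithms and interactive queries that reuse the same data in memory.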

**How Spark Overcomes Limitations of MapReduce:**
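- **In-Memory Processing:** By caching datasets in memory (for example with `cache()` or `persist()`), Spark avoids rereading data from disk on every pass, which especially benefits iterative algorithms.
- **DAG Execution:** Spark plans a multi-stage computation as one directed acyclic graph (DAG) of stages instead of a chain of separate MapReduce jobs that each write their results to HDFS.
- **Unified Engine:** Spark offers a unified engine for batch processing, real-time streaming, machine learning, and graph processing.
- **Fault Tolerance:** Spark's Resilient Distributed Datasets (RDDs) record the lineage of transformations that produced them, so a lost partition can be recomputed from its source data rather than restored from replicated intermediate results.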
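
Lineage-based recovery can be illustrated with a toy dataset class that stores how it was derived instead of replicating its data. This is a plain-Python sketch with hypothetical names, not Spark's implementation: real RDDs also track partitioners and narrow versus wide dependencies, while this toy supports only `map`.

```python
class ToyRDD:
    """Toy dataset that remembers its lineage; partitions are rebuilt on demand."""

    def __init__(self, source_partitions=None, parent=None, fn=None):
        self.source = source_partitions  # set only for the root (e.g. data read from HDFS)
        self.parent = parent             # dataset this one was derived from
        self.fn = fn                     # per-element transformation applied to the parent
        self.cache = {}                  # partition index -> materialized data (may be lost)

    def map(self, fn):
        return ToyRDD(parent=self, fn=fn)

    def compute(self, i):
        if i in self.cache:
            return self.cache[i]
        if self.parent is None:
            data = list(self.source[i])
        else:
            # Rebuild the partition by replaying the transformation on the parent's partition.
            data = [self.fn(x) for x in self.parent.compute(i)]
        self.cache[i] = data
        return data

    def lose_partition(self, i):
        # Simulate an executor failure that drops a cached partition.
        self.cache.pop(i, None)


base = ToyRDD(source_partitions=[[1, 2, 3], [4, 5, 6]])
squared = base.map(lambda x: x * x)
plus_one = squared.map(lambda x: x + 1)

before = plus_one.compute(1)
squared.lose_partition(1)
plus_one.lose_partition(1)
after = plus_one.compute(1)  # recomputed from lineage: source -> square -> +1
print(before, after)  # [17, 26, 37] [17, 26, 37]
```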

## Question 7: Write a Spark application in Scala or Python that reads a text file, counts the occurrences of each word, and returns the top 10 most frequent words. Explain the key components and steps involved in this application.

```python
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("WordCount").getOrCreate()

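# Read the text file into a DataFrame with one string column, "value", per line
text_file = spark.read.text("path/to/textfile.txt")

# Split each line into words and count occurrences. split() with no argument splits
# on any run of whitespace, so blank lines and repeated spaces do not produce
# empty-string "words" (split(" ") would).
word_counts = text_file.rdd.flatMap(lambda line: line.value.split()) \
                           .map(lambda word: (word, 1)) \
                           .reduceByKey(lambda a, b: a + b)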

# Get the top 10 most frequent words
top_10_words = word_counts.map(lambda x: (x[1], x[0])) \
                          .sortByKey(False) \
                          .take(10)

# Show the result
for count, word in top_10_words:
    print(f"{word}: {count}")

spark.stop()

# Explanation:
# 1. **SparkSession:** The entry point for Spark functionality.
# 2. **Reading the text file:** spark.read.text loads the file as a DataFrame with one row per line; .rdd exposes it as an RDD of Row objects.
# 3. **FlatMap:** Split each line into words.
# 4. **Map:** Transform words into (word, 1) pairs.
# 5. **ReduceByKey:** Aggregate the counts of each word.
# 6. **SortByKey:** Swap each pair to (count, word) so that sortByKey(False) sorts by count in descending order.
# 7. **Take:** Retrieve the top 10 words with the highest counts.

# Question 8: Using Spark RDDs (Resilient Distributed Datasets), perform the following tasks on a dataset of your choice:
# a. Filter the data to select only rows that meet specific criteria.
# b. Map a transformation to modify a specific column in the dataset.
# c. Reduce the dataset to calculate a meaningful aggregation (e.g., sum, average).

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("RDDOperations").getOrCreate()

# Create a small in-memory dataset of (name, age) pairs and distribute it as an RDD
data = [("Alice", 29), ("Bob", 31), ("Cathy", 23), ("David", 34)]
rdd = spark.sparkContext.parallelize(data)

# a. Filter the data to select only rows where age > 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
print("Filtered RDD:", filtered_rdd.collect())

# b. Map a transformation to modify the age column by adding 1
mapped_rdd = filtered_rdd.map(lambda x: (x[0], x[1] + 1))
print("Mapped RDD:", mapped_rdd.collect())

# c. Reduce the dataset to calculate the sum of ages
sum_ages = mapped_rdd.map(lambda x: x[1]).reduce(lambda a, b: a + b)
print("Sum of Ages:", sum_ages)

spark.stop()
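
# For comparison, the same filter -> map -> reduce pipeline can be traced in plain
# Python on the same sample list, which makes it easy to check what each RDD step
# returns. This local sketch (functools.reduce standing in for RDD.reduce) also
# computes the average that part (c) mentions, by reducing (age, 1) pairs into a
# (sum, count) pair in one pass. The same lambda could be passed to
# mapped_rdd.map(lambda x: (x[1], 1)).reduce(...) in Spark. Illustration only.
from functools import reduce

people = [("Alice", 29), ("Bob", 31), ("Cathy", 23), ("David", 34)]

over_30 = [p for p in people if p[1] > 30]             # like rdd.filter
aged_up = [(name, age + 1) for name, age in over_30]   # like rdd.map
total, count = reduce(                                  # like rdd.reduce
    lambda acc, pair: (acc[0] + pair[0], acc[1] + pair[1]),
    [(age, 1) for _, age in aged_up],
)
average = total / count

print(over_30)         # [('Bob', 31), ('David', 34)]
print(aged_up)         # [('Bob', 32), ('David', 35)]
print(total, average)  # 67 33.5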

# Question 9: Create a Spark DataFrame in Python or Scala by loading a dataset (e.g., CSV or JSON) and perform the following operations:
# a. Select specific columns from the DataFrame.
# b. Filter rows based on certain conditions.
# c. Group the data by a particular column and calculate aggregations (e.g., sum, average).
# d. Join two DataFrames based on a common key.

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataFrameOperations").getOrCreate()

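# Load a dataset (CSV file). The column names used below (column1, column2,
# column3, common_key) are placeholders for columns in your own file.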
df = spark.read.csv("path/to/csvfile.csv", header=True, inferSchema=True)

# a. Select specific columns from the DataFrame
selected_df = df.select("column1", "column2")
print("Selected Columns:")
selected_df.show()

# b. Filter rows based on certain conditions
filtered_df = df.filter(df["column1"] > 30)
print("Filtered Rows:")
filtered_df.show()

# c. Group the data by a particular column and calculate aggregations
grouped_df = df.groupBy("column1").agg({"column2": "sum", "column3": "avg"})
print("Grouped Data with Aggregations:")
grouped_df.show()

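# d. Join two DataFrames based on a common key.
# Passing the key name with on= performs an inner equi-join and keeps a single
# common_key column in the result; joining on df1["common_key"] == df2["common_key"]
# would keep both copies of the column.
df1 = spark.read.csv("path/to/csvfile1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("path/to/csvfile2.csv", header=True, inferSchema=True)
joined_df = df1.join(df2, on="common_key", how="inner")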
print("Joined DataFrames:")
joined_df.show()

spark.stop()
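
# Conceptually, the inner join in part (d) combines every pair of rows whose
# common_key values match and drops rows with no match. The plain-Python hash join
# below illustrates that result on two tiny, hypothetical tables (lists of dicts).
# It is not how Spark executes joins: Spark picks physical strategies such as
# broadcast hash join or sort-merge join based on table sizes and configuration.
from collections import defaultdict


def hash_join(left, right, key):
    # Build phase: index the right-hand rows by their join key.
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    # Probe phase: for each left row, emit one merged row per matching right row.
    return [{**l_row, **r_row} for l_row in left for r_row in index.get(l_row[key], [])]


customers = [{"common_key": 1, "name": "Alice"}, {"common_key": 2, "name": "Bob"}]
orders = [
    {"common_key": 1, "amount": 40},
    {"common_key": 1, "amount": 15},
    {"common_key": 3, "amount": 99},  # no matching customer, so an inner join drops it
]

joined = hash_join(customers, orders, "common_key")
print(joined)
# [{'common_key': 1, 'name': 'Alice', 'amount': 40}, {'common_key': 1, 'name': 'Alice', 'amount': 15}]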

# Question 10: Set up a Spark Streaming application to process real-time data from a source (e.g., Apache Kafka or a simulated data source).
# The application should:
# a. Ingest data in micro-batches.
# b. Apply a transformation to the streaming data (e.g., filtering, aggregation).
# c. Output the processed data to a sink (e.g., write to a file, a database, or display it).

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

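# Initialize a Spark session and a StreamingContext.
# Note: this uses the legacy DStream (Spark Streaming) API; newer Spark releases
# recommend Structured Streaming for new applications. When running locally, use a
# master of at least local[2], because the socket receiver occupies one core.
spark = SparkSession.builder.appName("SparkStreamingApp").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 1)  # 1-second batch interval

# a. Ingest data in micro-batches: lines arriving on the socket are grouped into one
#    batch per 1-second interval. A simulated source can be started in another
#    terminal with `nc -lk 9999`, then text typed into it.
lines = ssc.socketTextStream("localhost", 9999)

# b. Apply a transformation (word count within each batch)
words = lines.flatMap(lambda line: line.split())
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# c. Output the processed data to a sink: pprint() prints the first elements of each
#    batch to the console. To write each batch to files instead, use
#    word_counts.saveAsTextFiles("output/wordcounts").
word_counts.pprint()

# Start the streaming computation and block until it is stopped
ssc.start()
ssc.awaitTermination()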
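
# What the 1-second batch interval means can be illustrated without a cluster:
# events are bucketed by arrival time into consecutive intervals, and the same word
# count runs once per bucket. This is a plain-Python simulation with hypothetical
# (timestamp_in_seconds, line) events; Spark's receivers and scheduler are not
# modeled, and unlike Spark this sketch produces no batch for an empty interval.
from collections import Counter, defaultdict


def micro_batch_word_counts(events, batch_interval=1.0):
    batches = defaultdict(list)
    for timestamp, line in events:
        batches[int(timestamp // batch_interval)].append(line)  # batch index
    # One word count per batch, in time order (like one pprint() block per interval).
    return {
        batch: Counter(word for line in batch_lines for word in line.split())
        for batch, batch_lines in sorted(batches.items())
    }


events = [(0.1, "spark kafka"), (0.7, "spark"), (1.2, "kafka kafka"), (2.5, "hdfs")]
for batch, counts in micro_batch_word_counts(events).items():
    print(batch, dict(counts))
# 0 {'spark': 2, 'kafka': 1}
# 1 {'kafka': 2}
# 2 {'hdfs': 1}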

# Question 11: Explain the fundamental concepts of Apache Kafka. What is it, and what problems does it aim to solve in the context of big data and real-time data processing?

"""
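**Apache Kafka:**
- **Concepts:** Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. At its core it is a partitioned, replicated, append-only commit log: producers append records to topics, records are retained for a configurable period, and consumers read them at their own pace by tracking offsets. It is designed for high throughput and fault tolerance.
- **Problems Solved:**
  - **Real-Time Data Processing:** Kafka delivers records to consumers with low latency, so applications (or stream processors such as Kafka Streams or Spark) can react to data as it is generated.
  - **Decoupling:** Producers and consumers do not need to know about each other or run at the same speed; Kafka buffers the data between them, and many independent consumers can read the same topic.
  - **Scalability:** Topics are split into partitions spread across brokers, so Kafka can handle large volumes of data and scale horizontally by adding more brokers to the cluster.
  - **Durability and Fault Tolerance:** Records are persisted to disk and replicated across brokers, so data survives the failure of individual brokers, provided the replication factor is greater than 1 and producers use suitable acknowledgement settings.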
"""

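# Kafka's core abstraction is an append-only log in which each record gets a
# sequential offset. Because records are retained rather than deleted on read,
# several consumers can read the same data independently, each tracking its own
# position. The class below is a toy, single-partition, in-memory illustration of
# that idea (names are hypothetical); it has no replication, persistence, or networking.
class ToyLog:
    def __init__(self):
        self.records = []

    def append(self, value):
        # A producer appends; the returned offset identifies the record's position.
        self.records.append(value)
        return len(self.records) - 1

    def read(self, offset, max_records=10):
        # A consumer reads from an offset it chooses; reading removes nothing.
        return self.records[offset:offset + max_records]


log = ToyLog()
for event_name in ["order-1", "order-2", "order-3"]:
    log.append(event_name)

# Two consumers with independent offsets: one is nearly caught up, one replays from the start.
analytics_offset, audit_offset = 2, 0
print(log.read(analytics_offset))  # ['order-3']
print(log.read(audit_offset))      # ['order-1', 'order-2', 'order-3']
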
# Question 12: Describe the architecture of Kafka, including its key components such as Producers, Topics, Brokers, Consumers, and ZooKeeper.
# How do these components work together in a Kafka cluster to achieve data streaming?

"""
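**Kafka Architecture:**
- **Producers:** Applications that publish records (messages) to Kafka topics, choosing a partition for each record (by default from a hash of the record's key, if it has one).
- **Topics:** Named categories or feeds to which records are published. Each topic is split into partitions, ordered logs that allow parallel writes and reads; ordering is guaranteed only within a partition.
- **Brokers:** Kafka servers that store partitions on disk and serve client requests. Each partition is replicated across several brokers: one replica is the leader, which handles all writes and by default all reads, and the others are followers that copy it.
- **Consumers:** Applications that read records from topics. Consumers are organized into consumer groups, and each partition is assigned to exactly one consumer in a group, so adding consumers (up to the number of partitions) spreads the load.
- **ZooKeeper:** In ZooKeeper-based deployments, ZooKeeper stores cluster metadata (brokers, topics, configuration) and is used to elect the controller broker, which in turn assigns partition leaders. Newer Kafka versions can instead run in KRaft mode, where a built-in Raft-based controller quorum replaces ZooKeeper, and Kafka 4.0 removed ZooKeeper support.

**Data Streaming Workflow:**
1. Producers send records to a topic; each record is appended to one of the topic's partitions.
2. The leader broker for that partition writes the record to its log, and follower replicas copy it.
3. Consumers subscribe to topics, fetch records from the partitions assigned to them, and commit the offsets they have processed.
4. ZooKeeper (or the KRaft controller quorum) tracks broker membership and metadata, so that when a broker fails, new leaders are elected for its partitions and clients are redirected to them.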
"""

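# To make the role of partitions more concrete, two mechanisms are sketched in plain
# Python: (1) a keyed record is routed to a partition by hashing its key, so all
# records with the same key land in the same partition and keep their relative order;
# (2) within one consumer group, each partition is assigned to one consumer.
# Assumptions: zlib.crc32 stands in for the murmur2 hash used by Kafka's default
# partitioner, and simple round-robin stands in for Kafka's configurable assignors.
import zlib

NUM_PARTITIONS = 3


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions, consumers):
    assignment = {consumer: [] for consumer in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


records = [("user-42", "login"), ("user-7", "click"), ("user-42", "logout")]
partitions = {p: [] for p in range(NUM_PARTITIONS)}
for key, value in records:
    partitions[partition_for(key)].append((key, value))

print(assign_partitions(NUM_PARTITIONS, ["consumer-a", "consumer-b"]))
# {'consumer-a': [0, 2], 'consumer-b': [1]}
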
# Question 13: Create a step-by-step guide on how to produce data to a Kafka topic using a programming language of your choice and then consume that data from the topic.
# Explain the role of Kafka producers and consumers in this process.

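# Step-by-Step Guide (Python, using the kafka-python client):
# 1. Start a Kafka broker listening on localhost:9092.
# 2. Create the topic, e.g.:
#      bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092
# 3. Install the client library: pip install kafka-python
# 4. Run the producer and consumer code below.

from kafka import KafkaProducer, KafkaConsumer

# Produce data to the Kafka topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send messages to the topic 'my_topic' (values must be bytes unless a serializer is configured)
producer.send('my_topic', b'Hello, Kafka!')
producer.send('my_topic', b'Another message')
producer.flush()  # block until all buffered messages have been sent
producer.close()

# Consume data from the Kafka topic.
# auto_offset_reset='earliest' makes a consumer with no committed offsets start at the
# beginning of the topic; with the default ('latest') it would skip the messages sent above.
# consumer_timeout_ms ends the loop after 10 seconds without new messages.
consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,
)

# Print consumed messages (message.value is raw bytes, so decode it)
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')} "
          f"(partition={message.partition}, offset={message.offset})")
consumer.close()

# Explanation:
# - **Producers:** Producers are responsible for sending data to Kafka topics. In this example, we create a producer, send two messages to the topic 'my_topic', and flush so they are delivered before the program continues.
# - **Consumers:** Consumers subscribe to topics and read the data. In this example, we create a consumer that reads messages from 'my_topic' starting at the earliest available offset and prints each one with its partition and offset.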
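
# The example above sends raw bytes. kafka-python's KafkaProducer and KafkaConsumer
# also accept value_serializer / value_deserializer callables, so structured records
# can be encoded in one place instead of at every send() call. The two helpers below
# are a sketch of a JSON round trip (they are not part of kafka-python), and the
# record fields are hypothetical. Usage would look like:
#   KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=to_json_bytes)
#   KafkaConsumer('my_topic', bootstrap_servers='localhost:9092', value_deserializer=from_json_bytes)
import json


def to_json_bytes(value) -> bytes:
    # Producer side: Python object -> UTF-8 encoded JSON bytes.
    return json.dumps(value).encode("utf-8")


def from_json_bytes(raw: bytes):
    # Consumer side: UTF-8 encoded JSON bytes -> Python object.
    return json.loads(raw.decode("utf-8"))


event = {"user": "alice", "action": "login"}
payload = to_json_bytes(event)
print(payload)                   # b'{"user": "alice", "action": "login"}'
print(from_json_bytes(payload))  # {'user': 'alice', 'action': 'login'}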

# Question 14: Discuss the importance of data retention and data partitioning in Kafka. How can these features be configured, and what are the implications for data storage and processing?

"""
**Data Retention:**
- **Importance:** Data retention determines how long Kafka retains messages in the topic. This is crucial for ensuring that consumers can read data even if they are temporarily offline.
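- **Configuration:** Retention is set per broker with `log.retention.hours` (or the more precise `log.retention.ms`) for the retention period and `log.retention.bytes` for the maximum size of each partition's log; individual topics can override these with `retention.ms` and `retention.bytes`.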

**Data Partitioning:**
- **
