# Assignment

## Answer 1

### Core Components of the Hadoop Ecosystem:

Hadoop Distributed File System (HDFS): HDFS is the storage component of Hadoop. It divides large files into smaller blocks and stores multiple copies of these blocks across different nodes for fault tolerance.

MapReduce: MapReduce is the processing component that allows distributed processing of large datasets across a Hadoop cluster. It consists of a Map phase for processing and a Reduce phase for summarization.

YARN (Yet Another Resource Negotiator): YARN is the resource management layer that handles resource allocation and job scheduling in a Hadoop cluster. It decouples the processing engine (MapReduce, Spark, etc.) from resource management.

## Answer 2

### Hadoop Distributed File System (HDFS):

HDFS is designed for reliability and fault tolerance. It splits each file into blocks (128 MB by default in Hadoop 2.x and later, often raised to 256 MB) and distributes them across the cluster's DataNodes, while the NameNode tracks where each block is stored. Each block is replicated on several nodes (three by default), so if a node fails, its data can still be read from the remaining replicas.
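
To make the block and replica arithmetic concrete, here is a minimal sketch in plain Python. It assumes a 128 MB block size and a replication factor of 3; the function name `hdfs_footprint` and the 1000 MB file are made up for illustration and are not part of any Hadoop API.

```python
import math

BLOCK_SIZE_MB = 128       # assumed block size
REPLICATION_FACTOR = 3    # assumed number of copies per block


def hdfs_footprint(file_size_mb, block_size_mb=BLOCK_SIZE_MB,
                   replication=REPLICATION_FACTOR):
    """Return (number of blocks, total block replicas, raw storage in MB)."""
    num_blocks = math.ceil(file_size_mb / block_size_mb)
    total_replicas = num_blocks * replication
    # The last block only occupies as much space as the data it holds,
    # so raw storage is file size times replication.
    raw_storage_mb = file_size_mb * replication
    return num_blocks, total_replicas, raw_storage_mb


blocks, replicas, raw_mb = hdfs_footprint(1000)
print(blocks, replicas, raw_mb)  # 8 24 3000
```

Under these assumptions, a 1000 MB file becomes 8 blocks and 24 block replicas, and occupies about 3000 MB of raw disk across the cluster.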

## Answer 3

### MapReduce Framework:

Map Phase: Input data is divided into smaller chunks, and the Map function processes each chunk independently, emitting key-value pairs.

Shuffle and Sort Phase: The framework shuffles and sorts the output of the Map phase, ensuring that all values associated with a particular key are grouped together.

Reduce Phase: The Reduce function processes the sorted and shuffled data, producing the final output.

Example: Word Count

Map Phase: Emit a (word, 1) pair for each word in the input.

Shuffle and Sort Phase: Group the pairs by word so that all counts for a given word reach the same reducer.

Reduce Phase: Sum the counts for each word.

Advantages of MapReduce:

Scalability: Scales horizontally by adding nodes to handle larger datasets.

Fault Tolerance: Input data is replicated in HDFS, and failed tasks are automatically re-executed on other nodes.

Limitations:

Batch Processing: Designed for high-latency batch jobs, so it is a poor fit for real-time or interactive processing.
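
The word count example from this answer can be traced in plain Python. This is only a single-process sketch of the three phases, not Hadoop code: the function names and the input chunks are made up for illustration.

```python
from collections import defaultdict


def map_phase(document):
    """Emit a (word, 1) pair for every word in one input chunk."""
    return [(word.lower(), 1) for word in document.split()]


def shuffle_and_sort(pairs):
    """Group all values by key, then order the groups by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_phase(grouped):
    """Sum the grouped counts for each word."""
    return {word: sum(counts) for word, counts in grouped}


chunks = ["the quick brown fox", "the lazy dog", "the fox"]  # made-up input
mapped = [pair for chunk in chunks for pair in map_phase(chunk)]
word_counts = reduce_phase(shuffle_and_sort(mapped))
print(word_counts)
# {'brown': 1, 'dog': 1, 'fox': 2, 'lazy': 1, 'quick': 1, 'the': 3}
```

In a real cluster each chunk would be mapped on a different node, and the shuffle would move the pairs over the network so that every word ends up at exactly one reducer.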

## Answer 4

### YARN (Yet Another Resource Negotiator):

YARN manages cluster resources and schedules applications. In Hadoop 1.x, a single JobTracker handled both resource management and job scheduling, and the cluster could only run MapReduce jobs. YARN splits these responsibilities between a global ResourceManager and a per-application ApplicationMaster, so processing engines such as MapReduce and Spark can share the same cluster and run diverse workloads concurrently.

Benefits of YARN:

Flexibility: Supports various processing engines.

Improved Resource Management: Allocates resources per application, which improves cluster utilization.

Scalability: Scales better than Hadoop 1.x for diverse workloads.

## Answer 5

### Popular Hadoop Ecosystem Components:

HBase: A NoSQL database for real-time read/write access to large datasets.

Hive: Provides a data warehousing and SQL-like interface for querying data stored in Hadoop.

Pig: A high-level platform whose scripting language, Pig Latin, is compiled into MapReduce jobs.

Spark: A fast and general-purpose cluster computing system for big data processing.

Integration Example: HBase

HBase runs on top of HDFS and provides low-latency random reads and writes over large tables. It suits use cases that need real-time access to individual records, such as time-series data. Analytical (OLAP-style) queries over the same data are usually better served by engines such as Hive or Spark.

## Answer 6

### Key Differences Between Apache Spark and Hadoop MapReduce:

Processing Model:

MapReduce: Batch-oriented, two-stage (map, then reduce) processing model.

Spark: Supports batch processing, interactive queries, streaming, and iterative algorithms.

Performance:

MapReduce: Writes intermediate data to disk after each stage, which slows multi-stage jobs.

Spark: Keeps intermediate data in memory where possible and evaluates transformations lazily, which is typically faster, especially for iterative workloads.

Ease of Use:

MapReduce: Requires more lines of code for complex algorithms.

Spark: Offers high-level APIs in Java, Scala, Python, and R, making it more user-friendly.

Data Processing:

MapReduce: Limited to batch processing.

Spark: Supports batch, interactive, streaming, and machine learning workloads.

Data Sharing:

MapReduce: Shares data between jobs through disk, causing I/O overhead.

Spark: Shares data between steps in memory, reducing that overhead.

Fault Tolerance:

MapReduce: Re-executes failed tasks and relies on HDFS replication to protect input and output data.

Spark: Uses lineage information to recompute lost partitions, reducing the need for data replication.
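
Lazy evaluation and lineage-based recovery can be illustrated with a toy model in plain Python. The `LazyDataset` class below is hypothetical and is not how Spark is implemented; it only shows the idea that transformations are recorded rather than run, and that a result can be rebuilt by replaying the recorded steps over the source data.

```python
class LazyDataset:
    """Toy model of a lazily evaluated dataset; not Spark's implementation."""

    def __init__(self, source, lineage=()):
        self._source = source      # re-readable input, e.g. a list
        self._lineage = lineage    # recorded transformations, in order

    def map(self, fn):
        # Transformation: record the step, compute nothing yet.
        return LazyDataset(self._source, self._lineage + (("map", fn),))

    def filter(self, predicate):
        # Transformation: record the step, compute nothing yet.
        return LazyDataset(self._source, self._lineage + (("filter", predicate),))

    def collect(self):
        # Action: replay the lineage over the source to build the result.
        data = iter(self._source)
        for kind, fn in self._lineage:
            data = map(fn, data) if kind == "map" else filter(fn, data)
        return list(data)


calls = []


def double(x):
    calls.append(x)   # track when the function actually runs
    return x * 2


dataset = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
print(len(calls))          # 0: nothing has been computed yet
print(dataset.collect())   # [6, 8]
print(dataset.collect())   # [6, 8]: rebuilt by replaying the lineage
```

The second `collect()` recomputes the result from the source, which is the same mechanism a lineage-based system can use to rebuild a lost partition instead of keeping extra copies of it.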

## Answer 7

### Spark Application for Word Count:

In [None]:
from pyspark import SparkContext, SparkConf

# Set up Spark configuration and context
conf = SparkConf().setAppName("WordCountApp")
sc = SparkContext(conf=conf)

# Read input text file
input_file = "path/to/your/textfile.txt"
text_file = sc.textFile(input_file)

# Perform word count
# split() with no argument splits on any whitespace and drops empty strings
word_counts = text_file.flatMap(lambda line: line.split()) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b)

# Get the top 10 most frequent words
top_words = word_counts.takeOrdered(10, key=lambda x: -x[1])

# Print the result
for word, count in top_words:
    print(f"{word}: {count}")

# Stop the Spark context
sc.stop()

#### Key Components and Steps:

Spark Configuration: Set up a Spark configuration and create a Spark context.

Read Data: Use textFile to read the input text file.

Word Count: Use flatMap, map, and reduceByKey to perform word count.

Top Words: Use takeOrdered to retrieve the top 10 words based on count.

Print Result: Display the results.

Stop Spark Context: Terminate the Spark context.

## Answer 8

### Spark RDD Operations:

In [None]:
from pyspark import SparkContext, SparkConf

# Set up Spark configuration and context
conf = SparkConf().setAppName("RDDOperations")
sc = SparkContext(conf=conf)

# Load dataset (replace 'your_dataset_path' with the actual path)
data = sc.textFile('your_dataset_path')

# a. Filter data to select only rows that meet specific criteria
filtered_data = data.filter(lambda line: "specific_criteria" in line)

# b. Map a transformation to modify a specific column in the dataset
mapped_data = data.map(lambda line: line.replace("old_value", "new_value"))

# c. Reduce the dataset to calculate a meaningful aggregation (e.g., sum, average)
numeric_data = mapped_data.map(lambda line: int(line.split(',')[1]))  # Assuming the second column is numeric
sum_result = numeric_data.reduce(lambda x, y: x + y)
average_result = sum_result / numeric_data.count()

# Print results
print("Filtered Data:")
print(filtered_data.collect())
print("\nMapped Data:")
print(mapped_data.collect())
print("\nSum Result:", sum_result)
print("Average Result:", average_result)

# Stop the Spark context
sc.stop()

The above example demonstrates how to filter data based on specific criteria, map a transformation to modify a column, and reduce the dataset to calculate the sum and average of a numeric column. Adjust the operations based on your dataset and requirements.
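
One thing to watch in the reduce step: `int(line.split(',')[1])` raises an error on a header row or a malformed line. Below is a minimal sketch of a more tolerant parser, shown on a plain Python list so it runs without Spark. The helper name `parse_numeric` and the sample rows are made up for illustration; in the RDD version the helper could be passed to `flatMap` in place of the `map` call.

```python
from functools import reduce


def parse_numeric(line, column=1):
    """Return [value] for a parsable row and [] otherwise (suits flatMap)."""
    fields = line.split(",")
    try:
        return [int(fields[column])]
    except (IndexError, ValueError):
        return []


# Made-up sample rows standing in for the RDD's contents
rows = ["id,amount", "1,10", "2,20", "3,not_a_number", "4,30", ""]

# Equivalent in spirit to rdd.flatMap(parse_numeric)
values = [v for row in rows for v in parse_numeric(row)]
total = reduce(lambda x, y: x + y, values, 0)
average = total / len(values) if values else None
print(total, average)  # 60 20.0
```

Returning a list lets bad rows disappear silently, which is convenient for a demo; a production job would more likely count or log the rejected rows.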

## Answer 9

### Spark DataFrame Operations:

#### For this example, let's assume we have a CSV dataset with the following structure:

| ID | Name | Salary | Department |
|----|---------|--------|------------|
| 1 | John | 50000 | IT |
| 2 | Jane | 60000 | HR |
| 3 | Bob | 55000 | IT |
| 4 | Alice | 70000 | Finance |
| 5 | Charlie | 45000 | HR |


#### a. Select Specific Columns:

In [None]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("DataFrameOperations").getOrCreate()

# Load the CSV dataset
df = spark.read.csv("path/to/your/dataset.csv", header=True, inferSchema=True)

# Select specific columns
selected_columns = df.select("Name", "Salary")
selected_columns.show()

#### b. Filter Rows Based on Conditions:

In [None]:
# Filter rows based on a condition (e.g., Salary greater than 55000)
filtered_data = df.filter(df["Salary"] > 55000)
filtered_data.show()

#### c. Group Data and Calculate Aggregations:

In [None]:
# Group data by the Department column and calculate average salary
grouped_data = df.groupBy("Department").agg({"Salary": "avg"})
grouped_data.show()
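
As a cross-check on what the aggregation should return for the five sample rows, the same group-and-average step can be worked out in plain Python. This is only a sketch for verifying the numbers; Spark's output would show the result in a column named `avg(Salary)`, and the row order is not guaranteed.

```python
from collections import defaultdict

# The five sample rows from the dataset described above
rows = [
    (1, "John", 50000, "IT"),
    (2, "Jane", 60000, "HR"),
    (3, "Bob", 55000, "IT"),
    (4, "Alice", 70000, "Finance"),
    (5, "Charlie", 45000, "HR"),
]

# Group salaries by department
salaries_by_department = defaultdict(list)
for _id, _name, salary, department in rows:
    salaries_by_department[department].append(salary)

# Average each group
average_salary = {
    department: sum(salaries) / len(salaries)
    for department, salaries in salaries_by_department.items()
}
print(average_salary)
# {'IT': 52500.0, 'HR': 52500.0, 'Finance': 70000.0}
```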

#### d. Join Two DataFrames Based on a Common Key:



In [None]:
# Create a second DataFrame for demonstration purposes
df2 = spark.read.csv("path/to/your/second_dataset.csv", header=True, inferSchema=True)

# Join the two DataFrames on the common key "ID"
# (passing the column name keeps a single "ID" column in the result)
joined_data = df.join(df2, on="ID", how="inner")
joined_data.show()

## Answer 10

### Spark Streaming Application:

#### For this example, let's assume you have a Spark cluster and Apache Kafka running.

In [None]:
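from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# Note: pyspark.streaming.kafka is only available in Spark 2.x. It was removed in Spark 3.0,
# where Kafka is read through Structured Streaming's "kafka" source instead.
from pyspark.streaming.kafka import KafkaUtils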

# Create a Spark session
spark = SparkSession.builder.appName("SparkStreamingApp").getOrCreate()

# Set up the StreamingContext with a batch interval of 5 seconds
ssc = StreamingContext(spark.sparkContext, 5)

# Define Kafka parameters
kafka_params = {"bootstrap.servers": "your_kafka_broker", "group.id": "spark-streaming-group"}

# Create a DStream that connects to Kafka and ingests data in micro-batches
stream = KafkaUtils.createDirectStream(ssc, ["your_kafka_topic"], kafka_params)

# Apply a transformation to the streaming data (e.g., word count)
transformed_data = stream.map(lambda x: x[1].split(" ")) \
                        .flatMap(lambda words: [(word, 1) for word in words]) \
                        .reduceByKey(lambda a, b: a + b)

# Output the processed data to a sink (e.g., print to console)
transformed_data.pprint()

# Start the StreamingContext
ssc.start()
ssc.awaitTermination()
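
Note that with DStreams, `reduceByKey` is applied to each micro-batch separately, so the printed counts cover one batch interval at a time rather than a running total. The plain Python sketch below mimics that behavior on made-up messages; the function name `count_words_per_batch` is hypothetical and no Spark or Kafka is involved.

```python
from collections import Counter


def count_words_per_batch(batches):
    """Return one word-count dict per micro-batch, with no state carried over."""
    return [
        dict(Counter(word for message in batch for word in message.split()))
        for batch in batches
    ]


# Made-up messages, grouped by the batch interval in which they arrived
batches = [
    ["spark streaming", "spark kafka"],   # first 5-second batch
    ["kafka kafka"],                      # second 5-second batch
]
for counts in count_words_per_batch(batches):
    print(counts)
# {'spark': 2, 'streaming': 1, 'kafka': 1}
# {'kafka': 2}
```

A running total across batches would need a stateful operation instead, which this sketch deliberately leaves out.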

## Answer 11

### Fundamental Concepts of Apache Kafka:

Apache Kafka is a distributed streaming platform that aims to solve the problems of data integration, real-time data processing, and event-driven architectures. It provides fault tolerance, scalability, and durability in handling large-scale, real-time data streams.

## Answer 13

### Producing and Consuming Data in Kafka:

#### For Python, you can use the confluent_kafka library.

In [None]:
from confluent_kafka import Producer, Consumer, KafkaError

# Produce data to a Kafka topic
producer = Producer({'bootstrap.servers': 'your_kafka_broker'})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

producer.produce('your_kafka_topic', key='key', value='value', callback=delivery_report)

# Block until queued messages are delivered; this also triggers delivery_report
producer.flush()

# Consume data from a Kafka topic
consumer = Consumer({
    'bootstrap.servers': 'your_kafka_broker',
    'group.id': 'your_consumer_group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['your_kafka_topic'])

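try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break

        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    # Leave the consumer group cleanly, even after an error or Ctrl+C
    consumer.close()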

## Answer 14

### Data Retention and Partitioning in Kafka:

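Data Retention: Controlled by broker settings such as log.retention.hours (168 hours, i.e. 7 days, by default) and log.retention.bytes, with per-topic overrides such as retention.ms. Messages are kept until the time or size limit is reached, whether or not they have been consumed.

Data Partitioning: Each topic is split into partitions that are spread across brokers. Partitions can be written and read in parallel, which is what lets Kafka scale. Ordering is guaranteed only within a single partition.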
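
Keyed messages are typically routed to a partition by hashing the key, so all messages with the same key land in the same partition as long as the partition count does not change. The sketch below illustrates the idea in plain Python. It is an assumption-laden stand-in: `assign_partition` is a made-up helper, and `zlib.crc32` is used only because it is stable across runs, whereas real Kafka clients use their own hash functions.

```python
import zlib


def assign_partition(key, num_partitions):
    """Map a message key to a partition index using a stable hash.

    zlib.crc32 is a stand-in here; real Kafka clients use their own hashes.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3  # hypothetical number of partitions for the topic

for key in ["user-1", "user-2", "user-1"]:
    print(key, "->", assign_partition(key, NUM_PARTITIONS))
```

Both "user-1" messages print the same partition index, which is why choosing a good key matters when per-key ordering is required.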


## Answer 15

### Real-World Use Cases of Apache Kafka:

Log Aggregation: Centralized log collection for distributed systems.

Data Integration: Connecting various data sources and sinks.

Event Sourcing: Capturing and storing events as a source of truth.

Stream Processing: Real-time analytics and monitoring.

Messaging System: Reliable, scalable communication between microservices.

Kafka is preferred for its durability, scalability, and ability to handle high-throughput, fault-tolerant data streams.