# Starting the Engine: installing Java, PySpark and Hadoop

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]


!wget https://archive.apache.org/dist/hadoop/core/hadoop-2.7.7/hadoop-2.7.7.tar.gz
!tar -xvzf hadoop-2.7.7.tar.gz
!mv hadoop-2.7.7 hadoop

# Configure Hadoop
os.environ["HADOOP_HOME"] = "/content/hadoop"
os.environ["PATH"] += ":/content/hadoop/bin"


# Write core-site.xml using Python file handling
core_site_config = """
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
</configuration>
"""

with open("/content/hadoop/etc/hadoop/core-site.xml", "w") as file:
    file.write(core_site_config)




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/Instrumentation.html
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/FileSystemAccessException.html
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/instrumentation/
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/instrumentation/InstrumentationService.html
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/hadoop/
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.html
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org/apache/hadoop/lib/service/FileSystemAccessException.ERROR.html
hadoop-2.7.7/share/doc/hadoop/hadoop-hdfs-httpfs/apidocs/src-html/org

In [None]:
!pip install pyhadoop

Collecting pyhadoop
  Downloading pyhadoop-0.1.tar.gz (2.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyhadoop
  Building wheel for pyhadoop (setup.py) ... [?25l[?25hdone
  Created wheel for pyhadoop: filename=pyhadoop-0.1-py3-none-any.whl size=2742 sha256=c39f4edb6fa2fb95799c015626141890e11a617e3d6d9d53b1cf3226db8d0809
  Stored in directory: /root/.cache/pip/wheels/1c/cb/96/1ea6ed56366f0b284d0adae89e26844f70c30919355b398ac8
Successfully built pyhadoop
Installing collected packages: pyhadoop
Successfully installed pyhadoop-0.1


# Map Reduce

In [None]:
# Defining a mapper function - for splitting and mapping
def mapper(input_data):
  """Map phase of word count: emit ``(token, 1)`` for each whitespace-separated token.

  Tokens come straight from ``str.split()``. Case is preserved and punctuation
  stays attached (e.g. ``"cluster."``), so ``"data"`` and ``"data,"`` count as
  different words.

  Args:
    input_data: the text to tokenize.

  Returns:
    list of ``(token, 1)`` tuples in input order.
  """
  emitted = []
  for token in input_data.split():
    emitted.append((token, 1))
  return emitted

# Defining a reducer function - for reducing and aggregating
def reducer(mapped_data):
  """Reduce phase of word count: sum the values emitted for each key.

  Args:
    mapped_data: iterable of ``(key, value)`` pairs, e.g. the output of ``mapper``.

  Returns:
    dict mapping each key to the sum of its values, with keys in first-seen order.
  """
  totals = {}
  for word, count in mapped_data:
    previous = totals[word] if word in totals else 0
    totals[word] = previous + count
  return totals

# Run both phases on a sample sentence
input_data = "MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster."

mapped_data = mapper(input_data)      # map:    list of (word, 1) pairs
reduced_data = reducer(mapped_data)   # reduce: {word: total count}

for label, result in (("Mapped data", mapped_data), ("Reduced data", reduced_data)):
  print(f"{label}: {result}")

Mapped data: [('MapReduce', 1), ('is', 1), ('a', 1), ('programming', 1), ('model', 1), ('and', 1), ('an', 1), ('associated', 1), ('implementation', 1), ('for', 1), ('processing', 1), ('and', 1), ('generating', 1), ('big', 1), ('data', 1), ('sets', 1), ('with', 1), ('a', 1), ('parallel,', 1), ('distributed', 1), ('algorithm', 1), ('on', 1), ('a', 1), ('cluster.', 1)]
Reduced data: {'MapReduce': 1, 'is': 1, 'a': 3, 'programming': 1, 'model': 1, 'and': 2, 'an': 1, 'associated': 1, 'implementation': 1, 'for': 1, 'processing': 1, 'generating': 1, 'big': 1, 'data': 1, 'sets': 1, 'with': 1, 'parallel,': 1, 'distributed': 1, 'algorithm': 1, 'on': 1, 'cluster.': 1}


# Debugging

In [None]:
# Defining a faulty mapper: it deliberately emits None instead of a count
def faulty_mapper(input_data):
  """Deliberately broken mapper: emit ``(token, None)`` for each whitespace-separated token.

  Args:
    input_data: the text to tokenize.

  Returns:
    list of ``(token, None)`` tuples in input order.
  """
  emitted = []
  for token in input_data.split():
    emitted.append((token, None))
  return emitted

# Defining a debugged reducer
# NOTE(review): this redefines `reducer` from the MapReduce cell; once this cell
# runs, every later call to `reducer` uses this None-skipping version.
def reducer(mapped_data):
  """Sum values per key, reporting and skipping pairs whose value is None.

  A debug line is printed for every ``(key, None)`` pair. Such pairs do not
  contribute to (or create) an entry in the result.

  Args:
    mapped_data: iterable of ``(key, value)`` pairs.

  Returns:
    dict mapping each key that had at least one non-None value to the sum of
    those values, with keys in first-seen order.
  """
  totals = {}
  for word, count in mapped_data:
    if count is not None:
      totals[word] = totals.get(word, 0) + count
    else:
      print(f"Debug: Missing value for key '{word}'")
  return totals

# Feed the faulty mapper's output to the debugged reducer: it prints a debug
# line for each None value and skips it
input_data = "Mapreduce is a hadoop tool for processing big data"
mapped_data = faulty_mapper(input_data)
reduced_data = reducer(mapped_data)

print("\nFaulty mapped data: " + str(mapped_data))
print("Final Reduced Data: " + str(reduced_data))

Debug: Missing value for key 'Mapreduce'
Debug: Missing value for key 'is'
Debug: Missing value for key 'a'
Debug: Missing value for key 'hadoop'
Debug: Missing value for key 'tool'
Debug: Missing value for key 'for'
Debug: Missing value for key 'processing'
Debug: Missing value for key 'big'
Debug: Missing value for key 'data'

Faulty mapped data: [('Mapreduce', None), ('is', None), ('a', None), ('hadoop', None), ('tool', None), ('for', None), ('processing', None), ('big', None), ('data', None)]
Final Reduced Data: {}


# Structured to Unstructured

In [None]:
input_data = """1,John,128
2,Mary,22
3,Sachin,35"""
print(input_data)

# Splitting into key value pairs
key_value_pairs = [(i,line) for i , line in enumerate(input_data.split("\n"))]
print(key_value_pairs)

for key, value in key_value_pairs:
  print(f"{key}\t{value}")

1,John,128
2,Mary,22
3,Sachin,35
[(0, '1,John,128'), (1, '2,Mary,22'), (2, '3,Sachin,35')]
0	1,John,128
1	2,Mary,22
2	3,Sachin,35


# Partitioning & Sorting

In [None]:
# Partitioning function
def _stable_partition_hash(key):
    """Return a hash of ``key`` that does not change between Python processes.

    Built-in ``hash()`` of ``str``/``bytes`` is salted per interpreter process
    (PYTHONHASHSEED), so using it directly assigns keys to different partitions
    after every kernel restart. Strings and bytes are hashed with CRC-32 instead.
    Other keys fall back to ``hash()``; that is deterministic for ints, but
    NOTE(review): tuples containing strings would still be salted.
    """
    import zlib

    if isinstance(key, str):
        key = key.encode("utf-8", "surrogatepass")
    if isinstance(key, bytes):
        return zlib.crc32(key)
    return hash(key)


def partition_data(mapped_data, num_partitions):
    """Route each ``(key, value)`` pair to one of ``num_partitions`` buckets.

    All pairs with the same key land in the same bucket (stable hash of the key
    modulo the partition count). Pairs keep their input order within a bucket.

    Args:
        mapped_data: iterable of ``(key, value)`` pairs.
        num_partitions: number of buckets; must be >= 1.

    Returns:
        list of ``num_partitions`` lists of ``(key, value)`` pairs.

    Raises:
        ValueError: if ``num_partitions`` is less than 1.
    """
    if num_partitions < 1:
        raise ValueError(f"num_partitions must be >= 1, got {num_partitions}")

    partitions = [[] for _ in range(num_partitions)]
    for key, value in mapped_data:
        partition_index = _stable_partition_hash(key) % num_partitions
        partitions[partition_index].append((key, value))

    return partitions


# Sorting function
def sort_partition(partition):
    """Return a new list of the partition's ``(key, value)`` pairs ordered by key.

    The sort is stable, so values for equal keys keep their arrival order. String
    keys compare by code point, so uppercase letters sort before lowercase ones.
    The input partition is not modified.
    """
    ordered = list(partition)
    ordered.sort(key=lambda pair: pair[0])
    return ordered


# Sample data
mapped_data = [("apple",3),("banana",2),("apple",1),("banana",1),("Peach",5)]
num_partitions = 2

# Partitioning: show each bucket in arrival order (numbered from 1)
partitions = partition_data(mapped_data, num_partitions)
for number, bucket in enumerate(partitions, start=1):
  print(f"Partition {number}: {bucket}")

print("\n")  # two blank lines between the sections

# Sorting: order each bucket by key
sorted_partitions = list(map(sort_partition, partitions))
for number, bucket in enumerate(sorted_partitions, start=1):
  print(f"Sorted Partition {number}: {bucket}")

Partition 1: [('banana', 2), ('banana', 1), ('Peach', 5)]
Partition 2: [('apple', 3), ('apple', 1)]


Sorted Partition 1: [('Peach', 5), ('banana', 2), ('banana', 1)]
Sorted Partition 2: [('apple', 3), ('apple', 1)]
