<a href="https://colab.research.google.com/github/Anilesh05/Big_Data_Laboratory/blob/main/13_implement_k_means_clustering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Download and install Java 8 and Apache Hadoop

In [None]:
!apt-get install openjdk-8-jdk
!wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
!tar fx hadoop-3.3.6.tar.gz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["HADOOP_HOME"] = "/content/hadoop-3.3.6"
!ln -s /content/hadoop-3.3.6/bin/* /usr/bin

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-8-jdk-headless openjdk-8-jre
  openjdk-8-jre-headless x11-utils
Suggested packages:
  gvfs libice-doc libsm-doc libxt-doc openjdk-8-demo openjdk-8-source visualvm libnss-mdns
  fonts-nanum fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei
  fonts-indic mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-

## Create mapper.py

In [None]:
%%writefile mapper.py
#!/usr/bin/env python
"""Hadoop Streaming mapper for one k-means iteration.

Reads the current centroids from ``centroids.txt`` (one comma-separated point
per line) and, for every comma-separated data point on stdin, emits
``<index of nearest centroid>\t<point>``.
"""

import sys
import numpy as np


def load_centroids(path='centroids.txt'):
    """Return the centroids stored in *path* as a 2-D array (one row per centroid).

    ``ndmin=2`` keeps a single-centroid file two-dimensional; without it
    ``np.loadtxt`` returns a 1-D array and every coordinate would be treated
    as a separate centroid.
    """
    return np.loadtxt(path, delimiter=',', ndmin=2)


def nearest_centroid(point, centroids):
    """Return the row index of the centroid closest to *point* (Euclidean distance).

    Ties resolve to the lowest index because ``np.argmin`` returns the first minimum.
    """
    distances = np.linalg.norm(centroids - point, axis=1)
    return int(np.argmin(distances))


def main(stream=None, out=None, centroids_path='centroids.txt'):
    """Assign every point read from *stream* (default stdin) to its nearest centroid.

    Writes one ``<cluster id>\t<x,y,...>`` line per point to *out* (default stdout).
    """
    stream = sys.stdin if stream is None else stream
    out = sys.stdout if out is None else out
    centroids = load_centroids(centroids_path)
    for line in stream:
        line = line.strip()
        if not line:
            # Skip blank lines (e.g. a trailing empty line) instead of crashing on float('').
            continue
        point = np.array([float(field) for field in line.split(',')])
        cluster = nearest_centroid(point, centroids)
        print(f'{cluster}\t{",".join(map(str, point))}', file=out)


if __name__ == '__main__':
    main()


Writing mapper.py


In [None]:
%%writefile reducer.py
#!/usr/bin/env python
"""Hadoop Streaming reducer for one k-means iteration.

Reads ``<cluster id>\t<x,y,...>`` lines emitted by the mapper and prints a
header followed by one ``<cluster id>\t\t<new centroid>`` line per cluster,
where the new centroid is the mean of the points assigned to that cluster.
"""

import sys
import numpy as np


def compute_centroids(lines):
    """Return ``{cluster_id: mean point}`` for the mapper output *lines*.

    Keeps only a running sum and count per cluster, so memory grows with the
    number of clusters rather than the number of points. Blank lines are skipped.
    """
    sums = {}
    counts = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        centroid_id, point_str = line.split('\t', 1)
        point = np.array([float(value) for value in point_str.split(',')])
        if centroid_id in sums:
            sums[centroid_id] = sums[centroid_id] + point
            counts[centroid_id] += 1
        else:
            sums[centroid_id] = point
            counts[centroid_id] = 1
    return {cid: sums[cid] / counts[cid] for cid in sums}


def _cluster_order(cluster_id):
    """Sort key: numeric IDs in numeric order (so '10' follows '9'), any others afterwards."""
    if cluster_id.isdigit():
        return (0, int(cluster_id), '')
    return (1, 0, cluster_id)


def main(stream=None, out=None):
    """Read mapper output from *stream* (default stdin) and print the new centroids to *out*."""
    stream = sys.stdin if stream is None else stream
    out = sys.stdout if out is None else out
    new_centroids = compute_centroids(stream)

    print("Cluster ID\tCluster Centroid(X,Y)", file=out)
    print(file=out)
    for centroid_id in sorted(new_centroids, key=_cluster_order):
        coords = ",".join(map(str, new_centroids[centroid_id]))
        print(f'{centroid_id}\t\t{coords}', file=out)


if __name__ == '__main__':
    main()


Writing reducer.py


## Create Input Directory

In [None]:
!mkdir input

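## Write the initial centroids and the input points

`centroids.txt` holds the k = 3 starting centroids (one `x,y` pair per line); `input/text.txt` holds the points to cluster.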

In [None]:
%%writefile centroids.txt
1.0,2.0
5.0,6.0
10.0,11.0

Writing centroids.txt


In [None]:
%%writefile input/text.txt
1.0,2.0
2.0,3.0
3.0,4.0
4.0,5.0
5.0,6.0
6.0,7.0
7.0,8.0
8.0,9.0
9.0,10.0
10.0,11.0
11.0,12.0
12.0,13.0
13.0,14.0
14.0,15.0
15.0,16.0

Writing input/text.txt


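## Run one k-means iteration with Hadoop Streaming

Each run assigns every point to its nearest centroid (map) and recomputes each centroid as the mean of its assigned points (reduce). Full k-means repeats this step until the centroids stop changing, feeding the new centroids (minus the header lines) back in as `centroids.txt`.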

In [None]:
!hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input \
    -output output1

2024-04-14 16:29:48,325 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-04-14 16:29:48,428 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-04-14 16:29:48,429 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-04-14 16:29:48,454 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-04-14 16:29:48,757 INFO mapred.FileInputFormat: Total input files to process : 1
2024-04-14 16:29:48,779 INFO mapreduce.JobSubmitter: number of splits:1
2024-04-14 16:29:48,948 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1300433353_0001
2024-04-14 16:29:48,948 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-04-14 16:29:49,289 INFO mapred.LocalDistributedCacheManager: Localized file:/content/mapper.py as file:/tmp/hadoop-root/mapred/local/job_local1300433353_0001_7ee1d4f0-c5d6-4a70-bccd-a149a9469313/mapper.py
2024-04-14 16:29:49,320 INFO mapred.LocalDistributedCacheMa

## Display Output

In [None]:
!cat output1/part-00000

Cluster ID	Cluster Centroid(X,Y)
	
0		2.0,3.0
1		5.5,6.5
2		11.5,12.5
