<a href="https://colab.research.google.com/github/Anilesh05/Anilesh/blob/main/Implement_k_means_clustering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Download and install Apache Hadoop

In [None]:
# Install Java 8 and unpack Hadoop 3.3.6 under /content.
# Every step is written so the cell can be re-run on the same runtime.

# -y answers the confirmation prompt (a notebook cell has no interactive stdin).
!apt-get install -y -qq openjdk-8-jdk
# -nc skips the download when the tarball is already present instead of
# saving a second copy as hadoop-3.3.6.tar.gz.1.
# NOTE(review): dlcdn.apache.org presumably only mirrors current releases; if
# this URL starts returning 404, the archive.apache.org mirror should be used — confirm.
!wget -nc -q https://dlcdn.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
!tar fx hadoop-3.3.6.tar.gz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["HADOOP_HOME"] = "/content/hadoop-3.3.6"
# Put the hadoop/hdfs launchers on PATH; -f replaces links left by an earlier
# run instead of failing with "File exists".
!ln -sf /content/hadoop-3.3.6/bin/* /usr/bin

## *Create centroids.txt file*

In [None]:
%%writefile centroids.txt
1.0,2.0
5.0,6.0
10.0,11.0

## *Create points.txt file*

In [None]:
%%writefile points.txt
1.0,2.0
2.0,3.0
3.0,4.0
4.0,5.0
5.0,6.0
6.0,7.0
7.0,8.0
8.0,9.0
9.0,10.0
10.0,11.0
11.0,12.0
12.0,13.0
13.0,14.0
14.0,15.0
15.0,16.0

In [None]:
%%writefile mapper.py
#!/usr/bin/env python
"""Hadoop Streaming mapper for the k-means assignment step.

Reads comma-separated points from STDIN, assigns each one to the nearest
centroid listed in ``centroids.txt`` (Euclidean distance) and writes
``<centroid index>\t<comma-separated point>`` to STDOUT.
"""

import sys
import numpy as np


def load_centroids(path='centroids.txt'):
    """Load the centroids as a 2-D array with one centroid per row.

    ``ndmin=2`` keeps the row-per-centroid layout even when the file contains
    a single line; without it ``np.loadtxt`` returns a 1-D array and the
    mapper would iterate over coordinates instead of centroids.
    """
    return np.loadtxt(path, delimiter=',', ndmin=2)


def closest_centroid_index(point, centroids):
    """Return the row index of the centroid nearest to ``point``.

    Ties resolve to the lowest index, because ``np.argmin`` returns the first
    minimum.
    """
    distances = np.sqrt(np.sum((centroids - point) ** 2, axis=1))
    return int(np.argmin(distances))


def map_lines(lines, centroids):
    """Yield one ``'<index>\\t<x>,<y>,...'`` record per non-blank input line."""
    for line in lines:
        record = line.strip()
        if not record:
            # A trailing newline or empty line in the input split must not
            # crash the task on float('').
            continue
        point = np.array([float(field) for field in record.split(',')])
        index = closest_centroid_index(point, centroids)
        yield f'{index}\t{",".join(map(str, point))}'


def main():
    """Run the mapper over STDIN using the centroids in the working directory."""
    centroids = load_centroids()
    for record in map_lines(sys.stdin, centroids):
        print(record)


if __name__ == '__main__':
    main()


In [None]:
%%writefile reducer.py
#!/usr/bin/env python
"""Hadoop Streaming reducer for the k-means update step.

Reads ``<centroid id>\t<comma-separated point>`` records from STDIN, groups
the points by centroid id and prints the mean of each group as the new
centroid, preceded by a header line and a blank line.
"""

import sys
import numpy as np


def group_points(lines):
    """Collect the points of every centroid id, in first-seen order.

    Blank lines are skipped so that an empty record does not break the
    ``id<TAB>point`` unpacking.
    """
    clusters = {}
    for line in lines:
        record = line.strip()
        if not record:
            continue
        # Split on the first tab only: everything after it is the point.
        centroid_id, point_str = record.split('\t', 1)
        point = np.array(list(map(float, point_str.split(','))))
        clusters.setdefault(centroid_id, []).append(point)
    return clusters


def format_centroids(clusters):
    """Yield ``'<id>\\t\\t<mean coordinates>'`` for every cluster."""
    for centroid_id, points in clusters.items():
        # Coordinate-wise mean of all points assigned to this centroid.
        new_centroid = np.mean(points, axis=0)
        yield f'{centroid_id}\t\t{",".join(map(str, new_centroid))}'


def main():
    """Read mapper output from STDIN and print the recomputed centroids."""
    clusters = group_points(sys.stdin)
    print("Cluster ID\tCluster Centroid(X,Y)")
    print()
    for row in format_centroids(clusters):
        print(row)


if __name__ == '__main__':
    main()


In [None]:
# Create the job's input directory; -p makes this a no-op when the directory
# already exists, so re-running the cell does not fail.
!hdfs dfs -mkdir -p input

In [None]:
# Copy (rather than move) points.txt into the input directory so the local
# file is kept; -f overwrites a copy left by a previous run instead of failing
# because the destination already exists.
!hdfs dfs -put -f points.txt input/

In [None]:
# Print centroids.txt through the Hadoop filesystem shell as a sanity check.
# NOTE(review): no cell copies centroids.txt with an hdfs command, so this
# presumably relies on the default filesystem being the local one — confirm.
!hdfs dfs -cat centroids.txt

In [None]:
# Print the points file under input/ to verify what the streaming job will read.
!hdfs dfs -cat input/points.txt

In [None]:
# Run one k-means iteration as a Hadoop Streaming job.
# Hadoop refuses to start a job whose output directory already exists, so
# remove the result of any previous run first (-f: no error if it is absent).
!hdfs dfs -rm -r -f output
# centroids.txt is shipped alongside the scripts because mapper.py opens it
# from the task's working directory.
!hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
    -files mapper.py,reducer.py,centroids.txt \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input \
    -output output

In [None]:
# Show the reducer output. Read it through the Hadoop filesystem shell, like
# the other cells, so the result is found wherever the job wrote it (a plain
# `cat` only sees the local disk).
!hdfs dfs -cat output/part-00000