In [0]:
spark

#Creating spark object

In [0]:
sc = spark.sparkContext

#Fetching Data from CSV

In [0]:
customerdata = sc.textFile("/FileStore/tables/customerdata_csv.csv" )
customerdata.take(20)

Out[2]: ['1,41100,48.75,,48.75,0.930,41100,0.665499124,,',
 '2,54100,28.1,,28.1,0.059,54100,0.893169877,,49.614',
 '3,47800,46.75,,46.75,0.846,47800,0.782837128,,63.81',
 '4,19100,40.25,,40.25,0.572,19100,0.280210158,,',
 '5,18200,35.8,,35.8,0.384,18200,0.264448336,,',
 '6,19800,41.45,,41.45,0.622,19800,0.292469352,,',
 '7,51500,31.55,,31.55,0.205,51500,0.847635727,,',
 '8,39900,48.2,,48.2,0.907,39900,0.644483363,,',
 '9,13000,36.8,,36.8,0.426,13000,0.173380035,,',
 '10,31900,43.05,,43.05,0.690,31900,0.504378284,,',
 '11,51700,29.7,,29.7,0.127,51700,0.851138354,,',
 '12,56700,29.25,,29.25,0.108,56700,0.938704028,,',
 '13,58800,29.65,,29.65,0.124,58800,0.975481611,,',
 '14,46500,47.2,,47.2,0.865,46500,0.760070053,,',
 '15,55000,28.35,,28.35,0.070,55000,0.908931699,,',
 '16,18200,41.15,,41.15,0.610,18200,0.264448336,,',
 '17,46400,45.15,,45.15,0.778,46400,0.758318739,,',
 '18,46300,44.3,,44.3,0.743,46300,0.756567426,,',
 '19,22100,46.65,,46.65,0.842,22100,0.332749562,,',
 '20,53200,29,,2

#Splitting on the basis of ","

In [0]:
# Create tuples

def make_pair(i):
    i = i.split(',')
    return (int(i[0]), (int(i[1]), float(i[2])))


customerdata = customerdata.map(lambda i: make_pair(i))
customerdata.take(20)

Out[3]: [(1, (41100, 48.75)),
 (2, (54100, 28.1)),
 (3, (47800, 46.75)),
 (4, (19100, 40.25)),
 (5, (18200, 35.8)),
 (6, (19800, 41.45)),
 (7, (51500, 31.55)),
 (8, (39900, 48.2)),
 (9, (13000, 36.8)),
 (10, (31900, 43.05)),
 (11, (51700, 29.7)),
 (12, (56700, 29.25)),
 (13, (58800, 29.65)),
 (14, (46500, 47.2)),
 (15, (55000, 28.35)),
 (16, (18200, 41.15)),
 (17, (46400, 45.15)),
 (18, (46300, 44.3)),
 (19, (22100, 46.65)),
 (20, (53200, 29.0))]

#Finding minimum maximum values of income and age

In [0]:
# Finding out minimum and maximum of income and age for doing normalization

min_income = customerdata.sortBy(lambda row: row[1][0], True).first()[1][0]
max_income = customerdata.sortBy(lambda row: row[1][0], False).first()[1][0]

min_age = customerdata.sortBy(lambda row: row[1][1], True).first()[1][1]
max_age = customerdata.sortBy(lambda row: row[1][1], False).first()[1][1]


print("minimum income:" + str(min_income), "maximum income:" + str(max_income), "minimum age:" + str(min_age), "maximum age:" + str(max_age), sep = '\n')

minimum income:3100
maximum income:60200
minimum age:26.7
maximum age:50.4


#Normalization

In [0]:
# Normalizing the data for income and age columns

def normalize_data(row):
    normalized_income = (row[1][0] - min_income) / (max_income - min_income)
    normalized_age = (row[1][1] - min_age) / (max_age - min_age)
    return (row[0], (normalized_income, normalized_age))

normalized_customerdata = customerdata.map(lambda row: normalize_data(row))

normalized_customerdata.take(20)

Out[5]: [(1, (0.6654991243432574, 0.9303797468354431)),
 (2, (0.8931698774080561, 0.059071729957806)),
 (3, (0.7828371278458844, 0.8459915611814347)),
 (4, (0.28021015761821366, 0.5717299578059072)),
 (5, (0.26444833625218916, 0.3839662447257383)),
 (6, (0.29246935201401053, 0.6223628691983124)),
 (7, (0.8476357267950964, 0.20464135021097052)),
 (8, (0.6444833625218914, 0.9071729957805909)),
 (9, (0.1733800350262697, 0.4261603375527425)),
 (10, (0.5043782837127846, 0.6898734177215189)),
 (11, (0.851138353765324, 0.12658227848101267)),
 (12, (0.9387040280210157, 0.10759493670886079)),
 (13, (0.9754816112084063, 0.12447257383966243)),
 (14, (0.7600700525394045, 0.8649789029535867)),
 (15, (0.9089316987740805, 0.06962025316455706)),
 (16, (0.26444833625218916, 0.609704641350211)),
 (17, (0.7583187390542907, 0.7784810126582279)),
 (18, (0.7565674255691769, 0.7426160337552742)),
 (19, (0.3327495621716287, 0.8417721518987342)),
 (20, (0.8774080560420315, 0.09704641350210974))]

#Finding Eucledian distance

In [0]:
# Finding out the distance between 2 points in cartesian plan

import math

def find_eucledian_distance(p1, p2):
    return math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

# Clustering data

In [0]:
def find_cluster(row):
    dist_from_c1 = find_eucledian_distance(c1, row[1])
    dist_from_c2 = find_eucledian_distance(c2, row[1])
    dist_from_c3 = find_eucledian_distance(c3, row[1])
    
    if dist_from_c1 < dist_from_c2 and dist_from_c1 < dist_from_c3:
        cluster = 1
    elif dist_from_c2 < dist_from_c1 and dist_from_c2 < dist_from_c3:
        cluster = 2
    else:
        cluster = 3
    
    return (cluster, (row[0], row[1][0], row[1][1]))

In [0]:
# find the centroid

def calc_centroid(clst):
    x, y = 0, 0
    for point in clst[1]:
        x += point[1]
        y += point[2]
    return (clst[0], (x/len(clst[1]), y/len(clst[1])))

In [0]:
def fetch_centroid_coordinate(centroid_list, cluster_number):
    for point in centroid_list:
        if point[0] == cluster_number:
            return (point[1][0], point[1][1])
    

In [0]:
c1 = (0.6654991243432574, 0.9303797468354431)
c2 = (0.8931698774080561, 0.059071729957806)
c3 = (0.7828371278458844, 0.8459915611814347)

#Iterating 40 times

In [0]:

c1 = (0.6654991243432574, 0.9303797468354431)
c2 = (0.8931698774080561, 0.059071729957806)
c3 = (0.7828371278458844, 0.8459915611814347)

for attempt in range(40):
    if attempt != 0: 
        centroid_list = centroid.collect()
        print(centroid_list)
        c1 = fetch_centroid_coordinate(centroid_list, 1)
        c2 = fetch_centroid_coordinate(centroid_list, 2)
        c3 = fetch_centroid_coordinate(centroid_list, 3)
            
        
    clustered_customerdata = normalized_customerdata.map(lambda row: find_cluster(row))
    #clustered_customerdata.take(10)
    centroid = clustered_customerdata.groupByKey().map(lambda clst: calc_centroid(clst))
    #centroid.take(10)

[(2, (0.8503336962181095, 0.12538487854943556)), (1, (0.3753599097628306, 0.6649502967889583)), (3, (0.7431489109790089, 0.7952397931895171))]
[(2, (0.8978457143828746, 0.11439514972758189)), (3, (0.7273029772329249, 0.8458227848101267)), (1, (0.2704605773918067, 0.5410196180782114))]
[(2, (0.9125043782837128, 0.11324894514767936)), (3, (0.7216941832588035, 0.8408094711400599)), (1, (0.266578800079441, 0.5249032145808867))]
[(2, (0.9125043782837128, 0.11324894514767936)), (3, (0.7216941832588035, 0.8408094711400599)), (1, (0.266578800079441, 0.5249032145808867))]
[(2, (0.9125043782837128, 0.11324894514767936)), (3, (0.7216941832588035, 0.8408094711400599)), (1, (0.266578800079441, 0.5249032145808867))]
[(2, (0.9125043782837128, 0.11324894514767936)), (3, (0.7216941832588035, 0.8408094711400599)), (1, (0.266578800079441, 0.5249032145808867))]
[(2, (0.9125043782837128, 0.11324894514767936)), (3, (0.7216941832588035, 0.8408094711400599)), (1, (0.266578800079441, 0.5249032145808867))]
[(2,

#Fetching 10 customers

In [0]:
def fetch_ten_records(clst):
    return list(clst[1])[:10]

In [0]:
ten_points_of_cluster = clustered_customerdata.groupByKey().map(lambda clst: fetch_ten_records(clst))
ten_points_of_cluster.take(10)

Out[13]: [[(2, 0.8931698774080561, 0.059071729957806),
  (7, 0.8476357267950964, 0.20464135021097052),
  (11, 0.851138353765324, 0.12658227848101267),
  (12, 0.9387040280210157, 0.10759493670886079),
  (13, 0.9754816112084063, 0.12447257383966243),
  (15, 0.9089316987740805, 0.06962025316455706),
  (20, 0.8774080560420315, 0.09704641350210974),
  (23, 0.8774080560420315, 0.10548523206751055),
  (29, 0.9387040280210157, 0.1329113924050634),
  (30, 0.9422066549912435, 0.17299578059071735)],
 [(1, 0.6654991243432574, 0.9303797468354431),
  (3, 0.7828371278458844, 0.8459915611814347),
  (8, 0.6444833625218914, 0.9071729957805909),
  (10, 0.5043782837127846, 0.6898734177215189),
  (14, 0.7600700525394045, 0.8649789029535867),
  (17, 0.7583187390542907, 0.7784810126582279),
  (18, 0.7565674255691769, 0.7426160337552742),
  (22, 0.7513134851138353, 0.7637130801687763),
  (24, 0.6672504378283712, 0.8691983122362869),
  (27, 0.6567425569176882, 0.7172995780590719)],
 [(4, 0.28021015761821366, 0

In [0]:
normalized_customerdata.map(lambda row: find_cluster(row)).groupByKey().map(lambda clst: calc_centroid(clst)).take(10)

Out[14]: [(2, (0.9125043782837128, 0.11324894514767936)),
 (3, (0.7216941832588035, 0.8408094711400599)),
 (1, (0.266578800079441, 0.5249032145808867))]

#Chaining

In [0]:
normalized_customerdata = sc.textFile( "/FileStore/tables/customerdata_csv.csv" ).map(lambda i: make_pair(i)).map(lambda row: normalize_data(row))

c1 = (0.6654991243432574, 0.9303797468354431)
c2 = (0.8931698774080561, 0.059071729957806)
c3 = (0.7828371278458844, 0.8459915611814347)

for attempt in range(40):
    
    if attempt != 0: 
        centroid_list = centroid.collect()
        #print(centroid_list)
        c1 = fetch_centroid_coordinate(centroid_list, 1)
        c2 = fetch_centroid_coordinate(centroid_list, 2)
        c3 = fetch_centroid_coordinate(centroid_list, 3)
    
    centroid = normalized_customerdata.map(lambda row: find_cluster(row)).groupByKey().map(lambda clst: calc_centroid(clst))
    ten_points_of_cluster = normalized_customerdata.map(lambda row: find_cluster(row)).groupByKey().map(lambda clst: fetch_ten_records(clst))

In [0]:
centroid.collect()

Out[16]: [(2, (0.9125043782837128, 0.11324894514767936)),
 (3, (0.7216941832588035, 0.8408094711400599)),
 (1, (0.266578800079441, 0.5249032145808867))]

In [0]:
ten_points_of_cluster.take(10)

Out[17]: [[(2, 0.8931698774080561, 0.059071729957806),
  (7, 0.8476357267950964, 0.20464135021097052),
  (11, 0.851138353765324, 0.12658227848101267),
  (12, 0.9387040280210157, 0.10759493670886079),
  (13, 0.9754816112084063, 0.12447257383966243),
  (15, 0.9089316987740805, 0.06962025316455706),
  (20, 0.8774080560420315, 0.09704641350210974),
  (23, 0.8774080560420315, 0.10548523206751055),
  (29, 0.9387040280210157, 0.1329113924050634),
  (30, 0.9422066549912435, 0.17299578059071735)],
 [(1, 0.6654991243432574, 0.9303797468354431),
  (3, 0.7828371278458844, 0.8459915611814347),
  (8, 0.6444833625218914, 0.9071729957805909),
  (10, 0.5043782837127846, 0.6898734177215189),
  (14, 0.7600700525394045, 0.8649789029535867),
  (17, 0.7583187390542907, 0.7784810126582279),
  (18, 0.7565674255691769, 0.7426160337552742),
  (22, 0.7513134851138353, 0.7637130801687763),
  (24, 0.6672504378283712, 0.8691983122362869),
  (27, 0.6567425569176882, 0.7172995780590719)],
 [(4, 0.28021015761821366, 0