In [None]:
import csv
import glob
import sys
import math
import pandas
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler,StandardScaler,OneHotEncoderEstimator,StringIndexer,PCA
from pyspark.ml.clustering import KMeans,BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Reuse the active SparkContext when this cell is re-run: constructing a second
# SparkContext() in the same kernel raises "Cannot run multiple SparkContexts".
sc=SparkContext.getOrCreate()
# SQLContext is kept because later cells use `spark.read`; NOTE(review): it is
# deprecated in Spark 3.x in favour of SparkSession.
spark=SQLContext(sc)

# Remove NULL Values

In [None]:
def remove_null(dataset,column_list=None):
    """Drop every row holding a null in any of ``column_list``.

    ``column_list=None`` lets Spark consider all columns. Returns a new DataFrame.
    """
    return dataset.na.drop(how='any', subset=column_list)

# Identifying rows with Null values

In [None]:
def identify_null(dataset,column_name=None,how='all'):
    """Return the rows of ``dataset`` whose selected columns are null.

    Parameters
    ----------
    dataset : DataFrame
        Input data; it is filtered, never modified (nulls stay null).
    column_name : list of str or None
        Columns to inspect. None means every column in ``dataset.columns``.
    how : {'all', 'any'}
        'all' (default) keeps rows where every listed column is null;
        'any' keeps rows where at least one listed column is null.

    Returns
    -------
    DataFrame with the matching rows, or ``dataset`` itself when no columns
    are given.

    Raises
    ------
    ValueError if ``how`` is not 'all' or 'any'.
    """
    import functools
    import operator

    if how not in ('all', 'any'):
        raise ValueError("how must be 'all' or 'any', got %r" % (how,))
    columns = list(dataset.columns) if column_name is None else list(column_name)
    if not columns:
        return dataset
    # Test with isNull() rather than filling a 'Null' sentinel: na.fill(str)
    # only touches string columns (numeric nulls would be missed) and a
    # sentinel also matches genuine cells containing the text "Null".
    combine = operator.and_ if how == 'all' else operator.or_
    condition = functools.reduce(combine, (dataset[c].isNull() for c in columns))
    return dataset.filter(condition)

# Remove Duplicate Rows

In [None]:
def remove_dup(dataset):
    """Return ``dataset`` without fully duplicated rows (all columns compared)."""
    deduplicated = dataset.dropDuplicates()
    return deduplicated

# Trim Leading and Trailing Spaces

In [None]:
def trim_space(dataset,column_list=None):
    """Strip leading/trailing spaces from each column in ``column_list``.

    Each listed column is replaced in place (same name) by ``trim(column)``.
    ``column_list=None`` (or empty) returns ``dataset`` unchanged. Other
    columns are untouched, so numeric columns are never cast to string.
    """
    trimmed = dataset
    for column in column_list or []:
        trimmed = trimmed.withColumn(column, trim(trimmed[column]))
    return trimmed


# CASE NORMALIZATION


1. Lower-case normalization
2. Upper-case normalization

Converts the contents of the selected columns to lower or upper case.

In [None]:
def lower_case(dataset,column_list=None):
    """Lower-case every column in ``column_list`` (each replaced under its own name).

    ``column_list=None`` (or empty) returns ``dataset`` unchanged instead of
    failing on iteration over None.
    """
    normalized = dataset
    for column in column_list or []:
        normalized = normalized.withColumn(column, lower(normalized[column]))
    return normalized

def upper_case(dataset,column_list=None):
    """Upper-case every column in ``column_list`` (each replaced under its own name).

    ``column_list=None`` (or empty) returns ``dataset`` unchanged instead of
    failing on iteration over None.
    """
    normalized = dataset
    for column in column_list or []:
        # upper(), not lower(): this helper previously lower-cased by mistake.
        normalized = normalized.withColumn(column, upper(normalized[column]))
    return normalized

# StringIndexer

In [None]:
def stringIndex(dataset,column_name=None):
    """Add a StringIndexer output column ``features_<col>`` for each input column.

    Each indexer is fitted on the frame produced so far and uses
    ``handleInvalid="keep"``, so invalid labels get their own index instead of
    raising.

    Returns
    -------
    (DataFrame, list of str)
        The indexed frame and the new column names, in input order.
        ``column_name=None`` (or empty) returns ``(dataset, [])``.
    """
    indexed = dataset
    feature_list = []
    for source in column_name or []:
        target = "features_" + source
        feature_list.append(target)
        indexer = StringIndexer(inputCol=source, outputCol=target).setHandleInvalid("keep")
        indexed = indexer.fit(indexed).transform(indexed)
    return indexed, feature_list

# OneHotEncoding

In [None]:
def encoder(dataset,column_name=None):
    """One-hot encode ``column_name`` columns into ``<col>_encoded`` columns.

    Fits a single OneHotEncoderEstimator over all inputs and applies it to
    ``dataset``. NOTE(review): Spark 3.0 renamed this class to OneHotEncoder.

    Returns
    -------
    (DataFrame, list of str)
        The encoded frame and the output column names, in input order.
    """
    output_columns = [source + "_encoded" for source in column_name]
    one_hot = OneHotEncoderEstimator(inputCols=column_name,
                                     outputCols=output_columns)
    fitted = one_hot.fit(dataset)
    return fitted.transform(dataset), output_columns

# VECTOR ASSEMBLER

Combines the selected feature columns into a single vector column for clustering.

In [None]:
def vector_assembler(dataset,column_name=None):
    """Concatenate the ``column_name`` columns into one ``vector_features`` column.

    Returns ``dataset`` with the assembled vector column appended.
    """
    return VectorAssembler(inputCols=column_name,
                           outputCol="vector_features").transform(dataset)

# PCA

Fits a PCA model that projects the feature vectors into a low-dimensional space.

In [None]:
def reduce_dimension(dataset,column_name=None,k=2):
    """Project ``vector_features`` onto ``k`` principal components in ``features``.

    Parameters
    ----------
    dataset : DataFrame
        Must contain a ``vector_features`` vector column.
    column_name : unused
        Kept only for call compatibility.
    k : int
        Number of PCA components. Defaults to 2, the previously hard-coded value.

    Returns ``dataset`` with a ``features`` column appended.
    """
    pca = PCA(k=k, inputCol="vector_features", outputCol="features")
    return pca.fit(dataset).transform(dataset)

# K-MEANS CLUSTERING

In [None]:
# Centres of the most recently fitted clustering model, keyed by cluster index
# (written by kmeans_cluster / bkmeans_cluster, read by distance_calc / compare).
cluster_center={}
def distance_calc(point,center):
    """Euclidean distance from ``point`` to the centre of cluster ``center``.

    Parameters
    ----------
    point : sequence of numbers
        Feature vector (e.g. a DenseVector from the ``features`` column).
    center : int-like
        Cluster index, used as a key into the module-level ``cluster_center``.

    Works for any dimensionality; assumes ``point`` and the centre have the same
    length (zip stops at the shorter one).
    """
    # No per-row print: this runs inside a Spark UDF, once per row.
    coords = cluster_center[int(center)]
    return math.sqrt(math.fsum((float(p) - float(c)) ** 2 for p, c in zip(point, coords)))
    
def threshold_calc(max_distance,min_distance):
    """Midpoint between a cluster's largest and smallest member distance.

    Both arguments go through float(), so numeric strings are accepted too.
    """
    upper_bound = float(max_distance)
    lower_bound = float(min_distance)
    return (upper_bound + lower_bound) / 2

def compare(point,center,threshold):
    """Return True when ``point`` lies farther than ``threshold`` from its centre.

    Parameters
    ----------
    point : sequence of numbers
        Feature vector.
    center : int-like
        Cluster index, used as a key into the module-level ``cluster_center``.
    threshold : number or numeric string
        Converted with float().

    The Euclidean distance covers all coordinates (not just the first two).
    Assumes ``point`` and the centre share the same dimensionality.
    """
    coords = cluster_center[int(center)]
    dist = math.sqrt(math.fsum((float(p) - float(c)) ** 2 for p, c in zip(point, coords)))
    return dist > float(threshold)
    
def kmeans_cluster(df):
    """Cluster ``df`` with K-Means and flag points far from their cluster centre.

    k is chosen from 2..10 by the best ClusteringEvaluator (silhouette) score.
    Ties go to the smaller k, and the chosen k is printed. For each cluster,
    threshold = (max distance + min distance) / 2 over its members. A point is
    an outlier when its distance to its centre exceeds that threshold.

    Side effect: the module-level ``cluster_center`` dict is reset to the chosen
    model's centres (index -> list of coordinates). The distance/compare UDF
    helpers read from it.

    Returns a distinct DataFrame holding ``features``, ``prediction``, a boolean
    ``outlier`` column, and the remaining columns of the model's predictions.
    """
    evaluator = ClusteringEvaluator()
    kvalue = {}
    for k in range(2, 11):
        trial = KMeans(initMode="k-means||").setK(k).fit(df)
        kvalue[k] = evaluator.evaluate(trial.transform(df))
    # Highest score wins; sorted() is stable, so equal scores keep the smaller k.
    finalk = sorted(kvalue.items(), key=lambda item: item[1], reverse=True)[0][0]
    print(finalk)

    model = KMeans(initMode="k-means||").setK(finalk).fit(df)
    predictions = model.transform(df)

    global cluster_center
    cluster_center.clear()
    for index, center in enumerate(model.clusterCenters()):
        cluster_center[index] = list(center)

    # Explicit return types: udf() defaults to StringType, which would make the
    # max/min aggregates below compare distances as text ("10.0" < "9.0").
    # (max/min here are the pyspark.sql.functions aggregates from the star import.)
    distance = udf(distance_calc, DoubleType())
    extremes = predictions.groupBy("prediction").agg(
        max(distance("features", "prediction")).alias("max_distance"),
        min(distance("features", "prediction")).alias("min_distance"),
    )
    midpoint = udf(threshold_calc, DoubleType())
    threshold = extremes.select(
        "prediction", midpoint("max_distance", "min_distance").alias("threshold"))

    is_outlier = udf(compare, BooleanType())
    outlier_df = (
        predictions.join(threshold, "prediction", "inner")
        .select("features", "prediction",
                is_outlier("features", "prediction", "threshold").alias("outlier"))
        # Re-attach the other prediction columns to the outlier flag.
        .join(predictions, ["features", "prediction"], "inner")
    )
    return outlier_df.distinct()


# BK-Means Clustering

In [None]:
def bkmeans_cluster(df):
    """Cluster ``df`` with Bisecting K-Means (seed 1) and flag far-from-centre points.

    k is chosen from 2..10 by the best ClusteringEvaluator (silhouette) score.
    Ties go to the smaller k, and the chosen k is printed. The final model is
    fitted with that chosen k. For each cluster,
    threshold = (max distance + min distance) / 2 over its members. A point is
    an outlier when its distance to its centre exceeds that threshold.

    Side effect: the module-level ``cluster_center`` dict is reset to the chosen
    model's centres (index -> list of coordinates). The distance/compare UDF
    helpers read from it.

    Returns a distinct DataFrame holding ``features``, ``prediction``, a boolean
    ``outlier`` column, and the remaining columns of the model's predictions.
    """
    evaluator = ClusteringEvaluator()
    kvalue = {}
    for k in range(2, 11):
        trial = BisectingKMeans().setK(k).setSeed(1).fit(df)
        kvalue[k] = evaluator.evaluate(trial.transform(df))
    # Highest score wins; sorted() is stable, so equal scores keep the smaller k.
    finalk = sorted(kvalue.items(), key=lambda item: item[1], reverse=True)[0][0]
    print(finalk)

    # Use the selected k (the loop variable would always be 10 here).
    model = BisectingKMeans().setK(finalk).setSeed(1).fit(df)
    predictions = model.transform(df)

    global cluster_center
    cluster_center.clear()
    for index, center in enumerate(model.clusterCenters()):
        cluster_center[index] = list(center)

    # Explicit return types: udf() defaults to StringType, which would make the
    # max/min aggregates below compare distances as text ("10.0" < "9.0").
    # (max/min here are the pyspark.sql.functions aggregates from the star import.)
    distance = udf(distance_calc, DoubleType())
    extremes = predictions.groupBy("prediction").agg(
        max(distance("features", "prediction")).alias("max_distance"),
        min(distance("features", "prediction")).alias("min_distance"),
    )
    midpoint = udf(threshold_calc, DoubleType())
    threshold = extremes.select(
        "prediction", midpoint("max_distance", "min_distance").alias("threshold"))

    is_outlier = udf(compare, BooleanType())
    outlier_df = (
        predictions.join(threshold, "prediction", "inner")
        .select("features", "prediction",
                is_outlier("features", "prediction", "threshold").alias("outlier"))
        # Re-attach the other prediction columns to the outlier flag.
        .join(predictions, ["features", "prediction"], "inner")
    )
    return outlier_df.distinct()

# Compare results from K-Means and BK-Means

In [None]:
def compare_df(dataset1,dataset2):
    """Combine K-Means (``dataset1``) and BK-Means (``dataset2``) outlier results.

    Both inputs need ``features`` and ``outlier`` columns. The result is the
    distinct rows of ``dataset1`` plus a boolean ``outlier_both`` column. That
    column is True when the row's ``features`` vector was flagged as an outlier
    in both inputs, and False otherwise.
    """
    # `== True` also works when `outlier` is the string "true"/"false"
    # (a UDF created without a return type).
    # Column refs use df["..."] rather than col(): star-imported pyspark names
    # can be shadowed by notebook variables.
    flagged1 = dataset1.filter(dataset1["outlier"] == True).select("features")
    flagged2 = dataset2.filter(dataset2["outlier"] == True).select("features")
    # distinct() so the left join below cannot duplicate dataset1 rows.
    agreed = (flagged1.join(flagged2, "features", "inner")
              .distinct()
              .withColumn("outlier_both", lit(True)))
    combined = dataset1.join(agreed, "features", "left_outer")
    combined = combined.withColumn(
        "outlier_both", coalesce(combined["outlier_both"], lit(False)))
    return combined.distinct()
    
    
    
    

# Load a Dataset and Run the Interactive Cleaning and Clustering Pipeline

In [None]:
import os

# Only this one file is processed; change `name` to clean another dataset.
name="datasets/rmmq-46n5.tsv"
dataset = spark.read.format("csv").options(header="true",inferschema="true",delimiter="\t").load(name)
# Column types captured before any transformation; used below to split the
# clustering columns into string vs numeric ones.
l1=dataset.dtypes
m=dataset.count()  # row count before cleaning, reported at the end
dataset=remove_dup(dataset)
column_names=dataset.columns
for j, column in enumerate(column_names):
    print(str(j)+"-"+column)

# Menu code -> cleaning helper; every helper takes (dataset, columns).
transformations = {
    "T": trim_space,
    "L": lower_case,
    "U": upper_case,
    "RN": remove_null,
    "IN": identify_null,  # NOTE: replaces `dataset` with only the null rows
}
print("Enter the number for the columns you want to transform")
while True:
    column_send=[]
    while True:
        n=int(input())
        column_send.append(column_names[n])
        print("Continue Y-yes N-no")
        if input().strip().upper()=="N":
            break
    print("Which tranformation you want to perform")
    print("Trim-T Lowercase-L Uppercase-U RemoveNull-RN IdentifyNull-IN")
    transformation=input().strip().upper()
    if transformation in transformations:
        dataset=transformations[transformation](dataset,column_send)
    else:
        print("Unknown transformation:", transformation)
    print("Continue Y-yes N-no")
    if input().strip().upper()=="N":
        break

print("Enter the numbers for columns you want to perform clustering on")
# Not named `col`: that would shadow pyspark.sql.functions.col from the star import.
cluster_indices=input().split(",")
indexing_list=[column_names[int(i)] for i in cluster_indices]
# Keep the dataset's column order (l1 order) for the assembler inputs.
stringlist=[c for c, t in l1 if c in indexing_list and t=="string"]
numericlist=[c for c, t in l1 if c in indexing_list and t!="string"]

if stringlist:
    # String columns are indexed and one-hot encoded; numeric ones are appended as-is.
    result,feature_list=stringIndex(dataset,stringlist)
    result,encoded_names=encoder(result,feature_list)
    assembler_inputs=encoded_names+numericlist
else:
    result=dataset
    assembler_inputs=numericlist
result=vector_assembler(result,assembler_inputs)
result=reduce_dimension(result)
df1=kmeans_cluster(result)
df2=bkmeans_cluster(result)
df=compare_df(df1,df2)

base_name=os.path.splitext(os.path.basename(name))[0]
os.makedirs("csvfiles",exist_ok=True)  # to_csv fails if the folder is missing
file_name=os.path.join("csvfiles",base_name+".csv")
df.toPandas().to_csv(file_name)
n=df.count()
print("Before cleaning:",m)
print("After cleaning:",n)
    