In [None]:
# Download and unpack the Spark 3.2.1 (Hadoop 3.2 build) binary distribution into the working directory.
# NOTE(review): dlcdn.apache.org may only serve current releases -- if this URL stops resolving,
# confirm whether the Apache archive mirror should be used instead.
!wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
# Print the Java version available to the notebook (sanity check before starting Spark).
!java -version
# Install findspark, which the next cell uses to put the unpacked Spark install on the Python path.
!pip install findspark

In [2]:
import os
# SPARK_HOME must be set before findspark.init() so it can locate the unpacked distribution.
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Further builder options can be chained with .config(key, value) before getOrCreate().
spark = (
    SparkSession.builder
    .master("local[8]")  # local mode with 8 worker threads
    .config("spark.app.name", "session_one")
    .getOrCreate()
)

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
import numpy as np


In [4]:
def dataframe2NumpyArray(df, colName):
    '''
    Collect one column of a Spark dataframe to the driver as a numpy array.
    ----------------------------
    df: dataframe exposing select(...).collect()
    colName: name of the column to extract
    return: numpy array built from the collected rows (one entry per row)
    '''
    selected = df.select(colName)
    collectedRows = selected.collect()
    return np.array(collectedRows)


def numpyArray2Matrix(array):
    '''
    Convert a numpy array of row vectors to a Spark RowMatrix whose rows
    are scaled to unit L2 norm.
    ----------------------------
    array: numpy array with one vector per entry along the first axis.
           A 3-D array of shape (n, 1, d) is flattened to (n, d) first.
    return: RowMatrix built from the normalised rows

    Note: a row with zero norm is divided by zero and yields NaN entries,
    exactly as in the element-wise division this replaces.
    '''
    # ndarray.reshape returns a new array rather than modifying in place,
    # so the result has to be assigned for the flattening to take effect.
    if len(array.shape) == 3:
        array = array.reshape((array.shape[0], array.shape[-1]))

    # Normalise on the driver (the data is already local) so the vectors are
    # shipped to Spark once instead of parallelize -> map -> collect -> parallelize.
    denseVectorList = []
    for row in array:
        vec = np.atleast_1d(np.asarray(row, dtype=np.float64))
        denseVectorList.append(Vectors.dense(vec / np.linalg.norm(vec, 2)))

    rowsRDD = spark.sparkContext.parallelize(denseVectorList)
    return RowMatrix(rowsRDD)


def SVDsimilarity(matrix, numDimension = 1, normalization = False):
    '''
    Generalised cosine similarity of the rows of a matrix, based on its
    singular value decomposition (Y = U S V^T).

    The score compares the Frobenius norm of the best rank-k approximation
    of Y (k = numDimension) with the Frobenius norm of Y itself:

        r = sqrt(sum of the k largest squared singular values) / sqrt(trace(Y^T Y))

    and the function returns 2 * r**2 - 1 (the double angle formula).
    For two unit-length rows the unnormalised result equals their cosine
    similarity.

    ------------------------------------------
    matrix: pyspark RowMatrix; each row is one input vector
            (the original notes say all elements should be positive)
    numDimension: integer, number of leading (largest) singular values to use
    normalization: if true, r is rescaled to (r * N - 1) / (N - 1) before the
                   double angle step, where N is the number of rows
    return: the similarity score as a float

    NOTE(review): with normalization=True and a single-row matrix the
    rescaling divides by N - 1 == 0.
    '''
    rowCount = matrix.numRows()

    # Frobenius norm of the rank-k approximation: only the singular values are needed.
    singularValues = matrix.computeSVD(numDimension, computeU=False).s.toArray()
    approxNorm = np.sqrt(np.sum(np.square(singularValues)))

    # Frobenius norm of the full matrix, taken from the trace of Y^T Y.
    gramian = matrix.computeGramianMatrix().toArray()
    fullNorm = np.sqrt(np.trace(gramian))

    ratio = approxNorm / fullNorm
    if normalization:
        ratio = (ratio * rowCount - 1) / (rowCount - 1)

    # double angle formula
    return 2 * ratio**2 - 1

In [5]:
def crossHomogeneityScore(df, queryColName, featureColName):
    '''
    Size-weighted average SVD similarity of the groups in a dataframe.
    ----------------------------
    df: Spark dataframe holding a feature-vector column and, optionally,
        a column labelling which group (query) each row belongs to
    queryColName: name of the group label column; if df has no such column
                  the whole dataframe is scored as a single group
    featureColName: name of the feature-vector column
    return: SVDsimilarity of the whole frame when there is no label column,
            otherwise sum over groups of similarity * groupRows / totalRows
            (0 for an empty dataframe)
    '''
    if queryColName not in df.schema.names:
        npArray = dataframe2NumpyArray(df, featureColName)
        return SVDsimilarity(numpyArray2Matrix(npArray))

    # One aggregation gives every distinct label together with its row count,
    # instead of a separate count() job per group.
    groupSizes = [(row[0], row[1]) for row in df.groupBy(queryColName).count().collect()]
    totalRows = sum(size for _, size in groupSizes)

    homogeneityScore = 0
    for label, numRows in groupSizes:
        # Look the label up through queryColName (not a hard-coded field name);
        # null labels need isNull() because `== None` never matches in Spark.
        if label is None:
            condition = df[queryColName].isNull()
        else:
            condition = df[queryColName] == label
        dfQuery = df.filter(condition)

        npArray = dataframe2NumpyArray(dfQuery, featureColName)
        similarity = SVDsimilarity(numpyArray2Matrix(npArray))
        homogeneityScore += similarity * numRows/totalRows

    return homogeneityScore

In [57]:
# Load cities.csv using its first line as the header and letting Spark infer column types, then preview it.
# NOTE(review): the preview shows headers such as ` "LatM"` (leading space and literal quotes);
# presumably the CSV has a space after each comma -- confirm whether the reader options should strip it.
df = spark.read.csv("cities.csv",header=True,inferSchema=True)
df.show()

+----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|LatD| "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"|            "City"| "State"|
+----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|41.0|    5.0|   59.0|  "N"|   80.0|   39.0|    0.0|  "W"|      "Youngstown"|      OH|
|42.0|   52.0|   48.0|  "N"|   97.0|   23.0|   23.0|  "W"|         "Yankton"|      SD|
|46.0|   35.0|   59.0|  "N"|  120.0|   30.0|   36.0|  "W"|          "Yakima"|      WA|
|42.0|   16.0|   12.0|  "N"|   71.0|   48.0|    0.0|  "W"|       "Worcester"|      MA|
|43.0|   37.0|   48.0|  "N"|   89.0|   46.0|   11.0|  "W"| "Wisconsin Dells"|      WI|
|36.0|    5.0|   59.0|  "N"|   80.0|   15.0|    0.0|  "W"|   "Winston-Salem"|      NC|
|49.0|   52.0|   48.0|  "N"|   97.0|    9.0|    0.0|  "W"|        "Winnipeg"|      MB|
|39.0|   11.0|   23.0|  "N"|   78.0|    9.0|   36.0|  "W"|      "Winchester"|      VA|
|34.0|   14.0|   24.0|  "N"|   77.0|   55.0

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as f
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

def listOfFrequencyTables(df):
    '''
    Build one frequency table per column of the main dataframe.
    ----------------------------
    df: Spark dataframe
    return: list of dataframes, in column order; each holds the distinct
            values of one column with their row count, sorted by
            descending count
    '''
    return [
        df.groupBy(columnName).count().sort(desc("count"))
        for columnName, _columnType in df.dtypes
    ]

# One frequency table per column of df, in column order; reused by the cells below.
histograms = listOfFrequencyTables(df)

In [21]:
def getDecompFromTopFrequencies(df, histograms):
    '''
    For every frequency table, select the rows of df whose value in that
    table's column equals the table's first (most frequent) value.
    ----------------------------
    df: Spark dataframe the frequency tables were built from
    histograms: list of frequency tables (value column first, then count),
                each sorted so its first row is the top value
    return: list of filtered dataframes, one per frequency table and in the
            same order
    '''
    clusterlst = []
    for hist in histograms:
        topRow = hist.first()
        if topRow is None:
            # Empty frequency table: keep the list aligned with `histograms`
            # by contributing an empty selection.
            clusterlst.append(df.limit(0))
            continue

        # Read the column name and value straight from the table instead of
        # parsing str(Row)/str(Column), which breaks on values containing
        # ',', '=' or quotes and turns every value into a string.
        cname = hist.columns[0]
        topValue = topRow[0]

        if topValue is None:
            condition = col(cname).isNull()  # `== None` never matches in Spark
        else:
            condition = col(cname) == topValue

        clusterlst.append(df.filter(condition))
    return clusterlst

# For each column, the rows of df that carry that column's most frequent value.
clusterlst = getDecompFromTopFrequencies(df, histograms)


In [30]:
def are_dfs_equal(df1, df2):
    '''
    Check that no row of df1 is missing from df2, using set subtraction.
    ----------------------------
    df1: dataframe whose rows must all be covered
    df2: dataframe expected to cover df1 (e.g. the union of the queries)
    return: True when df1.subtract(df2) is empty, otherwise False after
            printing the uncovered rows

    Note: the check is one-directional -- rows that exist only in df2 are
    not reported.
    '''
    missing = df1.subtract(df2)  # rows of df1 that df2 does not contain
    if missing.count() != 0:
        print("error! these rows are not in the union of your queries:")
        missing.show()  # show which tuples are not included in the query union
        return False

    print("dataframes are equal")
    return True

from functools import reduce
from pyspark.sql.functions import lit
from pyspark.sql import DataFrame

def getDecompUsingFreqTable(df,freqdf):
    '''
    Split df into one group per distinct value of a column and label each
    row with the query that selected it.
    ----------------------------
    df: Spark dataframe to decompose; may already carry a "query" column
        from an earlier decomposition
    freqdf: frequency table whose first column names the column to split on
            and lists its distinct values
    return: union of all filtered dataframes with a "query" column holding
            "<column>=<value>"; when df already had a "query" column the new
            label is appended to it after a comma and the column ends up last
    raises: ValueError if freqdf contains no values

    Each query string is printed as it is applied.
    '''
    cname = freqdf.columns[0]
    valuelist = freqdf.select(cname).rdd.flatMap(lambda x: x).collect() #list of all values of frequency column
    if not valuelist:
        raise ValueError("frequency table for column %r is empty" % (cname,))

    # Loop-invariant, and an exact name match: a substring test would also
    # fire for columns such as "query1" and then fail on col("query").
    containsquery = "query" in df.columns

    unionlst = []
    for v in valuelist: #each unique value in the freq. table is used as a query
        # Null values need isNull(); `== None` never matches in Spark and
        # would silently drop those rows from the union.
        condition = col(cname).isNull() if v is None else col(cname) == v
        result = df.filter(condition)
        querystr = cname + "=" + str(v)

        if containsquery:
            # Append to the existing label through a single temporary column,
            # then swap it in under the name "query".
            result = (
                result
                .withColumn("joined", concat(col("query"), lit(","), lit(querystr)))
                .drop("query")
                .withColumnRenamed("joined", "query")
            )
        else:
            result = result.withColumn("query", lit(querystr))
        print(querystr)

        unionlst.append(result)

    #put all queried dataframes back together as one
    return reduce(DataFrame.unionAll, unionlst)


#union = (getDecompUsingFreqTable(df,histograms[0])) #function call with 'Name' frequency table
#print("equality result:")
#subdf = are_dfs_equal(df,union.drop('query')) #checks if union of queries covers whole database
#print(subdf)
#print("\nnext run:")
#union1 = (getDecompUsingFreqTable(union,histograms[1]))

#union2 = (getDecompUsingFreqTable(union1,histograms[2]))

#print("after queries:\n")
#union.show(10,False)
#union1.show(10,False)
#union2.show(10,False)

In [None]:
#histograms[0].show()

In [59]:
def addFeatureVector(df): #get feature vector for any dataframe for homogeneity function
    '''
    Attach a "features" vector column built from every non-string column
    plus an index encoding of every string column.
    ----------------------------
    df: Spark dataframe; a string column named "query" is treated as a label
        and is neither indexed nor put into the vector
    return: df with its original columns kept and a "features" column added.
            The temporary "<name>_indexed" helper columns are removed again.
    '''
    #all columns that have stringtype, except the query column
    string_cols = [c for c, t in df.dtypes if t == 'string' and c != 'query']
    stringindex_cols = [name + "_indexed" for name in string_cols]

    if string_cols:
        # "keep" puts unseen/invalid values in an extra bucket;
        # change to "skip" to remove problematic rows instead.
        indexer = StringIndexer(inputCols=string_cols, outputCols=stringindex_cols,
                                handleInvalid='keep', stringOrderType='frequencyDesc')
        indexed = indexer.fit(df).transform(df) #dataframe with indexed columns attached
    else:
        # Nothing to index; skip the indexer rather than fit it on no columns.
        indexed = df

    #all numerical columns are put into feature vector, including indexed cols
    allnonstringcols = [field.name for field in indexed.schema if field.dataType != StringType()]
    vecAssembler = VectorAssembler(inputCols=allnonstringcols, outputCol="features")
    result = vecAssembler.transform(indexed)

    # Only the helper columns are dropped. The original numeric columns stay,
    # because later steps filter on them and the decomposition is returned
    # with the caller's columns.
    for helperName in stringindex_cols:
        result = result.drop(helperName)
    return result

#union2withvec = addFeatureVector(union2)
#union2withvec.show()

In [None]:
#crossHomogeneityScore(union2withvec, 'query', 'features')

In [26]:
def shuffle(hist):
    '''
    Order frequency tables by their number of rows, smallest first.
    ----------------------------
    hist: iterable of objects exposing count()
    return: new list sorted ascending by count(); ties keep their input order
    '''
    ordered = list(hist)
    ordered.sort(key=lambda table: table.count())
    return ordered


def getDecompositionbyColumn(df, K):
    '''
    Greedily decompose df by column values while the homogeneity score improves.
    ----------------------------
    Each round tries to split the current decomposition on every remaining
    column, keeps the split with the highest crossHomogeneityScore among
    those that beat the current score and produce at most K groups, and
    removes that column from the candidates. It stops when no split
    qualifies or no column is left.

    df: Spark dataframe to decompose
    K: maximum number of groups (distinct "query" labels) allowed
    return: the decomposed dataframe without the "features" column; it has a
            "query" column only if at least one split was accepted
    '''
    histograms = shuffle(listOfFrequencyTables(df))
    decomUnionWithVec = addFeatureVector(df)
    overAllHomoScore = crossHomogeneityScore(decomUnionWithVec, 'query', 'features')
    nBuckets = 1  # group count of the decomposition accepted so far

    ifUpdated = True
    while nBuckets <= K and len(histograms) > 0 and ifUpdated:
        # at least one column left
        # fewer groups than K

        ifUpdated = False
        bestIndex = None
        bestUnion = None
        bestBuckets = nBuckets

        for idx, freqdf in enumerate(histograms):
            unionWithVec = getDecompUsingFreqTable(decomUnionWithVec, freqdf)
            crossScore = crossHomogeneityScore(unionWithVec, 'query', 'features')
            candidateBuckets = unionWithVec.select('query').distinct().count()

            if crossScore > overAllHomoScore and candidateBuckets <= K:
                overAllHomoScore = crossScore
                bestIndex = idx
                bestUnion = unionWithVec
                bestBuckets = candidateBuckets
                ifUpdated = True

        if ifUpdated:
            # Adopt the winning candidate itself. Re-splitting on the loop
            # variable would use whichever table happened to be iterated last.
            decomUnionWithVec = bestUnion
            nBuckets = bestBuckets
            del histograms[bestIndex]

    # nBuckets always describes the accepted decomposition (1 when no split
    # was accepted), so no recount on a possibly missing 'query' column is needed.
    print('user requested K =', str(K), ', but we can only got ', str(nBuckets), 'clusters.')
    return decomUnionWithVec.drop('features')
            
            
        
            
        
    

In [None]:
#result = getDecompositionbyColumn(df, 3)

In [None]:
#result.show(result.count(), False)

In [29]:
#result.count()

18

In [60]:

def newDecomp(df):
    '''
    Pick the frequency table whose number of distinct values is the median
    across all columns of df.
    ----------------------------
    df: Spark dataframe
    return: the frequency table with the median distinct-value count; when
            several tables share that count, the last one in column order
    raises: ValueError if df has no columns

    Prints the sorted counts and the chosen count, and shows the chosen table.
    '''
    histograms = listOfFrequencyTables(df)
    if not histograms:
        raise ValueError("newDecomp needs a dataframe with at least one column")

    # count() launches a Spark job, so evaluate it once per table and reuse it.
    counts = [h.count() for h in histograms]

    numDistincts = sorted(counts, reverse=True) #sort in descending order
    medianCount = numDistincts[len(numDistincts) // 2] #median value
    print(numDistincts)

    midhist = None
    for h, n in zip(histograms, counts):
        # Value comparison: `is` on ints only works for small cached integers.
        if n == medianCount:
            print("count=", n)
            midhist = h #histogram with median number of distinct values
    midhist.show()
    return midhist

# Choose the column whose number of distinct values is the median across all columns.
midhisto = newDecomp(df)
# Split df into one group per distinct value of that column; each row gets a 'query' label.
queryresult = getDecompUsingFreqTable(df, midhisto)
queryresult.show()
# Attach the 'features' vector column that the homogeneity score is computed on.
decomUnionWithVec = addFeatureVector(queryresult)
decomUnionWithVec.show()
# Size-weighted SVD similarity over the 'query' groups.
crossScore =  crossHomogeneityScore(decomUnionWithVec, 'query', 'features')
print(crossScore)
#newDecomp(df)

[120, 53, 51, 47, 44, 25, 10, 10, 1, 1]
count= 25
+----+-----+
|LatD|count|
+----+-----+
|41.0|   12|
|38.0|   11|
|42.0|   10|
|39.0|   10|
|37.0|    9|
|43.0|    9|
|44.0|    9|
|32.0|    8|
|40.0|    8|
|33.0|    7|
|46.0|    4|
|47.0|    4|
|34.0|    4|
|36.0|    3|
|35.0|    3|
|31.0|    3|
|49.0|    2|
|29.0|    2|
|45.0|    2|
|27.0|    2|
+----+-----+
only showing top 20 rows

LatD=41.0
LatD=38.0
LatD=42.0
LatD=39.0
LatD=44.0
LatD=37.0
LatD=43.0
LatD=40.0
LatD=32.0
LatD=33.0
LatD=47.0
LatD=34.0
LatD=46.0
LatD=35.0
LatD=36.0
LatD=31.0
LatD=49.0
LatD=29.0
LatD=45.0
LatD=27.0
LatD=30.0
LatD=50.0
LatD=48.0
LatD=28.0
LatD=26.0
+----+-------+-------+-----+-------+-------+-------+-----+---------------+--------+---------+
|LatD| "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"|         "City"| "State"|    query|
+----+-------+-------+-----+-------+-------+-------+-----+---------------+--------+---------+
|41.0|    5.0|   59.0|  "N"|   80.0|   39.0|    0.0|  "W"|   "Youngstown"|      

In [62]:
from pyspark.sql.functions import count_distinct
# Number of distinct 'query' labels, i.e. how many groups the decomposition produced.
decomUnionWithVec.select(count_distinct("query")).show()


+---------------------+
|count(DISTINCT query)|
+---------------------+
|                   25|
+---------------------+

