In [1]:
import time
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.types import DoubleType

# minhash

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinHashLSH
from pyspark.sql.functions import udf,col
from pyspark.sql.types import LongType, FloatType

import random

config = SparkConf().setAppName("LSH").setMaster("local[8]")
sc = SparkContext.getOrCreate(config)
spark = SparkSession(sc)

In [3]:
import re

SHINGLE_LENGTH = 3
PRIME = 2147483647
SIMILARITY_THRESHOLD = 0.001
BAND_SIMILARITY_THRESHOLD = 0.001
NUMBER_OF_BANDS = 10
NUMBER_OF_ROWS = 5
NUMBER_OF_HASHFUNCTIONS = 20

class minLSH:
    def __init__(self, numHashTables=5, shingleSize=5, inputCol="features", outputCol="hashes"):
        self.numHashTables = numHashTables
        self.shingleSize = shingleSize
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.hashFunctions = self.getHashFunctions(self.numHashTables)

    def getHashFunctions(self, number):
        result = []
        rand = random.Random()
        for _ in range(number):
            a = rand.randint(0, PRIME-1)
            b = rand.randint(0, PRIME-1)
            result.append(lambda x, a=a, b=b: ((a * x + b) % PRIME))
        return result
    
    def preprocessDocument(self, document):
        return document.strip().lower().replace("[^\\w\\s]", "").replace("\\s+", " ") if isinstance(document, str) else document

    def shingle(self, document):
        resultingList = []
        i = 0
        while i + self.shingleSize < len(document):
            resultingList.append(hash(document[i:i+self.shingleSize]))
            i += 1
        return resultingList

    def minHash(self, listOfShingles):
        result = []
        for ind in range(len(self.hashFunctions)):
            minVal = float("inf")
            for shingle in listOfShingles:
                hashResult = self.hashFunctions[ind](shingle)
                if hashResult < minVal:
                    minVal = hashResult
            result.append(minVal)
        return result
    
    def signatureToHashedBandsOfRows(self, signature, numberOfBands, numberOfRowsInBand):
        if len(signature) != numberOfBands * numberOfRowsInBand:
            raise Exception("Wrong arguments number of bands times number of rows should equal length of signature")
        i = 0
        bands = []
        while i + numberOfRowsInBand <= len(signature):
            bands.append(signature[i:i+numberOfRowsInBand])
            i += numberOfRowsInBand
        return [hash(tuple(band)) for band in bands]

    def fit(self, dataframe):
        return self

    def transform(self, dataframe):
        preprocessDocument_udf = udf(self.preprocessDocument, StringType())
        df = dataframe.withColumn("document", preprocessDocument_udf(self.inputCol))
        # 分词
        shingle_udf = udf(self.shingle, ArrayType(LongType()))
        df = df.withColumn("shingles", shingle_udf("document")).drop("document")
        # MinHash
        minHash_udf = udf(lambda x: self.minHash(x), ArrayType(LongType()))
        df = df.withColumn("minHash", minHash_udf("shingles")).drop("shingles")
        return df

    def approxSimilarityJoin(self, dataframeA, dataframeB, threshold, joinID="id"):
        # 相似度 udf
        jaccard_similarity = udf(lambda x, y: len(
                    set(x).intersection(set(y))) / len(set(x).union(set(y))), FloatType())

        # 相似度
        similarities = dataframeA.alias("left").join(dataframeB.alias("right"), on=joinID, how="inner") \
            .withColumn("similarity", jaccard_similarity(col("left.minHash"), col("right.minHash")))  \
            .filter(col("similarity") >= threshold)

        return similarities

In [4]:
manual_threshold = udf(lambda x: 1 if x>=3 else 0)

In [5]:
dfA = spark.read.csv("sick/train.csv", header=True).select("pair_ID", "sentence_A").withColumnRenamed("sentence_A", "sentence").withColumnRenamed("pair_ID", "id")
dfB = spark.read.csv("sick/train.csv", header=True).select("pair_ID", "sentence_B", "relatedness_score").withColumnRenamed("sentence_B", "sentence").withColumnRenamed("pair_ID", "id")

dfB = dfB.withColumn("relatedness_score", dfB["relatedness_score"].cast(DoubleType()))
dfB = dfB.withColumn("is_duplicate", manual_threshold("relatedness_score"))

dfA.show(5)
dfB.show(5)

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|A group of kids i...|
|  2|A group of childr...|
|  3|The young boys ar...|
|  5|The kids are play...|
|  9|The young boys ar...|
+---+--------------------+
only showing top 5 rows

+---+--------------------+-----------------+------------+
| id|            sentence|relatedness_score|is_duplicate|
+---+--------------------+-----------------+------------+
|  1|A group of boys i...|              4.5|           1|
|  2|A group of kids i...|              3.2|           1|
|  3|The kids are play...|              4.7|           1|
|  5|A group of kids i...|              3.4|           1|
|  9|A group of kids i...|              3.7|           1|
+---+--------------------+-----------------+------------+
only showing top 5 rows



In [9]:
# num of partition 对时间的影响

for numPar in [5, 10, 20, 50]:
    A = dfA.repartition(numPar)
    B = dfB.repartition(numPar)
    
    start_time = time.time()
    
    model = minLSH(numHashTables=10, shingleSize=3, inputCol="sentence", outputCol="hashes")
    A = model.transform(A)
    B = model.transform(B)
    
    result = model.approxSimilarityJoin(A, B, 0, joinID="id")
    end_time = time.time()
    run_time = end_time - start_time
    
    print("Num of partition:", numPar, "; Time:", run_time)

Num of partition: 5 ; Time: 0.3506152629852295
Num of partition: 10 ; Time: 0.31075048446655273
Num of partition: 20 ; Time: 0.3353879451751709
Num of partition: 50 ; Time: 0.3309915065765381


In [7]:
# AUC

tmp = result.select("id", "is_duplicate", "similarity")
tmp = tmp.withColumn("is_duplicate", tmp["is_duplicate"].cast(DoubleType()))
metrics = BinaryClassificationMetrics(tmp.select("is_duplicate", "similarity").rdd.map(tuple))
metrics.areaUnderROC

                                                                                

0.6438155987559453

# simhash

In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, FloatType, ArrayType
from pyspark.ml.feature import HashingTF

spark = SparkSession.builder.appName("SimHashLSH").getOrCreate()

def preprocessDocument(document):
    return document.strip().lower().replace("[^\\w\\s]", "").replace("\\s+", " ")

# 定义 SimHash 函数
def simhash(document):
    document = preprocessDocument(document)
    # 将文档拆分为单词
    words = document.split(" ")

    # 计算每个单词的 SimHash 值
    hashes = []
    for word in words:
        word_hash = hash(word)
        # 使用 64 位 SimHash 值，将每个单词的哈希值转换为二进制表示，并填充到 64 位
        binary_hash = format(word_hash, "064b")
        # 将二进制表示的哈希值转换为 DenseVector，每个元素值为 -1 或 1
        vector = [1 if b == "1" else -1 for b in binary_hash]
        hashes.append(vector)
    # 将所有单词的 SimHash 值合并到一个数组中
    simhash_value = [0] * 64
    for v in hashes:
        for i in range(64):
            simhash_value[i] += v[i]
    simhash_value = [1 if x > 0 else 0 for x in simhash_value]
    return simhash_value

# 将 SimHash 函数注册为 UDF
simhash_udf = udf(simhash, ArrayType(IntegerType()))

# 定义 Hamming Distance 函数
def hamming_distance(simhash1, simhash2):
    # 计算两个 SimHash 值的 Hamming 距离
    distance = sum([1 for i in range(64) if simhash1[i] != simhash2[i]])
    return distance

# 将 Hamming Distance 函数注册为 UDF
hamming_distance_udf = udf(hamming_distance, IntegerType())

# 定义相似度计算函数 similarity = hamming_distance / 64
def similarity(simhash1, simhash2):
    length = len(simhash1)
    cnt = sum([1 for i in range(length) if simhash1[i] == simhash2[i]])
    return cnt / length

# 将相似度计算函数注册为 UDF
similarity_udf = udf(similarity, FloatType())

In [13]:
# 创建示例数据
dfA = spark.read.csv("sick/train.csv", header=True).select("pair_ID", "sentence_A")
dfB = spark.read.csv("sick/train.csv", header=True).select("pair_ID", "sentence_B", "relatedness_score")

dfB = dfB.withColumn("relatedness_score", dfB["relatedness_score"].cast(DoubleType()))
dfB = dfB.withColumn("is_duplicate", manual_threshold("relatedness_score"))

dfA.show(5)
dfB.show(5)

+-------+--------------------+
|pair_ID|          sentence_A|
+-------+--------------------+
|      1|A group of kids i...|
|      2|A group of childr...|
|      3|The young boys ar...|
|      5|The kids are play...|
|      9|The young boys ar...|
+-------+--------------------+
only showing top 5 rows

+-------+--------------------+-----------------+------------+
|pair_ID|          sentence_B|relatedness_score|is_duplicate|
+-------+--------------------+-----------------+------------+
|      1|A group of boys i...|              4.5|           1|
|      2|A group of kids i...|              3.2|           1|
|      3|The kids are play...|              4.7|           1|
|      5|A group of kids i...|              3.4|           1|
|      9|A group of kids i...|              3.7|           1|
+-------+--------------------+-----------------+------------+
only showing top 5 rows



In [15]:
for numPar in [5, 10, 20, 50]:
    
    # 改成多个分区数，对比运行时间
    dfA = dfA.repartition(numPar)
    dfB = dfB.repartition(numPar)
    
    start_time = time.time()
    
    # 对文档进行 SimHash 计算
    dfA = dfA.withColumn("simhash_A", simhash_udf(dfA["sentence_A"]))
    dfB = dfB.withColumn("simhash_B", simhash_udf(dfB["sentence_B"]))

    # 计算文档之间的相似度
    similar_documents = dfA.join(dfB, 'pair_ID') \
        .select('pair_ID', 'sentence_A', 'sentence_B', 
                similarity_udf(col("simhash_A"), col("simhash_B")).alias("similarity"),
                hamming_distance_udf(col("simhash_A"), col("simhash_B")).alias("distance"))
    
    end_time = time.time()
    run_time = end_time - start_time
    
    print("Num of partition:", numPar, "; Time:", run_time)

Num of partition: 5 ; Time: 0.05202341079711914
Num of partition: 10 ; Time: 0.04153585433959961
Num of partition: 20 ; Time: 0.04148983955383301
Num of partition: 50 ; Time: 0.039687395095825195


In [17]:
result = similar_documents.select("pair_ID", "similarity").join(dfB.select("pair_ID", "is_duplicate"), "pair_ID")
result.show(5)

+-------+----------+------------+
|pair_ID|similarity|is_duplicate|
+-------+----------+------------+
|   3491|  0.609375|           0|
|   8288|      0.75|           1|
|   2025|  0.515625|           0|
|   9570|    0.6875|           0|
|   7008|  0.890625|           1|
+-------+----------+------------+
only showing top 5 rows



In [20]:
# AUC

tmp = result.withColumn("is_duplicate", result["is_duplicate"].cast(DoubleType()))
metrics = BinaryClassificationMetrics(tmp.select("is_duplicate", "similarity").rdd.map(tuple))
metrics.areaUnderROC

                                                                                

0.6932290157442287