# **Xử lý dữ liệu lớn** - 504048

In [1]:
# Colab-only step: mount Google Drive at /content/drive for this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive



Thành viên nhóm:

*   Nguyễn Quốc Anh     - **52100871**
*   Võ Phú Vinh         - **52100947**
*   Lục Minh Hiếu       - **52100889**
*   Nguyễn Vũ Tường     - **52100944**




## Cài đặt môi trường PySpark

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# !cp drive/MyDrive/TĐT/MMDS/spark-3.1.1-bin-hadoop3.2.tgz .
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
import os
import findspark

# Point PySpark at the JDK and the Spark distribution unpacked on the Colab VM,
# then let findspark make the Spark Python bindings importable.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})
findspark.init()

In [1]:
import os
import findspark

# Local Windows setup. Raw strings keep the backslashes literal: "\P", "\J", "\j" and "\s"
# are not valid escape sequences and newer Python versions warn about them.
# The Windows paths are applied only on Windows, so a top-to-bottom run on Colab/Linux
# does not overwrite JAVA_HOME / SPARK_HOME with paths that do not exist there.
if os.name == "nt":
    os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"
    os.environ["SPARK_HOME"] = r"C:\spark\spark-3.1.1-bin-hadoop3.2"

findspark.init()

In [2]:
import pyspark

# Report the PySpark version that was picked up (3.1.1 in the recorded run).
print(pyspark.__version__)

3.1.1


## Câu 1: In-memory MinHashLSH

### Import các thư viện cần thiết

In [3]:
import pandas as pd
import numpy as np
import random
import hashlib

from collections import defaultdict

In [4]:
# Read the text file line by line (one abstract per line).
# The encoding is pinned to UTF-8, with undecodable bytes replaced, so the result no longer
# depends on the platform's default locale encoding (e.g. cp1252 on Windows).
with open('WebOfScience-5736.txt', 'r', encoding='utf-8', errors='replace') as file:
    lines = file.readlines()

# Create a DataFrame with a single column containing the abstracts,
# stripped of surrounding whitespace and the trailing newline
df = pd.DataFrame({'abstract': [line.strip() for line in lines]})

# Display the DataFrame
print(df)

                                               abstract
0     Phytoplasmas are insect-vectored bacteria that...
1     Background: (-)-alpha-Bisabolol, also known as...
2     A universal feature of the replication of posi...
3     1,2-Dichloropropane (1,2-DCP) and dichlorometh...
4     This paper presents the simulation results of ...
...                                                 ...
5731  The intercalation of L-phenylalanate (LP) into...
5732  There is current interest in harnessing the co...
5733  Aim: The zinc finger antiviral protein (ZAP) i...
5734  The present article reviews the biotechnologic...
5735  This paper focuses on a new kind of artificial...

[5736 rows x 1 columns]


### Tạo lớp đối tượng InMemoryMinHashLSH

In [5]:
class InMemoryMinHashLSH():
    """Single-machine MinHash + LSH index over the ``abstract`` column of a DataFrame.

    Pipeline (see :meth:`run`): character k-shingling -> 0/1 shingle vectors ->
    MinHash signatures built from explicit index permutations -> banded LSH buckets.
    :meth:`approxNearestNeighbors` then looks a query string up in those buckets.

    Documents are addressed by their position (0..n-1) in ``documents``, regardless of
    the DataFrame's own index labels.

    Attributes populated by the pipeline:
        all_shingle_set: sorted list of every distinct shingle in the corpus.
        shingle_idx: dict mapping shingle -> position in ``all_shingle_set``.
        bool_vectors: DataFrame whose ``abstract`` column holds one 0/1 list per document.
        all_permutations: list of index permutations used by :meth:`minhashing`.
        signatures: Series holding one signature (list of ints) per document.
        band_size: number of signature entries per LSH band.
        buckets: DataFrame with ``signature`` and ``bucket_id`` columns.
    """

    # Number of buckets every LSH band is hashed into.
    NUM_BUCKETS = 200
    # Output range of the hash functions used by minhashing_not_permute.
    HASH_RANGE = 1000
    # Seed used when shuffling the index permutations.
    PERMUTATION_SEED = 10

    def __init__(
            self,
            documents: pd.DataFrame,
            k = 2,
            num_per = 10,
            bands = 10,
        ):
        """Store the corpus and the hyper-parameters; no computation happens here.

        Args:
            documents: DataFrame with an ``abstract`` column of strings.
            k: shingle length in characters.
            num_per: number of permutations, i.e. the signature length.
            bands: number of LSH bands; must divide ``num_per``.
        """
        self.documents = documents
        self.k = k
        self.num_per = num_per
        self.bands = bands

    @staticmethod
    def __shingle(text: str, k: int) -> set:
        """Return the set of lower-cased character k-grams of ``text``.

        Texts shorter than ``k`` produce an empty set.
        """
        text = text.lower()
        return {text[i:i + k] for i in range(len(text) - k + 1)}

    @staticmethod
    def __convert_to_bool_vector(text: str, shingle_idx: dict, k: int) -> list:
        """Encode ``text`` as a 0/1 list over the vocabulary described by ``shingle_idx``.

        Shingles that are not part of the vocabulary are ignored, so a query string
        may contain k-grams that never occur in the indexed corpus.
        """
        res = [0] * len(shingle_idx)
        for s in InMemoryMinHashLSH.__shingle(text, k):
            idx = shingle_idx.get(s)
            if idx is not None:
                res[idx] = 1
        return res

    @staticmethod
    def _position_table(permutations: list) -> np.ndarray:
        """Invert every permutation into a lookup row.

        ``table[p][j]`` is the position at which original index ``j`` appears in
        permutation ``p``, i.e. ``permutations[p][table[p][j]] == j``.
        """
        num_per = len(permutations)
        bool_len = len(permutations[0]) if num_per else 0
        table = np.empty((num_per, bool_len), dtype=np.int64)
        order = np.arange(bool_len)
        for row, perm in zip(table, permutations):
            # row is a view into table, so this fills the table in place.
            row[np.asarray(perm, dtype=np.int64)] = order
        return table

    @staticmethod
    def _min_positions(bool_vector, position_table: np.ndarray) -> list:
        """MinHash signature of one 0/1 vector.

        For every permutation this is the first permuted position holding a 1,
        or -1 when the vector contains no 1 at all.
        """
        ones = np.flatnonzero(np.asarray(bool_vector) == 1)
        if ones.size == 0:
            return [-1] * position_table.shape[0]
        # The first permuted position with a 1 is the smallest position among the set indices.
        return position_table[:, ones].min(axis=1).tolist()

    @classmethod
    def _bucket_of(cls, band_values) -> int:
        """Hash one band (a slice of a signature) to a bucket id in [0, NUM_BUCKETS)."""
        joined = ",".join(str(v) for v in band_values)
        digest = hashlib.sha256(joined.encode('utf-8')).hexdigest()
        return int(digest, 16) % cls.NUM_BUCKETS

    def shingling(self, documents: pd.DataFrame, k = 2) -> pd.DataFrame:
        """Build the shingle vocabulary and one 0/1 vector per document.

        Sets ``self.all_shingle_set`` and ``self.shingle_idx``.

        Returns:
            A one-column DataFrame (``abstract``) of 0/1 lists that keeps the index
            of ``documents``.
        """
        # Shingle every document once and reuse the sets for vocabulary and vectors.
        shingle_sets = [
            InMemoryMinHashLSH.__shingle(text, k)
            for text in documents['abstract'].values
        ]

        all_shingle_set = sorted(set().union(*shingle_sets))
        shingle_idx = {s: idx for idx, s in enumerate(all_shingle_set)}

        self.all_shingle_set = all_shingle_set
        self.shingle_idx = shingle_idx

        vectors = []
        for shingles in shingle_sets:
            vec = [0] * len(shingle_idx)
            for s in shingles:
                vec[shingle_idx[s]] = 1
            vectors.append(vec)

        # dtype=object keeps each list as a single cell instead of expanding it.
        result = pd.Series(vectors, index=documents.index, name='abstract', dtype=object)
        return result.to_frame()

    def minhashing_not_permute(self, bool_vectors: pd.DataFrame, num_per=10) -> pd.Series:
        """MinHash with hash functions instead of explicit permutations.

        Hash function ``seed`` maps shingle index ``x`` to
        ``sha256(str(x) + str(seed)) % HASH_RANGE``. The signature entry of a document
        is the minimum hash over the indices where its vector is 1, or ``inf`` when the
        vector has no 1.

        Returns:
            Series with one list of ``num_per`` values per document, positionally indexed.
        """
        vectors = bool_vectors['abstract'].tolist()
        bool_len = len(vectors[0]) if vectors else 0

        def hash_index(x, seed):
            digest = hashlib.sha256(
                str(x).encode('utf-8') + str(seed).encode('utf-8')
            ).hexdigest()
            return int(digest, 16) % self.HASH_RANGE

        # hash_rows[seed][x] is the hash of shingle index x under hash function `seed`.
        hash_rows = [
            [hash_index(x, seed) for x in range(bool_len)]
            for seed in range(num_per)
        ]

        signatures = []
        for vec in vectors:
            ones = [idx for idx, bit in enumerate(vec) if bit == 1]
            signatures.append([
                min((hashes[idx] for idx in ones), default=float('inf'))
                for hashes in hash_rows
            ])

        return pd.Series(signatures, dtype=object)

    def generate_all_permutation(self, num_per : int, bool_len: int) -> list:
        """Return ``num_per`` permutations of ``range(bool_len)``.

        Every round creates a fresh ``Random`` with the same seed and shuffles the
        already shuffled list in place, so permutation ``i`` is the i-fold application
        of one fixed shuffle.
        NOTE(review): this is kept as is so signatures stay reproducible and identical
        to earlier runs; a single RNG seeded once would give independent permutations.
        """
        res = []
        list_idx = list(range(bool_len))
        for _ in range(num_per):
            random.Random(self.PERMUTATION_SEED).shuffle(list_idx)
            res.append(list_idx.copy())

        return res

    def minhashing(self, bool_vectors: pd.DataFrame, num_per=10) -> pd.Series:
        """Compute permutation-based MinHash signatures for every document.

        Sets ``self.all_permutations``.

        Returns:
            Series (positional index 0..n-1) with one list of ``num_per`` ints per
            document; an entry is -1 when the document has no shingle at all.
        """
        # tolist() is positional, so documents with a non-default index work too.
        vectors = bool_vectors['abstract'].tolist()
        bool_len = len(vectors[0]) if vectors else 0

        self.all_permutations = self.generate_all_permutation(num_per, bool_len)
        position_table = self._position_table(self.all_permutations)

        signatures = [self._min_positions(vec, position_table) for vec in vectors]
        return pd.Series(signatures, dtype=object)

    def jaccard_distance(self, x: set, y: set) -> float:
        """Return ``1 - |x & y| / |x | y|``; two empty sets have distance 0.0."""
        union = x.union(y)
        if not union:
            return 0.0
        return 1 - (len(x.intersection(y)) / len(union))

    def locality_sensity_hashing(self, signatures: pd.Series, bands = 10) -> pd.DataFrame:
        """Split every signature into ``bands`` bands and hash each band to a bucket id.

        Sets ``self.band_size``.

        Raises:
            AssertionError: if the signature length is not divisible by ``bands``.

        Returns:
            DataFrame with columns ``signature`` (the input signature) and
            ``bucket_id`` (list of ``bands`` bucket ids), one row per document.
        """
        sig_lists = [list(sig) for sig in signatures]
        sig_len = len(sig_lists[0]) if sig_lists else 0
        assert sig_len % bands == 0, "Can not use this number for bands."

        band_size = sig_len // bands
        self.band_size = band_size

        rows = []
        for sig in sig_lists:
            bucket_ids = [
                self._bucket_of(sig[b * band_size: (b + 1) * band_size])
                for b in range(bands)
            ]
            rows.append({"signature": sig, "bucket_id": bucket_ids})

        return pd.DataFrame(rows, columns = ["signature", "bucket_id"])

    def run(self) -> None:
        """Run shingling, MinHash and LSH with the constructor's parameters."""
        self.bool_vectors = self.shingling(self.documents, k = self.k)
        self.signatures = self.minhashing(self.bool_vectors, num_per = self.num_per)
        self.buckets = self.locality_sensity_hashing(self.signatures, bands = self.bands)

    def approxNearestNeighbors(self, key: str, n: int) -> pd.DataFrame:
        """Return up to ``n`` indexed documents closest to ``key``.

        :meth:`run` must have been called first. A document is a candidate when at
        least one of its band bucket ids equals the query's bucket id for that band.

        NOTE(review): ``dis_col`` is the Jaccard distance between the *sets of
        signature values* of query and candidate, not between their shingle sets and
        not the fraction of differing signature positions. Kept unchanged so results
        match earlier runs.

        Returns:
            DataFrame with columns ``id`` (document position), ``abstract`` and
            ``dis_col``, sorted by ascending ``dis_col``.
        """
        bool_vector = InMemoryMinHashLSH.__convert_to_bool_vector(key, self.shingle_idx, self.k)
        position_table = self._position_table(self.all_permutations)
        signature = self._min_positions(bool_vector, position_table)

        bucket = [
            self._bucket_of(signature[b * self.band_size : (b + 1) * self.band_size])
            for b in range(self.bands)
        ]

        candidate = []
        for doc_id, row in self.buckets.iterrows():
            if any(q == b for q, b in zip(bucket, row["bucket_id"])):
                candidate.append((doc_id, row["signature"]))

        abstracts = self.documents["abstract"]
        query_values = set(signature)
        all_sim_c = []
        for doc_id, doc_signature in candidate:
            dist = self.jaccard_distance(query_values, set(doc_signature))
            # Bucket rows are positional, so look the text up by position, not by label.
            all_sim_c.append((doc_id, abstracts.iloc[doc_id], dist))

        return pd.DataFrame(all_sim_c, columns = ["id", "abstract", "dis_col"])\
                    .sort_values(by=['dis_col'])\
                    .reset_index(drop = True)\
                    .head(n)


In [6]:
# Build the in-memory index: 3-character shingles, 100 permutations, 20 bands
# (100 / 20 = 5 signature entries per band).
lsh = InMemoryMinHashLSH(df, k = 3, num_per = 100, bands = 20)
lsh.run()

In [7]:
# Length of one boolean vector, i.e. the size of the shingle vocabulary (31046 in the recorded run).
len(lsh.bool_vectors["abstract"][0])

31046

In [8]:
# One 0/1 list per document, produced by the shingling step.
lsh.bool_vectors

Unnamed: 0,abstract
0,"[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
2,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
3,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
4,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
...,...
5731,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
5732,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
5733,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
5734,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."


In [9]:
# One MinHash signature (list of num_per integers) per document.
lsh.signatures

0       [7, 5, 90, 107, 49, 34, 0, 34, 4, 76, 7, 61, 1...
1       [1, 10, 90, 15, 55, 13, 37, 34, 4, 29, 8, 43, ...
2       [7, 5, 8, 1, 55, 34, 79, 22, 4, 76, 28, 64, 52...
3       [4, 5, 86, 27, 86, 34, 102, 34, 31, 76, 8, 72,...
4       [87, 102, 160, 3, 116, 37, 132, 34, 4, 76, 8, ...
                              ...                        
5731    [7, 25, 18, 107, 37, 34, 27, 22, 4, 14, 16, 77...
5732    [7, 5, 90, 1, 55, 34, 142, 22, 4, 76, 8, 84, 1...
5733    [7, 5, 18, 1, 37, 127, 83, 22, 4, 76, 7, 72, 2...
5734    [1, 10, 90, 1, 84, 239, 132, 22, 59, 3, 16, 84...
5735    [179, 145, 355, 3, 199, 239, 79, 37, 114, 118,...
Length: 5736, dtype: object

In [10]:
# Signature plus the per-band bucket ids (values in 0..199) of every document.
lsh.buckets

Unnamed: 0,signature,bucket_id
0,"[7, 5, 90, 107, 49, 34, 0, 34, 4, 76, 7, 61, 1...","[39, 171, 117, 33, 168, 167, 0, 125, 107, 29, ..."
1,"[1, 10, 90, 15, 55, 13, 37, 34, 4, 29, 8, 43, ...","[97, 160, 195, 133, 53, 8, 184, 150, 189, 182,..."
2,"[7, 5, 8, 1, 55, 34, 79, 22, 4, 76, 28, 64, 52...","[192, 101, 57, 130, 162, 6, 60, 78, 108, 64, 7..."
3,"[4, 5, 86, 27, 86, 34, 102, 34, 31, 76, 8, 72,...","[140, 22, 11, 126, 196, 151, 126, 66, 180, 147..."
4,"[87, 102, 160, 3, 116, 37, 132, 34, 4, 76, 8, ...","[199, 184, 62, 5, 98, 72, 142, 110, 197, 145, ..."
...,...,...
5731,"[7, 25, 18, 107, 37, 34, 27, 22, 4, 14, 16, 77...","[167, 53, 50, 175, 29, 193, 14, 154, 42, 101, ..."
5732,"[7, 5, 90, 1, 55, 34, 142, 22, 4, 76, 8, 84, 1...","[123, 155, 133, 179, 162, 13, 25, 73, 100, 160..."
5733,"[7, 5, 18, 1, 37, 127, 83, 22, 4, 76, 7, 72, 2...","[147, 31, 18, 21, 64, 107, 174, 19, 175, 12, 1..."
5734,"[1, 10, 90, 1, 84, 239, 132, 22, 59, 3, 16, 84...","[26, 93, 0, 165, 181, 177, 12, 65, 18, 178, 15..."


In [11]:
# Query text for the nearest-neighbour lookup; ask the in-memory index for the 10 closest abstracts.
key = """Phytoplasmas are insect-vectored bacteria that cause disease in a wide range of plant species. The increasing availability of molecular DNA analyses, expertise and additional methods in recent years has led to a proliferation of discoveries of phytoplasma-plant host associations and in the numbers of taxonomic groupings for phytoplasmas. The widespread use of common names based on the diseases with which they are associated, as well as separate phenetic and taxonomic systems for classifying phytoplasmas based on variation at the 16S rRNA-encoding gene, complicates interpretation of the literature. We explore this issue and related trends through a focus on Australian pathosystems, providing the first comprehensive compilation of information for this continent, covering the phytoplasmas, host plants, vectors and diseases. Of the 33 16Sr groups reported internationally, only groups I, II, III, X, XI and XII have been recorded in Australia and this highlights the need for ongoing biosecurity measures to prevent the introduction of additional pathogen groups. Many of the phytoplasmas reported in Australia have not been sufficiently well studied to assign them to 16Sr groups so it is likely that unrecognized groups and sub-groups are present. Wide host plant ranges are apparent among well studied phytoplasmas, with multiple crop and non-crop species infected by some. Disease management is further complicated by the fact that putative vectors have been identified for few phytoplasmas, especially in Australia. Despite rapid progress in recent years using molecular approaches, phytoplasmas remain the least well studied group of plant pathogens, making them a "crouching tiger" disease threat."""
candidate = lsh.approxNearestNeighbors(key, 10)

In [12]:
# Nearest neighbours sorted by ascending dis_col; document 0 comes back with distance 0.0 in the recorded run.
candidate

Unnamed: 0,id,abstract,dis_col
0,0,Phytoplasmas are insect-vectored bacteria that...,0.0
1,375,Background and ObjectivePeriodontal disease is...,0.443038
2,5374,The focus of this work is to model the hydrode...,0.445783
3,3649,There are two adult life history types among l...,0.448718
4,4806,Rabies virus (RABV) remains one of the most im...,0.45679
5,121,Equivalently implementing a generalized memris...,0.458824
6,4519,The long-day plant Petunia x hybrida 'Fantasy ...,0.468354
7,2657,Background: Improving islet graft revasculariz...,0.468354
8,5507,Toxoplasma gondii iswidely distributed in huma...,0.469136
9,2427,The present study was aimed to evaluate the ne...,0.46988


## Câu 2: Large Data MinHashLSH

### Tạo Spark Context

In [13]:
from pyspark import SparkContext

# getOrCreate() reuses an already running context, so re-executing this cell no longer fails
# with "Cannot run multiple SparkContexts at once". Master and app name are unchanged.
sc = SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster("local").setAppName("Process 2")
)

In [14]:
from pyspark.sql import SQLContext, SparkSession

# Build the SparkSession for this application, or reuse the one that already exists.
sparkSession = (
    SparkSession.builder
    .appName("Process 2")
    .getOrCreate()
)

### Đọc dữ liệu từ file

In [16]:
from pyspark.ml.linalg import Vectors, SparseVector, VectorUDT, DenseVector
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Read the corpus as one row per line (single string column named "value").
raw_lines_df = sparkSession.read.text("WebOfScience-5736.txt")

# Number the rows with zipWithIndex so ids are consecutive 0..n-1 in file order.
# monotonically_increasing_id() only guarantees increasing, unique values; they stop being
# consecutive as soon as the file is read into more than one partition.
abstracts_schema = T.StructType([
    T.StructField("id", T.LongType(), False),
    T.StructField("abstract", T.StringType(), True),
])
abstracts_df = sparkSession.createDataFrame(
    raw_lines_df.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]["value"])),
    abstracts_schema,
)

# Show the DataFrame schema and a short, truncated preview instead of 20 full abstracts
abstracts_df.printSchema()
abstracts_df.show(5, truncate=100)

root
 |-- id: long (nullable = false)
 |-- abstract: string (nullable = true)

+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [17]:
# Row count of the Spark DataFrame (5736 in the recorded run, the same as the pandas DataFrame above).
abstracts_df.count()

5736

### Tạo lớp đối tượng LargeDataMinHashLSH

In [18]:
class LargeDataMinHashLSH():
    """Spark DataFrame implementation of the MinHash + LSH pipeline.

    Pipeline (see :meth:`run`): character k-shingling -> sparse 0/1 shingle vectors ->
    MinHash signatures built from explicit index permutations -> banded LSH buckets.
    :meth:`approxNearestNeighbors` then looks a query string up in those buckets.

    ``documents`` must provide the columns ``id`` and ``abstract``.

    Attributes populated by the pipeline:
        all_shingles: sorted list of every distinct shingle in the corpus.
        bool_vectors: DataFrame (``id``, ``bool_vector``).
        all_permutations: list of index permutations used by :meth:`minhashing`.
        signatures: DataFrame (``id``, ``signature``).
        buckets: DataFrame (``id``, ``signature``, ``bucket_id``).
    """

    # Number of buckets every LSH band is hashed into.
    NUM_BUCKETS = 200
    # Seed used when shuffling the index permutations.
    PERMUTATION_SEED = 10

    def __init__(
            self,
            documents: pyspark.sql.DataFrame,
            k = 2,
            num_per = 10,
            bands = 10,
        ):
        """Store the corpus and the hyper-parameters; no Spark job is started here.

        Args:
            documents: DataFrame with ``id`` and ``abstract`` columns.
            k: shingle length in characters.
            num_per: number of permutations, i.e. the signature length.
            bands: number of LSH bands; ``band_size`` is ``num_per // bands``.
        """
        self.documents = documents
        self.k = k
        self.num_per = num_per
        self.bands = bands
        self.band_size = num_per // bands

    @staticmethod
    def __shingle(text, k = 3):
        """Return the distinct lower-cased character k-grams of ``text`` as a list.

        ``None`` (a null cell) and texts shorter than ``k`` give an empty list.
        """
        if text is None:
            return []
        text = text.lower()
        return list({text[i:i + k] for i in range(len(text) - k + 1)})

    @staticmethod
    def __convert_to_bv(
                x: list,
                all_shingles,
        ) -> SparseVector:
        """Encode the shingles ``x`` as a sparse 0/1 vector over the vocabulary.

        Args:
            x: shingles of one document.
            all_shingles: the vocabulary, either as a list of shingles or as a
                prebuilt ``shingle -> index`` dict (avoids rebuilding the lookup per call).

        Shingles that are not part of the vocabulary are ignored, so a query string
        may contain k-grams that never occur in the indexed corpus.
        """
        if isinstance(all_shingles, dict):
            index = all_shingles
        else:
            index = {s: i for i, s in enumerate(all_shingles)}

        # SparseVector requires strictly increasing indices.
        all_idx = sorted({index[s] for s in x if s in index})

        return Vectors.sparse(len(all_shingles), all_idx, [1] * len(all_idx))

    @staticmethod
    def _position_table(permutations: list) -> np.ndarray:
        """Invert every permutation into a lookup row.

        ``table[p][j]`` is the position at which original index ``j`` appears in
        permutation ``p``, i.e. ``permutations[p][table[p][j]] == j``.
        """
        num_per = len(permutations)
        bool_len = len(permutations[0]) if num_per else 0
        table = np.empty((num_per, bool_len), dtype=np.int64)
        order = np.arange(bool_len)
        for row, perm in zip(table, permutations):
            # row is a view into table, so this fills the table in place.
            row[np.asarray(perm, dtype=np.int64)] = order
        return table

    def shingling(
            self,
            documents: pyspark.sql.DataFrame,
            k = 2
        ) -> pyspark.sql.DataFrame:
        """Build the shingle vocabulary and one sparse 0/1 vector per document.

        Sets ``self.all_shingles``. The vocabulary is collected to the driver.

        Returns:
            Cached DataFrame with columns ``id`` and ``bool_vector``.
        """
        # Capture the plain function so the UDF closure does not reference the class.
        shingle_fn = LargeDataMinHashLSH.__shingle
        convert_fn = LargeDataMinHashLSH.__convert_to_bv

        # Apply the shingle UDF to the documents DataFrame
        shingle_udf = F.udf(
                          lambda x: shingle_fn(x, k = k),
                          T.ArrayType(T.StringType())
                      )

        shingles = documents\
                        .withColumn(
                            "shingles",
                            shingle_udf(F.col("abstract"))
                        )

        all_shingles = shingles\
                        .select("shingles")\
                        .rdd\
                        .flatMap(lambda x: x[0])\
                        .distinct()\
                        .sortBy(lambda x: x)\
                        .collect()

        # One dict lookup per shingle instead of a linear list.index() scan.
        shingle_idx = {s: i for i, s in enumerate(all_shingles)}

        # Apply convert shingles to bool_vector udf
        convert_udf = F.udf(
            lambda x: convert_fn(x, shingle_idx),
            VectorUDT()
        )

        shingles = shingles\
                    .withColumn(
                        "bool_vector",
                        convert_udf(F.col("shingles"))
                    )

        self.all_shingles = all_shingles

        return shingles.select("id", "bool_vector").cache()

    def generate_all_permutation(self, num_per : int, bool_len: int) -> list:
        """Return ``num_per`` permutations of ``range(bool_len)``.

        Every round creates a fresh ``Random`` with the same seed and shuffles the
        already shuffled list in place, so permutation ``i`` is the i-fold application
        of one fixed shuffle.
        NOTE(review): this is kept as is so signatures stay reproducible and identical
        to earlier runs; a single RNG seeded once would give independent permutations.
        """
        res = []
        list_idx = list(range(bool_len))
        for _ in range(num_per):
            random.Random(self.PERMUTATION_SEED).shuffle(list_idx)
            res.append(list_idx.copy())

        return res

    def minhashing(self, bool_vectors: pyspark.sql.DataFrame, num_per = 10) -> pyspark.sql.DataFrame:
        """Compute permutation-based MinHash signatures for every document.

        Requires :meth:`shingling` to have run. Sets ``self.all_permutations``.

        Returns:
            DataFrame with columns ``id`` and ``signature`` (array of ``num_per``
            ints); an entry is -1 when the document has no shingle at all.
        """
        bool_len = len(self.all_shingles)
        all_permutations = self.generate_all_permutation(num_per, bool_len)

        self.all_permutations = all_permutations

        position_table = LargeDataMinHashLSH._position_table(all_permutations)

        def gen_signature(x):
            # x is the SparseVector of one document; x.indices are its set positions.
            set_idx = np.asarray(x.indices, dtype=np.int64)
            if set_idx.size == 0:
                return [-1] * num_per
            # The first permuted position with a 1 is the smallest position among the set
            # indices; tolist() yields plain Python ints, which IntegerType requires.
            return position_table[:, set_idx].min(axis=1).tolist()

        gen_sig_udf = F.udf(gen_signature, T.ArrayType(T.IntegerType()))

        sig = bool_vectors\
                .withColumn(
                    "signature", gen_sig_udf(F.col("bool_vector"))
                )

        return sig.select("id", "signature")

    def locality_sensity_hashing(
            self, signatures: pyspark.sql.DataFrame, bands = 10
        ) -> pyspark.sql.DataFrame:
        """Split every signature into ``bands`` bands and hash each band to a bucket id.

        The band size is ``len(signature) // bands``; trailing entries that do not fill
        a whole band are not hashed.

        Returns:
            ``signatures`` plus a ``bucket_id`` column holding a dense vector of
            ``bands`` bucket ids.
        """
        num_buckets = self.NUM_BUCKETS

        def gen_buck(x):
            band_size = len(x) // bands
            res = []
            for i in range(bands):
                joined = ",".join([str(v) for v in x[i * band_size : (i + 1) * band_size]])
                digest = hashlib.sha256(joined.encode('utf-8')).hexdigest()
                res.append(int(digest, 16) % num_buckets)

            return Vectors.dense(res)

        generate_buck = F.udf(gen_buck, VectorUDT())

        buckets = signatures.withColumn(
            "bucket_id",
            generate_buck(F.col("signature"))
        )

        return buckets

    def run(self) -> None:
        """Run shingling, MinHash and LSH, printing a preview after each stage."""
        self.bool_vectors = self.shingling(self.documents, k = self.k)
        self.bool_vectors.show()

        self.signatures = self.minhashing(self.bool_vectors, num_per = self.num_per)
        self.signatures.show()

        self.buckets = self.locality_sensity_hashing(self.signatures, bands = self.bands)
        self.buckets.show()

    @staticmethod
    def jaccard_distance(x: set, y: set) -> float:
        """Return ``1 - |x & y| / |x | y|``; two empty sets have distance 0.0."""
        union = x.union(y)
        if not union:
            return 0.0
        return 1 - (len(x.intersection(y)) / len(union))

    def approxNearestNeighbors(self, key: str, n: int) -> pyspark.sql.DataFrame:
        """Return up to ``n`` indexed documents closest to ``key``.

        :meth:`run` must have been called first. A document is a candidate when at
        least one of its band bucket ids equals the query's bucket id for that band.

        NOTE(review): ``dist_col`` is the Jaccard distance between the *sets of
        signature values* of query and candidate, not between their shingle sets and
        not the fraction of differing signature positions. Kept unchanged so results
        match earlier runs.

        Returns:
            Cached DataFrame with columns ``id``, ``signature``, ``bucket_id`` and
            ``dist_col``, sorted by ascending ``dist_col``.
        """
        shingles_set = LargeDataMinHashLSH.__shingle(key, k = self.k)
        bool_vec = LargeDataMinHashLSH.__convert_to_bv(shingles_set, self.all_shingles)

        position_table = LargeDataMinHashLSH._position_table(self.all_permutations)
        set_idx = np.asarray(bool_vec.indices, dtype=np.int64)
        if set_idx.size == 0:
            signature = [-1] * len(self.all_permutations)
        else:
            signature = position_table[:, set_idx].min(axis=1).tolist()

        num_buckets = self.NUM_BUCKETS
        bucket = []
        for b in range(self.bands):
            sig_band = signature[b * self.band_size : (b + 1) * self.band_size]
            joined = ",".join(str(v) for v in sig_band)
            digest = hashlib.sha256(joined.encode('utf-8')).hexdigest()
            bucket.append(int(digest, 16) % num_buckets)

        check_candidate_udf = F.udf(
            lambda x: any(int(x[i]) == bucket[i] for i in range(len(x))),
            T.BooleanType()
        )

        candidates = self.buckets\
                        .withColumn(
                            "isCandidate",
                            check_candidate_udf(F.col("bucket_id"))
                        )\
                        .filter(F.col("isCandidate") == True)

        # Capture plain objects so the UDF closure does not reference the class or self.
        jaccard = LargeDataMinHashLSH.jaccard_distance
        query_values = set(signature)

        calc_jaccard_udf = F.udf(lambda x: jaccard(set(x), query_values), T.FloatType())
        candidates = candidates\
                        .withColumn(
                            "dist_col",
                            calc_jaccard_udf(F.col("signature"))
                        )\
                        .sort(F.asc("dist_col"))\
                        .limit(n)

        # Cache only the frame that is handed back to the caller.
        return candidates.select("id", "signature", "bucket_id", "dist_col").cache()


In [19]:
# Same hyper-parameters as the in-memory index: 3-character shingles, 100 permutations, 20 bands.
large_lsh = LargeDataMinHashLSH(abstracts_df, k = 3, num_per = 100, bands = 20)

In [20]:
# Runs shingling, MinHash and LSH; each stage prints a preview of its DataFrame.
large_lsh.run()

+---+--------------------+
| id|         bool_vector|
+---+--------------------+
|  0|(31046,[5,233,308...|
|  1|(31046,[85,88,90,...|
|  2|(31046,[84,103,26...|
|  3|(31046,[50,88,89,...|
|  4|(31046,[115,116,2...|
|  5|(31046,[105,107,2...|
|  6|(31046,[104,116,1...|
|  7|(31046,[88,89,90,...|
|  8|(31046,[88,102,10...|
|  9|(31046,[103,118,1...|
| 10|(31046,[103,117,1...|
| 11|(31046,[103,235,2...|
| 12|(31046,[89,103,11...|
| 13|(31046,[89,269,27...|
| 14|(31046,[102,103,1...|
| 15|(31046,[106,116,1...|
| 16|(31046,[89,104,10...|
| 17|(31046,[102,119,2...|
| 18|(31046,[90,113,11...|
| 19|(31046,[106,116,1...|
+---+--------------------+
only showing top 20 rows

+---+--------------------+
| id|           signature|
+---+--------------------+
|  0|[7, 5, 90, 107, 4...|
|  1|[1, 10, 90, 15, 5...|
|  2|[7, 5, 8, 1, 55, ...|
|  3|[4, 5, 86, 27, 86...|
|  4|[87, 102, 160, 3,...|
|  5|[7, 5, 18, 1, 67,...|
|  6|[13, 29, 144, 27,...|
|  7|[4, 5, 18, 3, 50,...|
|  8|[4, 5, 18, 3, 31,...|
| 

In [21]:
# Query text for the nearest-neighbour lookup; ask the Spark index for the 10 closest abstracts.
key = """Phytoplasmas are insect-vectored bacteria that cause disease in a wide range of plant species. The increasing availability of molecular DNA analyses, expertise and additional methods in recent years has led to a proliferation of discoveries of phytoplasma-plant host associations and in the numbers of taxonomic groupings for phytoplasmas. The widespread use of common names based on the diseases with which they are associated, as well as separate phenetic and taxonomic systems for classifying phytoplasmas based on variation at the 16S rRNA-encoding gene, complicates interpretation of the literature. We explore this issue and related trends through a focus on Australian pathosystems, providing the first comprehensive compilation of information for this continent, covering the phytoplasmas, host plants, vectors and diseases. Of the 33 16Sr groups reported internationally, only groups I, II, III, X, XI and XII have been recorded in Australia and this highlights the need for ongoing biosecurity measures to prevent the introduction of additional pathogen groups. Many of the phytoplasmas reported in Australia have not been sufficiently well studied to assign them to 16Sr groups so it is likely that unrecognized groups and sub-groups are present. Wide host plant ranges are apparent among well studied phytoplasmas, with multiple crop and non-crop species infected by some. Disease management is further complicated by the fact that putative vectors have been identified for few phytoplasmas, especially in Australia. Despite rapid progress in recent years using molecular approaches, phytoplasmas remain the least well studied group of plant pathogens, making them a "crouching tiger" disease threat."""
candidate_l = large_lsh.approxNearestNeighbors(key, 10)

In [22]:
# Print the nearest neighbours found by the Spark index, sorted by ascending dist_col.
candidate_l.show()

+----+--------------------+--------------------+----------+
|  id|           signature|           bucket_id|  dist_col|
+----+--------------------+--------------------+----------+
|   0|[7, 5, 90, 107, 4...|[39.0,171.0,117.0...|       0.0|
| 375|[13, 5, 18, 1, 55...|[101.0,66.0,120.0...|  0.443038|
|5374|[7, 5, 44, 3, 37,...|[10.0,154.0,124.0...|0.44578314|
|3649|[13, 10, 18, 1, 5...|[9.0,178.0,55.0,6...|0.44871795|
|4806|[13, 5, 18, 1, 50...|[149.0,104.0,117....|0.45679012|
| 121|[13, 10, 3, 92, 6...|[89.0,51.0,14.0,3...|0.45882353|
|2657|[7, 5, 75, 107, 1...|[2.0,137.0,40.0,1...|0.46835443|
|4519|[7, 29, 75, 3, 55...|[57.0,120.0,115.0...|0.46835443|
|5507|[5, 5, 90, 1, 116...|[130.0,114.0,153....| 0.4691358|
|2427|[112, 5, 8, 1, 37...|[173.0,112.0,164....| 0.4698795|
+----+--------------------+--------------------+----------+

