#Import Library

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import binascii
import hashlib
from random import randint, seed
from scipy.sparse import csr_matrix

##Read Data

In [None]:
def read_documents_from_file(file_path):
    documents = list()
    with open(file_path, 'r') as file:
        for line in file:
            document = line.strip()  # Loại bỏ khoảng trắng ở đầu và cuối dòng
            documents.append(document)
    return documents


# Đường dẫn đến file văn bản
file_path = "/content/WebOfScience-5736.txt"

# Đọc dữ liệu từ file văn bản và chuyển đổi thành danh sách các văn bản
documents_list = read_documents_from_file(file_path)
documents_df = pd.DataFrame(documents_list, columns=['document'])
# Loại bỏ các văn bản trùng lặp từ DataFrame
documents_df

Unnamed: 0,document
0,Phytoplasmas are insect-vectored bacteria that...
1,"Background: (-)-alpha-Bisabolol, also known as..."
2,A universal feature of the replication of posi...
3,"1,2-Dichloropropane (1,2-DCP) and dichlorometh..."
4,This paper presents the simulation results of ...
...,...
5731,The intercalation of L-phenylalanate (LP) into...
5732,There is current interest in harnessing the co...
5733,Aim: The zinc finger antiviral protein (ZAP) i...
5734,The present article reviews the biotechnologic...


In [None]:
class InMemoryMinHashLSH:
  def __init__(self, documents: pd.DataFrame, num_hash_functions = 100, threshold=0.8, bands_nr = 50,  num_buckets = 500):
    self.documents = documents
    self.shingle_size = 5
    self.bool_vectors = None
    self.signatures = None
    self.lsh_results = None
    self.list_shingle_hash = {}
    self.hash_range = 10**8
    self.num_hash_functions = num_hash_functions
    self.threshold = threshold
    self.bands_nr = bands_nr
    self.all_Shingles = None
    self.num_buckets = num_buckets

  def preprocess(self,text):
    cleaned_text = re.sub(r'[:;,\.!""]', ' ', text)
    cleaned_text = cleaned_text.strip()
    tokens = cleaned_text.lower()
    return tokens

  def shingles(self, text, doc_index=-1):
      words = self.preprocess(text).split()  # Tách văn bản thành danh sách các từ
      shingles = set()
      for i in range(len(words) - self.shingle_size + 1):
          shingle = " ".join(words[i:i + self.shingle_size])  # Ghép các từ thành shingle
          hash_value = int(hashlib.sha1(shingle.encode('utf-8')).hexdigest(), 16) % self.hash_range
          shingles.add(hash_value)
      if doc_index != -1:
          self.list_shingle_hash[doc_index] = shingles
      return shingles


  def shingling(self, documents: pd.DataFrame):
      # Tạo tập hợp các shingle duy nhất từ tất cả các văn bản
      all_shingles = sorted(list(set().union(*(self.shingles(text) for text in documents['document']))))
      self.all_Shingles = all_shingles
      # print("all shingles: ", all_shingles)
      # Tạo ma trận boolean với số hàng bằng số lượng shingle và số cột bằng số lượng văn bản
      shingles_matrix = np.zeros((len(all_shingles), len(documents)), dtype=np.int8)

      # Tạo một từ điển để lưu trữ chỉ mục của từng shingle
      shingle_index_dict = {shingle: index for index, shingle in enumerate(all_shingles)}

      # Duyệt qua từng văn bản và đánh dấu các shingle có trong văn bản đó
      for doc_idx, text in enumerate(documents['document']):
          shingles_in_doc = self.shingles(text, doc_idx)
          shingle_indices = [shingle_index_dict[shingle] for shingle in shingles_in_doc]
          shingles_matrix[shingle_indices, doc_idx] = 1

      # Tạo DataFrame từ ma trận và danh sách shingle
      shingles_df = pd.DataFrame(shingles_matrix, index=all_shingles)
      # print("Vector Boolean:", shingles_df)
      return shingles_df

  def _generate_hash_function(self, len_all_shingle):
      # Generate random coefficients for the hash function
      a = np.random.randint(1, 100)
      b = np.random.randint(0, 100)
      p = 2**31 - 1  # Prime number
      return lambda x: ((a * x + b) % p) % len_all_shingle

  def minhashing(self, bool_vectors: pd.DataFrame):
    num_docs = bool_vectors.shape[1]  # Số lượng văn bản
    num_shingles = bool_vectors.shape[0]  # Số lượng shingle

    # Khởi tạo ma trận signature với kích thước (num_hashes, num_docs)
    signatures = np.zeros((self.num_hash_functions, num_docs), dtype=np.int32)

    # Tạo ma trận CSR từ vector boolean
    csr_bool_vectors = csr_matrix(bool_vectors)

    # Tạo các hàm hoán vị pi
    permutations = [np.random.permutation(num_shingles) for _ in range(self.num_hash_functions)]
    for i in range(self.num_hash_functions):
            pi = permutations[i]

            # Tính toán chỉ số của hàng đầu tiên chứa giá trị 1 cho mỗi văn bản
            idx_first_one = csr_bool_vectors[pi, :].argmax(axis=0)

            # Gán giá trị của chỉ số đầu tiên vào ma trận signature
            signatures[i, :] = np.array(idx_first_one).flatten()
    signatures_df = pd.DataFrame(signatures, index=[f'Hash_{i}' for i in range(self.num_hash_functions)])
    # print("vector signature: ", signatures_df)
    return signatures_df

  def locality_sensitive_hashing(self, signatures: pd.DataFrame):
    sign_len = len(signatures)
    r = int(sign_len/self.bands_nr) #number of rows in each band
    b = self.bands_nr  # Số bands


    # Chia ma trận chữ ký thành các bands với cột là các title và hàng là các sig
    bands = [signatures.iloc[i * r:(i + 1) * r, :] for i in range(b)]

    # Tạo dictionary để lưu trữ buckets
    buckets = {}

    # Xử lý từng band
    for band_index, band in enumerate(bands):
        # Tính số lượng documents
        num_docs = band.shape[1]

        # Duyệt qua mỗi cột (document) trong band
        for doc_index in range(num_docs):
            # Lấy signature của document đó
            signature = tuple(band.iloc[:, doc_index])

            # Tính hash cho band hiện tại
            band_hash = hash(signature) % self.num_buckets

            # Tạo hoặc cập nhật bucket cho hash band
            if band_hash not in buckets:
                buckets[band_hash] = set()
            buckets[band_hash].add((doc_index, signature))

    # Tạo DataFrame từ dict
    df = pd.DataFrame(buckets.items(), columns=['bucket-id', 'signature'])
    # print("Bucket: ", df)
    return df

  def run(self):
      self.bool_vectors = self.shingling(self.documents)
      self.signatures = self.minhashing(self.bool_vectors)
      self.lsh_results = self.locality_sensitive_hashing(self.signatures)

  def approxNearestNeighbors(self, key, n):
      key_signature  = self.signatures[key]
      # key_sig = key_signature.iloc[1:].tolist()
      sign_len = len(key_signature)
      r = int(sign_len/self.bands_nr)
      key_buckets = set()
      shingles_key = self.list_shingle_hash[key]
      # print("shingles_key: ", shingles_key)

      for i in range(0, sign_len, r):
        band_signature = tuple(key_signature[i:i + r])
        band_hash = hash(band_signature) % self.num_buckets
        key_buckets.add(band_hash)

      similar_docs_shingles = []
      similar_docs_sig = []
      checked_docs = set()
      for bucket_id in key_buckets:
        bucket = self.lsh_results[self.lsh_results['bucket-id'] == bucket_id]
        for row in bucket['signature']:
          for doc_index, doc_signature in row:
            if doc_index != key and doc_index not in checked_docs :
              shingles_doc_index = self.list_shingle_hash[doc_index]

              # sig_doc_index = self.signatures[doc_index]
              # key_sig_doc_index = sig_doc_index.iloc[1:].tolist()

              similarity = self.jaccard_sim(shingles_key, shingles_doc_index)
              # similarity_ = self.jaccard_sim(key_sig, key_sig_doc_index)

              similar_docs_shingles.append((doc_index, similarity))
              # similar_docs_sig.append((doc_index, similarity_))

              checked_docs.add(doc_index)

      similar_docs_shingles.sort(key=lambda x: x[1], reverse=True)
      # similar_docs_sig.sort(key=lambda x: x[1], reverse=True)
      top_n = similar_docs_shingles[:n]
      # top_n_ = similar_docs_sig[:n]
      return top_n

  def jaccard_sim(self,X, Y):
    """Jaccard similarity between two sets"""
    x = set(X)
    y = set(Y)
    return float(len(x & y)) / len(x | y)

  def jaccard_dist(self,X, Y):
    """Jaccard distance between two sets"""
    return 1 - self.jaccard_sim(X, Y)

  def visualize_similarity(self, n_top_similar=3):
        # Tính toán ma trận tương tự dựa trên độ tương đồng Jaccard giữa các documents
        similarity_matrix = np.zeros((len(self.documents), len(self.documents)))
        for i in range(len(self.documents)):
            for j in range(i, len(self.documents)):
                similarity = self.jaccard_sim(self.list_shingle_hash[i], self.list_shingle_hash[j])
                similarity_matrix[i, j] = similarity_matrix[j, i] = similarity

        # Tìm top documents tương tự nhất cho mỗi document
        top_similar_docs = []
        for i in range(len(self.documents)):
            similar_docs = self.approxNearestNeighbors(i, n_top_similar)
            top_similar_docs.append(similar_docs)

        # Trực quan hóa ma trận tương tự dưới dạng heatmap
        plt.figure(figsize=(12, 8))
        plt.imshow(similarity_matrix, cmap='hot', interpolation='nearest')
        plt.colorbar()
        plt.title('Similarity Matrix')
        plt.xlabel('Document Index')
        plt.ylabel('Document Index')

        # Hiển thị các điểm tương tự nhất trên scatter plot
        for i in range(len(top_similar_docs)):
            for similar_doc_index, similarity_score in top_similar_docs[i]:
                plt.scatter(i, similar_doc_index, color='blue', alpha=0.5, s=50)

        plt.show()

In [None]:
minhash_lsh = InMemoryMinHashLSH(documents_df)
bool_vectors = minhash_lsh.run()

In [None]:
minhash_lsh.approxNearestNeighbors(0,3)

[(3973, 0.006711409395973154),
 (456, 0.002638522427440633),
 (3130, 0.002551020408163265)]

#Minhash - Lsh Implementation using Pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=45aeadb57cc0cd40bf99bd61f137995bba0934e71eb8da0b4716932b22dcb0f3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import MinHashLSH

In [None]:
class MinHashLSH_PySpark:
    def __init__(self, df):
        self.spark = SparkSession.builder \
            .appName("MinHashLSH") \
            .getOrCreate()
        self.df = df

    def preprocess(self):
        # Tokenize the documents
        tokenized = self.df.select(
            explode(split(lower(col("document")), " ")).alias("word")
        )

        # CountVectorizer to create the document-term matrix
        cv = CountVectorizer(inputCol="word", outputCol="features", vocabSize=10000, minDF=2.0)
        cv_model = cv.fit(tokenized)
        self.vectorized_docs = cv_model.transform(tokenized)

    def minhash_lsh(self):
        mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
        self.model = mh.fit(self.vectorized_docs)

    def locality_sensitive_hashing(self, key):
        # Create DataFrame for the key
        key_df = self.spark.createDataFrame([(0, key)], ["id", "document"])

        key_tokenized = key_df.select(
            explode(split(lower(col("document")), " ")).alias("word")
        )

        # Union key_tokenized with self.vectorized_docs
        key_vectorized = self.vectorized_docs.union(key_tokenized)

        # Transform with the LSH model
        key_hashes = self.model.transform(key_vectorized)

        return self.model.approxSimilarityJoin(key_hashes, self.vectorized_docs, 0.6)

    def run(self):
        self.preprocess()
        self.minhash_lsh()

    def __del__(self):
        self.spark.stop()

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MinHashLSH") \
    .getOrCreate()

In [None]:
# Sample documents
documents.head()

Unnamed: 0,document
0,Phytoplasmas are insect-vectored bacteria that...
1,"Background: (-)-alpha-Bisabolol, also known as..."
2,A universal feature of the replication of posi...
3,"1,2-Dichloropropane (1,2-DCP) and dichlorometh..."
4,This paper presents the simulation results of ...


In [None]:
# Initialize MinHashLSH
minhash_lsh_spark = MinHashLSH_PySpark(documents)
minhash_lsh_spark.run()

# Query for approximate nearest neighbors
key = "Phytoplasmas are insect-vectored bacteria"
result = minhash_lsh_spark.locality_sensitive_hashing(key)
result.show()

[link text](https://)#Câu 2 PYSPARK


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.sql.functions import rand, row_number,array_position , split,when, concat_ws, count,concat, udf, lower, regexp_replace, array, monotonically_increasing_id, collect_list, expr, explode, posexplode, lit, arrays_zip, col
from pyspark.conf import SparkConf
from pyspark.ml.feature import MinHashLSH, Tokenizer, HashingTF, IDF, CountVectorizer
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.ml.feature import NGram
from hashlib import sha1

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [None]:
import hashlib
import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, col, lower, regexp_replace, split, concat_ws, monotonically_increasing_id, explode
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import count

In [None]:
def hash_shingle(shingle):
    # Hash each word in the shingle using SHA-1 and return a list of hash values
    hash_values = [hashlib.sha1(word.encode('utf-8')).hexdigest() for word in shingle]
    return hash_values

class LargeDataMinHashLSH:
    def __init__(self, filepath):
        self.spark = None
        self.document_df = None
        self.filepath = filepath
        self.vectorboolean = None
        self.vectorsignature = None
        self.shingle_size = 3
        self.num_permutations = 100
        self.band = 50
        self.rowofband = 2

    def init_spark(self):
        self.spark = SparkSession.builder \
            .appName("Large_BigData") \
            .getOrCreate()

    def stop_spark(self):
        if self.spark is not None:
            self.spark.stop()
            self.spark = None

    def readFile(self):
        if self.spark is None:
            self.init_spark()
        self.document_df = self.spark.read.text(self.filepath).toDF("text")
        self.document_df = self.document_df.withColumn("docid", monotonically_increasing_id())
        return self.document_df

    def shingling(self, documents):
        df_clean = documents.withColumn("clean_text", lower(regexp_replace(concat_ws(" ", col("text")), "[;:,.!\"]", "")).alias("text"))
        df_words = df_clean.withColumn("words", split("clean_text", "\\s+"))

        # Use NGram to create shingles from text
        ngram = NGram(n=self.shingle_size, inputCol="words", outputCol="shingles")
        pipeline = Pipeline(stages=[ngram])

        # Train pipeline and apply to data
        model = pipeline.fit(df_words)
        shingles_df = model.transform(df_words)

        hashed_shingles_df, all_hash_shingles = self.hash_shingles(shingles_df)

        vector_boolean_df = self.generate_vector_boolean(all_hash_shingles, hashed_shingles_df)

        return vector_boolean_df.drop("hash_shingle")

    def hash_shingles(self, shingle_data):
        hash_shingle_udf = udf(hash_shingle, ArrayType(StringType()))
        hashed_shingles_df = shingle_data.withColumn("hash_shingles", hash_shingle_udf(col("shingles"))).select("docid", "hash_shingles")

        exploded_df = hashed_shingles_df.select(explode("hash_shingles").alias("hash_shingle"))
        all_hash_shingles = exploded_df.select("hash_shingle").distinct()
        return hashed_shingles_df, all_hash_shingles

    def generate_vector_boolean(self, hash_shingles_df, shingles_of_docs_df):
        exploded_df = shingles_of_docs_df.withColumn("shingle", explode(col("hash_shingles")))

        vector_boolean_df = hash_shingles_df \
            .join(exploded_df, hash_shingles_df["hash_shingle"] == exploded_df["shingle"], how="left_outer") \
            .groupBy("hash_shingle") \
            .pivot("docid") \
            .agg(count("shingle").cast("int")) \
            .na.fill(0)

        return vector_boolean_df

    def minhashing(self, bool_vectors):
            #add id
            bool_vectors = bool_vectors.withColumn("docid", monotonically_increasing_id())

            #hashing
            hash_funcs = [lambda x: (a * x + b) % self.num_permutations for a, b in zip(random.sample(range(1, 1000), self.num_permutations), random.sample(range(1, 1000), self.num_permutations))]
            minhash_values = bool_vectors.select(
                *[col("docid")] + [hash_funcs[idx](col("docid")) for idx in range(self.num_permutations)]
            )
            return minhash_values
    def locality_sensitive_hashing(self, signatures):
        num_bands = self.num_permutations // self.band
        rows_per_band = self.num_permutations // num_bands

        # Create a UDF to hash signature columns into band ID
        hash_band_udf = udf(lambda row: (row * num_bands) // self.num_permutations, IntegerType())

        # Add a column for band ID to the DataFrame of signatures
        signatures_with_band = signatures.withColumn("band_id", hash_band_udf(monotonically_increasing_id()))

        grouped_signatures = signatures_with_band.groupBy("band_id").agg(collect_set("docid").alias("doc_ids"))

        # generate hash values and assign buckets for each band
        hash_bucket_udf = udf(lambda row: f"{row[0]}-{hash(row[1]) % self.rowofband}", StringType())
        buckets = grouped_signatures.withColumn("bucket", hash_bucket_udf(array("band_id", "doc_ids"))).select("bucket")

        return buckets

    def approxNearestNeighbors(self, key, n):
        # Ensure MinHashLSH model is trained
        if self.vectorsignature is None:
            raise ValueError("MinHashLSH model is not trained. Run the `run` method first.")

        # Convert key document to boolean vector
        key_vector = self.shingling(self.spark.createDataFrame([(0, key)], ["docid", "text"]))
        key_vector = key_vector.drop("hash_shingle")

        # Apply LSH to find candidate buckets
        candidate_buckets = self.locality_sensitive_hashing(self.vectorsignature)

        # Apply MinHashLSH to find approximate nearest neighbors within candidate buckets
        similar_docs = model.approxNearestNeighbors(candidate_buckets, key_vector, key, n)

        return similar_docs
    def run(self):
        if self.document_df is None:
            self.readFile()
        self.vectorboolean = self.shingling(self.document_df)
        self.vectorsignature = self.minhashing(self.vectorboolean)

In [None]:
file_path ="drive/MyDrive/BigData_QT2/WebOfScience-5736.txt"
test_file_path = "drive/MyDrive/BigData_QT2/test.txt"
large_minhash_lsh = LargeDataMinHashLSH(file_path)
large_minhash_lsh.stop_spark()
# Read the file and run the processing
large_minhash_lsh.run()
# Test data
query_document = "Phytoplasmas are insect-vectored bacteria that cause disease in a wide range of plant species."
similar_docs = minhash_lsh.approxNearestNeighbors(query_document, 2)


for doc in similar_docs:
  print(doc +"\n")

print(len(similar_docs))
# Stop the SparkSession
large_minhash_lsh.stop_spark()

This paper presents the simulation results of a linear, fully integrated, two-stage digitally programmable 130 nm CMOS power amplifier (PA) operating at 2.4 GHz. Its power stage is composed of a set of amplifying cells which can be enabled or disabled independently by a digital control circuit. All seven operational modes are univocal in terms of 1 dB output compression point (OCP1dB), saturated output power (P-SAT) and power gain at 2.4 GHz. The lowest power mode achieves an 8.1 dBm P-SAT, a 13.5 dB power gain and consumes 171 mW DC power (P-DC) at an OCP1dB of 6 dBm, whereas the highest power mode reaches an 18.9 dBm P-SAT and a 21.1 dB power gain and consumes 415 mW P-DC at an OCP1dB of 18.2 dBm.

1A6/DRIM is a nucleolar protein with a nucleolar targeting sequence in its 3'-terminus. Bioinformatic analysis indicated that human 1A6/ DRIM shares 23% identity and 43% similarity with yeast Utp20, which has been reported as a component of U3 snoRNA protein complex and has been implicated

In [6]:
!apt-get install texlive texlive-xetex texlive-latex-extra pandoc
!pip install pypandoc

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
pandoc is already the newest version (2.9.2.1-3ubuntu2).
pandoc set to manually installed.
The following additional packages will be installed:
  dvisvgm fonts-droid-fallback fonts-lato fonts-lmodern fonts-noto-mono fonts-texgyre
  fonts-urw-base35 libapache-pom-java libcommons-logging-java libcommons-parent-java
  libfontbox-java libfontenc1 libgs9 libgs9-common libidn12 libijs-0.35 libjbig2dec0 libkpathsea6
  libpdfbox-java libptexenc1 libruby3.0 libsynctex2 libteckit0 libtexlua53 libtexluajit2 libwoff1
  libzzip-0-13 lmodern poppler-data preview-latex-style rake ruby ruby-net-telnet ruby-rubygems
  ruby-webrick ruby-xmlrpc ruby3.0 rubygems-integration t1utils teckit tex-common tex-gyre
  texlive-base texlive-binaries texlive-fonts-recommended texlive-latex-base
  texlive-latex-recommended texlive-pictures texlive-plain-generic tipa xfonts-encodings
  xfonts-utils
Suggested packages:
  fo

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
!cp /content/drive/MyDrive/Colab Notebooks

cp: target 'BigData_QT2.ipynb' is not a directory


In [10]:
!jupyter nbconvert --to PDF "/content/drive/MyDrive/Colab Notebooks/Copy of BigData_QT2.ipynb"

[NbConvertApp] Converting notebook /content/drive/MyDrive/Colab Notebooks/Copy of BigData_QT2.ipynb to PDF
[NbConvertApp] Writing 109059 bytes to notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', 'notebook.tex', '-quiet']
[NbConvertApp] Running bibtex 1 time: ['bibtex', 'notebook']
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 92483 bytes to /content/drive/MyDrive/Colab Notebooks/Copy of BigData_QT2.pdf
