# AMD Project: finding similar items

In [None]:
# Setup packages.
# NOTE(review): apart from cloudpickle, nothing is pinned, so re-running this cell
# later may resolve different versions. The later `--upgrade` installs may also
# move the cloudpickle pin (compare the ContextualVersionConflict shown by the
# import cell). Consider pinning every package for reproducibility.
!pip install cloudpickle==1.5.0
!pip install distributed --upgrade
!pip install scikit-learn --upgrade
!pip install kaggle --upgrade
# NOTE(review): the brackets are unquoted; quoting ("dask[complete]") avoids
# accidental shell glob expansion.
!pip install dask[complete] --upgrade
!pip install dask-ml
!pip install nltk
!pip install beautifulsoup4
!pip install dask-distance # For Jaccard distance
!pip install datasketch # For MinHashLSH



In [None]:
# Run and upload the kaggle.json containing the API key
# Colab-only: opens a file picker so you can upload kaggle.json (the Kaggle API key).
# The file is saved in the current working directory, where the next cell copies it from.
from google.colab import files
uploaded = files.upload()

Saving kaggle.json to kaggle.json


In [None]:
# Setup kaggle credentials
!mkdir ~/.kaggle/
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Authenticate and download the dataset
from kaggle.api.kaggle_api_extended import KaggleApi
kaggle_api = KaggleApi()
kaggle_api.authenticate()
kaggle_api.dataset_download_files(dataset='stackoverflow/stacksample', path='data/', quiet=False, unzip=True)

Downloading stacksample.zip to data


100%|██████████| 1.11G/1.11G [00:12<00:00, 91.5MB/s]





In [None]:
!pip install-scikit-learn>=0.23

In [None]:
# Load libraries
import numpy as np
import dask.dataframe as dd
from bs4 import BeautifulSoup
import re
import string
import nltk
import dask_ml.feature_extraction.text
from nltk.corpus import stopwords
import dask_distance

ContextualVersionConflict: ignored

In [None]:
# Set up the Dask cluster.
# Dask is a library for distributed computation; it is an alternative to Spark.
# One advantage of Dask is that it mirrors most of the pandas API, so you will
# feel at home if you are familiar with manipulating pandas Series/DataFrames.
# Since we are working on Colab, we have only one machine, with 2 cores and approx.
# 12 GB of RAM. We'll increase this if we get access to a proper cluster of workers.
# NOTE(review): `progress` is imported but not used anywhere in this notebook.
from dask.distributed import Client, progress

# One worker process with 2 threads. memory_limit='10GB' is decimal gigabytes,
# which the dashboard reports as 9.31 GiB.
client = Client(n_workers=1, threads_per_worker=2, memory_limit='10GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 2,Total memory: 9.31 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:40945,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 9.31 GiB

0,1
Comm: tcp://127.0.0.1:35223,Total threads: 2
Dashboard: http://127.0.0.1:40709/status,Memory: 9.31 GiB
Nanny: tcp://127.0.0.1:41657,
Local directory: /content/dask-worker-space/worker-g_3b072h,Local directory: /content/dask-worker-space/worker-g_3b072h


In [None]:
# Column -> dtype mapping handed to read_csv: 32-bit integer ids, free-text bodies.
dtypes = dict(Id=np.int32, Body=str)

In [None]:
# Pre-process the csv file using Dask.
# We set the encoding, read only the two columns 'Id' and 'Body', and pass their
# types explicitly (the dtypes variable). This allows for faster reading.
# This step is needed because our csv has nasty double quotes and multi-line text
# that break reading the file in blocks; blocksize=None therefore disables splitting.
# So we first fix the file (still via Dask) and then load it 'normally', in blocks,
# using Dask read_csv.
df = dd.read_csv("data/Questions.csv", blocksize=None, encoding="ISO-8859-1", usecols=["Id", "Body"], dtype=dtypes)

In [None]:
# We replace the double quotes, newlines and carriage returns.
def optimize_df(df):
    """Sanitize the ``Body`` column of one partition so each record fits on one CSV line.

    Double quotes become single quotes, and newline / carriage-return characters
    become spaces. A new DataFrame is returned: the partition handed in by
    ``map_partitions`` is not mutated, and missing bodies stay missing
    (the vectorized ``.str`` methods propagate NaN instead of raising).

    Args:
        df: a pandas DataFrame with an object/string ``Body`` column.

    Returns:
        A copy of ``df`` with the cleaned ``Body`` column, same columns and order.
    """
    # regex=False: every pattern here is a literal character, not a regular expression.
    body = (
        df["Body"]
        .str.replace('"', "'", regex=False)
        .str.replace("\n", " ", regex=False)
        .str.replace("\r", " ", regex=False)
    )
    return df.assign(Body=body)

In [None]:
# Map the optimize function to the df and compute the result.
# One feature of Dask is that most of its operations are lazy: most of the
# commands we issue are not actually run until we call .compute(). Instead, they
# are added to a task graph (a DAG) and only executed when compute is called.
# meta=df tells Dask the output has the same columns/dtypes as the input.
# After .compute(), df is a pandas DataFrame; it is re-read lazily with Dask below.
df = df.map_partitions(optimize_df, meta=df).compute()

In [None]:
# Save the fixed csv. df is a pandas DataFrame here, so this writes a single file.
# The same encoding as the input is used so the text round-trips unchanged.
df.to_csv("questions_preproc.csv", index=False, encoding="ISO-8859-1")

In [None]:
# Read back the fixed csv. Without blocksize=None, Dask may now split it into
# blocks (partitions) that can be processed in parallel.
df = dd.read_csv("questions_preproc.csv", encoding="ISO-8859-1", usecols=["Id", "Body"], dtype=dtypes)

In [None]:
# The repr below shows npartitions: the number of blocks our csv has been
# divided into. This means that *any* computation we run on the df is run
# once per block (npartitions tasks), potentially in parallel.
df

Unnamed: 0_level_0,Id,Body
npartitions=29,Unnamed: 1_level_1,Unnamed: 2_level_1
,int32,object
,...,...
...,...,...
,...,...
,...,...


In [None]:
# Peek at the first few rows to check that the csv was parsed correctly.
df.head()

Unnamed: 0,Id,Body
0,80,<p>I've written a database generation script i...
1,90,<p>Are there any really good tutorials explain...
2,120,<p>Has anyone got experience creating <strong>...
3,180,<p>This is something I've pseudo-solved many t...
4,260,<p>I have a little game written in C#. It uses...


In [None]:
# Let's check whether there are missing values: percentage of nulls per column.
missing_values = df.isnull().sum()
row_count = df.index.size
percent_missing = (100 * (missing_values / row_count)).compute()
percent_missing

Id      0.0
Body    0.0
dtype: float64

In [None]:
# Download the NLTK data used below.
# "stopwords" is downloaded but not used by clean_str, which filters punctuation only.
nltk.download("stopwords")
# nltk.word_tokenize needs the Punkt models: older NLTK releases load "punkt",
# newer ones (3.8.2+) load "punkt_tab". Downloading both keeps the notebook
# working with whichever version pip installed. On old releases an unknown
# package only prints an error and returns False; it does not raise.
nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
# Text cleaning function
stop_w = string.punctuation

def clean_str(text):
    # The body text is HTML, thus we use BS to clean it
    soup = BeautifulSoup(text)
    text = soup.text

    # We set a mapping of contractions to be expanded
    contr_dict = {
        "n't": " not",
        "/TD": " ",
        "won\'t": "will not",
        "can\'t": "can not",
        "n\'t": " not",
        "\'re": " are",
        "\'s": " is",
        "\'d": " would",
        "\'ll": " will",
        "\'t": " not",
        "\'ve": " have",
        "\'m": " am",
        "\'ve": " have ",
        "n't": " not ",
        "i'm": "i am ",
        "\'re": " are ",
        "\'d": " would ",
        "\'ll": " will ",
        "I'm" : "I am",
        " 've " : " have ",
        " 're " : " are ",
        " 'll " : " will ",
    }
    
    # We use regex to expand them
    comp_re = re.compile("(%s)" % "|".join(contr_dict.keys()))
    txt = comp_re.sub(lambda match: contr_dict[match.group(0)], text)

    # Let's tokenize the result
    tokens = nltk.word_tokenize(txt)

    # Let's clean up punctuation and numbers.
    # We will have as result a single string whose words are divided by whitespaces.
    final_txt = " ".join([i for i in tokens if not i in stop_w and len(i) > 1 and str.isalpha(i) ])

    return final_txt

In [None]:
# We sample ~1% of the rows (~12.6k questions) to allow for faster computation in our project.
# However, if we had the computational power and time, setting up our code in Dask
# this way would allow for an even much bigger sample.
# A fixed random_state makes the sample, and every result downstream, reproducible.
SAMPLE_SEED = 42
df = df.sample(frac=0.01, random_state=SAMPLE_SEED)

In [None]:
# Let's map the cleaning function to the df.
# This creates a new column with the cleaned text. It is lazy, like other Dask ops.
# Each body is converted to str and lowercased before cleaning.
# meta declares the result's name and dtype (object) so Dask does not have to guess it.
df["txt_cleaned"] = df["Body"].apply(lambda x: clean_str(str(x).lower()), meta=('txt_cleaned', object))

In [None]:
# Compute the result: this runs the pending lazy steps (read, sample, clean) and
# collects them into a pandas DataFrame. From here on, df is pandas.
df = df.compute()
df

Unnamed: 0,Id,Body,txt_cleaned
352,32640,<p>So the controller context depends on some a...,so the controller context depends on some inte...
59176,2720130,<p>This question is similar to <a href='http:/...,this question is similar to the one here but f...
36721,1832030,<p>I have the domain www.mydomain.com and I se...,have the domain and set apache so as to have w...
68636,3067330,<pre><code>#include'iostream' class CMessage {...,class cmessage public int cmessage void testin...
28103,1468640,<p>I want to know whether it's possible to get...,want to know whether it is possible to get inf...
...,...,...,...
337,40066300,<p>I have installed my app in the google compu...,have installed my app in the google compute en...
1523,40093750,<p>I'm trying so many days using the Google Lo...,am trying so many days using the google locati...
1677,40097200,<p>I've already read those topics: <a href='ht...,have already read those topics php library for...
3427,40138990,<p>I am trying to create a Powershell script t...,am trying to create powershell script that wil...


In [None]:
from dask_ml.feature_extraction.text import CountVectorizer
import dask.bag as db

# Bag of words: fit a vocabulary over the cleaned texts and build the
# document-term count matrix X (one row per document, one column per term).
vectorizer = CountVectorizer()
# df is a pandas DataFrame at this point, so its texts are wrapped in a Dask bag
# for the dask_ml vectorizer.
corpus = db.from_sequence(list(df["txt_cleaned"].values))
X = vectorizer.fit_transform(corpus)
# Only displays the materialized sparse matrix; the computed value is not stored.
# NOTE(review): X is not used by the LSH step below.
X.compute()

<12643x51118 sparse matrix of type '<class 'numpy.int64'>'
	with 839976 stored elements in Compressed Sparse Row format>

In [None]:
# Vocabulary terms, in the same order as the columns of X.
# get_feature_names() was deprecated in scikit-learn 1.0 and removed in 1.2, and
# the setup cell installs the latest scikit-learn. Prefer get_feature_names_out()
# and fall back only on old versions.
if hasattr(vectorizer, "get_feature_names_out"):
    entity_names = list(vectorizer.get_feature_names_out())
else:
    entity_names = vectorizer.get_feature_names()

In [None]:
# Let's compute the MinHash LSH Forest.
# MinHash LSH Forest answers approximate top-k queries: for a query it returns up
# to k candidate keys that are likely to be similar, in no particular order.
# This is slightly different from the 'original' MinHashLSH, which returns every
# key above a fixed similarity threshold.
# For more info:
# http://infolab.stanford.edu/~ullman/mmds/ch3.pdf
# https://github.com/ekzhu/datasketch/issues/42#issuecomment-350074304
# http://ekzhu.com/datasketch/lshforest.html
# https://towardsdatascience.com/understanding-locality-sensitive-hashing-49f6d1f6134

from datasketch import MinHash, MinHashLSH, MinHashLSHForest

# Number of permutations per MinHash signature. The forest, the LSH index and
# every signature must agree on it.
NUM_PERM = 128

forest = MinHashLSHForest(num_perm=NUM_PERM)
# NOTE(review): this threshold-based index is populated but never queried in this
# notebook; it is kept so the name stays available.
lsh = MinHashLSH(threshold=0.5, num_perm=NUM_PERM)


def get_str_lsh(norm_title, num_perms=NUM_PERM):
    """Return the MinHash signature of a space-separated string.

    The string is split on single spaces and de-duplicated, so the signature
    summarizes the set of distinct tokens (order and counts are ignored).

    Args:
        norm_title: normalized text, e.g. the output of ``clean_str``.
        num_perms: number of permutations; must match the index it is used with.

    Returns:
        A datasketch ``MinHash``.
    """
    mh = MinHash(num_perm=num_perms)
    for token in set(norm_title.split(' ')):
        mh.update(token.encode('utf8'))
    return mh


# Hash every document exactly once. The same signature is reused for both
# indexes and for scoring query results, instead of re-hashing documents at query time.
lsh_texts = df["txt_cleaned"].values
lsh_signatures = [get_str_lsh(text) for text in lsh_texts]

# Create the index. Keys are positional row numbers into lsh_texts / lsh_signatures.
for i, signature in enumerate(lsh_signatures):
    forest.add(i, signature)
    lsh.insert(i, signature)

# Index, so all keys will be searchable
forest.index()


def get_results_lsh(x, top_n=10):
    """Find up to ``top_n`` texts similar to ``x`` with the LSH forest.

    Args:
        x: normalized query text.
        top_n: maximum number of candidates to return.

    Returns:
        A tuple ``(texts, similarities)`` of two parallel lists, ranked by
        decreasing estimated Jaccard similarity to ``x``.
    """
    search_q_hash = get_str_lsh(x)
    # forest.query returns candidate keys unordered, so score and rank them here.
    # MinHash.jaccard *estimates* the Jaccard similarity from the two signatures;
    # for the exact value, compare the token sets directly.
    scored = [
        (i, search_q_hash.jaccard(lsh_signatures[i]))
        for i in forest.query(search_q_hash, top_n)
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)

    return (
        [lsh_texts[i] for i, _ in scored],
        [similarity for _, similarity in scored],
    )

In [None]:
# Query the LSH forest for every cleaned text. Each cell of the new column holds
# the (texts, similarities) tuple returned by get_results_lsh.
df["lsh_result"] = df["txt_cleaned"].map(lambda text: get_results_lsh(text, top_n=10))

In [None]:
# Inspect the per-row (texts, similarities) results of the LSH query.
df["lsh_result"]

352      ([so the controller context depends on some in...
59176    ([this question is similar to the one here but...
36721    ([somewhere am doing something wrong my query ...
68636    ([to create digital signature with the client ...
28103    ([is it possible to mark visited page numbers ...
                               ...                        
337      ([am currently working on website project writ...
1523     ([am trying to run tclsh with my package share...
1677     ([have already read those topics php library f...
3427     ([how do display like in the following way in ...
224      ([am trying to determine the type of property ...
Name: lsh_result, Length: 12643, dtype: object

In [None]:
# Each row holds up to 10 results that the algorithm found most similar.
# Not surprisingly (and this is also evidence that the algorithm works correctly),
# the input row itself is normally among its own results, since every text is
# identical to itself. Below, the first result is the input text (compare with
# the next cell).
# Accuracy may not be the best; for tips on improving it, see
# http://ekzhu.com/datasketch/lshforest.html#tips-for-improving-accuracy
df["lsh_result"].iloc[0]

(['so the controller context depends on some internals what are some ways to cleanly mock these up for unit tests seems like its very easy to clog up tests with tons of setup when only need for example to return have seen some out on the nets but some are dated figured this would be good place to keep the latest and greatest am using latest version of rhino mocks',
  'have to update the views of the current post from table posts witch have data millions and the loading time of the page is slow tables idpost iduser views title some title some title some title and many more up to millions and iduser have index idpost have primary key if seprate the data and make new table and use left join to get the value of the views at first it will be fast since the new table is still small but over time she as well will have millions rows and again it will be slow how you deal with huge table',
  'am actually working on sp in sql using sp am creating job and am scheduling it for particular time thes

In [None]:
# The cleaned text of the first row, for comparison with its LSH results above.
df["txt_cleaned"].iloc[0]

'so the controller context depends on some internals what are some ways to cleanly mock these up for unit tests seems like its very easy to clog up tests with tons of setup when only need for example to return have seen some out on the nets but some are dated figured this would be good place to keep the latest and greatest am using latest version of rhino mocks'

In [None]:
# Scalability discussion: see the professor's notes for the time and space complexity of LSH
# (and also https://towardsdatascience.com/understanding-locality-sensitive-hashing-49f6d1f6134).
# Regarding the scalability of this implementation, Dask provides it for the distributed
# stages: CSV cleaning, sampling, text cleaning and bag-of-words.
# As the dataset grows (i.e. more rows in our csv), we only need to add machines to the
# cluster and register them as workers where the Dask client is created at the beginning
# of this notebook. Dask then automatically splits the computation among the workers.
# Note, however, that the MinHash LSH Forest is built and queried with datasketch on a
# single machine, after .compute() has collected the sampled data into pandas. That
# stage does not speed up by adding Dask workers and would need to be distributed separately.
# For more benchmarks on accuracy and/or scalability, run
# https://github.com/ekzhu/datasketch/blob/master/benchmark/sketches/minhash_benchmark.py
# or these:
# https://github.com/ekzhu/datasketch/blob/master/benchmark/sketches/cardinality_benchmark.py
# https://github.com/ekzhu/datasketch/blob/master/benchmark/indexes/jaccard/lshforest.py