In [2]:
import csv
import re
import unicodedata
import zipfile
from io import BytesIO

import numpy as np
import pandas as pd
import requests
import yaml

from kaggle import Kaggle

In [3]:
def load_settings():
    """Load the notebook's settings from ``config.yaml`` in the working directory.

    Returns:
        The parsed YAML document, in whatever structure the file contains.

    Raises:
        OSError: if ``config.yaml`` cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    with open('config.yaml', 'r') as sf:
        # safe_load instead of load: calling yaml.load() without a Loader is
        # rejected by PyYAML >= 6 and, on older versions, may construct
        # arbitrary Python objects from the file.
        settings = yaml.safe_load(sf)
    return settings

In [4]:
# Dataset download setup: settings come from config.yaml, the URL points at
# the text-similarity test set on Kaggle.
SETTINGS = load_settings()
DATA_URL = (
    "https://www.kaggle.com/rishisankineni/text-similarity/downloads/test.csv"
)

In [5]:
# Build the Kaggle helper from the loaded settings and request the dataset.
# NOTE(review): the recorded output suggests get_data logs in before
# downloading — confirm against the Kaggle helper's implementation.
kaggle = Kaggle(SETTINGS)
response = kaggle.get_data(DATA_URL)

Login to Kaggle
Getting dataset


In [16]:
# Break the downloaded payload into rows, then have the Kaggle helper parse
# them with comma-separated, minimally-quoted CSV options.
lines = response.content.splitlines()
csv_options = {'quoting': csv.QUOTE_MINIMAL, 'delimiter': ','}
kaggle_data = kaggle.to_array(lines, **csv_options)

Transform data from text to array


In [17]:
# Wrap the parsed rows in a NumPy array so they can be sliced by row and column.
np_array = np.array(kaggle_data)

In [21]:
# Sanity check: show the array (header row first, then the data rows).
print(np_array)

[['test_id' 'description_x' 'description_y' 'same_security']
 ['0' 'semtech corp' 'semtech corporation' '']
 ['1' 'vanguard mid cap index' 'vanguard midcap index - a' '']
 ...
 ['513' 'wisdomtree japan hedged equity -' 'wisdomtree japan hedged eq'
  '']
 ['514' 'templeton global bond fund advisor class'
  'templeton glbal bond adv' '']
 ['515' 'trp health sciences' 't. rowe price health sciences fund' '']]


In [19]:
# Convert to a DataFrame: the first row supplies the column names and the
# first column (test_id) becomes the row index.
header_row, data_rows = np_array[0], np_array[1:]
fullData = pd.DataFrame(
    data=data_rows[:, 1:],
    index=data_rows[:, 0],
    columns=header_row[1:],
)

In [31]:
def my_great_cleaning_function(s):
    """Normalize a description string before matching.

    Strips surrounding whitespace, lower-cases the text, and removes accents
    by decomposing to NFD and dropping combining marks (Unicode category
    'Mn'), e.g. ``"  Café "`` becomes ``"cafe"``.

    Args:
        s: the string to clean.

    Returns:
        The cleaned string.
    """
    s = s.strip()
    s = s.lower()
    # NFD splits accented characters into base letter + combining mark, so
    # filtering out 'Mn' leaves only the base letters.
    s = ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
    return s

In [41]:
from fuzzywuzzy import process, fuzz

def fuzzy_match(x, choices, scorer, cutoff):
    """Look up the best fuzzy match for ``x`` among ``choices``.

    Thin wrapper around ``process.extractOne``.

    Args:
        x: the query string.
        choices: the candidates to match against.
        scorer: the scoring callable (e.g. ``fuzz.ratio``).
        cutoff: forwarded to ``extractOne`` as ``score_cutoff``.

    Returns:
        Whatever ``process.extractOne`` returns for these arguments.
    """
    extract_options = {
        'choices': choices,
        'scorer': scorer,
        'score_cutoff': cutoff,
    }
    best_match = process.extractOne(x, **extract_options)
    return best_match

In [66]:
# For every description_x, find its best match among all description_y
# values using fuzz.ratio with a score cutoff of 80.
candidate_descriptions = fullData.loc[:, 'description_y']
matching_results = fullData.loc[:, 'description_x'].apply(
    lambda description: fuzzy_match(
        description, candidate_descriptions, fuzz.ratio, 80
    )
)

In [44]:
# Store each row's fuzzy-match result alongside its descriptions.
# Note: this adds the column to fullData in place.
fullData['fuzzy_output'] = matching_results

In [57]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Start (or reuse) a Spark session configured from a default SparkConf.
spark_conf = SparkConf()
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

In [65]:
# Run the fuzzy matching on Spark rather than in pandas on the driver.
# Previously the whole .apply() finished locally before parallelize() was
# called, so Spark only shipped already-computed results back and forth.
# The candidate Series is broadcast once so every task reuses one copy, and
# each description_x is matched in a map() step with the same scorer and
# cutoff as before, so the collected list keeps its order and contents.
candidates_broadcast = spark.sparkContext.broadcast(
    fullData.loc[:, 'description_y']
)
(
    spark.sparkContext
    .parallelize(fullData.loc[:, 'description_x'].tolist())
    .map(lambda x: fuzzy_match(x, candidates_broadcast.value, fuzz.ratio, 80))
    .collect()
)

[None,
 ('vanguard mid cap index', 100, '461'),
 ('spdr gold trust spdr gold shares', 92, '2'),
 ('vanguard total bond market idx-adm', 86, '442'),
 ('oakmark international cl i', 87, '4'),
 None,
 ('spartan global ex us index fid adv cl', 100, '113'),
 ('vanguard total bond market idx-adm', 100, '442'),
 None,
 None,
 ('whole foods market', 100, '308'),
 None,
 None,
 ('guggenheim bulletshares 2018', 96, '94'),
 ('vanguard small-cap index adm', 100, '132'),
 ('dfa emrging markets', 86, '387'),
 None,
 ('tegna inc', 82, '17'),
 None,
 ('vanguard mid cap index', 100, '461'),
 ('jpmorgan chase & co div: 1.760', 100, '205'),
 ('american funds europacific growth fund', 94, '196'),
 ('vanguard total bond market idx-adm', 100, '442'),
 None,
 ('american intl gro 21 wtswarrants exp 01/19/21', 80, '24'),
 ('fifth street financial corp com', 93, '25'),
 ('ishares jpm embi global core', 100, '104'),
 None,
 ('exelixis inc', 86, '28'),
 ('glenmede large cap growth', 96, '29'),
 ('af europac growt