In [1]:
import re
import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint
from pyspark import SparkContext
from google.colab import drive
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Mount Google Drive under /content/mydrive; the input file paths used by this
# notebook all live below that mount point.
drive.mount('/content/mydrive', force_remount=True)

Mounted at /content/mydrive


In [2]:
# One data-file line: five comma-separated fields, the second of which is
# wrapped in double quotes. Groups 1-4 are consumed by parse_data_file_line.
DATAFILE_PATTERN = '^(.+),"(.+)",(.*),(.*),(.*)'


def remove_quotes(s):
    """Return `s` with every double-quote character removed."""
    return s.replace('"', '')

In [3]:
def parse_data_file_line(data_file_line):
    """Parse one raw line of a product data file.

    Returns a 2-tuple whose second element is a status flag:
      * (line, -1)             -- the line does not match DATAFILE_PATTERN
      * (line, 0)              -- the line is the header (first field is '"id"')
      * ((id, product), 1)     -- a data row; `id` is the first field with its
                                  double quotes stripped and `product` is
                                  fields 2-4 joined by single spaces
    A message is printed for invalid and header lines.
    """
    parsed = re.search(DATAFILE_PATTERN, data_file_line)

    # Guard: line does not have the expected shape at all.
    if parsed is None:
        print('Invalid datafile line: %s' % data_file_line)
        return (data_file_line, -1)

    raw_id = parsed.group(1)

    # Guard: header row, recognised by its still-quoted first column name.
    if raw_id == '"id"':
        print('Header datafile line: %s' % data_file_line)
        return (data_file_line, 0)

    product = '%s %s %s' % parsed.group(2, 3, 4)
    return ((remove_quotes(raw_id), product), 1)

In [4]:
# Directory (on the mounted Drive) that holds every input file. Change this
# one constant to point the notebook at a different copy of the data.
DATA_DIR = "/content/mydrive/My Drive/CSCI-573"

# Logical dataset name -> full path of the backing file.
files = {
    'Amazon': DATA_DIR + "/Amazon.csv",
    'Google': DATA_DIR + "/Google.csv",
    'gold_standard': DATA_DIR + "/Amazon_Google_perfectMapping.csv",
    'stopwords': DATA_DIR + "/stopwords.txt",
}

In [5]:
def parse_data(filename):
    """Read `filename` as a text RDD and parse each line.

    The file is read with a minimum of 4 partitions; every line is passed
    through parse_data_file_line, so the result is an RDD of
    (payload, status_flag) tuples.
    """
    lines = sc.textFile(filename, 4, use_unicode=True)
    return lines.map(parse_data_file_line)

In [6]:
def load_data(filekey):
    """Load, parse and validate the data file registered under `filekey`.

    Args:
        filekey: key into the module-level `files` mapping.

    Returns:
        A cached RDD containing the payload of every successfully parsed
        line (status flag 1), i.e. (id, product) pairs.

    Raises:
        AssertionError: if any line failed to parse, or if the number of
            lines that are neither valid nor failed is not exactly one
            (the header).
    """
    path = files[filekey]
    raw = parse_data(path).cache()

    # Show up to ten of the unparseable lines to help diagnose bad input.
    failed = raw.filter(lambda s: s[1] == -1).map(lambda s: s[0])
    for line in failed.take(10):
        print('%s - Invalid datafile line: %s' % (path, line))

    valid = raw.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache()

    # Each count() launches a Spark job; run every one once and reuse the
    # numbers for both the report and the sanity checks below.
    raw_count = raw.count()
    valid_count = valid.count()
    failed_count = failed.count()

    print('%s - Read %d lines, successfully parsed %d lines, failed to parse %d lines'
        % (path, raw_count, valid_count, failed_count))
    assert failed_count == 0, '%d lines failed to parse' % failed_count
    # With no failures, the single extra line must be the header row.
    assert raw_count == valid_count + 1, 'expected exactly one header line'
    return valid

In [7]:
# Load and validate both product catalogs. Each result is a cached RDD of
# (id, product-text) pairs; load_data asserts that every line parsed.
google = load_data('Google')
amazon = load_data('Amazon')

/content/mydrive/My Drive/CSCI-573/Google.csv - Read 3227 lines, successfully parsed 3226 lines, failed to parse 0 lines
/content/mydrive/My Drive/CSCI-573/Amazon.csv - Read 1364 lines, successfully parsed 1363 lines, failed to parse 0 lines


In [8]:
# Token delimiter: any run of non-word characters.
split_regex = r'\W+'
# Read the stopword file in a single partition and pull its lines to the
# driver as a set for fast membership tests in tokenize().
# Presumably the file holds one stopword per line -- TODO confirm.
stopwords = set(sc.textFile(files['stopwords'], 1, use_unicode=True).collect())

def tokenize(string):
    """Lower-case `string`, split it on `split_regex`, and return the list of
    resulting tokens with empty strings and stopwords dropped (order and
    duplicates preserved).
    """
    kept = []
    for word in re.split(split_regex, string.lower()):
        # Splitting can yield '' at either end of the string; skip those
        # along with any stopword.
        if word == '' or word in stopwords:
            continue
        kept.append(word)
    return kept

In [9]:
# (4a) Tokenize the full dataset
# Each input record is (id, product-text); keep the id and replace the text
# with its token list.
amazon_full_rec_to_token = amazon.map(lambda line: (line[0], tokenize(line[1])))
google_full_rec_to_token = google.map(lambda line: (line[0], tokenize(line[1])))
print('Amazon full dataset is %s products, Google full dataset is %s products'
    % (amazon_full_rec_to_token.count(), google_full_rec_to_token.count()))
# Expected output:
# Amazon full dataset is 1363 products, Google full dataset is 3226 products

Amazon full dataset is 1363 products, Google full dataset is 3226 products


In [10]:
def idfs(corpus):
    """Compute an inverse-document-frequency weight for every token.

    `corpus` is an RDD of (id, token_list) records. The result is an RDD of
    (token, N / df) pairs, where N is the number of records in `corpus` and
    df is the number of records whose token list contains the token.
    """
    doc_total = corpus.count()
    doc_frequencies = (
        corpus
        # De-duplicate tokens inside each record so a record contributes at
        # most 1 to a token's document frequency.
        .flatMap(lambda record: [(token, 1) for token in set(record[1])])
        .reduceByKey(lambda left, right: left + right)
    )
    return doc_frequencies.map(lambda pair: (pair[0], doc_total / pair[1]))

In [12]:
def tf(tokens):
    """Return the relative term frequency of each token in `tokens`.

    The result maps every distinct token to (occurrences / len(tokens)).
    An empty token list yields an empty dict.
    """
    occurrences = {}
    for token in tokens:
        occurrences[token] = occurrences.get(token, 0) + 1
    total = len(tokens)
    return {token: count / total for token, count in occurrences.items()}

In [13]:
def tfidf(tokens, idfs):
    """Return a {token: tf * idf} dict for `tokens`.

    `idfs` maps token -> IDF weight and must contain every token that occurs
    in `tokens`; a missing token raises KeyError.
    """
    weights = {}
    for token, frequency in tf(tokens).items():
        weights[token] = frequency * idfs[token]
    return weights

In [20]:
# (4b) Compute IDFs and TF-IDFs for the full datasets
RDD_full_corpus = amazon_full_rec_to_token.union(google_full_rec_to_token)
idfs_full = idfs(RDD_full_corpus)
idfs_full_count = idfs_full.count()
print('There are %s unique tokens in the full datasets.' % idfs_full_count)
# There are 17078 unique tokens in the full datasets.
# Collect the IDF weights to the driver as a dict and broadcast them once.
idfs_full_weights = idfs_full.collectAsMap()
idfs_full_broadcast = sc.broadcast(idfs_full_weights)
# Pre-compute TF-IDF weights. Build mappings from record ID to weight vector.
# The lambdas read the weights through the broadcast handle rather than
# closing over the driver-side dict, so the dict is not serialized into the
# task closures.
RDD_amazon_weights = amazon_full_rec_to_token.map(lambda x: (x[0], tfidf(x[1], idfs_full_broadcast.value)))
RDD_google_weights = google_full_rec_to_token.map(lambda x: (x[0], tfidf(x[1], idfs_full_broadcast.value)))
print('There are %s Amazon weights and %s Google weights.'
      % (RDD_amazon_weights.count(), RDD_google_weights.count()))
# There are 1363 Amazon weights and 3226 Google weights.

There are 17078 unique tokens in the full datasets.
There are 1363 Amazon weights and 3226 Google weights.


In [21]:
import math
def dotprod(a, b):
    """Dot product of two sparse vectors given as {token: weight} dicts.

    Only tokens present in both dicts contribute; terms are accumulated in
    the iteration order of `a`.
    """
    total = 0.0
    for token, weight_a in a.items():
        if token not in b:
            continue
        total += weight_a * b[token]
    return total
def norm(a):
    """Euclidean (L2) norm of a sparse vector given as a {token: weight} dict."""
    squares = 0.0
    for weight in a.values():
        squares += weight * weight
    return math.sqrt(squares)
def cossim(a, b):
    """Cosine similarity of two sparse vectors given as {token: weight} dicts.

    Returns dotprod(a, b) / (norm(a) * norm(b)). If either vector has zero
    norm (for example an empty dict, which is what a text consisting only of
    stopwords produces), the similarity is defined here as 0.0 instead of
    raising ZeroDivisionError.
    """
    denominator = norm(a) * norm(b)
    if denominator == 0:
        return 0.0
    return dotprod(a, b) / denominator

In [47]:
def cosine_similarity(string1, string2, idfs_dictionary):
    """Cosine similarity between the TF-IDF vectors of two raw strings.

    Both strings are tokenized, weighted with `idfs_dictionary`
    (token -> IDF weight), and compared with cossim.
    """
    return cossim(
        tfidf(tokenize(string1), idfs_dictionary),
        tfidf(tokenize(string2), idfs_dictionary),
    )

In [25]:
# (4c) Compute Norms for the weights from the full datasets
# For each catalog: map every record ID to the norm of its TF-IDF weight
# vector, collect the result to the driver as a dict, and broadcast that dict
# so it can be read inside Spark tasks.
amazon_norms = RDD_amazon_weights.map(lambda x: (x[0], norm(x[1]))).collectAsMap()
amazon_norms_broadcast = sc.broadcast(amazon_norms)
google_norms = RDD_google_weights.map(lambda x: (x[0], norm(x[1]))).collectAsMap()
google_norms_broadcast = sc.broadcast(google_norms)
# Sanity check: number of norm entries per catalog.
print(len(amazon_norms_broadcast.value))
# Expected: 1363
print(len(google_norms_broadcast.value))
# Expected: 3226

1363
3226


In [28]:
# (4d) Create inverted indices from the full datasets
def invert(record):
    """Turn one (record_id, {token: weight}) record into a list of
    (token, record_id) pairs, one per token key.
    """
    record_id = record[0]
    pairs = []
    for token in record[1].keys():
        pairs.append((token, record_id))
    return pairs
# Quick check of invert() on a toy record.
print(invert((1, {'foo': 2, 'bar': 3})))
# Expected: [('foo', 1), ('bar', 1)]
# Flatten every weight vector into (token, record_id) pairs. Both RDDs are
# cached: they are counted here and used again after this cell.
RDD_amazon_inv_pairs = RDD_amazon_weights.flatMap(lambda x: invert(x)).cache()
RDD_google_inv_pairs = RDD_google_weights.flatMap(lambda x: invert(x)).cache()
print('There are %s Amazon inverted pairs and %s Google inverted pairs.'
     % (RDD_amazon_inv_pairs.count(), RDD_google_inv_pairs.count()))
# Expected: There are 111387 Amazon inverted pairs and 77678 Google inverted pairs.

[('foo', 1), ('bar', 1)]
There are 111387 Amazon inverted pairs and 77678 Google inverted pairs.


In [46]:
# (4e) Identify common tokens from the full dataset
from pyspark.rdd import portable_hash
def swap(record):
    """Return the first two elements of `record` as a tuple in reversed order."""
    first = record[0]
    second = record[1]
    return (second, first)
# Join the two inverted indices on token, giving
# (token, (amazon_id, google_id)); re-key each row by the ID pair and group
# so that every (amazon_id, google_id) pair carries the tokens it shares.
# NOTE: explicit pre-partitioning of either side of the join with
# partitionBy(64, lambda k: portable_hash(k[0])) is left disabled.
common_tokens = (
    RDD_amazon_inv_pairs
    .join(RDD_google_inv_pairs)
    .map(swap)
    .groupByKey()
    .cache()
)
# The count is the number of grouped (amazon_id, google_id) keys.
print('Found %d common tokens' % common_tokens.count())
#Found 2441100 common tokens

Found 2441100 common tokens


In [70]:
# (4f) Identify common tokens from the full dataset (cont.)
# Collect each catalog's TF-IDF weight vectors to the driver as a dict keyed
# by record ID, then broadcast it so fast_cosine_similarity can look weights
# up inside Spark tasks.
amazon_weights_broadcast = sc.broadcast(RDD_amazon_weights.collectAsMap())
google_weights_broadcast = sc.broadcast(RDD_google_weights.collectAsMap())
def fast_cosine_similarity(record):
    """Cosine similarity for one candidate pair, using broadcast lookups.

    Args:
        record: ((amazon_id, google_id), tokens), where `tokens` is an
            iterable of the tokens the two records have in common.

    Returns:
        ((amazon_id, google_id), similarity): the sum over `tokens` of the
        product of both records' weights, divided by each record's
        precomputed norm.

    Raises:
        KeyError: if an ID or token is missing from the broadcast tables.
    """
    amazon_rec = record[0][0]
    google_rec = record[0][1]
    tokens = record[1]

    # The per-record weight vectors do not change inside the loop, so fetch
    # them from the broadcast tables once instead of once per token.
    amazon_weights = amazon_weights_broadcast.value[amazon_rec]
    google_weights = google_weights_broadcast.value[google_rec]

    value = 0.0
    for token in tokens:
        value += amazon_weights[token] * google_weights[token]

    # Two separate divisions, in this order, to keep the floating-point
    # result identical to dividing by each norm in turn.
    value /= amazon_norms_broadcast.value[amazon_rec]
    value /= google_norms_broadcast.value[google_rec]
    key = (amazon_rec, google_rec)
    return (key, value)
# Score every (amazon_id, google_id) pair in common_tokens and cache the
# result, since it is both counted and filtered below.
RDD_similarities_full = common_tokens.map(lambda x: fast_cosine_similarity(x)).cache()
pprint(RDD_similarities_full.count())
# Expected: 2441100
# Spot-check: pull out the similarity of one specific Amazon/Google pair.
similarity_test = RDD_similarities_full.filter(
    lambda x: x[0][0] == 'b00005lzly'
          and x[0][1] == 'http://www.google.com/base/feeds/snippets/13823221823254120257'
).collect()
pprint(similarity_test)
# Expected:
#[(('b00005lzly', 'http://www.google.com/base/feeds/snippets/13823221823254120257'), 4.286548413995203e-06)]

2441100
[(('b00005lzly',
   'http://www.google.com/base/feeds/snippets/13823221823254120257'),
  4.2865484139952024e-06)]
