- findspark==1.4.2
- pyspark==3.0.0

In [1]:
import findspark
findspark.init()

In [2]:
import sys
from operator import add

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [38]:
import numpy as np
from random import random
from math import log2

### WordCount

##### Implemented classic wordcount example to familiarize myself with pyspark
[Skip to Project Code](#Naive-Method)

In [None]:
filename = './test_data.txt'

In [13]:
# Local Spark session for the word-count warm-up; "local[*]" is the master URL.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("WCount")
    .getOrCreate()
)

In [14]:
sc = spark.sparkContext

In [15]:
# .filter(lambda line: len(line)>1) \

#### RDD

 > RDD (Resilient Distributed Dataset) - a collection of elements partitioned across the nodes of a cluster so that it can be operated on in parallel

In [16]:
lines = spark.read.text(filename).rdd.map(lambda r: r[0]) # RDD

In [17]:
type(lines)

pyspark.rdd.PipelinedRDD

In [18]:
lines.take(2)

['Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. At ultrices mi tempus imperdiet nulla malesuada pellentesque. Vitae congue mauris rhoncus aenean vel elit. Arcu dui vivamus arcu felis bibendum ut tristique et egestas. Nunc non blandit massa enim nec dui nunc mattis. Amet facilisis magna etiam tempor orci eu lobortis elementum. Quis risus sed vulputate odio ut enim. Leo vel orci porta non pulvinar neque laoreet suspendisse. In tellus integer feugiat scelerisque. Diam sit amet nisl suscipit adipiscing bibendum est ultricies integer. Lacinia quis vel eros donec ac odio tempor. Sit amet consectetur adipiscing elit ut aliquam purus sit. Aliquet eget sit amet tellus cras. Eget est lorem ipsum dolor sit amet consectetur adipiscing elit. Ut ornare lectus sit amet est.',
 '']

In [32]:
%%time
# Word count: split every line on single spaces, pair each token with 1, then
# sum the 1s per token. Splitting on ' ' keeps empty strings (blank lines and
# repeated spaces), so '' shows up as a "word" in the result.
# The chain is parenthesised because a backslash line continuation cannot be
# followed by a comment (that is a SyntaxError).
reduced_data = (
    lines.flatMap(lambda x: x.split(' '))  # flatMap returns an RDD
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)

Wall time: 165 ms


In [37]:
type(reduced_data)

pyspark.rdd.PipelinedRDD

In [34]:
%%time
output = reduced_data.collect()

Wall time: 751 ms


In [38]:
type(output)

list

In [39]:
output

[('Lorem', 570),
 ('ipsum', 3420),
 ('consectetur', 3990),
 ('sed', 12540),
 ('do', 570),
 ('labore', 570),
 ('magna', 3420),
 ('At', 1710),
 ('mi', 2280),
 ('nulla', 2850),
 ('malesuada', 4560),
 ('congue', 570),
 ('rhoncus', 3420),
 ('aenean', 2280),
 ('elit.', 2280),
 ('Arcu', 1710),
 ('felis', 3420),
 ('bibendum', 4560),
 ('Nunc', 2850),
 ('nunc', 4560),
 ('etiam', 1140),
 ('lobortis', 1710),
 ('elementum.', 1710),
 ('vulputate', 3420),
 ('odio', 6270),
 ('enim.', 1140),
 ('Leo', 1140),
 ('tellus', 5130),
 ('integer', 1710),
 ('feugiat', 3420),
 ('scelerisque.', 1140),
 ('Diam', 570),
 ('amet', 9690),
 ('suscipit', 1140),
 ('est', 3420),
 ('quis', 8550),
 ('Sit', 3990),
 ('purus', 5700),
 ('sit.', 3420),
 ('Aliquet', 2280),
 ('eget', 6840),
 ('ornare', 3420),
 ('lectus', 6840),
 ('', 5759),
 ('Non', 1710),
 ('interdum', 2850),
 ('Dui', 1140),
 ('Purus', 1140),
 ('faucibus', 5130),
 ('nisi', 2850),
 ('lacus', 2850),
 ('sapien', 2280),
 ('pellentesque', 8550),
 ('habitant', 570),
 ('

In [123]:
spark.stop()

## Naive Method

In [6]:
# Local Spark session for the matrix experiments; "local[*]" is the master URL.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GroupByMat")
    .getOrCreate()
)

In [7]:
sc = spark.sparkContext

In [8]:
# to split the input using the delimiter
def splitter(l):
    """Parse one ``"i j v"`` text line into ``(i, (j, v))``.

    Fields may be separated by any run of whitespace (spaces or tabs);
    leading and trailing whitespace, including the newline, is ignored.

    Returns:
        ``(int(i), (int(j), float(v)))`` -- a key/value pair keyed by ``i``.

    Raises:
        ValueError: if the line does not hold exactly three fields or a
            field cannot be converted to a number.
    """
    # split() with no argument collapses repeated whitespace, so a stray
    # double space no longer yields an empty field.
    i, j, v = l.split()
    return int(i), (int(j), float(v))

In [249]:
# working dope
# mat_rdd = sc.textFile('./small_numbers.txt').map(lambda l: splitter(l))

In [9]:
# working dope
mat_rdd = sc.textFile('./test_data_jester2.txt').map(lambda l: splitter(l))

In [8]:
# mat_rdd = sc.textFile('./test_jester2_big2.txt').map(lambda l: splitter(l))

In [30]:
mat_rdd.take(2)

[(0, (0, 0.0)), (0, (1, 0.8109999999999999))]

In [211]:
# best working
# join_results = mat_rdd\
#     .map(lambda l: (l[0],(l[1],l[2])))\
#     .join(mat_rdd2.map(lambda l: (l[0],(l[1],l[2]))))\
#     .map(lambda t: ((t[1][0][0],t[1][1][0]), t[1][0][1]*t[1][1][1]))\
#     .reduceByKey(add)

In [31]:
# best working
# mat_rdd holds (i, (j, v)) records. Joining it with itself on the key i pairs
# every two entries (j, v) and (k, w) that share that key; each pair
# contributes v*w to cell (j, k), and reduceByKey sums the contributions per
# cell. The join is a self-join because no second RDD is defined in this
# notebook.
join_results = mat_rdd\
    .join(mat_rdd)\
    .map(lambda t: ((t[1][0][0],t[1][1][0]), t[1][0][1]*t[1][1][1]))\
    .reduceByKey(add)

In [32]:
%%time
collected_results = join_results.collect()

Wall time: 9min 35s


In [213]:
collected_results = sorted(collected_results)

In [216]:
collected_results[0]

((0, 0), 4508.473709000027)

In [None]:
# ((0, 0), 4508.473709000027)

In [215]:
# Dump every ((i, j), v) entry of collected_results as one "i j v" text line.
fileName = './res_test_jester_data.txt'
with open(fileName, 'w') as f:
    f.writelines(f"{i} {j} {v}\n" for (i, j), v in collected_results)

### Results
- (23500,100) --> (100,100)
- rows in txt files 2350000
- test_data_jester2.txt: 40MB
- Time to collect results: 1m 30s

---------------------------------------------
- (47000,100) --> (100,100)
- rows in txt file 4700000
- test_jester2_big2.txt --> 80MB
- Time to collect results: 9m 35s

#### Efficient Naive

In [9]:
def map_func(ar):
    """Return the product of ``ar`` as a column with ``ar`` as a row.

    ``ar`` is materialised into a NumPy array first; for a flat sequence of
    length n the result is the n x n array whose (i, j) entry is
    ``ar[i] * ar[j]``.
    """
    row = np.array(list(ar))
    column = row.reshape(-1, 1)
    # Broadcasting the column against the row yields every pairwise product.
    return column * row

In [10]:
def map_wo_numpy(ar,n):
    """Build the n x n table of pairwise products of the first n items of ``ar``.

    Pure-Python counterpart of the NumPy version: entry ``[i][j]`` of the
    returned nested list is ``ar[i] * ar[j]``. ``ar`` may be any iterable; it
    is copied into a list first. Raises IndexError when ``ar`` has fewer than
    ``n`` items.
    """
    values = list(ar)
    return [[values[i] * values[j] for j in range(n)] for i in range(n)]

In [11]:
def fold_func(a,b,n=None):
    """Return the element-wise sum of two array-likes as a NumPy array.

    Args:
        a, b: array-likes of compatible shape (anything ``np.asarray`` accepts).
        n: unused. Kept so existing three-argument calls still work; it now
            defaults to ``None`` so the function can also be called with two
            arguments, e.g. as a binary combiner.

    Returns:
        ``np.asarray(a) + np.asarray(b)``.
    """
    return np.asarray(a) + np.asarray(b)

In [12]:
def fold_wo_numpy(a,b):
    """Add square matrix ``b`` into square matrix ``a`` and return the sum.

    Both arguments are nested lists. Only the outer lists are copied, so the
    row lists of ``a`` are updated in place; ``b`` is left untouched. The
    matrix is taken to be len(a) x len(a).
    """
    acc, other = list(a), list(b)
    size = len(acc)
    for r in range(size):
        acc_row, other_row = acc[r], other[r]
        for c in range(size):
            acc_row[c] += other_row[c]
    return acc

In [13]:
def calc_ATA_naive(mat):
    """Sum ``map_func`` of every record of ``mat`` using NumPy.

    Each record is turned into its table of pairwise products by ``map_func``
    and the tables are added together with ``fold``; the scalar 0 is the
    starting value of the sum.
    """
    outer_products = mat.map(lambda x: map_func(x))
    return outer_products.fold(0, add)

In [14]:
def calc_ATA_naive_wo_numpy(mat):
    """Sum ``map_wo_numpy`` of every record of ``mat`` without NumPy.

    The width n is read from the first record. Every record is expanded to an
    n x n nested list of pairwise products, and the lists are added together
    with ``fold_wo_numpy`` starting from an n x n list of zeros.
    """
    n = len(mat.first())
    zero_matrix = [[0] * n for _ in range(n)]
    per_row_products = mat.map(lambda x: map_wo_numpy(x, n))
    return per_row_products.fold(zero_matrix, fold_wo_numpy)

In [15]:
# Local Spark session for the row-wise experiments; "local[*]" is the master URL.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GroupByMat")
    .getOrCreate()
)

In [16]:
sc = spark.sparkContext

In [17]:
def splitter(l):
    """Parse one space-separated text line into a list of floats.

    Surrounding whitespace is stripped and the line is split on single
    spaces; every field is converted with ``float``.
    """
    fields = l.strip().split(' ')
    return list(map(float, fields))

In [25]:
mat_rdd = sc.textFile('./by-row-mat/jester2_row.txt').map(lambda l: splitter(l))

In [16]:
%%time
res = calc_ATA_naive(mat_rdd)

Wall time: 2.93 s


Without NumPy it takes **~30 seconds** (compared with ~3 seconds for the NumPy version above)

In [32]:
%%time
res = calc_ATA_naive_wo_numpy(mat_rdd)

Wall time: 26.4 s


In [96]:
test_mag = np.loadtxt('./by-row-mat/jester2_row.txt', delimiter=' ')

In [101]:
# the error
# NOTE(review): .max() is taken over the signed difference, so only the largest
# positive deviation is reported; a large negative deviation would go
# unnoticed. Wrapping the difference in np.abs(...) would give the maximum
# absolute error.
(test_mag.T @ test_mag - res).max()

1.9554136088117957e-11

#### On the larger dataset

In [34]:
mat_rdd = sc.textFile('./by-row-mat/jester2_row_b1.txt').map(lambda l: splitter(l))

Takes **~ 50 seconds**

In [35]:
%%time
res = calc_ATA_naive_wo_numpy(mat_rdd)

Wall time: 49.7 s


In [37]:
test_mag = np.loadtxt('./by-row-mat/jester2_row_b1.txt', delimiter=' ')

In [38]:
# the error
# NOTE(review): .max() is taken over the signed difference, so only the largest
# positive deviation is reported; a large negative deviation would go
# unnoticed. Wrapping the difference in np.abs(...) would give the maximum
# absolute error.
((test_mag.T @ test_mag) - np.array(res)).max()

5.547917680814862e-11

#### On an even larger dataset

In [46]:
mat_rdd = sc.textFile('./by-row-mat/jester2_row_b2.txt').map(lambda l: splitter(l))

Takes ~ **3m 4sec**

In [47]:
%%time
res = calc_ATA_naive_wo_numpy(mat_rdd)

Wall time: 3min 4s


In [42]:
test_mag = np.loadtxt('./by-row-mat/jester2_row_b2.txt', delimiter=' ')

In [48]:
# the error
# NOTE(review): .max() is taken over the signed difference, so only the largest
# positive deviation is reported; a large negative deviation would go
# unnoticed. Wrapping the difference in np.abs(...) would give the maximum
# absolute error.
((test_mag.T @ test_mag) - np.array(res)).max()

4.874891601502895e-10

#### Using mapPartition

In [147]:
def map_part(ar,n):
    """Yield one n x n table: the summed pairwise products of every record in ``ar``.

    ``ar`` is an iterable of indexable records. For each record ``a`` the
    product ``a[i] * a[j]`` is added to cell ``[i][j]``. A single nested list
    is yielded once all records have been consumed (a table of zeros when
    ``ar`` is empty).
    """
    gram = [[0] * n for _ in range(n)]
    indices = range(n)
    for row in ar:
        for i in indices:
            gram_i, row_i = gram[i], row[i]
            for j in indices:
                gram_i[j] += row_i * row[j]

    yield gram

In [148]:
def calc_map_part_naive_wo_numpy(mat):
    """Sum the per-partition tables produced by ``map_part``.

    The width n is read from the first record. ``mapPartitions`` runs
    ``map_part`` once per partition, and the resulting n x n nested lists are
    added together with ``fold_wo_numpy`` starting from an n x n list of zeros.
    """
    n = len(mat.first())
    zero_matrix = [[0] * n for _ in range(n)]
    partial_tables = mat.mapPartitions(lambda x: map_part(x, n))
    return partial_tables.fold(zero_matrix, fold_wo_numpy)

#### Good References for using mapPartitions
> https://stackoverflow.com/questions/44222307/spark-rdd-default-number-of-partitions

> https://github.com/mahmoudparsian/pyspark-tutorial/tree/master/tutorial/map-partitions

> https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0

In [130]:
mat_rdd = sc.textFile('./by-row-mat/jester2_row.txt', 12).map(lambda l: splitter(l))

In [131]:
mat_rdd.getNumPartitions()

12

With 12 partitions: ~ **23 s**

In [149]:
%%time
res = calc_map_part_naive_wo_numpy(mat_rdd)

Wall time: 22.8 s


## Sampling Method

- mapPartitions
- fold
- https://github.com/apache/spark/pull/1778/commits/5b8cd7deb3f29d3c2533b01f496f41175471f023 (both implementations)

**Caching RDDs** in Spark: Caching is one mechanism to speed up applications that access the same RDD multiple times. An RDD that is neither cached nor checkpointed is re-evaluated each time an action is invoked on it. There are two function calls for caching an RDD: cache() and persist(level: StorageLevel). The difference between them is that cache() caches the RDD in memory, whereas persist(level) can cache in memory, on disk, or in off-heap memory according to the caching strategy specified by level.

- A good reference: https://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd

In [18]:
mat_rdd = sc.textFile('./by-row-mat/jester2_row.txt').map(lambda l: splitter(l))

In [21]:
def multiplier(ar):
    """Return the element-wise square of ``ar`` as a NumPy array."""
    values = np.array(ar)
    return np.multiply(values, values)

In [22]:
def add_arr(a,b):
    """Return the element-wise sum of two sequences as a list.

    Pairing stops at the shorter of the two inputs.
    """
    return list(map(add, a, b))

In [23]:
def multiplier_wo_numpy(ar):
    """Return a list holding the square of every item of ``ar`` (no NumPy)."""
    squares = [value * value for value in ar]
    return squares

In [25]:
def cols_magnitude(mat):
    """Sum ``multiplier`` of every record of ``mat`` using NumPy.

    Each record is squared element-wise by ``multiplier`` and the squared
    records are added together with ``fold``, starting from the scalar 0.
    """
    squared_rows = mat.map(lambda x: multiplier(x))
    return squared_rows.fold(0, add)

In [26]:
def cols_magnitude_wo_numpy(mat):
    """Sum ``multiplier_wo_numpy`` of every record of ``mat`` without NumPy.

    The width n is read from the first record. Each record is squared
    element-wise and the squared records are added together with ``add_arr``,
    starting from a list of n zeros.
    """
    n = len(mat.first())
    zeros = [0] * n
    squared_rows = mat.map(lambda x: multiplier_wo_numpy(x))
    return squared_rows.fold(zeros, add_arr)

In [30]:
%%time
col_mags_numpy = cols_magnitude(mat_rdd)

Wall time: 1.46 s


In [31]:
%%time
col_mags_wo_numpy = cols_magnitude_wo_numpy(mat_rdd)

Wall time: 1.95 s


In [36]:
def dimsum_map_part(ar,n,gamma,mag_cols):
    """Yield one n x n table of sampled pairwise products for the records in ``ar``.

    For every record ``a`` and every index ``i``, a fresh ``random()`` draw is
    compared with ``gamma / mag_cols[i]``. When the draw is smaller, the
    products ``a[i] * a[j]`` for all ``j > i`` are added to cells ``[i][j]``
    and ``[j][i]``. The diagonal is never written and stays 0.

    Args:
        ar: iterable of indexable records, each with at least ``n`` items.
        n: number of columns.
        gamma: sampling parameter; a larger value keeps more products.
        mag_cols: per-column magnitudes, indexable by column.

    Yields:
        A single nested list (n x n), after all records are consumed.
    """
    # The keep-probability depends only on the column, so compute it once
    # rather than once per record. A column with magnitude 0 would divide by
    # zero; it is always kept instead (probability 1).
    keep_prob = [gamma / mag_cols[i] if mag_cols[i] else 1.0 for i in range(n)]
    acc = [[0] * n for _ in range(n)]
    for row in ar:
        for i in range(n):
            # random() is in [0, 1), so a probability >= 1 always passes.
            if random() < keep_prob[i]:
                row_i = row[i]
                acc_i = acc[i]
                for j in range(i + 1, n):
                    prod = row_i * row[j]
                    acc_i[j] += prod
                    acc[j][i] += prod
    yield acc

In [40]:
dimsum_fold = fold_wo_numpy

In [60]:
def dimsum_wo_numpy(mat, mag_cols, gamma, n_cols):
    """Build an n_cols x n_cols table from sampled pairwise products of ``mat``.

    ``dimsum_map_part`` is run on every partition and the per-partition tables
    are added together with ``dimsum_fold``. Afterwards the diagonal is set to
    ``mag_cols`` and every off-diagonal cell in row ``i`` whose column
    magnitude exceeds ``gamma`` is multiplied by ``mag_cols[i] / gamma``; the
    lower triangle is then mirrored from the upper one.

    Args:
        mat: RDD of records with ``n_cols`` items each.
        mag_cols: per-column magnitudes, indexable by column.
        gamma: sampling parameter, must be >= 1 (generally 2*log2(n)).
        n_cols: number of columns.

    Returns:
        The n_cols x n_cols nested list, or -- when ``gamma < 1`` -- an error
        message string instead of a table (no exception is raised), so callers
        need to check the type of the result.
    """
    # gamma generally 2*log2(n)
    if gamma<1:
        return "Error: Please provide gamma >= 1"

    n = n_cols
    zeroVal = [[0] * n for _ in range(n)]
    res = mat\
        .mapPartitions(lambda x: dimsum_map_part(x,n,gamma,mag_cols))\
        .fold(zeroVal, dimsum_fold)

    # Now we restore what we normalized using gamma and col_mags
    for i in range(n):
        mag = mag_cols[i]
        res[i][i] = mag
        # Same condition as gamma / mag < 1 for positive magnitudes, but it
        # cannot divide by zero when a column's magnitude is 0.
        scale = mag / gamma if mag > gamma else None
        for j in range(i + 1, n):
            if scale is not None:
                res[i][j] *= scale
            res[j][i] = res[i][j]
    return res

In [43]:
n_cols = len(mat_rdd.first())

In [48]:
gamma = 2*log2(n_cols)

Takes only ~ **2 sec**

In [61]:
%%time
res = dimsum_wo_numpy(mat_rdd, col_mags_wo_numpy, gamma, n_cols)

Wall time: 1.53 s


In [63]:
test_mag = np.loadtxt('./by-row-mat/jester2_row.txt', delimiter=' ')

In [62]:
res

[[4508.430000000002,
  1474.9413212136785,
  1311.198439380336,
  2003.8315081217045,
  1543.919621828511,
  1231.3966867752758,
  1548.975089999495,
  1575.6096035177657,
  2157.5313122462467,
  1466.5607800174835,
  2127.4699377530137,
  621.381503915422,
  725.5105765110551,
  1596.3743452670415,
  579.4109393012808,
  364.97765849176534,
  889.3891756107325,
  1464.8982435048777,
  257.28600765483463,
  52.590440704865344,
  1235.027123649741,
  2592.9801612825313,
  538.0850317022321,
  1864.483804912103,
  1838.2225138762549,
  1714.3126497122748,
  203.37232360320178,
  -431.03803788039306,
  885.6569507865161,
  2078.1706407567754,
  117.225788796974,
  830.2164474886126,
  1323.480851983666,
  1444.2352897053534,
  1141.4839978282482,
  1413.8006927296992,
  1959.0448102311093,
  1727.1040020643616,
  464.390556082253,
  1671.2224176508691,
  1779.049785754135,
  1258.7776452583896,
  1206.4925684027787,
  2136.3254893813814,
  1373.323018045245,
  946.4582861046573,
  1172.08

In [64]:
# Largest signed difference between the exact product and the sampled result.
# NOTE(review): .max() is taken over the signed difference, so only the largest
# positive deviation is reported; a large negative deviation would go
# unnoticed. Wrapping the difference in np.abs(...) would give the maximum
# absolute error.
((test_mag.T @ test_mag) - np.array(res)).max()

3249.215687217695