### ASBD tutorial 8 

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

### 1.  For the given input file, calculate Wordcount using Hadop and Spark

## With spark RDDs

In [6]:
from pyspark import SparkContext
import timeit
sc = spark.sparkContext

start_time = timeit.default_timer()
words = sc.textFile('sample.txt').flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b : a+b)
end_time = timeit.default_timer()

time_taken_RDD = end_time - start_time

wordCounts.take(20)

[('What', 1),
 ('is', 8),
 ('Lorem', 17),
 ('', 7),
 ('Ipsum', 13),
 ('dummy', 2),
 ('of', 21),
 ('printing', 1),
 ('typesetting', 1),
 ("industry's", 1),
 ('ever', 1),
 ('1500s,', 1),
 ('when', 2),
 ('an', 1),
 ('unknown', 1),
 ('took', 1),
 ('galley', 1),
 ('type', 2),
 ('make', 1),
 ('book.', 1)]

### Without Spark RDDs

In [8]:

import timeit
count = 0

start_time = timeit.default_timer()
text = open("sample.txt", "r")
d = dict()
for line in text:
    line = line.strip()
    line = line.lower()
    words = line.split(" ")
    for word in words:
        if word in d:
            d[word] = d[word] + 1
        else:
            d[word] = 1

end_time = timeit.default_timer()

time_taken_NoRDD = end_time - start_time

i = 1
for key in list(d.keys()):
    if(i>20):
        break
    i += 1
    print(key, ":", d[key])

what : 1
is : 8
lorem : 17
ipsum? : 1
 : 7
ipsum : 14
simply : 2
dummy : 2
text : 2
of : 21
the : 27
printing : 1
and : 10
typesetting : 1
industry. : 1
has : 4
been : 1
industry's : 1
standard : 2
ever : 1


In [11]:
print("Time Taken to count words with RDD: ", round(time_taken_RDD, 3))
print("Time Taken to count words without RDD: ", round(time_taken_NoRDD, 3))

Time Taken to count words with RDD:  0.055
Time Taken to count words without RDD:  0.003


# 2. Populate 1000 numbers , calculate Mean, Variance, Std Dev

In [21]:
import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame(np.random.randint(0,100,size=(1000,1)), columns=["Numbers"]))

In [22]:
df.show()

+-------+
|Numbers|
+-------+
|     85|
|     87|
|     25|
|      2|
|     70|
|     61|
|     16|
|     66|
|     80|
|     17|
|     32|
|      0|
|     93|
|     40|
|     25|
|     58|
|     62|
|     86|
|     98|
|     28|
+-------+
only showing top 20 rows



In [23]:
df.count()

1000

In [24]:
from pyspark.sql.functions import mean, variance, stddev_samp
df.select(mean(df.Numbers)).show()

+------------+
|avg(Numbers)|
+------------+
|      48.759|
+------------+



In [25]:
df.select(variance(df.Numbers)).show()

+-----------------+
|var_samp(Numbers)|
+-----------------+
|842.1030220220222|
+-----------------+



In [26]:
df.select(stddev_samp(df.Numbers)).show()

+--------------------+
|stddev_samp(Numbers)|
+--------------------+
|  29.019011389467117|
+--------------------+



# 3. Compute correlation b/w two series using Pearson’s & Spearman’s method

In [27]:
from pyspark.ml.linalg import Vectors
data = [[Vectors.dense([35, 23, 47, 17, 10, 43, 9, 6, 28])], [Vectors.dense([30, 33, 45, 23, 8, 49, 12, 4, 31])]]
data = spark.createDataFrame(data, ["A"])

In [28]:
data.show()

+--------------------+
|                   A|
+--------------------+
|[35.0,23.0,47.0,1...|
|[30.0,33.0,45.0,2...|
+--------------------+



In [29]:
from pyspark.ml.stat import Correlation

In [30]:
pearsonCorr = Correlation.corr(data, 'A', 'pearson').collect()[0][0]
print(str(pearsonCorr))

DenseMatrix([[ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.]])


In [31]:
spearmanCorr = Correlation.corr(data, 'A', 'spearman').collect()[0][0]
print(str(spearmanCorr))

DenseMatrix([[ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.],
             [ 1., -1.,  1., -1.,  1., -1., -1.,  1., -1.],
             [-1.,  1., -1.,  1., -1.,  1.,  1., -1.,  1.]])


# 4. Applying e^x and log(x) on 10k numbers

In [32]:
import numpy as np
import pandas as pd
df10k = spark.createDataFrame(pd.DataFrame(np.random.randint(0,100,size=(10000,1)), columns=["Numbers"]))

### DataFrame for 1000 entries

In [33]:
df10k.show()

+-------+
|Numbers|
+-------+
|     80|
|     11|
|     52|
|     82|
|     35|
|     37|
|     20|
|     30|
|      1|
|     85|
|     59|
|      6|
|     99|
|     27|
|     25|
|     41|
|     39|
|     93|
|      2|
|     65|
+-------+
only showing top 20 rows



In [34]:
df10k.count()

10000

In [50]:
from pyspark.sql.functions import exp
import timeit
start_time = timeit.default_timer()
df10k.select(exp(df10k.Numbers)).show()
end_time = timeit.default_timer()

time_taken = end_time - start_time

+--------------------+
|        EXP(Numbers)|
+--------------------+
| 5.54062238439351E34|
|   59874.14171519782|
|3.831008000716577E22|
|4.093996962127454...|
|1.586013452313430...|
|1.171914237280261...|
| 4.851651954097903E8|
|1.068647458152446...|
|  2.7182818284590455|
|8.223012714622913E36|
|4.201210403790514...|
|   403.4287934927351|
|9.889030319346946E42|
|5.320482406017986...|
|7.200489933738588E10|
|6.398434935300549...|
|8.659340042399374...|
|2.451245542920086E40|
|    7.38905609893065|
|1.694889244410333...|
+--------------------+
only showing top 20 rows



In [51]:
print("Time Taken for 10k entries: ", round(time_taken, 3))

Time Taken for 10k entries:  0.037


In [54]:
from pyspark.sql.functions import log
import timeit
start_time = timeit.default_timer()
df10k.select(log(df10k.Numbers)).show()
end_time = timeit.default_timer()

time_taken = end_time - start_time

+------------------+
|       ln(Numbers)|
+------------------+
| 4.382026634673881|
|2.3978952727983707|
|3.9512437185814275|
| 4.406719247264253|
|3.5553480614894135|
|3.6109179126442243|
| 2.995732273553991|
|3.4011973816621555|
|               0.0|
| 4.442651256490317|
|  4.07753744390572|
| 1.791759469228055|
|  4.59511985013459|
| 3.295836866004329|
|3.2188758248682006|
| 3.713572066704308|
|3.6635616461296463|
| 4.532599493153256|
|0.6931471805599453|
| 4.174387269895637|
+------------------+
only showing top 20 rows



In [55]:
print("Time Taken for 10k entries: ", round(time_taken, 3))

Time Taken for 10k entries:  0.048


### DataFrame For 100 entries

In [41]:
df100 = df10k.sample(fraction=0.01)

In [42]:
df100.show()

+-------+
|Numbers|
+-------+
|     11|
|     62|
|      6|
|     24|
|     95|
|     37|
|     33|
|     21|
|     84|
|     41|
|     53|
|     98|
|     41|
|     93|
|      9|
|     12|
|     96|
|     19|
|     79|
|     24|
+-------+
only showing top 20 rows



In [43]:
df100.count()

103

In [52]:
from pyspark.sql.functions import exp
import timeit
start_time = timeit.default_timer()
df100.select(exp(df100.Numbers)).show()
end_time = timeit.default_timer()

time_taken = end_time - start_time

+--------------------+
|        EXP(Numbers)|
+--------------------+
|   59874.14171519782|
|8.438356668741455E26|
|   403.4287934927351|
|2.648912212984347E10|
|1.811239082889023...|
|1.171914237280261...|
|2.146435797859160...|
|1.3188157344832146E9|
|3.025077322201142...|
|6.398434935300549...|
|1.041375943302908...|
|3.637970947608805E42|
|6.398434935300549...|
|2.451245542920086E40|
|   8103.083927575384|
|  162754.79141900392|
|4.923458286012058E41|
|1.7848230096318728E8|
|2.038281066512668...|
|2.648912212984347E10|
+--------------------+
only showing top 20 rows



In [53]:
print("Time Taken for 100 entries: ", round(time_taken, 3))

Time Taken for 100 entries:  0.081


In [56]:
from pyspark.sql.functions import log
import timeit
start_time = timeit.default_timer()
df100.select(log(df100.Numbers)).show()
end_time = timeit.default_timer()

time_taken = end_time - start_time

+------------------+
|       ln(Numbers)|
+------------------+
|2.3978952727983707|
| 4.127134385045092|
| 1.791759469228055|
|3.1780538303479458|
| 4.553876891600541|
|3.6109179126442243|
|3.4965075614664802|
| 3.044522437723423|
| 4.430816798843313|
| 3.713572066704308|
| 3.970291913552122|
| 4.584967478670572|
| 3.713572066704308|
| 4.532599493153256|
|2.1972245773362196|
|2.4849066497880004|
| 4.564348191467836|
|2.9444389791664403|
|4.3694478524670215|
|3.1780538303479458|
+------------------+
only showing top 20 rows



In [57]:
print("Time Taken for 100 entries: ", round(time_taken, 3))

Time Taken for 100 entries:  0.084


# 5. Generate FIM using FP Growth on dataset

In [59]:
from pyspark.sql.functions import split
data = (spark.read.text("mushroom.txt").select(split("value", "\s+").alias("items")))
data.show(truncate=False)

+----------------------------------------------------------------------------------------------+
|items                                                                                         |
+----------------------------------------------------------------------------------------------+
|[1, 3, 9, 13, 23, 25, 34, 36, 38, 40, 52, 54, 59, 63, 67, 76, 85, 86, 90, 93, 98, 107, 113, ] |
|[2, 3, 9, 14, 23, 26, 34, 36, 39, 40, 52, 55, 59, 63, 67, 76, 85, 86, 90, 93, 99, 108, 114, ] |
|[2, 4, 9, 15, 23, 27, 34, 36, 39, 41, 52, 55, 59, 63, 67, 76, 85, 86, 90, 93, 99, 108, 115, ] |
|[1, 3, 10, 15, 23, 25, 34, 36, 38, 41, 52, 54, 59, 63, 67, 76, 85, 86, 90, 93, 98, 107, 113, ]|
|[2, 3, 9, 16, 24, 28, 34, 37, 39, 40, 53, 54, 59, 63, 67, 76, 85, 86, 90, 94, 99, 109, 114, ] |
|[2, 3, 10, 14, 23, 26, 34, 36, 39, 41, 52, 55, 59, 63, 67, 76, 85, 86, 90, 93, 98, 108, 114, ]|
|[2, 4, 9, 15, 23, 26, 34, 36, 39, 42, 52, 55, 59, 63, 67, 76, 85, 86, 90, 93, 98, 108, 115, ] |
|[2, 4, 10, 15, 23, 27, 34, 36

In [79]:
from pyspark.ml.fpm import FPGrowth
fp = FPGrowth(minSupport=0.9, minConfidence=0.7)
fpm = fp.fit(data)
fpm.freqItemsets.show(truncate=False)

+--------------+----+
|items         |freq|
+--------------+----+
|[86]          |7924|
|[86, ]        |7924|
|[86, , 85]    |7924|
|[86, 85]      |7924|
|[90]          |7488|
|[90, ]        |7488|
|[90, , 85]    |7488|
|[90, 85]      |7488|
|[]            |8124|
|[, 85]        |8124|
|[34]          |7914|
|[34, 86]      |7906|
|[34, 86, ]    |7906|
|[34, 86, , 85]|7906|
|[34, 86, 85]  |7906|
|[34, ]        |7914|
|[34, , 85]    |7914|
|[34, 85]      |7914|
|[85]          |8124|
+--------------+----+



In [80]:
fpm.associationRules.show(truncate=False)

+------------+----------+------------------+------------------+------------------+
|antecedent  |consequent|confidence        |lift              |support           |
+------------+----------+------------------+------------------+------------------+
|[34, 85]    |[86]      |0.9989891331817033|1.0242033970176878|0.9731659281142294|
|[34, 85]    |[]        |1.0               |1.0               |0.9741506646971935|
|[34, , 85]  |[86]      |0.9989891331817033|1.0242033970176878|0.9731659281142294|
|[85]        |[86]      |0.9753815854258986|1.0               |0.9753815854258986|
|[85]        |[90]      |0.9217134416543574|1.0               |0.9217134416543574|
|[85]        |[]        |1.0               |1.0               |1.0               |
|[85]        |[34]      |0.9741506646971935|1.0               |0.9741506646971935|
|[34, 86, 85]|[]        |1.0               |1.0               |0.9731659281142294|
|[34, 86, ]  |[85]      |1.0               |1.0               |0.9731659281142294|
|[34