# Following the content from <a href="https://github.com/XD-DENG/Spark-practice">Here Spark practice</a>
## Here we are doing the basic operations on RDD. 

## SparkSession vs SparkContext vs SparkSql

<img src="./sample_data/sparkConetxt_vs_sparkSession.png" />

In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc  = spark.sparkContext
print(spark.sparkContext.version)

3.3.0


In [3]:
raw_content = sc.textFile("./sample_data/2015-12-12.csv")
type(raw_content)
raw_content.count()

421970

## RDD based useful operations

In [4]:
raw_content.take(5)

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 '"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1',
 '"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2',
 '"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1',
 '"2015-12-12","13:42:01",266724,"3.2.2","i386","mingw32","gridExtra","2.0.0","CZ",1']

In [5]:
raw_content.takeSample(False, 5)

['"2015-12-12","16:21:18",89895,"3.2.3","x86_64","linux-gnu","qlcVisualize","0.1.0","KR",4986',
 '"2015-12-12","13:18:27",121790,"3.2.3","x86_64","linux-gnu","bfast","1.5.7","HU",1402',
 '"2015-12-12","19:12:27",45185,"3.2.3","x86_64","linux-gnu","spatial","7.3-11","US",1163',
 '"2015-12-12","20:10:41",166008,"3.2.3","x86_64","linux-gnu","markdown","0.7.7","IR",7338',
 '"2015-12-12","07:53:31",1579595,"3.2.2","x86_64","mingw32","sp","1.2-1","GB",6906']

In [23]:
raw_content.takeSample(True, 5, 3)

['"2015-12-12","05:59:48",232128,"3.2.1","i386","mingw32","effects","3.0-4","US",10364',
 '"2015-12-12","05:10:24",287843,"3.2.2","x86_64","mingw32","xtable","1.8-0","US",5665',
 '"2015-12-12","13:06:32",494138,"3.2.3","x86_64","linux-gnu","rjson","0.2.15","KR",655',
 '"2015-12-12","23:29:36",1350975,"3.2.3","x86_64","mingw32","randtoolbox","1.17","US",6509',
 '"2015-12-12","13:13:13",1539904,"3.2.3","x86_64","linux-gnu","SpatialExtremes","2.0-2","GB",548']

In [6]:
content = raw_content.map(lambda x : x.split(','))
tmp_content = content.take(2)
print(type(tmp_content))
print(tmp_content)

<class 'list'>
[['"date"', '"time"', '"size"', '"r_version"', '"r_arch"', '"r_os"', '"package"', '"version"', '"country"', '"ip_id"'], ['"2015-12-12"', '"13:42:10"', '257886', '"3.2.2"', '"i386"', '"mingw32"', '"HistData"', '"0.7-6"', '"CZ"', '1']]


In [7]:
def clean(val):
    return([x.replace('"', '') for x in val])

In [8]:
content = content.map(clean)
print(content.take(2))

[['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id'], ['2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1']]


In [9]:
content_flat = raw_content.flatMap(lambda x : x.split(',')).map(lambda y : y.replace('"', ''))
print(content_flat.count())
print(content_flat.take(30))

4219700
['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id', '2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1', '2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2']


In [91]:
list1=[1,2]
len(list1)
list_rdd = sc.parallelize(list1).map(lambda x : (x, 1))
for val in list_rdd.collect():
    print(type(val))

<class 'tuple'>
<class 'tuple'>


# Operations on List of RDD 

In [13]:
reduce = content.map(lambda x : len(x)).reduce(lambda a,b : a+b)
print(type(reduce))
print(reduce)

<class 'int'>
4219700


# Operation on Key-Value Pair of RDD
### reduceByKey - Here value will be perform the respective operation defined in lambda function. if value changes during mapping into tuple then result of each key will also affect. It returns list of tuples

### countByKey  - Same as above but final value will not chang unlike above. It returns the dictionary

In [108]:
reduce_count = content.map(lambda x : (x[6], 1)).reduceByKey(lambda a,b : a+b)
print(reduce_count.take(10))
print(reduce_count.count())

[('HistData', 159), ('UsingR', 151), ('lme4', 1560), ('testthat', 1178), ('maps', 1586), ('geosphere', 284), ('ryouready', 57), ('gtools', 1544), ('matrixcalc', 204), ('doParallel', 1328)]
8660


In [110]:
reduce_count = content.map(lambda x : (x[6], 2)).reduceByKey(lambda a,b : a+b)
print(reduce_count.take(10))
print(reduce_count.count())

[('HistData', 318), ('UsingR', 302), ('lme4', 3120), ('testthat', 2356), ('maps', 3172), ('geosphere', 568), ('ryouready', 114), ('gtools', 3088), ('matrixcalc', 408), ('doParallel', 2656)]
8660


In [133]:
reduce_count1 = content.map(lambda x : (x[6], 1)).countByKey()
print(len(reduce_count1))
print(reduce_count1['HistData'])

8660
159


In [50]:
print("Performing group by key operation which not recommended.")
groupByCount = content.map(lambda x : (x[6], 1)).groupByKey(4).mapValues(len)
print(groupByCount.getNumPartitions())
print(groupByCount.take(7))

Performing group by key operation which not recommended.
4
[('UsingR', 151), ('lme4', 1560), ('geosphere', 284), ('htmltools', 1350), ('TH.data', 532), ('rmarkdown', 997), ('rgl', 1084)]


# Sorting

In [122]:
print("sorting on List of tuples data received by reduceByKey in ascending order")
sorted_data_by_key1 = reduce_count.map(lambda x : (x[1], x[0])).sortByKey(0).take(10)
print(sorted_data_by_key1)

sorting on List of tuples data received by reduceByKey
[(9566, 'Rcpp'), (7826, 'ggplot2'), (7496, 'stringi'), (6898, 'stringr'), (6872, 'plyr'), (6530, 'magrittr'), (6446, 'digest'), (6410, 'reshape2'), (6092, 'RColorBrewer'), (6014, 'scales')]


In [123]:
print("sorting on List of tuples data received by reduceByKey in decending order")
sorted_data_by_key1 = reduce_count.map(lambda x : (x[1], x[0])).sortByKey(1).take(10)
print(sorted_data_by_key1)

sorting on List of tuples data received by reduceByKey in decending order
[(2, 'multic'), (2, 'RBerkeley'), (2, 'vimcom'), (2, 'waldwolf'), (2, 'bstats'), (2, 'parspatstat'), (2, 'WaveCGH'), (2, 'mixnet'), (2, 'postgwas'), (2, 'rolasized')]


In [125]:
print("sorting on List of tuples data received by reduceByKey in ascending order using sortBy and single field")
sorted_data_by_key1 = reduce_count.sortBy(lambda x : x[1], ascending = True).take(10)
print(sorted_data_by_key1)

sorting on List of tuples data received by reduceByKey in ascending order using sortBy and single field
[('multic', 2), ('RBerkeley', 2), ('vimcom', 2), ('waldwolf', 2), ('bstats', 2), ('parspatstat', 2), ('WaveCGH', 2), ('mixnet', 2), ('postgwas', 2), ('rolasized', 2)]


In [127]:
print("sorting on List of tuples data received by reduceByKey in ascending order using sortBy and multiple fields")
sorted_data_by_key1 = reduce_count.sortBy(lambda x : (x[1], x[0]), ascending = True).take(10)
print(sorted_data_by_key1)

sorting on List of tuples data received by reduceByKey in ascending order using sortBy and multiple fields
[('ARAMIS', 2), ('BINCO', 2), ('BMAmevt', 2), ('CAscaling', 2), ('CPGchron', 2), ('CVD', 2), ('ChangeAnomalyDetection', 2), ('DART', 2), ('EDanalysis', 2), ('EasyUpliftTree', 2)]


In [144]:
print("sorting on dictionary data received by countByKey. Parallelize always expects the keys")
print(type(reduce_count1))
sorted_data_by_key = sc.parallelize(reduce_count1.keys()).map(lambda x : (reduce_count1[x], x)).sortByKey(0).take(10)
print(sorted_data_by_key)

sorting on dictionary data received by countByKey. Parallelize always expects the keys
<class 'collections.defaultdict'>
[(4783, 'Rcpp'), (3913, 'ggplot2'), (3748, 'stringi'), (3449, 'stringr'), (3436, 'plyr'), (3265, 'magrittr'), (3223, 'digest'), (3205, 'reshape2'), (3046, 'RColorBrewer'), (3007, 'scales')]


# Filter

In [151]:
filter_content = content.filter(lambda x : x[6] == 'Rtts' and x[8] == 'CN')
print(type(filter_content))
print("Printing result as List. Never use collect() in production.")
print(filter_content.collect())

<class 'pyspark.rdd.PipelinedRDD'>
Printing result as List. Never use collect() in production.
[['2015-12-12', '20:15:24', '23820', '3.2.2', 'x86_64', 'mingw32', 'Rtts', '0.3.3', 'CN', '41']]


# Set Operations

In [155]:
print(raw_content.count())
print(raw_content.union(raw_content).count())
print(raw_content.intersection(raw_content).count())

421970
843940
421553


In [156]:
print(raw_content.distinct().count())

421553


# Joins

In [30]:
spark.conf.set("spark.sql.shuffle.partitions", "3")
content_modified=content.map(lambda x:(x[8], x))
print(content_modified.take(2))

[('country', ['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id']), ('CZ', ['2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1'])]


In [31]:
mapping=[('DE', 'Germany'), ('US', 'United States'), ('CN', 'China'), ('IN',"India")]
mapping=sc.parallelize(mapping)

In [32]:
print("Inner join on the basis of any matching fields start with position 1.")
print(content_modified.getNumPartitions())
innerJoinValue = content_modified.join(mapping)
print(innerJoinValue.take(10))
innerJoinValue.cache();

Inner join on the basis of any matching fields start with position 1.
2
[('DE', (['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:00:21', '3687766', 'NA', 'NA', 'NA', 'lme4', '1.1-10', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:56', '57429', 'NA', 'NA', 'NA', 'testthat', '0.11.0', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:09', '216068', '3.2.2', 'x86_64', 'mingw32', 'mvtnorm', '1.0-3', 'DE', '4'], 'Germany')), ('DE', (['2015-12-12', '13:25:00', '3595497', '3.2.2', 'x86_64', 'mingw32', 'maps', '3.0.1', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:25:05', '1579597', '3.2.2', 'x86_64', 'mingw32', 'sp', '1.2-1', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:25:21', '892152', '3.2.3', 'x86_64', 'mingw32', 'geosphere', '1.4-3', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '10:43:38', '74715', '3.1.3', 'x86_64', 'mingw32', 'ryouready', '0.4', 'DE', '5'], 'Germany

### cache() vs persist()
#### The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory) while  persist() method will support both Memory as well as DICK level caching of data. We have to pass the storage level in case of persist() method.

In [33]:
print("Value after Cached into Memory.")
print(innerJoinValue.take(10))

Value after Cached into Memory.
[('DE', (['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:00:21', '3687766', 'NA', 'NA', 'NA', 'lme4', '1.1-10', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:56', '57429', 'NA', 'NA', 'NA', 'testthat', '0.11.0', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:09', '216068', '3.2.2', 'x86_64', 'mingw32', 'mvtnorm', '1.0-3', 'DE', '4'], 'Germany')), ('DE', (['2015-12-12', '13:25:00', '3595497', '3.2.2', 'x86_64', 'mingw32', 'maps', '3.0.1', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:25:05', '1579597', '3.2.2', 'x86_64', 'mingw32', 'sp', '1.2-1', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:25:21', '892152', '3.2.3', 'x86_64', 'mingw32', 'geosphere', '1.4-3', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '10:43:38', '74715', '3.1.3', 'x86_64', 'mingw32', 'ryouready', '0.4', 'DE', '5'], 'Germany')), ('DE', (['2015-12-12', '20:57:34', 

In [34]:
print("Left Outer join.")
content_modified.cache()
print(content_modified.leftOuterJoin(mapping).take(4))

Left Outer join.
[('ES', (['2015-12-12', '20:56:16', '802583', '3.1.0', 'x86_64', 'mingw32', 'mgcv', '1.8-10', 'ES', '6'], None)), ('ES', (['2015-12-12', '20:57:27', '13847206', '3.1.1', 'x86_64', 'mingw32', 'BH', '1.58.0-1', 'ES', '58'], None)), ('ES', (['2015-12-12', '20:57:39', '2553171', '3.2.0', 'x86_64', 'darwin13.4.0', 'dplyr', '0.4.3', 'ES', '58'], None)), ('ES', (['2015-12-12', '20:58:16', '256800', '3.2.3', 'i386', 'mingw32', 'signal', '0.7-6', 'ES', '81'], None))]


In [37]:
print(innerJoinValue.take(5))

[('DE', (['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2'], 'Germany')), ('DE', (['2015-12-12', '13:00:21', '3687766', 'NA', 'NA', 'NA', 'lme4', '1.1-10', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:56', '57429', 'NA', 'NA', 'NA', 'testthat', '0.11.0', 'DE', '3'], 'Germany')), ('DE', (['2015-12-12', '13:08:09', '216068', '3.2.2', 'x86_64', 'mingw32', 'mvtnorm', '1.0-3', 'DE', '4'], 'Germany')), ('DE', (['2015-12-12', '13:25:00', '3595497', '3.2.2', 'x86_64', 'mingw32', 'maps', '3.0.1', 'DE', '2'], 'Germany'))]


In [38]:
print(content_modified.take(5))

[('country', ['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id']), ('CZ', ['2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1']), ('DE', ['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2']), ('CZ', ['2015-12-12', '13:42:35', '2077876', '3.2.2', 'i386', 'mingw32', 'UsingR', '2.0-5', 'CZ', '1']), ('CZ', ['2015-12-12', '13:42:01', '266724', '3.2.2', 'i386', 'mingw32', 'gridExtra', '2.0.0', 'CZ', '1'])]


In [42]:
print(content.take(6))
content.cache()
print(content.take(6))

[['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id'], ['2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1'], ['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2'], ['2015-12-12', '13:42:35', '2077876', '3.2.2', 'i386', 'mingw32', 'UsingR', '2.0-5', 'CZ', '1'], ['2015-12-12', '13:42:01', '266724', '3.2.2', 'i386', 'mingw32', 'gridExtra', '2.0.0', 'CZ', '1'], ['2015-12-12', '13:00:21', '3687766', 'NA', 'NA', 'NA', 'lme4', '1.1-10', 'DE', '3']]
[['date', 'time', 'size', 'r_version', 'r_arch', 'r_os', 'package', 'version', 'country', 'ip_id'], ['2015-12-12', '13:42:10', '257886', '3.2.2', 'i386', 'mingw32', 'HistData', '0.7-6', 'CZ', '1'], ['2015-12-12', '13:24:37', '1236751', '3.2.2', 'x86_64', 'mingw32', 'RJSONIO', '1.3-0', 'DE', '2'], ['2015-12-12', '13:42:35', '2077876', '3.2.2', 'i386', 'mingw32', 'UsingR', '2.0-5', 'CZ', '1'], ['2015-12-12', '13:42:01'

In [52]:
#content.unpersist()
print(content.take(6))

AttributeError: 'NoneType' object has no attribute 'sc'

In [51]:
spark.stop()

In [None]:
print(sc.)