# Comprehensive Introduction to Apache Spark, RDDs & Dataframes (using PySpark)

https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/

## Creating a SparkContext

First we need to create a SparkContext. We will import this from pyspark:

In [1]:
# from pyspark import SparkContext

Now create the SparkContext. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.

*Note! You can only have one SparkContext at a time the way we are running things here.*

In [4]:
# sc = SparkContext()

In [6]:
# Spark context already exists!
sc

<pyspark.context.SparkContext at 0x7f0020bae450>

In [7]:
sc.version

u'1.6.0'

In [18]:
# Import any .csv file as an RDD, then replace the file name passed to sc.textFile with the blogtexts file name
# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_19099026f8df40b6aec4353c7e897e95(name):
    """Configure Hadoop so Spark can read data from Bluemix Object Storage.

    Every option is stored on the Hadoop configuration of the notebook-level
    SparkContext ``sc`` under the key ``'fs.swift.service.' + name + suffix``.

    NOTE: the tenant, username and password are embedded below as literals;
    remove them before sharing this notebook.
    """
    key_base = 'fs.swift.service.' + name
    hadoop_conf = sc._jsc.hadoopConfiguration()

    # (setter, key suffix, value) triples, applied in order. The typed
    # setters (setInt / setBoolean) are kept for the non-string values.
    settings = (
        (hadoop_conf.set, '.auth.url',
         'https://identity.open.softlayer.com'+'/v3/auth/tokens'),
        (hadoop_conf.set, '.auth.endpoint.prefix', 'endpoints'),
        (hadoop_conf.set, '.tenant', 'cc29768790ec45439a43668592b02f84'),
        (hadoop_conf.set, '.username', 'd47dac60c7684410842aa453908da4ca'),
        (hadoop_conf.set, '.password', 'R1o7wzw?37&dHIMq'),
        (hadoop_conf.setInt, '.http.port', 8080),
        (hadoop_conf.set, '.region', 'dallas'),
        (hadoop_conf.setBoolean, '.public', False),
    )
    for apply_setting, suffix, value in settings:
        apply_setting(key_base + suffix, value)

# Service name: appended to the 'fs.swift.service.' configuration prefix and
# embedded in the swift:// URL below. You can choose any name.
name = 'keystone'
# Register the Object Storage settings under that name before reading.
set_hadoop_config_with_credentials_19099026f8df40b6aec4353c7e897e95(name)

# Read the blogtexts file through the swift:// URL built from the same name.
rdd = sc.textFile("swift://DatabricksSpark." + name + "/blogtexts")
# Display the first five elements of the RDD.
rdd.take(5)


[u'Think of it for a moment \u2013 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.',
 u'',
 u'Big Data is not a new phenomena. It has been around for a while now. However, it has become really important with this pace of data generation. In past, several systems were developed for processing big data. Most of them were based on MapReduce framework. These frameworks typically rely on use of hard disk for saving and retrieving the results. However, this turns out to be very costly in terms of time and speed.',
 u'',
 u'On the other hand, Organizations have never been more hungrier to add a competitive differentiation through understanding this data and o

## General transformations

In [41]:
# Transformation: map and flatMap

def Func(lines):
    """Lower-case a line of text and split it on whitespace.

    Returns the list of lower-cased tokens; an empty or whitespace-only
    line yields an empty list.
    """
    return lines.lower().split()
rdd1 = rdd.map(Func)

# rdd1 = rdd.map(lambda lines: lines.lower()).map(lambda lines: lines.split())

In [42]:
rdd1.take(2) # It will print first 2 elements of rdd

[[u'think',
  u'of',
  u'it',
  u'for',
  u'a',
  u'moment',
  u'\u2013',
  u'1',
  u'qunitillion',
  u'=',
  u'1',
  u'million',
  u'billion!',
  u'can',
  u'you',
  u'imagine',
  u'how',
  u'many',
  u'drives',
  u'/',
  u'cds',
  u'/',
  u'blue-ray',
  u'dvds',
  u'would',
  u'be',
  u'required',
  u'to',
  u'store',
  u'them?',
  u'it',
  u'is',
  u'difficult',
  u'to',
  u'imagine',
  u'this',
  u'scale',
  u'of',
  u'data',
  u'generation',
  u'even',
  u'as',
  u'a',
  u'data',
  u'science',
  u'professional.',
  u'while',
  u'this',
  u'pace',
  u'of',
  u'data',
  u'generation',
  u'is',
  u'very',
  u'exciting,',
  u'it',
  u'has',
  u'created',
  u'entirely',
  u'new',
  u'set',
  u'of',
  u'challenges',
  u'and',
  u'has',
  u'forced',
  u'us',
  u'to',
  u'find',
  u'new',
  u'ways',
  u'to',
  u'handle',
  u'big',
  u'huge',
  u'data',
  u'effectively.'],
 []]

In [43]:
rdd2 = rdd.flatMap(Func)

#rdd2 = rdd.map(lambda lines: lines.lower()).flatMap(lambda lines: lines.split())
rdd2.take(5)

[u'think', u'of', u'it', u'for', u'a']

In [44]:
# Transformation: filter

# The predicate runs once per word, so keep the stop words in a set for
# constant-time membership tests; the words that survive the filter are the
# same as with a list.
stopwords = {'is', 'am', 'are', 'the', 'for', 'a'}
# Keep only the words that are not stop words.
rdd3 = rdd2.filter(lambda x: x not in stopwords)
rdd3.take(10)

[u'think',
 u'of',
 u'it',
 u'moment',
 u'\u2013',
 u'1',
 u'qunitillion',
 u'=',
 u'1',
 u'million']

In [None]:
#Transformation: groupByKey / reduceByKey 

In [48]:
# Group the words by their first three characters.
rdd4 = rdd3.groupBy(lambda w: w[0:3])
# Each grouped value is an iterable, so turn it into a list before printing.
# The parenthesised single-argument print gives the same output on Python 2
# and also runs on Python 3.
print([(k, list(v)) for (k, v) in rdd4.take(1)])

[(u'all', [u'all', u'allocates', u'all', u'all', u'allows', u'all', u'all', u'all', u'all', u'all', u'all', u'all'])]


In [53]:
rdd3_mapped = rdd3.map(lambda x: (x,1))
rdd3_grouped = rdd3_mapped.groupByKey()

In [54]:
# Show the first five (word, values) groups, turning each grouped value
# iterable into a plain list so its contents are printed.
print([(word, list(ones)) for word, ones in rdd3_grouped.take(5)])

[(u'all', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), (u'elements,', [1, 1]), (u'step2:', [1]), (u'manager', [1]), (u'computation', [1, 1, 1, 1, 1, 1])]


In [66]:
rdd3.filter(lambda x: x == 'manager,').collect()

[u'manager,', u'manager,', u'manager,']

In [67]:
# Sum the 1s grouped under each word, flip every pair to (count, word), and
# sort by count with ascending=False so the most frequent words come first.
rdd3_freq_of_words = rdd3_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

In [68]:
rdd3_freq_of_words.take(10)

[(164, u'to'),
 (143, u'in'),
 (122, u'of'),
 (106, u'and'),
 (103, u'we'),
 (69, u'spark'),
 (64, u'this'),
 (63, u'data'),
 (55, u'can'),
 (52, u'apache')]

In [69]:
# Same word-frequency ranking built with reduceByKey: add up the 1s per word,
# flip every pair to (count, word), sort by count descending, and show the
# ten most frequent words.
rdd3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)

[(164, u'to'),
 (143, u'in'),
 (122, u'of'),
 (106, u'and'),
 (103, u'we'),
 (69, u'spark'),
 (64, u'this'),
 (63, u'data'),
 (55, u'can'),
 (52, u'apache')]

In [None]:
# Transformation: mapPartitions

In [77]:
def func(iterator):
    """Count the words 'spark' and 'apache' in one partition.

    ``iterator`` yields the elements of a single partition. The result is
    the tuple ``(spark_count, apache_count)`` for that partition.
    """
    spark_hits = 0
    apache_hits = 0
    for word in iterator:
        # Two independent comparisons, one per tracked word.
        spark_hits += 1 if word == 'spark' else 0
        apache_hits += 1 if word == 'apache' else 0
    return (spark_hits, apache_hits)

In [78]:
rdd3.mapPartitions(func).glom().collect()

[[49, 39], [20, 13]]

In [80]:
rdd3.mapPartitions(func).collect()

[49, 39, 20, 13]

In [None]:
# Transformation: sample

In [81]:
# Sample without replacement, fraction 0.4, fixed seed 42.
rdd3_sampled = rdd3.sample(False, 0.4, 42)
# count() returns the number of elements without first collecting the whole
# RDD into a driver-side list, as len(rdd.collect()) did. The formatted
# single-argument print gives the same "a b" output on Python 2 and 3.
print("%d %d" % (rdd3.count(), rdd3_sampled.count()))

4768 1895


In [82]:
# Transformation: union

In [83]:
# Two samples drawn with the same fraction and the same seed.
sample1 = rdd3.sample(False, 0.2, 42)
sample2 = rdd3.sample(False, 0.2, 42)
union_of_sample1_sample2 = sample1.union(sample2)
# count() avoids collecting each RDD to the driver just to measure its size.
# The formatted single-argument print gives the same "a b c" output on
# Python 2 and 3.
print("%d %d %d" % (sample1.count(),
                    sample2.count(),
                    union_of_sample1_sample2.count()))

914 914 1828


In [84]:
#Transformation: join

In [85]:
# Draw two samples of the (word, 1) pairs with the same fraction and seed.
sample1 = rdd3_mapped.sample(False,.2,42)
sample2 = rdd3_mapped.sample(False,.2,42)
# Join the two pair RDDs on their key (the word).
join_on_sample1_sample2 = sample1.join(sample2)
# Display the first two joined records.
join_on_sample1_sample2.take(2)

[(u'operations', (1, 1)), (u'operations', (1, 1))]

In [86]:
# Transformation: distinct
rdd3_distinct = rdd3.distinct()
len(rdd3_distinct.collect())

1485

In [87]:
# Transformation: coalesce
rdd3.getNumPartitions()

2

In [88]:
rdd3_coalesce = rdd3.coalesce(1)
rdd3_coalesce.getNumPartitions()

1

## General Actions

In [90]:
# Action: getNumPartitions
rdd3.getNumPartitions()

2

In [91]:
# Action: Reduce
num_rdd = sc.parallelize(range(1,1000))
num_rdd.reduce(lambda x,y: x+y)

499500

In [92]:
# Action: count
rdd3.count()

4768

In [93]:
# Action: max, min, sum, variance and stdev
num_rdd.max(),num_rdd.min(), num_rdd.sum(),num_rdd.variance(),num_rdd.stdev() 

(999, 1, 499500, 83166.66666666667, 288.38631497813253)