<a href="https://colab.research.google.com/github/datasigntist/byteSizedLearn/blob/master/pySpark_Codes_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Experiments with Spark**

Author : Vishwanathan Raman

EmailId : datasigntist@gmail.com

Description : This notebook covers basic to intermediate PySpark concepts through practical experimentation. The following topics are covered:
*   Creating an RDD from a text file
*   Applying actions and transformations on an RDD
*   Illustrating lambda, map, flatMap and filter functions on an RDD

Reference Links:

*   Introduction to Spark 1 : https://youtu.be/TuGn3e1EgXM
*   Introduction to Spark 2 : https://youtu.be/JruCKuWHKpk
*   Introduction to Spark 3 : https://youtu.be/c9jd4yZGyT8
*   Introduction to RDD 1   : https://youtu.be/M7UuKHYecXQ
*   Introduction to RDD 2   : https://youtu.be/qLGUPdSvAVg
*   Introduction to RDD 3   : https://youtu.be/9NBP-FiHrQg



## **Spark Installation in Google Colab**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## **Loading text file**

In [0]:
data = spark.read.text("README.md")

In Apache Spark, a DataFrame is a distributed collection of rows organized into named columns. In simple terms, it is similar to a table in a relational database.

In [5]:
print(type(data))

<class 'pyspark.sql.dataframe.DataFrame'>


Converting the DataFrame to an RDD

In [0]:
data_rdd = data.rdd

In [8]:
print(type(data_rdd))

<class 'pyspark.rdd.RDD'>


### Exploratory Analysis of the dataframe through actions

In [11]:
print("The number of lines in file README.md ",data.count())

The number of lines in file README.md  103


Retrieve the first row using the action first()

In [12]:
data.first()

Row(value='# Apache Spark')

Retrieve all the rows with collect() and slice the result to show only the first 10 rows. Note that collect() still brings every row to the driver before the slice is applied.

In [14]:
data.collect()[0:10]

[Row(value='# Apache Spark'),
 Row(value=''),
 Row(value='Spark is a fast and general cluster computing system for Big Data. It provides'),
 Row(value='high-level APIs in Scala, Java, Python, and R, and an optimized engine that'),
 Row(value='supports general computation graphs for data analysis. It also supports a'),
 Row(value='rich set of higher-level tools including Spark SQL for SQL and DataFrames,'),
 Row(value='MLlib for machine learning, GraphX for graph processing,'),
 Row(value='and Spark Streaming for stream processing.'),
 Row(value=''),
 Row(value='<http://spark.apache.org/>')]

In [18]:
data.describe()

DataFrame[summary: string, value: string]

### Applying transformations on the dataset

Filter the rows whose value contains the word Spark

In [0]:
linesWithSpark = data.filter(data.value.contains("Spark"))

The filter transformation returns a new DataFrame (not an RDD); the original DataFrame is left unchanged

In [37]:
print(type(linesWithSpark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [38]:
print("The number of lines with Spark is ",linesWithSpark.count())

The number of lines with Spark is  20


In [39]:
linesWithSpark.collect()

[Row(value='# Apache Spark'),
 Row(value='Spark is a fast and general cluster computing system for Big Data. It provides'),
 Row(value='rich set of higher-level tools including Spark SQL for SQL and DataFrames,'),
 Row(value='and Spark Streaming for stream processing.'),
 Row(value='You can find the latest Spark documentation, including a programming'),
 Row(value='## Building Spark'),
 Row(value='Spark is built using [Apache Maven](http://maven.apache.org/).'),
 Row(value='To build Spark and its example programs, run:'),
 Row(value='You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).'),
 Row(value='["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).'),
 Row(value='For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).'

Applying the same filter to the RDD created in the earlier step, rather than to the DataFrame object. Each element of the RDD is a Row whose value field is checked for the word Spark. The result is the same as in the previous step; only the object being scanned differs.

In [0]:
linesWithSpark_rdd = data_rdd.filter(lambda line: "Spark" in line.value)
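
As a rough illustration of what the lambda predicate does to each element, here is a minimal plain-Python sketch that needs no Spark session. The `Row` namedtuple is only a stand-in for `pyspark.sql.Row`, and the helper name `contains_spark` is an assumption made for this example; the sample lines are taken from the README output shown earlier.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with a single "value" column (assumption for this sketch).
Row = namedtuple("Row", ["value"])

# A few lines copied from the README output shown earlier in the notebook.
sample_rows = [
    Row(value="# Apache Spark"),
    Row(value=""),
    Row(value="high-level APIs in Scala, Java, Python, and R, and an optimized engine that"),
    Row(value="and Spark Streaming for stream processing."),
]

def contains_spark(line):
    """Same predicate as the RDD filter: True when the text contains 'Spark'."""
    return "Spark" in line.value

# filter-like step: keep only the elements for which the predicate is True.
lines_with_spark = [row for row in sample_rows if contains_spark(row)]

print(len(lines_with_spark))
for row in lines_with_spark:
    print(row.value)
```

The `in` test is case sensitive, so a line containing only the lowercase `spark` would not be kept by this predicate.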

In [49]:
print(type(linesWithSpark_rdd))

<class 'pyspark.rdd.PipelinedRDD'>


In [50]:
print("The number of lines having Spark ",linesWithSpark_rdd.count())

The number of lines having Spark  20


In [53]:
linesWithSpark_rdd.collect()

[Row(value='# Apache Spark'),
 Row(value='Spark is a fast and general cluster computing system for Big Data. It provides'),
 Row(value='rich set of higher-level tools including Spark SQL for SQL and DataFrames,'),
 Row(value='and Spark Streaming for stream processing.'),
 Row(value='You can find the latest Spark documentation, including a programming'),
 Row(value='## Building Spark'),
 Row(value='Spark is built using [Apache Maven](http://maven.apache.org/).'),
 Row(value='To build Spark and its example programs, run:'),
 Row(value='You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).'),
 Row(value='["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).'),
 Row(value='For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).'

Adding a further transformation that splits each line into individual words

In [0]:
wordsInlinesWithSpark = linesWithSpark_rdd.flatMap(lambda line: line.value.split(" "))
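
To make the difference between map and flatMap concrete, the following plain-Python sketch mimics both on two lines from the README. It is an analogy only, not Spark code: `itertools.chain.from_iterable` plays the role of the flattening step, and the variable names are assumptions made for this example.

```python
from itertools import chain

lines = ["# Apache Spark", "## Building Spark"]

# map-like: one output element per input line, so the result is a list of lists.
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat sequence of words.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```

Splitting on a single space produces an empty string wherever two spaces appear in a row, which is why an empty string is a useful entry in a stop word list for this kind of tokenization.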

Inspecting the type of the resulting RDD and its lineage, i.e. the chain of operations that produces it

In [61]:
print(type(wordsInlinesWithSpark))

<class 'pyspark.rdd.PipelinedRDD'>


In [62]:
wordsInlinesWithSpark.toDebugString()

b'(1) PythonRDD[58] at RDD at PythonRDD.scala:53 []\n |  MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  FileScanRDD[6] at javaToPython at NativeMethodAccessorImpl.java:0 []'
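
The lineage above is hard to read because it is returned as a bytes object with embedded `\n` characters. A small sketch of one way to make it readable is to decode it and print it line by line. The bytes literal below is copied from the output above; in a live session the same idea would presumably be `print(wordsInlinesWithSpark.toDebugString().decode("utf-8"))`, assuming the method returns bytes as it does here.

```python
# Lineage bytes copied from the output above, split across literals for readability.
lineage = (
    b'(1) PythonRDD[58] at RDD at PythonRDD.scala:53 []\n'
    b' |  MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:0 []\n'
    b' |  MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0 []\n'
    b' |  MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0 []\n'
    b' |  FileScanRDD[6] at javaToPython at NativeMethodAccessorImpl.java:0 []'
)

# Decode to text and split on newlines so each RDD in the chain gets its own line.
lineage_lines = lineage.decode("utf-8").splitlines()

for entry in lineage_lines:
    print(entry)
```

Read from bottom to top, the printed lines go from the file scan to the final Python RDD.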

Listing the first 20 words in the RDD wordsInlinesWithSpark

In [64]:
wordsInlinesWithSpark.collect()[0:20]

['#',
 'Apache',
 'Spark',
 'Spark',
 'is',
 'a',
 'fast',
 'and',
 'general',
 'cluster',
 'computing',
 'system',
 'for',
 'Big',
 'Data.',
 'It',
 'provides',
 'rich',
 'set',
 'of']

Defining a function to convert a word to lowercase and applying it to every word using the map operation

In [0]:
#Function to convert to lowercase
def convertToLower(word):
    word = word.lower()
    return word

In [0]:
wordsinlinesWithSpark_toLowerCase = wordsInlinesWithSpark.map(convertToLower)

In [67]:
wordsinlinesWithSpark_toLowerCase.collect()[0:20]

['#',
 'apache',
 'spark',
 'spark',
 'is',
 'a',
 'fast',
 'and',
 'general',
 'cluster',
 'computing',
 'system',
 'for',
 'big',
 'data.',
 'it',
 'provides',
 'rich',
 'set',
 'of']

Executing the take function, which is similar to head in pandas. take(n) returns only the first n elements, which is more efficient than calling collect() and slicing the result.

In [68]:
wordsinlinesWithSpark_toLowerCase.take(10)

['#',
 'apache',
 'spark',
 'spark',
 'is',
 'a',
 'fast',
 'and',
 'general',
 'cluster']
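
The efficiency argument for take can be illustrated with a plain-Python analogy in which a lazy `map` object stands in for the RDD and `itertools.islice` stands in for take. This is only a sketch of the idea of stopping early; it does not model how Spark scans partitions, and the names `to_lower` and `processed` are assumptions made for this example.

```python
from itertools import islice

processed = []  # records which words were actually transformed

def to_lower(word):
    """Lowercase a word and remember that it was processed."""
    processed.append(word)
    return word.lower()

words = ["#", "Apache", "Spark", "Spark", "is", "a", "fast", "and"]

# Lazy pipeline: nothing is computed until elements are requested.
lazy_lowercased = map(to_lower, words)

# take-like step: pull only the first three elements, then stop.
first_three = list(islice(lazy_lowercased, 3))

print(first_three)
print(len(processed))
```

Only three of the eight words are ever lowercased here, whereas materializing the whole sequence first and slicing afterwards would process all eight.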

In [0]:
stopwords = ['#','[',']','is','for','a','and','.','']

Removing all the stop words from the RDD

In [0]:
wordsinlinesWithSpark_toLowerCase_afterStopWordRemoval = wordsinlinesWithSpark_toLowerCase.filter(lambda word: word not in stopwords)

In [103]:
wordsinlinesWithSpark_toLowerCase_afterStopWordRemoval.take(10)

['apache',
 'spark',
 'spark',
 'fast',
 'general',
 'cluster',
 'computing',
 'system',
 'big',
 'data.']
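
The output above still contains tokens such as `data.` with punctuation attached, because the filter only removes exact matches. One possible refinement, sketched here in plain Python, is to strip leading and trailing punctuation before the stop word check. The helper `normalize` and the use of a set for the stop words are assumptions of this sketch, not part of the original pipeline.

```python
import string

# Same stop words as in the notebook, stored in a set for fast membership tests.
stopwords = {"#", "[", "]", "is", "for", "a", "and", ".", ""}

# Sample tokens taken from the notebook outputs.
words = ["apache", "spark", "big", "data.", "dataframes,", "processing.", "is", ""]

def normalize(word):
    """Strip punctuation from both ends of a token (hypothetical helper)."""
    return word.strip(string.punctuation)

# map-like step followed by a filter-like step.
cleaned = [normalize(w) for w in words]
kept = [w for w in cleaned if w not in stopwords]

print(kept)
```

In the Spark pipeline the equivalent would be an extra `map(normalize)` before the `filter`. Stripping punctuation also alters tokens such as `./bin/run-example`, so whether it is desirable depends on the analysis.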

Creating key-value pairs of the form (word, 1) using the map function

In [0]:
wordDict = wordsinlinesWithSpark_toLowerCase_afterStopWordRemoval.map(lambda word: (word, 1))

In [105]:
wordDict.take(10)

[('apache', 1),
 ('spark', 1),
 ('spark', 1),
 ('fast', 1),
 ('general', 1),
 ('cluster', 1),
 ('computing', 1),
 ('system', 1),
 ('big', 1),
 ('data.', 1)]

Generating word counts with reduceByKey, using a lambda function to add up the values for each key.

In [0]:
wordFreq = wordDict.reduceByKey(lambda a, b: a + b)

In [107]:
wordFreq.take(5)

[('apache', 1), ('spark', 16), ('fast', 1), ('general', 2), ('cluster', 2)]
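
What reduceByKey computes can be sketched in plain Python by grouping the values for each key and folding them with the same lambda. This is a single-machine analogy under the assumption of a small in-memory list. Spark combines values within each partition before shuffling, which is why the function passed to reduceByKey should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce

# A few (word, 1) pairs in the same shape as the wordDict RDD.
pairs = [("apache", 1), ("spark", 1), ("spark", 1), ("fast", 1), ("spark", 1)]

# Group the values that share a key.
grouped = defaultdict(list)
for word, count in pairs:
    grouped[word].append(count)

# Fold each group with the same function that is passed to reduceByKey.
add = lambda a, b: a + b
word_freq = {word: reduce(add, counts) for word, counts in grouped.items()}

print(word_freq)
```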

Sorting by frequency in descending order, using a lambda function to select the count as the sort key

In [0]:
wordFreqSorted = wordFreq.sortBy(lambda a: a[1], ascending=False)

In [109]:
wordFreqSorted.take(10)

[('spark', 16),
 ('the', 9),
 ('to', 6),
 ('using', 5),
 ('including', 3),
 ('you', 3),
 ('build', 3),
 ('in', 3),
 ('general', 2),
 ('cluster', 2)]
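
The role of the key function in the sort can be seen in a plain-Python equivalent using `sorted`. The sample pairs are the five shown in the `wordFreq.take(5)` output; `reverse=True` corresponds to descending order. This is only an analogy and makes no claim about how Spark orders entries with equal counts.

```python
# (word, count) pairs copied from the wordFreq.take(5) output.
word_freq = [("apache", 1), ("spark", 16), ("fast", 1), ("general", 2), ("cluster", 2)]

# The lambda picks the count (second tuple element) as the sort key.
word_freq_sorted = sorted(word_freq, key=lambda pair: pair[1], reverse=True)

print(word_freq_sorted)
```

Python's `sorted` is stable, so pairs with equal counts keep their input order in this sketch.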

Printing the entire sequence of transformations

In [110]:
wordFreqSorted.toDebugString()

b'(1) PythonRDD[92] at RDD at PythonRDD.scala:53 []\n |  MapPartitionsRDD[89] at mapPartitions at PythonRDD.scala:133 []\n |  ShuffledRDD[88] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(1) PairwiseRDD[87] at reduceByKey at <ipython-input-106-90e3bc73a175>:1 []\n    |  PythonRDD[86] at reduceByKey at <ipython-input-106-90e3bc73a175>:1 []\n    |  MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:0 []\n    |  MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0 []\n    |  MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0 []\n    |  FileScanRDD[6] at javaToPython at NativeMethodAccessorImpl.java:0 []'

Convert the RDD to a DataFrame

In [0]:
wordFreqSorteddf = wordFreqSorted.toDF(["word","freq"])

In [116]:
print(type(wordFreqSorteddf))

<class 'pyspark.sql.dataframe.DataFrame'>


In [119]:
wordFreqSorteddf.collect()

[Row(word='spark', freq=16),
 Row(word='the', freq=9),
 Row(word='to', freq=6),
 Row(word='using', freq=5),
 Row(word='including', freq=3),
 Row(word='you', freq=3),
 Row(word='build', freq=3),
 Row(word='in', freq=3),
 Row(word='general', freq=2),
 Row(word='cluster', freq=2),
 Row(word='sql', freq=2),
 Row(word='can', freq=2),
 Row(word='with', freq=2),
 Row(word='see', freq=2),
 Row(word='on', freq=2),
 Row(word='an', freq=2),
 Row(word='./bin/run-example', freq=2),
 Row(word='sparkpi', freq=2),
 Row(word='apache', freq=1),
 Row(word='fast', freq=1),
 Row(word='computing', freq=1),
 Row(word='system', freq=1),
 Row(word='big', freq=1),
 Row(word='data.', freq=1),
 Row(word='it', freq=1),
 Row(word='provides', freq=1),
 Row(word='rich', freq=1),
 Row(word='set', freq=1),
 Row(word='of', freq=1),
 Row(word='higher-level', freq=1),
 Row(word='tools', freq=1),
 Row(word='dataframes,', freq=1),
 Row(word='streaming', freq=1),
 Row(word='stream', freq=1),
 Row(word='processing.', freq=1),

Converting to a Pandas dataframe

In [0]:
wordFreqSorteddf_pandas = wordFreqSorteddf.toPandas()

In [133]:
type(wordFreqSorteddf_pandas)

pandas.core.frame.DataFrame