# Install Java, Spark, and Findspark
This installs Apache Spark 2.4.0, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

In [0]:
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
Set the locations where Spark and Java are installed.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

# Start a SparkSession
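This initializes Findspark so that Python can locate the Spark installation, and imports `SparkSession`. The Spark entry point (a `SparkContext`) is created later, in the "Use Spark!" section.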

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# **Big-Data**
Big data is a term that describes large volumes of data, both structured and unstructured.
Big data can be analyzed for insights that lead to better decisions and strategic business moves.

### **Why big data?**
The importance of big data doesn’t revolve around how much data you have, but what you do with it. You can take data from any source and analyze it to find answers that enable 1) cost reductions, 2) time reductions, 3) new product development and optimized offerings, and 4) smart decision making. When you combine big data with high-powered analytics, you can accomplish business-related tasks such as:

*   Determining root causes of failures, issues and defects in near-real time.
*   Generating coupons at the point of sale based on the customer’s buying habits.
*   Recalculating entire risk portfolios in minutes.
*   Detecting fraudulent behavior before it affects your organization.

### Three Vs of big data:

Volume: The amount of data is immense. An estimated 2.3 trillion gigabytes of new data are created each day.

Velocity: The speed at which data arrives (it is always in flux) and at which it must be processed (analysis of streaming data to produce near-real-time or real-time results).

Variety: The different types of data, structured as well as unstructured.


# HDFS Architecture

![alt text](https://github.com/Heisenberg0203/PUBG/blob/master/images/HDFS%20Architecture.png?raw=true)

**HDFS**: Hadoop Distributed File System

The smallest unit of storage in a file system is the block. HDFS uses a default *block size of 128 MB*, which is configurable; by comparison, a traditional OS file system typically uses blocks of about 4 kB. HDFS splits a large file into blocks and spreads those blocks across different nodes, i.e. it is *distributed*. In addition, each block is stored as 3 replicas by default; this number is known as the replication factor and is also configurable. Each replica is stored on a different node, so that even if one node fails, the block can still be read from another. This makes HDFS *reliable* and fault tolerant.
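
To make the block and replication arithmetic concrete, here is a small plain-Python sketch (it does not talk to HDFS). The function name `hdfs_footprint` and the 1000 MB example file are illustrative assumptions; the 128 MB block size and replication factor of 3 are the defaults mentioned above.

```python
MB = 1024 * 1024

def hdfs_footprint(file_size, block_size=128 * MB, replication=3):
    """Return (number of blocks, size of the last block, raw bytes stored in the cluster)."""
    if file_size == 0:
        return 0, 0, 0
    # Integer ceiling division: a partly filled final block still counts as a block.
    num_blocks = (file_size + block_size - 1) // block_size
    # The last block only occupies as much space as the data it holds.
    last_block = file_size - (num_blocks - 1) * block_size
    # Every byte of the file is stored `replication` times.
    raw_bytes = file_size * replication
    return num_blocks, last_block, raw_bytes

blocks, last, raw = hdfs_footprint(1000 * MB)
print(blocks, last // MB, raw // MB)  # 8 104 3000
```

So a 1000 MB file becomes 8 blocks (seven full ones and one of 104 MB), and the cluster stores 3000 MB in total.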

There are two types of node in HDFS:

**Datanode**: The daemon that runs on each slave node. Datanodes are the ones that actually store the data.

**Namenode**: The master node that manages all the datanodes. It holds the metadata, such as which block of a file is stored on which node, the replication factor, the block size, and which nodes are faulty.
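
The division of labour between namenode and datanodes can be pictured with a toy model in plain Python. Everything here is a simplifying assumption for illustration: the helper names `place_blocks` and `readable_from`, the node names, and the round-robin placement (real HDFS uses a rack-aware placement policy).

```python
def place_blocks(block_ids, datanodes, replication=3):
    """Toy namenode metadata: map each block to `replication` distinct datanodes."""
    if replication > len(datanodes):
        raise ValueError("need at least as many datanodes as replicas")
    metadata = {}
    for i, block in enumerate(block_ids):
        # Round-robin placement, starting one node further along for each block.
        metadata[block] = [datanodes[(i + r) % len(datanodes)]
                           for r in range(replication)]
    return metadata

def readable_from(metadata, block, failed):
    """Datanodes that can still serve `block` when the nodes in `failed` are down."""
    return [node for node in metadata[block] if node not in failed]

nodes = ["dn1", "dn2", "dn3", "dn4"]
meta = place_blocks(["blk_0", "blk_1", "blk_2"], nodes)
print(meta["blk_0"])                                # ['dn1', 'dn2', 'dn3']
print(readable_from(meta, "blk_0", failed={"dn1"}))  # ['dn2', 'dn3']
```

Even with `dn1` down, the block is still available from two other datanodes, which is the fault tolerance that replication provides.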

### What exactly is Spark, and why is it needed?
Traditional processing could not cope with the huge amount of data (big data) being generated, so a different platform, a different file structure, and a completely new processing technique were needed. This is where the Apache Spark framework comes in.


# Spark:
Apache Spark is an open-source distributed general-purpose cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since.

*   **Distributed**:  A distributed system is a system whose components are located on different networked computers, which then communicate and coordinate their actions by passing messages to one another. 
*   **General purpose**: Broadly applicable across application domains, without specialized features for a particular domain.
*   **Open Source**: Open-source software is a type of computer software in which source code is released under a license in which the copyright holder grants users the rights to study, change, and distribute the software to anyone and for any purpose.
*   **Cluster**: A computer cluster is a set of loosely or tightly connected computers that work together so that, in many respects, they can be viewed as a single system.






### Spark's Architecture:
Spark uses a master/worker architecture. A driver program talks to a single coordinator, called the master, which manages the workers on which executors run.
![alt text](https://github.com/purva11198/PUBG/blob/master/images/Spark%20Architecture.jpg?raw=true)



# Use Spark!
That's all there is to it - you're ready to use Spark!

In [0]:
from pyspark import SparkContext, SparkConf

In [0]:
conf = SparkConf().setAppName("Scipy").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Resilient Distributed Dataset (RDD)
An RDD is an immutable, distributed collection of data elements, partitioned across the nodes of your cluster, that can be operated on in parallel through a low-level API offering transformations and actions.

Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

There are two ways to create an RDD:

*   By parallelizing an existing collection.
*   By reading a dataset from a file, a table, Hive, or any other data source.
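
The idea of partitions that are processed in parallel can be sketched without Spark. The following plain-Python model is only an analogy: the helper names `split_into_partitions` and `run_on_partitions` are hypothetical, threads stand in for executors, and the slicing rule is a simplification of what Spark does internally.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(items, num_partitions):
    """Cut a list into `num_partitions` contiguous slices of near-equal size."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def run_on_partitions(partitions, func):
    """Apply `func` to every partition concurrently; results keep partition order."""
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        return list(pool.map(func, partitions))

partitions = split_into_partitions(list(range(1, 11)), 3)
partial_sums = run_on_partitions(partitions, sum)
print(partitions)         # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
print(partial_sums)       # [6, 15, 34]
print(sum(partial_sums))  # 55
```

Each worker only sees its own slice, and the partial results are combined at the end, which is the same shape of computation that Spark runs across the nodes of a cluster.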




In [0]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [0]:
# An RDD is a distributed dataset; create one from a local collection with sc.parallelize
distData = sc.parallelize(data)
# The default number of partitions comes from spark.default.parallelism
# (with local[*], the number of CPU cores). To choose it yourself, pass it explicitly:
# distData = sc.parallelize(data, num_of_partitions)

#### Let's check the type of our distributed dataset

In [0]:
type(distData)

There are two kinds of operations we can perform on RDDs:

*   Transformations
*   Actions

## Transformations are lazy: they are not executed until an action is called.
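
Python's own `map` and `filter` are lazy in a similar way, so they make a convenient Spark-free analogy. In this sketch the `calls` list and the `double` function are illustrative helpers; building the pipeline plays the role of the transformations, and `list(...)` plays the role of the action.

```python
calls = []  # records every element the transformation actually touches

def double(x):
    calls.append(x)
    return x * 2

numbers = [1, 2, 3, 4, 5]

# "Transformations": building the pipeline runs nothing yet.
doubled = map(double, numbers)
small = filter(lambda x: x < 7, doubled)
calls_before_action = len(calls)

# "Action": materialising the result forces the whole pipeline to run.
result = list(small)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)  # 0 5 [2, 4, 6]
```

`double` is not called a single time until the result is requested, just as Spark does no work for `map` or `filter` until an action such as `collect()` runs.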



## Transformation
![alt text](https://github.com/purva11198/PUBG/blob/master/images/RDD%20Transformation.jpg?raw=true)

### The contents of an RDD can be seen by calling the collect action on it

In [0]:
distData.collect()

### Transformation : filter

In [0]:
filterData = distData.filter(lambda x: x<5)

In [0]:
filterData.collect()

In [0]:
#@title Exercise
# Find the elements in the dataset that are divisible by 2

### Transformation : map

In [0]:
mapData = distData.map(lambda x : x*2)

In [0]:
mapData.collect()

In [0]:
#@title Exercise
#Take square of each number and add 5 to it

### Transformation : mapToPair (in PySpark, a `map` that returns key-value tuples)

In [0]:
mapToPairData = distData.map(lambda x:(x,x+1))

In [0]:
mapToPairData.collect()

In [0]:
#@title Exercise
# Pair each number with its cube

In [0]:
textdata = ["hello world i am learning apache Spark and i am coding at Scipy 2018 "]
textData = sc.parallelize(textdata)

In [0]:
textData.collect()

### Transformation : flatMap

In [0]:
flatmap = textData.flatMap(lambda x: x.split())

In [0]:
flatmap.collect()
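
The difference between `map` and `flatMap` is easiest to see side by side. This is a plain-Python analogy rather than Spark code, and the two sample lines are made up for illustration.

```python
from itertools import chain

lines = ["hello world", "apache spark"]

# map-style: exactly one output element per input element (here, one list per line)
mapped = [line.split() for line in lines]

# flatMap-style: the per-element results are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)       # [['hello', 'world'], ['apache', 'spark']]
print(flat_mapped)  # ['hello', 'world', 'apache', 'spark']
```

With `map` the result would be a collection of word lists; `flatMap` yields a single collection of words, which is what a word count needs.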

### Transformation : groupBy/groupByKey


In [0]:
data = ['hello', 'hello', 'world']
textData = sc.parallelize(data)
groupData = textData.groupBy(lambda x: x)

In [0]:
for group in groupData.collect():
  print(group[0])   # the key
  print(group[1])   # an iterable holding the values grouped under that key
  for i in group[1]:
    print(i)

In [0]:
groupData.mapValues(len).collect()

In [0]:
#@title Exercise
## Group the dataset by first character, then print the count for each group

In [0]:
data = ['James', 'Thomas', 'Arya', 'Ned', 'Jamie', 'Jon', 'Maradona', 'James', 'Ned']
textData = sc.parallelize(data)


In [0]:
mapToPairData = textData.map(lambda x:(x,1))
mapToPairData.collect()

### Transformation : reduceByKey

In [0]:
mapToPairData.reduceByKey(lambda x,y : x+y).collect()
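
What `reduceByKey` computes can be modelled in a few lines of plain Python. The helper `reduce_by_key` is a hypothetical stand-in, not a Spark API, and it ignores partitioning entirely.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Combine all values that share a key using the two-argument function `func`."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # Fold each key's values pairwise: func(func(v1, v2), v3) ...
    return {key: reduce(func, values) for key, values in grouped.items()}

pairs = [("James", 1), ("Ned", 1), ("Arya", 1), ("James", 1), ("Ned", 1)]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {'James': 2, 'Ned': 2, 'Arya': 1}
```

In Spark the function is applied within and across partitions in no guaranteed order, so it should be associative and commutative, as addition is here.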

### Transformation : sortBy/sortByKey

In [0]:
mapToPairData.sortByKey(ascending=False).collect()

## Persist
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified, it defaults to `MEMORY_ONLY`.

In [0]:
mapToPairData.persist()
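
Why persisting matters can be shown with a toy model in plain Python. The class `LazyDataset` is a hypothetical stand-in for an RDD, written only for illustration: it recomputes its values on every action unless it has been marked as persisted.

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputed on every action unless persisted."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.compute_count = 0   # how many times the full computation ran
        self._persist = False
        self._cache = None

    def persist(self):
        self._persist = True     # only marks the dataset; nothing is computed yet
        return self

    def collect(self):
        if self._persist and self._cache is not None:
            return self._cache   # reuse the values stored by the first action
        self.compute_count += 1
        result = [self.func(x) for x in self.source]
        if self._persist:
            self._cache = result
        return result

plain = LazyDataset([1, 2, 3], lambda x: x * x)
plain.collect()
plain.collect()

cached = LazyDataset([1, 2, 3], lambda x: x * x).persist()
cached.collect()
cached.collect()

print(plain.compute_count, cached.compute_count)  # 2 1
```

The unpersisted dataset is computed once per action, while the persisted one is computed only on the first action and then served from memory.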

## Wordcount

In [0]:
data = ['A wonderful king is Hadoop. The elephant plays well with Sqoop. But what helps him to thrive Are Impala, and Hive,And HDFS in the group.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.']
## Exercise: replace each ... with your own code
## make a distributed dataset
textData = ...
## split the dataset into words
words = ...
## generate key-value pairs (word, 1)
wordsAndOnes = ...
## generate the count of each word
## method 1
groupWords = ...
output = ...
## show output
output.collect()

In [0]:
#@title Solution - Way 1 { vertical-output: true }
data = ['A wonderful king is Hadoop. The elephant plays well with Sqoop. But what helps him to thrive Are Impala, and Hive,And HDFS in the group.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.']
textData = sc.parallelize(data)
##split the dataset into words
words = textData.flatMap(lambda x : x.split())
##generate key value pairs (word,1)
wordsAndOnes = words.map(lambda word:(word,1))
##generate count of word
##method 1
groupWords = wordsAndOnes.groupByKey()
output=groupWords.mapValues(len)
output.collect()

In [0]:
#@title Solution - Way 2 { vertical-output: true }
data = ['A wonderful king is Hadoop. The elephant plays well with Sqoop. But what helps him to thrive Are Impala, and Hive,And HDFS in the group.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.A wonderful king is Hadoop. The elephant plays well with Sqoop.']
textData = sc.parallelize(data)
##split the dataset into words
words = textData.flatMap(lambda x : x.split())
##generate key value pairs (word,1)
wordsAndOnes = words.map(lambda word:(word,1))
##generate count of word
##method 2
output = wordsAndOnes.reduceByKey(lambda x,y : x+y)
output.collect()
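
Both solutions give the same counts, but they move different amounts of data. With `groupByKey` every `(word, 1)` pair is sent across the network, whereas `reduceByKey` first combines the pairs inside each partition. The plain-Python sketch below models that difference; the two hand-written partitions and all function names are illustrative assumptions.

```python
from collections import Counter

# Two partitions of (word, 1) pairs, as produced by the map step.
partitions = [
    [("king", 1), ("is", 1), ("king", 1), ("king", 1)],
    [("king", 1), ("is", 1), ("is", 1)],
]

def shuffled_with_group_by_key(parts):
    """groupByKey-style: every pair is shuffled unchanged."""
    return [pair for part in parts for pair in part]

def shuffled_with_reduce_by_key(parts):
    """reduceByKey-style: each partition pre-sums its own pairs before the shuffle."""
    shuffled = []
    for part in parts:
        local = Counter()
        for word, one in part:
            local[word] += one
        shuffled.extend(local.items())
    return shuffled

def final_counts(shuffled):
    """Merge the shuffled records into the final count per word."""
    totals = Counter()
    for word, n in shuffled:
        totals[word] += n
    return dict(totals)

grouped = shuffled_with_group_by_key(partitions)
reduced = shuffled_with_reduce_by_key(partitions)
print(len(grouped), len(reduced))                      # 7 4
print(final_counts(grouped) == final_counts(reduced))  # True
```

The answer is identical, but the pre-combined version shuffles 4 records instead of 7, which is why `reduceByKey` is usually preferred for aggregations on large datasets.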

In [0]:
# Sort by key (the word)
sortedWords = output.sortByKey()
sortedWords.collect()

In [0]:
# Sort by value (the count), in ascending order
sortedCount = output.sortBy(lambda x: x[1])
sortedCount.collect()