## Install JDK
## Install Spark
## Set Environment variables
## Create a Spark Session

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.6.tgz
!tar -xvf spark-2.4.3-bin-hadoop2.6.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.6"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark-2.4.3-bin-hadoop2.6/
spark-2.4.3-bin-hadoop2.6/python/
spark-2.4.3-bin-hadoop2.6/python/setup.cfg
spark-2.4.3-bin-hadoop2.6/python/pyspark/
spark-2.4.3-bin-hadoop2.6/python/pyspark/resultiterable.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/heapq3.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/join.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/version.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/rdd.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/java_gateway.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/find_spark_home.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/_globals.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/worker.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/accumulators.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/mllib/
spark-2.4.3-bin-hadoop2.6/python/pyspark/mllib/feature.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/mllib/random.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/mllib/recommendation.py
spark-2.4.3-bin-hadoop2.6/python/pyspark/mllib/fpm.py
spark-2.4.3-bin-hadoop2.6/python/py

## Test Spark

In [0]:
df = spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}])
df.show()



+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



## Copy a data file to your local Colab environment

In [2]:
!wget https://raw.githubusercontent.com/futurexskill/bidata/master/retailstore.csv

--2020-04-27 08:27:51--  https://raw.githubusercontent.com/futurexskill/bidata/master/retailstore.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 279 [text/plain]
Saving to: ‘retailstore.csv’


2020-04-27 08:27:51 (46.5 MB/s) - ‘retailstore.csv’ saved [279/279]



## Check if the file is copied

In [3]:
!ls

retailstore.csv  spark-2.4.3-bin-hadoop2.6
sample_data	 spark-2.4.3-bin-hadoop2.6.tgz


# Resilient Distributed Dataset (RDD)

## Import SparkContext and SparkConf

In [0]:
from pyspark import SparkContext


## Create Spark Context from Spark Session

In [0]:
sc = spark.sparkContext

## Create a RDD from Python List

In [0]:
sampleRDD = sc.parallelize([10,20,30,40,50,60])


In [7]:
type(sampleRDD)

pyspark.rdd.RDD

In [8]:
sampleRDD.collect()

[10, 20, 30, 40, 50, 60]

## Read the CSV file into a RDD

In [0]:
customerData = sc.textFile("retailstore.csv")


In [10]:
type(customerData)

pyspark.rdd.RDD

## Perform RDD Operations

Print all records

In [11]:
customerData.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,Male,England,N',
 '22,50000,Male,France,Y',
 '23,35000,Female,England,N',
 '24,,Male,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,Male,Germany,N',
 '27,37000,Female,France,N']

Print count

In [12]:
customerData.count()


11

Print the fist row

In [13]:
customerData.first()


'Age,Salary,Gender,Country,Purchased'

Fetch the first 3 rows

In [14]:
customerData.take(3)


['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N']

Print each row

In [15]:
for line in customerData.collect():
    print(line)


Age,Salary,Gender,Country,Purchased
18,20000,Male,Germany,N
19,22000,Female,France,N
20,24000,Female,England,N
21,,Male,England,N
22,50000,Male,France,Y
23,35000,Female,England,N
24,,Male,Germany,N
25,32000,Female,France,Y
,35000,Male,Germany,N
27,37000,Female,France,N


### Map

Replace "Male" with "M"

In [0]:
customerData2 = customerData.map(lambda x : x.replace("Male","M"))

In [17]:
customerData2.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,M,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,M,England,N',
 '22,50000,M,France,Y',
 '23,35000,Female,England,N',
 '24,,M,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,M,Germany,N',
 '27,37000,Female,France,N']

### Filter

Display only females

In [0]:
femaleCustomers=customerData.filter(lambda x: "Female" in x)


In [19]:
femaleCustomers.collect()

['19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '23,35000,Female,England,N',
 '25,32000,Female,France,Y',
 '27,37000,Female,France,N']

In [20]:
femaleCustomers.count()

5

### flatMap

Create a new RDD by splitting each row with comma delimeter

In [0]:
words=femaleCustomers.flatMap(lambda line: line.split(","))

In [22]:
words.count()

25

In [23]:
words.collect()

['19',
 '22000',
 'Female',
 'France',
 'N',
 '20',
 '24000',
 'Female',
 'England',
 'N',
 '23',
 '35000',
 'Female',
 'England',
 'N',
 '25',
 '32000',
 'Female',
 'France',
 'Y',
 '27',
 '37000',
 'Female',
 'France',
 'N']

### Set - Union & Intersection

In [0]:
rdd1 = sc.parallelize(["a","b","c","d","e"])
rdd2 = sc.parallelize(["c","e","k","l"])


Perform Union operation

In [25]:
for unions in rdd1.union(rdd2).distinct().collect():
    print(unions)

b
c
l
a
e
d
k


Perform Intersection operation

In [26]:
for intersects in rdd1.intersection(rdd2).collect():
    print(intersects)

c
e


### Transformation using function

In [27]:
customerData.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,Male,England,N',
 '22,50000,Male,France,Y',
 '23,35000,Female,England,N',
 '24,,Male,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,Male,Germany,N',
 '27,37000,Female,France,N']

Define the transformation method

In [0]:
def transformRDD(customer) :
    words =customer.split(",")
    #convert male to 0 and female to 1
    if words[2] == "Male" :
         words[2]="0"
    else :
         words[2]="1"
    #Convert N to 0 and Y to 1 for the purchased value
    if words[4] == "N" :
         words[4]="0"
    else :
         words[4]="1"
    #Convert Country to upper case        
    words[3] = words[3].upper()
    return ",".join(words)


Apply transformation using map

In [0]:
transformedCustData=customerData.map(transformRDD)


In [30]:
transformedCustData.collect()

['Age,Salary,1,COUNTRY,1',
 '18,20000,0,GERMANY,0',
 '19,22000,1,FRANCE,0',
 '20,24000,1,ENGLAND,0',
 '21,,0,ENGLAND,0',
 '22,50000,0,FRANCE,1',
 '23,35000,1,ENGLAND,0',
 '24,,0,GERMANY,0',
 '25,32000,1,FRANCE,1',
 ',35000,0,GERMANY,0',
 '27,37000,1,FRANCE,0']

## reduce

In [0]:
sampleRDD = sc.parallelize([10, 20, 30,40]) 

In [32]:
sampleRDD.reduce(lambda a, b: a + b)


100