## Installation de spark-cluster & findspark, pyspark 

In [7]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

# install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Importation et initialisation de Spark

In [8]:
# findspark makes the Spark installation importable from this Python kernel.
import findspark 
# Called without arguments: no explicit Spark location is passed here
# (SPARK_HOME was exported in the setup cell — presumably that is what init() picks up).
findspark.init()

## importer PySpark et initialiser SparkContext

In [9]:
from pyspark import SparkContext, SparkConf

# Application named "actions", running against the local[*] master.
conf = SparkConf().setAppName("actions").setMaster("local[*]")

# getOrCreate() reuses the already-running SparkContext when this cell is
# executed again; SparkContext(conf=conf) would raise
# "ValueError: Cannot run multiple SparkContexts at once" on a re-run.
# Note: when a context already exists, it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

## Afficher la configuration du cluster Spark

In [10]:
# List every (key, value) setting of the SparkContext created above.
sc.getConf().getAll()

[('spark.app.id', 'local-1666644392577'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '36041'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1666644390790'),
 ('spark.app.name', 'actions'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', 'f56d880599d2')]

## Créer un simple RDD

In [11]:
# Build a small RDD from an in-memory list of words.
inputWords = [
    "esi", "esi-sba", "big-data", "python",
    "spark", "esi-sba", "spark", "esi-sba",
]
wordRdd = sc.parallelize(inputWords)

# Display the Python type of the distributed collection.
type(wordRdd)

pyspark.rdd.RDD

## Application de l'action collect() sur wordRdd

In [12]:
# Same eight words as in the previous cell, distributed again as an RDD.
inputWords = [
    "esi",
    "esi-sba",
    "big-data",
    "python",
    "spark",
    "esi-sba",
    "spark",
    "esi-sba",
]
wordRdd = sc.parallelize(inputWords)

# collect() brings all elements of the RDD back as a Python list.
wordRdd.collect()

['esi',
 'esi-sba',
 'big-data',
 'python',
 'spark',
 'esi-sba',
 'spark',
 'esi-sba']

## Application de l'Action count() sur wordRdd

In [13]:
# Number of elements in wordRdd (eight words were parallelized).
wordRdd.count()

8

## Application de l'Action countByValue() sur wordRdd

In [15]:
# Tally how many times each distinct word occurs in wordRdd.
wordRdd.countByValue()

defaultdict(int,
            {'esi': 1, 'esi-sba': 3, 'big-data': 1, 'python': 1, 'spark': 2})

## Application de l'action take() sur wordRdd

In [16]:
# Fetch only the first 3 elements instead of collecting the whole RDD.
wordRdd.take(3)

['esi', 'esi-sba', 'big-data']

## Application de l'Action reduce()

In [17]:
# Distribute a small list of integers.
numRdd = sc.parallelize([5, 5, 4, 3, 2, 9, 2])

# Fold the RDD with addition, i.e. print the sum of its elements.
print(numRdd.reduce(lambda left, right: left + right))


def myfun(a, b):
    """Combine two values as ``9 * a + b / 2``.

    The operation is neither commutative nor associative
    (``myfun(1, 2) == 10.0`` but ``myfun(2, 1) == 18.5``), so when it is
    used as a reduction operator the result depends on the order in which
    the elements are combined.

    Args:
        a: left operand, multiplied by 9.
        b: right operand, divided by 2 (true division).

    Returns:
        The sum of the scaled left operand and the halved right operand.
    """
    scaled_left = a * 9
    halved_right = b / 2
    return scaled_left + halved_right

# Reduce with the custom operator. myfun(a, b) != myfun(b, a) in general, so the
# printed value presumably depends on how Spark partitions and combines the data
# — NOTE(review): confirm against the RDD.reduce contract before relying on it.
print(numRdd.reduce(myfun))


30
5020.25


## Application de l'action saveAsTextFile()

In [18]:
import shutil

# saveAsTextFile() refuses to write into an existing directory, so re-running
# this cell would fail; drop the output left by a previous run first.
shutil.rmtree('word_txt', ignore_errors=True)

# Write wordRdd as text files inside the 'word_txt' directory.
wordRdd.saveAsTextFile('word_txt')


In [19]:
import shutil

# saveAsTextFile() refuses to write into an existing directory, so re-running
# this cell would fail; drop the output left by a previous run first.
shutil.rmtree('word_txt2', ignore_errors=True)

# Merge the RDD into a single partition before saving it under 'word_txt2'.
wordRdd.coalesce(1).saveAsTextFile('word_txt2')

## Exercice

In [20]:
# Exercise: fold the integers 1..98 with myfun (defined in an earlier cell).
# The range is materialised as a list before being distributed.
numRdd = sc.parallelize([*range(1, 99)])
print(numRdd.reduce(myfun))

2.341020866017947e+47


In [21]:
# Ask Spark to cache wordRdd; the call returns the RDD itself (see the repr shown below).
wordRdd.cache()

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

In [22]:
# Run an action on the RDD after cache() was requested; the count is still 8.
wordRdd.count()

8

In [23]:
# Collect the elements grouped per partition: one inner list per partition
# (the recorded output shows two partitions of four words each).
wordRdd.glom().collect()

[['esi', 'esi-sba', 'big-data', 'python'],
 ['spark', 'esi-sba', 'spark', 'esi-sba']]