In [2]:
from pyspark import SparkConf, SparkContext
import collections

print('Spark Conf :', SparkConf.__doc__)
print('Spark Context v %s :'%SparkContext.version, SparkContext.__doc__)


Spark Conf : 
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(false)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    Parameters
    ----------
    loadDefaults : bool
        whether to load values from Java system properties (True by default)
    _jvm : class:`py4j.java_gateway.JVMView`
        internal parameter used to pass a handle to the
        Java VM; does not need to be set by users
    _jc

In [5]:
#local[N] = Pas de cluster Manager. N Nombre de processeurs. Si non défini tous les processeurs. Si juste local, un processeur
# Une nouvelle exécution nécessite un redémarrage du kernel Jupyter.
conf = SparkConf().setMaster("local[2]").setAppName("EPSI-TP1")
sc = SparkContext(conf = conf)
print(sc.applicationId,sc.version,'\n---------------\n',sc.getConf().getAll())


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=EPSI-TP1, master=local[2]) created by __init__ at /tmp/ipykernel_4995/3315268675.py:4 

In [6]:
# 1.1 Créer un RDD avec 1000 chiffres. Afficher 5 exemples pris au hasard :

rdd = sc.parallelize(range(1000)) #RDD = Données Distribuées Résilientes. Stockage des chiffres de 0 à 999 avec range.
rdd.takeSample(False, 5) # 5 valeurs au hasard. False car sans remplacement.


                                                                                

[246, 734, 168, 706, 949]

In [7]:
# Charger le fichier data/montaigne.txt ligne par ligne. Afficher 5 examples pris au hasard :

textFile = sc.textFile("./data/Montaigne.txt")

ligns = textFile.takeSample(False,5)
print(ligns)

["évolue S'il s'était agi de quelque chose destiné à durer, il eût fallu y ", 'sans blessure : nous nous découvrons donc utilement nos défauts ', 'il faut faire leurs semailles, leur récolte, celui qui est opportun ', 'leçon. ', '']


In [8]:
# 2.2 Segmenter chaque ligne en mots. Afficher 2 exemples au hasard :

mots = textFile.map(lambda ligne: ligne.split(" "))
mots.takeSample(False, 2)

[[''],
 ['réel,',
  'ni',
  'de',
  'ce',
  'qui',
  'est',
  'étranger',
  'quelque',
  'chose',
  'qui',
  'nous',
  'soit',
  'propre.',
  '']]

In [28]:
# 2.3 Segmenter chaque ligne en mots en rendant chaque mot indépendant. Afficher 5 exemples au hasard :

mot = textFile.flatMap(lambda ligne: ligne.split())
mot.takeSample(False, 5)

                                                                                

['ne', '»', 'façon', 'conflit', "m'attaque"]

In [29]:
# 2.4 Afficher le nombre total de mots et la taille du vocabulaire (nombre de mots distinct) :
nbMot = mot.count()
print(nbMot)

nbMotDistinct = mot.distinct().count()
print(nbMotDistinct)

170741
25110


In [11]:
# 2.5 Transformer chaque mot w en une clé-valeur (w,1). Afficher 5 exemples de clé-valeur :

motKey = mot.map(lambda w : (w,1))
motKey.takeSample(False, 5)

[('subtile,', 1), ('conspirant', 1), ('parce', 1), ('mal', 1), ('la', 1)]

In [12]:
# 2.6 Sommer par clé puis afficher 5 valeurs :

countByKey = motKey.reduceByKey(lambda x, y : x+y)
countByKey.takeSample(False,5)

                                                                                

[('portées', 2),
 ('vieux,', 5),
 ('entichée', 1),
 ('convaincu,', 2),
 ('guérissent', 2)]

In [13]:
# 2.7 Ne conserver que les mots de plus de 6 caractères apparaissant au moins 3 fois :

filterByLenOcc = countByKey.filter(lambda x: len(x[0]) > 6 and x[1] > 2)
filterByLenOcc.takeSample(False, 5)

[('reprocher', 6),
 ('adversaire,', 4),
 ('chemins', 4),
 ('déplacer', 3),
 ('commande', 5)]

In [18]:
# 2.8 Récupérer le résultat des actions précédentes. Afficher le type de la valeur renvoyée :

liste = filterByLenOcc.collect()

print(type(liste))

<class 'list'>


In [16]:
# 2.9 Trier et afficher les mots trouvés :

sortList = filterByLenOcc.sortBy(lambda x : x[1]).collect()

for mot in sortList :
    print(mot)


('profession.', 3)
('arrache', 3)
("d'assister", 3)
('faibles,', 3)
('couvert', 3)
('affaires.', 3)
("d'agir,", 3)
("s'agissant", 3)
("d'affaires", 3)
('Allusion', 3)
('interprétation.', 3)
('grecque,', 3)
('Mémoires', 3)
('Pomponius', 3)
('hésitant', 3)
("d'aucun", 3)
('maintenait', 3)
('dresser', 3)
('personnage,', 3)
('étrangers.', 3)
('traîtrise', 3)
('mensonge', 3)
('changeant', 3)
('rebelle', 3)
('constituent', 3)
('depuis,', 3)
('conformes', 3)
('reproduire', 3)
('artificielles,', 3)
('gaiement,', 3)
('recevait', 3)
('Parlement', 3)
('bonnes,', 3)
('Diogène,', 3)
('arrivés', 3)
("l'Empereur", 3)
('feintes', 3)
('meurtre', 3)
('commander', 3)
("d'Egypte", 3)
('entièrement,', 3)
('traîtres', 3)
('Mahomet', 3)
('rattacher', 3)
("d'apprécier", 3)
('apporté', 3)
('objectif', 3)
('déclarations', 3)
('détriment', 3)
('justifie', 3)
('circonstances.', 3)
("l'État,", 3)
('condamna', 3)
('propose,', 3)
('donnée,', 3)
('voleurs', 3)
('inconsidérément', 3)
('pensée,', 3)
('occasion', 3)
('m

In [17]:
# 3.1 Charger dans Spark le fichier Titanic.csv :

titanicCsv = sc.textFile("./data/Titanic.csv")

lignTitanic = titanicCsv.takeSample(False, 1)

print(lignTitanic)


['694;0;3;Saad, Mr. Khalil;male;25;0;0;2672;7.225;;C']


In [67]:
# 3.2 Créer une fonction python qui transforme une ligne en une liste de variables avec le type correct. Tester sur une ligne :

def listToListType(lign):
    fields = lign.split(";")

    idPassager = int(fields[0])
    survived = int(fields[1])
    passagerClass = int(fields[2])
    name = fields[3]
    sex = fields[4]
    if fields[5] == '':
        fields[5] = '0'
    age = float(fields[5])
    sib_sp = int(fields[6])
    parch = int(fields[7])
    ticket = fields[8]
    if fields[9] == '':
        fields[9] = '0',
    fare = float(fields[9])
    cabin = fields[10]
    embarked = fields[11].strip()

    return [idPassager, survived, passagerClass, name, sex, age, sib_sp, parch, ticket, fare, cabin, embarked]

listToListType(titanicCsv.takeSample(False, 1)[0])

[691,
 1,
 1,
 'Dick, Mr. Albert Adrian',
 'male',
 31.0,
 1,
 0,
 '17474',
 57.0,
 'B20',
 'S']

In [68]:
# 3.3 Charger les segmentations dans un RDD. Afficher les trois premiers :

rdd2 = titanicCsv.map(listToListType)
rdd2.take(3)

[[1,
  0,
  3,
  'Braund, Mr. Owen Harris',
  'male',
  22.0,
  1,
  0,
  'A/5 21171',
  7.25,
  '',
  'S'],
 [2,
  1,
  1,
  'Cumings, Mrs. John Bradley (Florence Briggs Thayer)',
  'female',
  38.0,
  1,
  0,
  'PC 17599',
  71.2833,
  'C85',
  'C'],
 [3,
  1,
  3,
  'Heikkinen, Miss. Laina',
  'female',
  26.0,
  0,
  0,
  'STON/O2. 3101282',
  7.925,
  '',
  'S']]

In [85]:
# 3.4 Créer un RDD composé de la classe (élément 2) et de la survie (élément 1). Afficher les 10 premiers :

rddClassSurvive = rdd2.map(lambda x : (x[2], x[1]))
print(rddClassSurvive.take(10))

[(3, 0), (1, 1), (3, 1), (1, 1), (3, 0), (3, 0), (1, 0), (3, 0), (3, 1), (2, 1)]


In [86]:
# 3.5 En utilisant la fonction mapValues, créer une liste avec la classe du passager et un doublet (survie,1). Afficher les 10 premiers :

keyMapValues = rddClassSurvive.mapValues(lambda x : (x,1))
keyMapValues.take(10)

[(3, (0, 1)),
 (1, (1, 1)),
 (3, (1, 1)),
 (1, (1, 1)),
 (3, (0, 1)),
 (3, (0, 1)),
 (1, (0, 1)),
 (3, (0, 1)),
 (3, (1, 1)),
 (2, (1, 1))]

In [92]:
# 3.6 Pour chaque classe afficher le nombre de survivants et le nombre total :

deadAliveByClass = keyMapValues.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).collect()

for i in range(len(deadAliveByClass)):
    print("Classe", deadAliveByClass[i][0])
    print("Nombre total de passagers :", deadAliveByClass[i][1][1])
    print("Nombre de survivants :", deadAliveByClass[i][1][0])
    print("")

Classe 2
Nombre total de passagers : 184
Nombre de survivants : 87

Classe 3
Nombre total de passagers : 491
Nombre de survivants : 119

Classe 1
Nombre total de passagers : 216
Nombre de survivants : 136



In [None]:
# 3.7 Afficher le taux de survie par Classe, trié de 1 à 3 :



In [None]:
# 3.8 Créer un rdd des survivants :

In [None]:
# 3.9 Trouver le survivant le plus agé :
