## Apache Spark

Apache Spark is a distributed computing framework.

**Some applications:**

- E-commerce: Shopify
- Applications such as MyFitnessPal
- Web: Yahoo

<br>

<hr>

<br>

**ARCHITECTURE**
- Storage: SQL and NoSQL databases
- Resource management: cluster manager and workers
- Processing engine: distributed computing infrastructure
- Ecosystem: Spark Core, Spark SQL, MLlib, Spark Streaming, GraphX
- API: Scala, Java, Python, R, SQL

<hr>

- Spark Core: the fundamental component; task distribution and task management
- Spark SQL: data manipulation
- Spark MLlib: module dedicated to machine learning
- Spark GraphX: ...

# INSTALLATION

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Connec                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubunt

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [3]:
!ls

sample_data  spark-2.3.1-bin-hadoop2.7	spark-2.3.1-bin-hadoop2.7.tgz


In [4]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc

In [5]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

# Data

### Creating a DataFrame

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [7]:
schema = StructType().add("id_utilisateur","string").add("pays","string").add("profession", "string").add("genre",'string').add("age", "integer")
df = spark.createDataFrame([("A123",'Maroc',"Médecin","Femme", 33), ("B234",'Canada',"Entrepreneur","Femme",35), ("C345",'France',"Professeur", "Homme",40)], schema = schema)

In [8]:
df.show()

+--------------+------+------------+-----+---+
|id_utilisateur|  pays|  profession|genre|age|
+--------------+------+------------+-----+---+
|          A123| Maroc|     Médecin|Femme| 33|
|          B234|Canada|Entrepreneur|Femme| 35|
|          C345|France|  Professeur|Homme| 40|
+--------------+------+------------+-----+---+



In [9]:
df.printSchema()

root
 |-- id_utilisateur: string (nullable = true)
 |-- pays: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- age: integer (nullable = true)



### Loading data

In [10]:
from google.colab import files
files.upload()

Saving 03_03_data.csv to 03_03_data.csv


{'03_03_data.csv': b'Pays;2005;2006;2007;2008;2009;2010;2011;2012;2013;2014;2015;2016;2017;2018\r\nAllemagne;7,2;8,5;10,1;10,1;10,9;11,7;12,5;13,6;13,8;14,4;14,9;14,9;15,5;16,5\r\nAutriche;24,4;26,3;28,2;28,9;31;31,2;31,6;32,7;32,8;33,7;33,5;33,4;33,1;33,4\r\nBelgique;2,3;2,6;3,1;3,6;4,7;5,6;6,3;7,2;7,5;8;8;8,7;9,1;9,4\r\nBulgarie;9,2;9,4;9,1;10,3;12;13,9;14,2;15,8;18,9;18,1;18,3;18,8;18,7;20,5\r\nChypre;3,1;3,3;4;5,1;5,9;6,2;6,3;7,1;8,5;9,2;9,9;9,9;10,5;13,9\r\nCroatie;23,7;22,7;22,2;22;23,6;25,1;25,4;26,8;28;27,8;29;28,3;27,3;28\r\nDanemark;16;16,3;17,7;18,5;20;21,9;23,4;25,5;27,2;29,3;30,8;31,8;34,7;35,7\r\nEspagne;8,4;9,1;9,7;10,7;13;13,8;13,2;14,3;15,3;16,1;16,2;17,4;17,6;17,5\r\nEstonie;17,4;16;17;18,6;22,9;24,6;25,3;25,5;25,3;26,1;28,2;28,7;29,1;30\r\nFinlande;28,8;30,1;29,6;31,4;31,3;32,4;32,8;34,4;36,7;38,8;39,3;39;40,9;41,2\r\nFrance;9,6;9,3;10,2;11,2;12,2;12,7;12,7;13,4;14;14,6;15;15,7;16;16,6\r\nGr\xe8ce;7,3;7,5;8,2;8,2;8,7;10,1;11,2;13,7;15,3;15,7;15,7;15,4;17;18\r\nHongri

In [11]:
df = spark.read.option("delimiter", ';').csv("03_03_data.csv", header=True)
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [12]:
df.show()

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      Pays|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| Allemagne| 7,2| 8,5|10,1|10,1|10,9|11,7|12,5|13,6|13,8|14,4|14,9|14,9|15,5|16,5|
|  Autriche|24,4|26,3|28,2|28,9|  31|31,2|31,6|32,7|32,8|33,7|33,5|33,4|33,1|33,4|
|  Belgique| 2,3| 2,6| 3,1| 3,6| 4,7| 5,6| 6,3| 7,2| 7,5|   8|   8| 8,7| 9,1| 9,4|
|  Bulgarie| 9,2| 9,4| 9,1|10,3|  12|13,9|14,2|15,8|18,9|18,1|18,3|18,8|18,7|20,5|
|    Chypre| 3,1| 3,3|   4| 5,1| 5,9| 6,2| 6,3| 7,1| 8,5| 9,2| 9,9| 9,9|10,5|13,9|
|   Croatie|23,7|22,7|22,2|  22|23,6|25,1|25,4|26,8|  28|27,8|  29|28,3|27,3|  28|
|  Danemark|  16|16,3|17,7|18,5|  20|21,9|23,4|25,5|27,2|29,3|30,8|31,8|34,7|35,7|
|   Espagne| 8,4| 9,1| 9,7|10,7|  13|13,8|13,2|14,3|15,3|16,1|16,2|17,4|17,6|17,5|
|   Estonie|17,4|  16|  17|18,6|22,9|24,6|25,3|25,5|25,3|26,1|28,2|28,7|29,1|  30|
|  F

In [13]:
df.count()

28

In [14]:
df.columns

['Pays',
 '2005',
 '2006',
 '2007',
 '2008',
 '2009',
 '2010',
 '2011',
 '2012',
 '2013',
 '2014',
 '2015',
 '2016',
 '2017',
 '2018']

In [15]:
df.take(5)

[Row(Pays='Allemagne', 2005='7,2', 2006='8,5', 2007='10,1', 2008='10,1', 2009='10,9', 2010='11,7', 2011='12,5', 2012='13,6', 2013='13,8', 2014='14,4', 2015='14,9', 2016='14,9', 2017='15,5', 2018='16,5'),
 Row(Pays='Autriche', 2005='24,4', 2006='26,3', 2007='28,2', 2008='28,9', 2009='31', 2010='31,2', 2011='31,6', 2012='32,7', 2013='32,8', 2014='33,7', 2015='33,5', 2016='33,4', 2017='33,1', 2018='33,4'),
 Row(Pays='Belgique', 2005='2,3', 2006='2,6', 2007='3,1', 2008='3,6', 2009='4,7', 2010='5,6', 2011='6,3', 2012='7,2', 2013='7,5', 2014='8', 2015='8', 2016='8,7', 2017='9,1', 2018='9,4'),
 Row(Pays='Bulgarie', 2005='9,2', 2006='9,4', 2007='9,1', 2008='10,3', 2009='12', 2010='13,9', 2011='14,2', 2012='15,8', 2013='18,9', 2014='18,1', 2015='18,3', 2016='18,8', 2017='18,7', 2018='20,5'),
 Row(Pays='Chypre', 2005='3,1', 2006='3,3', 2007='4', 2008='5,1', 2009='5,9', 2010='6,2', 2011='6,3', 2012='7,1', 2013='8,5', 2014='9,2', 2015='9,9', 2016='9,9', 2017='10,5', 2018='13,9')]

In [16]:
df.collect()

[Row(Pays='Allemagne', 2005='7,2', 2006='8,5', 2007='10,1', 2008='10,1', 2009='10,9', 2010='11,7', 2011='12,5', 2012='13,6', 2013='13,8', 2014='14,4', 2015='14,9', 2016='14,9', 2017='15,5', 2018='16,5'),
 Row(Pays='Autriche', 2005='24,4', 2006='26,3', 2007='28,2', 2008='28,9', 2009='31', 2010='31,2', 2011='31,6', 2012='32,7', 2013='32,8', 2014='33,7', 2015='33,5', 2016='33,4', 2017='33,1', 2018='33,4'),
 Row(Pays='Belgique', 2005='2,3', 2006='2,6', 2007='3,1', 2008='3,6', 2009='4,7', 2010='5,6', 2011='6,3', 2012='7,2', 2013='7,5', 2014='8', 2015='8', 2016='8,7', 2017='9,1', 2018='9,4'),
 Row(Pays='Bulgarie', 2005='9,2', 2006='9,4', 2007='9,1', 2008='10,3', 2009='12', 2010='13,9', 2011='14,2', 2012='15,8', 2013='18,9', 2014='18,1', 2015='18,3', 2016='18,8', 2017='18,7', 2018='20,5'),
 Row(Pays='Chypre', 2005='3,1', 2006='3,3', 2007='4', 2008='5,1', 2009='5,9', 2010='6,2', 2011='6,3', 2012='7,1', 2013='8,5', 2014='9,2', 2015='9,9', 2016='9,9', 2017='10,5', 2018='13,9'),
 Row(Pays='Croati

In [17]:
# Convert the Spark DataFrame to a pandas DataFrame
df.toPandas()

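Unnamed: 0,Pays,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018
0,Allemagne,"7,2","8,5","10,1","10,1","10,9","11,7","12,5","13,6","13,8","14,4","14,9","14,9","15,5","16,5"
1,Autriche,"24,4","26,3","28,2","28,9",31,"31,2","31,6","32,7","32,8","33,7","33,5","33,4","33,1","33,4"
2,Belgique,"2,3","2,6","3,1","3,6","4,7","5,6","6,3","7,2","7,5",8,8,"8,7","9,1","9,4"
3,Bulgarie,"9,2","9,4","9,1","10,3",12,"13,9","14,2","15,8","18,9","18,1","18,3","18,8","18,7","20,5"
4,Chypre,"3,1","3,3",4,"5,1","5,9","6,2","6,3","7,1","8,5","9,2","9,9","9,9","10,5","13,9"
5,Croatie,"23,7","22,7","22,2",22,"23,6","25,1","25,4","26,8",28,"27,8",29,"28,3","27,3",28
6,Danemark,16,"16,3","17,7","18,5",20,"21,9","23,4","25,5","27,2","29,3","30,8","31,8","34,7","35,7"
7,Espagne,"8,4","9,1","9,7","10,7",13,"13,8","13,2","14,3","15,3","16,1","16,2","17,4","17,6","17,5"
8,Estonie,"17,4",16,17,"18,6","22,9","24,6","25,3","25,5","25,3","26,1","28,2","28,7","29,1",30
9,Finlande,"28,8","30,1","29,6","31,4","31,3","32,4","32,8","34,4","36,7","38,8","39,3",39,"40,9","41,2"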
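
The year columns were read as strings that use a decimal comma (`'7,2'` in the `df.show()` output), so they cannot be used as numbers as they stand. Below is a minimal plain-Python sketch of the conversion. The helper name `to_float` is hypothetical and not part of the notebook; in Spark the same idea would typically be a string replacement followed by a cast to `double`.

```python
def to_float(value):
    """Convert a decimal-comma string such as '7,2' to a float (None stays None)."""
    if value is None:
        return None
    return float(value.replace(",", "."))


# First values of the Allemagne row, as shown by df.show()
allemagne = ["7,2", "8,5", "10,1"]
allemagne_num = [to_float(v) for v in allemagne]
print(allemagne_num)  # [7.2, 8.5, 10.1]
```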


### Manipulating columns

In [18]:
df.select("2015").show(5)

+----+
|2015|
+----+
|14,9|
|33,5|
|   8|
|18,3|
| 9,9|
+----+
only showing top 5 rows



In [19]:
df.select("2015", "2016", "2017").show(5)

+----+----+----+
|2015|2016|2017|
+----+----+----+
|14,9|14,9|15,5|
|33,5|33,4|33,1|
|   8| 8,7| 9,1|
|18,3|18,8|18,7|
| 9,9| 9,9|10,5|
+----+----+----+
only showing top 5 rows



**Adding a column**

In [20]:
from pyspark.sql.functions import lit

df_2025 = df.withColumn("2025", lit(50))
df_2025.show()

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      Pays|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|2025|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| Allemagne| 7,2| 8,5|10,1|10,1|10,9|11,7|12,5|13,6|13,8|14,4|14,9|14,9|15,5|16,5|  50|
|  Autriche|24,4|26,3|28,2|28,9|  31|31,2|31,6|32,7|32,8|33,7|33,5|33,4|33,1|33,4|  50|
|  Belgique| 2,3| 2,6| 3,1| 3,6| 4,7| 5,6| 6,3| 7,2| 7,5|   8|   8| 8,7| 9,1| 9,4|  50|
|  Bulgarie| 9,2| 9,4| 9,1|10,3|  12|13,9|14,2|15,8|18,9|18,1|18,3|18,8|18,7|20,5|  50|
|    Chypre| 3,1| 3,3|   4| 5,1| 5,9| 6,2| 6,3| 7,1| 8,5| 9,2| 9,9| 9,9|10,5|13,9|  50|
|   Croatie|23,7|22,7|22,2|  22|23,6|25,1|25,4|26,8|  28|27,8|  29|28,3|27,3|  28|  50|
|  Danemark|  16|16,3|17,7|18,5|  20|21,9|23,4|25,5|27,2|29,3|30,8|31,8|34,7|35,7|  50|
|   Espagne| 8,4| 9,1| 9,7|10,7|  13|13,8|13,2|14,3|15,3|16,1|16,2|17,4|17,6|17,5|  50|
|   Estonie|17,4|  16|  17|18,6|

**Dropping a column**

In [21]:
df_sans_2005 = df.drop("2005")
df_sans_2005.show()

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      Pays|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+
| Allemagne| 8,5|10,1|10,1|10,9|11,7|12,5|13,6|13,8|14,4|14,9|14,9|15,5|16,5|
|  Autriche|26,3|28,2|28,9|  31|31,2|31,6|32,7|32,8|33,7|33,5|33,4|33,1|33,4|
|  Belgique| 2,6| 3,1| 3,6| 4,7| 5,6| 6,3| 7,2| 7,5|   8|   8| 8,7| 9,1| 9,4|
|  Bulgarie| 9,4| 9,1|10,3|  12|13,9|14,2|15,8|18,9|18,1|18,3|18,8|18,7|20,5|
|    Chypre| 3,3|   4| 5,1| 5,9| 6,2| 6,3| 7,1| 8,5| 9,2| 9,9| 9,9|10,5|13,9|
|   Croatie|22,7|22,2|  22|23,6|25,1|25,4|26,8|  28|27,8|  29|28,3|27,3|  28|
|  Danemark|16,3|17,7|18,5|  20|21,9|23,4|25,5|27,2|29,3|30,8|31,8|34,7|35,7|
|   Espagne| 9,1| 9,7|10,7|  13|13,8|13,2|14,3|15,3|16,1|16,2|17,4|17,6|17,5|
|   Estonie|  16|  17|18,6|22,9|24,6|25,3|25,5|25,3|26,1|28,2|28,7|29,1|  30|
|  Finlande|30,1|29,6|31,4|31,3|32,4|32,8|34,4|36,7|38,8|39,3|  

### Manipulating rows

In [22]:
import seaborn as sns

pourboir = sns.load_dataset('tips')
pourboir.head()

Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
0,16.99,1.01,Female,No,Sun,Dinner,2
1,10.34,1.66,Male,No,Sun,Dinner,3
2,21.01,3.5,Male,No,Sun,Dinner,3
3,23.68,3.31,Male,No,Sun,Dinner,2
4,24.59,3.61,Female,No,Sun,Dinner,4


In [23]:
df = spark.createDataFrame(pourboir)
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [24]:
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: long (nullable = true)



In [25]:
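# withColumnRenamed is a silent no-op when the source column does not exist:
# df has no "temps" column (it is called "time"), so "time" keeps its name below.
df_2 = (
    df.withColumnRenamed("total_bill", "total")
    .withColumnRenamed("tip", "pourboir")
    .withColumnRenamed("sex", "xexe")
    .withColumnRenamed("smoker", "fumeur")
    .withColumnRenamed("day", "jour")
    .withColumnRenamed("temps", "repas")
    .withColumnRenamed("size", "nombre")
)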

df_2.show()

+-----+--------+------+------+----+------+------+
|total|pourboir|  xexe|fumeur|jour|  time|nombre|
+-----+--------+------+------+----+------+------+
|16.99|    1.01|Female|    No| Sun|Dinner|     2|
|10.34|    1.66|  Male|    No| Sun|Dinner|     3|
|21.01|     3.5|  Male|    No| Sun|Dinner|     3|
|23.68|    3.31|  Male|    No| Sun|Dinner|     2|
|24.59|    3.61|Female|    No| Sun|Dinner|     4|
|25.29|    4.71|  Male|    No| Sun|Dinner|     4|
| 8.77|     2.0|  Male|    No| Sun|Dinner|     2|
|26.88|    3.12|  Male|    No| Sun|Dinner|     4|
|15.04|    1.96|  Male|    No| Sun|Dinner|     2|
|14.78|    3.23|  Male|    No| Sun|Dinner|     2|
|10.27|    1.71|  Male|    No| Sun|Dinner|     2|
|35.26|     5.0|Female|    No| Sun|Dinner|     4|
|15.42|    1.57|  Male|    No| Sun|Dinner|     2|
|18.43|     3.0|  Male|    No| Sun|Dinner|     4|
|14.83|    3.02|Female|    No| Sun|Dinner|     2|
|21.58|    3.92|  Male|    No| Sun|Dinner|     2|
|10.33|    1.67|Female|    No| Sun|Dinner|     3|


In [26]:
df_2.count()

244

In [27]:
df_2.filter(df_2["pourboir"] > 2).filter(df_2["nombre"] == 2).show()

+-----+--------+------+------+----+------+------+
|total|pourboir|  xexe|fumeur|jour|  time|nombre|
+-----+--------+------+------+----+------+------+
|23.68|    3.31|  Male|    No| Sun|Dinner|     2|
|14.78|    3.23|  Male|    No| Sun|Dinner|     2|
|14.83|    3.02|Female|    No| Sun|Dinner|     2|
|21.58|    3.92|  Male|    No| Sun|Dinner|     2|
|17.92|    4.08|  Male|    No| Sat|Dinner|     2|
|20.29|    2.75|Female|    No| Sat|Dinner|     2|
|15.77|    2.23|Female|    No| Sat|Dinner|     2|
|19.82|    3.18|  Male|    No| Sat|Dinner|     2|
| 21.7|     4.3|  Male|    No| Sat|Dinner|     2|
|19.65|     3.0|Female|    No| Sat|Dinner|     2|
|15.06|     3.0|Female|    No| Sat|Dinner|     2|
|17.78|    3.27|  Male|    No| Sat|Dinner|     2|
|17.46|    2.54|  Male|    No| Sun|Dinner|     2|
|13.94|    3.06|  Male|    No| Sun|Dinner|     2|
|18.29|     3.0|  Male|    No| Sun|Dinner|     2|
|22.23|     5.0|  Male|    No| Sun|Dinner|     2|
|18.04|     3.0|  Male|    No| Sun|Dinner|     2|


In [28]:
df_2.filter(df_2["pourboir"] > 2).filter(df_2["nombre"] == 2).filter(df_2["fumeur"] == 'Yes').count()

41
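
Chaining several `filter` calls keeps only the rows that satisfy every condition, which amounts to a logical AND. The plain-Python sketch below illustrates this on a few rows copied from the `df_2.show()` output; the list `rows` and the intermediate variables are illustrative only.

```python
# (total, pourboir, nombre) for a few rows copied from the df_2.show() output
rows = [
    (16.99, 1.01, 2),
    (10.34, 1.66, 3),
    (21.01, 3.50, 3),
    (23.68, 3.31, 2),
    (24.59, 3.61, 4),
    (8.77, 2.00, 2),
    (14.78, 3.23, 2),
]

# Two successive filters, as in df_2.filter(...).filter(...)
step_1 = [r for r in rows if r[1] > 2]
chained = [r for r in step_1 if r[2] == 2]

# One combined condition
combined = [r for r in rows if r[1] > 2 and r[2] == 2]

print(chained == combined)  # True
print(chained)  # [(23.68, 3.31, 2), (14.78, 3.23, 2)]
```

In PySpark the combined form is written with `&` and parentheses around each condition, for example `df_2.filter((df_2["pourboir"] > 2) & (df_2["nombre"] == 2))`.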

In [29]:
df_2.filter(df_2["pourboir"] > 2).filter(df_2["nombre"] == 2).filter(df_2["fumeur"] == 'Yes').orderBy("pourboir", ascending=False).show()

+-----+--------+------+------+----+------+------+
|total|pourboir|  xexe|fumeur|jour|  time|nombre|
+-----+--------+------+------+----+------+------+
|23.33|    5.65|  Male|   Yes| Sun|Dinner|     2|
| 7.25|    5.15|  Male|   Yes| Sun|Dinner|     2|
|32.68|     5.0|  Male|   Yes|Thur| Lunch|     2|
|25.28|     5.0|Female|   Yes| Sat|Dinner|     2|
|16.32|     4.3|Female|   Yes| Fri|Dinner|     2|
|25.21|    4.29|  Male|   Yes| Sat|Dinner|     2|
|19.81|    4.19|Female|   Yes|Thur| Lunch|     2|
|20.49|    4.06|  Male|   Yes| Sat|Dinner|     2|
|14.31|     4.0|Female|   Yes| Sat|Dinner|     2|
|16.82|     4.0|  Male|   Yes| Sun|Dinner|     2|
|16.58|     4.0|  Male|   Yes|Thur| Lunch|     2|
|27.28|     4.0|  Male|   Yes| Fri|Dinner|     2|
|  9.6|     4.0|Female|   Yes| Sun|Dinner|     2|
|34.63|    3.55|  Male|   Yes| Sun|Dinner|     2|
|22.42|    3.48|Female|   Yes| Sat|Dinner|     2|
|13.42|    3.48|Female|   Yes| Fri| Lunch|     2|
|20.29|    3.21|  Male|   Yes| Sat|Dinner|     2|


In [30]:
df_2.groupBy("jour").count().orderBy("count", ascending=False).show()

+----+-----+
|jour|count|
+----+-----+
| Sat|   87|
| Sun|   76|
|Thur|   62|
| Fri|   19|
+----+-----+



# Some functions

### Built-in functions

In [31]:
print(dir(pyspark.sql.functions))



In [32]:
from pyspark.sql import functions

In [33]:
from pyspark.sql.types import *
schema = StructType().add("id_utilisateur","string").add("pays","string").add("profession", "string").add("genre",'string').add("age", "integer")
df=spark.createDataFrame([("A123",'Maroc',"Médecin","Femme", 33), ("B234",'Canada',"Entrepreneur","Femme",35), ("C345",'France',"Professeur", "Homme",40)],schema=schema)
df.show()

+--------------+------+------------+-----+---+
|id_utilisateur|  pays|  profession|genre|age|
+--------------+------+------------+-----+---+
|          A123| Maroc|     Médecin|Femme| 33|
|          B234|Canada|Entrepreneur|Femme| 35|
|          C345|France|  Professeur|Homme| 40|
+--------------+------+------------+-----+---+



In [34]:
from pyspark.sql.functions import lower, upper, col, substring
df.select(lower(col('pays'))).show()

+-----------+
|lower(pays)|
+-----------+
|      maroc|
|     canada|
|     france|
+-----------+



In [35]:
df.select(upper(col('pays'))).show(5)

+-----------+
|upper(pays)|
+-----------+
|      MAROC|
|     CANADA|
|     FRANCE|
+-----------+



In [36]:
df.select(lower(col('pays')),upper(col('pays')),substring(col('pays'),1,3)).show(5)

+-----------+-----------+---------------------+
|lower(pays)|upper(pays)|substring(pays, 1, 3)|
+-----------+-----------+---------------------+
|      maroc|      MAROC|                  Mar|
|     canada|     CANADA|                  Can|
|     france|     FRANCE|                  Fra|
+-----------+-----------+---------------------+



In [37]:
from pyspark.sql.functions import avg
df.select(avg(col('Age'))).show()

+--------+
|avg(Age)|
+--------+
|    36.0|
+--------+



In [38]:
from pyspark.sql.functions import soundex
df.select(soundex(df.pays).alias("soundex")).show()

+-------+
|soundex|
+-------+
|   M620|
|   C530|
|   F652|
+-------+



### Date functions

In [39]:
df1 = spark.createDataFrame((("2021-01-23",1),("2021-06-24",2),("2021-09-20",3)),("date","nombre_de_mois"))
df1.show()

+----------+--------------+
|      date|nombre_de_mois|
+----------+--------------+
|2021-01-23|             1|
|2021-06-24|             2|
|2021-09-20|             3|
+----------+--------------+



In [40]:
import pyspark.sql.functions as F
# df1.first()[1] is the first row's nombre_de_mois (1): the same shift is applied to every row
df1.withColumn("nouvelle_date",F.add_months(col("date"), df1.first()[1])).show()

+----------+--------------+-------------+
|      date|nombre_de_mois|nouvelle_date|
+----------+--------------+-------------+
|2021-01-23|             1|   2021-02-23|
|2021-06-24|             2|   2021-07-24|
|2021-09-20|             3|   2021-10-20|
+----------+--------------+-------------+
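
For comparison, the plain-Python sketch below shifts each date by its own `nombre_de_mois`, which the cell above does not do because it passes a single value to `add_months`. The helper `add_months` defined here is a hypothetical stand-in written for this example, not Spark's function; it clamps the day to the end of the target month.

```python
import calendar
from datetime import date


def add_months(start, months):
    """Return `start` shifted by `months` months, clamping the day if needed."""
    month_index = start.year * 12 + (start.month - 1) + months
    year, month = divmod(month_index, 12)
    month += 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(start.day, last_day))


rows = [("2021-01-23", 1), ("2021-06-24", 2), ("2021-09-20", 3)]
shifted = [add_months(date.fromisoformat(d), n).isoformat() for d, n in rows]
print(shifted)  # ['2021-02-23', '2021-08-24', '2021-12-20']
```

In Spark, a per-row shift could probably be expressed as a SQL expression such as `F.expr("add_months(date, nombre_de_mois)")`; this is an untested suggestion, not something the notebook runs.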



In [41]:
df2 = spark.createDataFrame([("2021_01_11",)],["date"])
df2.show()

+----------+
|      date|
+----------+
|2021_01_11|
+----------+



In [42]:
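# date is still a string in a non-standard format (2021_01_11), so it cannot be read as a date: the result is null
df2.select(F.date_format('date', 'MM-dd-yyyy').alias('Format date')).show()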

+-----------+
|Format date|
+-----------+
|       null|
+-----------+



In [43]:
df2.printSchema()

root
 |-- date: string (nullable = true)



In [44]:
from pyspark.sql.functions import to_date
 
df3 = df2.withColumn('date',to_date(df2.date, 'yyyy_MM_dd'))
df3.printSchema()
df3.select("date").dtypes

root
 |-- date: date (nullable = true)



[('date', 'date')]

In [45]:
df3.show()

+----------+
|      date|
+----------+
|2021-01-11|
+----------+



In [46]:
df3.select(F.date_format('date', 'MM-dd-yyyy').alias('Format date')).show()

+-----------+
|Format date|
+-----------+
| 01-11-2021|
+-----------+
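
The fix above follows a two-step pattern: parse the string with the pattern it is actually written in, then format the resulting date. As a sketch of the same pattern outside Spark, here it is with Python's standard `datetime` module, whose pattern letters differ from Spark's.

```python
from datetime import datetime

raw = "2021_01_11"

# Step 1: parse the string using the pattern it is actually written in
parsed = datetime.strptime(raw, "%Y_%m_%d").date()

# Step 2: render the date in the desired output pattern (month-day-year)
formatted = parsed.strftime("%m-%d-%Y")

print(parsed)     # 2021-01-11
print(formatted)  # 01-11-2021
```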



### UDF

In [47]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType().add("id_utilisateur","string").add("pays","string").add("profession", "string").add("genre",'string').add("age", "integer")
df=spark.createDataFrame([("A123",'Maroc',"Médecin","Femme", 33),("B234",'Canada',"Entrepreneur","Femme",35),("C345",'France',"Professeur", "Homme",40)],schema=schema)

In [48]:
df.show()

+--------------+------+------------+-----+---+
|id_utilisateur|  pays|  profession|genre|age|
+--------------+------+------------+-----+---+
|          A123| Maroc|     Médecin|Femme| 33|
|          B234|Canada|Entrepreneur|Femme| 35|
|          C345|France|  Professeur|Homme| 40|
+--------------+------+------------+-----+---+



In [49]:
def cat_age(age):
  if age < 40:
    return "Catégorie 1"
  else:
    return "Catégorie 2"

In [50]:
from pyspark.sql.functions import udf
age_udf = udf(cat_age, StringType())

In [51]:
df=df.withColumn('cat_age',age_udf(df['age']))
df.show()

+--------------+------+------------+-----+---+-----------+
|id_utilisateur|  pays|  profession|genre|age|    cat_age|
+--------------+------+------------+-----+---+-----------+
|          A123| Maroc|     Médecin|Femme| 33|Catégorie 1|
|          B234|Canada|Entrepreneur|Femme| 35|Catégorie 1|
|          C345|France|  Professeur|Homme| 40|Catégorie 2|
+--------------+------+------------+-----+---+-----------+



In [52]:
df.groupby("cat_age").count().show()

+-----------+-----+
|    cat_age|count|
+-----------+-----+
|Catégorie 1|    2|
|Catégorie 2|    1|
+-----------+-----+
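
A UDF runs ordinary Python on each value, so the wrapped function can be checked on its own before it is registered. One case worth testing is a missing age: Spark passes nulls to a Python UDF as `None`, and `None < 40` raises a `TypeError` in Python 3. The variant below, `cat_age_safe`, is a hypothetical null-tolerant version written for illustration; it is not in the notebook.

```python
def cat_age_safe(age):
    """Same rule as cat_age, but a missing age yields None instead of an error."""
    if age is None:
        return None
    return "Catégorie 1" if age < 40 else "Catégorie 2"


# Ages from the example DataFrame, plus a missing value
for age in [33, 35, 40, None]:
    print(age, "->", cat_age_safe(age))
```

It can be wrapped in the same way as the original function: `udf(cat_age_safe, StringType())`.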



### Joins

In [53]:
a = [('chat',1),('chien',2),('singe',3),('lion',4)]
df_a = spark.createDataFrame(a,['animal','id'])

b = [('tigre',1),('chat',2),('singe',3),('girafe',4)]
df_b = spark.createDataFrame(b,['animal','id'])
 
df_a.show()
print('-'*25)
df_b.show()

+------+---+
|animal| id|
+------+---+
|  chat|  1|
| chien|  2|
| singe|  3|
|  lion|  4|
+------+---+

-------------------------
+------+---+
|animal| id|
+------+---+
| tigre|  1|
|  chat|  2|
| singe|  3|
|girafe|  4|
+------+---+



In [54]:
j_interne = df_a.join(df_b, df_a.animal == df_b.animal)
j_interne.show()

+------+---+------+---+
|animal| id|animal| id|
+------+---+------+---+
|  chat|  1|  chat|  2|
| singe|  3| singe|  3|
+------+---+------+---+



In [55]:
j_gauche = df_a.join(df_b, df_a.animal == df_b.animal,how='left')
j_gauche.show()

+------+---+------+----+
|animal| id|animal|  id|
+------+---+------+----+
| chien|  2|  null|null|
|  lion|  4|  null|null|
|  chat|  1|  chat|   2|
| singe|  3| singe|   3|
+------+---+------+----+



In [56]:
j_droite = df_a.join(df_b, df_a.animal == df_b.animal,how='right')
j_droite.show()

+------+----+------+---+
|animal|  id|animal| id|
+------+----+------+---+
|  chat|   1|  chat|  2|
|  null|null| tigre|  1|
| singe|   3| singe|  3|
|  null|null|girafe|  4|
+------+----+------+---+
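
To make the three results above easier to compare, here is a plain-Python sketch of the same matching logic on the `animal` key. It only mimics what the joins return for these two small lists (row order aside); the function `join_on_animal` is hypothetical and is not how Spark executes a join.

```python
a = [("chat", 1), ("chien", 2), ("singe", 3), ("lion", 4)]
b = [("tigre", 1), ("chat", 2), ("singe", 3), ("girafe", 4)]


def join_on_animal(left, right, how="inner"):
    """Nested-loop join on the first tuple field; None stands for Spark's null."""
    result = []
    matched_right = set()
    for l_row in left:
        matches = [r_row for r_row in right if r_row[0] == l_row[0]]
        for r_row in matches:
            result.append((l_row, r_row))
            matched_right.add(r_row)
        if not matches and how == "left":
            result.append((l_row, None))
    if how == "right":
        result += [(None, r_row) for r_row in right if r_row not in matched_right]
    return result


print(join_on_animal(a, b))           # inner: chat and singe only
print(join_on_animal(a, b, "left"))   # adds chien and lion with no match
print(join_on_animal(a, b, "right"))  # adds tigre and girafe with no match
```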



# RDD

RDDs (Resilient Distributed Datasets) are immutable, partitioned collections of records:

-   Java, Scala, or Python objects
-   Can be processed in parallel

<hr>

**Advantages**:  
-  Reuse data efficiently
-  Run MapReduce-style operations quickly
-  Control how the data is partitioned
-  Use a wide range of operations

**Drawbacks**: 
-  No enforced schema
-  Lower performance than DataFrames and Datasets

<hr>

**When to use RDDs?**
-  Low-level transformations and actions
-  Unstructured data
-  Functional-style programming

In [57]:
from pyspark import SparkContext
import numpy as np

In [58]:
from google.colab import files
files.upload()

Saving Server Mongo Atlas.txt to Server Mongo Atlas.txt


{'Server Mongo Atlas.txt': b'mongodb+srv://admin:root@cluster0.zblzd.mongodb.net/?retryWrites=true&w=majority\n\nmongoimport --host cluster0-shard-00-01.zblzd.mongodb.net:27017 --db terroristAttack_db --collection terroristAttack --type csv --file terroristAttack_db.csv --authenticationDatabase admin --ssl --username admin --password root\nmongoimport --host=cluster0-shard-00-01.zblzd.mongodb.net:27017 --db terrorismAttack_db --collection terrorismAttack --type csv --file terrorismAttack_db.csv --headerline --authenticationDatabase admin --ssl --username admin --password root\n\nmongoimport --host=cluster0-shard-00-01.zblzd.mongodb.net:27017 --db TerrorismAttack_db --collection TerrorismAttack --type csv --file terrorismAttack_db.csv --headerline --authenticationDatabase admin --ssl --username admin --password root \n\nmongoimport --host terrorismcluster-shard-00-02.miafv.mongodb.net:27017 --db terrorismAttack_db --collection terrorismAttack --type csv --file terrorismAttack_db.csv --h

In [59]:
rdd = sc.textFile("Server Mongo Atlas.txt")
type(rdd)

pyspark.rdd.RDD

In [60]:
rdd.collect()

['mongodb+srv://admin:root@cluster0.zblzd.mongodb.net/?retryWrites=true&w=majority',
 '',
 'mongoimport --host cluster0-shard-00-01.zblzd.mongodb.net:27017 --db terroristAttack_db --collection terroristAttack --type csv --file terroristAttack_db.csv --authenticationDatabase admin --ssl --username admin --password root',
 'mongoimport --host=cluster0-shard-00-01.zblzd.mongodb.net:27017 --db terrorismAttack_db --collection terrorismAttack --type csv --file terrorismAttack_db.csv --headerline --authenticationDatabase admin --ssl --username admin --password root',
 '',
 'mongoimport --host=cluster0-shard-00-01.zblzd.mongodb.net:27017 --db TerrorismAttack_db --collection TerrorismAttack --type csv --file terrorismAttack_db.csv --headerline --authenticationDatabase admin --ssl --username admin --password root ',
 '',
 'mongoimport --host terrorismcluster-shard-00-02.miafv.mongodb.net:27017 --db terrorismAttack_db --collection terrorismAttack --type csv --file terrorismAttack_db.csv --headerl

In [61]:
liste = np.random.randint(0,100,10)
liste

array([66,  0, 62, 36, 19, 98, 42, 87, 91, 13])

In [62]:
rdd_2 = sc.parallelize(liste)

In [63]:
rdd_2.collect()

[66, 0, 62, 36, 19, 98, 42, 87, 91, 13]

In [64]:
rdd_2.glom().collect()

[[66, 0, 62, 36, 19], [98, 42, 87, 91, 13]]

In [65]:
rdd_2.count()

10

In [66]:
rdd_2.countByValue()

defaultdict(int,
            {66: 1,
             0: 1,
             62: 1,
             36: 1,
             19: 1,
             98: 1,
             42: 1,
             87: 1,
             91: 1,
             13: 1})

In [67]:
rdd_2.first()

66

In [68]:
rdd_2.reduce(lambda x, y : x if x > y else y)

98
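
`reduce` combines the values inside each partition and then combines the partial results, which is why the function passed to it should be associative and commutative. Below is a plain-Python sketch using the two partitions shown by `glom()` above; `functools.reduce` stands in for Spark here.

```python
from functools import reduce

# The two partitions printed by rdd_2.glom().collect()
partitions = [[66, 0, 62, 36, 19], [98, 42, 87, 91, 13]]


def larger(x, y):
    """Same function as the lambda passed to rdd_2.reduce."""
    return x if x > y else y


# Reduce each partition on its own, then reduce the partial results
partials = [reduce(larger, part) for part in partitions]
total = reduce(larger, partials)

print(partials)  # [66, 98]
print(total)     # 98
```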

### RDD transformations

In [69]:
rdd = sc.parallelize(range(1, 10))

In [70]:
rdd_1 = rdd.map(lambda x: (x, 'a'*x))
rdd_1.collect()

[(1, 'a'),
 (2, 'aa'),
 (3, 'aaa'),
 (4, 'aaaa'),
 (5, 'aaaaa'),
 (6, 'aaaaaa'),
 (7, 'aaaaaaa'),
 (8, 'aaaaaaaa'),
 (9, 'aaaaaaaaa')]

In [71]:
rdd_2 = rdd.filter(lambda x: x % 2 == 0)
rdd_2.collect()

[2, 4, 6, 8]

In [72]:
rdd_3 = rdd_2.flatMap(lambda x: [(x, 2*x), (-x, 3*x)])
rdd_3.collect()               

[(2, 4), (-2, 6), (4, 8), (-4, 12), (6, 12), (-6, 18), (8, 16), (-8, 24)]
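
The difference between `map` and `flatMap` is that `flatMap` flattens the lists returned by the function into a single collection. Here is a plain-Python sketch of the cell above, with `itertools.chain` standing in for the flattening step.

```python
from itertools import chain

evens = [2, 4, 6, 8]  # contents of rdd_2 after the filter


def expand(x):
    """Same function as the lambda passed to flatMap."""
    return [(x, 2 * x), (-x, 3 * x)]


# map: one list per input element
mapped = [expand(x) for x in evens]

# flatMap: the per-element lists are flattened into one list
flat_mapped = list(chain.from_iterable(expand(x) for x in evens))

print(mapped[0])    # [(2, 4), (-2, 6)]
print(flat_mapped)  # [(2, 4), (-2, 6), (4, 8), (-4, 12), (6, 12), (-6, 18), (8, 16), (-8, 24)]
```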

In [73]:
rdd_4 = sc.parallelize([("a", 1), ("b", 2)])
rdd_5 = sc.parallelize([("b", 2), ("b", 4)])
sorted(rdd_4.join(rdd_5).collect())

[('b', (2, 2)), ('b', (2, 4))]

In [74]:
rdd_6= rdd_4.union(rdd_5)
rdd_6.collect()

[('a', 1), ('b', 2), ('b', 2), ('b', 4)]

In [75]:
rdd_6.distinct().collect()

[('a', 1), ('b', 2), ('b', 4)]

# MLlib

### Encoding with PySpark

In [76]:
import pandas as pd

df = pd.DataFrame({'Salarié': ['Eric', 'Elise', 'Hapsa', 'Xin'], 'Genre': ['Homme', 'Femme', 'Femme', 'Homme'], 'groupe': ['Technicien',  'Ingénieur','PhD', 
     'Technicien'],'date_embauche': [2016, 1988, 2012, 2001], 'salaire': [30000, 100000, 60000, 60000]})
df

Unnamed: 0,Salarié,Genre,groupe,date_embauche,salaire
0,Eric,Homme,Technicien,2016,30000
1,Elise,Femme,Ingénieur,1988,100000
2,Hapsa,Femme,PhD,2012,60000
3,Xin,Homme,Technicien,2001,60000


In [77]:
df1 = spark.createDataFrame(df)
df1.show()

+-------+-----+----------+-------------+-------+
|Salarié|Genre|    groupe|date_embauche|salaire|
+-------+-----+----------+-------------+-------+
|   Eric|Homme|Technicien|         2016|  30000|
|  Elise|Femme| Ingénieur|         1988| 100000|
|  Hapsa|Femme|       PhD|         2012|  60000|
|    Xin|Homme|Technicien|         2001|  60000|
+-------+-----+----------+-------------+-------+



In [78]:
# Numeric encoding: map each Genre label to a numeric index
from pyspark.ml.feature import StringIndexer
Genre_index = StringIndexer(inputCol="Genre", outputCol="Genre_num").fit(df1)
df1 = Genre_index.transform(df1)
df1.show()

+-------+-----+----------+-------------+-------+---------+
|Salarié|Genre|    groupe|date_embauche|salaire|Genre_num|
+-------+-----+----------+-------------+-------+---------+
|   Eric|Homme|Technicien|         2016|  30000|      1.0|
|  Elise|Femme| Ingénieur|         1988| 100000|      0.0|
|  Hapsa|Femme|       PhD|         2012|  60000|      0.0|
|    Xin|Homme|Technicien|         2001|  60000|      1.0|
+-------+-----+----------+-------------+-------+---------+



In [79]:
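# One-hot encoding: index the groupe labels, then encode the indices as sparse vectors
from pyspark.ml.feature import StringIndexer
groupe_index = StringIndexer(inputCol="groupe", outputCol="groupe_num").fit(df1)
df1 = groupe_index.transform(df1)

# In Spark 2.3 (the version installed above) OneHotEncoder is a plain transformer;
# from Spark 3.0 it is an estimator and must be fitted first: .fit(df1).transform(df1)
from pyspark.ml.feature import OneHotEncoder
groupe_encod = OneHotEncoder(inputCol="groupe_num", outputCol="groupe_vecteur")
df1 = groupe_encod.transform(df1)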
df1.show()

+-------+-----+----------+-------------+-------+---------+----------+--------------+
|Salarié|Genre|    groupe|date_embauche|salaire|Genre_num|groupe_num|groupe_vecteur|
+-------+-----+----------+-------------+-------+---------+----------+--------------+
|   Eric|Homme|Technicien|         2016|  30000|      1.0|       0.0| (2,[0],[1.0])|
|  Elise|Femme| Ingénieur|         1988| 100000|      0.0|       2.0|     (2,[],[])|
|  Hapsa|Femme|       PhD|         2012|  60000|      0.0|       1.0| (2,[1],[1.0])|
|    Xin|Homme|Technicien|         2001|  60000|      1.0|       0.0| (2,[0],[1.0])|
+-------+-----+----------+-------------+-------+---------+----------+--------------+
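
The `groupe_vecteur` column is printed in sparse form `(size, [indices], [values])`. By default the encoder drops the last category, so three categories give vectors of size 2 and the category with the highest index (here `Ingénieur`, index 2.0) becomes the all-zero vector `(2,[],[])`. The plain-Python sketch below expands the indices from the table into dense vectors; the helper `one_hot_drop_last` is hypothetical and only mirrors that convention.

```python
def one_hot_drop_last(index, n_categories):
    """Dense one-hot vector of length n_categories - 1; the last index maps to all zeros."""
    vector = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vector[index] = 1.0
    return vector


# groupe_num values taken from the table above
groupe_num = {"Technicien": 0, "PhD": 1, "Ingénieur": 2}

for label, index in groupe_num.items():
    print(label, one_hot_drop_last(index, n_categories=3))
```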



### Scaling

In [80]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [81]:
import pandas as pd
d = {'x': [7, 17, 27, 37, 47, 57], 'y': [7, 18, 16, 24, 22, 30], 'z': [17, 12, 19, 24, 22, 45]}
df = pd.DataFrame(data=d)
df1=spark.createDataFrame(df)
df1.show()

+---+---+---+
|  x|  y|  z|
+---+---+---+
|  7|  7| 17|
| 17| 18| 12|
| 27| 16| 19|
| 37| 24| 24|
| 47| 22| 22|
| 57| 30| 45|
+---+---+---+



In [82]:
from pyspark.ml.feature import VectorAssembler
assembleur = VectorAssembler(inputCols=df1.columns, outputCol="caractéristiques")
df_nouv=assembleur.transform(df1)
df_nouv.show()

+---+---+---+----------------+
|  x|  y|  z|caractéristiques|
+---+---+---+----------------+
|  7|  7| 17|  [7.0,7.0,17.0]|
| 17| 18| 12|[17.0,18.0,12.0]|
| 27| 16| 19|[27.0,16.0,19.0]|
| 37| 24| 24|[37.0,24.0,24.0]|
| 47| 22| 22|[47.0,22.0,22.0]|
| 57| 30| 45|[57.0,30.0,45.0]|
+---+---+---+----------------+



In [83]:
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="caractéristiques", outputCol="mise_à_l_échelle")

scaler_model = scaler.fit(df_nouv)
scaled_data = scaler_model.transform(df_nouv)
scaled_data.select("caractéristiques",'mise_à_l_échelle').show(truncate=False)

+----------------+--------------------------------------------+
|caractéristiques|mise_à_l_échelle                            |
+----------------+--------------------------------------------+
|[7.0,7.0,17.0]  |[0.0,0.0,0.15151515151515152]               |
|[17.0,18.0,12.0]|[0.2,0.4782608695652174,0.0]                |
|[27.0,16.0,19.0]|[0.4,0.391304347826087,0.21212121212121213] |
|[37.0,24.0,24.0]|[0.6,0.7391304347826086,0.36363636363636365]|
|[47.0,22.0,22.0]|[0.8,0.6521739130434783,0.30303030303030304]|
|[57.0,30.0,45.0]|[1.0,1.0,1.0]                               |
+----------------+--------------------------------------------+
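
With its default range of [0, 1], `MinMaxScaler` rescales each vector component independently using `(v - min) / (max - min)`, where `min` and `max` are taken over that component's column. The sketch below recomputes the `z` column by hand to show where a value such as `0.1515...` comes from. The function name `min_max_scale` is an assumption made for illustration; it is not part of the PySpark API.

```python
def min_max_scale(column, new_min=0.0, new_max=1.0):
    """Rescale a list of numbers linearly to the range [new_min, new_max]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # Constant column: the Spark documentation states the result is
        # 0.5 * (max + min) of the target range in this case.
        return [0.5 * (new_min + new_max)] * len(column)
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in column]


z = [17, 12, 19, 24, 22, 45]
scaled = min_max_scale(z)
for original, value in zip(z, scaled):
    print(original, value)
```

For `z`, the minimum is 12 and the maximum is 45, so 17 maps to (17 - 12) / 33, the third component of the first row in the table above.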



### Pearson and Spearman Correlation

In [84]:
import pandas as pd
d = {'x': [7, 17, 27, 37, 47, 57], 'y': [7, 18, 16, 24, 22, 30]}
df = pd.DataFrame(data=d)
df

    x   y
0   7   7
1  17  18
2  27  16
3  37  24
4  47  22
5  57  30


In [85]:
df1 = spark.createDataFrame(df)
df1.show()

+---+---+
|  x|  y|
+---+---+
|  7|  7|
| 17| 18|
| 27| 16|
| 37| 24|
| 47| 22|
| 57| 30|
+---+---+



In [86]:
from pyspark.ml.feature import VectorAssembler
assembleur = VectorAssembler(inputCols=df1.columns, outputCol="caractéristiques")
df_nouv = assembleur.transform(df1)
df_nouv.show()

+---+---+----------------+
|  x|  y|caractéristiques|
+---+---+----------------+
|  7|  7|       [7.0,7.0]|
| 17| 18|     [17.0,18.0]|
| 27| 16|     [27.0,16.0]|
| 37| 24|     [37.0,24.0]|
| 47| 22|     [47.0,22.0]|
| 57| 30|     [57.0,30.0]|
+---+---+----------------+



In [87]:
from pyspark.ml.stat import Correlation
# Pearson is the default method of Correlation.corr
pearson_corr = Correlation.corr(df_nouv, 'caractéristiques')
pearson_corr.show(2, False)

+---------------------------------------------------------------------------------+
|pearson(caractéristiques)                                                        |
+---------------------------------------------------------------------------------+
|1.0                 0.9201575383978458  
0.9201575383978458  1.0                 |
+---------------------------------------------------------------------------------+
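
The off-diagonal entry of this matrix is the Pearson coefficient between `x` and `y`: the sum of the products of the deviations from the mean, divided by the product of the square roots of the summed squared deviations. As a cross-check, the plain-Python sketch below recomputes it from the six data points. The helper `pearson` is a hypothetical name used only for this illustration.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(xs, ys))
    spread_x = math.sqrt(sum((a - mean_x) ** 2 for a in xs))
    spread_y = math.sqrt(sum((b - mean_y) ** 2 for b in ys))
    return cov / (spread_x * spread_y)


x = [7, 17, 27, 37, 47, 57]
y = [7, 18, 16, 24, 22, 30]
r = pearson(x, y)
print(round(r, 6))  # 0.920158
```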



In [88]:
spearman_corr=Correlation.corr(df_nouv,'caractéristiques',"spearman")
spearman_corr.show(2,False)

+---------------------------------------------------------------------------------+
|spearman(caractéristiques)                                                       |
+---------------------------------------------------------------------------------+
|1.0                 0.8857142857142843  
0.8857142857142843  1.0                 |
+---------------------------------------------------------------------------------+
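
Spearman's coefficient is the Pearson coefficient computed on the ranks of the values instead of the values themselves, so it measures monotonic rather than strictly linear association. The sketch below applies that definition to the same data. The helpers `ranks` and `pearson` are hypothetical names, and `ranks` assumes there are no tied values (true for this data set); tied values would need averaged ranks.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(xs, ys))
    spread_x = math.sqrt(sum((a - mean_x) ** 2 for a in xs))
    spread_y = math.sqrt(sum((b - mean_y) ** 2 for b in ys))
    return cov / (spread_x * spread_y)


def ranks(values):
    """1-based rank of each value; assumes all values are distinct."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    for rank, index in enumerate(order, start=1):
        result[index] = float(rank)
    return result


x = [7, 17, 27, 37, 47, 57]
y = [7, 18, 16, 24, 22, 30]
rho = pearson(ranks(x), ranks(y))
print(ranks(y))
print(round(rho, 6))
```

Here `y` is ranked 1, 3, 2, 5, 4, 6 while `x` is already sorted, so the two swapped pairs are what pull the coefficient below 1.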



### Linear Regression

In [89]:
import pandas as pd
d = {
    'x': [7, 17, 27, 37, 47, 57, 67, 77, 87, 97, 107, 117, 127, 137, 147],
    'label': [7, 18, 16, 24, 22, 30, 28, 36, 34, 42, 40, 48, 46, 54, 52],
}
df = pd.DataFrame(data=d)
df

      x  label
0     7      7
1    17     18
2    27     16
3    37     24
4    47     22
5    57     30
6    67     28
7    77     36
8    87     34
9    97     42
10  107     40
11  117     48
12  127     46
13  137     54
14  147     52


In [90]:
df1 = spark.createDataFrame(df)
df1.show()

+---+-----+
|  x|label|
+---+-----+
|  7|    7|
| 17|   18|
| 27|   16|
| 37|   24|
| 47|   22|
| 57|   30|
| 67|   28|
| 77|   36|
| 87|   34|
| 97|   42|
|107|   40|
|117|   48|
|127|   46|
|137|   54|
|147|   52|
+---+-----+



In [91]:
df1.columns

['x', 'label']

In [92]:
from pyspark.ml.feature import VectorAssembler

assembleur = VectorAssembler(inputCols=['x'], outputCol='features')
df_nouv = assembleur.transform(df1)
df_nouv.printSchema()

root
 |-- x: long (nullable = true)
 |-- label: long (nullable = true)
 |-- features: vector (nullable = true)



In [93]:
df_nouv.select(['features','label']).show()

+--------+-----+
|features|label|
+--------+-----+
|   [7.0]|    7|
|  [17.0]|   18|
|  [27.0]|   16|
|  [37.0]|   24|
|  [47.0]|   22|
|  [57.0]|   30|
|  [67.0]|   28|
|  [77.0]|   36|
|  [87.0]|   34|
|  [97.0]|   42|
| [107.0]|   40|
| [117.0]|   48|
| [127.0]|   46|
| [137.0]|   54|
| [147.0]|   52|
+--------+-----+



In [100]:
# No seed is passed, so the split (and the sizes below) changes from run to run
train, test = df_nouv.randomSplit([0.75, 0.25])
print(f"Training set size: {train.count()}")
print(f"Test set size: {test.count()}")

Training set size: 12
Test set size: 3
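
`randomSplit` normalizes the weights and assigns each row to a split at random, so the resulting sizes only approximate the requested 75/25 proportions. The plain-Python sketch below illustrates that behavior with one uniform draw per row. It is a simplified model under that assumption, not Spark's actual implementation, and `random_split` is a hypothetical helper name.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign every row to one split at random, in proportion to weights."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds of the normalized weights, e.g. [0.75, 1.0]
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if draw < upper or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(7, 157, 10))  # the 15 x values used above
train_rows, test_rows = random_split(rows, [0.75, 0.25], seed=42)
print(len(train_rows), len(test_rows))
```

Passing a fixed seed makes the split reproducible; PySpark's `randomSplit` also accepts a `seed` argument for the same purpose.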


In [101]:
train.show()

test.show()

+---+-----+--------+
|  x|label|features|
+---+-----+--------+
| 17|   18|  [17.0]|
| 27|   16|  [27.0]|
| 37|   24|  [37.0]|
| 47|   22|  [47.0]|
| 67|   28|  [67.0]|
| 77|   36|  [77.0]|
| 87|   34|  [87.0]|
| 97|   42|  [97.0]|
|107|   40| [107.0]|
|117|   48| [117.0]|
|137|   54| [137.0]|
|147|   52| [147.0]|
+---+-----+--------+

+---+-----+--------+
|  x|label|features|
+---+-----+--------+
|  7|    7|   [7.0]|
| 57|   30|  [57.0]|
|127|   46| [127.0]|
+---+-----+--------+



In [102]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
lr_model = lr.fit(train)

In [103]:
pred_df = lr_model.transform(test)
pred_df.show()

+---+-----+--------+------------------+
|  x|label|features|        prediction|
+---+-----+--------+------------------+
|  7|    7|   [7.0]|12.499999999999966|
| 57|   30|  [57.0]| 27.49999999999999|
|127|   46| [127.0]| 48.50000000000003|
+---+-----+--------+------------------+
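
With its default parameters `LinearRegression` applies no regularization, so for a single feature the fit reduces to ordinary least squares: the slope is the covariance of `x` and `label` divided by the variance of `x`, and the intercept makes the line pass through the point of means. The sketch below refits the line in plain Python on the 12 training rows listed above and scores the 3 test rows with the root mean squared error (RMSE). The helper `fit_line` is a hypothetical name, and the hard-coded rows are copied from this particular random split.

```python
import math


def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope * x."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    slope = (
        sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
        / sum((x - mean_x) ** 2 for x in xs)
    )
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Training and test rows copied from the split shown above
train_x = [17, 27, 37, 47, 67, 77, 87, 97, 107, 117, 137, 147]
train_y = [18, 16, 24, 22, 28, 36, 34, 42, 40, 48, 54, 52]
test_x = [7, 57, 127]
test_y = [7, 30, 46]

slope, intercept = fit_line(train_x, train_y)
predictions = [intercept + slope * x for x in test_x]

# Root mean squared error on the held-out rows
rmse = math.sqrt(
    sum((p - y) ** 2 for p, y in zip(predictions, test_y)) / len(test_y)
)

print(round(slope, 4), round(intercept, 4))
print([round(p, 4) for p in predictions])
print(round(rmse, 4))
```

The fitted line has a slope of about 0.3 and an intercept of about 10.4, which gives the same predictions as the `prediction` column above up to floating-point error.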



### ML with load_diabetes

In [148]:
from sklearn.datasets import load_diabetes

In [149]:
# Load the feature matrix and the target in a single call
data, target = load_diabetes(return_X_y=True)

In [150]:
data = pd.DataFrame(data)
target = pd.DataFrame(data=target, columns=["label"])

df = pd.concat([data, target], axis=1)
df.head()

          0         1         2         3         4         5         6         7         8         9  label
0  0.038076  0.050680  0.061696  0.021872 -0.044223 -0.034821 -0.043401 -0.002592  0.019908 -0.017646  151.0
1 -0.001882 -0.044642 -0.051474 -0.026328 -0.008449 -0.019163  0.074412 -0.039493 -0.068330 -0.092204   75.0
2  0.085299  0.050680  0.044451 -0.005671 -0.045599 -0.034194 -0.032356 -0.002592  0.002864 -0.025930  141.0
3 -0.089063 -0.044642 -0.011595 -0.036656  0.012191  0.024991 -0.036038  0.034309  0.022692 -0.009362  206.0
4  0.005383 -0.044642 -0.036385  0.021872  0.003935  0.015596  0.008142 -0.002592 -0.031991 -0.046641  135.0


In [151]:
df1 = spark.createDataFrame(df)
df1.show()

+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-----+
|                   0|                 1|                   2|                   3|                   4|                   5|                  6|                   7|                  8|                   9|label|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-----+
|  0.0380759064334241|0.0506801187398187|  0.0616962065186885|  0.0218723549949558| -0.0442234984244464| -0.0348207628376986|-0.0434008456520269|-0.00259226199818282| 0.0199084208763183| -0.0176461251598052|151.0|
|-0.00188201652779104|-0.044641636506989| -0.0514740612388061| -0.0263278347173518|-0.00844872411121698|  -0.019163339748222| 0.0744115640787594

In [152]:
# Note: column '0' is not listed in inputCols, so only 9 of the 10 features are used
assembleur = VectorAssembler(inputCols=['1', '2', '3', '4', '5', '6', '7', '8', '9'], outputCol='features')
df_nouv = assembleur.transform(df1)
df_nouv.printSchema()

root
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [153]:
df_nouv.show()

+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-----+--------------------+
|                   0|                 1|                   2|                   3|                   4|                   5|                  6|                   7|                  8|                   9|label|            features|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-----+--------------------+
|  0.0380759064334241|0.0506801187398187|  0.0616962065186885|  0.0218723549949558| -0.0442234984244464| -0.0348207628376986|-0.0434008456520269|-0.00259226199818282| 0.0199084208763183| -0.0176461251598052|151.0|[0.05068011873981...|
|-0.00188201652779104|-0.044641636506989| -0.051474061238806

In [154]:
# No seed is passed, so the split (and the sizes below) changes from run to run
train, test = df_nouv.randomSplit([0.75, 0.25])
print(f"Training set size: {train.count()}")
print(f"Test set size: {test.count()}")

Training set size: 341
Test set size: 101


In [155]:
train.show()

test.show()

+-------------------+------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-----+--------------------+
|                  0|                 1|                   2|                   3|                   4|                  5|                  6|                   7|                  8|                   9|label|            features|
+-------------------+------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-----+--------------------+
| -0.103593093156339|-0.044641636506989| -0.0374625042783544| -0.0263278347173518| 0.00255889875439205| 0.0199802179754696| 0.0118237214092792|-0.00259226199818282|-0.0683297436244215| -0.0259303389894746|113.0|[-0.0446416365069...|
| -0.099960554705319|-0.044641636506989| -0.0676412423470196|  -0.10

In [156]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
lr_model = lr.fit(train)

In [157]:
pred_df = lr_model.transform(test)
pred_df.show()

+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+------------------+
|                   0|                 1|                   2|                   3|                   4|                  5|                   6|                   7|                   8|                   9|label|            features|        prediction|
+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+------------------+
|  -0.107225631607358|-0.044641636506989| -0.0773415510119477| -0.0263278347173518| -0.0896299427450836|-0.0961978613484469|  0.0265502726256275|  -0.076394503750001| -0.0425721049227942| -0.0052198044153011|137.0|[-0.0446416365069...|