In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StandardScaler, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import when, col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd
import numpy as np
import pyspark.pandas as ps 
import os

from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_replace, concat, lit
from pyspark.sql.functions import trim, mean

# from pyspark.sql.functions import  mean:




In [2]:
spark = SparkSession.builder \
    .config("spark.python.worker.memory", "2g")\
    .getOrCreate()



In [3]:
df_CO = spark.read.csv("/carbone/CO2.csv", header=True, inferSchema=True)

In [4]:
df_CO.show(4)

+---+--------------------+-------------+---------------+-----------+
|_c0|     Marque / Modele|Bonus / Malus|Rejets CO2 g/km|Cout enerie|
+---+--------------------+-------------+---------------+-----------+
|  2|AUDI E-TRON SPORT...|-6Â 000â‚¬Â 1|              0|   319Â â‚¬|
|  3|AUDI E-TRON SPORT...|-6Â 000â‚¬Â 1|              0|   356Â â‚¬|
|  4|AUDI E-TRON 55 (4...|-6Â 000â‚¬Â 1|              0|   357Â â‚¬|
|  5|AUDI E-TRON 50 (3...|-6Â 000â‚¬Â 1|              0|   356Â â‚¬|
+---+--------------------+-------------+---------------+-----------+
only showing top 4 rows



## Premier RDD, séparer Marque et Modèle

In [5]:
df_CO.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Marque / Modele: string (nullable = true)
 |-- Bonus / Malus: string (nullable = true)
 |-- Rejets CO2 g/km: integer (nullable = true)
 |-- Cout enerie: string (nullable = true)



In [6]:
# df_CO.select("Marque / Modele").describe().show()

In [7]:
# créer un RDD à partir du DataFrame
rdd_CO = df_CO.rdd

# Appliquer la méthode `map()` pour séparer la première colonne en deux

In [8]:
rdd_separated = rdd_CO.map(lambda x: (x[0], x[1].split(" ")[0], " ".join(x[1].split(" ")[1:]), *x[2:]))

# contrôle

In [9]:
df_separated = rdd_separated.toDF(["_C0", "Marque", "Modele", "Bonus / Malus", "Rejets CO2 g/km", "Cout enerie"])

In [10]:
df_separated.show(5)

+---+------+--------------------+-------------+---------------+-----------+
|_C0|Marque|              Modele|Bonus / Malus|Rejets CO2 g/km|Cout enerie|
+---+------+--------------------+-------------+---------------+-----------+
|  2|  AUDI|E-TRON SPORTBACK ...|-6Â 000â‚¬Â 1|              0|   319Â â‚¬|
|  3|  AUDI|E-TRON SPORTBACK ...|-6Â 000â‚¬Â 1|              0|   356Â â‚¬|
|  4|  AUDI|E-TRON 55 (408ch)...|-6Â 000â‚¬Â 1|              0|   357Â â‚¬|
|  5|  AUDI|E-TRON 50 (313ch)...|-6Â 000â‚¬Â 1|              0|   356Â â‚¬|
|  6|   BMW|           i3 120 Ah|-6Â 000â‚¬Â 1|              0|   204Â â‚¬|
+---+------+--------------------+-------------+---------------+-----------+
only showing top 5 rows



## Second traitement

In [11]:
df_separated.select("Bonus / Malus").show(5)

+-------------+
|Bonus / Malus|
+-------------+
|-6Â 000â‚¬Â 1|
|-6Â 000â‚¬Â 1|
|-6Â 000â‚¬Â 1|
|-6Â 000â‚¬Â 1|
|-6Â 000â‚¬Â 1|
+-------------+
only showing top 5 rows



In [12]:
# df_CO.select()
# Bm = df_CO.select(concat(regexp_replace(regexp_replace(trim("Bonus / Malus"), "[¬Â 1]", "" ), "[^0-9-]", ""), 
#                     lit("€")).alias("bonus_malus"))

Bm = df_separated.select(concat(regexp_replace(regexp_replace(trim("Bonus / Malus"), "[¬Â 1]", "" ), "[^0-9-]", ""), 
                    lit("€")).alias("bonus_malus"))

In [13]:
df_C = df_separated.toPandas()
df_Bm = Bm.toPandas()

In [14]:
df_C.head()

Unnamed: 0,_C0,Marque,Modele,Bonus / Malus,Rejets CO2 g/km,Cout enerie
0,2,AUDI,E-TRON SPORTBACK 55 (408ch) quattro,-6Â 000â‚¬Â 1,0,319Â â‚¬
1,3,AUDI,E-TRON SPORTBACK 50 (313ch) quattro,-6Â 000â‚¬Â 1,0,356Â â‚¬
2,4,AUDI,E-TRON 55 (408ch) quattro,-6Â 000â‚¬Â 1,0,357Â â‚¬
3,5,AUDI,E-TRON 50 (313ch) quattro,-6Â 000â‚¬Â 1,0,356Â â‚¬
4,6,BMW,i3 120 Ah,-6Â 000â‚¬Â 1,0,204Â â‚¬


In [15]:
print(df_Bm.value_counts())
print(df_Bm.mode())
print(df_Bm.isna().sum())

bonus_malus
8753€          185
-€             100
-6000€          53
7890€           27
8460€           17
873€            14
7340€           14
7073€           13
763€             7
680€             7
dtype: int64
  bonus_malus
0       8753€
bonus_malus    0
dtype: int64


### Remplacer les valeurs `-€`  de la colonne 'bonus_malus' par le `mode` de cette colonne

In [16]:
df_Bm["bonus_malus"] = df_Bm["bonus_malus"].replace("-€", df_Bm['bonus_malus'].mode()[0])

## Intéressons nous à la colone `cout enertie`

In [17]:
df_Cout = df_separated.select(concat(regexp_replace(trim("Cout enerie"), "[^0-9-]", ""), lit("€")).alias("Cout_enerie"))

In [18]:
df_Cout = df_Cout.toPandas()

## DEFINISSONS LE DATAFRAME `CO2` Formater

In [19]:
dfFormater = pd.concat([df_C[['_C0', 'Marque', 'Modele','Rejets CO2 g/km']], df_Bm["bonus_malus"]],axis=1)
dfFormater = pd.concat([dfFormater, df_Cout],axis=1)
dfFormater.head(4)

Unnamed: 0,_C0,Marque,Modele,Rejets CO2 g/km,bonus_malus,Cout_enerie
0,2,AUDI,E-TRON SPORTBACK 55 (408ch) quattro,0,-6000€,319€
1,3,AUDI,E-TRON SPORTBACK 50 (313ch) quattro,0,-6000€,356€
2,4,AUDI,E-TRON 55 (408ch) quattro,0,-6000€,357€
3,5,AUDI,E-TRON 50 (313ch) quattro,0,-6000€,356€


In [43]:
dfFormater.isna().sum()

_C0                0
Marque             0
Modele             0
Rejets CO2 g/km    0
bonus_malus        0
Cout_enerie        0
dtype: int64

## intégration des colonnes `"Bonus / Malus", "Rejets CO2 g/km", "Cout Energie"`,  dans le 
## `catalogue.csv`

### chargement des données `catalogue_hive_ext` qui ce trouve dans HIVE

Pour accéder aux données du catalogue stockées dans Hive depuis Spark via Jupyter Notebook,nous allons utiliser le connecteur Hive.

les classes nécessaires pour se connecter à Oracle NoSQL et charger les données

In [20]:
# import jaydebeapi
import jpype.imports

In [21]:
def jaydebeapi_data_converter(data):
    """ 
        jaydebeapi retourne des classes java : <java class 'java.lang.[type]'>
        Cette fonction Converti les classes java retournees par jaydebeapi en type python.
        En effet, on ne peut utiliser les classe java pour creer des Pandas Spark DataFrame.
        Mapping de type:
        <java class 'java.lang.String'>   => str
        <java class 'java.lang.Booleen'>  => bool
        <java class 'java.lang.Integer'>  => int
    """
    tmp = []
    for row in data:
        new_row = []
        for elm in row:
            if java.lang.String == type(elm):
                new_row.append(str(elm))
            elif java.lang.Integer == type(elm):
                new_row.append(int(elm))
            elif java.lang.Boolean == type(elm):
                new_row.append(bool(elm))
            elif "<java class 'JDouble'>" == str(type(elm)):
                new_row.append(str(elm))
            else:
                print(f"Type {type(elm)} non pris en charge")
                return
        tmp.append(new_row)
    return tmp

In [22]:
HIVE_URL = 'jdbc:hive2://localhost:10000'
HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver" 
HIVE_USERNAME = "vagrant"
HIVE_PASSWORD = ""
HIVE_JAR_FILE = '/usr/local/hive/jdbc/hive-jdbc-3.1.3-standalone.jar'

In [23]:
jpype.startJVM(classpath=[HIVE_JAR_FILE])
jpype.addClassPath(HIVE_JAR_FILE)

In [24]:
import java

In [25]:
conn = jaydebeapi.connect(HIVE_DRIVER,HIVE_URL,[HIVE_USERNAME,HIVE_PASSWORD])
cursor = conn.cursor()
cursor.execute("SELECT * FROM catalogue_hive_ext")
data1 = cursor.fetchall()
cursor.close()
conn.close()

In [26]:
data2 = jaydebeapi_data_converter(data1)

In [27]:
catalogue =  ps.DataFrame(data2)

In [28]:
catalogue.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,Volkswagen,Touran 2.0 FSI,150,longue,7,5,gris,False,27340,1.4e-37
1,Volkswagen,Touran 2.0 FSI,150,longue,7,5,bleu,False,27340,1.5e-37
2,Volkswagen,Polo 1.2 6V,55,courte,5,3,gris,True,8540,3e-37
3,Volkswagen,Golf 2.0 FSI,150,moyenne,5,5,bleu,True,16029,4.3e-37
4,Seat,Toledo 1.6,102,longue,5,5,gris,False,18880,5.9000000000000005e-37


In [29]:
catalogue.columns=['Marque','nom','puissance','longueur','nbPlaces','nbPortes','couleur','occasion','prix', 'id']

In [30]:
type(catalogue)

pyspark.pandas.frame.DataFrame

In [31]:
catalogue=catalogue.drop('id')

In [32]:
catalogue.head()

Unnamed: 0,Marque,nom,puissance,longueur,nbPlaces,nbPortes,couleur,occasion,prix
0,Volkswagen,Touran 2.0 FSI,150,longue,7,5,gris,False,27340
1,Volkswagen,Touran 2.0 FSI,150,longue,7,5,bleu,False,27340
2,Volkswagen,Polo 1.2 6V,55,courte,5,3,gris,True,8540
3,Volkswagen,Golf 2.0 FSI,150,moyenne,5,5,bleu,True,16029
4,Seat,Toledo 1.6,102,longue,5,5,gris,False,18880


## Merge de `CO2` & `Catalogue`

In [33]:
type(dfFormater)

pandas.core.frame.DataFrame

In [34]:
type(catalogue)

pyspark.pandas.frame.DataFrame

In [35]:
catalogue=catalogue.to_pandas()

In [36]:
type(catalogue)

pandas.core.frame.DataFrame

In [37]:
dfFormaterM=dfFormater[["Marque","Rejets CO2 g/km","bonus_malus","Cout_enerie"]]

In [38]:
data = pd.merge(catalogue, dfFormaterM, on = "Marque")

In [39]:
data.columns

Index(['Marque', 'nom', 'puissance', 'longueur', 'nbPlaces', 'nbPortes',
       'couleur', 'occasion', 'prix', 'Rejets CO2 g/km', 'bonus_malus',
       'Cout_enerie'],
      dtype='object')

In [40]:
data.head()

Unnamed: 0,Marque,nom,puissance,longueur,nbPlaces,nbPortes,couleur,occasion,prix,Rejets CO2 g/km,bonus_malus,Cout_enerie
0,BMW,120i,150,moyenne,5,5,rouge,False,35800,0,-6000€,204€
1,BMW,120i,150,moyenne,5,5,rouge,False,35800,0,-6000€,204€
2,BMW,120i,150,moyenne,5,5,rouge,False,35800,36,8753€,60€
3,BMW,120i,150,moyenne,5,5,rouge,False,35800,36,8753€,54€
4,BMW,120i,150,moyenne,5,5,rouge,False,35800,36,8753€,54€
