In [1]:
import os
import io
from getpass import getpass
import numpy as np
import pandas as pd

from functions import *

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, PCAModel, StringIndexer
import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import boto3

In [2]:
# Clés AMI pour accès S3
access_key = getpass(prompt="Clé d'accès pour S3") # Clé accès
secret_key = getpass(prompt='Clé secrète pour S3') # Clé secrète

Clé d'accès pour S3 ····················
Clé secrète pour S3 ········································


In [3]:
# Définition des variables d'environnement système pour accès S3
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key

In [4]:
# Accès S3 par boto3 et test
s3 = boto3.resource(
    service_name='s3',
    region_name='eu-west-1',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key
)

for bucket in s3.buckets.all():
    print(bucket.name)

oc-bucket-img


In [5]:
spark = (SparkSession
         .builder # Création d'une SparkSession
         .master('local[4]') # Allocation de tous les threads locaux disponibles
         .config('spark.executor.memory', '1500m') # Allocation de mémoire aux executors
         .appName('OC-P8') # Nom donné
         .getOrCreate())

In [6]:
# Importation du modèle précédemment créé
pca_f = PCAModel.load("s3a://oc-bucket-img/model/pca")

In [7]:
# Création d'une user-defined function pour extraire le nom du dossier = nom du fruit, pour classifier
path2name = F.udf(lambda x: cat_extract(x))
# Création d'une user-defined function pour le préprocessing des images ligne à ligne
img_size = [30, 30]
img2vec = F.udf(lambda x: DenseVector(preprocess(x, img_size).flatten()/255), VectorUDT())

In [8]:
def preprocess_step(path):
    
    """
    Fonction gérant le préprocessing des images contenues au chemin indiqué :
    - Extraction des noms de fruits
    - Egalisation de l'histogramme des images et scaling
    - PCA sur images
    """
    
    df = spark.read.format("binaryfile").load(path) # Lecture des documents (images) contenues dans le dossier
    df_prep = df.withColumn("name", path2name("path")).select('name', 'content')
    df_prep = df_prep.withColumn("content", img2vec("content"))
    pca_df = pca_f.transform(df_prep)
    return pca_df

In [9]:
data_url = "s3a://oc-bucket-img/Training-aws/*" # URL du dossier dans S3

In [10]:
# Preprocessing des données d'entraînement
train_df = preprocess_step(data_url)

In [11]:
# Définition des étapes du modèle
strind = StringIndexer(inputCol='name', outputCol='name_ind') # Encodage des noms de fruits
logreg = LogisticRegression(featuresCol='pca_feat', labelCol='name_ind', maxIter=10) # Régression logistique sur variables après PCA

In [12]:
# Définition du pipeline
pipeline = Pipeline(stages=[strind, logreg])

In [13]:
# Hyperparamètres à évaluer
params = ParamGridBuilder() \
    .addGrid(logreg.regParam, [0.01, 0.1, 1]) \
    .build()

In [14]:
# Méthode d'évaluation = accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='name_ind', metricName='accuracy')

In [15]:
# Evaluation croisée en 5 folds
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          numFolds=5)

In [16]:
# Grid search
model_cv = crossval.fit(train_df)

In [17]:
# Importation et préprocessing du set de test
test_url = "s3a://oc-bucket-img/Test-aws/*" # URL du dossier dans S3
test_pca_df = preprocess_step(test_url)

In [18]:
# Prédictions sur set de test
test_preds = model_cv.transform(test_pca_df)

In [19]:
test_preds.printSchema()

root
 |-- name: string (nullable = true)
 |-- content: vector (nullable = true)
 |-- pca_feat: vector (nullable = true)
 |-- name_ind: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [20]:
# Calcul de l'accuracy sur set de test
evaluator.evaluate(test_preds)

0.566412213740458