# Context
Building a Big Data environment: a first data-processing pipeline covering preprocessing, feature extraction (transfer learning with ResNet50), and a dimensionality-reduction step with PCA.
The pipeline is implemented as a PySpark script. It runs on an AWS EC2 instance and stores the data in an AWS S3 bucket.

Locate the Spark installation on the EC2 instance with findspark.

In [None]:
import findspark
findspark.init()

Import the required modules.

In [None]:
import os
import io
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
# for connections to AWS S3
import boto3
# pyspark modules
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.feature import StringIndexer, StandardScaler
from pyspark.ml.feature import PCA
# tensorflow modules
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

In [None]:
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

In [None]:
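# --jars takes ONE comma-separated list with no spaces; hadoop-aws must match Spark's Hadoop version (hadoop-aws 2.7.x was built against aws-java-sdk 1.7.4, not the 1.12 bundle)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars aws-java-sdk-bundle-1.12.178.jar,hadoop-aws-2.7.3.jar pyspark-shell'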
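A stray space after a comma in `--jars` is easy to miss. Building the string from a list avoids that mistake. This is a minimal sketch: `build_submit_args` is a hypothetical helper, not a PySpark API. The variable is only read when the JVM is launched, so it must be set before the first SparkSession is created. The trailing `pyspark-shell` token is required when Spark is started from Python.

```python
import os

def build_submit_args(jars):
    """Hypothetical helper: build a PYSPARK_SUBMIT_ARGS string from a list of jar paths."""
    # spark-submit expects --jars as a single comma-separated value without spaces
    jar_list = ",".join(j.strip() for j in jars)
    # 'pyspark-shell' must be the last token when launching Spark from Python
    return f"--jars {jar_list} pyspark-shell"

os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    ["aws-java-sdk-bundle-1.12.178.jar", "hadoop-aws-2.7.3.jar"])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```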

Next, we set up EC2-S3 communication with boto3 and configure the Spark session and the S3 client. This produces a Spark context that can communicate with S3.



## Checking the EC2 <-> S3 connection

Check the connection, then list the bucket and its files.



In [None]:
import boto3
session = boto3.session.Session(aws_access_key_id='....',
                                aws_secret_access_key='....')
s3_client = session.client(service_name='s3', region_name='eu-west-1')


List the buckets (only one in this case) and the files it contains.

In [None]:
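print([b['Name'] for b in s3_client.list_buckets()['Buckets']])
print([o['Key'] for o in s3_client.list_objects_v2(Bucket='sm-p8-fruits', MaxKeys=10).get('Contents', [])])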
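A single `list_objects_v2` call returns at most 1,000 keys. Listing a whole image dataset therefore means following the pagination. The minimal sketch below uses boto3's paginator. The `list_keys` name is hypothetical, and the prefix in the usage comment is an assumption based on the S3 path read later in the notebook.

```python
def list_keys(s3_client, bucket: str, prefix: str = "") -> list:
    """Return every object key under `prefix`, following list_objects_v2 pagination."""
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page has no "Contents" entry when nothing matches the prefix
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys

# Usage with the boto3 client created above (assumed prefix):
# keys = list_keys(s3_client, "sm-p8-fruits", prefix="fruits-360_dataset - Copie/")
# print(len(keys), keys[:5])
```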

## Spark session kickoff

In [None]:
spark = (SparkSession
         .builder
         .master('local[*]')
         .appName('P8')
         .config('spark.executor.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
         #.config('spark.driver.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
         #.config('fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain')
         #.config('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
         #.config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3native.NativeS3FileSystem')
         .getOrCreate()
        )

In [None]:
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', '....')
hadoop_conf.set('fs.s3a.secret.key', '....')
hadoop_conf.set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")
# s3a:// URLs are served by S3AFileSystem (NativeS3FileSystem is the legacy s3n:// client)
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
# Uses the access/secret keys set above (newer Hadoop releases name it SimpleAWSCredentialsProvider)
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")


## Reading the data from S3

In [None]:
s3_url = "s3a://sm-p8-fruits/fruits-360_dataset - Copie/fruits-360/*"
image_df = spark.read.format("binaryFile").load(s3_url)

Extract the label: the name of the folder that contains each image.

In [None]:
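from pyspark.sql.functions import col, element_at, split

# The label is the parent folder of each file, i.e. the second-to-last path segment
image_df = image_df.withColumn('label', element_at(split(col('path'), '/'), -2))
image_df = image_df.select('path', 'content', 'label')
image_df.show()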
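The plain Python below reproduces the same parsing. It shows why the label has to come from the end of the path and not from a fixed index. The example path is hypothetical: the `Training/Apple Braeburn` folders are an assumed layout of the dataset copy.

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name of a file path, used here as the class label."""
    # Same logic as element_at(split(path, '/'), -2) in Spark SQL
    return path.split("/")[-2]

# Hypothetical S3 path following the fruits-360 folder layout
example = "s3a://sm-p8-fruits/fruits-360_dataset - Copie/fruits-360/Training/Apple Braeburn/0_100.jpg"
print(example.split("/")[4])     # fruits-360  (a fixed index lands on a constant folder)
print(label_from_path(example))  # Apple Braeburn
```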

Load ResNet50 and broadcast its weights to the workers.

In [None]:
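# weights='imagenet' loads the pretrained weights transfer learning relies on (weights=None gives a random network)
model = ResNet50(include_top=False,
                 input_shape=(100, 100, 3),
                 pooling='max',
                 weights='imagenet')
model.summary()

# Broadcast the weights once so each executor can rebuild the model without downloading them
bc_model_weights = sc.broadcast(model.get_weights())
model.set_weights(bc_model_weights.value)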
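With `pooling='max'`, the Keras ResNet50 ends with a global max pooling layer. Each image is therefore summarized by a single 2048-value vector, because ResNet50's last block has 2048 channels. The numpy sketch below shows that reduction on random data. The 4x4 spatial grid is our own assumption for 100x100 inputs (an overall stride of 32, rounded up) and is not a value taken from the notebook.

```python
import numpy as np

def global_max_pool(feature_maps: np.ndarray) -> np.ndarray:
    """Keep the maximum of each channel over the spatial axes of a (batch, h, w, c) array."""
    return feature_maps.max(axis=(1, 2))

# Hypothetical batch: 8 images, a 4x4 spatial grid, 2048 channels
fmap = np.random.default_rng(0).random((8, 4, 4, 2048), dtype=np.float32)
features = global_max_pool(fmap)
print(features.shape)  # (8, 2048)
```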

In [None]:
def model_fn():
    # Rebuild the architecture only (weights=None), then load the broadcast pretrained weights
    model = ResNet50(include_top=False,
                     input_shape=(100, 100, 3),
                     pooling='max',
                     weights=None)
    model.set_weights(bc_model_weights.value)
    return model

In [None]:
def preprocess(content):
    # Decode the raw bytes, force 3 channels (RGB), and resize to the model input size
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((100, 100))
    arr = img_to_array(img)
    return preprocess_input(arr)
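For ResNet50, Keras' `preprocess_input` uses the "caffe" mode. It reorders the channels from RGB to BGR and subtracts the ImageNet per-channel mean, with no rescaling. The numpy re-implementation below is only a sketch to make that step concrete. The `caffe_preprocess` name is hypothetical, and the notebook should keep calling the Keras function.

```python
import numpy as np

# ImageNet per-channel means in BGR order ("caffe" mode of Keras preprocessing)
IMAGENET_MEAN_BGR = np.array([103.939, 116.779, 123.68], dtype=np.float32)

def caffe_preprocess(rgb: np.ndarray) -> np.ndarray:
    """Approximate resnet50.preprocess_input for an (..., 3) RGB array with values in [0, 255]."""
    bgr = rgb[..., ::-1].astype(np.float32)  # reorder channels RGB -> BGR
    return bgr - IMAGENET_MEAN_BGR           # zero-centre each channel, no rescaling

pixel = np.array([[[123.68, 116.779, 103.939]]])  # the mean colour, written in RGB order
print(caffe_preprocess(pixel))                     # all zeros
```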


In [None]:
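def featurize_series(model, content_series):
    # Stack the preprocessed images into one (batch, 100, 100, 3) array
    batch = np.stack(content_series.map(preprocess).tolist())
    preds = model.predict(batch)
    # One flat feature vector (2048 values with pooling='max') per image
    output = [p.flatten() for p in preds]
    return pd.Series(output)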


In [None]:
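# Arrow speeds up Spark <-> pandas conversions; "spark.sql.execution.arrow.enabled" is the deprecated Spark 2.x key
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")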

In [None]:
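from typing import Iterator

# Iterator-of-Series pandas UDF: the model is built once per task, then reused for every batch
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)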
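The point of the iterator form is that the expensive set-up (building ResNet50) runs once and is then reused for every Arrow batch. Without it, the set-up would run once per batch. The plain-Python sketch below mimics that pattern with a fake model so it can run without Spark or TensorFlow. `featurize_batches` and `load_fake_model` are hypothetical names.

```python
from typing import Callable, Iterable, Iterator, List

Batch = List[float]

def featurize_batches(batches: Iterable[Batch],
                      load_model: Callable[[], Callable[[Batch], Batch]]) -> Iterator[Batch]:
    """Mimic an iterator pandas UDF: initialise once, then process every batch."""
    model = load_model()    # expensive initialisation, done once per iterator
    for batch in batches:
        yield model(batch)  # per-batch work reuses the same model

# Usage: count how many times the fake model is loaded
loads = []

def load_fake_model():
    loads.append(1)
    return lambda batch: [2 * x for x in batch]  # stand-in for model.predict

results = list(featurize_batches([[1.0, 2.0], [3.0], [4.0, 5.0]], load_fake_model))
print(results)     # [[2.0, 4.0], [6.0], [8.0, 10.0]]
print(len(loads))  # 1
```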



In [None]:
# Apply the UDF to the binary content of each image
image_sdf = image_df.select(col("path"), col("label"), featurize_udf("content").alias("features"))
image_sdf.printSchema()
image_sdf.show()

In [None]:
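# Convert array<float> to the ML Vector type StandardScaler expects (Spark 3.1+ also offers pyspark.ml.functions.array_to_vector)
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df = image_sdf.select(col("path"), col("label"), list_to_vector_udf(image_sdf["features"]).alias("features_"))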

Standardize the features with StandardScaler (zero mean, unit standard deviation).

In [None]:
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features_',
                              outputCol='feats_scaled')
std = standardizer.fit(features_df)
features_df_scaled = std.transform(features_df)
features_df_scaled.show(5) 
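With `withMean=True, withStd=True`, each feature is centred and divided by its standard deviation. Spark's documentation states that the corrected sample standard deviation is used. The numpy sketch below, with the hypothetical name `standard_scale`, reproduces that computation on a small random matrix. A zero-variance column ends up at 0.0, which matches what Spark documents for that case.

```python
import numpy as np

def standard_scale(X: np.ndarray) -> np.ndarray:
    """Centre each column and divide by its corrected sample standard deviation."""
    mean = X.mean(axis=0)
    std = X.std(axis=0, ddof=1)          # ddof=1: corrected (unbiased) sample std
    std = np.where(std == 0, 1.0, std)   # constant columns: centring already gives 0.0
    return (X - mean) / std

X = np.random.default_rng(0).normal(loc=5.0, scale=3.0, size=(100, 4))
Z = standard_scale(X)
print(Z.mean(axis=0), Z.std(axis=0, ddof=1))
```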


Reduce the dimensionality with PCA (50 principal components).

In [None]:
pca = PCA(k=50, inputCol="feats_scaled", outputCol="pca")
modelpca = pca.fit(features_df_scaled)
transformed = modelpca.transform(features_df_scaled)
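Under the hood, PCA finds the orthogonal directions of largest variance. `explainedVariance` then reports each component's share of the total variance. The numpy sketch below, with the hypothetical name `pca_fit_transform`, computes both from an SVD of the centred data. One difference to keep in mind: Spark's `transform` projects the vectors without re-centring them. Here the input was already standardized with `withMean=True`, so both give the same projection.

```python
import numpy as np

def pca_fit_transform(X: np.ndarray, k: int):
    """Project X onto its top-k principal axes; also return their explained-variance ratios."""
    Xc = X - X.mean(axis=0)                           # centre each feature
    _, s, vt = np.linalg.svd(Xc, full_matrices=False)
    components = vt[:k].T                             # (n_features, k), by decreasing variance
    ratios = s ** 2 / np.sum(s ** 2)                  # share of total variance per axis
    return Xc @ components, ratios[:k]

rng = np.random.default_rng(42)
X = rng.normal(size=(200, 10))
projected, explained = pca_fit_transform(X, k=3)
print(projected.shape, explained.shape)  # (200, 3) (3,)
```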


In [None]:
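# Cumulative share of variance explained by the first 1..k components (prefixed with 0 for the origin)
var = np.cumsum(modelpca.explainedVariance.toArray())
sns.set_context(context='poster', font_scale=0.8)
sns.lineplot(x=np.arange(len(var) + 1), y=np.insert(var, 0, 0) * 100, color='deepskyblue')
plt.xlabel('PCs')
plt.ylabel('Cumulative explained variance (%)')
plt.ylim(0, 100)
plt.xlim(left=0)
plt.show()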
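The cumulative curve is usually read by finding how many components are needed to reach a target share of variance. The sketch below does that with `np.searchsorted`. The function name and the 90% default are illustrative choices, not values from the notebook. Because Spark's ratios are relative to the total variance, the first 50 components may never reach the threshold, and the function then returns `None`.

```python
import numpy as np

def n_components_for(explained_variance, threshold=0.9):
    """Smallest number of components whose cumulative explained variance reaches `threshold`."""
    cumulative = np.cumsum(explained_variance)
    idx = int(np.searchsorted(cumulative, threshold))  # first index with cumulative >= threshold
    return idx + 1 if idx < len(cumulative) else None  # None: threshold never reached

print(n_components_for([0.5, 0.25, 0.125, 0.125], threshold=0.7))  # 2
```

In the notebook, the input would be `modelpca.explainedVariance.toArray()`.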

Export the results to S3

In [None]:
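from pyspark.ml.functions import vector_to_array  # CSV cannot store vector columns: convert 'pca' to an array, then to a string
transformed.select('path', 'label', vector_to_array('pca').cast('string').alias('pca')).write.mode('overwrite').csv("s3a://sm-p8-fruits/pca_df", header=True)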
