## Imports

This notebook is run in a virtual environment on Ubuntu 20.04.2 LTS.

Spark version: spark-2.4.7-bin-hadoop2.7

Java version: 8 (required — Spark 2.4 does not support later Java versions)

In [2]:
import os

Findspark: makes Spark usable from within a Jupyter notebook.

Findspark needs the SPARK_HOME environment variable (which points to the Spark installation directory) in order to work.

In [3]:
# Display SPARK_HOME to check that it is correctly set (e.g. in .bashrc).
# A KeyError here means the variable is missing from the environment.
os.environ['SPARK_HOME']

'/opt/spark'

Set JAVA_HOME to the path of the Java 8 installation

In [4]:
# Point JAVA_HOME at the Java 8 JDK for this process; this replaces any
# value inherited from the shell.
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"

Import and initialize findspark (this allows Spark to be used from the notebook).

It makes pyspark available in the Jupyter notebook.

In [5]:
import findspark
# Initialize findspark so that pyspark can be imported in the cells below
# (relies on the SPARK_HOME environment variable checked earlier).
findspark.init()

Set the environment variable 'PYSPARK_SUBMIT_ARGS' in order to:
- fetch the Databricks sparkdl package as soon as the pyspark-submit command is run
- make the Hadoop AWS package available when Spark is loaded

In [6]:
# Maven coordinates that spark-submit must fetch at start-up:
# the AWS SDK, the Hadoop AWS package and the Databricks sparkdl package.
_packages = ','.join([
    'com.amazonaws:aws-java-sdk-pom:1.10.34',
    'org.apache.hadoop:hadoop-aws:2.7.2',
    'databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11',
])

# Must be set before the Spark JVM is launched to be taken into account.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages {} pyspark-shell'.format(_packages)

Import basic modules

In [7]:
import pandas as pd 
import numpy as np

Import pyspark

In [8]:
import pyspark

In [9]:
# # Explore functions of a module
# from inspect import getmembers, isfunction
# print(pd.DataFrame(getmembers(pyspark.sql)))

## Configurations ...

In [10]:
# Extract the AWS access keys from the IAM credentials CSV file.
# The file location can be overridden with the AWS_CREDENTIALS_CSV
# environment variable; otherwise the original hard-coded path is used.
path_cred = os.environ.get(
    "AWS_CREDENTIALS_CSV",
    "/home/maryse/p8/AWS/AWS_IAM_CREDENTIAL/Maryse_P8_credentials.csv",
)

with open(path_cred, 'r') as f:
    msg = f.read()

# The credentials are read from the second line of the file
# (presumably the first line is a header — verify against the CSV):
# column 2 is used as the access key id, column 3 as the secret key.
_cred_lines = msg.splitlines()
if len(_cred_lines) < 2:
    raise ValueError(
        "Credentials file {!r} has no data line".format(path_cred))

_cred_fields = [field.strip() for field in _cred_lines[1].split(',')]
if len(_cred_fields) < 4:
    raise ValueError(
        "Credentials file {!r}: expected at least 4 comma-separated "
        "columns, found {}".format(path_cred, len(_cred_fields)))

ID = _cred_fields[2]
KEY = _cred_fields[3]

# Set "temporary" environment variables (valid for this process only).
os.environ["AWS_ACCESS_KEY_ID"] = ID
os.environ["AWS_SECRET_ACCESS_KEY"] = KEY

... or use configparser to read the credentials from our AWS credentials file

In [11]:
# import configparser
# config = configparser.ConfigParser()
# config.read(os.path.expanduser("AWS/AWS_IAM_CREDENTIAL"))
# access_id = config.get(aws_profile, "aws_access_key_id") 
# access_key = config.get(aws_profile, "aws_secret_access_key")

Create and set parameters of the Hadoop configuration in order to be able to fetch data in S3.

In [12]:
from pyspark import SparkContext, SparkConf

# conf = (SparkConf().set('spark.executor.extraJavaOptions',
#                         '-Dcom.amazonaws.services.s3.enableV4=true')\
#          .set('spark.driver.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true'))

# sc = SparkContext(conf=conf)
# Reuse the active SparkContext if there is one, otherwise start a default
# one (no custom SparkConf is passed).
sc = SparkContext.getOrCreate()
# Alternative kept for reference (disabled):
# sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')

# Declare the filesystem implementation and the credentials for "s3n" in
# the Hadoop configuration of this context.
hadoop_conf = sc._jsc.hadoopConfiguration()
_s3n_settings = (
    ("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"),
    ("fs.s3n.awsAccessKeyId", ID),
    ("fs.s3n.awsSecretAccessKey", KEY),
)
for _prop_name, _prop_value in _s3n_settings:
    hadoop_conf.set(_prop_name, _prop_value)

# hadoopConf = sc._jsc.hadoopConfiguration()
# hadoopConf.set('fs.s3a.awsAccessKeyId', ID)
# hadoopConf.set('fs.s3a.awsSecretAccessKey', KEY)
# hadoopConf.set('fs.s3a.endpoint', 's3-us-east-2.amazonaws.com')
# hadoopConf.set('com.amazonaws.services.s3a.enableV4', 'true')
# hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

## Instantiation of SparkContext and import sparkdl

Let's instantiate our SparkContext

In [13]:
# # SparkContext becomes unnecessary once a SparkSession (spark.sql) is created
# from pyspark import SparkContext, SparkConf

In [14]:
# # In case of a problem related to the chosen S3 servers
# conf = (SparkConf().set('spark.executor.extraJavaOptions',
#                         '-Dcom.amazonaws.services.s3.enableV4=true')\
#                    .set('spark.driver.extraJavaOptions',
#                         '-Dcom.amazonaws.services.s3.enableV4=true'))

In [15]:
# # Default SparkContext
# sc = SparkContext()

# # # Custom SparkContext
# # sc=SparkContext(conf=conf)
# # sc.setSystemProperty('com.amazonaws.services.s3.enableV4',
# #                      'true')

In [16]:
from pyspark.sql import SparkSession

# Get the existing SparkSession, or create one named 'FeatExtr'.
spark = (
    SparkSession.builder
    .appName('FeatExtr')
    .getOrCreate()
)

Only then do we import sparkdl

In [17]:
import warnings
# Suppress all warnings from here on: the "ignore" action hides them
# entirely. Use "once" instead to show each distinct warning a single time.
warnings.filterwarnings("ignore") # alternative action: "once"

import sparkdl

Using TensorFlow backend.


## Create a Spark DataFrame containing all the pictures

### Read images and vectorize

In [18]:
from pyspark.ml.image import ImageSchema

Recursively read all the images in the specified directory and put them in a Spark DataFrame

In [51]:
# Name of the dataset sub-folder to process; it is used to build the data
# path and, further down, as the delimiter when parsing the image labels.
PREFIX = 'SAMPLE2'

In [52]:
# Option 1: read the images from the local copy of the dataset

data_path = os.path.join("../data/fruits-360", PREFIX)
    
# Option2: Get data from s3

# bucket='ocfruitpictures'
# folder = PREFIX
# data_path = 's3n://{}/{}'.format(bucket, folder)

In [53]:
# Load every image found under data_path (sub-directories included),
# then spread the rows over 12 partitions.
_all_images = ImageSchema.readImages(data_path, recursive=True)
images_df = _all_images.repartition(12)

In [54]:
""" Return a JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""

from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

# Function to convert python object to Java objects
def _to_java_object_rdd(rdd):
    """Convert a Python RDD into a JavaRDD of Java objects.

    The RDD is first re-serialized with an auto-batched pickle serializer,
    then handed to the JVM-side ``SerDe.pythonToJava`` helper.

    :param rdd: the PySpark RDD to convert
    :return: the object returned by ``SerDe.pythonToJava`` for that RDD
    """
    pickled_rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    serde = pickled_rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe
    return serde.pythonToJava(pickled_rdd._jrdd, True)

# Turn the DataFrame's underlying RDD into a JavaRDD of Java objects
JavaObj = _to_java_object_rdd(images_df.rdd)

# Ask Spark's JVM-side SizeEstimator for a size estimate, in bytes
_size_estimator = sc._jvm.org.apache.spark.util.SizeEstimator
nbytes = _size_estimator.estimate(JavaObj)
nbytes

7083464

In [46]:
# Display the default parallelism of the SparkContext
sc.defaultParallelism

6

Show the content of the Spark DataFrame

In [21]:
# images_df.show()

In [22]:
# images_df.printSchema()

Now we've got a Spark DataFrame containing all the images, each as one row.

### Display sample picture

Extract first picture

In [23]:
# # extract first row of the DataFrame
# row0 = images_df.first()

In [24]:
# row0.asDict()['image']['mode']

In [25]:
# # transform the row in a dict, and turn the data in a 1D np.array
# mat0 = np.array(row0.asDict()['image']['data'])
# # reshape the 1D vector into a 3 channel, 2D np.array of pixels
# mat0 = mat0.reshape(100, 100, 3)[:, :, ::-1] # reverse BGR to RGB
# mat0.shape

In [26]:
# from PIL import Image
# # Display sample image
# Image.fromarray(mat0, 'RGB')

## Features extraction (Transfer Learning) using Sparkdl

In [27]:
from sparkdl import DeepImageFeaturizer

In [28]:
# Featurizer based on the "ResNet50" model: reads the "image" column and
# writes its output to the "image_features" column.
feat = DeepImageFeaturizer(
    modelName="ResNet50",
    inputCol="image",
    outputCol="image_features",
)

Instantiation of a sparkdl pipeline to process the image data

In [29]:
from pyspark.ml import Pipeline

# Single-stage pipeline wrapping the featurizer
pipe = Pipeline(stages=[feat])
# Fit the pipeline on the images; the fitted model is the feature extractor
extractor = pipe.fit(images_df)

In [30]:
# Run the feature extractor on the images.
# The result is cached because it is consumed more than once further down
# (PCA fit, then PCA transform): without caching, the featurization would be
# recomputed each time the DataFrame is evaluated.
ext_features_df = extractor.transform(images_df).cache()
# ext_features_df.printSchema()

In [31]:
# ext_features_df.show()

In [32]:
# # compare the size of the Spark DataFrame (prior action)
# # and that of a Pandas DataFrame

# import sys
# print(sys.getsizeof(ext_features_df),
#       sys.getsizeof(ext_features_df.toPandas()))

## PCA on the extracted features

In [33]:
from pyspark.ml.feature import PCA

# Spark PCA estimator: projects "image_features" onto k=8 components and
# writes the projection to "pca_features"
pca = PCA(
    k=8,
    inputCol="image_features",
    outputCol="pca_features",
)

In [None]:
# Fit the PCA model on the extracted features (only the
# 'image_features' column is passed to the estimator)
model = pca.fit(ext_features_df.select('image_features'))

In [None]:
# Cumulative sum of the explained variance of the fitted components
cumValues = np.cumsum(model.explainedVariance.toArray())

In [None]:
import matplotlib.pyplot as plt

# Plot the cumulative explained variance against the number of components.
# The x range is derived from the data itself so that the plot stays valid
# if the number of PCA components is changed.
n_components = len(cumValues)

plt.rcParams['figure.facecolor'] = 'w'
fig, ax = plt.subplots(figsize=(6, 3))
ax.plot(range(1, n_components + 1), cumValues,
        color='r', marker='o', linestyle='--')
ax.set_title('Scree plot')
ax.set_xlabel('Number of first components')
ax.set_ylabel('Cumulative explained variance')
plt.show()

In [None]:
# Project the extracted features with the fitted PCA model; the result
# is the input DataFrame with the PCA output column added.

pca_feat_df = model.transform(ext_features_df)
# pca_feat_df.show(truncate=True)

## Get the class of each image

In [None]:
# Get the class (label) of each fruit from the origin path of its image.

# Explicit import: `import pyspark` alone does not guarantee that the
# `pyspark.sql.functions` sub-module is loaded.
from pyspark.sql import functions as F

orig_col = pca_feat_df['image']['origin']

# Keep what follows "<PREFIX>/" in the path, then the first path segment of
# that remainder, which is used as the label.
# NOTE: F.split treats its pattern as a regular expression, so PREFIX must
# not contain regex metacharacters.
after_prefix = F.split(orig_col, PREFIX + '/').getItem(1)
split_col = F.split(after_prefix, '/')

# Add the "labels" column and rename the "image" column to "path".
df_ = (pca_feat_df
       .withColumn('labels', split_col.getItem(0))
       .withColumnRenamed("image", "path"))

# df_.show()

In [None]:
# Keep only the columns to export.
# NOTE(review): 'path' is the whole former 'image' column (of which
# 'origin' is one field), not just the path string — confirm that the full
# image column is meant to be exported.
results_df = df_.select('path','pca_features','labels')

Write the final DataFrame in Parquet format to the S3 bucket

In [None]:
# import pyarrow.csv as pv
# import pyarrow.parquet as pq

# # results_pd = results_df.toPandas()
# pq.write_table(results_df, 'test1.parquet') # Index(['path', 'pca_features', 'labels'], dtype='object')

In [None]:
# spark.conf.set("spark.sql.parquet.compression.codec", "snappy") # gzip, lzo or lz4
# spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
# spark.sql("SET parquet.compression=SNAPPY")
# spark.sql("SET spark.sql.parquet.compression.codec=snappy")
# df_.write.parquet("p0.parquet")

In [None]:
# results_pd = results_df.toPandas()

In [None]:
# # WORKS!!!
# f = open("/home/maryse/PARTAGE/FORMATION/OCR_DS/PROJET8/mon_texte2.txt", 'w+')
# f.write('contenu')
# f.close()

In [None]:
# # DOES NOT WORK!!!
# path = "/home/maryse/PARTAGE/FORMATION/OCR_DS/PROJET8/P0.json"
# results_df.write.json('truc')

In [None]:
# WORKS: write the results as Parquet, overwriting any previous output.
# NOTE(review): this URI uses the "s3://" scheme, whereas the Hadoop
# configuration set earlier in the notebook only defines "fs.s3n.*"
# properties — confirm which S3 filesystem scheme is intended.
results_df.write.mode('overwrite').parquet("s3://ocfruitpictures/RESULTS_all")

In [None]:
# # DOES NOT WORK!!!
# path = "file:///home/maryse/PARTAGE/FORMATION/OCR_DS/PROJET8/Pultimate.parquet"
# results_df.write.parquet(path)

In [None]:
# # WORKS!!!
# path = "file:///home/maryse/Pultimate.parquet"
# results_df.write.parquet(path)