<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
import findspark
# Runs before the pyspark imports in the next cell; presumably this locates the
# local Spark installation and makes `pyspark` importable — confirm against the
# findspark documentation.
findspark.init()

In [2]:
import os
import time
from PIL import Image
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.image import ImageSchema

from pyspark.sql.functions import udf
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

from pyspark.ml.image import ImageSchema

# Settings

In [3]:
# Spark launcher environment: the driver and the workers use the same conda
# interpreter, and the submit arguments pull the AWS SDK, the hadoop-aws
# connector and the spark-deep-learning package.
os.environ.update({
    'PYSPARK_DRIVER_PYTHON': 'C:/Users/jpaul/.conda/envs/OP8_SPARK/python.exe',
    'PYSPARK_SUBMIT_ARGS': '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2,databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11 pyspark-shell',
    'PYSPARK_PYTHON': 'C:/Users/jpaul/.conda/envs/OP8_SPARK/python.exe',
})

In [4]:
# Run configuration.
# S3 selects where images are read from and where results are saved:
# True -> the S3 bucket below, anything else -> the local folders.
S3 = True
root_path = 'C:/Users/jpaul/Ds_Projet8/'
path = 'C:/Users/jpaul/Ds_Projet8/fruits-360/Training/'
bucket = 'op-projet8-julienpaulet3'
# One sub-folder per fruit category. os.listdir() returns entries in arbitrary
# order, so the listing is sorted to keep the category list (and the debug
# slice taken from it further down) reproducible from one run to the next.
categ = sorted(os.listdir(path))

In [5]:
# Output root: the S3 bucket when S3 mode is on, otherwise the local project folder.
path_to_save = ("s3a://" + bucket + "/") if S3 is True else root_path

# Functions

In [6]:
def load_aws_key():
    '''Load the AWS access key ID and secret key from ``Key.txt``.

    ``Key.txt`` is opened relative to the current working directory. Its first
    non-empty line is taken as the access key ID and its second non-empty line
    as the secret access key. Surrounding whitespace is stripped so that a
    trailing space or blank line in the file cannot corrupt the credentials.

    Side effects:
        Sets the ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``
        environment variables.

    Returns:
        tuple: ``(ID, KEY)`` as strings.

    Raises:
        OSError: if ``Key.txt`` cannot be opened.
        ValueError: if the file holds fewer than two non-empty lines.
    '''
    with open('Key.txt', 'r') as f:
        entries = [line.strip() for line in f if line.strip()]

    if len(entries) < 2:
        raise ValueError(
            "Key.txt must contain the access key ID on the first line "
            "and the secret access key on the second line"
        )

    ID, KEY = entries[0], entries[1]
    os.environ["AWS_ACCESS_KEY_ID"] = ID
    os.environ["AWS_SECRET_ACCESS_KEY"] = KEY
    return ID, KEY

def init_spark_session(S3=False, bucket='', path_local=''):
    '''Create (or reuse) the local Spark session and resolve the image root path.

    The session runs on ``local[*]`` under the application name ``Projet8``
    with ``spark.driver.memory`` set to ``25g``.

    Args:
        S3: when exactly ``True``, the S3A connector is configured and images
            are read from ``s3a://<bucket>/Training/``; any other value selects
            the local path.
        bucket: name of the S3 bucket (only used when ``S3`` is ``True``).
        path_local: local image root, returned unchanged when ``S3`` is not
            ``True``.

    Returns:
        tuple: ``(sc, spark, path_img)`` — the SparkContext, the SparkSession
        and the root path the images should be loaded from.

    Raises:
        KeyError: if ``S3`` is ``True`` and ``AWS_ACCESS_KEY_ID`` or
            ``AWS_SECRET_ACCESS_KEY`` is missing from the environment.
    '''
    spark = (
        SparkSession.builder
        .master('local[*]')
        .config("spark.driver.memory", "25g")
        .appName('Projet8')
        .getOrCreate()
    )
    sc = spark.sparkContext

    if S3 is True:
        path_img = "s3a://" + bucket + "/Training/"
        hadoop_conf = spark._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.s3a.path.style.access", "true")
        hadoop_conf.set("fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")
        # The S3A connector reads its credentials from fs.s3a.access.key and
        # fs.s3a.secret.key; the former "fs.s3a.aws.*" names are not S3A options.
        hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
        hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    else:
        # The previous version also listed the global `path` into an unused
        # local here; that dead lookup ignored `path_local` and is dropped.
        path_img = path_local

    return sc, spark, path_img

# AWS credentials are only needed when the images live on S3.
if S3 is True:
    load_aws_key()

# NOTE: this rebinds the global `path` to the image root returned by the
# session setup (the s3a:// URL in S3 mode, the local path otherwise).
sc, spark, path = init_spark_session(S3=S3, bucket=bucket, path_local=path)

In [7]:
# Debug shortcut: keep only the first two categories so the run is quicker.
# Comment the slice out to process every category.
categ = categ[:2]
categ

['AppleBraeburn', 'AppleCrimsonSnow']

In [8]:
def rename_categ(path, categ):
    """Strip spaces and dots from category folder names on disk.

    Every entry of ``categ`` whose name contains a space and/or a dot is
    renamed inside ``path`` to the same name with those characters removed.
    Renaming is best effort: a folder that cannot be renamed is reported with
    ``print`` and skipped, and the remaining folders are still processed.

    Args:
        path: directory that contains the category folders.
        categ: iterable of folder names to normalise (``None`` is treated as
            empty).

    Returns:
        list: the entries of ``path`` after renaming, as given by
        ``os.listdir``.

    Raises:
        OSError: if ``path`` itself cannot be listed.
    """
    for cat in categ or ():
        # Remove both characters in one pass; a name holding a space AND a dot
        # previously kept its dot.
        cleaned = cat.replace(' ', '').replace('.', '')
        if cleaned == cat or not cleaned:
            continue
        try:
            os.rename(os.path.join(path, cat), os.path.join(path, cleaned))
        except OSError as err:
            # Keep going: one failing folder must not block the others.
            print('rename_categ: could not rename {!r}: {}'.format(cat, err))
    return os.listdir(path)

def parse_category(path):
    '''Return the category of an image, i.e. the name of its parent folder.

    The path is split on ``/`` and the second-to-last component is returned,
    e.g. ``.../Training/AppleBraeburn/0_100.jpg`` gives ``AppleBraeburn``.

    Args:
        path: image path or URI using ``/`` as separator; may be empty or
            ``None``.

    Returns:
        str: the parent folder name, or ``''`` when ``path`` is empty, ``None``
        or has no parent component.
    '''
    if not path:
        return ''
    parts = path.split('/')
    # A bare file name has no parent folder; return '' instead of raising.
    return parts[-2] if len(parts) >= 2 else ''
    
def load_data(path_img, categories=None):
    '''Load the images of the selected categories into a Spark DataFrame.

    Args:
        path_img: root folder/URI of the images; each category name is appended
            to it directly, so it is expected to end with ``/``.
        categories: category folder names to load, either a list or a single
            name. Defaults to the module-level ``categ`` list.

    Returns:
        The DataFrame produced by the ``image`` reader with two added columns:
        ``path`` (from ``input_file_name()``) and ``categorie`` (the result of
        ``parse_category`` applied to ``path``).

    Relies on the module-level ``spark`` session.
    '''
    if categories is None:
        categories = categ
    elif isinstance(categories, str):
        # A single category name is accepted as shorthand for a one-item list.
        categories = [categories]

    # Use the path handed to the function rather than the global `path`.
    df_img = spark.read.format("image").load([path_img + cat for cat in categories])
    print('chargement effectué')

    # Source file of each image.
    df_img = df_img.withColumn("path", input_file_name())

    # Category derived from the parent folder of the source file.
    udf_categorie = udf(parse_category, StringType())
    df_img = df_img.withColumn('categorie', udf_categorie('path'))

    return df_img

In [9]:
# Display the SparkContext returned by init_spark_session() to check the session is up.
sc

# Loading

In [10]:
# Build the Spark DataFrame holding each image with its source path and category.
spark_df = load_data(path_img=path)

chargement effectué


In [11]:
# Preview the first 20 rows, with long cell values truncated.
spark_df.show(n=20, truncate=True)

+--------------------+--------------------+-------------+
|               image|                path|    categorie|
+--------------------+--------------------+-------------+
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-projet8...|s3a://op-projet8-...|AppleBraeburn|
|[s3a://op-pro

# Preprocessing

In [12]:
from sparkdl import DeepImageFeaturizer

In [13]:
# Feature extraction: run every image through ResNet50 and keep only the
# source path, the category and the resulting feature column.
featurizer = DeepImageFeaturizer(
    modelName="ResNet50",
    inputCol="image",
    outputCol="image_preprocessed",
)
spark_df_preprocessed = (
    featurizer
    .transform(spark_df)
    .select('path', 'categorie', 'image_preprocessed')
)

# Saving

In [14]:
# Persist the extracted features as Parquet (16 partitions, overwriting any
# previous result) under the configured output root.
# NOTE(review): the recorded run of this cell failed with
# java.lang.UnsatisfiedLinkError in NativeIO$Windows.access0, raised while
# S3AOutputStream created its local temp file — presumably the Hadoop native
# Windows binaries (hadoop.dll / winutils.exe) are missing or mismatched;
# confirm before re-running.
(
    spark_df_preprocessed
    .repartition(16)
    .write
    .format("parquet")
    .mode('overwrite')
    .save(path_to_save + 'preprocessed_parquet')
)

Py4JJavaError: An error occurred while calling o136.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 33, localhost, executor driver): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
	at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
	at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
	at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 32 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
	at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
	at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
	at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
