In [3]:
import pandas as pd
import glob
import os
import zipfile
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import re
from jinja2 import Environment, FileSystemLoader
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# necessary import
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql.functions import pandas_udf
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, IntegerType, StringType, MapType
import shutil
from tqdm import tqdm
import socket

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# !pip install --trusted-host 10.32.7.103 --index-url http://10.32.7.103:32273/simple/ hdfs3
import os
import subprocess
import traceback
import logging

In [4]:
# from sparkdl import DeepImageFeaturizer

In [5]:
# from sparkdl.udf.keras_image_model import registerKerasImageUDF

In [6]:
APP_NAME = "Course-YAMOLDIN"
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
EXTRA_JAVA_OPTIONS = "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1 -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3".format(LOG4J_PROP_FILE)

LOCAL_IP = socket.gethostbyname(socket.gethostname())

# preparing configuration files from templates
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    if not os.path.exists(directory):
        os.makedirs(directory)

env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

#running spark
SPARK_ADDRESS = "local[4]"

spark = SparkSession.builder.appName("lab-2-yamoldin-app").master(SPARK_ADDRESS).config("spark.ui.port", "4040")\
    .config("spark.memory.fraction", "0.8")\
    .config("spark.memory.storageFraction", "0.6")\
    .config("spark.driver.memory", "10g")\
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)\
    .config("spark.executor.memory", "10g")\
    .getOrCreate()

In [7]:
Agaricus = spark.read.format("image").load("hdfs:///tmp/mgajnutdinov-337342/families_data/Agaricus").withColumn("label", lit('Agaricus'))
Amanita = spark.read.format("image").load("hdfs:///tmp/mgajnutdinov-337342/families_data/Amanita").withColumn("label", lit('Amanita'))

In [9]:
Amanita.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: string (nullable = false)



In [13]:
Amanita.filter(Amanita.image.origin[151:-4] == 105171).show(truncate=False)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o86.showString

In [13]:
dataframes = [Agaricus, Amanita]
# merge data frame
df = reduce(lambda first, second: first.union(second), dataframes)


# repartition dataframe 
df = df.repartition(200)


# split the data-frame
train, test = df.randomSplit([0.8, 0.2], 42)

In [8]:
df.describe().show()

+-------+--------+
|summary|   label|
+-------+--------+
|  count|    9063|
|   mean|    null|
| stddev|    null|
|    min|Agaricus|
|    max| Amanita|
+-------+--------+



In [11]:
df.groupBy(["label"]).agg(F.count("label")).show()

+--------+------------+
|   label|count(label)|
+--------+------------+
|Agaricus|        2220|
| Amanita|        6843|
+--------+------------+



In [7]:
featurizer = DeepImageFeaturizer(inputCol="image",
                                 outputCol="features",
                                 modelName="InceptionV3")


In [8]:
lr = LogisticRegression(maxIter=5, regParam=0.03, 
                        elasticNetParam=0.5, labelCol="label")

In [14]:
sparkdn     = Pipeline(stages=[featurizer, lr])
spark_model = sparkdn.fit(train) # start fitting or training

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# evaluate the model with test set
evaluator = MulticlassClassificationEvaluator() 
tx_test   = spark_model.transform(test)

print('F1-Score ',   evaluator.evaluate(tx_test, 
                                      {evaluator.metricName: 'f1'}))
print('Precision ',  evaluator.evaluate(tx_test,
                                       {evaluator.metricName: 'weightedPrecision'}))
print('Recall ',     evaluator.evaluate(tx_test, 
                                    {evaluator.metricName: 'weightedRecall'}))
print('Accuracy ',   evaluator.evaluate(tx_test, 
                                      {evaluator.metricName: 'accuracy'}))

In [11]:
spark.stop()

In [11]:
from keras.applications import InceptionV3

model = InceptionV3(weights='inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5')
# model.save('/tmp/model-full.h5')

ValueError: Layer count mismatch when loading weights from file. Model expected 189 layers, found 188 saved layers.

In [13]:
train.show()

+--------------------+--------+
|               image|   label|
+--------------------+--------+
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...|Agaricus|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
|{hdfs://hdfs-name...| Amanita|
+--------------------+--------+
only showing top 20 rows



In [6]:
train.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: string (nullable = false)



ModuleNotFoundError: No module named 'tensorflow'