In [None]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, length, when, col, lit, broadcast
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark import StorageLevel
from jinja2 import Environment, FileSystemLoader

from pyspark.sql.functions import * # count, desc, size, explode, arrays_zip, regexp_extract
import pyspark.sql.functions as F


In [None]:
# setting constants
APP_NAME = "Spark-Smetanin"
# '/' and ':' are swapped for '_' because the name is embedded in file names below.
NORMALIZED_APP_NAME = APP_NAME.translate(str.maketrans({'/': '_', ':': '_'}))

# Working directories, all relative to the notebook's current directory.
_WORK_DIR = os.getcwd()
APPS_TMP_DIR, APPS_CONF_DIR, APPS_LOGS_DIR = (
    os.path.join(_WORK_DIR, sub_dir) for sub_dir in ("tmp", "conf", "logs")
)
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, f"pyspark-log4j-{NORMALIZED_APP_NAME}.properties")
LOG_FILE = os.path.join(APPS_LOGS_DIR, f"pyspark-{NORMALIZED_APP_NAME}.log")

# Driver JVM flags: point log4j at the rendered properties file, set HDFS
# replication to 1 and set the HTTPS client protocol list.
EXTRA_JAVA_OPTIONS = " ".join([
    f"-Dlog4j.configuration=file://{LOG4J_PROP_FILE}",
    "-Dspark.hadoop.dfs.replication=1",
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3",
])

# IP address of this host; passed to Spark as spark.driver.host in the session cell.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

In [None]:
# Ensure the working directories exist, then render the log4j properties file
# that EXTRA_JAVA_OPTIONS points the driver JVM at.
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    # exist_ok=True avoids the race between an exists() check and makedirs().
    os.makedirs(directory, exist_ok=True)

# The template is loaded from /opt and receives the log file path as `logfile`.
env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

In [None]:
# Spark settings for the Kubernetes deployment, applied to the builder in this
# exact order (dicts preserve insertion order).
SPARK_CONFIG = {
    "spark.driver.host": LOCAL_IP,
    "spark.ui.port": "4040",
    "spark.kubernetes.memoryOverheadFactor": "0.6",
    "spark.driver.memory": "4g",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.executor.instances": "5",
    "spark.executor.cores": '4',
    "spark.executor.memory": "5g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",
    # -1 disables automatic broadcast joins.
    "spark.sql.autoBroadcastJoinThreshold": "-1",
    "spark.driver.extraJavaOptions": EXTRA_JAVA_OPTIONS,
    "spark.kubernetes.namespace": "asmetanin-307619",
    "spark.kubernetes.driver.label.appname": APP_NAME,
    "spark.kubernetes.executor.label.appname": APP_NAME,
    "spark.kubernetes.container.image.pullPolicy": "Always",
    "spark.kubernetes.container.image": "node03.st:5000/spark-executor:asmetanin-307619",
    "spark.kubernetes.executor.deleteOnTermination": "true",
    "spark.local.dir": "/tmp/spark",
}

_builder = SparkSession.builder.appName(APP_NAME).master("k8s://https://10.32.7.103:31412")
for _key, _value in SPARK_CONFIG.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

In [None]:
# Upload the raw detections to HDFS. -f overwrites an existing copy so the cell
# survives Restart & Run All (plain -put fails with "File exists" on re-run).
!hdfs dfs -put -f work/detect_12.json /home/vbespalov-225110/detect_12.json

In [None]:
# Explicit schema for the detections file (avoids schema inference).
# NOTE(review): `_id.$oid` looks like MongoDB extended-JSON ObjectId — confirm.
OID_SCHEMA = StructType([StructField('$oid', StringType())])

PROCEED_SCHEMA = StructType([
    StructField('proceed_1', StringType()),
    StructField('proceed_2', StringType()),
])

# One element of the `repeated_detection` array.
REPEATED_DETECTION_SCHEMA = StructType([
    StructField('time_detected', StringType()),
    *[StructField(axis, IntegerType()) for axis in ('x', 'y', 'z')],
    StructField('category', StringType()),
    StructField('clf_category', StringType()),
    StructField('conf', FloatType()),
])

# NOTE(review): `time_detected` is read as a string, but the displayed values
# (e.g. 1.6210961914386578E9) look like epoch seconds; cast before any
# time-based grouping.
JSON_SCHEMA = StructType([
    StructField('_id', OID_SCHEMA),
    *[StructField(axis, IntegerType()) for axis in ('x', 'y', 'z')],
    StructField('time_detected', StringType()),
    StructField('object_id', IntegerType()),
    StructField('category', StringType()),
    StructField('clf_category', StringType()),
    StructField('conf', FloatType()),
    StructField('picked', BooleanType()),
    StructField('proceed', PROCEED_SCHEMA),
    StructField('repeated_detection', ArrayType(REPEATED_DETECTION_SCHEMA)),
])

DETECT_JSON_PATH = '/home/vbespalov-225110/detect_12.json'
# multiLine=True: each JSON record may span several lines.
df = spark.read.json(DETECT_JSON_PATH, multiLine=True, schema=JSON_SCHEMA)

In [None]:
# Preview the first 20 rows with long values truncated (Spark's defaults).
df.show(n=20, truncate=True)

+--------------------+----+---+----+--------------------+---------+-------------+------------+----------+------+-------+--------------------+
|                 _id|   x|  y|   z|       time_detected|object_id|     category|clf_category|      conf|picked|proceed|  repeated_detection|
+--------------------+----+---+----+--------------------+---------+-------------+------------+----------+------+-------+--------------------+
|{609ff700392c23cc...| 615|455|-150|1.6210961914386578E9|        0|  BLUE BOTTLE|        none| 0.6977539| false|   {, }|                  []|
|{609ff700392c23cc...| 810|142|-150|1.6210961914386578E9|        1|       YOGURT|        none| 0.7988281| false|   {, }|                  []|
|{609ff701392c23cc...| 810|495|-150|1.6210961928171592E9|        2|       YOGURT|        none| 0.7836914| false|   {, }|                  []|
|{609ff701392c23cc...| 578|214|-150|1.6210961928171592E9|        3|       YOGURT|        none|0.83203125| false|   {, }|[{1.6210961933068...|
|{609f

In [None]:
# Print the column tree of the schema the JSON was read with.
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- time_detected: string (nullable = true)
 |-- object_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- clf_category: string (nullable = true)
 |-- conf: float (nullable = true)
 |-- picked: boolean (nullable = true)
 |-- proceed: struct (nullable = true)
 |    |-- proceed_1: string (nullable = true)
 |    |-- proceed_2: string (nullable = true)
 |-- repeated_detection: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- time_detected: string (nullable = true)
 |    |    |-- x: integer (nullable = true)
 |    |    |-- y: integer (nullable = true)
 |    |    |-- z: integer (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- clf_category: string (nullable = true)
 |    |    |-- conf: float (nullable = true)



In [None]:
# Categories used below to select the objects attributed to the robot
# (presumably the ones it can pick — confirm).
robo_category = [
    'TRANSP BOTTLE',
    'BLUE BOTTLE',
    'DETERGENT COLOR',
    'DETERGENT TRANSPARENT',
    'DETERGENT BOX',
    'DETERGENT WHITE',
    'DARK BOTTLE',
    'GREEN BOTTLE',
    '5L BOTTLE',
    'MILK BOTTLE',
]

# Number of distinct `category` values in the dataset (a null would count as one).
cc = (
    df.select('category')
      .distinct()
      .count()
)

print(cc)

20


In [None]:
# Detections per category, most frequent first (F.count skips null object_id).
# Cached because it is materialised twice below: once to size the display,
# once to print it.
category_counts = (
    df.groupby("category")
      .agg(F.count("object_id").alias('count_object'))
      .orderBy('count_object', ascending=False)
      .cache()
)

# Show every category: the row limit comes from the (small) aggregate itself
# rather than df.count(), which scanned the whole dataset just to pick a limit.
category_counts.show(category_counts.count(), truncate=False)

+---------------------+------------+
|category             |count_object|
+---------------------+------------+
|CANS                 |166175      |
|BLUE BOTTLE          |108776      |
|TRANSP BOTTLE        |101130      |
|MILK CARDBOARD       |60024       |
|GREEN BOTTLE         |38201       |
|JUICE CARDBOARD      |34560       |
|GLASS TRANSP         |28796       |
|5L BOTTLE            |20671       |
|YOGURT               |16825       |
|DARK BOTTLE          |12906       |
|GLASS DARK           |12892       |
|GLASS GREEN          |10398       |
|MILK BOTTLE          |6600        |
|OIL BOTTLE           |6588        |
|DETERGENT WHITE      |5441        |
|DETERGENT TRANSPARENT|4895        |
|DETERGENT COLOR      |3839        |
|MULTICOLOR BOTTLE    |951         |
|DETERGENT BOX        |441         |
|CANISTER             |149         |
+---------------------+------------+



In [None]:
# Largest x coordinate over all detections, as a one-row DataFrame with column "max(x)".
max_value = df.agg(F.max("x"))
max_value.show()

+------+
|max(x)|
+------+
|  1150|
+------+



In [None]:
# Smallest x coordinate over all detections, as a one-row DataFrame with column "min(x)".
min_value = df.agg(F.min("x"))
min_value.show()

+------+
|min(x)|
+------+
|   151|
+------+



In [None]:
# This cell used to be a verbatim copy of the next one, which computes
# `picked` and `object_robo_detect` and prints the percentage. The copy only
# launched extra Spark jobs (two counts and four `take` calls), so it is left empty.


In [None]:
# Share of picked detections relative to robot-category objects on the right
# half of the observed x range.
# NOTE(review): the numerator counts every picked row regardless of category
# or side, while the denominator is restricted to robot categories with
# x > midpoint — confirm this mix is intended.

# Midpoint of the x range, fetched once instead of via two `take` jobs per use.
x_midpoint = (max_value.first()[0] + min_value.first()[0]) / 2

picked = df.filter(F.col('picked')).count()

object_robo_detect = (
    df.filter(F.col("category").isin(robo_category))
      .filter(F.col("x") > x_midpoint)
      .count()
)

if object_robo_detect:
    print("Percentage of picked objects %.2f" % (picked/object_robo_detect*100))
else:
    # Avoid ZeroDivisionError when no robot-category object lies right of the midpoint.
    print("Percentage of picked objects undefined: no robot-category objects with x > %s" % x_midpoint)

Percentage of picked objects 80.67


In [None]:
# Split robot-category detections into the right (x > midpoint) and left
# (x <= midpoint) halves of the observed x range. The left side is inclusive
# so that a row exactly at the midpoint is counted once instead of in neither
# half; the right side keeps `>` to match the denominator of the percentage cell.
x_midpoint = (max_value.first()[0] + min_value.first()[0]) / 2

robo_objects = df.filter(F.col("category").isin(robo_category))

# One aggregation pass instead of two count jobs. `when` without `otherwise`
# yields null when the condition is false or null, and F.count skips nulls, so
# each side counts only rows matching its condition (null x goes to neither).
side_counts = robo_objects.agg(
    F.count(F.when(F.col("x") > x_midpoint, True)).alias("right"),
    F.count(F.when(F.col("x") <= x_midpoint, True)).alias("left"),
).first()

right_robo_object_count = side_counts["right"]
print(right_robo_object_count)

left_robo_object_count = side_counts["left"]
print(left_robo_object_count)



88648
214252


Количество объектов по группам за каждую неделю (график)
Количество поднятых объектов за каждую неделю и количество объектов, за которыми поехал робот (picked = true) (график)

Средний порог уверенности (conf) для каждого объекта (гистограмма)
Среднее количество повторных детектов


In [None]:
# How many times more robot-category objects lie left of the x midpoint than
# right of it. The value is a quotient, so it is labelled as a ratio.
if right_robo_object_count:
    print("Left/right ratio %.2f" % (left_robo_object_count / right_robo_object_count))
else:
    # Avoid ZeroDivisionError when nothing lies right of the midpoint.
    print("Left/right ratio undefined: no robot-category objects on the right side")

Difference 2.42
