In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import Row
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, from_unixtime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer

import numpy as np
import cv2
import matplotlib.pyplot as plt
import time


spark = SparkSession.builder.appName("image_processing").getOrCreate()
spark.conf.set("spark.sql.catalog.myCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")

In [2]:
img_data_path = "/Users/lavanyamk/Documents/FeaViz/object_data/"
model_label_path = "/Users/lavanyamk/Documents/FeaViz/coco-yolo-v3/coco.names"
model_weights_path = "/Users/lavanyamk/Documents/FeaViz/coco-yolo-v3/yolov3.weights"
model_cfg_path = "/Users/lavanyamk/Documents/FeaViz/coco-yolo-v3/yolov3.cfg"
http_server_ip = "http://localhost:9753"

# db_bundle_path = "/Users/lavanyamk/Downloads/secure-connect-nosql-db.zip"
# user_name = "CgYxKnkrerLwiDCMYlNXdPHa"
# password = "91YcRxuTwnZR-.WB-TJa5HOoFL8oL,AdmQusxObsSK5pIriJM0kFX9oSgr1jj3BakDZ5jOz4AZy0rpOwT76JWUNQWTC8u1an5FmJs-FR1xsczTLxJXFGUZczuGCMJ9aK"

In [3]:
img_features_df = spark.read.format("image")\
    .option("basePath", img_data_path)\
    .option("dropInvalid", True).load("file://" + img_data_path + "/*")

In [4]:
# Opening file, reading, eliminating whitespaces, and splitting by '\n', which in turn creates list
labels = open(model_label_path).read().strip().split('\n')  # list of names


In [5]:
# labels

In [6]:
# Setting minimum probability to eliminate weak predictions
probability_minimum = 0.6

# Setting threshold for non maximum suppression
threshold = 0.5

network = cv2.dnn.readNetFromDarknet(model_cfg_path, model_weights_path)

# Getting names of all layers
layers_names_all = network.getLayerNames()  # list of layers' names

# # Check point
# print(layers_names_all)

# Getting only output layers' names that we need from YOLO algorithm
layers_names_output = [layers_names_all[i[0] - 1] for i in network.getUnconnectedOutLayers()]  # list of layers' names

# Check point
# print(layers_names_output) 

In [7]:
def get_desc(img, labels):
    image = cv2.imread(img.split('//')[-1])
    image_input_shape = image.shape
    network = cv2.dnn.readNetFromDarknet(model_cfg_path, model_weights_path)
    blob = cv2.dnn.blobFromImage(image, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    layers_names_all = network.getLayerNames()
    layers_names_output = [layers_names_all[i[0] - 1] for i in network.getUnconnectedOutLayers()] 
    network.setInput(blob)  # setting blob as input to the network
    output_from_network = network.forward(layers_names_output)
    h, w = image_input_shape[:2]
    bounding_boxes = []
    confidences = []
    class_numbers = []
    class_labels = []
    for result in output_from_network:
        for detection in result:
            scores = detection[5:]
            class_current = np.argmax(scores)
            # Getting confidence (probability) for current object
            confidence_current = scores[class_current]
            # Eliminating weak predictions by minimum probability
            if confidence_current > probability_minimum:
                # Scaling bounding box coordinates to the initial image size
                # YOLO data format keeps center of detected box and its width and height
                # That is why we can just elementwise multiply them to the width and height of the image
                box_current = detection[0:4] * np.array([w, h, w, h])
                # From current box with YOLO format getting top left corner coordinates
                # that are x_min and y_min
                x_center, y_center, box_width, box_height = box_current.astype('int')
                x_min = int(x_center - (box_width / 2))
                y_min = int(y_center - (box_height / 2))
                # Adding results into prepared lists
                bounding_boxes.append([x_min, y_min, int(box_width), int(box_height)])
                confidences.append(float(confidence_current))
                class_numbers.append(int(class_current))
                class_labels.append(labels[class_current])
    return Row(#"bounding_box", 
               "confidence_score",
               "class",
               "class_labels")(#bounding_boxes,
                               confidences,
                               class_numbers, 
                               class_labels)

schema_added = StructType([
#     StructField("bounding_box", ArrayType(ArrayType(IntegerType())), False),
    StructField("confidence_score", ArrayType(FloatType()), False),
    StructField("classes", ArrayType(IntegerType()), False),
    StructField("class_labels", ArrayType(StringType()), False)
])

udf_image = udf(lambda x: get_desc(x, labels), schema_added)


features_df = img_features_df.select("image.origin", "image.height", "image.width", "image.nChannels", "image.mode")\
    .withColumnRenamed("origin", "image_path")
features_df = features_df.withColumn("desc", udf_image("image_path")).select( "*", "desc.*").drop("desc")#.show()
image_features_df = features_df.withColumn("image_view", concat(lit('<img src="'+http_server_ip),
                                             split(col("image_path"), img_data_path).getItem(1), 
                                             lit('"  width="150" height="200">')))

In [21]:

image_features_df.write.mode("append").partitionBy("image_path").saveAsTable("myCatalog.nosql1.image_features_table")

In [23]:
image_features_df.count()

15

In [11]:
# %%time
# sample_df.show(100, False)

In [22]:
# image_features_df.show(10, False)

In [15]:

image_features_df.select("image_view").toPandas().values

array([['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/veggies_2.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/broccoli_1.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/apple_2.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/veggies_1.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/berry_1.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/carrot_2.jpg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/apple_1.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/orange_2.jpeg" alt="local_image" />'],
       ['<img src="file:///Users/lavanyamk/Documents/FeaViz/object_data/banana-red_2.jpeg" alt="local_image" />'

<img src= "http://localhost:9753/veggies_2.jpeg" width="150" 
     height="200">
<!-- <img src="/images/picture.jpg"> -->

In [36]:
# image_features_df.coalesce(1).write.option(
#     "header",True).mode('overwrite').parquet("file:///Users/lavanyamk/Documents/FeaViz/outputs/image_features")

In [19]:
# tuple([str(m[1]) for m in features_df.dtypes]), features_df.columns

In [20]:
#ref: https://www.datasciencemadesimple.com/count-of-missing-nanna-and-null-values-in-pyspark/
# spark.createDataFrame(features_df.dtypes).show()

# spark.createDataFrame([("data_type")] + [(m[1]) for m in features_df.dtypes], ["summary"] + features_df.columns)
# rdd.toDF(columns).show()

In [51]:

# data

In [57]:
summary_df = create_feature_summary(features_df)
summary_df.show()

+--------------------+------+------+------+-----+---------+--------------+--------------------+------------------+--------------------+-------------+------------------+
|             summary|top_25|top_50|top_75|count|data_type|distinct_count|                 max|              mean|                 min|missing_count|            stddev|
+--------------------+------+------+------+-----+---------+--------------+--------------------+------------------+--------------------+-------------+------------------+
|  product_photos_qty|   1.0|   1.0|   3.0|32341|   double|            19|                20.0|2.1889861166939797|                 1.0|          610|1.7367656379315435|
|product_category_...|      |      |      |32341|   string|            73|       watches_gifts|                  |agro_industry_and...|          610|                  |
|    product_weight_g| 300.0| 700.0|1900.0|32949|   double|          2204|             40425.0|2276.4724877841513|                 0.0|            2| 4282.

In [35]:

cloud_config= {
        'secure_connect_bundle': db_bundle_path
}
auth_provider = PlainTextAuthProvider(user_name, password)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

row = session.execute("select * from nosql1.users_by_city").one()
if row:
    print(row[0])
else:
    print("An error occurred.")

PARIS


In [39]:
print([col for col in summary_df.dtypes])

[('summary', 'string'), ('product_id', 'string'), ('product_category_name', 'string'), ('product_name_lenght', 'string'), ('product_description_lenght', 'string'), ('product_photos_qty', 'string'), ('product_weight_g', 'string'), ('product_length_cm', 'string'), ('product_height_cm', 'string'), ('product_width_cm', 'string')]


In [None]:
session.execute("")

In [28]:
# imporpyspark.sql.catalogca

In [29]:
summary_df.write.format("org.apache.spark.sql.cassandra"
                        ).mode('append'
                              ).options(table="feature_summary", keyspace="nosql1").save()

AnalysisException: Couldn't find nosql1.feature_summary or any similarly named keyspace and table pairs;

In [76]:
features_df.dtypes

[('product_id', 'string'),
 ('product_category_name', 'string'),
 ('product_name_lenght', 'double'),
 ('product_description_lenght', 'double'),
 ('product_photos_qty', 'double'),
 ('product_weight_g', 'double'),
 ('product_length_cm', 'double'),
 ('product_height_cm', 'double'),
 ('product_width_cm', 'double')]

+--------------+------------------+--------------------+--------------------+--------------------+----------------------+--------------------+-----------------------+
|       summary|         review_id|            order_id|        review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------+------------------+--------------------+--------------------+--------------------+----------------------+--------------------+-----------------------+
|         count|            105188|              102859|              102692|               12176|                 41868|               96025|                  96002|
|          mean|               4.5|                 0.0|   4.071667849964501|3.155449396525236...|  1.111111111111111...|                null|                   null|
|        stddev|0.7071067811865476|                 0.0|   1.386648877434681|5.616554832455847E11|              Infinity|                null|                   null

In [100]:
features_df.agg(countDistinct("product_id").alias("total_count")).show()

+-----------+
|total_count|
+-----------+
|      32951|
+-----------+



In [74]:
describe_features_df = get_transpose_df(features_df.describe(), features_df.columns, "summary")
describe_features_df.show()

+--------------------+-----+--------------------+------------------+--------------------+------------------+
|             summary|count|                 max|              mean|                 min|            stddev|
+--------------------+-----+--------------------+------------------+--------------------+------------------+
|  product_photos_qty|32341|                   9|2.1889861166939797|                   1|1.7367656379315435|
|product_category_...|32341|utilidades_domest...|                  |agro_industria_e_...|                  |
|    product_weight_g|32949|                 998|2276.4724877841513|                   0| 4282.038730977024|
|    product_width_cm|32949|                  98|23.196728277034204|                  10|12.079047453227794|
| product_name_lenght|32341|                   9| 48.47694876472589|                  10|10.245740725237287|
|   product_height_cm|32949|                  99|16.937661234028347|                  10|13.637554061749569|
|   product_length_

In [90]:
@udf(returnType=StringType())
def get_distinct_count(col_name_str, features_df):
    distinct_count = features_df.select(col_name_str).distinct().count()
    return distinct_count 

# get_distinct_count_udf = udf(lambda z: get_distinct_count(z),IntegerType())

In [91]:
features_df.select("product_photos_qty").distinct().count()

20

In [92]:
describe_features_df.select("summary")

DataFrame[summary: string]

In [93]:
describe_features_df.withColumn("type", get_distinct_count_udf(col("summary"), features_df))#.show(truncate=False)

# df.select(col("Seqno"), \
#     convertUDF(col("Name")).alias("Name") ) \
#    .show(truncate=False)

TypeError: Invalid argument, not a string or column: DataFrame[product_id: string, product_category_name: string, product_name_lenght: string, product_description_lenght: string, product_photos_qty: string, product_weight_g: string, product_length_cm: string, product_height_cm: string, product_width_cm: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [None]:
# from pyspark.sql.functions import *
# from pyspark.sql import SparkSession

# def main():
#     DataList = [("Shirts", 10, 13, 34, 10), ("Trousers", 11, 2, 30, 20), ("Pants", 70, 43, 24, 60), ("Sweater", 101, 44, 54, 80)]
#     productQtyDF  = spark.createDataFrame(DataList, ["Products", "Small", "Medium", "Large", "ExLarge"])
#     productQtyDF.show()
#     #+--------+-----+------+-----+-------+
#     #|Products|Small|Medium|Large|ExLarge|
#     #+--------+-----+------+-----+-------+
#     #|  Shirts|   10|    13|   34|     10|
#     #|Trousers|   11|     2|   30|     20|
#     #|   Pants|   70|    43|   24|     60|
#     #| Sweater|  101|    44|   54|     80|
#     #+--------+-----+------+-----+-------+

#     productTypeDF = TransposeDF(productQtyDF, ["Small", "Medium", "Large", "ExLarge"], "Products")
#     productTypeDF.show(truncate= False)
#     #+--------+-----+------+-------+--------+
#     #|Products|Pants|Shirts|Sweater|Trousers|
#     #+--------+-----+------+-------+--------+
#     #|Medium  |43   |13    |44     |2       |
#     #|Small   |70   |10    |101    |11      |
#     #|ExLarge |60   |10    |80     |20      |
#     #|Large   |24   |34    |54     |30      |
#     #+--------+-----+------+-------+--------+


# if __name__ == '__main__':
#     main()

### Read Data

In [45]:
df.count()

32952

### Print schema

In [46]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]>

In [47]:
df.show(1)

+----------+--------------------+-------------------+--------------------+------------------+----------------+-----------------+-----------------+----------------+
|       _c0|                 _c1|                _c2|                 _c3|               _c4|             _c5|              _c6|              _c7|             _c8|
+----------+--------------------+-------------------+--------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_...|product_name_lenght|product_descripti...|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+--------------------+-------------------+--------------------+------------------+----------------+-----------------+-----------------+----------------+
only showing top 1 row

