In [1]:
import findspark
findspark.init()
import numpy as np 
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import *
import json
import random
import requests
import base64
from urllib.request import urlopen
from bson import ObjectId

In [2]:
import os

# Every service URL below points at the same machine. Override the host with the
# PICKDEAL_HOST environment variable instead of editing each URL by hand.
PICKDEAL_HOST = os.environ.get("PICKDEAL_HOST", "worker-198")

HDFS_ROOT = f"hdfs://{PICKDEAL_HOST}:8020/user/fujinet"
HDFS_ITEMS = f"{HDFS_ROOT}/items/"
HDFS_IMAGE = f"{HDFS_ROOT}/images/"
HDFS_ARCHIVE = f"{HDFS_ROOT}/archive/device_data/"
CHECK_POINT = f"{HDFS_ROOT}/checkpoint"
# CHECK_POINT = "/home/fujinet/Desktop/huy-ndh/PickDeal/checkpoint"
SEAWEEDFS_IMAGE = f"http://{PICKDEAL_HOST}:9888/images/"
# SEAWEEDFS_IMAGE = 'http://192.168.137.57:9888/images/'
JAR_WORKING = "jars/*"

# FIXME(security): the fallback URI embeds the MongoDB password in the notebook.
# Provide PICKDEAL_MONGODB_URI through the environment and drop the hard-coded default.
MONGODB_URI = os.environ.get(
    "PICKDEAL_MONGODB_URI",
    f"mongodb://pickdeal:Abc12345@{PICKDEAL_HOST}:27117",  # ?authSource=admin&retryWrites=true&w=majority
)
DBNAME = "pickdeal"
KAFKA_SERVER = f"{PICKDEAL_HOST}:9092"
KAFKA_TOPIC = "items"
SPARK_LOGS = "/home/fujinet/Documents/huy-ndh/PickDeal/spark.log"
AI_ENPOINTS = f"http://{PICKDEAL_HOST}:6006/api/v1/pickdeal"
LOGO_CLASSIFICATION = f"{AI_ENPOINTS}/logo-classification"
INFO_NER = f"{AI_ENPOINTS}/ner-extraction"
SERVER_ENPOINTS = f"http://{PICKDEAL_HOST}:4001/brands"

In [3]:
import logging

# Append timestamped INFO-and-above records to SPARK_LOGS; batch() writes its
# per-batch row counts through the root logger configured here.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(message)s',
    filemode='a',
    filename=SPARK_LOGS,
)
logging.info("Running Spark")

# Named logger; the cells in this notebook log through the root `logging` functions instead.
logger = logging.getLogger('urbanGUI')

In [4]:
# Local Spark session for the streaming job; connector jars are put on the driver
# classpath from JAR_WORKING.
# NOTE: Spark logs that spark.sql.adaptive.enabled is not supported for streaming
# DataFrames and disables it there.
spark = (
    SparkSession.builder
    .appName("Streaming Process Files")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", True)
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.sql.streaming.schemaInference", True)
    .config('spark.driver.extraClassPath', JAR_WORKING)
    .getOrCreate()
)

#     .config('spark.mongodb.write.convertJson', "object_Or_Array_Only") \
#     .config('spark.mongodb.write.convertJson', "object_Or_Array_Only") \
#     .config('spark.driver.extraClassPath', JAR_WORKING) \
#     .config('spark.jars.packages', 'org.mongodb:mongo-java-driver:3.10.2') \
#     .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1') \

23/10/05 10:22:14 WARN Utils: Your hostname, worker-198 resolves to a loopback address: 127.0.1.1; using 172.16.1.198 instead (on interface enp6s0)
23/10/05 10:22:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/05 10:22:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Display the SparkSession created above as a quick check that it started.
spark

In [6]:
# streaming_df = spark.readStream\
#     .format("json") \
#     .option("cleanSource", "archive") \
#     .option("sourceArchiveDir", HDFS_ARCHIVE) \
#     .option("maxFilesPerTrigger", 1) \
#     .load(HDFS_ITEMS)

# schema_data = spark.read\
#     .format("json") \
#     .load(HDFS_ITEMS)

# schema = schema_data.schema
# Schema of the JSON payload carried in the Kafka `value` of KAFKA_TOPIC.
schema = StructType(
    [
        StructField(
            "crawl_results",
            ArrayType(StructType(
                [StructField("is_required", BooleanType(), True)]
                + [StructField(name, StringType(), True) for name in ("key", "type", "value")]
            )),
            True,
        ),
        StructField("crawl_status_id", StringType(), True),
        StructField("host", StringType(), True),
        StructField("image_urls", ArrayType(StringType()), True),
        StructField(
            "images",
            ArrayType(StructType(
                [StructField(name, StringType(), True) for name in ("checksum", "path", "status", "url")]
            )),
            True,
        ),
    ]
    + [
        StructField(name, StringType(), True)
        for name in ("template_id", "timestamp", "url", "img_seaweed", "img_seaweed_path")
    ]
)

# Subscribe to the topic, decode the payload with `schema`, flatten it, and keep the
# broker timestamp under a separate name so it does not clash with the payload's `timestamp`.
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .load()
    .select(
        from_json(col("value").cast("string"), schema).alias("data"),
        col("timestamp").alias("timestamp_kafka"),
    )
    .select("data.*", "timestamp_kafka")
)

# streaming_df.writeStream \
#             .format("console")\
#             .outputMode("append")\
#             .start()\
#             .awaitTermination()

In [7]:
# Schema of the JSON string returned by get_info_description (parsed with from_json in batch()).
schema_ner = (
    StructType()
    .add("date", ArrayType(StringType()), True)
    .add(
        "brand",
        StructType()
        .add("ner", ArrayType(StringType()), True)
        .add(
            "logo",
            StructType()
            .add("content", StringType(), True)
            .add("accuracy", FloatType(), True),
            True,
        ),
        True,
    )
    .add("location", ArrayType(StringType()), True)
)

# Struct with a single nullable string field named "oid".
schema_id = StructType().add("oid", StringType(), True)

def get_url_image (images, base_url=None):
    """Build the SeaweedFS URL of the first downloaded image of an item.

    Args:
        images: JSON array string (as produced by ``to_json`` on the ``images`` column),
            whose elements carry a ``path``; may be None for a null column.
        base_url: URL prefix to prepend to the file name; defaults to SEAWEEDFS_IMAGE.

    Returns:
        ``base_url`` + the last path component of the first image's ``path``,
        or '' when there is no image or the first image has no path.
    """
    if images is None:
        return ''
    imgs = json.loads(str(images))
    if not imgs:
        return ''
    # to_json drops null fields, so `path` may be absent.
    path = imgs[0].get('path') or ''
    if not path:
        return ''
    prefix = SEAWEEDFS_IMAGE if base_url is None else base_url
    return prefix + path.rsplit('/', 1)[-1]

def _get_crawl_value (crawl, key):
    """Return the ``value`` of the first crawl result whose ``key`` equals ``key``.

    Args:
        crawl: JSON array string (as produced by ``to_json`` on the ``crawl_results``
            column) of objects with ``key``/``value`` fields; may be None.
        key: crawl-result key to look up.

    Returns:
        The matching value, or '' when the input is empty, the key is absent,
        or the value is missing.
    """
    if crawl is None:
        return ''
    items = json.loads(str(crawl)) or []
    for item in items:
        # to_json drops null fields, so use .get() rather than indexing.
        if item.get('key') == key:
            return item.get('value') or ''
    return ''


def get_description (crawl):
    """Return the crawled 'description' value, or '' if absent."""
    return _get_crawl_value(crawl, 'description')


def get_title (crawl):
    """Return the crawled 'title' value, or '' if absent."""
    return _get_crawl_value(crawl, 'title')

# def get_brand (host):
#     domain = urlparse(host).netloc

#     return domain

def get_brand_image (url_img, timeout=60):
    """Classify the brand logo in an image via the LOGO_CLASSIFICATION service.

    Downloads ``url_img``, uploads it as the multipart field ``file`` and picks the
    entry with the highest ``prob`` from the response's ``result`` list (on ties the
    later entry wins).

    Args:
        url_img: URL of the image to classify; may be empty or None.
        timeout: seconds to wait for the download and for the classification call.

    Returns:
        ``{"content": <brand title-cased, "_" replaced by spaces>, "accuracy": <prob>}``,
        or ``{"content": "", "accuracy": 0}`` when there is no URL, any step fails,
        the service returns a non-200 status, or the best label is "Unknow".
    """
    logo = {
        "content": "",
        "accuracy": 0
    }
    if not url_img:
        return logo
    try:
        # Close the download handle after the upload instead of leaking it per row.
        with urlopen(url_img, timeout=timeout) as img:
            res = requests.post(LOGO_CLASSIFICATION, files=[("file", img)], timeout=timeout)

        if res.status_code != 200:
            return logo

        result = json.loads(res.text)["result"]
        # "Unknow" is the initial sentinel; presumably also the classifier's
        # "no logo" label — TODO confirm with the AI service.
        brand = "Unknow"
        best_prob = 0  # not named `max`: that would shadow the builtin / pyspark's max
        for e in result:
            if e["prob"] >= best_prob:
                best_prob = e["prob"]
                brand = e["trade_mask"]

        if brand == "Unknow":
            return logo
        return {"content": brand.replace("_", " ").title(), "accuracy": best_prob}
    except Exception:
        # Best effort: logo detection failures must not fail the Spark task.
        return logo

def get_info_description (url_img, text, timeout=60):
    """Extract brand/date/location information for one promotion as a JSON string.

    The logo result comes from get_brand_image(url_img). ``text`` is sent to the
    INFO_NER service as form field ``text_data``. Trademarks are title-cased.

    Args:
        url_img: image URL passed to get_brand_image.
        text: promotion description to run NER on.
        timeout: seconds to wait for the NER service.

    Returns:
        JSON string shaped like ``schema_ner``:
        ``{"date": [...], "brand": {"logo": {...}, "ner": [...]}, "location": [...]}``.
        On any failure, whatever fields were already filled in are kept; the rest stay empty.
    """
    ner = {
        "date": [],
        "brand": {
            "logo": get_brand_image(url_img),
            "ner": []
        },
        "location": []
    }
    try:
        res = requests.post(INFO_NER, data={"text_data": text}, timeout=timeout)
        if res.status_code == 200:
            # The service's "result" field is itself a JSON-encoded string, hence the second loads.
            result = json.loads(json.loads(res.text)["result"])
            if isinstance(result, dict):
                ner["brand"]["ner"] = [i.title() for i in result["trademark"]]
            elif len(result) > 0:
                first = result[0]
                ner["date"] = first["date"]
                ner["brand"]["ner"] = [i.title() for i in first["trademark"]]
                ner["location"] = first["location"]
    except Exception:
        # Best effort: NER failures must not fail the Spark task.
        pass
    return json.dumps(ner)
    
def get_brand (info_extraction):
    """Pick the brand name for a promotion from its extracted information.

    Prefers the logo-classification result and falls back to the first NER trademark.

    Args:
        info_extraction: Row parsed with ``schema_ner`` (``brand.logo.content``,
            ``brand.ner``). It, or any nested field, may be None.

    Returns:
        The logo brand if non-empty, else the first NER trademark, else None.
    """
    # getattr with defaults tolerates null structs produced by from_json.
    brand = getattr(info_extraction, 'brand', None)
    logo = getattr(brand, 'logo', None)
    logo_name = getattr(logo, 'content', None)
    if logo_name:
        return logo_name

    ner = getattr(brand, 'ner', None)
    if ner:
        return ner[0]

    return None

# UDF that wraps the string form of a value in a one-field struct named "$oid"
# (MongoDB extended-JSON ObjectId notation — presumably intended for writing ObjectIds; confirm).
udf_struct_id = udf(
    lambda value: (str(value),),
    StructType([StructField("$oid", StringType(), True)]),
)

def generate_object_id (id=None):
    """Return a BSON ObjectId for ``id``, or a freshly generated one when ``id`` is empty.

    Args:
        id: 24-character hex id string (or anything ``bson.ObjectId`` accepts);
            None or '' generates a new ObjectId.

    Raises:
        bson.errors.InvalidId: if ``id`` is non-empty but not a valid ObjectId.
    """
    return ObjectId(id) if id else ObjectId()

def add_brand (info_extraction, timeout=60):
    """Create a brand on the brands service (SERVER_ENPOINTS) and return its id.

    The brand name comes from get_brand(info_extraction); "No brand" is used when none
    is found. The name doubles as the description, and './no-image.png' is uploaded as
    the logo.

    Args:
        info_extraction: Row parsed with ``schema_ner``.
        timeout: seconds to wait for the brands service.

    Returns:
        The ``id`` field of the service's JSON response, or '' on any failure
        (missing logo file, network error, unexpected response).
    """
    brand = get_brand(info_extraction)
    if brand is None:
        brand = "No brand"

    data = {
        "name": brand,
        "description": brand,
    }
    try:
        # Context manager so the placeholder logo handle is closed after each row.
        with open('./no-image.png', 'rb') as logo_file:
            res = requests.post(SERVER_ENPOINTS, data=data, files={"logo": logo_file}, timeout=timeout)
        return json.loads(res.text)["id"]
    except Exception:
        # Best effort: brand creation failures must not fail the Spark task.
        return ''

In [8]:
def _write_to_mongo (df, collection):
    """Append ``df`` to ``collection`` of database DBNAME with the MongoDB Spark connector."""
    df.write \
        .format("mongodb") \
        .option("spark.mongodb.connection.uri", MONGODB_URI) \
        .option("spark.mongodb.database", DBNAME) \
        .option("spark.mongodb.collection", collection) \
        .mode("append") \
        .save()


def _build_promotion_df (df):
    """Derive `promotions` documents from a raw micro-batch (calls the AI/brand services per row)."""
    # These UDFs make HTTP calls and add_brand creates a record on the brands service.
    # PySpark treats UDFs as deterministic by default and may duplicate or eliminate
    # their invocations, so mark them non-deterministic.
    ner_udf = udf(get_info_description).asNondeterministic()
    brand_udf = udf(add_brand).asNondeterministic()

    return (
        df
        .withColumn("status", lit(0))
        .withColumn("createdDate", current_timestamp())
        .withColumnRenamed("img_seaweed", "imageUrl")
        .withColumnRenamed("img_seaweed_path", "imagePath")
        .withColumnRenamed("timestamp", "timeStamp")
        .withColumnRenamed("crawl_status_id", "crawlId")
        .withColumn("crawlResults", to_json(col("crawl_results")))
        .withColumn("description", udf(get_description)(col("crawlResults")))
        .withColumn("title", udf(get_title)(col("crawlResults")))
        .withColumn("ner", ner_udf(col("imageUrl"), col("description")))
        .withColumn("extractInformation", from_json(col("ner"), schema_ner))
        .withColumn("brand", brand_udf(col("extractInformation")))
        .select('timeStamp', 'crawlId', 'createdDate', 'title', 'description', 'brand',
                'imageUrl', 'imagePath', 'url', 'extractInformation', 'status')
    )


def _build_item_df (df):
    """Rename raw micro-batch columns to camelCase for the `crawl_item` collection."""
    return (
        df
        .withColumnRenamed("crawl_status_id", "crawlStatusId")
        # Rename explicitly so the 'timeStamp' field does not depend on
        # spark.sql.caseSensitive and matches the promotions collection.
        .withColumnRenamed("timestamp", "timeStamp")
        .withColumnRenamed("crawl_results", "crawlResults")
        .withColumnRenamed("template_id", "templateId")
        .withColumnRenamed("image_urls", "imageUrls")
        .select('crawlStatusId', 'timeStamp', 'host', 'url', 'templateId',
                'imageUrls', 'images', 'crawlResults')
    )


def batch (udfdata, batchId):
    """foreachBatch handler: append one micro-batch to `promotions` and `crawl_item`.

    Args:
        udfdata: micro-batch DataFrame with the columns of ``streaming_df``.
        batchId: micro-batch id supplied by Structured Streaming.
    """
    # Cached because the batch is read by the count and by both writes.
    udfdata.cache()
    try:
        # Count once: every count() is a separate Spark job.
        row_count = udfdata.count()
        print(f'{datetime.now()} \t batchId:{batchId}; Row:{row_count}')
        logging.info(f'batchId:{batchId}; Row:{row_count}')

        if row_count == 0:
            # Nothing to write; skip the two Mongo jobs.
            return

        _write_to_mongo(_build_promotion_df(udfdata), "promotions")
        _write_to_mongo(_build_item_df(udfdata), "crawl_item")
    finally:
        # Release the cache even when a write fails.
        udfdata.unpersist()

In [9]:
# Start the streaming query. Keep the StreamingQuery handle so a later cell can
# inspect it (query.status, query.lastProgress) or stop it (query.stop()).
query = streaming_df.writeStream \
    .foreachBatch(batch) \
    .start()

# Disabled options (insert into the chain above to enable):
#     .option("checkpointLocation", CHECK_POINT)   # durable checkpoint instead of a temporary /tmp one
#     .trigger(processingTime='60 seconds')        # fixed 60-second micro-batch interval
query

23/10/05 10:22:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-42b8fdee-587a-4a35-96f1-859326d5fb93. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/05 10:22:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f9105f8cd30>

In [10]:
# spark.stop()

23/10/05 10:22:19 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


2023-10-05 10:22:20.757798 	 batchId:0; Row:0
