In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import FloatType, ArrayType, StructField, StructType
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, array, struct, col, collect_list
from pyspark import SQLContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import lit
from pyspark.sql import Row

In [2]:
# Local Spark session that uses every available core. The SparkContext and
# the legacy SQLContext are both derived from this one session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("testing_html")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [4]:
# Show which Spark installation the SPARK_HOME environment variable points to.
!echo $SPARK_HOME

/Users/asingh/Packages/spark-2.3.2-bin-hadoop2.7


In [16]:
# Read the sample JSON export and repartition it by the "breadcrumbs" column.
# NOTE(review): absolute, machine-specific path; no explicit partition count
# is passed to repartition().
sample_json_path = "/Users/asingh/html_output/91472513/sample.json"
data = spark.read.json(sample_json_path).repartition("breadcrumbs")

In [17]:
# Preview the loaded rows as a text table.
data.show()

+----------------+---+--------------------+--------------------+--------------------+------+--------------------+
|     breadcrumbs| id|           image_url|                name|   similar_items_aip|   sku|                  ts|
+----------------+---+--------------------+--------------------+--------------------+------+--------------------+
|      activewear|220|//img1.jockeyimg....|Jockey&reg; Twist...|[[activewear, 121...| 15351| 1.554962901749173E9|
|sleep-and-lounge|130|//img1.jockeyimg....|Baby Jockey&reg; ...|[[sleep-and-loung...|100780|1.5549629017472472E9|
|       shapewear| 90|//img1.jockeyimg....|Jockey&reg; Slimm...|[[shapewear, 88, ...|  4137|  1.55496290174595E9|
|        t-shirts| 30|//img1.jockeyimg....|Jockey&reg; Tall ...|[[t-shirts, 489, ...|  9988| 1.554962901744106E9|
|      sportswear|210|//img1.jockeyimg....|Jockey&reg; Perfo...|[[sportswear, 211...| 21100| 1.554962901748533E9|
|       underwear| 10|//img1.jockeyimg....|Jockey Sport&reg;...|[[underwear, 11, ...|  8

In [15]:
# One row per breadcrumb, with the ids of its products gathered into a list.
# NOTE: despite its name, the result is a DataFrame, not an RDD.
index_rdd = (
    data.groupBy("breadcrumbs")
    .agg(collect_list("id").alias("list_ids"))
)

In [24]:
# Display only the product whose id is 220.
data.filter(col("id") == 220).show()

+-----------+---+--------------------+--------------------+--------------------+-----+-------------------+
|breadcrumbs| id|           image_url|                name|   similar_items_aip|  sku|                 ts|
+-----------+---+--------------------+--------------------+--------------------+-----+-------------------+
| activewear|220|//img1.jockeyimg....|Jockey&reg; Twist...|[[activewear, 121...|15351|1.554962901749173E9|
+-----------+---+--------------------+--------------------+--------------------+-----+-------------------+



In [12]:
# Peek at the first two grouped rows.
index_rdd.take(2)

[Row(breadcrumbs='activewear', list_ids=[220]),
 Row(breadcrumbs='sleep-and-lounge', list_ids=[130])]

In [13]:
# index_rdd is a DataFrame (the result of groupBy/agg), and a Spark 2.x
# DataFrame has no .map method, so map over its underlying RDD of Rows.
# Each Row becomes a plain dict and the result is collected to the driver.
temp = index_rdd.rdd.map(lambda row: row.asDict()).collect()

In [14]:
# Display the collected value bound to `temp`.
temp

[{'breadcrumbs': 'activewear', 'list_ids': [220]},
 {'breadcrumbs': 'sleep-and-lounge', 'list_ids': [130]},
 {'breadcrumbs': 'shapewear', 'list_ids': [90]},
 {'breadcrumbs': 't-shirts', 'list_ids': [30]},
 {'breadcrumbs': 'sportswear', 'list_ids': [210]},
 {'breadcrumbs': 'underwear',
  'list_ids': [10, 20, 50, 60, 200, 230, 240, 360]},
 {'breadcrumbs': 'bras', 'list_ids': [100, 330]}]

In [42]:
# Partition count and storage level of the RDD behind `data`.
data.rdd.getNumPartitions(), data.rdd.getStorageLevel()

(200, StorageLevel(False, False, False, False, 1))

In [43]:
# Total number of rows, counted through the underlying RDD.
data.rdd.count()

15

In [44]:
# Keep a handle on the RDD of Rows behind the DataFrame; the per-partition
# HTML export and the Row-to-dict mapping both operate on it.
data_rdd = data.rdd

In [61]:
import math, os

In [79]:
def _absolute_image_url(image_url):
    """Return an https URL for ``image_url``.

    Protocol-relative URLs ("//host/path", the form seen in the sample data)
    get only the scheme prepended, so the result is "https://host/path" rather
    than "https:////host/path". URLs that already carry an http(s) scheme are
    returned unchanged; anything else is treated as "host/path".
    """
    if image_url.startswith("//"):
        return "https:" + image_url
    if image_url.startswith(("http://", "https://")):
        return image_url
    return "https://" + image_url


def _render_similar_items_html(products, max_items_row_count, total_count):
    """Build the HTML table for ``products`` and return it as one string."""
    parts = [u"""<html><body><table>
            <style>table, th, td {{border: 1px solid black;}} </style>
            <tr>
            <th> Product Name </th>
            <th align="center" colspan="{0}"> {1} </th>
            </tr>
            """.format(max_items_row_count, "similar_items_v2")]

    for product_data in products:
        # A null similar-items column is treated like an empty list.
        similar_items = (product_data["similar_items_aip"] or [])[0:total_count]
        # Ceiling division: number of table rows needed for this product.
        row_count = (len(similar_items) + max_items_row_count - 1) // max_items_row_count
        # Names are inserted verbatim: the sample data already contains HTML
        # entities such as "&reg;", which escaping would break.
        product_label = str(product_data["id"]) + ":" + product_data["name"]

        for n in range(row_count):
            parts.append(u"<tr>")
            if n == 0:
                # The product cell appears once and spans all of its rows.
                parts.append(u"""<td rowspan="{0}" align="center" >
                           <img height="200" src="{1}"/>
                           <br/>
                           {2}
                           </td>""".format(row_count,
                                           _absolute_image_url(product_data["image_url"]),
                                           product_label))

            start = max_items_row_count * n
            for si in similar_items[start:start + max_items_row_count]:
                parts.append(u"""<td align="center" >
                           <img height="200" src="{0}"/>
                           <br/>
                           {1}
                           </td>""".format(_absolute_image_url(si["image_url"]),
                                           str(si["id"]) + ":" + si["name"]))

            parts.append(u"</tr>")

    parts.append(u"</table></body></html>")
    return u"".join(parts)


def func1(domain, app_info, iterator, max_items_row_count=5, total_count=15,
          output_dir="/tmp"):
    """Write one HTML comparison page per breadcrumb found in a partition.

    Meant to be used as ``rdd.foreachPartition(partial(func1, domain, app_info))``.
    Every record must support ``record["key"]`` access for the keys
    "breadcrumbs", "id", "name", "image_url" and "similar_items_aip"; each
    similar item needs "id", "name" and "image_url".

    Args:
        domain: Opaque value; only echoed in the debug print.
        app_info: Opaque value; only echoed in the debug print.
        iterator: Iterable over the records of one partition.
        max_items_row_count: Similar items shown per table row (default 5).
        total_count: Maximum similar items shown per product (default 15).
        output_dir: Directory that receives ``<breadcrumb>.html`` (default "/tmp").

    Returns:
        None. An empty partition writes nothing.

    Raises:
        ValueError: If ``max_items_row_count`` is smaller than 1.

    Note:
        A product with no similar items produces no table rows at all.
    """
    print("domain ", domain, app_info)
    if max_items_row_count < 1:
        raise ValueError("max_items_row_count must be >= 1")

    # A partition may hold more than one breadcrumb (several keys can hash to
    # the same partition), so group the rows instead of filing everything
    # under the first row's breadcrumb. First-seen order is preserved.
    products_by_breadcrumb = {}
    breadcrumb_order = []
    for product_data in iterator:
        breadcrumb = product_data["breadcrumbs"]
        if breadcrumb not in products_by_breadcrumb:
            products_by_breadcrumb[breadcrumb] = []
            breadcrumb_order.append(breadcrumb)
        products_by_breadcrumb[breadcrumb].append(product_data)

    for breadcrumb in breadcrumb_order:
        htmlstr = _render_similar_items_html(
            products_by_breadcrumb[breadcrumb], max_items_row_count, total_count)
        print("htmlstr ", htmlstr[0:100])
        # Open the file only once the page is fully built, and always close it.
        with open(os.path.join(output_dir, breadcrumb + ".html"), "w") as f:
            f.write(htmlstr)

In [80]:
from functools import partial

In [81]:
# Small placeholder dict that is bound into func1 via functools.partial.
app_info = {"one":1, "two":2}

In [82]:
# `domain` has to exist before it is bound into the partial. The notebook only
# assigns it in a later cell, so a top-to-bottom run would raise NameError
# here. Assign the same value up front.
domain = "123232"
# Run func1 once per partition with `domain` and `app_info` pre-bound; Spark
# supplies the partition's row iterator as the remaining argument.
data_rdd.foreachPartition(partial(func1, domain, app_info))

In [76]:
# Identifier passed to func1 as its first (`domain`) argument.
domain = "123232"

In [83]:
import copy

In [86]:
# Scratch check: deep-copy app_info and display the copy.
copy.deepcopy(app_info)

{'one': 1, 'two': 2}

In [87]:
# Scratch check of dict construction, written as a literal.
{"z": 12}

{'z': 12}

In [93]:
# Map every Row of data_rdd to a plain dict. No collect() is called here, so
# `temp` is an RDD. NOTE: this rebinds `temp`, which an earlier cell assigned
# a collected list.
temp = data_rdd.map(lambda row: row.asDict())

In [94]:
# Bring the per-row dicts to the driver and display them.
temp.collect()

[{'breadcrumbs': 'activewear',
  'id': 220,
  'image_url': '//img1.jockeyimg.com/pi/J-015351-3461-OMF-228.jpg',
  'name': 'Jockey&reg; Twist Back Tank',
  'similar_items_aip': [Row(breadcrumbs='activewear', id='1219', image_url='//img1.jockeyimg.com/pi/J-015553-4783-OMF-228.jpg', name='Jockey&reg; Convertible Tie-Back Tee', prob='0.20367', sku='15553'),
   Row(breadcrumbs='activewear', id='696', image_url='//img1.jockeyimg.com/pi/J-015430-0896-OMF-228.jpg', name='Jockey&reg; Tee with Back Seams', prob='0.20027', sku='15430'),
   Row(breadcrumbs='activewear', id='781', image_url='//img1.jockeyimg.com/pi/J-015507-0001-OMF-228.jpg', name='Jockey&reg; Long Sleeve Performance Tee', prob='0.19696', sku='15507'),
   Row(breadcrumbs='underwear', id='1025', image_url='//img1.jockeyimg.com/pi/J-011110-4763-LDF-228.jpg', name='Jockey&reg; Ribbed Tank 4-Pack', prob='0.19353', sku='11110'),
   Row(breadcrumbs='shapewear', id='124', image_url='//img1.jockeyimg.com/pi/J-004016-1260-OMF-228.jpg', name

In [90]:
# `temp` is an RDD at this point and cannot be subscripted, so read the first
# Row from data_rdd instead. Rows support key access by column name.
data_rdd.first()["breadcrumbs"]

'activewear'

In [91]:
# `temp` is an RDD at this point and cannot be subscripted, so inspect the
# first record of data_rdd instead; its elements are pyspark Row objects.
type(data_rdd.first())

pyspark.sql.types.Row