# Query 6
For all the products in group DVD, if a user is at the amazon.com page for the bestselling product and from then on is only using the links under “Customers who bought this item also bought” list to view other items, how many items (clicks) later will they reach the worst selling product? What was the path taken? Assume each edge between products persists regardless of date. List the items in the order in which they were clicked. [NOTE: salesrank of each product is calculated based on how many times it was sold. A product climbs up the rank as it’s sales increases. Therefore, lower the salesrank number higher it is in the order and number of sales]

In [0]:
from pyspark.sql.functions import asc, desc, col
products = spark.table('products')
graphs = spark.table("graphs")

dvd_product_query = '''
SELECT products.product_id AS id, products.asin, products.title, products.salesrank, products.group_id
FROM products 
JOIN groups
ON products.group_id = groups.group_id 
WHERE groups.group_name = 'DVD'
'''

dvd_products = spark.sql(dvd_product_query)
dvd_products = dvd_products.withColumnRenamed("product_id", "id")

display(dvd_products.orderBy(desc("salesrank")))
display(dvd_products.orderBy(asc("salesrank")))



id,asin,title,salesrank,group_id
pid_546148,asin_B00005QJHY,Jack Frost & Others,71583,gid_130
pid_544806,asin_B00005MKK6,Allied & Axis Bombers,71501,gid_130
pid_544804,asin_B00005MKK5,Allied & Axis Fighters,71491,gid_130
pid_443582,asin_B00005KFWG,Vol. 3-Georgey,71282,gid_130
pid_175350,asin_B00005KJP6,Live at Wakefield Jazz,71051,gid_130
pid_542721,asin_B00005TQ4G,Pieces & Bits,71006,gid_130
pid_280393,asin_B000055XML,New Barbarians,70994,gid_130
pid_22358,asin_B00005JL4F,Rad,70917,gid_130
pid_260731,asin_B00005NKBX,Fury,70790,gid_130
pid_543769,asin_B00005RDS2,On the Road with Luke: Christmas Edition,70774,gid_130


id,asin,title,salesrank,group_id
pid_100995,asin_B00005QJJY,Indian Wars,-1,gid_130
pid_274738,asin_B00005O06G,Aftershock,-1,gid_130
pid_548547,asin_B000059TOC,The Drifter,0,gid_130
pid_548548,asin_B00006JBIX,The House Of Morecock,0,gid_130
pid_548550,asin_B00008DDST,"1, 2, 3 Soleils: Taha, Khaled, Faudel",0,gid_130
pid_193107,asin_B00003CX5P,"Star Wars - Episode I, The Phantom Menace (Widescreen Edition)",28,gid_130
pid_137401,asin_B00006CXSS,Band of Brothers,47,gid_130
pid_104775,asin_B00001QEE7,The Little Mermaid (Limited Issue),49,gid_130
pid_547782,asin_B00000JS62,The Wizard of Oz,55,gid_130
pid_136839,asin_B00005LC1H,Fawlty Towers - The Complete Collection,85,gid_130


In [0]:
from pyspark.sql.functions import col, when
graphs = graphs.withColumnRenamed("from_id", "src")
graphs = graphs.withColumnRenamed("to_id", "dst")
def get_neighbors(graph, product_id):
    neighbors = graph.filter((col("src") == product_id) | (col("dst") == product_id))
    neighbors = neighbors.withColumn('neighbor',
        when(col("src") == product_id, col('dst')) \
        .otherwise(col('src'))
    )
    neighbors = neighbors.select('neighbor').rdd.flatMap(lambda x: x).collect()
    return neighbors


product_id = "pid_175350"
neighbors = get_neighbors(graphs, product_id)
print(neighbors)


product_id = "pid_193107"
neighbors = get_neighbors(graphs, product_id)
print(neighbors)



['pid_119435', 'pid_65808', 'pid_78772', 'pid_119432', 'pid_119434', 'pid_119435', 'pid_193178', 'pid_318100', 'pid_334589', 'pid_380475', 'pid_146894', 'pid_25506', 'pid_97222', 'pid_103981', 'pid_175348', 'pid_175349', 'pid_180806', 'pid_186668', 'pid_205916', 'pid_227661', 'pid_318100', 'pid_180806', 'pid_205916', 'pid_206859', 'pid_227661', 'pid_293756', 'pid_302859', 'pid_106271', 'pid_1609', 'pid_3159', 'pid_3755', 'pid_10336', 'pid_15914', 'pid_44863', 'pid_70694', 'pid_106271', 'pid_211958', 'pid_214567', 'pid_142108', 'pid_8346', 'pid_9808', 'pid_20655', 'pid_25023', 'pid_82407', 'pid_104760', 'pid_125465', 'pid_141259', 'pid_142108', 'pid_97222']
['pid_192992', 'pid_104842', 'pid_156656', 'pid_167546', 'pid_167547', 'pid_192992', 'pid_32', 'pid_13554', 'pid_120155', 'pid_181168', 'pid_205618', 'pid_205619', 'pid_205621', 'pid_207625', 'pid_207625', 'pid_149928', 'pid_66175', 'pid_66176', 'pid_149928', 'pid_92042', 'pid_92043', 'pid_193106', 'pid_2063', 'pid_4487', 'pid_9138',

In [0]:
from graphframes import GraphFrame
graphs = spark.table("graphs")
vertices = graphs.select('from_id').withColumnRenamed('from_id', 'id') \
    .union(graphs.select('to_id').withColumnRenamed('to_id', 'id')).distinct()
edges = graphs.select('from_id', 'to_id').withColumnRenamed('from_id', 'src').withColumnRenamed('to_id', 'dst')
all_gf = GraphFrame(vertices, edges)
path = all_gf.bfs(fromExpr="id='pid_193107'", toExpr="id='pid_175350'")
path.show()



+------------+--------------------+------------+--------------------+-----------+--------------------+------------+--------------------+------------+
|        from|                  e0|          v1|                  e1|         v2|                  e2|          v3|                  e3|          to|
+------------+--------------------+------------+--------------------+-----------+--------------------+------------+--------------------+------------+
|{pid_193107}|{pid_193107, pid_...|{pid_104842}|{pid_104842, pid_...|{pid_80071}|{pid_80071, pid_1...|{pid_180806}|{pid_180806, pid_...|{pid_175350}|
+------------+--------------------+------------+--------------------+-----------+--------------------+------------+--------------------+------------+

