In [0]:
import pyspark
from pyspark.sql import Window
from pyspark.sql.functions import *
#from pyspark import SparkContext as sc
import time

In [0]:
def iterate_map_df(df):
    newRow = df.select("val", "key")
    df1 = df.union(newRow)
    return df1

In [0]:
def iterate_reduce_df(df1):
    window = Window.orderBy("key","val").partitionBy("key")
    df_min = df1.withColumn("min", min("val").over(window))

    new_column_1 = expr( """IF(min > key, Null, IF(min = val, key, val))""")
    new_column_2 = expr("""IF(min > key, Null, min)""")
    new_df = (df_min
      .withColumn("new_key", new_column_1)
      .withColumn("new_val", new_column_2)) \
    .na.drop() \
    .select(col("new_key").alias("key"), col("new_val").alias("val")) \
    .sort("val", "key") 
        
    df2 = new_df.distinct()
    
    return df2, df_min

In [0]:
def algo(df):
    counter = 1
    iteration = 0
    while counter!=0:
        iteration +=1
        df1 = iterate_map_df(df)
        df1.cache()
        df.unpersist()
        df, df_counter = iterate_reduce_df(df1)
        df.cache()
        df1.unpersist()
        df_counter = df_counter.withColumn("counter_col", expr("""IF(min > key, 0, IF(min = val, 0, 1))"""))
        counter = df_counter.select(sum("counter_col")).collect()[0][0]
    return(df)

In [0]:
graph = sc.parallelize([(1,2), (1,3), (4,5), (5,6), (4,6)])
graph.collect()

Out[5]: [(1, 2), (1, 3), (4, 5), (5, 6), (4, 6)]

In [0]:
df = spark.createDataFrame(graph,["key","val"])
df.show()

+---+---+
|key|val|
+---+---+
|  1|  2|
|  1|  3|
|  4|  5|
|  5|  6|
|  4|  6|
+---+---+



In [0]:
df_processed = algo(df)

In [0]:
df_processed.show()

+---+---+
|key|val|
+---+---+
|  6|  4|
|  2|  1|
|  3|  1|
|  5|  4|
+---+---+



Let's apply the algorithm to larger dataframes.

In [0]:
def preprocessing_google(graph):
    graph = graph.zipWithIndex().filter(lambda tup: tup[1] > 3).map(lambda tup: tup[0])
    graph = graph.map(lambda x: x.split('\t'))
    return graph

def preprocessing_twitter(graph):
    graph = graph.map(lambda x : (x.split(" ")[0],x.split(" ")[1]))
    return graph

**Facebook**

In [0]:
graph = sc.textFile("/FileStore/tables/facebook_combined.txt")
graph = preprocessing_twitter(graph)
df = spark.createDataFrame(graph,["key","val"])
df.show()

+---+---+
|key|val|
+---+---+
|  0|  1|
|  0|  2|
|  0|  3|
|  0|  4|
|  0|  5|
|  0|  6|
|  0|  7|
|  0|  8|
|  0|  9|
|  0| 10|
|  0| 11|
|  0| 12|
|  0| 13|
|  0| 14|
|  0| 15|
|  0| 16|
|  0| 17|
|  0| 18|
|  0| 19|
|  0| 20|
+---+---+
only showing top 20 rows



In [0]:
df_processed = algo(df)

In [0]:
df_processed.show()

+----+---+
| key|val|
+----+---+
|3819|  0|
|2046|  0|
| 656|  0|
|3624|  0|
|3172|  0|
|3114|  0|
|1639|  0|
|  26|  0|
|1828|  0|
|2653|  0|
|1289|  0|
|1818|  0|
|2388|  0|
|2364|  0|
|3937|  0|
|3441|  0|
|1135|  0|
|2821|  0|
|2086|  0|
|1172|  0|
+----+---+
only showing top 20 rows



**Twitter**

In [0]:
graph = sc.textFile("/FileStore/tables/twitter_combined.txt")
graph = preprocessing_twitter(graph)
graph.collect()

Out[8]: [('214328887', '34428380'),
 ('17116707', '28465635'),
 ('380580781', '18996905'),
 ('221036078', '153460275'),
 ('107830991', '17868918'),
 ('151338729', '222261763'),
 ('19705747', '34428380'),
 ('222261763', '88323281'),
 ('19933035', '149538028'),
 ('158419434', '17434613'),
 ('149538028', '153226312'),
 ('364971269', '153226312'),
 ('100581193', '279787626'),
 ('113058991', '69592091'),
 ('151338729', '187773078'),
 ('406628822', '262802533'),
 ('460282402', '88323281'),
 ('280935165', '437804658'),
 ('222261763', '27633075'),
 ('285312927', '151338729'),
 ('279787626', '131613362'),
 ('158419434', '17675120'),
 ('394263193', '100581193'),
 ('254839786', '88323281'),
 ('204317520', '21548772'),
 ('67864340', '172883064'),
 ('270449528', '297801196'),
 ('153226312', '187773078'),
 ('67864340', '8088112'),
 ('153226312', '17116707'),
 ('394263193', '14925700'),
 ('124528830', '307458983'),
 ('204317520', '160237722'),
 ('220368467', '54228724'),
 ('206923844', '103598216'),


In [0]:
df = spark.createDataFrame(graph,["key","val"])

In [0]:
df_processed = algo(df)

In [0]:
df_processed.show()

+---------+---------+
|      key|      val|
+---------+---------+
| 59910141|100004373|
| 98952413|100004373|
|126467468|100004373|
|262388975|100004373|
| 26548846|100004373|
|271263441|100004373|
| 27974947|100004373|
| 21651598|100004373|
| 22563769|100004373|
| 28220400|100004373|
|  5921472|100004373|
| 26030282|100004373|
| 68002922|100004373|
|531074501|100004373|
| 78419512|100004373|
| 12864352|100004373|
| 20405185|100004373|
| 58637862|100004373|
|396278480|100004373|
| 15676629|100004373|
+---------+---------+
only showing top 20 rows



**Google**

In [0]:
graph = sc.textFile("/FileStore/tables/web_Google.txt")
graph = preprocessing_google(graph)
graph.collect()

Out[9]: [['0', '11342'],
 ['0', '824020'],
 ['0', '867923'],
 ['0', '891835'],
 ['11342', '0'],
 ['11342', '27469'],
 ['11342', '38716'],
 ['11342', '309564'],
 ['11342', '322178'],
 ['11342', '387543'],
 ['11342', '427436'],
 ['11342', '538214'],
 ['11342', '638706'],
 ['11342', '645018'],
 ['11342', '835220'],
 ['11342', '856657'],
 ['11342', '867923'],
 ['11342', '891835'],
 ['824020', '0'],
 ['824020', '91807'],
 ['824020', '322178'],
 ['824020', '387543'],
 ['824020', '417728'],
 ['824020', '438493'],
 ['824020', '500627'],
 ['824020', '535748'],
 ['824020', '695578'],
 ['824020', '867923'],
 ['824020', '891835'],
 ['867923', '0'],
 ['867923', '11342'],
 ['867923', '136593'],
 ['867923', '414038'],
 ['867923', '500627'],
 ['867923', '523684'],
 ['867923', '760842'],
 ['867923', '815602'],
 ['867923', '835220'],
 ['867923', '846213'],
 ['867923', '857527'],
 ['867923', '891835'],
 ['891835', '0'],
 ['891835', '11342'],
 ['891835', '112028'],
 ['891835', '235849'],
 ['891835', '3022

In [0]:
df = spark.createDataFrame(graph,["key","val"])

In [0]:
df_processed = algo(df)

In [0]:
df_processed.show()

+------+---+
|   key|val|
+------+---+
|138004|  0|
|174023|  0|
|229137|  0|
|274863|  0|
|299823|  0|
|312903|  0|
|320473|  0|
|333013|  0|
|351084|  0|
|391605|  0|
|416451|  0|
|560160|  0|
|563495|  0|
| 58358|  0|
|600198|  0|
|603231|  0|
|693898|  0|
|696867|  0|
| 70078|  0|
|732436|  0|
+------+---+
only showing top 20 rows

