In [1]:
# Regular packages
import pandas as pd
import matplotlib.pyplot as plt
import time
import warnings
warnings.filterwarnings("ignore") # might not be the best idea

In [2]:
# pyspark packages 
from pyspark.sql.functions import col, concat, collect_list,struct
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, array, array_distinct, array_min,array_max,array_union, explode
from pyspark.sql.types import IntegerType, DoubleType, ArrayType

from pyspark import SparkContext
from pyspark.sql import Row

## Analysis of RDD 

In [3]:
def file_to_rdd(file):
    """
    This function takes a file name and converts it into an RDD.
    
    Arguments:
    file (str): file name
    
    Returns:
    An RDD containing all information extracted from the file.
    """

    if file[-3:] == "csv" : 
        data = spark.read.format("csv").option("inferSchema", "true")\
                                        .option("delimiter", ',')\
                                        .option("header", 'true')\
                                        .load(file).cache()

        rdd = adj_cache.rdd.map(tuple)
        return rdd
    elif file[-3:] == "txt" : 
        rdd_web = sc.textFile(file) \
                    .map(lambda line: line.split('\t')) \
                    .filter(lambda line: len(line)>1) \
                        .map(lambda line: (line[0],line[1]))

        return rdd_web

In [4]:
sc = SparkContext()

In [5]:
spark = SparkSession.builder.appName('abc').getOrCreate()

In [6]:
#First try with the example of the research paper 

from pyspark import SparkContext

time1 = time.time()

graph = sc.parallelize([(0, 1), (1, 2), (1, 3), (3, 4), (5, 6), (6, 7)])
counter_new_pair = sc.accumulator(1)

def reducer(node):
    minimum = node[0]
    value_List = []
    joined = []
  
    for value in node[1]:
        if value<minimum:
            minimum = value
        value_List.append(value)
    if minimum<node[0]:
        joined.append((node[0],minimum))
        for value in value_List:
            if minimum != value:
                counter_new_pair.add(1)
                joined.append((value, minimum))
    return joined

while counter_new_pair.value > 0:
    counter_new_pair = sc.accumulator(0)

    #CCF-Iterate
    mapping_1 = graph.map(lambda node : (node[0], node[1]))
    mapping_2 = graph.map(lambda node : (node[1], node[0]))
    fusion = mapping_1.union(mapping_2)
    fusion = fusion.groupByKey().map(lambda node : (node[0], list(node[1])))
    joined = fusion.flatMap(lambda node: reducer(node))
    # CCF-Dedup
    graph = joined.map(lambda node : ((node[0], node[1]), None)).groupByKey()
    graph = graph.map(lambda x: (x[0][0], x[0][1]))
    graph.collect()
  
    print("counter: ", counter_new_pair)
  
    graph.collect()
    
time2 = time.time()

print("It took {:.3f} seconds".format(time2-time1))

counter:  4
counter:  9
counter:  4
counter:  0
It took 9.101 seconds


In [7]:
#Let's try with the Google Graph 

web_google = sc.textFile("web-Google 2.txt")\
                    .map(lambda line: line.split('\t')) \
                    .filter(lambda line: len(line)>1) \
                        .map(lambda line: (line[0],line[1]))

graph = web_google

time1 = time.time()

counter_new_pair = sc.accumulator(1)

def reducer(node):
    minimum = node[0]
    value_List = []
    joined = []
  
    for value in node[1]:
        if value<minimum:
            minimum = value
        value_List.append(value)
    if minimum<node[0]:
        joined.append((node[0],minimum))
        for value in value_List:
            if minimum != value:
                counter_new_pair.add(1)
                joined.append((value, minimum))
    return joined

while counter_new_pair.value > 0:
    counter_new_pair = sc.accumulator(0)

    #CCF-Iterate
    mapping_1 = graph.map(lambda node : (node[0], node[1]))
    mapping_2 = graph.map(lambda node : (node[1], node[0]))
    fusion = mapping_1.union(mapping_2)
    fusion = fusion.groupByKey().map(lambda node : (node[0], list(node[1])))
    joined = fusion.flatMap(lambda node: reducer(node))
    # CCF-Dedup
    graph = joined.map(lambda node : ((node[0], node[1]), None)).groupByKey()
    graph = graph.map(lambda x: (x[0][0], x[0][1]))
    graph.collect()
  
    print("counter: ", counter_new_pair)
    
time2 = time.time()

print("It took {:.3f} seconds".format(time2-time1))

Py4JJavaError: An error occurred while calling o276.union.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/hippolyteguigon/Database Management Project/web-Google 2.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:297)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.SparkContext.$anonfun$union$2(SparkContext.scala:1404)
	at org.apache.spark.SparkContext.$anonfun$union$2$adapted(SparkContext.scala:1404)
	at scala.collection.TraversableLike.noneIn$1(TraversableLike.scala:271)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:337)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
	at org.apache.spark.SparkContext.$anonfun$union$1(SparkContext.scala:1404)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:786)
	at org.apache.spark.SparkContext.union(SparkContext.scala:1403)
	at org.apache.spark.SparkContext.$anonfun$union$5(SparkContext.scala:1415)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:786)
	at org.apache.spark.SparkContext.union(SparkContext.scala:1415)
	at org.apache.spark.rdd.RDD.$anonfun$union$1(RDD.scala:665)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.union(RDD.scala:665)
	at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:177)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# Comparison according to graph size 

values = [len(web_google.collect())//(10-i) for i in range(1, 10)]

time_ = []

for i in range(len(values)):
    graph = sc.parallelize(web_google.collect()[1:values[i]])
    
    time1 = time.time()
    
    counter_new_pair = sc.accumulator(1)
    
    while counter_new_pair.value > 0:
        
        counter_new_pair = sc.accumulator(0)

        #CCF-Iterate
        mapping_1 = graph.map(lambda node : (node[0], node[1]))
        mapping_2 = graph.map(lambda node : (node[1], node[0]))
        fusion = mapping_1.union(mapping_2)
        fusion = fusion.groupByKey().map(lambda node : (node[0], list(node[1])))
        joined = fusion.flatMap(lambda node: reducer(node))
        # CCF-Dedup
        graph = joined.map(lambda node : ((node[0], node[1]), None)).groupByKey()
        graph = graph.map(lambda x: (x[0][0], x[0][1]))
        graph.collect()
    
    time2 = time.time()
    
    time_.append((i, time2-time1))

In [16]:
# Comparison according to graph size 

time_ = []

graph = sc.parallelize(web_google.collect()[1:5000000])
    
time1 = time.time()
    
counter_new_pair = sc.accumulator(1)
    
while counter_new_pair.value > 0:
        
    counter_new_pair = sc.accumulator(0)

    #CCF-Iterate
    mapping_1 = graph.map(lambda node : (node[0], node[1]))
    mapping_2 = graph.map(lambda node : (node[1], node[0]))
    fusion = mapping_1.union(mapping_2)
    fusion = fusion.groupByKey().map(lambda node : (node[0], list(node[1])))
    joined = fusion.flatMap(lambda node: reducer(node))
    # CCF-Dedup
    graph = joined.map(lambda node : ((node[0], node[1]), None)).groupByKey()
    graph = graph.map(lambda x: (x[0][0], x[0][1]))
    graph.collect()
    
time2 = time.time()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/hippolyteguigon/Database Management Project/web-Google 2.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:297)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
plt.figure(figsize=(10, 8))
plt.title("Speed (in seconds) relatively to the % of the graph taken")
plt.plot([time_[i][1] for i in range(len(time_))]);

## Analysis of Dataframes

In [14]:
#First, let's define our function to import and analyze Dataframes

def our_union (x):
    x[1].append(x[0])
    return x[1]

our_union_udf = f.udf(our_union, ArrayType(IntegerType()))
findmin = f.udf(lambda x: min(x), IntegerType())
our_distinct = f.udf(lambda x: list(set(x)), ArrayType(IntegerType()))

def CCF_DEDUP_df(df):
    
    reverseDF = df.select(col("From").alias("To"),col("To").alias("From"))
    df_0 = df.union(reverseDF)
    
    df_1 = df_0.groupBy(col("To")).agg(our_distinct(collect_list(col("From"))).alias('From'))
    df_2 = df_1.withColumn('From', our_union_udf(struct(df_1.To, df_1.From)))\
                    .withColumn('To', findmin("From"))\
                        .withColumn('From', our_distinct('From'))
    
    df_3 = df_2.select( explode(col("From")).alias("To"), col("To").alias("From")).dropDuplicates()
    
    return df_3

def Analyze(df):
    
    reverseDF = df.select(col("From").alias("To"),col("To").alias("From"))
    df_0 = df.union(reverseDF)
      
    size = df_0.distinct().count()/2

    t = time.time()
    
    counter = 0 
    while df.count()!= df.select('To').distinct().count() :
        counter +=1 
        df = CCF_DEDUP_df(df) 
    t = time.time() - t
    
    #Getting the number of groups of connected components
    num_of_groups = len(df.select('From').distinct().collect())
    
    return t, size,num_of_groups, counter,  df

In [None]:
#First, let's define our function to import and analyze Dataframes

def our_union (x):
    x[1].append(x[0])
    return x[1]

our_union_udf = f.udf(our_union, ArrayType(IntegerType()))
findmin = f.udf(lambda x: min(x), IntegerType())
our_distinct = f.udf(lambda x: list(set(x)), ArrayType(IntegerType()))

def CCF_DEDUP_df(df):
    
    reverseDF = df.select(col("From").alias("To"),col("To").alias("From"))
    df_0 = df.union(reverseDF)
    
    df_1 = df_0.groupBy(col("To")).agg(our_distinct(collect_list(col("From"))).alias('From'))
    df_2 = df_1.withColumn('From', our_union_udf(struct(df_1.To, df_1.From)))\
                    .withColumn('To', findmin("From"))\
                        .withColumn('From', our_distinct('From'))
    
    df_3 = df_2.select( explode(col("From")).alias("To"), col("To").alias("From")).dropDuplicates()
    
    return df_3

def Analyze(df):
    
    reverseDF = df.select(col("From").alias("To"),col("To").alias("From"))
    df_0 = df.union(reverseDF)
      
    size = df_0.distinct().count()/2

    t = time.time()
    
    counter = 0 
    while df.count()!= df.select('To').distinct().count() :
        counter +=1 
        df = CCF_DEDUP_df(df) 
    t = time.time() - t
    
    #Getting the number of groups of connected components
    num_of_groups = len(df.select('From').distinct().collect())
    
    return t, size,num_of_groups, counter,  df

In [15]:
Analyze("web-Google 2.txt")

AttributeError: 'str' object has no attribute 'select'

In [31]:
df = sc.textFile("web-Google 2.txt") \
            .map(lambda line: (line.split('\t'))).toDF()\
            .select(col('_1').cast(IntegerType()).alias('To'), col('_2').cast(IntegerType()).alias('From'))

In [61]:
df_test1 = pd.read_csv("web-Google 2.txt", sep="\t")

In [62]:
df_test1.to_csv("web-Google_5000000.csv", header=False, index=False)

In [23]:
names = ["web-Google_500000.csv", "web-Google_1000000.csv", "web-Google_1500000.csv", "web-Google_2000000.csv", "web-Google_2500000.csv", "web-Google_3000000.csv", "web-Google_3500000.csv", "web-Google_4000000.csv", "web-Google_1000000.csv"]

time_df = []

for file in names:
    df = data = spark.read.format("csv").option("inferSchema", "true")\
                                        .option("delimiter", ',')\
                                        .option("header", 'true')\
                                        .load(file).toDF("To","From").cache()
    
    time_df.append(Analyze(df)[0])
    


In [24]:
time_df

[742.6663370132446,
 1173.5141010284424,
 1178.273682832718,
 1244.8001997470856,
 2275.856563806534,
 1476.6470799446106,
 1604.9051089286804,
 1770.9568598270416,
 1165.5193858146667]

In [19]:
web_google.show()

AttributeError: 'PipelinedRDD' object has no attribute 'show'