In [1]:
# Import findspark and os. findspark.init() is called before pyspark is
# imported further down; keep that order.
# NOTE(review): presumably findspark.init() is what makes the pyspark package
# importable in this environment -- confirm before reordering.
import findspark
import os
findspark.init()


# PYSPARK_SUBMIT_ARGS is set before the SparkSession is created so that the
# session is started with the graphframes package on its package list.
# NOTE(review): the coordinate names graphframes 0.7.0 for Spark 2.4 /
# Scala 2.11 -- confirm it matches the installed Spark build.
SUBMIT_ARGS = "--packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import udf

# Create (or reuse) the SparkSession used by every cell below.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("graphx-lab").getOrCreate()

# graphframes is imported only after the session exists.
# NOTE(review): presumably the Python package only becomes importable once
# the session has loaded the --packages jar -- confirm before moving this up.
from graphframes import *
import hashlib

In [2]:
# Read everything under test/ as headerless, tab-separated CSV and let Spark
# infer the column types.
df = (
    spark.read
    .format("csv")
    .option("sep", "\t")
    .option("inferSchema", "true")
    .option("header", "false")
    .load("test/")
)

In [3]:
# Preview the first ten rows of the raw data.
df.show(n=10)

+---------+----------------+
|      _c0|             _c1|
+---------+----------------+
|000530.cn|   hd.hstong.com|
|001001.jp|       i1.wp.com|
|001001.jp|       i1.wp.com|
|001001.jp|       i0.wp.com|
|001001.jp|        yelp.com|
|001001.jp|    facebook.com|
|001001.jp|     twitter.com|
|001001.jp|   instagram.com|
|001001.jp|ja.wordpress.org|
|002211.cn|   hd.hstong.com|
+---------+----------------+
only showing top 10 rows



In [8]:
# Print the column names and types of df.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [26]:
# Number of rows currently in df.
df.count()

320044

In [4]:
# Give the two positional columns meaningful names, then keep only the rows
# where both ends of the link are present.
df = (
    df
    .withColumnRenamed('_c0', 'parentTLD')
    .withColumnRenamed('_c1', 'childTLD')
    .filter("parentTLD is not null and childTLD is not null")
)

In [5]:
# Collapse repeated (parentTLD, childTLD) rows so each link appears once,
# then preview the result.
df = df.dropDuplicates()
df.show(n=5)

+--------------+--------------------+
|     parentTLD|            childTLD|
+--------------+--------------------+
|    alatest.it|       r6.kelkoo.com|
|  aldilife.com|webgate.ec.europa.eu|
|algarserra.com|        google.co.uk|
|     amcham.lv|         twitter.com|
|androidblog.ch|         mobiflip.de|
+--------------+--------------------+
only showing top 5 rows



In [17]:
# Collect every distinct name that appears on either side of a link:
# flatMap flattens each (parentTLD, childTLD) row into its two values and
# distinct() removes the repeats.
# This assignment has to be live code: it is the only definition of
# `aggcodes`, which this cell and the later `vertices` cell both read. With
# it commented out, a fresh kernel fails here with NameError.
aggcodes = df.select("parentTLD", "childTLD").rdd.flatMap(lambda x: x).distinct()
print(aggcodes.count())
aggcodes.top(10)

159549


['나로.한국',
 '節約主婦ブログ.jp',
 '立会外分売.jp',
 '海外fx比較.top',
 '朝ドラまとめ速報.com',
 '中秋ＤＭ.jpg',
 '中国模具.mobi',
 'ココマイスターwebカタログ.jp',
 '⑤feedly.com',
 'پوزیشنر.ghorfe24.com']

In [7]:
def hashnode(x, length=8):
    """Return a short hexadecimal identifier for a node name.

    The name is encoded as UTF-8, hashed with SHA-1, and the first
    ``length`` hex characters of the digest are returned.

    Args:
        x: Node name (``str``). ``None`` is passed through as ``None`` so
            the function can be applied to nullable values without raising.
        length: Number of leading hex characters to keep. The default of 8
            reproduces the original behaviour; ``None`` keeps the full
            40-character digest.

    Returns:
        The truncated hex digest as a ``str``, or ``None`` when ``x`` is
        ``None``.

    Note:
        Eight hex characters carry only 32 bits, so two different names can
        receive the same identifier, and the chance grows quickly with the
        number of names (birthday bound). Pass a larger ``length`` when the
        identifiers must be unique.
    """
    if x is None:
        # Previously this raised AttributeError on .encode(); a null name
        # simply has no identifier.
        return None
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:length]

# Column-level wrapper around hashnode for use in DataFrame expressions
# (no return type is passed, so pyspark's default applies).
hashnode_udf = udf(f=hashnode)

In [8]:
# Pair every distinct name with its hashed identifier, then turn the pairs
# into the (id, name) vertex table.
vertex_pairs = aggcodes.map(lambda name: (hashnode(name), name))
vertices = vertex_pairs.toDF(["id", "name"])

vertices.show(n=5)

+--------+-----------------+
|      id|             name|
+--------+-----------------+
|78407e3b|gratiskryssord.no|
|465806fb|      twitter.com|
|06252e37|    instagram.com|
|86850682|       bitpay.com|
|ee665830|       inpress.fr|
+--------+-----------------+
only showing top 5 rows



In [9]:
# One edge per link: hash both endpoints with the same function used for the
# vertex ids so that src/dst line up with vertices.id.
edges = df.select(
    hashnode_udf("parentTLD").alias("src"),
    hashnode_udf("childTLD").alias("dst"),
)

edges.show(n=5)

+--------+--------+
|     src|     dst|
+--------+--------+
|3b12f9b8|149b84dc|
|8d116c83|4cb3ed47|
|f502c192|89c0c16b|
|86ff2a90|465806fb|
|b7581c64|b0b83807|
+--------+--------+
only showing top 5 rows



In [10]:
# Assemble the graph from the (id, name) vertex table and the (src, dst)
# edge table.
graph = GraphFrame(v=vertices, e=edges)

In [23]:
# Preview both sides of the graph: first the vertices, then the edges.
graph.vertices.show(n=5)
graph.edges.show(n=5)

+--------+--------------------+
|      id|                name|
+--------+--------------------+
|0fde1b8b|       wcsbradio.com|
|465806fb|         twitter.com|
|3ab76dbd|    llmu.tt-donau.de|
|f75c4fbc|mixetterempizzeri...|
|9df4f83c|carbonoffsetlist.org|
+--------+--------------------+
only showing top 5 rows

+--------+--------+
|     src|     dst|
+--------+--------+
|3b12f9b8|149b84dc|
|8d116c83|4cb3ed47|
|f502c192|89c0c16b|
|86ff2a90|465806fb|
|b7581c64|b0b83807|
+--------+--------+
only showing top 5 rows



In [22]:
# Rank nodes by how many links point at them; the join brings the readable
# name back next to each hashed id.
most_linked_to = graph.inDegrees.join(vertices, on="id").sort("inDegree", ascending=False)
most_linked_to.show(n=20, truncate=False)

# Same ranking for links going out of each node.
most_linking_out = graph.outDegrees.join(vertices, on="id").sort("outDegree", ascending=False)
most_linking_out.show(n=20, truncate=False)

+--------+--------+--------------------+
|id      |inDegree|name                |
+--------+--------+--------------------+
|b7c70898|16105   |facebook.com        |
|465806fb|12063   |twitter.com         |
|d7e222c8|6986    |youtube.com         |
|06252e37|6813    |instagram.com       |
|4fa6d5a6|3118    |googletagmanager.com|
|1b274516|3045    |linkedin.com        |
|9b319411|2361    |plus.google.com     |
|bc565513|2187    |wordpress.org       |
|d84f4904|1907    |pinterest.com       |
|baea954b|1547    |google.com          |
|4894387e|1249    |vk.com              |
|c43b840d|949     |play.google.com     |
|dffd1f8f|769     |itunes.apple.com    |
|855621f6|732     |creativecommons.org |
|ef80a3c5|719     |mc.yandex.ru        |
|50ed7453|654     |goo.gl              |
|903fe3e1|640     |bit.ly              |
|5bff5e76|583     |secure.gravatar.com |
|e05a3223|541     |beian.miit.gov.cn   |
|f18e2083|503     |t.me                |
+--------+--------+--------------------+
only showing top

In [16]:
# Shut down the SparkSession created in the first cell.
spark.stop()