In [1]:
import os
import subprocess
import sys

# Run PySpark's driver and Python workers on the same interpreter as this kernel,
# rather than a machine-specific absolute path.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Label propagation further down errors unless Spark runs on Java 8. The JVM is launched by
# getOrCreate() below, so JAVA_HOME must be set *before* that call; setting it from a `!` shell
# command cannot work because that runs in a throwaway subshell.
# `/usr/libexec/java_home` is the macOS JDK locator; on other systems JAVA_HOME is left untouched.
_JAVA_HOME_LOCATOR = "/usr/libexec/java_home"
if os.path.exists(_JAVA_HOME_LOCATOR):
    _java8 = subprocess.run(
        [_JAVA_HOME_LOCATOR, "-v", "1.8"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    if _java8.returncode == 0 and _java8.stdout.strip():
        os.environ["JAVA_HOME"] = _java8.stdout.decode().strip()

# For s3a:// input, export AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in the shell that starts
# Jupyter instead of writing credentials into the notebook.

from pyspark.sql import SparkSession
import json
import tldextract  # splits a URL into subdomain / domain / public suffix

spark = (
    SparkSession.builder
    .appName("Python Spark basic example")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.3,graphframes:graphframes:0.3.0-spark2.0-s_2.11",
    )
    .getOrCreate()
)

### Set Java to version 8

In [2]:
# `!export JAVA_HOME=...` only changes a throwaway subshell, so it never reached this kernel.
# Set it on the kernel process instead; shell commands run afterwards with `!` inherit it.
# NOTE: the Spark JVM is started by `getOrCreate()` in the first cell, so for Spark itself to
# use Java 8, JAVA_HOME has to be set before that call.
import os
import subprocess
_JAVA_HOME_LOCATOR = "/usr/libexec/java_home"  # macOS helper that prints the path of a matching JDK
if os.path.exists(_JAVA_HOME_LOCATOR):
    _java8 = subprocess.run([_JAVA_HOME_LOCATOR, "-v", "1.8"], stdout=subprocess.PIPE)
    if _java8.returncode == 0:
        os.environ["JAVA_HOME"] = _java8.stdout.decode().strip()

### Check that Java is version 8

In [3]:
# Print the Java version that shell commands resolve; it should report 1.8.x.
!java -version

java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)


In [4]:
def get_json(line):
    ''' Lower-case one input line and parse it as JSON.

    The whole line is lower-cased, keys and values alike. The extractors in this
    notebook look records up by lower-case keys only, so this presumably normalizes
    mixed-case field names in the WAT data.

    Returns the parsed value, or None when the line is not valid JSON (WAT files
    presumably interleave JSON payloads with non-JSON header lines) or is not text.
    '''
    try:
        return json.loads(line.lower())
    except (AttributeError, TypeError, ValueError):
        # ValueError covers json.JSONDecodeError. AttributeError/TypeError cover
        # non-text input. Unexpected errors are no longer silently swallowed.
        return None

In [5]:
def get_target_TLD(x):
    ''' Return the registered domain ("domain.suffix") of a record's target URI.

    `x` is a record produced by get_json (lower-cased keys). Despite the function
    name, the value is the domain plus its public suffix (e.g. "example.co.uk"),
    not only the top-level domain; subdomains are dropped.

    Returns None when the record lacks envelope -> warc-header-metadata ->
    warc-target-uri, or when that value is not a string.
    '''
    try:
        uri = x['envelope']['warc-header-metadata']['warc-target-uri']
    except (KeyError, TypeError):
        return None
    if not isinstance(uri, str):
        return None
    # Read the named fields rather than unpacking exactly three values: newer
    # tldextract releases return extra fields, which makes 3-way unpacking raise
    # (and a blanket `except` would then turn every record into None).
    parts = tldextract.extract(uri)
    return '.'.join([parts.domain, parts.suffix])

In [6]:
def get_links(x):
    ''' Return the set of registered domains ("domain.suffix") a record links out to.

    `x` is a record produced by get_json (lower-cased keys). Only absolute links are
    kept: entries whose 'url' starts with "http" (this also matches "https").
    Link entries without a usable 'url' are skipped individually instead of
    discarding every link of the page.

    Returns None when the record has no envelope -> payload-metadata ->
    http-response-metadata -> html-metadata -> links list, so callers can filter
    such records out.
    '''
    try:
        links = (x['envelope']['payload-metadata']
                 ['http-response-metadata']['html-metadata']['links'])
    except (KeyError, TypeError):
        return None
    if not isinstance(links, list):
        return None

    domains = set()
    for link in links:
        url = link.get('url') if isinstance(link, dict) else None
        if isinstance(url, str) and url.startswith('http'):
            # Named fields instead of 3-way unpacking: newer tldextract results carry extra fields.
            parts = tldextract.extract(url)
            domains.add('.'.join([parts.domain, parts.suffix]))
    return domains

In [7]:
def prevent_selfLoop(x, y):
    """Drop `x` from the collection `y` so a domain never gets an edge to itself.

    `y` is modified in place (when it contains `x`) and returned together with
    `x` as the pair (x, y).
    """
    has_self_link = x in y
    if has_self_link:
        y.remove(x)
    return (x, y)

In [8]:
def hash_key_and_values(x):
    """Turn a (domain, linked_domains) pair into (hash(domain), [hash of each linked domain]).

    The linked-domain ids keep the iteration order of x[1].
    NOTE(review): built-in str hash() is randomized per process unless PYTHONHASHSEED
    is fixed; these ids only line up with the vertex ids if every Python worker uses
    the same seed -- confirm for your Spark setup.
    """
    source_id = hash(x[0])
    target_ids = [hash(domain) for domain in x[1]]
    return (source_id, target_ids)

### Read in the WAT text files

In [9]:
# Read every WAT file of the CC-MAIN-2016-07 crawl from a local copy; each RDD element is one line.
# The same files can be read straight from S3 (needs AWS credentials and the hadoop-aws package):
#   s3a://commoncrawl/crawl-data/CC-MAIN-2016-07/segments/*/wat/*.gz
distText = spark.sparkContext.textFile(
    '/February 2016/crawl-data/CC-MAIN-2016-07/segments/*/wat/*.gz'
)

### Get vertices

In [10]:
# One vertex per distinct crawled domain: (hash(domain), domain).
# Lines that do not parse as JSON, or records without a target URI, are dropped.
verticesRDD = (
    distText
    .map(get_json)
    .filter(lambda record: record is not None)
    .map(get_target_TLD)
    .filter(lambda domain: domain is not None)
    .map(lambda domain: (hash(domain), domain))
    .distinct()
)

In [11]:
#verticesRDD.take(5)

### Get edges

In [12]:
# One edge per (crawled domain -> linked domain) pair, both as hashed ids.
# Links from all pages of the same domain are merged, and self-links are removed first.
edgesRDD = (
    distText
    .map(get_json)
    .filter(lambda record: record is not None)
    .map(lambda record: (get_target_TLD(record), get_links(record)))
    .filter(lambda pair: pair[0] is not None and pair[1] is not None)
    .map(lambda pair: prevent_selfLoop(*pair))
    .reduceByKey(lambda left, right: left | right)
    .map(hash_key_and_values)
    .flatMapValues(lambda ids: ids)
)

In [13]:
#edgesRDD.take(5)

### Convert the RDDs into DataFrames

In [14]:
# Build the GraphFrames inputs with explicit schemas. With only column names, createDataFrame
# infers the types by fetching the first row (rdd.first()), which here eagerly runs the whole
# distinct()/reduceByKey() shuffle stage at cell time. The declared types match what inference
# picks for Python int hashes (LongType) and domain strings (StringType).
from pyspark.sql.types import LongType, StringType, StructField, StructType

vertices_dataFrame = spark.createDataFrame(
    verticesRDD,
    StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
    ]),
)
edgesRDD_dataFrame = spark.createDataFrame(
    edgesRDD,
    StructType([
        StructField('src', LongType(), True),
        StructField('dst', LongType(), True),
    ]),
)

In [15]:
# Peek at the first five edges (this runs a Spark job).
edgesRDD_dataFrame.take(5)

[Row(src=4285572903547207920, dst=2739784602837965824),
 Row(src=4285572903547207920, dst=-6967053938570686463),
 Row(src=4285572903547207920, dst=-5791541011073193466),
 Row(src=4285572903547207920, dst=6761084252930425355),
 Row(src=4285572903547207920, dst=-8708776065850436596)]

In [16]:
# Only GraphFrame is used below; import it by name instead of `*`.
from graphframes import GraphFrame

In [17]:
# Assemble the graph: vertices carry an `id` column, edges carry `src` and `dst` columns.
g = GraphFrame(v=vertices_dataFrame, e=edgesRDD_dataFrame)

In [18]:
# Print the first 20 edges as a table.
g.edges.show(n=20)

+-------------------+--------------------+
|                src|                 dst|
+-------------------+--------------------+
|4285572903547207920| 2739784602837965824|
|4285572903547207920|-6967053938570686463|
|4285572903547207920|-5791541011073193466|
|4285572903547207920| 6761084252930425355|
|4285572903547207920|-8708776065850436596|
|4285572903547207920|-1214350095715070963|
|4285572903547207920|-7808670595516599281|
|4285572903547207920| 6165036195414574097|
|4285572903547207920|-5985818500628700653|
|4285572903547207920|-8881742504770768876|
|4285572903547207920| 4576300366550306839|
|4285572903547207920|-8754969770135597544|
|4285572903547207920|-4180227107228633062|
|4285572903547207920| 2507482377650386462|
|4285572903547207920| 3954257185637245470|
|4285572903547207920|-8362541349642173921|
|4285572903547207920| 6562974860942138402|
|4285572903547207920| 4469509359167677987|
|4285572903547207920|-2523948590802034648|
|4285572903547207920| 3290824996227573803|
+----------

In [19]:
# Community detection with label propagation, capped at 5 iterations.
# This errors unless Spark is running on Java 8.
result = g.labelPropagation(
    maxIter=5,
)

In [20]:
#result.take(5)

In [21]:
# Group the label-propagation output into communities: (label, [domain names with that label]).
# Columns are read by name rather than by position, so this does not depend on the column
# order of `result`. groupByKey() builds each list once, avoiding the repeated list
# concatenation of reduceByKey(lambda x, y: x + y), which copies ever-growing lists.
LPA_result = (
    result.rdd
    .map(lambda row: (row.label, row.name))
    .groupByKey()
    .mapValues(list)
)

In [22]:
# Write one "(label, [domain, ...])" line per community. Spark refuses to overwrite an
# existing output directory; only that case is reported below. Any other failure
# (e.g. a failed job) is re-raised instead of being hidden behind that message.
try:
    LPA_result.saveAsTextFile("LPA_results")
except Exception as exc:
    if "FileAlreadyExistsException" not in str(exc):
        raise
    print("File already exist.")