In [1]:
import enum
from http.client import responses
import pandas as pd 
import os

import pyspark.sql.functions as F
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
import pyarrow.parquet as pq
import pyarrow as pa
import glob
import numpy as np

import sys
sys.path.append("/scratch/venia/web2wiki/code/helpers/")
sys.path.append("/scratch/venia/web2wiki/code/iterative_coding/")

sys.setrecursionlimit(10000)

import re
import time
from bs4 import BeautifulSoup

from settings import WIKI_PAGES_DIR
import helping_functions as hf

# NOTE(review): machine-specific absolute paths. They are assigned here so that
# they are already in the environment when the Spark session is requested below.
os.environ['SPARK_HOME'] = "/home/veselovs/spark-3.2.1-bin-hadoop2.7/"
os.environ['JAVA_HOME'] = "/home/veselovs/jdk-13.0.2/"
# Reuse an existing Spark session if one is active, otherwise create one.
spark = SparkSession.builder.getOrCreate()

# Every entry directly under WIKI_PAGES_DIR (imported from `settings`).
files = glob.glob(WIKI_PAGES_DIR+"/*")
# Uncomment to work on a random sample of 1000 entries instead of the full listing.
# files = list(np.random.choice(files, 1000))


### all_links[0].parent.parent.previous_sibling.previous_sibling.text

# Pattern used to select hrefs that point at English Wikipedia.
# The dot after "en" is escaped so that it only matches a literal "."; left
# unescaped it matched any character (e.g. "enXwikipedia.org"). The character
# class that follows the host name is unchanged.
regex_search = r"(en\.wikipedia\.org[\s\/a-zA-ZäöüÄÖÜßþóúí\_\?(\),\,\-\#\&\$\@\!0-9\.\%\–\'\:\!]+)"

def find_neighbouring_links(all_links):
    """
    For every link, pick one adjacent <a> element at random.

    A fresh uniform random number is drawn per link: above 0.5 the next <a>
    in the document is looked up, otherwise the previous one. Links for which
    the lookup yields nothing are skipped, so the result may be shorter than
    the input.
    """
    anchor_name = re.compile('^a$')
    neighbours = []
    for link in all_links:
        lookup = link.find_next if np.random.rand() > 0.5 else link.find_previous
        neighbour = lookup(anchor_name)
        if neighbour:
            neighbours.append(neighbour)
    return neighbours
        

def extract_neighbouring_text2(x, text, num_chars=150):
    """
    Extracts the neighbouring text of the Wikipedia mention.

    Parameters
    ----------
    x : str
        The surrounding text in which the mention is searched.
    text : str
        The mention (link text) to locate inside ``x``.
    num_chars : int
        Size of the context window.

    Returns
    -------
    str
        ``"<left> [BREAK] <mention> [BREAK] <right>"`` built around the first
        occurrence of ``text`` in ``x``. ``<left>`` holds up to ``num_chars``
        characters before the mention. ``<right>`` runs from the end of the
        mention up to position ``start + num_chars + 50``, i.e. the right
        window is measured from the *start* of the mention (kept as in the
        original implementation).

        If ``text`` does not occur in ``x`` the mention is returned with empty
        context on both sides. Previously ``find`` returning -1 was used as a
        real index, which silently produced an unrelated window.
    """
    ind = x.find(text)
    if ind < 0:
        # Mention not found: do not slice with -1, return the bare mention.
        return " [BREAK] " + text + " [BREAK] "

    m = len(text)
    ind_0 = max(ind - num_chars, 0)
    # Slicing clamps at len(x), so no separate end-of-string branch is needed.
    ind_1 = ind + num_chars + 50
    return x[ind_0:ind] + " [BREAK] " + x[ind:ind + m] + " [BREAK] " + x[ind + m:ind_1]


# extract_neighbouring_text = F.udf(extract_neighbouring_text, ArrayType(StringType()))

def nbhd_text(x):
    """
    Return the context string for link element ``x``.

    The link's own text is located inside the text of its grandparent
    element and handed to ``extract_neighbouring_text2`` with the default
    window size.
    """
    container_text = x.parent.parent.get_text()
    anchor_text = x.get_text()
    return extract_neighbouring_text2(container_text, anchor_text)
    

def process_headers(x):
    """
    Return the section header associated with every Wikipedia link in ``x``.

    Parameters
    ----------
    x : str or None
        HTML of one page.

    Returns
    -------
    list
        One ``extract_header`` result per ``<a>`` whose href matches
        ``regex_search``, in document order. A null or empty document yields
        an empty list instead of failing inside the parser.
    """
    if not x:
        # Null/empty page content: nothing to parse.
        return []
    soup = BeautifulSoup(x, features="lxml")
    wiki_links = soup.find_all('a', {"href": re.compile(regex_search)})
    return [extract_header(link) for link in wiki_links]

def is_tag(elt, tag, val=None):
    """
    Report whether ``elt`` or one of its ancestors is a ``tag`` element.

    The walk starts at ``elt`` and moves up through ``.parent``. It stops at
    the first element named ``tag``, at the ``html`` element, or when the
    tree runs out.

    Parameters
    ----------
    elt : element with ``.name`` and ``.parent``
    tag : str
        Tag name to look for.
    val : int or None
        Starting value; ``None`` means 0.

    Returns
    -------
    int
        ``val + 1`` if a matching element was found, otherwise ``val``.

    Notes
    -----
    Results for the default ``val`` are unchanged. The previous recursive
    version passed ``val`` down *and* added the returned value back, which
    multiplied a non-zero ``val`` by the depth. It also raised
    ``AttributeError`` when no ``html`` ancestor existed.
    """
    if val is None:
        val = 0
    node = elt
    while node is not None:
        if node.name == tag:
            return val + 1
        if node.name == "html":
            break
        node = node.parent
    return val



def extract_header(elt):
    """
    Return the text of the heading that introduces the section ``elt`` is in.

    The closest preceding ``h1``-``h6`` element is looked up. Its text is
    returned only when the heading's parent is the very same node as
    ``elt``'s great-grandparent; in every other case the string ``"None"``
    is returned.

    Notes
    -----
    The container check uses identity (``is``). BeautifulSoup's ``==`` on tags
    compares name, attributes and contents, so two distinct but identical
    looking containers would compare equal. A tree that is too shallow to have
    a great-grandparent yields ``"None"`` instead of raising.
    """
    prev_header = elt.find_previous(re.compile('^h[1-6]$'))
    if prev_header is None:
        return "None"

    # Climb three levels: parent -> grandparent -> great-grandparent.
    ancestor = elt
    for _ in range(3):
        ancestor = ancestor.parent
        if ancestor is None:
            return "None"

    if prev_header.parent is ancestor:
        return prev_header.text
    return "None"

# process_headers = F.udf(process_headers, ArrayType(StringType()))

def is_class(elt, key=None, val=None):
    """
    Count how many ``class`` / ``id`` attributes on ``elt`` and its ancestors
    match the patterns registered under ``key``.

    Parameters
    ----------
    elt : element with ``.name``, ``.parent``, ``has_attr`` and item access
    key : str
        Key into ``class_variables.all_classes``; the values stored there are
        joined with ``|`` into one regular expression.
        NOTE(review): ``class_variables`` is not defined in the visible cells
        of this notebook - confirm where it comes from.
    val : int or None
        Starting value; ``None`` means 0.

    Returns
    -------
    int
        ``val`` plus one for every element on the path from ``elt`` up to and
        including ``html`` whose ``class`` matches, plus one for every element
        on that path whose ``id`` matches.

    Notes
    -----
    Fixes relative to the previous recursive version:
    * the whole ``id`` string is tested (``elt["id"][0]`` only looked at its
      first character);
    * every class token is tested, not just the first one;
    * the walk continues past elements that carry an ``id``;
    * each match is counted once (``val`` used to be passed down and added
      back, inflating the count with depth);
    * no bare ``except`` - unexpected attribute values are simply not matched.
    """
    if val is None:
        val = 0
    pattern = re.compile("|".join(class_variables.all_classes[key]))

    def _matches(attr_value):
        # bs4 gives a list for multi-valued attributes (class) and a plain
        # string for the others (id); handle both shapes.
        if attr_value is None:
            return False
        tokens = [attr_value] if isinstance(attr_value, str) else list(attr_value)
        return any(isinstance(tok, str) and pattern.search(tok) for tok in tokens)

    node = elt
    while node is not None:
        if node.has_attr("class") and _matches(node["class"]):
            val += 1
        if node.has_attr("id") and _matches(node["id"]):
            val += 1
        if node.name == "html":
            break
        node = node.parent
    return val

def process_all(x: str):
    """
    Extract one record per Wikipedia link found in the HTML document ``x``.

    Parameters
    ----------
    x : str or None
        HTML of one page.

    Returns
    -------
    list
        One ``[href, header, tags, classes, nbhd_text]`` list per ``<a>``
        whose href matches ``regex_search``, in document order. The field
        order follows the struct declared for ``process_all_udf``.
        A null or empty document yields ``[]``, so a row without content does
        not fail inside the parser when this runs as a Spark UDF.
    """
    if not x:
        # Null/empty page content: nothing to parse.
        return []
    soup = BeautifulSoup(x, features="lxml")
    wiki_links = soup.find_all('a', {"href": re.compile(regex_search)})

    rows = []
    for elt in wiki_links:
        href = elt.get("href")
        class_counts = process_class(elt)
        tag_counts = process_tags(elt)
        context = nbhd_text(elt)
        header = extract_header(elt)
        rows.append([href, header, tag_counts, class_counts, context])
    return rows
# Spark UDF wrapper around process_all. The declared return type is an array
# of structs: href, header, tags (array of tag/tag_count), classes (array of
# class/class_count) and nbhd_text.
process_all_udf = F.udf(
    process_all,
    ArrayType(
        StructType()
        .add("href", StringType())
        .add("header", StringType())
        .add(
            "tags",
            ArrayType(
                StructType()
                .add("tag", StringType())
                .add("tag_count", IntegerType())
            ),
        )
        .add(
            "classes",
            ArrayType(
                StructType()
                .add("class", StringType())
                .add("class_count", IntegerType())
            ),
        )
        .add("nbhd_text", StringType())
    ),
)


def process_tags(elt):
    """
    Build the per-tag counts for ``elt``.

    A fresh ``tag_instance_of()`` container is created, the ``is_tag`` result
    for ``elt`` is added to the entry of every tag the container lists, and
    the container's ``_list()`` form is returned.
    """
    counter = tag_instance_of()
    for name in counter.tags():
        current = counter[name]
        counter[name] = current + is_tag(elt, name)
    return counter._list()
# process_tags = F.udf(process_tags, ArrayType(ArrayType(StructType([StructField("tag", StringType()),StructField("count", IntegerType())]))))

    
def process_class(elt):
    """
    Build the per-class-group counts for ``elt``.

    A fresh ``class_instance_of()`` container is created, the ``is_class``
    result for ``elt`` is added to the entry of every key of
    ``class_variables.all_classes``, and the container's ``_list()`` form is
    returned.
    """
    counter = class_instance_of()
    for group in class_variables.all_classes.keys():
        current = counter[group]
        counter[group] = current + is_class(elt, group)
    return counter._list()


# process_class = F.udf(process_class, ArrayType(ArrayType(StructType([StructField("class", StringType()),StructField("count", IntegerType())]))))
    
def extract_soup(x):
    """
    Return the href of every Wikipedia link in the HTML document ``x``.

    Parameters
    ----------
    x : str or None
        HTML of one page.

    Returns
    -------
    list
        The ``href`` attribute of each ``<a>`` whose href matches
        ``regex_search``, in document order. A null or empty document yields
        an empty list instead of failing inside the parser.
    """
    if not x:
        # Null/empty page content: nothing to parse.
        return []
    soup = BeautifulSoup(x, features="lxml")
    wiki_links = soup.find_all('a', {"href": re.compile(regex_search)})
    return [link.get("href") for link in wiki_links]
# extract_soup = F.udf(extract_soup, ArrayType(StringType()))


def chunks(lst, n):
    """Lazily produce consecutive slices of ``lst``, each at most ``n`` items long."""
    offsets = range(0, len(lst), n)
    yield from (lst[lo:lo + n] for lo in offsets)


22/10/27 21:53:32 WARN Utils: Your hostname, iccluster039 resolves to a loopback address: 127.0.1.1; using 10.90.38.15 instead (on interface ens786f0)
22/10/27 21:53:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/27 21:53:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/27 21:53:36 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/10/27 21:53:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/27 21:53:41 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/10/27 21:5

In [42]:
# Collect the `tag_info_55_*` entries of the iterative-coding sample.
# NOTE: this rebinds `files`, which the setup cell pointed at WIKI_PAGES_DIR.
files = glob.glob("/scratch/venia/web2wiki/data/web_content/iterative_coding_sample/tag_info_55_*")

In [43]:
# No `format` is passed, so Spark reads with its default data source
# (`spark.sql.sources.default`, parquet unless reconfigured).
df = spark.read.load(files)

                                                                                

In [44]:
# Inspect the nested layout: one row per url, with `processed` holding an
# array of per-link structs.
df.printSchema()

root
 |-- url: string (nullable = true)
 |-- processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- href: string (nullable = true)
 |    |    |-- header: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- tag: string (nullable = true)
 |    |    |    |    |-- tag_count: integer (nullable = true)
 |    |    |-- classes: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- class: string (nullable = true)
 |    |    |    |    |-- class_count: integer (nullable = true)
 |    |    |-- nbhd_text: string (nullable = true)



In [45]:
# Keep only the urls for which at least one link record was extracted.
df = df.filter(F.size('processed') > 0)


In [60]:
# One output row per entry of `processed`; the entry is exposed as `element`.
df2 = df.withColumn("element",F.explode("processed"))

In [61]:
# df2.printSchema()

In [62]:
# Drop the original `processed` array now that it has been exploded.
df2 = df2.select("url","element")

In [63]:
# Lift every field of the `element` struct into a top-level column of the
# same name.
df2 = (
    df2
    .withColumn("href", F.col("element.href"))
    .withColumn("header", F.col("element.header"))
    .withColumn("tags", F.col("element.tags"))
    .withColumn("classes", F.col("element.classes"))
    .withColumn("nbhd_text", F.col("element.nbhd_text"))
)

In [66]:
# One integer column per tag counter, read by position from `element.tags`.
# NOTE(review): the position -> name mapping (0 footer, 1 header, 2 sup) is
# taken from the column names used here; confirm it matches the order in
# which the tag list is produced.
df2 = (
    df2
    .withColumn("tag_footer", F.col("element.tags").getItem(0).getField("tag_count"))
    .withColumn("tag_header", F.col("element.tags").getItem(1).getField("tag_count"))
    .withColumn("tag_sup", F.col("element.tags").getItem(2).getField("tag_count"))
)



In [67]:
# df2.agg(F.sum("class_sidebar")).collect()

In [68]:
# One integer column per class counter, read by position from
# `element.classes`.
# NOTE(review): the position -> name mapping (0 footer, 1 header, 2 sidebar,
# 3 response) is taken from the column names used here; confirm it matches
# the order in which the class list is produced.
df2 = (
    df2
    .withColumn("class_footer", F.col("element.classes").getItem(0).getField("class_count"))
    .withColumn("class_header", F.col("element.classes").getItem(1).getField("class_count"))
    .withColumn("class_sidebar", F.col("element.classes").getItem(2).getField("class_count"))
    .withColumn("class_response", F.col("element.classes").getItem(3).getField("class_count"))
)



In [70]:
# Flat result: the nested struct and array columns are removed, leaving the
# scalar columns derived from them.
dff2 = df2.drop("element","tags","classes")

In [75]:
# Persist the flat table as parquet.
# NOTE(review): no save mode is given, so Spark's default applies and this
# raises if the target path already exists - remove the old output (or pass
# an explicit mode) before re-running the notebook.
dff2.write.parquet("/scratch/venia/web2wiki/data/test/sanitized_wiki.parquet")

                                                                                

In [55]:
# df2.limit(100000).dropDuplicates(subset = ["url"]).select("element.classes").take(1000)

In [None]:
# Scratch cell intentionally left without code: a bare undefined name here
# would raise NameError under Restart & Run All.