In [1]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
import pyspark
import pyspark.sql
from pyspark.sql import *
from pyspark.sql.functions import *
import json
import argparse
%matplotlib inline

conf = pyspark.SparkConf().setMaster("local[*]").setAll([
                                   ('spark.driver.memory','240g'),
                                   ('spark.driver.maxResultSize', '32G'),
                                   ('spark.local.dir', '/scratch/tmp/'),
                                   ('spark.yarn.stagingDir', '/scratch/tmp/'),
                                    ('spark.sql.warehouse.dir', '/scratch/tmp/')
                                  ])


# create the session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# create the context
sc = spark.sparkContext

In [2]:
spark

In [3]:
sites = ["jawiki", "cswiki", "cawiki", "svwiki", "arwiki", "dewiki", 
             "elwiki", "enwiki", "eswiki", "fawiki", "fiwiki", "frwiki", 
             "hewiki", "idwiki", "itwiki", "kowiki", "nlwiki", "plwiki", 
             "ptwiki", "rowiki", "ruwiki", "sqwiki", "srwiki", "trwiki", 
             "ukwiki", "viwiki", "warwiki", "zhwiki"]

In [4]:
all_links_enriched_rdd = sc.emptyRDD()
for s in sites:
    links = spark.read.parquet("datasets/{}/enriched_all_links.parquet".format(s))
    all_links_enriched_rdd = all_links_enriched_rdd.union(links.selectExpr("'{}' as site".format(s), "*").rdd)
    
all_links_enriched = spark.createDataFrame(all_links_enriched_rdd)
all_links_enriched

DataFrame[site: string, qid: string, links: array<string>]

### Filter

In [5]:
all_links_enriched = all_links_enriched.where("SIZE(links) >= 0")

----

In [6]:
qids_rdd = sc.emptyRDD()
for s in sites:
    qids = spark.read.parquet("datasets/{}/heldout_documents.parquet".format(s))\
            .selectExpr("*", "'{}' as site".format(s))
    qids_rdd = qids_rdd.union(qids.rdd)
    
qids = spark.createDataFrame(qids_rdd)
qids

DataFrame[qid: string, heldout: boolean, site: string]

In [7]:
training_qids = qids.where("heldout = FALSE")
testing_qids = qids.where("heldout = TRUE")

In [15]:
training = all_links_enriched.join(training_qids, (all_links_enriched.qid==training_qids.qid)
                                   & (all_links_enriched.site==training_qids.site))
training_self_loop = spark.createDataFrame(training.rdd.map(lambda r: Row(site=r.site, links=r.links+[r.qid])))
training_self_loop

KeyboardInterrupt: 

In [9]:
testing = all_links_enriched.join(testing_qids, (all_links_enriched.qid==testing_qids.qid) 
                                  & (all_links_enriched.site==testing_qids.site))
testing_self_loop = spark.createDataFrame(testing.rdd.map(lambda r: Row(site=r.site, qid=r.qid, links=r.links+[r.qid])))
testing_self_loop

DataFrame[links: array<string>, qid: string, site: string]

In [10]:
testing_self_loop.write.mode("overwrite").parquet("models/EnrichedLinks/testing_self_loop.parquet")

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA

wordsVector = CountVectorizer(inputCol="links", outputCol="features")
transformer = wordsVector.fit(training_self_loop)
result = transformer.transform(training_self_loop).cache()

result

In [12]:
result.count()

32639223

In [13]:
transformer.write().overwrite().save("models/EnrichedLinks/transformer.model")

In [16]:
result.selectExpr( "SIZE(links) links_count", "features")\
    .write.mode("overwrite").parquet("models/EnrichedLinks/traning_set.parquet")