DSSP16 - polytechnique Data Challenge 11-12 December 2020

Big Graphs & Text Mining with Spark in Python


*Team 3 : Giulio Gallegos, Gueorgui Nedev, Christopher Cala, Salomon Hazan*

# Table of contents <a id="Table of contents">

*   [Abstract](#Abstract)
*   [Introduction](#Introduction)
*   [Related work](#Related_work)
*   [Data set](#Data_set) 

*   [Methodology](#Methodology)\
      -[Data Preparation](#Data_preparation)\
      -[Feature selection](#Feature_selection)\
      -[Graph features](#Graph_features)\
      -[Models](#Models)

*   [Experiments](#Experiments)\
    -[Experimental setup](#Experimental_setup)\
    -[Baselines](#Baselines)\
    -[Evaluation](#Evaluation)

*   [Results & discussion](#Results_&_discussion)\
    -[Sample issue](#Sample_issue)

*   [Conclusion](#Conclusion)
*   [Acknowledgements](#Acknowledgements)
*   [References](#References)
*   [Pyspark code](#Pyspark_code)



# Abstract <a id="Abstract">
The data describe Wikipedia pages through their titles and introductory paragraphs, together with candidate links from origin (source) page titles to destination (target) page titles. Directed relationships between pages can be represented as a graph. Text mining and graph analysis of such large data sets, which combine unstructured text with directed graph data, can be carried out on a Spark cluster using Python and SQL commands to preprocess, clean, analyse, join and extract meaningful features. These features are then used to train and test supervised classification models that predict whether a link is present.
From our two data sets (page titles and introduction texts, and a graph of origin-destination page title links with a boolean label for link presence), we reached over 99% performance, measured as the Area Under the Precision-Recall Curve, with only seven features (three from the text data and four from the graph). We tried three supervised classifiers (Logistic Regression, Support Vector Classifier and Random Forest Classifier) using ten-fold cross-validation, with low risk of overfitting.
Performance may be further improved by experimenting with other graph methods, including StronglyConnectedComponents, communities via LabelPropagation, PageRank and Graph of Words. Adapting the stopword list to Wikipedia-style language, instead of the list we reused from the film review example, could also improve results.
Training and testing the model on new Wikipedia data sets, and generating more negative (False) links, could provide further insight for model optimization and use.





# Introduction <a id="Introduction">
Starting from unstructured text data (titles and introductory paragraphs of Wikipedia pages) and semi-structured graph data (titles of origin and destination pages, with a boolean indicating whether a link exists between them), we aim to process, clean, analyse and extract relevant features from the data in order to predict links between Wikipedia pages, using a cluster of machines running Apache Spark to parallelize the work.



# Related work <a id="Related_work">
Seminal works, including Porter's suffix-stripping stemming algorithm (1980), PageRank (Page et al., 1998), community structure (Girvan & Newman, 2002), Introduction to Information Retrieval (Manning et al., 2008), Kronecker graphs (Leskovec et al., 2010), graph degeneracy and D-cores (Giatsidis, Thilikos & Vazirgiannis, 2011) and graph-of-words (Rousseau & Vazirgiannis, 2013), provide the methods used to reduce complex text to simplified roots and to analyse it further through term frequencies, probabilistic information retrieval, vector space classification, machine learning, and link analysis on graphs.



# Data set <a id="Data_set">
The unstructured text data "Articles" is a CSV file with two fields: Title (the title of the Wikipedia page) and Intro (the introductory paragraph of the page).
The semi-structured graph data "Links" is a CSV file describing candidate links between pages with three fields: Source (title of the origin page), Target (title of the destination page) and True/False (binary 1/0 indicating whether or not a true link exists).



# Methodology <a id="Methodology">

## Data preparation <a id="Data_preparation">

The unstructured text data underwent preprocessing, including removal of punctuation marks, conversion to lower case text, removal of common stop words, and stemming to bring words down to their roots (cf. Pyspark code lines 189-209, 212-256).





### Punctuation cleaning <a id="Punctuation_cleaning">
Punctuation was removed with regular expressions, as in the text mining lab. We kept parentheses because Wikipedia titles often use them to disambiguate article titles, as in "Mercury (planet)".
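A minimal Python 3 sketch of this step, using the same regular-expression patterns as `clean_punctuation_str` in the Pyspark code plus a final `strip()`: parentheses are padded with spaces so that they become separate tokens, every other character that is neither a word character nor whitespace becomes a space, and runs of whitespace are collapsed. The function name `clean_punctuation` and the sample sentence are illustrative.

```python
import re


def clean_punctuation(text):
    """Replace punctuation with spaces, keeping parentheses as standalone tokens."""
    text = re.sub(r"\(", " ( ", text)       # pad "(" so it splits as its own token
    text = re.sub(r"\)", " ) ", text)       # same for ")"
    text = re.sub(r"[^\w\s()]", " ", text)  # any other punctuation becomes a space
    text = re.sub(r"\s{2,}", " ", text)     # collapse repeated whitespace
    return text.strip()


print(clean_punctuation("Mercury (planet): the smallest planet, closest to the Sun."))
# Mercury ( planet ) the smallest planet closest to the Sun
```

Keeping `(` as a token is what later lets the title feature ignore a disambiguation suffix.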


### Lower case <a id="Lower_case">
The text was converted to lower case to facilitate further analyses. For example, 'Everything' becomes 'everything'.


### Stopwords removal <a id="Stopwords_removal">
Common stopwords, which occur throughout the corpus without distinguishing one document from another, were removed using the English stopword list provided in the movie review lab. This list deserves a second look: it was designed for the informal language of reviews (it even contains 'br', a remnant of HTML line breaks in the review texts) rather than for the more formal, definitional language of Wikipedia.
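The filtering itself is a single pass over the tokens. A minimal sketch, assuming the text has already been cleaned and lower-cased; `STOPWORDS` here is a short, illustrative subset of the lab's list, stored as a `set` so that each lookup takes constant time on average instead of scanning a list for every token.

```python
# Illustrative subset of the stopword list used in the Pyspark code (not the full list)
STOPWORDS = {"the", "a", "an", "of", "is", "to", "in", "and", "it", "br"}


def remove_stopwords(text):
    """Split lower-cased text on whitespace and drop stopword tokens."""
    return [w for w in text.split() if w not in STOPWORDS]


print(remove_stopwords("mercury is the smallest planet in the solar system"))
# ['mercury', 'smallest', 'planet', 'solar', 'system']
```

Adapting the list to Wikipedia-style language would only change the contents of `STOPWORDS`.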


### Stemming <a id="Stemming">
The text was further reduced to root words through stemming: we used the English Snowball stemmer from NLTK to reduce each word to its stem.



## Feature selection  <a id="Feature_selection">


### Text features <a id="Text_features">
The word count of the source Wikipedia page introduction, the single feature provided in example 2, was not informative enough: on its own it performed about as well as a random guess.

We added two types of additional features built from the text data. \
 \
 
The first feature measures the extent to which the target Wikipedia page title (target.title) appears in the source page introduction (source.intro). This custom function ranges between 0 and 1 but is binary most of the time. Because of this mostly binary behaviour, we think that, on its own, this feature gains little from models more complex than logistic regression. This first feature alone brings performance up to 95%. (cf. Pyspark code lines 142-187)\
 \
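A minimal Python 3 sketch of this feature, written from the description above and the comments on `b_contains_a` in the Pyspark code; it may differ from that function in edge cases. It keeps the title tokens before any `(` disambiguation marker and returns the longest run of them, matched from the first title word and contiguously, found in the intro, as a fraction of the title length. The name `title_in_intro` is hypothetical, and the restart rule on a mismatch is a simple choice of ours, not a full string-matching algorithm.

```python
def title_in_intro(title_tokens, intro_tokens):
    """Fraction of the title (words before any "(") found contiguously in the intro.

    Returns 1.0 when the whole title appears as consecutive intro tokens,
    otherwise the longest matched title prefix divided by the title length.
    """
    # Drop a disambiguation suffix such as "( film )"
    title = []
    for w in title_tokens:
        if w == "(":
            break
        title.append(w)
    if not title:
        return 0.0

    best = 0     # longest title prefix matched so far
    current = 0  # length of the prefix matched ending at the current intro token
    for w in intro_tokens:
        if w == title[current]:
            current += 1
        else:
            # Restart; the current token may itself begin a new match
            current = 1 if w == title[0] else 0
        best = max(best, current)
        if best == len(title):
            return 1.0
    return best / len(title)


print(title_in_intro(["solar", "system"], ["sun", "solar", "wind"]))  # 0.5
```

Because most titles are either fully present or absent, the value is 0.0 or 1.0 in most cases, which matches the mostly binary behaviour described above.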


The second set of text features is based on term-frequency similarity. We define it as the sum, over all words, of the product of the word's number of occurrences in the source and in the target introduction, divided by a power $\alpha$ of the product of the total number of words in the two documents: \

 $$\frac{\sum_{w \in Intro1 \cup Intro2} count_1^w count_2^w}{(count_1 count_2)^\alpha}$$   
 \
With $\alpha=1$ the measure is biased toward short articles, and with $\alpha<0.5$ it is biased toward long articles (cf. Pyspark code lines 212-254). Using these two features alone, computed with $\alpha=1$ and $\alpha=0.5$, gave a performance of 91%.
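The formula can be computed directly from word counts. A minimal sketch on two toy token lists (illustrative data); words that occur in only one text contribute 0 to the sum, so iterating over the words of the first text is enough.

```python
from collections import Counter


def tf_similarity(tokens1, tokens2, alpha):
    """sum_w count1[w] * count2[w] / (len(tokens1) * len(tokens2)) ** alpha"""
    if not tokens1 or not tokens2:
        return 0.0
    c1, c2 = Counter(tokens1), Counter(tokens2)
    # Counter returns 0 for missing words, so non-shared words add nothing
    shared = sum(n * c2[w] for w, n in c1.items())
    return shared / (len(tokens1) * len(tokens2)) ** alpha


intro1 = ["planet", "orbit", "sun", "planet"]
intro2 = ["planet", "sun", "star"]
print(tf_similarity(intro1, intro2, 1.0))  # 0.25, i.e. 3 / 12
print(tf_similarity(intro1, intro2, 0.5))  # 3 / sqrt(12), about 0.866
```

Here "planet" contributes 2 × 1 and "sun" 1 × 1, so the numerator is 3 and the denominator is (4 × 3)^α. The pairwise double loop in the Pyspark code yields the same value, since it counts every pair of equal words; the `Counter` version is linear in the text lengths.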



## Graph features <a id="Graph_features">
We transformed the semi-structured links dataframe into a graph object with the GraphFrames library, using the page titles as vertices and the links as edges, in order to extract the in-degree and out-degree of each source and destination page and to compute further features such as the degree ratio and, potentially, strongly connected components and communities.  \
 \
We also built structural features by counting, for each (source, destination) pair, the number of paths of length two ("double links") and of length three ("triple links") from the source page to the destination page (cf. Pyspark code lines 28-60). Mapping and reducing the joined links gives two features: the number of double links and the number of triple links.    
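To make the double and triple link counts concrete, here is a minimal pure-Python sketch on a small in-memory edge list; the Spark version obtains the same counts with SQL self-joins followed by a map/reduce by key. The function `count_walks` and the toy edges are illustrative. Like the self-joins, it counts walks, so intermediate nodes may repeat, and a pair without any walk simply reads as 0, which plays the role of the `COALESCE(..., 0)` in the feature query.

```python
from collections import Counter, defaultdict


def count_walks(edges):
    """Count length-2 and length-3 walks between every (source, target) pair.

    A length-2 walk is s -> m -> t; a length-3 walk is s -> m1 -> m2 -> t.
    """
    out = defaultdict(list)  # adjacency list: node -> successors
    for s, t in edges:
        out[s].append(t)

    two = Counter()
    for s, m in edges:
        for t in out[m]:
            two[(s, t)] += 1

    three = Counter()
    for s, m in edges:
        # every length-2 walk starting at m extends the edge s -> m
        for m2 in out[m]:
            for t in out[m2]:
                three[(s, t)] += 1
    return two, three


edges = [("a", "b"), ("b", "c"), ("c", "d"), ("a", "c")]
two, three = count_walks(edges)
print(two[("a", "d")], three[("a", "d")])  # 1 1
```

Here a -> c -> d is the only length-2 walk from a to d, and a -> b -> c -> d the only length-3 one.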


Adding four graph features (the degree ratio, i.e. in-degree / (out-degree + 1), of the source page and of the destination page, the number of double links and the number of triple links) brought performance above 99%, without strongly connected components or communities (cf. Pyspark code lines 63-97). We think that using the raw in-degrees and out-degrees instead of the composite degree ratio may further improve performance.  
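In the Pyspark code the degree ratio is computed as `inDegree / (outDegree + 1)` from the GraphFrames `inDegrees` and `outDegrees` tables. A minimal pure-Python sketch of the same quantity on a toy edge list (hypothetical data), which keeps every node, including those with no incoming or no outgoing link:

```python
from collections import Counter


def degree_ratio(edges):
    """in-degree / (out-degree + 1) for every node that appears in the edge list."""
    indeg = Counter(t for _, t in edges)
    outdeg = Counter(s for s, _ in edges)
    nodes = set(indeg) | set(outdeg)
    # The +1 keeps the ratio finite for nodes without outgoing links
    return {n: indeg[n] / (outdeg[n] + 1) for n in nodes}


edges = [("a", "b"), ("b", "c"), ("c", "d"), ("a", "c")]
print(sorted(degree_ratio(edges).items()))
# [('a', 0.0), ('b', 0.5), ('c', 1.0), ('d', 1.0)]
```

A high ratio marks pages that are linked to much more than they link out, which is the kind of signal that helps separate real links from fake ones.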



## Models <a id="Models">
Three supervised classifiers were used to predict links between Wikipedia pages: Logistic Regression, Support Vector Classifier and Random Forest. These fairly simple classifiers were chosen because of the small number of features.  



# Experiments <a id="Experiments">


## Experimental setup <a id="Experimental_setup">
Features were converted into sparse vectors. The logistic regression model was trained and tested with ten-fold cross-validation, and its hyperparameters were tuned by grid search.
Our baseline was logistic regression on a single feature, the word count of the source page introduction paragraph.
The Support Vector Classifier was also trained and tested with ten-fold cross-validation, with hyperparameters tuned by grid search.
Finally, the Random Forest Classifier was trained and tested in the same way (cf. Pyspark code lines 265-373).



## Baselines  <a id="Baselines">
We take as baseline the performance of the Logistic Regression Classifier on a single feature, the length of the Wikipedia page introduction paragraph, as described in example2.  
Model performance vs. baseline is outlined in the table below:

**Table 1: Model performance (area under the precision-recall curve) vs. baseline**

| Feature set / Model | Logistic Regression Classifier | Support Vector Classifier | Random Forest Classifier |
|---|---|---|---|
| example2.py feature | 53.165% | 53.181% | 68.575% |
| Team 3 text features | 96.949% | 97.105% | 97.163% |
| Team 3 graph features | 97.477% | 96.118% | 99.999% |
| Team 3 text & graph features | 99.921% | 99.884% | 99.999% |


## Evaluation <a id="Evaluation">

Each model was trained and tested with ten-fold cross-validation, and performance was measured as the area under the precision-recall curve (AUPRC). \
According to Table 1, the Random Forest Classifier performs best with the features we engineered, reaching over 99% versus a baseline of 53%.
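For reference, a minimal pure-Python sketch of the metric: sort examples by predicted score, compute (recall, precision) at each distinct threshold and integrate with the trapezoidal rule. This is our own illustration, assuming one point per distinct score and a curve that starts at recall 0 with the precision of the first point; it is not guaranteed to reproduce Spark's `BinaryClassificationEvaluator(metricName="areaUnderPR")` to the last digit.

```python
def area_under_pr(scores, labels):
    """Area under the precision-recall curve by the trapezoidal rule (labels are 0/1)."""
    pairs = sorted(zip(scores, labels), key=lambda p: -p[0])  # highest score first
    total_pos = sum(labels)
    if total_pos == 0:
        raise ValueError("need at least one positive label")

    points = []  # (recall, precision) after each distinct threshold
    tp = fp = 0
    for i, (score, label) in enumerate(pairs):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # emit a point only once all examples sharing this score are counted
        if i + 1 == len(pairs) or pairs[i + 1][0] != score:
            points.append((tp / total_pos, tp / (tp + fp)))

    area = 0.0
    prev_recall, prev_precision = 0.0, points[0][1]
    for recall, precision in points:
        area += (recall - prev_recall) * (precision + prev_precision) / 2
        prev_recall, prev_precision = recall, precision
    return area


print(area_under_pr([0.9, 0.8, 0.2, 0.1], [1, 1, 0, 0]))  # 1.0 (perfect ranking)
print(area_under_pr([0.5, 0.5, 0.5, 0.5], [1, 0, 1, 0]))  # 0.5 (uninformative score)
```

In this sketch an uninformative score yields an area equal to the fraction of positive examples. That is consistent with a baseline close to 50% when, as noted in the final section, about half of the candidate links are fake.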



# Results & discussion <a id="Results_&_discussion">

Starting from a single feature and then a very limited set of text features, we reached over 96% performance. Adding features derived from the graph data brought performance close to 100%, while keeping the total under ten features (seven) on a large number of samples.

Model selection and hyperparameter tuning also helped push performance closer to 100% with the same seven features.   
 

Our main challenges came from working with a large data set distributed on a Spark cluster. Job submissions to the cluster took very long and often failed, even with sampling; it would probably have been easier to prototype and test the code on smaller samples on local machines.  


We initially had difficulties with the Spark SQL joins and found a discrepancy in row counts after one of the joins. Although small (under one percentage point), this discrepancy may be one reason we did not reach 100% performance.  


Another limitation of the Spark ecosystem is that not every Python package or library can be fully exploited. For example, GraphFrames offers a narrower set of functions than GraphX, which was used in the labs.  


Further experimentation with other graph methods, including StronglyConnectedComponents, communities via LabelPropagation, PageRank and Graph of Words, could yield promising results, pending further research into the best way to deploy them on Spark clusters. 




## Sample issue <a id="Sample_issue">

**To do**: ask about sampling, or try sampling the RDD again.

# Conclusion <a id="Conclusion">
Combining a large sample of unstructured text data (Wikipedia page titles and introductory paragraphs) with semi-structured graph data (origin and destination page titles and the presence or absence of a link) made good use of a Spark cluster for text and graph preprocessing, cleaning, joining, feature extraction, predictive modelling and ten-fold cross-validation, reaching excellent performance with fewer than ten features and low risk of overfitting.   

Our feature engineering yielded 7 features tested with three models using ten-fold cross validation to reach over 99% performance as measured in Area Under the Precision Recall Curve.


Performance could be improved by experimenting with other graph methods, including StronglyConnectedComponents, communities via LabelPropagation, PageRank and Graph of Words. Further optimization could come from adapting the stopword list to Wikipedia-style language instead of the film review example.


Training and testing the model on new Wikipedia data sets, and generating more negative (False) links, could provide further insight for model optimization and use. 



# Final thoughts and further development <a id="Conclusion">
Although it was not obvious from the very beginning, the proposed DataLab problem easily allows near-100% scores, which at first hint at overfitting, although this is not the case. 

The reasons for these near-perfect results are twofold. 

First, the text features we use (presence of the target title in the source introduction, and comparison of term frequencies) already reach about 97%. Better text-based features could presumably lead to even better results; given time constraints, we preferred to focus on the graph-based features, which greatly improved performance. 

Second, the graph structure itself hints at existing links. Since fake links make up about 50% of the data, a random edge has a 50% chance of being a real link, which is many orders of magnitude higher than the probability that a random Wikipedia page links to another random Wikipedia page.

We considered how the problem would change if the data were "fake flooded", i.e. if for every pair of pages (p1, p2) without a link we added a fake edge (class = 0). The graph structure would then become irrelevant, since the graph would be complete, and the classes would be strongly imbalanced, so stratified k-fold cross-validation would be preferable to standard ten-fold cross-validation to account for the imbalance. Possible methods for this harder problem are: A) use text features only (hence unrelated to the graph structure), which would probably still reach at least 97%; B) use the graph structure of the existing (class 1) links in the *training* set only, with a dedicated model based on text features alone when the source or target is not in the training set. We think such an approach could exceed 99% on this more difficult problem.
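In the "fake flooded" variant, real links would be a small minority, and plain random folds could leave some folds with very few of them. A minimal pure-Python sketch of stratified fold assignment, which keeps the overall class ratio in every fold; the function `stratified_folds` and the toy labels are hypothetical and not tied to Spark's API.

```python
import random
from collections import defaultdict


def stratified_folds(labels, k, seed=0):
    """Return a fold index (0..k-1) per example, spreading each class evenly over folds."""
    by_class = defaultdict(list)
    for i, y in enumerate(labels):
        by_class[y].append(i)

    rng = random.Random(seed)
    folds = [0] * len(labels)
    for indices in by_class.values():
        rng.shuffle(indices)
        # deal the shuffled examples of this class round-robin across the k folds
        for position, i in enumerate(indices):
            folds[i] = position % k
    return folds


labels = [1] * 20 + [0] * 180  # 10% positives, an imbalanced toy example
folds = stratified_folds(labels, k=10)
fold0 = [y for y, f in zip(labels, folds) if f == 0]
print(sum(fold0), len(fold0))  # 2 20
```

Every fold receives 2 of the 20 positives and 18 of the 180 negatives, so each validation fold keeps the 10% positive rate of the full data.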

# Acknowledgements <a id="Acknowledgements">
Special thanks to our teachers Christos Giatsidis, Apostolos N. Papadopoulos, Michalis Vazirgiannis, Johannes Lutzeyer, Changmin Wu, Christos Xypolopoulos, Ioannis Nikolentzos, Abdine Hadi,... who gave us the theoretical and practical pedagogical support we needed to accomplish this task.   



# References <a id="References">
Rousseau, F. and M. Vazirgiannis (2013b). Graph-of-word and TW-IDF: New Approach to Ad Hoc IR. In Proceedings of the 22nd ACM CIKM ’13, pp. 59–68. \
Rousseau, F. and M. Vazirgiannis (2015a). Main Core Retention on Graph-of-words for Single-Document Keyword Extraction. In Proceedings of the 37th European Conference on Information Retrieval, ECIR ’15. \
Christopher D. Manning, Prabhakar Raghavan and Hinrich Schütze. Introduction to Information Retrieval. Cambridge University Press, 2008. http://www-nlp.stanford.edu/IR-book/ \
S. Deerwester, S. Dumais, G. Furnas, T. Landauer and R. Harshman. “Indexing by Latent Semantic Analysis”. Journal of the American Society for Information Science, 1990. \
Soumen Chakrabarti. “Mining the Web: Discovering Knowledge from Hypertext Data”. \
J. Leskovec. Modeling Large Social and Information Networks. Tutorial at ICML, 2009. \
J. McAuley. Data Mining and Predictive Analytics, UCSD, 2015. \
D. Easley and J. Kleinberg. Networks, Crowds, and Markets: Reasoning About a Highly Connected World. Cambridge University Press, 2010. \
J. Leskovec, D. Chakrabarti, J. Kleinberg, C. Faloutsos, Z. Ghahramani. Kronecker Graphs: An approach to modeling networks. JMLR, 2010. \
M.E.J. Newman and M. Girvan. Finding and evaluating community structure in networks. Physical Review E 69(2), 2004. \
M.E.J. Newman. Modularity and community structure in networks. PNAS, 103(23), 2006. \
S.E. Schaeffer. Graph clustering. Computer Science Review 1(1), 2007. \
S. Fortunato. Community detection in graphs. Physics Reports 486 (3-5), 2010. \
M. Coscia, F. Giannotti, and D. Pedreschi. A classification for community discovery methods in complex networks. Statistical Analysis and Data Mining 4 (5), 2011. \
A. Arenas, J. Duch, A. Fernandez, and S. Gomez. Size reduction of complex networks preserving modularity. New J. Phys., 9(176), 2007. \
M. Girvan and M.E.J. Newman. Community structure in social and biological networks. PNAS 99(12), 2002. \
U. Brandes, D. Delling, M. Gaertler, R. Gorke, M. Hoefer, Z. Nikoloski, and D. Wagner. On Modularity Clustering. IEEE TKDE 20(2), 2008. \
M.E.J. Newman. Fast algorithm for detecting community structure in networks. Phys. Rev. E 69, 2004. \
A. Clauset, M.E.J. Newman, and C. Moore. Finding community structure in very large networks. Phys. Rev. E 70, 2004. \
C. Giatsidis, D. Thilikos, and M. Vazirgiannis. D-cores: Measuring Collaboration of Directed Graphs Based on Degeneracy. Knowledge and Information Systems Journal, Springer, 2012. \
C. Giatsidis, K. Berberich, D. M. Thilikos, M. Vazirgiannis. Visual exploration of collaboration networks based on graph degeneracy. ACM KDD, 2012. \
C. Giatsidis, D. Thilikos, and M. Vazirgiannis. D-cores: Measuring Collaboration of Directed Graphs Based on Degeneracy. IEEE ICDM, 2011. \
C. Giatsidis, D. Thilikos, and M. Vazirgiannis. Evaluating Cooperation in Communities with the k-Core Structure. ACM/IEEE ASONAM, 2011. \
F. D. Malliaros and M. Vazirgiannis. To Stay or Not to Stay: Modeling Engagement Dynamics in Social Graphs. ACM CIKM, 2013. \
F. D. Malliaros and M. Vazirgiannis. Clustering and Community Detection in Directed Networks: A Survey. Physics Reports, 533(4), Elsevier, 2013. \
F. D. Malliaros and M. Vazirgiannis. Vulnerability Assessment in Social Networks under Cascade-based Node Departures. EPL (Europhysics Letters), 11(6), 2015. \
C. Giatsidis, F. D. Malliaros, D. Thilikos, and M. Vazirgiannis. CORECLUSTER: A Degeneracy Based Graph Clustering Framework. AAAI, 2014. \
F. D. Malliaros, V. Megalooikonomou and C. Faloutsos. Estimating Robustness in Large Social Graphs. Knowledge and Information Systems (KAIS), Springer, 2015. \
M.-E. G. Rossi, F. D. Malliaros, and M. Vazirgiannis. Spread It Good, Spread It Fast: Identification of Influential Nodes in Social Networks. WWW, 2015. \
C. Giatsidis, F. D. Malliaros and M. Vazirgiannis. Graph Mining Tools for Community Detection and Evaluation in Social Networks and the Web. WWW, Rome, Italy, 2013. \
C. Giatsidis, F. D. Malliaros and M. Vazirgiannis. Advanced graph mining for community evaluation in social networks and the web. ACM WSDM, Rio de Janeiro, Brazil, 2013. \
C. Giatsidis, F. D. Malliaros and M. Vazirgiannis. Community Detection and Evaluation in Social and Information Networks. WISE, Thessaloniki, Greece, 2014. \
F. D. Malliaros, M. Vazirgiannis and A. N. Papadopoulos. Core Decomposition: Algorithms and Applications. IEEE/ACM ASONAM, Paris, France, 2015. \
F. D. Malliaros and A. N. Papadopoulos. Core Decomposition in Graphs: Concepts, Algorithms and Applications. ICDM, Barcelona, 2016. \
D. Billsus and M. J. Pazzani. “Learning collaborative information filters”. In Proceedings of the Fifteenth International Conference on Machine Learning, pages 46-54, 1998. \
Joonseok Lee, Mingxuan Sun, Guy Lebanon. “A Comparative Study of Collaborative Filtering Algorithms”. http://arxiv.org/pdf/1205.3193.pdf \
A. Paterek. Improving regularized singular value decomposition for collaborative filtering. Statistics, 2007:2-5, 2007. \
J. Leskovec, A. Rajaraman and J. Ullman. Mining of Massive Datasets. Cambridge University Press, 2014. \
J. Breese, D. Heckerman and C. Kadie. Empirical Analysis of Predictive Algorithms for Collaborative Filtering. In Proceedings of the 14th Conference on Uncertainty in Artificial Intelligence, 1998. \
Page, Lawrence & Brin, Sergey & Motwani, Rajeev & Winograd, Terry. (1998). The PageRank Citation Ranking: Bringing Order to the Web. 




# Pyspark code <a id="Pyspark_code">

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lower, col
import numpy as np
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType
from nltk.stem.snowball import SnowballStemmer
from operator import add
from pyspark.sql.functions import struct
import pyspark.sql.functions as sqlf

spark = SparkSession.builder.appName('example').getOrCreate()
sc = spark.sparkContext

df_articles = spark.read.csv("/dssp/datacamp/articles_text.csv",sep=",").toDF("Title","Intro")
df_links = spark.read.csv("/dssp/datacamp/edges_class.csv",sep=",").toDF("source","target","class")
df_articles = df_articles.withColumn("title_lower",lower(col("Title")))
df_articles.createOrReplaceTempView("text")
df_links.createOrReplaceTempView("links")
print("nodes")
print(df_articles.count())
print("edges")
print(df_links.count())


df_double_links = spark.sql("""Select links.source as source, links2.target as target 
                                from links 
                                left join links as links2 on links.target==links2.source""")


print("length 2 paths")
print(df_double_links.count())
df_double_links.createOrReplaceTempView("double_links")

df_triple_links = spark.sql("""Select links.source as source, double_links.target as target 
                                from links 
                                left join double_links on links.target==double_links.source""")

print("length 3 paths")
print(df_triple_links.count())


# count paths per (source, target) pair: map each row to ((source, target), 1.0) and sum by key
df_double_links = df_double_links.rdd.map(lambda r: ((r[0], r[1]), 1.0)).reduceByKey(add)\
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1])).toDF()
#print(df_double_links.count())

df_double_links = df_double_links.select(col('_1').alias('source'), col('_2').alias('target'), col('_3').alias('numof2paths'))

df_triple_links = df_triple_links.rdd.map(lambda r: ((r[0], r[1]), 1.0)).reduceByKey(add)\
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1])).toDF()
#print(df_triple_links.count())

df_triple_links = df_triple_links.select(col('_1').alias('source'), col('_2').alias('target'), col('_3').alias('numof3paths'))

print("reduced length 2 paths")
print(df_double_links.count())
print("reduced length 3 paths")
print(df_triple_links.count())
df_double_links.createOrReplaceTempView("double_links")
df_triple_links.createOrReplaceTempView("triple_links")


# Graph features 
from graphframes import *
from pyspark.sql.functions import desc

#to build a graph we need vertices (one row per distinct page id) and edges
v = df_links.select(col("source").alias("id"))\
    .union(df_links.select(col("target").alias("id")))\
    .distinct()
e=df_links.select(col("source").alias("src"),col("target").alias("dst"))

g = GraphFrame(v, e)
print("edges part of graph")
#print(g.edges.head(5))
print("################")
df_indeg=g.inDegrees
print("in degrees nodes")
print(df_indeg.count())
print("################")
df_outdeg=g.outDegrees
print("out degrees nodes")
print(df_outdeg.count())
print("################")
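#calculate degree ratio inDegree/(outDegree+1); the outer join keeps nodes with
#only incoming or only outgoing links, whose missing degree counts as 0
degreeRatio = df_indeg.join(df_outdeg, "id", "outer")\
  .fillna(0, subset=["inDegree", "outDegree"])\
  .selectExpr("id", "double(inDegree)/(double(outDegree)+1) as degreeRatio")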
print("degreeRATIO nodes")
print(degreeRatio.count())
print("################")
degreeRatio.createOrReplaceTempView("degreeRatio")


#scc = g.stronglyConnectedComponents(maxIter=3)
#print("strongly connected components")
#print(scc.head(10))
#communities = g.labelPropagation(maxIter=5)
#print("communities")
#print(communities.head(10))

inner_join = spark.sql("""Select links.source as source,links.target as target,
                        COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path,
                        COALESCE(degreeRatio.degreeRatio,0) as degratio_source, 
                        COALESCE(degreeRatio2.degreeRatio,0) as degratio_target, 
                        text1.Intro as txt1,text2.Intro as txt2,links.class as class 
                        from links
                        left join double_links on (double_links.source==links.source and double_links.target==links.target)
                        left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
                        inner join text as text1 on text1.title_lower==links.source
                        inner join text as text2 on text2.title_lower==links.target
                        left join degreeRatio on degreeRatio.id==links.source 
                        left join degreeRatio as degreeRatio2 on degreeRatio2.id==links.target
                        """)
floatkeys = ["class", "L2_path", "L3_path", "degratio_source", "degratio_target"]


# inner_join = spark.sql("""Select links.source as source,links.target as target,
                        # COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path, 
                        # text1.Intro as txt1,text2.Intro as txt2,links.class as class 
                        # from links
                        # left join double_links on (double_links.source==links.source and double_links.target==links.target)
                        # left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
                        # inner join text as text1 on text1.title_lower==links.source
                        # inner join text as text2 on text2.title_lower==links.target
                        # """)
# floatkeys = ["class", "L2_path", "L3_path"]

inner_join.head(10)
print("size of innerjoin (should be equal to nb edges)")

print(inner_join.count())

inner_join_distinct = inner_join.dropDuplicates()
print("Distinct count inner_join: " + str(inner_join_distinct.count()))
inner_join_distinct.show(truncate=False)

#sampling
#inner_join = inner_join.randomSplit([0.05, 0.95], seed=0)
#inner_join = inner_join.sample(False,0.1,42)

# first sprint on punctuation
import re

from collections import Counter

#sum over words of (occurrences in list1 * occurrences in list2), divided by (numberwordsA * numberwordsB)^alpha
def tf_similarity(list1, list2, alpha):
    l1 = float(len(list1))
    l2 = float(len(list2))
    if l1 * l2 == 0:
        return 0.0
    c1, c2 = Counter(list1), Counter(list2)
    # equals the number of pairs of equal words across the two lists
    tf = sum(n * c2[w] for w, n in c1.items())
    return tf / ((l1 * l2) ** alpha)
    

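#fraction of the title a (words before any "(" disambiguation suffix) that appears
#as a contiguous run of words in b; 1.0 when the whole title is found
def b_contains_a(a, b):
    title = []
    for w in a:
        if w == '(':
            break
        title.append(w)
    lena = len(title)
    if lena == 0:
        return 0.0

    best = 0  # longest matched prefix of the title so far
    i = 0     # length of the prefix matched ending at the current word of b
    for w in b:
        if w == title[i]:
            i += 1
        else:
            i = 1 if w == title[0] else 0  # the current word may start a new match
        best = max(best, i)
        if best == lena:
            return 1.0
    return float(best) / float(lena)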
    
# clean punctuation: parentheses become separate tokens (they mark disambiguation in
# Wikipedia titles); every other non-word, non-space character is replaced by a space
def clean_punctuation_str(init_string):
    string = re.sub(r'\(', ' ( ', init_string)
    string = re.sub(r'\)', ' ) ', string)
    string = re.sub(r'[^\w\s()]', ' ', string)
    string = re.sub(r'\s{2,}', ' ', string)
    return string

stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
             'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers',
             'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
             'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
             'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does',
             'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until',
             'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
             'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down',
             'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here',
             'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more',
             'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so',
             'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now', 'br']


# build the stopword set once so that membership tests are fast
stopwords_set = set(stopwords)

# clean every text column (punctuation, lower case, stopwords, stemming), default
# missing numeric columns to 0.0, then build the feature array
def transform(row):
    data = row.asDict()  # Rows are immutable; we convert them to dictionaries
    stemmer = SnowballStemmer("english")
    for key, value in data.items():
        if key not in floatkeys:
            if value is None:
                value = ''  # avoid null issues
            # clean punctuation, lowercase, split into words and drop stopwords
            words = clean_punctuation_str(value).lower().split()
            # stemming
            data[key] = [stemmer.stem(w) for w in words if w not in stopwords_set]
        elif value is None:
            data[key] = 0.0
    # create a new column with an array of numbers to be the features
    text1 = set(data['txt1'])
    alpha = 0.5
    beta = 1.0
    a_in_b = b_contains_a(data["target"], data["txt1"])
    tf1 = tf_similarity(data['txt1'], data['txt2'], alpha)
    tf2 = tf_similarity(data['txt1'], data['txt2'], beta)
    #data["features"] = [len(text1), b_contains_a(data["target"], data["txt1"]), tf_similarity(data["txt1"], data["txt1"], alpha), tf_similarity(data["txt1"], data["txt2"], beta)]  # four features
    #data["features"] = [a_in_b, tf1, tf2, float(data["L2_path"]), float(data["L3_path"])]  # 5 features
    #data["features"] = [float(data["L2_path"])]  # one feature
    data["features"] = [a_in_b, tf1, tf2, float(data["L2_path"]), float(data["L3_path"]),
                        float(data["degratio_source"]), float(data["degratio_target"])]  # 7 features
    
    # keep only features and class
    ret = {}
#    ret["txt1"] = data["txt1"]
#    ret["txt2"] = data["txt2"]
#    ret["source"] = data["source"]
#    ret["target"] = data["target"]
    
    ret["features"] = data["features"]
    ret["class"] = float(data["class"])
    # convert the dictionary back to a Row
    newRow = Row(*ret.keys())  # a. the Row object specification (column names)
    newRow = newRow(*ret.values())  # b. the corresponding column values
    return newRow

data=inner_join.rdd.map(transform).toDF()
print("New dataframe with map on RDD")
#print(data.head(20))
data.show(10)

#data.summary().show()

print("################")

#models use sparse arrays

#Lets apply a function directly on the dataframe
#UDF
#applying functions to a DATAFRAME requires the "SQL" logic of
#User Defined Functions (UDF)
#as an example: convert features to sparse array
#1. define what data type your UDF returns VectorUDT : UDF SCHEMA
custom_udf_schema = VectorUDT()
#2. define function
def to_sparse_(v):
        import numpy as np
        if isinstance(v, SparseVector):
            return v
        vs = np.array(v)
        nonzero = np.nonzero(vs)[0]
        return SparseVector(len(v), nonzero, vs[nonzero])
#3. create a udf from that function and the schema
to_sparse = udf(to_sparse_,custom_udf_schema)
#4. apply UDF to DF and create new column

data= data.withColumn('feats',to_sparse(data.features))
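The `to_sparse_` UDF keeps only the non-zero entries of each feature list. A plain-Python sketch of the same (size, indices, values) triple that `SparseVector(size, indices, values)` is built from, assuming the features are already a list of floats; `dense_to_sparse_triple` is a hypothetical helper, not part of PySpark:

```python
def dense_to_sparse_triple(values):
    """Return (size, indices, values) for the non-zero entries of a dense list."""
    # positions of the non-zero features, in increasing order
    indices = [i for i, x in enumerate(values) if x != 0]
    return len(values), indices, [values[i] for i in indices]


# e.g. a feature row whose two path counts are zero
size, idx, vals = dense_to_sparse_triple([1.0, 0.25, 0.0, 0.0, 3.0])
print(size, idx, vals)  # 5 [0, 1, 4] [1.0, 0.25, 3.0]
```

Sparse storage pays off when many rows have zero path counts, since only the non-zero positions are kept.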

print("new sparse array column")
#print(data.head(1))
print("################")


#Build models
# Logistic Regression classifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Create estimator as logistic regression classifier
logistic=LogisticRegression(featuresCol="feats",labelCol="class",predictionCol='class_pred',rawPredictionCol="class_pred_raw",maxIter=10)

#Set hyperparameters 
paramGrid = ParamGridBuilder().addGrid(logistic.regParam, [0.1, 0.01]).build()

#Create evaluation metric as Area under the Precision Recall Curve
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="class_pred_raw",labelCol='class',metricName="areaUnderPR",)

#Define cross validation with the above estimator, grid and evaluator, using 3 folds (numFolds)
crossval = CrossValidator(estimator=logistic,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_auc,
                          numFolds=3)
#Fit crossvalidator to dataset
cvModel = crossval.fit(data)

#Average scores of multiple folds on the parameters evaluated
avg_scores=cvModel.avgMetrics

print(avg_scores)
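`CrossValidator` fits one model per fold and per grid point, and `avgMetrics` holds the mean fold metric for each grid point, in grid order. A plain-Python sketch of that bookkeeping, assuming a caller-supplied `fit_and_score(train, test, params)` function (hypothetical) and round-robin fold assignment (Spark assigns folds at random instead):

```python
def cross_val_avg_metrics(rows, param_grid, fit_and_score, num_folds=3):
    """Mean validation score per parameter map, in grid order (like CrossValidator.avgMetrics)."""
    # round-robin fold assignment: an assumption, Spark assigns folds at random
    folds = [rows[i::num_folds] for i in range(num_folds)]
    avg_metrics = []
    for params in param_grid:
        scores = []
        for k in range(num_folds):
            test = folds[k]
            # every other fold is used for training
            train = [r for j, fold in enumerate(folds) if j != k for r in fold]
            scores.append(fit_and_score(train, test, params))
        avg_metrics.append(sum(scores) / float(num_folds))
    return avg_metrics


# toy scorer (hypothetical): returns the regParam value, so each average equals it
grid = [{"regParam": 0.1}, {"regParam": 0.01}]
sketch_scores = cross_val_avg_metrics(list(range(9)), grid, lambda train, test, p: p["regParam"])
print(sketch_scores)
```

The best grid point is the one with the highest average, which is why `avg_scores` above has one entry per `regParam` value.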

# Support Vector Classifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Create estimator as support vector classifier
svm=LinearSVC(featuresCol="feats",labelCol="class",predictionCol='class_pred',rawPredictionCol="class_pred_raw",maxIter=10)

#Set hyperparameters 
paramGrid = ParamGridBuilder().addGrid(svm.regParam, [0.1, 0.01]).build()

#Create evaluation metric as Area under the Precision Recall Curve
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="class_pred_raw",labelCol='class',metricName="areaUnderPR",)

#Define cross validation with the above estimator, grid and evaluator, using 3 folds (numFolds)
crossval = CrossValidator(estimator=svm,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_auc,
                          numFolds=3)
#Fit crossvalidator to dataset
cvModel = crossval.fit(data)

#Average scores of the multiple folds on the parameters evaluated
avg_scores=cvModel.avgMetrics

print(avg_scores)

#Random Forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Create estimator as random forest classifier
rf = RandomForestClassifier(featuresCol = 'feats', labelCol = 'class',predictionCol='class_pred',rawPredictionCol="class_pred_raw")

#Set hyperparameters 
# use the estimator instance's Param (rf.maxDepth) so that CrossValidator applies each value to rf
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [4, 6, 8]).build()

#Create evaluation metric as Area under the Precision Recall Curve
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="class_pred_raw",labelCol='class',metricName="areaUnderPR",)

#Define cross validation with the above estimator, grid and evaluator, using 3 folds (numFolds)
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_auc,
                          numFolds=3)
#Fit crossvalidator to dataset
cvModel = crossval.fit(data)

#Average scores of the multiple folds on the parameters evaluated
avg_scores=cvModel.avgMetrics

print(avg_scores)
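`ParamGridBuilder` expands every `addGrid` call into the Cartesian product of the listed values, so the number of fits grows multiplicatively with each grid dimension and with the number of folds. A small `itertools` sketch of that expansion; the `numTrees` values are illustrative and were not part of the experiments above:

```python
from itertools import product


def build_grid(grid):
    """Expand {param_name: [values]} into a list of {param_name: value} maps."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


param_maps = build_grid({"maxDepth": [4, 6, 8], "numTrees": [20, 50]})
num_folds = 3
# CrossValidator then refits the best map once more on the full data set
print(len(param_maps), "parameter maps ->", len(param_maps) * num_folds, "fits during cross-validation")
```

With the three `maxDepth` values alone and three folds, the random forest cell above trains nine models before the final refit.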

**Document modifications:**

*   ...
*   19 dec. Put the global doc into an arXiv publication-type structure with table of contents, introduction, methodology, results, discussion, conclusion, ...
*   13 dec. **Salomon**: layout and adding points 3, 4, 5 to complete.
*   12 dec. **Gueorgui**: I'm throwing here some preliminary notes on what we have done (and what we will have done) on the project.


# To use in Jupyter to convert the notebook into PDF
%pip install -U notebook-as-pdf

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lower, col
import numpy as np
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType
from nltk.stem.snowball import SnowballStemmer
from operator import add
from pyspark.sql.functions import struct
import pyspark.sql.functions as sqlf

spark = SparkSession.builder.appName('example').getOrCreate()
sc = spark.sparkContext

df_articles = spark.read.csv("/dssp/datacamp/articles_text.csv",sep=",").toDF("Title","Intro")
df_links = spark.read.csv("/dssp/datacamp/edges_class.csv",sep=",").toDF("source","target","class")
df_articles = df_articles.withColumn("title_lower",lower(col("Title")))


print("articles")
df_articles = df_articles.dropDuplicates(['title_lower'])
print(df_articles.count())

print("edges")
df_links = df_links.dropDuplicates(['source', 'target'])
print(df_links.count())

print("real edges")
class1_links = df_links.filter("class = 1")
print(class1_links.count())



df_articles.createOrReplaceTempView("text")
df_links.createOrReplaceTempView("links")

# 80/20 split of the labelled edges; the seed (an arbitrary value) makes the split reproducible
(train, test) = df_links.randomSplit([0.8, 0.2], seed=42)

class1_train = train.filter("class = 1")
#class1_train.createOrReplaceTempView("links")
train.createOrReplaceTempView("links")

df_double_links = spark.sql("""Select links.source as source, links2.target as target 
                                from links 
                                left join links as links2 on links.target==links2.source""")

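# For each edge A -> B and each edge B -> C in the training links, emit the pair (A, C). Because this is a
# left join, an edge A -> B whose end B has no outgoing edge yields (A, NULL). The same pattern extends to lengths 3 and 4.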


print("length 2 paths")
print(df_double_links.count())
df_double_links.createOrReplaceTempView("double_links")

df_triple_links = spark.sql("""Select links.source as source, double_links.target as target 
                                from links 
                                left join double_links on links.target==double_links.source""")

print("length 3 paths")
print(df_triple_links.count())

df_triple_links.createOrReplaceTempView("triple_links")

df_quadruple_links = spark.sql("""Select links.source as source, triple_links.target as target 
                                from links 
                                left join triple_links on links.target==triple_links.source""")

print("length 4 paths")
print(df_quadruple_links.count())
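The self-joins above count, for each ordered pair (A, C), the directed walks of length 2, 3 and 4 from A to C in the training graph (walks may revisit nodes). An in-memory sketch of the same counts on a toy edge list, in pure Python with inner-join semantics, so pairs ending at a node without out-edges are simply absent instead of paired with NULL; `count_walks` is a hypothetical helper:

```python
from collections import Counter, defaultdict


def count_walks(edges, length):
    """Count directed walks of `length` edges between every (source, target) pair."""
    out = defaultdict(list)
    for src, dst in edges:
        out[src].append(dst)
    # walks of length 1 are the edges themselves
    walks = Counter(edges)
    for _ in range(length - 1):
        longer = Counter()
        # extend every walk ending at `mid` by each outgoing edge of `mid`
        for (src, mid), n in walks.items():
            for dst in out[mid]:
                longer[(src, dst)] += n
        walks = longer
    return walks


edges = [("a", "b"), ("b", "c"), ("a", "d"), ("d", "c"), ("c", "e")]
print(count_walks(edges, 2)[("a", "c")])  # 2 (a->b->c and a->d->c)
print(count_walks(edges, 3)[("a", "e")])  # 2
```

The SQL version prepends an edge at each step while this sketch appends one; both give the same counts.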





# Count the length-k paths joining each (source, target) pair: one row per pair with the number of
# joined rows as a double (this avoids the Python 2 only `lambda (x,y): ...` tuple-parameter syntax)
df_double_links = df_double_links.groupBy("source", "target").agg(sqlf.count(sqlf.lit(1)).cast("double").alias("numof2paths"))

df_triple_links = df_triple_links.groupBy("source", "target").agg(sqlf.count(sqlf.lit(1)).cast("double").alias("numof3paths"))

df_quadruple_links = df_quadruple_links.groupBy("source", "target").agg(sqlf.count(sqlf.lit(1)).cast("double").alias("numof4paths"))



print("reduced length 2 paths")
print(df_double_links.count())
print("reduced length 3 paths")
print(df_triple_links.count())
print("reduced length 4 paths")
print(df_quadruple_links.count())

df_double_links.createOrReplaceTempView("double_links")
df_triple_links.createOrReplaceTempView("triple_links")
df_quadruple_links.createOrReplaceTempView("quadruple_links")


# Graph features
from graphframes import GraphFrame
from pyspark.sql.functions import desc

# to build a graph we need vertices (unique ids) and edges (training links only)
v = df_links.select(col("source").alias("id")).union(df_links.select(col("target").alias("id"))).distinct()
e = train.select(col("source").alias("src"), col("target").alias("dst"))

g = GraphFrame(v, e)
print("edges part of graph")
#print(g.edges.head(5))
print("################")
df_indeg=g.inDegrees
print("in degrees nodes")
print(df_indeg.count())
print("################")
df_outdeg=g.outDegrees
print("out degrees nodes")
print(df_outdeg.count())
print("################")
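# degree ratio = in-degree / (out-degree + 1); the inner join keeps only nodes that have both
# incoming and outgoing training edges (the active queries below use inoutdegrees instead)
degreeRatio = df_indeg.join(df_outdeg, "id")\
  .selectExpr("id", "double(inDegree)/(double(outDegree)+1) as degreeRatio")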
print("degreeRATIO nodes")
print(degreeRatio.count())
print("################")
degreeRatio.createOrReplaceTempView("degreeRatio")
df_indeg.createOrReplaceTempView("indegree")
df_outdeg.createOrReplaceTempView("outdegree")

# degreejoin = spark.sql("""Select indegree.id as id, COALESCE(indegree.inDegree,0) as indeg, COALESCE(outdegree.outDegree,0) as outdeg,
#                         from indegree left join outdegree on indegree.id==outdegree.id
#                         """)


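# full outer join: keep nodes with only incoming or only outgoing edges, with the missing degree set to 0
degreejoin = spark.sql("""Select COALESCE(indegree.id, outdegree.id) as id, COALESCE(indegree.inDegree,0) as indeg, COALESCE(outdegree.outDegree,0) as outdeg
                        from indegree full join outdegree on indegree.id==outdegree.id
                        """)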

degreejoin.createOrReplaceTempView("inoutdegrees")
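The `inoutdegrees` view is a full outer join of in- and out-degrees in which missing counts become 0, and the joins below attach it twice, once for the source and once for the target of each edge. A plain-Python sketch of those four per-edge degree features on a toy training edge list (`degree_features` and the node names are illustrative):

```python
from collections import Counter


def degree_features(train_edges, source, target):
    """[indeg_source, outdeg_source, indeg_target, outdeg_target]; absent nodes count as 0."""
    indeg = Counter(dst for _, dst in train_edges)
    outdeg = Counter(src for src, _ in train_edges)
    # Counter returns 0 for unseen nodes, like COALESCE(..., 0) after the outer join
    return [float(indeg[source]), float(outdeg[source]),
            float(indeg[target]), float(outdeg[target])]


train_edges = [("a", "b"), ("b", "c"), ("a", "c")]
print(degree_features(train_edges, "a", "c"))  # [0.0, 2.0, 2.0, 0.0]
```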

#scc = g.stronglyConnectedComponents(maxIter=3)
#print("strongly connected components")
#print(scc.head(10))
#communities = g.labelPropagation(maxIter=5)
#print("communities")
#print(communities.head(10))

# inner_join = spark.sql("""Select links.source as source,links.target as target,
#                         COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path,
#                         COALESCE(degreeRatio.degreeRatio,0) as degratio_source, 
#                         COALESCE(degreeRatio2.degreeRatio,0) as degratio_target, 
#                         text1.Intro as txt1,text2.Intro as txt2,links.class as class 
#                         from links
#                         left join double_links on (double_links.source==links.source and double_links.target==links.target)
#                         left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
#                         inner join text as text1 on text1.title_lower==links.source
#                         inner join text as text2 on text2.title_lower==links.target
#                         left join degreeRatio on degreeRatio.id==links.source 
#                         left join degreeRatio as degreeRatio2 on degreeRatio2.id==links.target
#                         """)
# floatkeys = ["class", "L2_path", "L3_path", "degratio_source", "degratio_target"]


# inner_join = spark.sql("""Select links.source as source,links.target as target,
                        # COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path, 
                        # text1.Intro as txt1,text2.Intro as txt2,links.class as class 
                        # from links
                        # left join double_links on (double_links.source==links.source and double_links.target==links.target)
                        # left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
                        # inner join text as text1 on text1.title_lower==links.source
                        # inner join text as text2 on text2.title_lower==links.target
                        # """)
# floatkeys = ["class", "L2_path", "L3_path"]
train.createOrReplaceTempView("links")
inner_join_train = spark.sql("""Select links.source as source,links.target as target,
                        COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path,
                        COALESCE(quadruple_links.numof4paths,0) as L4_path,
                        COALESCE(inoutdegrees.indeg,0) as indeg_source, 
                        COALESCE(inoutdegrees.outdeg,0) as outdeg_source, 
                        COALESCE(inoutdegrees2.indeg,0) as indeg_target, 
                        COALESCE(inoutdegrees2.outdeg,0) as outdeg_target, 
                        text1.Intro as txt1,text2.Intro as txt2,links.class as class 
                        from links
                        left join double_links on (double_links.source==links.source and double_links.target==links.target)
                        left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
                        left join quadruple_links on (quadruple_links.source==links.source and quadruple_links.target==links.target)
                        inner join text as text1 on text1.title_lower==links.source
                        inner join text as text2 on text2.title_lower==links.target
                        left join inoutdegrees on inoutdegrees.id==links.source 
                        left join inoutdegrees as inoutdegrees2 on inoutdegrees2.id==links.target
                        """)
test.createOrReplaceTempView("links")
inner_join_test = spark.sql("""Select links.source as source,links.target as target,
                        COALESCE(double_links.numof2paths,0) as L2_path, COALESCE(triple_links.numof3paths,0) as L3_path,
                        COALESCE(quadruple_links.numof4paths,0) as L4_path,
                        COALESCE(inoutdegrees.indeg,0) as indeg_source, 
                        COALESCE(inoutdegrees.outdeg,0) as outdeg_source, 
                        COALESCE(inoutdegrees2.indeg,0) as indeg_target, 
                        COALESCE(inoutdegrees2.outdeg,0) as outdeg_target, 
                        text1.Intro as txt1,text2.Intro as txt2,links.class as class 
                        from links
                        left join double_links on (double_links.source==links.source and double_links.target==links.target)
                        left join triple_links on (triple_links.source==links.source and triple_links.target==links.target)
                        left join quadruple_links on (quadruple_links.source==links.source and quadruple_links.target==links.target)
                        inner join text as text1 on text1.title_lower==links.source
                        inner join text as text2 on text2.title_lower==links.target
                        left join inoutdegrees on inoutdegrees.id==links.source 
                        left join inoutdegrees as inoutdegrees2 on inoutdegrees2.id==links.target
                        """)

floatkeys = ["class", "L2_path", "L3_path", "L4_path", "indeg_source", "indeg_target", "outdeg_source", "outdeg_target"]


inner_join_train.show(10)
print("size of inner join (equal to the number of training edges only if every endpoint has an article)")

print(inner_join_train.count())

# inner_join = inner_join.dropDuplicates(["source", "target"])
# print("Distinct count inner_join: ")
# print(inner_join.count())


#sampling
#inner_join = inner_join.randomSplit([0.05, 0.95], seed=0)
#inner_join = inner_join.sample(False,0.1,42)

# Text-feature helpers (first sprint: punctuation cleaning and word-overlap similarities)
import re

# sum over all pairs of equal words (w1 in list1, w2 in list2), i.e. the number of occurrences of each
# word in list1 times its number of occurrences in list2, divided by (len(list1) * len(list2))^alpha
def tf_similarity(list1, list2, alpha):
    tf = 0.0
    l1 = float(len(list1))
    l2 = float(len(list2))
    if l1*l2 == 0:
        return 0.0
    for w1 in list1:
        for w2 in list2:
            if w1 == w2:
                tf += 1
    if tf == 0:
        return 0.0

    tf = tf / ((l1 * l2)**alpha)
    return tf
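The double loop in `tf_similarity` adds 1 for every pair of equal words, which equals the sum over shared words w of count1(w) × count2(w). A `collections.Counter` version computes the same value in linear rather than quadratic time; a sketch that could replace the loop (`tf_similarity_counter` is a hypothetical name):

```python
from collections import Counter


def tf_similarity_counter(list1, list2, alpha):
    """sum_w c1(w) * c2(w) / (len(list1) * len(list2)) ** alpha, 0.0 for empty lists."""
    l1, l2 = float(len(list1)), float(len(list2))
    if l1 * l2 == 0:
        return 0.0
    c1, c2 = Counter(list1), Counter(list2)
    # Counter returns 0 for words missing from list2
    shared = sum(n * c2[w] for w, n in c1.items())
    return shared / ((l1 * l2) ** alpha)


txt1 = ["graph", "spark", "graph"]
txt2 = ["graph", "text"]
print(tf_similarity_counter(txt1, txt2, 1.0))  # 2 / 6
print(tf_similarity_counter(txt1, txt2, 0.5))  # 2 / sqrt(6)
```

Here "graph" appears twice in `txt1` and once in `txt2`, giving 2 matching pairs out of 3 × 2 = 6.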
    

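# Fraction of the title words in a (the tokens before any "(" qualifier, e.g. "mercury" in
# "mercury ( planet )") that appear as one consecutive run in b; 1.0 as soon as the whole title is found.
def b_contains_a(a, b):
    if len(a) == 0:
        return 0.0
    # number of title words before the opening parenthesis
    lena = a.index('(') if '(' in a else len(a)
    if lena == 0:
        return 0.0

    i = 0      # length of the current consecutive match
    maxi = 0   # longest match seen so far (greedy restart, no KMP-style backtracking)
    for w in b:
        if w == a[i]:
            i += 1
        else:
            # restart, letting the current word begin a new match
            i = 1 if w == a[0] else 0
        maxi = max(maxi, i)
        if maxi == lena:
            return 1.0
    return float(maxi) / float(lena)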
    
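# clean punctuation: put spaces around parentheses so they become separate tokens,
# replace every other non-word character by a space and collapse repeated whitespace
def clean_punctuation_str(init_string):
    string = re.sub(r'\(', ' ( ', init_string)
    string = re.sub(r'\)', ' ) ', string)
    string = re.sub(r'[^\w\s()]', ' ', string)
    string = re.sub(r"\s{2,}", " ", string)
    return string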

stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
             'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers',
             'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
             'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
             'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does',
             'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until',
             'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
             'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down',
             'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here',
             'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more',
             'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so',
             'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now', 'br']
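For a title such as "Mercury (planet)", the cleaning step isolates the parentheses as tokens, which is what lets `b_contains_a` stop at the qualifier. A sketch of the per-column preprocessing without the stemming step (NLTK's `SnowballStemmer` is left out so the example has no dependencies; `tokenize` and the short stopword set are illustrative):

```python
import re

# illustrative subset of the stopword list above
STOPWORDS = {"the", "of", "is", "a"}


def tokenize(text):
    """Clean punctuation, lowercase, split and drop stopwords (no stemming)."""
    text = re.sub(r'\(', ' ( ', text)
    text = re.sub(r'\)', ' ) ', text)
    text = re.sub(r'[^\w\s()]', ' ', text)
    text = re.sub(r"\s{2,}", " ", text)
    return [w for w in text.lower().split() if w not in STOPWORDS]


print(tokenize("Mercury (planet)"))                 # ['mercury', '(', 'planet', ')']
print(tokenize("Mercury is the smallest planet."))  # ['mercury', 'smallest', 'planet']
```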


# create features with word count on source
def transform(row):
    data = row.asDict()  # Rows are immutable; We convert them to dictionaries
    stemmer = SnowballStemmer("english")  # build the stemmer once per row, not once per column
    for key, value in data.items():
        if key not in floatkeys:
            if value is None:
                data[key] = ''  # we want to avoid any null issues
            # clean punctuation, lowercase and remove stopwords
            data[key] = clean_punctuation_str(data[key]).lower()
            data[key] = [w for w in data[key].split() if w not in stopwords]
            # stemming
            data[key] = [stemmer.stem(w) for w in data[key]]
        else:
            if value is None:
                data[key] = 0.0
    # create a new column with an array of number to be the features
    text1 = set([x for x in data['txt1']])
    alpha = 0.5
    beta = 1.0
    a_in_b = b_contains_a(data["target"], data["txt1"])
    tf1 = tf_similarity(data['txt1'],data['txt2'], alpha)
    tf2 =tf_similarity(data['txt1'],data['txt2'], beta)
    #data["features"] = [len(text1), b_contains_a(data["target"], data["txt1"]), tf_similarity(data["txt1"], data["txt1"], alpha), tf_similarity(data["txt1"], data["txt2"], beta)]  # four features
    #data["features"] = [len(text1), b_contains_a(data["target"], data["txt1"]), tfsim2(data["txt1"], data["txt1"]), tfsim2(data["txt1"], data["txt2"])]  # four features
    #data["features"] = [a_in_b, tf1,tf2,float(data["L2_path"]), float(data["L3_path"]), data["degratio_source"], data["degratio_target"]]  # 7 features
    # data["features"] = [a_in_b, tf1,tf2,float(data["L2_path"]), float(data["L3_path"])]  # 5 features
    #data["features"] = [float(data["L2_path"]), float(data["L3_path"])]  # 2 features
    #data["features"] = [data["degratio_source"], data["degratio_target"]]  # 2 features
    #data["features"] = [float(data["L2_path"])]  # one feature features
    data["features"] = [a_in_b, tf1, tf2, float(data["L2_path"]), float(data["L3_path"]), float(data["L4_path"]),
                        float(data["indeg_source"]), float(data["outdeg_source"]),
                        float(data["indeg_target"]), float(data["outdeg_target"])]  # 10 features
    # data["features"] = [float(data["indeg_source"]),
    #        float(data["outdeg_source"]),float(data["indeg_target"]),float(data["outdeg_target"])]  # 4 features
    #data["features"] = [a_in_b, tf1,tf2]  # text features only
    # data["features"] = [float(data["L2_path"]), float(data["L3_path"]), float(data["indeg_source"]),
    #        float(data["outdeg_source"]),float(data["indeg_target"]),float(data["outdeg_target"])]  # graph features
    # data["features"] = [float(data["L2_path"])]  # 1 graph feature
    #data["features"] = [a_in_b, tf1,tf2,float(data["L2_path"]), float(data["L3_path"])]  # 5 features

    # keep only features and class
    ret = {}
#    ret["txt1"] = data["txt1"]
#    ret["txt2"] = data["txt2"]
#    ret["source"] = data["source"]
#    ret["target"] = data["target"]
    
    ret["features"] = data["features"]
    ret["class"] = float(data["class"])
    # convert the dictionary back to a Row
    newRow = Row(*ret.keys())  # a. the Row object specification (column names)
    newRow = newRow(*ret.values())  # b. the corresponding column values
    return newRow

train=inner_join_train.rdd.map(transform).toDF()
test = inner_join_test.rdd.map(transform).toDF()
print("New dataframe with map on RDD")
#print(data.head(20))
train.show(10)

#data.summary().show()

print("################")

#models use sparse arrays

#Lets apply a function directly on the dataframe
#UDF
#applying functions to a DATAFRAME requires the "SQL" logic of
#User Defined Functions (UDF)
#as an example: convert features to sparse array
#1. define what data type your UDF returns VectorUDT : UDF SCHEMA
custom_udf_schema = VectorUDT()
#2. define function
def to_sparse_(v):
        import numpy as np
        if isinstance(v, SparseVector):
            return v
        vs = np.array(v)
        nonzero = np.nonzero(vs)[0]
        return SparseVector(len(v), nonzero, vs[nonzero])
#3. create a udf from that function and the schema
to_sparse = udf(to_sparse_,custom_udf_schema)
#4. apply UDF to DF and create new column

#data= data.withColumn('feats',to_sparse(data.features))
train= train.withColumn('feats',to_sparse(train.features))
test= test.withColumn('feats',to_sparse(test.features))

print("new sparse array column")
#print(data.head(1))
print("################")







#Build a model on our data
# Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#(train,test)=data.rdd.randomSplit([0.8,0.2])



# up to 100 iterations (maxIter=100)
# pay attention to the input/output columns, which the evaluator below reuses




logistic=LogisticRegression(featuresCol="feats",labelCol="class",predictionCol='class_pred',rawPredictionCol="class_pred_raw",maxIter=100)
lrModel = logistic.fit(train)
result=lrModel.transform(test)
print("prediction output")
print(result.head(1))
print("################")


#perform evaluation

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="class_pred_raw",labelCol='class',metricName="areaUnderPR",)
print("evaluation")
print(evaluator.evaluate(result))
print("################")
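`areaUnderPR` is the area under the precision-recall curve. A plain-Python sketch of that quantity, using one (recall, precision) point per distinct score threshold and trapezoidal integration, with the curve started at recall 0 and the first point's precision; this is an approximation of the idea and may differ from Spark's `BinaryClassificationEvaluator` in details (`area_under_pr` is a hypothetical helper):

```python
def area_under_pr(scores, labels):
    """Trapezoidal area under the precision-recall curve, one point per distinct score."""
    pairs = sorted(zip(scores, labels), key=lambda p: -p[0])
    total_pos = sum(labels)
    if total_pos == 0:
        return 0.0
    points, tp, fp, i = [], 0, 0, 0
    while i < len(pairs):
        threshold = pairs[i][0]
        # consume every example tied at this threshold
        while i < len(pairs) and pairs[i][0] == threshold:
            tp += pairs[i][1]
            fp += 1 - pairs[i][1]
            i += 1
        points.append((tp / float(total_pos), tp / float(tp + fp)))
    # start the curve at recall 0 with the precision of the first threshold
    points.insert(0, (0.0, points[0][1]))
    area = 0.0
    for (r0, p0), (r1, p1) in zip(points, points[1:]):
        area += (r1 - r0) * (p0 + p1) / 2.0
    return area


print(area_under_pr([0.9, 0.8, 0.7, 0.6], [1, 1, 0, 1]))  # 65/72, about 0.9028
```

A perfect ranking (all positive edges scored above all negative ones) gives 1.0, which is the ceiling the reported scores are measured against.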

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rf = RandomForestClassifier(featuresCol = 'feats', labelCol = 'class',predictionCol='class_pred',rawPredictionCol="class_pred_raw")

rfModel = rf.fit(train)
result=rfModel.transform(test)
print("prediction output")
print(result.head(1))
print("################")


#perform evaluation

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="class_pred_raw",labelCol='class',metricName="areaUnderPR",)
print("evaluation")
print(evaluator.evaluate(result))
print("################")

