# Idéation sur les features

1. Cosine similarity between titles / abstracts
2. Graphe des citations dirigé 
    > weight of existing citations
    > clustering dans le graphe ?
3. "Pagerank" de chaque auteur
4. Ecart entre les années
5. Shortest path

# Initialisation

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[7]  --packages graphframes:graphframes:0.8.2-spark3.1-s_2.12  pyspark-shell'

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

import graphframes


In [2]:
ss = SparkSession.builder.appName('challenge').getOrCreate()
sc = ss.sparkContext

dft = ss.read.csv("data/node_information.csv")
df_nodes = dft.toDF("id", "year", "topics", "authors", "temp1", "text")
df_nodes.createOrReplaceTempView("nodes")

dft = ss.read.option("delimiter", " ").csv("data/training_set.txt")
df_edges = dft.toDF("source", "target", "cites")
df_edges.createOrReplaceTempView("edges") 

22/04/08 19:15:58 WARN Utils: Your hostname, Dominique resolves to a loopback address: 127.0.1.1; using 172.22.77.94 instead (on interface eth0)
22/04/08 19:15:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/flo/.ivy2/cache
The jars for the packages stored in: /home/flo/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6568f1f5-d395-4895-88ba-efbae8b2d43a;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.1-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 220ms :: artifacts dl 7ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.1-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------

AnalysisException: Path does not exist: file:/home/flo/dssp/15-challenge/node_information.csv

# Graphe des articles

In [18]:
df_nodes.head(1)
df_edges.head(1)

[Row(source='9510123', target='9502114', cites='1')]

In [125]:
from pyspark.sql.functions import lower, col

# transformation des dataframes pour qu'ils soient utilisables par GraphFrames
v=df_edges.rdd.flatMap(lambda x:[[x['source']],[x['target']]]).toDF(["id"]).distinct()
e=df_edges.select( col("source").alias("src"),col("target").alias("dst"), col('cites').alias('cites') )

# crétion du graphe des articles
articles_graph = graphframes.GraphFrame(v ,e)
in_degrees = articles_graph.inDegrees

In [170]:
def in_degrees_feature(edges, in_degrees):

    return edges.join(in_degrees, edges.dst == in_degrees.id).drop(in_degrees.id)

In [172]:
zou = in_degrees_feature(e, in_degrees=in_degrees)
zou.show()

+-------+-------+-----+--------+
|    src|    dst|cites|inDegree|
+-------+-------+-----+--------+
|9510123|9502114|    1|      20|
|9707075|9604178|    1|     132|
|9312155|9506142|    0|       8|
|9911255| 302165|    0|      12|
|9701033| 209076|    0|      12|
|9710020|9709228|    1|      21|
|9901042|9510135|    1|     737|
| 209146|9502077|    0|      70|
|9705079|9702201|    1|      81|
|   3016|9207067|    0|      46|
|9402099| 105041|    0|      13|
|9705061|9503216|    1|      10|
| 109090|   9107|    0|      14|
| 107016|9304156|    0|      17|
|9812004|9212092|    0|      15|
|9401058|9511114|    0|      12|
|  12261| 202126|    0|      30|
| 207246|9807137|    1|      85|
|9501144|9311081|    1|      54|
|9304048|9510008|    0|      28|
+-------+-------+-----+--------+
only showing top 20 rows



# Graphe d'auteurs

In [104]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import lower, col
import numpy as np
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

def explode_authors(row):
    my_input=row.asDict()

    my_output=[]
    #if my_input['author']==None or my_input['cited'] == None : my_output['author']='' #We want o avoid any null issues

    for author in my_input['author'].split(','):
        for cited in my_input['cited'].split(','):
            ret = {}
            ret['src'] = str(author).strip()
            ret['dst'] = str(cited).strip()
            newRow = Row(*ret.keys()) #a. the Row object specification (column names)
            newRow = newRow(*ret.values())#b. the corresponding column values

            my_output.append(newRow)
    return my_output

In [105]:
author_edges = ss.sql("select A.authors author, B.authors cited, edges.cites \
       from edges inner join nodes A on edges.target = A.id \
                   inner join nodes B on edges.source = B.id \
       where edges.cites = 1 and A.authors is not null and B.authors is not null")

author_edges.rdd.flatMap(explode_authors).collect()

                                                                                

[Row(src='Atish Dabholkar', dst='L.E.Ibanez'),
 Row(src='Atish Dabholkar', dst='A.M.Uranga'),
 Row(src='Jaemo Park', dst='L.E.Ibanez'),
 Row(src='Jaemo Park', dst='A.M.Uranga'),
 Row(src='Cumrun Vafa', dst='G. Michaud'),
 Row(src='Cumrun Vafa', dst='R. C. Myers'),
 Row(src='Swapna Mahapatra', dst='Sukanta Bose'),
 Row(src='Swapna Mahapatra', dst='Sayan Kar'),
 Row(src='Karmadeva Maharana', dst='Sukanta Bose'),
 Row(src='Karmadeva Maharana', dst='Sayan Kar'),
 Row(src='Lambodar P. Singh', dst='Sukanta Bose'),
 Row(src='Lambodar P. Singh', dst='Sayan Kar'),
 Row(src='H.J. Boonstra', dst='Hitoshi Nishino'),
 Row(src='H.J. Boonstra', dst='Subhash Rajpoot'),
 Row(src='K. Skenderis', dst='Hitoshi Nishino'),
 Row(src='K. Skenderis', dst='Subhash Rajpoot'),
 Row(src='P.K. Townsend', dst='Hitoshi Nishino'),
 Row(src='P.K. Townsend', dst='Subhash Rajpoot'),
 Row(src='D.I.Olive', dst='M.B. Halpern'),
 Row(src='D.I.Olive', dst='E. Kiritsis'),
 Row(src='D.I.Olive', dst='N. Obers'),
 Row(src='D.I.Ol

# Classifier

In [186]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import FloatType
import numpy as np 


custom_udf_schema = VectorUDT()
def to_sparse_(v):
    
    if isinstance(v, SparseVector):
        return v
    vs = np.array([v], dtype=np.float64)
    nonzero = np.nonzero(vs)[0]
    return SparseVector(len(vs), nonzero, vs[nonzero])

to_sparse = udf(to_sparse_, custom_udf_schema)

zou.show()

zou2 = zou.withColumn('inDegree2', to_sparse(zou.inDegree))
zou3 = zou2.withColumn('cites', zou2['cites'].cast(FloatType()))

(train, test) = zou3.rdd.randomSplit([0.8, 0.2])

clf = LogisticRegression(featuresCol="inDegree2", labelCol="cites", predictionCol="pred", rawPredictionCol="raw_pred", maxIter=10)


model = clf.fit(train.toDF())
result = model.transform(test.toDF())

print("Prediction Output :")
print(result.head(1))
 



+-----+-------+-----+--------+-----+----+--------------------+--------------------+-----+--------------------+
|  src|    dst|cites|inDegree|   id|year|              topics|             authors|temp1|                text|
+-----+-------+-----+--------+-----+----+--------------------+--------------------+-----+--------------------+
|10009|9701020|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|9305185|    0|     180|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   8224|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   8197|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   4177|    0|      14|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|9311189|    0|      37|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|

                                                                                

Prediction Output :


ERROR:root:KeyboardInterrupt while sending command.                 (0 + 1) / 1]
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [173]:
zou.show()

                                                                                

+-------+-------+-----+--------+
|    src|    dst|cites|inDegree|
+-------+-------+-----+--------+
|9510123|9502114|    1|      20|
|9707075|9604178|    1|     132|
|9312155|9506142|    0|       8|
|9911255| 302165|    0|      12|
|9701033| 209076|    0|      12|
|9710020|9709228|    1|      21|
|9901042|9510135|    1|     737|
| 209146|9502077|    0|      70|
|9705079|9702201|    1|      81|
|   3016|9207067|    0|      46|
|9402099| 105041|    0|      13|
|9705061|9503216|    1|      10|
| 109090|   9107|    0|      14|
| 107016|9304156|    0|      17|
|9812004|9212092|    0|      15|
|9401058|9511114|    0|      12|
|  12261| 202126|    0|      30|
| 207246|9807137|    1|      85|
|9501144|9311081|    1|      54|
|9304048|9510008|    0|      28|
+-------+-------+-----+--------+
only showing top 20 rows



In [177]:
def get_authors(edges, nodes):
    
    edges_authors_src = edges.join(nodes, edges.src == nodes.id, ).drop(nodes.)
    # edges_authors_dst = edges_authors_src.join(nodes, edges.dst == nodes.id)

    return edges_authors_src

In [179]:
zou.show()

+-----+-------+-----+--------+-----+----+--------------------+--------------------+-----+--------------------+
|  src|    dst|cites|inDegree|   id|year|              topics|             authors|temp1|                text|
+-----+-------+-----+--------+-----+----+--------------------+--------------------+-----+--------------------+
|10009|9701020|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|9305185|    0|     180|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   8224|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   8197|    0|      15|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|   4177|    0|      14|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|10009|9311189|    0|      37|10009|2000|normalized weyl-t...|       Takuya Masuda| null|we define a norma...|
|

                                                                                