## Analyzing ACM Citation Network
#### Genevieve Peters | April 26, 2020
#### References
GraphX Programming Guide: ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html  
Dataset By: Jie Tang, Jing Zhang, Limin Yao, Juanzi Li, Li Zhang, and Zhong Su. ArnetMiner: Extraction and Mining of Academic Social Networks. In Proceedings of the Fourteenth ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (SIGKDD'2008). 990-998.  
Dataset Description: https://aminer.org/billboard/citation  
Spark Functions: https://spark.apache.org/docs/2.3.1/api/sql/index.html

Spark configuration on yarn

In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory="6000m"

Import spark, hadoop, and graphx packages

In [2]:
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input._
import scala.util.matching.Regex
import org.apache.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Intitializing Scala interpreter ...

Spark Web UI available at http://bd-hm:8088/proxy/application_1587937229181_0026
SparkContext available as 'sc' (version = 2.4.4, master = yarn, app id = application_1587937229181_0026)
SparkSession available as 'spark'


import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input._
import scala.util.matching.Regex
import org.apache.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


### Explore ACM Dataset
**Task**: Load ACM dataset. Review file format.  
**Results**: ACM dataset was filtered to include only papers that cite at least 1 other paper.

In [3]:
@transient val hadoopConf = new Configuration
hadoopConf.set("textinputformat.record.delimiter","#*")

val inputRdd = sc.newAPIHadoopFile("/hadoop-user/data/citation-acm-v8.txt",classOf[TextInputFormat],classOf[LongWritable], classOf[Text], hadoopConf)
                 .map{case(key,value)=>value.toString}
                 .filter(value=>value.length!=0)
                 .filter(paper=>paper.contains("#%"))//remove papers that do not reference any other paper

hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
inputRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:57


In [4]:
/*For Presentation*/
val sampPid = "5390879920f70186a0d42417"
inputRdd.filter(paper=>paper.contains(s"#index$sampPid")).collect.foreach(println)

A modification support system—automatic correction of side—effects caused by type modifications
#@T. Tenma, Y. Sato, Y. Morimoto, M. Tanaka, T. Ichikawa
#t1990
#cCSC '90 Proceedings of the 1990 ACM annual conference on Cooperation
#index5390879920f70186a0d42417
#%5390877920f70186a0d2cae4
#%5390878320f70186a0d329d9
#%5390878a20f70186a0d36e5e
#%5390878a20f70186a0d37f0a
#%53908bad20f70186a0dc2ff5
#!Programmers modify software for many reasons. When a part of a software is modified, side-effects of the modification are propagated to other parts of the software. By correcting these side-effects automatically, the productivity and the reliability of software development increases. This paper proposes an approach based on constraints and component-based software representation for detecting and correcting side-effects. A system based on the approach for type modifications is presented. The system is developed independent of a particular language.




sampPid: String = 5390879920f70186a0d42417


### Step 1. Building ACM Citation Network

**Task**: Extract all references for each paper. Create an rdd of links.  
**Results**:  
- <font color="blue">getLinks</font>: function that concatenates ids of all references for a paper    
- <font color="blue">citationRdd</font>(<font color="blue">pid</font>,<font color="blue">title</font>,<font color="blue">link</font>) RDD: extracts a paper's id, title, and references  
- <font color="blue">pidRdd</font>(<font color="blue">pid</font>,<font color="blue">link</font>) RDD: removes the title (the title will be needed later for page rank)
- <font color="blue">indices</font>(<font color="blue">index1</font>,<font color="blue">index2</font>) RDD: represents every link  
- <font color="blue">linkGraph</font>(<font color="blue">index1</font>,<font color="blue">index2</font>) Graph: graph of links

In [5]:
def getLinks(record: String): String ={
    val regCite = "#%.*".r
    return regCite.findAllIn(record).mkString("").split("#%").mkString(" ").trim()
}

val citationRdd = inputRdd.map(paper=>((paper.split("#index")(1).split("#")(0).trim(),paper.split("#")(0).trim()),getLinks(paper)))
                          .flatMapValues(links=>links.split(" "))
                          .map{case(((pid,title),link))=>(pid,title,link)}

var pidRdd = citationRdd.map{case(pid,title,link)=>(pid,link)}                             

val indices = pidRdd.keys.distinct.union(pidRdd.values.distinct).distinct.zipWithIndex

val linkRdd = pidRdd.join(indices).map{case(pid1,(pid2,index1))=>(pid2,index1)}
                    .join(indices).map{case(pid2,(index1,index2))=>(index1,index2)}
val linkGraph = Graph.fromEdgeTuples(linkRdd,null)

getLinks: (record: String)String
citationRdd: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[7] at map at <console>:59
pidRdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[8] at map at <console>:61
indices: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[21] at zipWithIndex at <console>:63
linkRdd: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[29] at map at <console>:66
linkGraph: org.apache.spark.graphx.Graph[Null,Int] = org.apache.spark.graphx.impl.GraphImpl@4587f890


In [6]:
/*For Presentation*/
println("Unique Papers: " + indices.count)
println("Papers citing >0 papers: " + inputRdd.count())
println("Links: " + linkRdd.count())
println("Vertices: " + linkGraph.numVertices)
println("Edges: " + linkGraph.numEdges)

Unique Papers: 1369128
Papers citing >0 papers: 916906
Links: 8650267
Vertices: 1357308
Edges: 8650267


In [7]:
/*For Presentation*/
indices.toDF("pid","index").createOrReplaceTempView("indices")
linkRdd.toDF("src","dst").createOrReplaceTempView("linkRdd")
val row = spark.sql("""SELECT index FROM indices WHERE (pid=='5390879920f70186a0d42417')""").take(1)
val sampSrc = row(0).getLong(0)
spark.sql(s"""SELECT src, pid, index FROM indices, linkRdd
                    WHERE (src==$sampSrc) AND (dst==index)""").collect.foreach(println)

[349088,53908bad20f70186a0dc2ff5,681896]
[349088,5390878a20f70186a0d36e5e,1083440]
[349088,5390878320f70186a0d329d9,881476]
[349088,5390877920f70186a0d2cae4,1152009]
[349088,5390878a20f70186a0d37f0a,828727]


row: Array[org.apache.spark.sql.Row] = Array([349088])
sampSrc: Long = 349088


### Step 2.1 Visualizing the in-degree distribution

**Task**: Compute the in-degree distribution and save rdd to file.  
**Results**: The in-degree distribution compares the number of in-links across the dataset.  
> <font color = "green" size ="4">&emsp;$P(k)=\frac{n_{k}}{n}$</font>  
Where  
<font color = "green" size ="2">&emsp;$P(k)$</font><font size ="2">*- In-degree distribution*</font>  
<font color = "green" size ="2">&emsp;$n_{k}$</font><font size ="2">*- Number of papers with k in-links*</font>  
<font color = "green" size ="2">&emsp;$n$</font><font size ="2">*- Number of unique papers*</font>  

In [8]:
val inDegree = linkGraph.inDegrees
val n = inDegree.count().toDouble
val pk = inDegree.map{case(dst,k)=>(k,1L)}.reduceByKey((nk1,nk2)=>(nk1+nk2))
                 .map{case(k,nk)=>(k,(nk/n).toDouble)}

pk.repartition(1).saveAsTextFile("/hadoop-user/data/acm_pk")

inDegree: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[62] at RDD at VertexRDD.scala:57
n: Double = 1007293.0
pk: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[66] at map at <console>:55


In [9]:
println(pk.count)

911


In [10]:
/*For Presentation*/
inDegree.toDF("dst","in").createOrReplaceTempView("inD")
val sampGroup = spark.sql(s"SELECT lr.dst AS dst, in FROM inD i, linkRdd lr WHERE (src==$sampSrc) AND (lr.dst==i.dst)")
sampGroup.collect.foreach(println)
sampGroup.createOrReplaceTempView("sampGroup")

[681896,106]
[1083440,1]
[881476,12]
[1152009,1149]
[828727,46]


sampGroup: org.apache.spark.sql.DataFrame = [dst: bigint, in: int]


In [11]:
/*For Presentation*/
pk.toDF("k","pk").createOrReplaceTempView("pk")
spark.sql("SELECT k, pk FROM pk, sampGroup WHERE (in==k)").collect.foreach(println)

[12,0.012770862102685118]
[1,0.3094998178285762]
[1149,1.9855196055169647E-6]
[106,1.4295741159722145E-4]
[46,8.090992392481631E-4]


### Step 2.2 Implementing the weighted page rank algorithm  

**Task**: Calculate the in-weights and out-weights of each edge.

<font color = "green" size ="3">&emsp;$W^{in}_{(src,dst)}=\frac{\#inlinks(dst)}{\sum_{k\in out(src)}\#inlinks(k)}$</font>
> Where  
<font color = "green" size ="2">&emsp;$W_{(src,dst)}$</font><font size ="2">*- the in-weight of edge (src,dst)*</font>  
<font color = "green" size ="2">&emsp;$\#inlinks(dst)$</font><font size ="2">*- the in-degrees of dst*</font>  
<font color = "green" size ="2">&emsp;${\sum_{k\in out(src)}\#inlinks(k)}$</font><font size ="2">*- the sum of the in-degrees of all papers that src references*</font>  

<font color = "green" size ="3">&emsp;$W^{out}_{(src,dst)}=\frac{\#outlinks(dst)}{\sum_{k\in out(src)}\#outlinks(k)}$</font>
> Where  
<font color = "green" size ="2">&emsp;$W_{(src,dst)}$</font><font size ="2">*- the out-weight of edge (src,dst)*</font>  
<font color = "green" size ="2">&emsp;$\#outlinks(dst)$</font><font size ="2">*- the out-degrees of dst*</font>  
<font color = "green" size ="2">&emsp;${\sum_{k\in out(src)}\#outlinks(k)}$</font><font size ="2">*- the sum of the out-degrees of all papers that src references*</font>  

**Results**: 
- <font color="blue">degreeGraph</font> Graph: calculates the in-degree and out-degree for every vertex
- <font color="blue">outVinK</font> Vertex, <font color="blue">outVoutK</font> Vertex:     
graph.aggregateMessages is a function to send "messages" to vertices in a graph. In this instance, for every src, we added the in-degrees of every paper it references and stored it as a vertex attribute. We repeated this for out-degrees. These two values represent the denominators of the in-weight and out-weight equations for each edge (src,_).
- <font color="blue">buildWeights</font> Graph: joins aggregated messages with <font color="blue">degreeGraph</font> and formats its attributes
- <font color="blue">weights</font> Graph: calculates the in-weight and out-weight for every edge

In [12]:
val degreeGraph = linkGraph.outerJoinVertices(linkGraph.inDegrees){case(id,empty,inDegOpt)=>(inDegOpt.getOrElse(0))}
                      .outerJoinVertices(linkGraph.outDegrees){case(id,inDeg,outDegOpt)=>(inDeg,outDegOpt.getOrElse(0))}
degreeGraph.persist()

val outVinK = degreeGraph.aggregateMessages[Int](triplet=>triplet.sendToSrc(triplet.dstAttr._1),(a,b)=>(a+b))
val outVoutK = degreeGraph.aggregateMessages[Int](triplet=>triplet.sendToSrc(triplet.dstAttr._2),(a,b)=>(a+b))

val buildWeights = degreeGraph.outerJoinVertices(outVinK){case(id,(in,out),suminK)=>(in,out,suminK match
                                                                                {case Some(suminK) => suminK.toDouble
                                                                                 case None => 0 })}
                            .outerJoinVertices(outVoutK){case(id,(in,out,suminK),sumoutK)=>(in,out,suminK,sumoutK match
                                                                                {case Some(sumoutK) => sumoutK.toDouble
                                                                                 case None => 0 })}

val weights = buildWeights.mapTriplets(triplet=>(triplet.dstAttr._1/triplet.srcAttr._3, triplet.dstAttr._2/triplet.srcAttr._4))

degreeGraph: org.apache.spark.graphx.Graph[(Int, Int),Int] = org.apache.spark.graphx.impl.GraphImpl@59977e13
outVinK: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[127] at RDD at VertexRDD.scala:57
outVoutK: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[131] at RDD at VertexRDD.scala:57
buildWeights: org.apache.spark.graphx.Graph[(Int, Int, Double, Double),Int] = org.apache.spark.graphx.impl.GraphImpl@afa5ff8
weights: org.apache.spark.graphx.Graph[(Int, Int, Double, Double),(Double, Double)] = org.apache.spark.graphx.impl.GraphImpl@4b936383


In [13]:
/*For Presentation*/
degreeGraph.vertices.toDF("v","attr").createOrReplaceTempView("degG")
spark.sql("SELECT v, attr FROM degG, sampGroup WHERE (dst==v)").collect.foreach(println)

[681896,[106,8]]
[1083440,[1,17]]
[881476,[12,0]]
[1152009,[1149,0]]
[828727,[46,8]]


In [14]:
/*For Presentation*/
outVinK.toDF("src","vk").createOrReplaceTempView("outVinK")
spark.sql(s"SELECT * FROM outVinK WHERE src==$sampSrc").collect.foreach(println)
outVoutK.toDF("src","vk").createOrReplaceTempView("outVoutK")
spark.sql(s"SELECT * FROM outVoutK WHERE src==$sampSrc").collect.foreach(println)
weights.edges.toDF("srcId","dstId","attr")createOrReplaceTempView("weights")
spark.sql(s"SELECT * FROM weights WHERE srcId==$sampSrc").collect.foreach(println)

[349088,1314]
[349088,33]
[349088,681896,[0.0806697108066971,0.24242424242424243]]
[349088,828727,[0.0350076103500761,0.24242424242424243]]
[349088,881476,[0.0091324200913242,0.0]]
[349088,1083440,[7.6103500761035E-4,0.5151515151515151]]
[349088,1152009,[0.8744292237442922,0.0]]


**Task**: Calculate the page rank for all dst vertices. Compute 10 iterations.

<font color = "green" size ="3">&emsp;$PR(dst)=\frac{1-d}{N}+d * \sum_{src\in (dst)} PR(src)*W^{in}_{(src,dst)}*W^{out}_{(src,dst)}$</font>  
Where  
<font color = "green" size ="2">&emsp;$PR(dst)$</font><font size ="2">*- the page rank of dst*</font>  
<font color = "green" size ="2">&emsp;$d$</font><font size ="2">*- constant => 0.85*</font>  
<font color = "green" size ="2">&emsp;$N$</font><font size ="2">*- number of vertices => 1356780</font>  
<font color = "green" size ="2">&emsp;$PR(src)$</font><font size ="2">*- </font>  
<font color = "green" size ="2">&emsp;$W^{in}_{(src,dst)}$</font><font size ="2">*- the in-weight of edge (src,dst)*</font>  
<font color = "green" size ="2">&emsp;$W^{out}_{(src,dst)}$</font><font size ="2">*- the out-weight of edge (src,dst)*</font>  
<font color = "green" size ="2">&emsp;$\sum_{src\in (dst)}$</font><font size ="2">*- the sum of the product of the page rank of src, the in-weight, and the out-weight for edge (src,dst)*</font>  

**Results**:
> <font color = "blue">edgeWeights</font>: dataframe of edges and their in-weight and out-weight  
> <font color = "blue">vertPRs</font>: dataframe of vertices and their page ranks  
> <font color = "blue">updatePRs</font>: dataframe that calculates a new page rank for all dst vertices  
> <font color = "blue">leftJoin</font>: dataframe that updates the vertPRs dataframe with new page ranks 

In [15]:
val N = weights.numVertices.toDouble
val initPR = (1.0/N)
val d = 0.85

weights.edges.toDF("srcId","dstId","attr").createOrReplaceTempView("weights")
val edgeWeights = spark.sql("SELECT srcId AS src, dstId AS dst, attr._1 AS win, attr._2 AS wout FROM weights")
edgeWeights.createOrReplaceTempView("eWeights")

linkGraph.vertices.map{case(v,n)=>v}.toDF("v").createOrReplaceTempView("verts")
val vertPRs = spark.sql(s"SELECT v, $initPR AS pr FROM verts")
vertPRs.createOrReplaceTempView("vPRs")

for (i<-1 to 10)
{
    val updatePRs = spark.sql(s"""SELECT e.dst AS dst, nanvl(((1-$d)/$N)+$d*SUM(vp.pr*e.win*e.wout), (1-$d)/$N) AS pr
                FROM eWeights e, vPRs vp
                WHERE e.src==vp.v
                GROUP BY dst""")
    updatePRs.createOrReplaceTempView("updatePRs")                          
                              
    val leftJoin = spark.sql(s"""SELECT vp.v AS v, CASE WHEN up.pr IS NULL THEN vp.pr ELSE up.pr END AS pr
                                    FROM vPRs vp, updatePRs up
                                    WHERE vp.v==up.dst""")
    leftJoin.createOrReplaceTempView("vPRs")
     /*For Presentation*/
    println(s"***$i***")
    spark.sql(s"SELECT * FROM vPRs WHERE (v==$sampSrc)").take(1).foreach(println)
    spark.sql(s"SELECT g.dst, vp.pr FROM vPRs vp, sampGroup g WHERE (vp.v==g.dst)").collect.foreach(println)
}

***1***
[349088,1.1054459122768472E-7]
[681896,1.4269153454260554E-6]
[1083440,1.1075551618841542E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[828727,4.404706174608952E-7]
***2***
[349088,1.1051522139576891E-7]
[681896,3.9366235976030644E-7]
[1083440,1.1054683799970573E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[828727,2.3705712968161005E-7]
***3***
[349088,1.1051519355934562E-7]
[681896,2.8232434451742944E-7]
[1083440,1.1054682821247103E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[828727,1.9915914507976426E-7]
***4***
[349088,1.1051519349265721E-7]
[681896,2.5876804060096163E-7]
[1083440,1.1054682820319479E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[828727,1.6515345100400431E-7]
***5***
[349088,1.1051519348759664E-7]
[681896,2.503740578140372E-7]
[1083440,1.1054682820317257E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[828727,1.6365417840411145E-7]
***6***
[349088,1.1051519348180802E-7]
[681896,2.479202922966625E-7]
[1083440,1.1054682820317089E-7]
[881476,1.1051E-7]
[1152009,1.1051E-7]
[8

N: Double = 1357308.0
initPR: Double = 7.367524541224247E-7
d: Double = 0.85
edgeWeights: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint ... 2 more fields]
vertPRs: org.apache.spark.sql.DataFrame = [v: bigint, pr: decimal(22,22)]


**Task**: After the 10th iteration, find the top 10 papers with the highest ranks. Print results to a tab delimited text file.

**Results**: Selected title, in-degree, and page rank from dataframes. 

In [17]:
citationRdd.map{case(pid,title,link)=>(pid,title)}.distinct.toDF("pid","title").createOrReplaceTempView("titles")
indices.toDF("pid","id").createOrReplaceTempView("indices")
inDegree.toDF("id","in").createOrReplaceTempView("inDeg")

spark.sql("SELECT i.id AS id, t.title AS title, n.in AS in FROM titles t, indices i, inDeg n WHERE t.pid==i.pid AND i.id==n.id").createOrReplaceTempView("info")

val ranked = spark.sql("""SELECT vp.v AS id, f.title AS title, f.in AS indegree, vp.pr AS rank
                            FROM info f, vPRs vp
                            WHERE f.id==vp.v
                            ORDER BY vp.pr DESC
                            LIMIT 10""")

ranked: org.apache.spark.sql.DataFrame = [id: bigint, title: string ... 2 more fields]


In [18]:
ranked.write.format("csv").option("sep","\t").save("/hadoop-user/data/acm_ranked")

### Step 2.3 Finding the average clustering coefficient

**Task**:

<font color = "green" size ="3">&emsp;$CC(src)=\frac{2t_{src}}{k_{src} (k_{src} -1)}$</font>  
Where  
<font color = "green" size ="2">&emsp;$CC(src)$</font><font size ="2">*- clustering coefficient of src*</font>  
<font color = "green" size ="2">&emsp;$t_{src}$</font><font size ="2">*- number of triangles that contain src*</font>  
<font color = "green" size ="2">&emsp;$k_{src}$</font><font size ="2">*- degrees of src*</font>  

**Results**: The number of triangles is easily computed using the graph.triangleCount() function

In [19]:
val vertTri = linkGraph.partitionBy(PartitionStrategy.RandomVertexCut).triangleCount().vertices
val vertDeg = linkGraph.degrees
val ccv = vertDeg.join(vertTri).filter{case(v,(deg,tri))=>deg>1.0}.map{case(v,(deg,tri))=>(v,(2*tri)/(deg*(deg-1.0)))}
ccv.repartition(1).saveAsTextFile("/hadoop-user/data/acm_ccv")

val n = ccv.count
ccv.toDF("v","cc").createOrReplaceTempView("ccv")
spark.sql(s"SELECT SUM(cc)/$n FROM ccv").write.csv("/hadoop-user/data/acm_avg")

vertTri: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[4778] at RDD at VertexRDD.scala:57
vertDeg: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[4784] at RDD at VertexRDD.scala:57
ccv: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[4789] at map at <console>:56
n: Long = 1156468


In [21]:
val degrees = linkGraph.degrees
degrees.toDF("v","deg").createOrReplaceTempView("deg")
spark.sql(s"SELECT s.dst, d.deg FROM sampGroup s, deg d WHERE (d.v==s.dst)").collect.foreach(println)

[681896,114]
[1083440,18]
[881476,12]
[1152009,1149]
[828727,54]


degrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[4784] at RDD at VertexRDD.scala:57


In [22]:
vertTri.toDF("v","t").createOrReplaceTempView("vertTri")
println("*triangles*")
spark.sql(s"SELECT s.dst, vt.t FROM vertTri vt, sampGroup s WHERE (vt.v==s.dst)").collect.foreach(println)

*triangles*
[681896,165]
[1083440,20]
[881476,5]
[1152009,2570]
[828727,34]


In [23]:
/*For Presentation*/
spark.sql(s"SELECT * FROM ccv WHERE (v==$sampSrc)").take(1).foreach(println)
spark.sql(s"SELECT v, cc FROM ccv, sampGroup WHERE (v==dst)").collect.foreach(println)

[349088,0.13333333333333333]
[681896,0.025617140195621797]
[1083440,0.13071895424836602]
[881476,0.07575757575757576]
[1152009,0.003896737960292695]
[828727,0.023759608665269043]
