In [None]:
%spark.pyspark

import sys

from pprint import pprint

# Each JSON document is small, so there is no need for iterative (streaming) parsing.
import json
import sys
import os
import xml
import time

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, FloatType, ArrayType
import pyspark.sql.functions as sparkf
from pyspark.sql.functions import pandas_udf, PandasUDFType

import copy
import uuid

# Reuse the already-running Spark session if there is one; otherwise create it.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Storage locations of the two Parquet datasets loaded further down.
coauthor_dir = "gs://clpub/data_lake/arnet/tables/coauthor/merge-0"
author_org_group_dir = "gs://clpub/data_lake/arnet/tables/author_org_discrete/merge-0"

from typing import Iterator, Tuple
import pandas as pd

# Explicit schema for the co-author table. Every column is declared
# non-nullable (third StructField argument is False).
coauthor_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('_id', StringType()),
        ('_status', IntegerType()),
        ('_order', IntegerType()),
        ('paper_id', StringType()),
        ('paper_title', StringType()),
        ('author1_id', StringType()),
        ('author1_name', StringType()),
        ('author1_org', StringType()),
        ('author2_id', StringType()),
        ('author2_name', StringType()),
        ('author2_org', StringType()),
        ('year', FloatType()),
    )
])

# Load the Parquet data with that schema and expose it to Spark SQL under
# the view name "coauthor_df".
coauthor_df = spark.read.schema(coauthor_schema).parquet(coauthor_dir)
coauthor_df.createOrReplaceTempView("coauthor_df")

# Explicit schema for the author/organisation table. Every column is
# declared non-nullable (third StructField argument is False).
author_org_group_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('author_id', StringType()),
        ("author_org", StringType()),
        ('org_rank', FloatType()),
        ('computed', IntegerType()),
    )
])

# Load the Parquet data with that schema and expose it to Spark SQL under
# the view name "author_org_group_df".
author_org_group_df = spark.read.schema(author_org_group_schema).parquet(author_org_group_dir)
author_org_group_df.createOrReplaceTempView("author_org_group_df")

# Number of co-author records per (author1, author2, org1, org2) combination.
group_coauthor = spark.sql("""
    select author1_id, author2_id, author1_org, author2_org, count(_id) as collab
    from coauthor_df
    group by author1_id, author2_id, author1_org, author2_org
""")

# Sample the grouped pairs with fraction 0.1 and seed 999, then register the
# sample as the "coauthor_sample" view used by the query below.
sample = group_coauthor.sample(fraction=0.1, seed=999)
sample.createOrReplaceTempView("coauthor_sample")

# Attach the organisation rank of each author in the pair (matched on both
# author id and organisation) and keep at most 5000 rows.
sample_ranking = spark.sql("""
    select cs.collab, aogd1.org_rank as author1_rank, aogd2.org_rank as author2_rank
    from coauthor_sample as cs
        inner join author_org_group_df as aogd1 on aogd1.author_id = cs.author1_id and aogd1.author_org = cs.author1_org
        inner join author_org_group_df as aogd2 on aogd2.author_id = cs.author2_id and aogd2.author_org = cs.author2_org
    limit 5000
""")

from scipy.spatial import distance
import numpy as np

@pandas_udf("float", PandasUDFType.SCALAR)
def node_proximity(v1, v2):
    """Return the relative rank difference for each pair of ranks.

    Scalar pandas UDF: ``v1`` and ``v2`` are equally long pandas Series
    holding the two rank columns of one batch.

    For every row:

    * if both ranks are strictly positive, the result is
      ``abs(r1 - r2) / max(r1, r2)`` (0.0 when the ranks are equal);
    * otherwise (a rank is zero, negative or NaN) the result is 1.0.

    NOTE: despite the name, a smaller value means the two ranks are closer.

    Returns:
        A float64 pandas Series with one value per input row.
    """
    # Work in float64 so the arithmetic matches plain Python floats.
    ranks_a = np.asarray(v1.values, dtype=np.float64)
    ranks_b = np.asarray(v2.values, dtype=np.float64)

    # NaN compares as False, so rows with missing ranks keep the default 1.0.
    both_positive = (ranks_a > 0) & (ranks_b > 0)

    result = np.ones(len(ranks_a), dtype=np.float64)

    # Only divide on rows where the denominator is known to be positive.
    pos_a = ranks_a[both_positive]
    pos_b = ranks_b[both_positive]
    result[both_positive] = np.abs(pos_a - pos_b) / np.maximum(pos_a, pos_b)

    return pd.Series(result)

# Give every sampled row a "datapoint" id and add the node_proximity value
# computed from the two rank columns.
# NOTE(review): the repartition(1) before monotonically_increasing_id is
# presumably there to obtain consecutive ids — confirm.
sample_proximity = (
    sample_ranking
    .repartition(1)
    .withColumn("datapoint", sparkf.monotonically_increasing_id())
    .select(
        "datapoint",
        "collab",
        "author1_rank",
        "author2_rank",
        node_proximity(sparkf.col("author1_rank"), sparkf.col("author2_rank")).alias("node_proximity"),
    )
)

# Write the result as CSV from a single partition, overwriting any previous
# output at the destination.
sample_proximity.repartition(1).write.mode("overwrite").csv("gs://clpub/diagram/org_discrete_vs_coauthor")
