In [1]:
import os
import sys
import math
import numpy as np
import pandas as pd
from itertools import islice

import networkx as nx

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.util import rddToFileName, TransformFunction

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

%matplotlib widget

import matplotlib.pyplot as plt

In [2]:
SparkContext.setSystemProperty('spark.executor.memory', '52g')
SparkContext.setSystemProperty('spark.app.name', 'stars')
ssc = StreamingContext(sc, 1) 
spark

In [3]:
Gt = nx.Graph()
directed_Gt = nx.DiGraph()

In [4]:
batchsize = 10000

#batch_file_folder =  "/common/home/sdb202/project/temp/"
batch_file_folder =  "/common/home/milky-way/temp2/"
output_file_folder = "/common/home/milky-way/temp/"

In [5]:
def map_for_vector(line):
    vector = [float(x) for x in line.split(',')]
    return vector

def node_filter(n) :
    
    if n[0][0] >= n[1][0] : # filter out redundant pairs
        return False
    
    ra_1 = n[0][2]
    ra_2 = n[1][2]
    
    d_1 = n[0][3]
    d_2 = n[1][3]

    # 𝛾≈ sqrt([(𝛼𝑎−𝛼𝑏)cos((𝛿𝑎 + 𝛿𝑏) / 2)]2+(𝛿𝑎−𝛿𝑏)2)
    ra_diff = (ra_1 - ra_2)
    d_diff = (d_1 - d_2)
    d_avg = (d_1 + d_2) / 2
    
    distance = math.sqrt( ((ra_diff * math.cos(d_avg)) ** 2) + (d_diff ** 2) )
    if distance < 0.0001: # filter with distance
        
        return True
    
    else :
        return False
    
    
    
def add_edges(n):
    Gt.add_edge(n[0][0], n[1][0]) # adding edge from A to B
    print("--- Adding edge: (", n[0][0], n[1][0], ") ---")
    return (n[0][0], 1) # return A, 1 : meaning, the A has a neighbor


def takeAndPrint(time, rdd):
    print("--" + str(time) + "--\n")
    try:
        taken = rdd.collect()

        for record in taken:
            with open(output_file_folder + str(time) + ".txt", "a") as myfile:
                myfile.write(str(record) + "\n")
    
    except Exception as e:
        print("Got exception: " + str(e))

In [6]:
star_tile_batches = ssc.textFileStream(batch_file_folder)\
                        .mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)\
                        .map(map_for_vector)
                        #.window(1000,1000)

In [7]:
star_pairs = star_tile_batches.transform(lambda rdd: rdd.cartesian(rdd))
filtered_star_pairs = star_pairs.filter(node_filter)
filtered_star_pairs.foreachRDD(takeAndPrint)

In [8]:
ssc.start()

In [None]:
ssc.awaitTerminationOrTimeout(80000)

--2020-12-12 04:58:23--

--2020-12-12 04:58:24--

--2020-12-12 04:58:25--

--2020-12-12 04:58:26--

--2020-12-12 04:58:27--

--2020-12-12 04:58:28--

--2020-12-12 04:58:29--

--2020-12-12 04:58:30--

--2020-12-12 04:58:31--

--2020-12-12 04:58:32--

--2020-12-12 04:58:33--

--2020-12-12 04:58:34--

--2020-12-12 04:58:35--

--2020-12-12 04:58:36--

--2020-12-12 04:58:37--

--2020-12-12 04:58:38--

--2020-12-12 04:58:39--

--2020-12-12 04:58:40--

--2020-12-12 04:58:41--



In [None]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
# os.environ['PYSPARK_PYTHON'] = '/koko/system/anaconda/envs/python38/bin/python3'
# print(os.environ['PYSPARK_PYTHON'] )

# os.environ['PYSPARK_DRIVER_PYTHON'] = '/koko/system/anaconda/envs/python38/bin/python3'
# print(os.environ['PYSPARK_DRIVER_PYTHON'] )

In [26]:
# star_tile_batches.pprint(5)
# star_tile_batches.saveAsTextFile(output_file_folder + "stars")
# filtered_star_pairs.saveAsTextFiles(output_file_folder + "edges")

# def saveAsTextFile(t, rdd):
#     path = rddToFileName(output_file_folder + "edges", None, t)
#     try:
#         rdd.saveAsTextFile(path)
#     except Exception as e:
#         # after recovered from checkpointing, the foreachRDD may
#         # be called twice
#         print(str(e))

# filtered_star_pairs.foreachRDD(saveAsTextFile)

In [25]:
# degree_rdd = filtered_star_pairs.map(add_edges) \
#                                 .reduceByKey(lambda val1, val2: val1 + val2) #count degree of each node        


In [24]:
# from pyspark import SparkContext
# stars_df = sqlContext.createDataFrame(star_tile_batches, ["SOURCEID", "RA2000", "DEC2000", "L", "B", "J", "K"])
# edges_df = sqlContext.createDataFrame(filtered_star_pairs, ["node1", "node2"])

In [None]:
# nx.write_gpickle(Gt, "test.gpickle")
# Gt = nx.read_gpickle("test.gpickle")


In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType
# dschema = StructType()\
#                 .add("row_id", "double")\
#                 .add("source_id", "double")\
#                 .add("ra", "double")\
#                 .add("d", "double")\
#                 .add("l", "double")\
#                 .add("b", "double")\
#                 .add("j", "double")\
#                 .add("k", "double")\

# dfCSV = spark.readStream.option("sep", ",").option("header", "false").schema(dschema).csv(batch_file_folder)
# dfCSV.createOrReplaceTempView("stars")
# totalSalary = spark.sql("select ra,sum(d) from stars group by ra")
# query = totalSalary.writeStream.outputMode("complete")\
#     .option("checkpointLocation", "hello")\
#     .format("memory")\
#     .queryName('kothi')\
#     .start()

# query.awaitTermination(60)

In [44]:
# 
ra_1, d_1, ra_2, d_2 = ( 270.50987177, -29.867119148333302, 270.500969023333, -29.86276306 )

ra_diff = (ra_1 - ra_2)
d_diff = (d_1 - d_2)
d_avg = (d_1 + d_2) / 2
distance = math.sqrt( ((ra_diff * math.cos(d_avg)) ** 2) + (d_diff ** 2) )

In [45]:
distance

0.0043596569087387205