In [36]:
# Stop a SparkSession left over from an earlier run before the Spark set-up
# cell below creates a new one. On a fresh kernel `spark` does not exist yet
# (it is only created in the set-up cell), so an unguarded spark.stop() would
# raise NameError under Restart & Run All.
if 'spark' in globals():
    spark.stop()

## Household - Most Living Site

In [1]:
import os
import networkx as nx

In [2]:
# Point PySpark at the cluster's Spark installation and the Python interpreter
# it should use; %env also echoes each assignment (see the output below).
%env  SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
%env  PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python3.6
# GraphFrames and its scala-logging dependencies are shipped as local jars.
# Alternative (resolve from a package repository instead):
#   --packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 pyspark-shell
jar_dir = '/home/paul/jar'
graphframes_jars = [
    'graphframes-0.5.0-spark2.1-s_2.11.jar',
    'scala-logging-api_2.11-2.1.2.jar',
    'scala-logging-slf4j_2.11-2.1.2.jar',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars {} pyspark-shell'.format(
    ','.join('{}/{}'.format(jar_dir, jar) for jar in graphframes_jars))

env: SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
env: PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python3.6


In [146]:
import ast
import logging
import math
import numpy as np
import os
import pandas as pd
import pyspark.sql.functions as F
import warnings
import json

from datetime import datetime, timedelta
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import (DoubleType, FloatType,IntegerType, ArrayType,
    StringType, TimestampType, StructField, StructType, DecimalType, LongType)
from pyspark.sql.window import Window
from typing import Iterable
from graphframes import *


# Append INFO-and-above log records to somelog.log in the current working directory.
log_path = "{}/somelog.log".format(os.getcwd())
logging.basicConfig(
    filename=log_path,
    filemode="a+",
    level=logging.INFO,
    format="%(asctime)-15s %(levelname)-8s %(message)s",
)

In [4]:
import networkx as nx
from graphframes import GraphFrame
from matplotlib import pyplot as plt

%matplotlib inline

def PlotGraph(edge_list, max_edges=1000):
    """Draw up to ``max_edges`` edges of a Spark edge DataFrame with networkx.

    Parameters
    ----------
    edge_list : pyspark.sql.DataFrame
        Edge DataFrame with ``src`` and ``dst`` columns (e.g. ``GraphFrame.edges``).
    max_edges : int, default 1000
        Maximum number of edges collected to the driver (via ``take``) before
        drawing, so large graphs are not pulled onto the driver in full.

    Returns
    -------
    matplotlib.axes.Axes
        The axes the graph was drawn on.
    """
    plot_graph = nx.Graph()
    plot_graph.add_edges_from(
        (row['src'], row['dst'])
        for row in edge_list.select('src', 'dst').take(max_edges)
    )

    # One full-size axes: the graph is the only thing drawn, so there is no
    # reason to confine it to half of a 1x2 subplot grid.
    _, ax = plt.subplots()
    nx.draw(plot_graph, ax=ax, with_labels=True, font_weight='bold')
    return ax

In [5]:
# Spark set-up. All settings live in one mapping so the full configuration can
# be reviewed and tuned in one place.
spark_settings = {
    # YARN placement and driver
    'spark.master': 'yarn',
    'spark.yarn.queue': 'root.QUEUE',
    'spark.driver.memory': "10G",

    # Executor sizing: a fixed pool of 5 executors (dynamic allocation is off)
    'spark.executor.cores': "25",
    'spark.executor.memory': "10G",
    'spark.memory.storageFraction': '0.3',
    'spark.executor.memoryOverhead': "8G",
    'spark.executor.instances': "5",

    # Dynamic allocation
    'spark.dynamicAllocation.enabled': "false",

    # Kryo serializer in place of the default Java serializer; unregistered
    # classes are allowed
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrationRequired": "false",

    # Disable Parquet schema merging so Spark does not need to scan many
    # partitions to reconcile schemas
    "spark.sql.hive.convertMetastoreParquet.mergeSchema": "false",
    "parquet.enable.summary-metadata": "false",
    "spark.sql.parquet.mergeSchema": "false",

    # NOTE(review): skew-join handling is part of Adaptive Query Execution
    # (Spark 3.0+) and only applies when spark.sql.adaptive.enabled is true,
    # which is not set here; the GraphFrames jars in PYSPARK_SUBMIT_ARGS target
    # Spark 2.x. Confirm the cluster's Spark version.
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # Allow creating a managed table over a location that still contains files,
    # so an existing table can be re-created
    "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation": "true",

    # GraphFrames is supplied via --jars in PYSPARK_SUBMIT_ARGS instead of:
    # 'spark.jars.packages': 'graphframes:graphframes:0.6.0-spark2.3-s_2.11',
}
conf = SparkConf().setAll(list(spark_settings.items()))

# Create the SparkSession
spark = (
    SparkSession.builder
    .appName("household_most_living_site")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

# Create a Sample Graph

In [150]:
# Toy graph: subscribers ('sub') live at sites ('site') via 'live' edges, call
# each other via 'call' edges, and sites are linked by 'near' edges.
# `weightage` is the per-edge weight the motif queries below add up.

# Vertex DataFrame
v = spark.createDataFrame([
  (1, "sub1-X", 'sub'),
  (2, "sub2-X", 'sub'),
  (3, "sub3-Y", 'sub'),
  (4, "sub4-Z", 'sub'),
  (5, "siteX", 'site'),
  (6, "siteY", 'site'),
  (7, "siteZ", 'site')
], ["id", "name", "category"])

# Edge DataFrame. The column-name list is the schema; no third positional
# argument (that slot is `samplingRatio`, not a schema, and the `schema`
# variable previously passed there is not defined on a fresh kernel).
e = spark.createDataFrame([
  (5, 6, "near", 2),
  (1, 5, "live", 0),
  (2, 5, "live", 0),
  (3, 6, "live", 0),
  (4, 7, "live", 0),
  (2, 1, "call", 5),
  (2, 3, "call", 10)
], ["src", "dst", "relationship", "weightage"])

# Create a GraphFrame
g = GraphFrame(v, e)

In [151]:
# Call pairs (suba -> subb) where the two subscribers live at neighbouring
# sites (sitea -near-> siteb). Edge weightage = call weightage + near weightage.
# NOTE(review): near edges are directed, so only sitea -> siteb matches; if
# nearness is meant to be symmetric, the reverse near edge must also exist in
# the edge DataFrame — confirm.
motifs = (
    g
    .find("(suba)-[livea]->(sitea); (sitea)-[near]->(siteb); "
          "(subb)-[liveb]->(siteb); (suba)-[call]->(subb)")
    .filter("livea.relationship = 'live'")
    .filter("liveb.relationship = 'live'")
    # Pin the site-to-site hop to an actual 'near' edge; otherwise any outgoing
    # edge of sitea would be accepted as the neighbour link.
    .filter("near.relationship = 'near'")
    .filter("call.relationship != 'live'")
    .select(F.col("suba.id").alias('src'),
            F.col("subb.id").alias('dst'),
            F.col("near.weightage").alias("distance_weightage"),
            F.col("call.weightage").alias("call_weightage"))
    .withColumn("weightage", F.col('call_weightage') + F.col("distance_weightage"))
    .filter(F.col('weightage') > 0)
    .select(F.col('src'),
            F.col('dst'),
            F.col('weightage'))
)

In [152]:
# Neighbouring-site call pairs with their combined weightage
motifs.show()

+---+---+---------+
|src|dst|weightage|
+---+---+---------+
|  2|  3|       12|
+---+---+---------+



In [153]:
# Call pairs (suba -> subb) where both subscribers live at the SAME site.
# Same-site pairs get a fixed distance weightage of 10 (neighbouring-site pairs
# use the near edge's weightage instead); only positive totals are kept.
motifs2 = (
    g.find("(suba)-[livea]->(sitea); (subb)-[liveb]->(sitea); (suba)-[call]->(subb)")
    .where("livea.relationship = 'live' AND liveb.relationship = 'live'")
    .where("call.relationship != 'live'")
    .select(
        F.col("suba.id").alias('src'),
        F.col("subb.id").alias('dst'),
        (F.col("call.weightage") + F.lit(10)).alias('weightage'),
    )
    .where(F.col('weightage') > 0)
)

In [154]:
# Same-site call pairs (distance weightage fixed at 10)
motifs2.show()

+---+---+---------+
|src|dst|weightage|
+---+---+---------+
|  2|  1|       15|
+---+---+---------+



In [183]:
# Stack both motif edge lists. union() matches columns by position, which is
# safe here because both frames select (src, dst, weightage) in that order.
final_motif = motifs.union(motifs2)
final_motif.show()

+---+---+---------+
|src|dst|weightage|
+---+---+---------+
|  2|  3|       12|
|  2|  1|       15|
+---+---+---------+



In [184]:
# Rename/cast the combined motif edges to (from: long, to: long, weight: double).
# Built directly from the two motif frames so the cell is idempotent: the
# earlier `final_motif = final_motif.select(...)` failed on a second run because
# src/dst/weightage had already been renamed to from/to/weight.
final_motif = (
    motifs.union(motifs2)
    .select(F.col('src').cast('long').alias('from'),
            F.col('dst').cast('long').alias('to'),
            F.col('weightage').cast('double').alias('weight'))
)

In [185]:
# Check the edge schema: from/to cast to long, weight cast to double
final_motif.printSchema()

root
 |-- from: long (nullable = true)
 |-- to: long (nullable = true)
 |-- weight: double (nullable = true)



# Edges

In [186]:
# Write the edge list as a single parquet part file (one partition), replacing
# any previous export at this path.
final_motif.repartition(1).write.parquet("hdfs:///user/paul/paul_sample_edges", mode="overwrite")

# Vertices

In [178]:
# Vertex table: (idx: long, name: string, module: long), every vertex starting
# in module 0.
# NOTE(review): this previously read `newG.vertices`, but `newG` is not defined
# in any visible cell (NameError on Restart & Run All). `g` is the only
# GraphFrame built here. TODO confirm whether the site vertices (which never
# appear in final_motif's edges) should be exported, or only subscribers.
final_vertex = (
    g.vertices
    .select(F.col('id').cast('long').alias('idx'),
            F.col('name'),
            F.lit(0).cast('long').alias('module'))
)

In [179]:
# Check the vertex schema: idx and module as long, name as string
final_vertex.printSchema()

root
 |-- idx: long (nullable = true)
 |-- name: string (nullable = true)
 |-- module: long (nullable = false)



In [180]:
# Write the vertex table as a single parquet part file (one partition),
# replacing any previous export at this path.
final_vertex.repartition(1).write.parquet("hdfs:///user/paul/paul_sample_vertices", mode="overwrite")

In [164]:
# schema = StructType()
# for field in final_vertex.schema.fields:
#     schema.add(StructField(field.name, field.dataType, False))
# f_vertex = spark.createDataFrame(final_vertex.rdd, schema)

# f_vertex.schema

StructType(List(StructField(idx,IntegerType,false),StructField(name,StringType,false),StructField(module,IntegerType,false)))

# Save as Pajek (fallback if the HDFS parquet export cannot be used)

In [39]:
# Load a local copy of the edge parquet and rename its columns positionally to
# source/target (the column names nx.from_pandas_edgelist uses by default)
# plus weight.
# NOTE(review): the preview below shows letter node ids (a/b/c) rather than the
# numeric ids exported from Spark — confirm this file is that export.
edges = pd.read_parquet('Nets/paul_sample_edges.parquet')
edges.columns = ['source','target','weight']

In [40]:
# Preview the renamed edge list
edges.head()

Unnamed: 0,source,target,weight
0,b,c,12
1,b,a,15


In [41]:
# Undirected weighted graph from the edge list; with edge_attr=True every
# column other than source/target (here: weight) becomes an edge attribute.
# NOTE(review): nx.Graph drops the call direction (src -> dst) — confirm that
# is intended for the Pajek export.
G = nx.from_pandas_edgelist(edges, source='source', target='target',
                            edge_attr=True, create_using=nx.Graph())

In [57]:
# Multigraph copy of G (each edge gets key 0), the same graph kind as the
# reference Pajek network loaded below
G2 = nx.MultiGraph(G)

In [54]:
# NOTE(review): writes to the same path as the G2 export below, which
# overwrites this file; keep only one of the two writes or use distinct paths.
nx.write_pajek(G,"Nets/paul_sample_graph.net")

In [67]:
# Export the multigraph version of the sample graph in Pajek (.net) format
nx.write_pajek(G2,"Nets/paul_sample_graph.net")

In [14]:
# Load a reference Pajek network (presumably a known-good example input) to
# compare its structure with the exported sample graph
sample = nx.read_pajek("Nets/rosvall.net")

In [22]:
# Node labels of the reference network
sample.nodes()

NodeView(('red001', 'red0000', 'red11', 'red100', 'red101', 'red01', 'orange110', 'orange011', 'orange00', 'orange111', 'orange1010', 'orange100', 'orange010', 'green111', 'green000', 'green1100', 'green10', 'green0010', 'green1101', 'green011', 'green010', 'blue00', 'blue10', 'blue11', 'blue011'))

In [30]:
# Node labels of the sample graph
G.nodes()

NodeView(('b', 'c', 'a'))

In [49]:
# Attributes of the key-0 edge between red001 and red01 in the reference network
sample.edges['red001','red01',0]

{'weight': 1.0}

In [24]:
# Edge list of the reference network (parallel edges are listed once per key)
sample.edges()

MultiEdgeDataView([('red001', 'red01'), ('red001', 'red01'), ('red001', 'red11'), ('red001', 'red11'), ('red0000', 'red11'), ('red0000', 'red11'), ('red11', 'red01'), ('red11', 'red01'), ('red11', 'red100'), ('red11', 'red100'), ('red11', 'red101'), ('red11', 'red101'), ('red100', 'red101'), ('red100', 'red101'), ('red101', 'red01'), ('red101', 'red01'), ('red01', 'orange110'), ('red01', 'orange110'), ('orange110', 'orange111'), ('orange110', 'orange111'), ('orange110', 'orange00'), ('orange110', 'orange00'), ('orange110', 'orange011'), ('orange110', 'orange011'), ('orange011', 'orange00'), ('orange011', 'orange00'), ('orange011', 'orange100'), ('orange011', 'orange100'), ('orange00', 'orange111'), ('orange00', 'orange111'), ('orange00', 'orange1010'), ('orange00', 'orange1010'), ('orange00', 'orange100'), ('orange00', 'orange100'), ('orange111', 'orange010'), ('orange111', 'orange010'), ('orange111', 'green111'), ('orange111', 'green111'), ('orange1010', 'orange100'), ('orange1010', '

In [65]:
# Edge list of the multigraph copy
G2.edges()

MultiEdgeDataView([('b', 'c'), ('b', 'a')])

In [50]:
# Attributes of edge b-c in the simple graph
G.edges['b','c']

{'weight': 12}

In [44]:
# Edge weights of the simple graph, keyed by (u, v)
nx.get_edge_attributes(G,'weight')

{('b', 'c'): 12, ('b', 'a'): 15}

In [66]:
# Edge weights of the multigraph copy, keyed by (u, v, key)
nx.get_edge_attributes(G2,'weight')

{('b', 'c', 0): 12, ('b', 'a', 0): 15}

In [45]:
# Edge weights of the reference network, keyed by (u, v, key)
nx.get_edge_attributes(sample,'weight')

{('red001', 'red01', 0): 1.0,
 ('red001', 'red01', 1): 1.0,
 ('red001', 'red11', 0): 1.0,
 ('red001', 'red11', 1): 1.0,
 ('red0000', 'red11', 0): 1.0,
 ('red0000', 'red11', 1): 1.0,
 ('red11', 'red01', 0): 1.0,
 ('red11', 'red01', 1): 1.0,
 ('red11', 'red100', 0): 1.0,
 ('red11', 'red100', 1): 1.0,
 ('red11', 'red101', 0): 1.0,
 ('red11', 'red101', 1): 1.0,
 ('red100', 'red101', 0): 1.0,
 ('red100', 'red101', 1): 1.0,
 ('red101', 'red01', 0): 1.0,
 ('red101', 'red01', 1): 1.0,
 ('red01', 'orange110', 0): 0.1,
 ('red01', 'orange110', 1): 0.1,
 ('orange110', 'orange111', 0): 2.0,
 ('orange110', 'orange111', 1): 1.0,
 ('orange110', 'orange00', 0): 1.0,
 ('orange110', 'orange00', 1): 1.0,
 ('orange110', 'orange011', 0): 1.0,
 ('orange110', 'orange011', 1): 1.0,
 ('orange011', 'orange00', 0): 1.0,
 ('orange011', 'orange00', 1): 1.0,
 ('orange011', 'orange100', 0): 1.0,
 ('orange011', 'orange100', 1): 1.0,
 ('orange00', 'orange111', 0): 1.0,
 ('orange00', 'orange111', 1): 1.0,
 ('orange00', 