__2017-04-19:__ build property graph

|quarter|records|
|---|---|
|17Q1|21569108 |

__2017-04-25:__ analyze @midnight

In [2]:
import pyspark.sql.functions as F; from pyspark.sql.types import *; from pyspark.sql import Window, Row
import graphframes as G
import datetime as dt; import numpy as np; import networkx as nx; import re; import pandas as pd

In [3]:
# Part I inputs: vertices and edges of the 17Q1 property graph.
v = sqlContext.read.format('parquet').load('/mnt/vmn.tianzi/graphframes/l3d_17q1/vertices')
e = sqlContext.read.format('parquet').load('/mnt/vmn.tianzi/graphframes/l3d_17q1/edges')

g = G.GraphFrame(v, e)

In [4]:
# Vertices whose program name contains "MIDNIGHT".
midnight_programs = v.where(F.col('program').rlike("MIDNIGHT"))
display(midnight_programs)

## Part I  
1. isolate the pids that watch either `CMDY_AT MIDNIGHT_O_PRIME TIME` or `CMDY_AT MIDNIGHT_R_NIGHT TIME`  
2. create a subgraph from these pids' edges  
3. check out their `WEEKDAY DAYTIME` edges  
4. identify the competing programs in `WEEKDAY DAYTIME`

In [6]:
# Exploratory: print the built-in help for g.bfs.
# NOTE(review): interactive lookup only; no later cell depends on its output.
help(g.bfs)

In [7]:
# create expression for pids
# Collect the pids that have an edge out of an `At Midnight` (CMDY) vertex and
# render them as a SQL `IN (...)` list. `pids` is computed here so this cell no
# longer depends on a variable defined by a later cell (NameError on a fresh
# kernel otherwise).
pids = (e.filter(F.col('src').rlike('^CMDY_AT MIDNIGHT_'))
          .select('pid')
          .dropDuplicates()
          .collect())

# `pid in ()` would not parse, so fail early with a clear message instead.
if not pids:
    raise ValueError('no pids found with an At Midnight source vertex')

# Quote each pid as a SQL string literal (escaping backslashes and single
# quotes) so non-numeric ids are read as values rather than column names.
pids_expr = '(' + ', '.join(
    "'" + str(p[0]).replace('\\', '\\\\').replace("'", "\\'") + "'" for p in pids) + ')'

In [8]:
# step 1, 2
# Keep only the edges of pids that have an edge out of either At Midnight
# vertex. The rlike condition must be used in a filter: selecting it merely
# adds a boolean column and keeps every pid.
midnight_pids = (e.filter(F.col('src').rlike('^CMDY_AT MIDNIGHT_'))
                  .select('pid')
                  .dropDuplicates())

# A left-semi join keeps e's columns only, needs no driver-side collect, and
# avoids building a SQL string by hand.
e2 = e.join(midnight_pids, on='pid', how='leftsemi')

g2 = G.GraphFrame(v, e2)

In [9]:
# Part I, weekday daytime competitors: the 15 WEEKDAY DAYTIME vertices with the
# largest wt_min_tot. show() prints and returns None, so it is not wrapped in
# print().
# NOTE(review): g2 was built as GraphFrame(v, e2), so g2.vertices is the full
# vertex set `v`; this ranking is NOT restricted to the At Midnight pids.
# TODO: aggregate e2 by dst instead if the per-pid view is intended.
(g2.vertices
   .filter('daypart = "WEEKDAY DAYTIME"')
   .orderBy('wt_min_tot', ascending=False)
   .show(15, False))

In [10]:
# Personalized PageRank with resets to 'CMDY_AT MIDNIGHT_R_NIGHT TIME', then
# the top 15 WEEKDAY DAYTIME vertices by score. show() returns None, so it is
# called directly rather than through print().
pr = g2.pageRank(resetProbability=0.15, sourceId='CMDY_AT MIDNIGHT_R_NIGHT TIME', maxIter=10).cache()
(pr.vertices
   .filter('daypart="WEEKDAY DAYTIME"')
   .orderBy('pagerank', ascending=False)
   .show(15, False))

In [11]:
# Personalized PageRank with resets to 'CMDY_AT MIDNIGHT_O_PRIME TIME', then
# the top 15 WEEKDAY DAYTIME vertices by score. show() returns None, so it is
# called directly rather than through print().
pr2 = g2.pageRank(resetProbability=0.15, sourceId='CMDY_AT MIDNIGHT_O_PRIME TIME', maxIter=10).cache()
(pr2.vertices
    .filter('daypart="WEEKDAY DAYTIME"')
    .orderBy('pagerank', ascending=False)
    .show(15, False))

## Part II  
1. Construct a graph with `day_hour` as nodes  
2. Calculate PageRank for each weekday and hour

In [13]:
# Part II input: hour-level viewing records.
df = spark.read.parquet('/mnt/vmn.tianzi/graphframes/l3d_17q1_hour/dat')
# show() prints and returns None; calling it directly avoids a stray "None".
df.show(10)

In [14]:
# pids that watched At Midnight on a CMDY network. Both conditions must be
# applied in a filter; selecting the match flags alone would keep every pid.
pids = (df
        .filter(F.col('program').rlike('^AT MIDNIGHT') & F.col('net').rlike('^CMDY'))
        .select('pid')
        .dropDuplicates()
        .collect())

In [15]:
# Render the collected At Midnight pids as a SQL `IN (...)` list for the
# `'pid in ' + pids_expr` filters used by the vertex and edge cells.
# `pid in ()` would not parse, so an empty result fails early instead.
if not pids:
    raise ValueError('no At Midnight (CMDY) pids found')

# Quote each pid as a SQL string literal (escaping backslashes and single
# quotes) so non-numeric ids are read as values rather than column names.
pids_expr = '(' + ', '.join(
    "'" + str(p[0]).replace('\\', '\\\\').replace("'", "\\'") + "'" for p in pids) + ')'

len(pids)

In [16]:
# vertices for At Midnight viewers
# One row per (day_hour id, daypart), with wt_min_tot summed. The id keeps the
# last two '_'-separated parts of df.id. The aggregate is aliased so the column
# is already named wt_min_tot.
# NOTE(review): dropDuplicates() runs on (id, daypart, day, hour, wt_min_tot),
# so rows that differ only in other columns but share wt_min_tot collapse.
# Presumably wt_min_tot is a per-slot total repeated on each row — confirm.
v = (df
     .filter('pid in ' + pids_expr)
     .select(F.substring_index(df.id, '_', -2).alias('id'), 'daypart', 'day', 'hour', 'wt_min_tot')
     .dropDuplicates()
     .groupby(['id', 'daypart'])
     .agg(F.sum('wt_min_tot').alias('wt_min_tot'))
     .orderBy('id')
     .cache())

# show() prints and returns None; no print() wrapper.
v.show(10, False)

In [17]:
# edges for At Midnight viewers
# Each edge joins one of a pid's viewing records to that pid's next record in
# date order. A sentinel HEAD row is put in front of every pid's sequence, so
# the first record also gets an incoming edge (from the 'HEAD_HEAD' node once
# ids are trimmed to their last two parts).

HEAD = Row("id", "pid", "age", "gender", "net", "program", "date", "raw", "avg_wt_min", "ind", 
           "wt_min", "daypart", "program_id", "hour", "day", "avg_wt_min_tot", "wt_min_tot", "dur", "freq", "wt_min_shr")

head = HEAD('HEAD_HEAD_HEAD_HEAD', 'HEAD', 1, 'HEAD', 'HEAD', 'HEAD', dt.date(1970,1,1), 1, 1, 'HEAD',
           1, 'HEAD', 'HEAD', 'HEAD', 'HEAD', 1, 1, 1, 1, 1)


def consecutive_pairs(records):
    """Return (previous, next) record pairs for one pid, starting from `head`.

    groupByKey shuffles and does not keep the order of an earlier orderBy, so
    the records are sorted by date here. Null dates sort first, matching
    Spark's default ascending order.
    """
    ordered = sorted(records, key=lambda r: (r.date is not None, r.date or dt.date.min))
    chain = [head] + ordered
    return list(zip(chain, chain[1:]))


e = (df
      .filter('pid in ' + pids_expr)
      .rdd
      .map(lambda row: (row.pid, row))
      .groupByKey()
      .flatMapValues(consecutive_pairs)
      # -> (pid, src id, dst id, age, gender, date, wt_min). Positions 0, 2, 3, 6
      # and 10 follow the HEAD layout and presumably match df's column order —
      # TODO confirm against df.schema.
      .map(lambda kv: (kv[0], kv[1][0][0], kv[1][1][0]) + tuple(kv[1][1][i] for i in (2, 3, 6, 10))))

e.take(3)

In [18]:
# Persist the edge tuples. src/dst are trimmed to their last two
# '_'-separated parts so they match the vertex ids.
edge_fields = [
    StructField("pid", StringType(), nullable=True),
    StructField("src", StringType(), True),
    StructField("dst", StringType(), True),
    StructField("age", ShortType(), True),
    StructField("gender", StringType(), True),
    StructField("date", DateType(), True),
    StructField("wt_min", IntegerType(), True),
]
schema = StructType(edge_fields)

edges_out = spark.createDataFrame(e, schema=schema)
for endpoint in ('src', 'dst'):
    edges_out = edges_out.withColumn(endpoint, F.substring_index(endpoint, '_', -2))

edges_out.write.parquet('/mnt/vmn.tianzi/graphframes/l3d_17q1_hour/edges', mode = 'overwrite')

In [19]:
# Persist the vertices with the aggregate column exposed as `wt_min_tot`.
# withColumnRenamed is a no-op if `sum(wt_min_tot)` is not present.
vertices_out = v.withColumnRenamed('sum(wt_min_tot)', 'wt_min_tot')
vertices_out.write.mode('overwrite').parquet('/mnt/vmn.tianzi/graphframes/l3d_17q1_hour/vertices')

In [20]:
# Reload the persisted hour-level graph so later cells start from disk.
v = spark.read.parquet('/mnt/vmn.tianzi/graphframes/l3d_17q1_hour/vertices')
e = spark.read.parquet('/mnt/vmn.tianzi/graphframes/l3d_17q1_hour/edges')
# show() prints and returns None; call it directly instead of print()-ing None.
v.show(2)
e.show(2)

In [21]:
# Build an undirected networkx graph of the day_hour nodes for the 17Q1
# At Midnight viewers, so PageRank can run on the driver.
# NOTE(review): rebinding `G` shadows the `graphframes as G` import. Cells that
# call G.GraphFrame break if re-run after this one. The name is kept because
# later cells refer to this graph as `G`.
G = nx.Graph()

# prepare vertices: (id, attribute dict) pairs.
vl = [(row.id, row.asDict()) for row in v.collect()]
print("Vertex example: \n{}".format(vl[0]))

# add vertices.
G.add_nodes_from(vl)

# prepare edges. nx.Graph keeps one edge per unordered node pair and overwrites
# its attributes on every repeat, so wt_min is summed per unordered pair in
# Spark first. This also shrinks what is collected to the driver.
edge_weights = (e
    .groupBy(F.least('src', 'dst').alias('src'), F.greatest('src', 'dst').alias('dst'))
    .agg(F.sum('wt_min').alias('wt_min')))
el = [(row.src, row.dst, {'wt_min': row.wt_min}) for row in edge_weights.collect()]
print("Edge example: \n{}".format(el[0]))

# add edges.
G.add_edges_from(el)

In [22]:
# pg = nx.pagerank(G, weight = 'wt_min_tot', max_iter = 20)

In [23]:
# PageRank personalization vector: vertex id -> its wt_min_tot. If an id
# repeats, the later row wins, as with dict.update.
p_dict = {row.id: row.wt_min_tot for row in v.select('id', 'wt_min_tot').collect()}

In [24]:
# Drop the sentinel HEAD node (the synthetic start of every pid's viewing
# sequence) before PageRank. Guarded so re-running the cell does not raise.
if G.has_node('HEAD_HEAD'):
    G.remove_node('HEAD_HEAD')

In [25]:
# PageRank over the day_hour graph: edges weighted by wt_min, restarts biased
# by p_dict. max_iter is networkx's default of 100 rather than 20. Any run that
# converged within 20 iterations returns the same scores; slower runs no
# longer fail with a convergence error.
pg = nx.pagerank(G, personalization=p_dict, weight='wt_min', max_iter=100)

In [26]:
# Two-column pandas frame (id, pagerank) of the PageRank scores. Built from
# items() so each id is explicitly paired with its score and the constructor
# receives a concrete list (dict views are not lists on Python 3).
pg_pd = pd.DataFrame(list(pg.items()), columns=['id', 'pagerank'])

In [27]:
# Move the pandas PageRank table back into Spark for the split/pivot steps.
pg_df = spark.createDataFrame(data=pg_pd)

In [28]:
# Split each day_hour id into its day and hour parts for inspection.
id_parts = F.split('id', '_')
display(pg_df.select('id', id_parts[0].alias('day'), id_parts[1].alias('hour'), 'pagerank'))

In [29]:
# Pivot PageRank into an hour x weekday matrix for the heatmap.
id_parts = F.split('id', '_')
mypg = pg_df.select('id', id_parts[0].alias('day'), id_parts[1].alias('hour'), 'pagerank').toPandas()

# Column order for the heatmap: Monday through Sunday.
sorter = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

# reindex(columns=...) replaces DataFrame.reindex_axis, which was deprecated in
# pandas 0.21 and removed in 1.0.
# NOTE(review): `hour` comes from F.split and is a string, so pivot rows sort
# lexicographically ('10' before '2') unless hours are zero-padded. TODO:
# confirm the hour format and cast to int if needed.
mypg = (mypg
        .pivot(index='hour', columns='day', values='pagerank')
        .reindex(columns=sorter))

mypg

In [30]:
import seaborn as sns; import matplotlib.pyplot as plt;

In [31]:
# Heatmap of PageRank: hours as rows, weekdays as columns.
fig, ax = plt.subplots()
# Draw on the explicit axes. No `fmt` is passed: it only applies with
# annot=True, and "d" would not suit float scores anyway.
sns.heatmap(mypg, cmap="RdBu_r", linewidths=.5, ax=ax)
ax.set_xlabel('')
ax.set_ylabel('')
ax.set_title('PageRank by weekday and hour')

# Horizontal hour labels; smaller tick font on both axes.
plt.setp(ax.get_yticklabels(), rotation=0, fontsize=9)
plt.setp(ax.get_xticklabels(), fontsize=9)

display(fig)