In [1]:
#import package
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import csv
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.functions import array, col, explode, struct, lit,first
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType, IntegerType
from collections import Counter
from pyspark.sql.functions import date_format
from pyspark.sql.functions import collect_list
from pyspark.sql import Window
from pyspark.sql.functions import array_contains
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
from pyspark.sql.window import Window
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql.types import *

In [2]:
# Install NLTK; its SnowballStemmer is imported later in the text-cleaning section.
!pip install nltk

## Data Preparation

In [4]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Read the Venmo transaction sample (parquet) and cache it for reuse.
venmo_t = spark.read.parquet('/FileStore/tables/VenmoSample_snappy-e020d.parquet')
venmo = venmo_t.cache()
# Read the word-classification dictionary CSV (first row is the header, column types inferred) and cache it.
word_t =  spark.read.format('com.databricks.spark.csv').options(header = 'true',inferschema = 'true').load('/FileStore/tables/Venmo_Word_Classification_Dictonary.csv')
word = word_t.cache()

In [6]:
# Read the emoji-classification dictionary CSV (first row is the header, column types inferred).
emoji = spark.read.format('com.databricks.spark.csv').options(header = 'true',inferschema = 'true').load('/FileStore/tables/Venmo_Emoji_Classification_Dictionary.csv')
# Transform the emoji dictionary from wide to long.
# Number the rows over a window ordered by a constant literal so each row gets a "row_num" id column.
w = Window().orderBy(lit('Event'))
emoji = emoji.withColumn("row_num", row_number().over(w))

# NOTE(review): `df` is never read again at top level in this notebook (to_long's `df` is its own
# parameter) -- looks like leftover sample data; confirm before removing.
df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    """Unpivot `df` from wide to long format.

    Every column whose name is not in `by` becomes one output row per input
    row, carrying the column name in "key" and the cell value in "val".

    Args:
        df: Spark DataFrame to unpivot.
        by: list of column names kept as-is on every output row.

    Returns:
        DataFrame with the `by` columns followed by "key" and "val".

    Raises:
        AssertionError: if the unpivoted columns do not all share one dtype.
    """
    # (name, dtype) pairs of the columns that will be melted.
    melted = [(name, dtype) for (name, dtype) in df.dtypes if name not in by]
    cols, dtypes = zip(*melted)

    # An array column must be homogeneous, so all melted columns need one type.
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # One (key, val) struct per melted column, exploded into one row each.
    pairs = [struct(lit(name).alias("key"), col(name).alias("val")) for name in cols]
    kvs = explode(array(pairs)).alias("kvs")

    exploded = df.select(by + [kvs])
    return exploded.select(by + ["kvs.key", "kvs.val"])

# Unpivot the emoji dictionary using "row_num" as the id column, then drop that helper column.
emoji_long = to_long(emoji, ["row_num"])
emoji_long = emoji_long.drop('row_num')
# Preview the first five (key, val) rows.
emoji_long.show(5)

## Clean the text

In [8]:
# tokenize the description word group
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import monotonically_increasing_id

# Tokenize the "description" column into a new "description_words" column.
tokenizer = Tokenizer(inputCol="description", outputCol="description_words")
tokenized_venmo = tokenizer.transform(venmo)


In [9]:
# Build the regex used to split all emoji of a description into "emoji_in_post".

# Imported under an alias: a plain `import emoji` would rebind the name `emoji`,
# which this notebook also uses for the emoji-dictionary DataFrame.
import emoji as emoji_lib
import re
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Known emoji with internal whitespace removed, de-duplicated, empty strings dropped.
# Sorted longest-first because regex alternation takes the first alternative that
# matches: without this, a short emoji could match as a prefix of a longer sequence
# and the longer emoji would never be reported whole.
# NOTE(review): assumes UNICODE_EMOJI is keyed by the emoji characters themselves --
# confirm against the installed `emoji` package version.
emojis_list = sorted(
    {cleaned for cleaned in (''.join(x.split()) for x in emoji_lib.UNICODE_EMOJI.keys()) if cleaned},
    key=lambda e: (-len(e), e),
)
escape_list = '|'.join(re.escape(p) for p in emojis_list)

def find_all_emo(plain_text):
    """Return every match of the `escape_list` pattern found in `plain_text`.

    Args:
        plain_text: description string, or None.

    Returns:
        List of matched substrings in order of appearance, or None when
        `plain_text` is None.
    """
    if plain_text is not None:
        return re.findall(escape_list, plain_text)
    return None

# Expose find_all_emo as a Spark UDF returning array<string>.
search_all_emojis = F.udf(find_all_emo, ArrayType(StringType()))

# Add "emoji_in_post": the emoji found in each description.
tokenized_venmo = tokenized_venmo.withColumn(
    "emoji_in_post",
    search_all_emojis(F.col("description")),
)

In [10]:
# Split all purely alphabetic words into "word_in_post".
def select_text(x):
    """Return the tokens of `x` that consist only of alphabetic characters.

    Args:
        x: iterable of string tokens.

    Returns:
        List of the tokens for which str.isalpha() is true, in input order.
    """
    return [token for token in x if token.isalpha()]

# Expose select_text as a Spark UDF returning array<string> and apply it to the tokens.
select_text_udf = F.udf(select_text, ArrayType(StringType()))
tokenized_venmo = tokenized_venmo.withColumn(
    "word_in_post",
    select_text_udf(F.col('description_words')),
)

In [11]:
# Preview the first 10 rows, including the emoji_in_post and word_in_post columns added above.
tokenized_venmo.show(10)

In [12]:
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

# Remove stop words from "word_in_post" (no custom stop-word list is passed to the remover),
# then keep only the columns used downstream.
remover = StopWordsRemover(inputCol='word_in_post', outputCol='word_in_post_nostop')
tokenized_venmo_nostop = remover.transform(tokenized_venmo).select('user1','user2', 'transaction_type','datetime','description','emoji_in_post','word_in_post_nostop')

# Stem every remaining token with NLTK's English Snowball stemmer, via a UDF returning array<string>.
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
tokenized_venmo_nostop_stemmed = tokenized_venmo_nostop.withColumn("words_stemmed", stemmer_udf("word_in_post_nostop"))


In [13]:
# Preview the first 100 rows after stop-word removal and stemming.
tokenized_venmo_nostop_stemmed.show(100)

# Q1

## Word Count Section

In [16]:
nword = word.drop('_c9')

# Bring the (small) dictionary to the driver once instead of launching one
# Spark job per category column.
_word_rows = nword.collect()


def _word_column(index):
    """Return the values of dictionary column `index`, in row order."""
    return [row[index] for row in _word_rows]


# Collect each word category into its own list (one list per dictionary column).
People = _word_column(0)
Food = _word_column(1)
Event = _word_column(2)
Activity = _word_column(3)
Travel = _word_column(4)
Tran = _word_column(5)
Utility = _word_column(6)
Cash = _word_column(7)
Illegal = _word_column(8)

In [17]:
# Compare each word with the word-category dictionary.
def word_type(x):
    """Return the distinct dictionary categories matched by the words in `x`.

    Each word is assigned the first category whose list contains it; a word
    found in no list counts as "Others".

    Args:
        x: iterable of (stemmed) word strings.

    Returns:
        List of distinct category names, in no particular order (empty when
        `x` is empty).
    """
    categories = set()  # renamed from `type`, which shadowed the builtin
    for val in x:
        if val in People:
            categories.add('People')
        elif val in Food:
            categories.add("Food")
        elif val in Event:
            categories.add("Event")
        elif val in Activity:
            categories.add("Activity")
        elif val in Utility:
            categories.add("Utility")
        elif val in Cash:
            categories.add("Cash")
        elif val in Illegal:
            categories.add("Illegal")
        elif val in Tran:
            categories.add("Transportation")
        elif val in Travel:
            # The Travel list is collected from the dictionary with the others but was
            # never consulted, so travel words were counted as "Others". It is checked
            # last so words that already matched another category keep that category.
            categories.add("Travel")
        else:
            categories.add("Others")
    return list(categories)

In [18]:
# Expose word_type as a Spark UDF returning array<string>.
FindWordCate_udf = F.udf(word_type, ArrayType(StringType()))

# Tag every transaction with the categories of its stemmed words.
venmo_cleaned = tokenized_venmo_nostop_stemmed.withColumn(
    "word_type",
    FindWordCate_udf(F.col('words_stemmed')),
)

# Show the result, ordered by user1.
venmo_cleaned.sort('user1').show()

## Emoji Count Section

In [20]:
# Re-read the emoji dictionary through the `spark` session used by the rest of the notebook.
emoji = spark.read.format('com.databricks.spark.csv').options(header = 'true',inferschema = 'true').load('/FileStore/tables/Venmo_Emoji_Classification_Dictionary.csv')

# Bring the (small) dictionary to the driver once instead of launching one
# Spark job per category column.
_emoji_rows = emoji.collect()


def _emoji_column(index):
    """Return the values of emoji-dictionary column `index`, in row order."""
    return [row[index] for row in _emoji_rows]


# Collect each emoji category into its own list (one list per dictionary column).
Event_e = _emoji_column(0)
Travel_e = _emoji_column(1)
Food_e = _emoji_column(2)
Activity_e = _emoji_column(3)
Tran_e = _emoji_column(4)
People_e = _emoji_column(5)
Utility_e = _emoji_column(6)

In [21]:
# Compare each emoji with the emoji-category dictionary.
def emoji_type(x):
    """Return the distinct dictionary categories matched by the emoji in `x`.

    Each emoji is assigned the first category whose list contains it; an emoji
    found in no list counts as "Others".

    Args:
        x: iterable of emoji strings, or None (find_all_emo returns None for a
            missing description).

    Returns:
        List of distinct category names, in no particular order. An empty or
        None input yields [].
    """
    # None used to raise TypeError in the loop below. Empty input has always
    # produced []: the original `type is []` identity test could never be true.
    # That result is kept so posts without emoji are not tagged "Others".
    if not x:
        return []

    categories = set()  # renamed from `type`, which shadowed the builtin
    for val in x:
        if val in Event_e:
            categories.add('Event')
        elif val in Travel_e:
            categories.add('Travel')
        elif val in Activity_e:
            categories.add('Activity')
        elif val in Tran_e:
            categories.add('Transportation')
        elif val in People_e:
            categories.add('People')
        elif val in Utility_e:
            categories.add('Utility')
        elif val in Food_e:
            # Food_e is collected from the dictionary but was never consulted, so food
            # emoji were counted as "Others". It is checked last so emoji that already
            # matched another category keep that category.
            categories.add('Food')
        else:
            categories.add("Others")
    return list(categories)

In [22]:
# Expose emoji_type as a Spark UDF returning array<string>.
FindEmoCateg_udf = F.udf(emoji_type, ArrayType(StringType()))

# Tag every transaction with the categories of its emoji.
venmo_cleaned = venmo_cleaned.withColumn(
    "emoji_type",
    FindEmoCateg_udf(F.col('emoji_in_post')),
)

## Concatenate Section

In [24]:
# Concatenate the word_type and emoji_type array columns into a new column called "type",
# then cache the result.
from pyspark.sql.functions import concat
col_list = ['word_type','emoji_type']
venmo_type_t = venmo_cleaned.withColumn('type',concat(*col_list))
venmo_type_final = venmo_type_t.cache()

In [25]:
# Save the resulting DataFrame as parquet (overwriting any previous output) so it can be reused later.
venmo_type_final.write.parquet("venmo_type_final.parquet", mode='overwrite')

In [26]:
# Preview the categorized transactions.
venmo_type_final.show()

# Q2

What is the percent of emoji only transactions?

In [29]:
# UDF body: flags transactions whose word list is empty.
def emoji_only(value):
    """Return the string 'True' when `value` equals [] or [''], otherwise 'False'.

    Args:
        value: list of words taken from a transaction description.

    Returns:
        'True' or 'False' as a string (not a bool).
    """
    has_no_words = value in ([], [''])
    return 'True' if has_no_words else 'False'

In [30]:
# Wrap emoji_only as a UDF (default string return type) and flag every transaction.
emoji_only_test = F.udf(emoji_only)
venmo_new = tokenized_venmo.withColumn(
    'emoji_only',
    emoji_only_test(F.col('word_in_post')),
)

In [31]:
# Share of transactions in each emoji_only group, as a percentage of all transactions.
total = venmo_new.count()
percent = (
    venmo_new
    .groupBy("emoji_only")
    .count()
    .withColumnRenamed("count", "per_per_group")
    .withColumn("per_total", (F.col("per_per_group") / total) * 100)
)

In [32]:
# Show the per-group counts and percentages.
percent.show()

In [33]:
# Which are the top 5 most popular emoji?
(
    venmo_new
    .select("emoji_in_post")
    .withColumn("emoji", F.explode(F.col("emoji_in_post")))
    .groupBy("emoji")
    .count()
    .sort(F.col("count").desc())
    .show(5)
)

In [34]:
# Which are the top three most popular emoji categories?
# "emoji_type" only exists on the venmo_cleaned / venmo_type_final lineage; venmo_new is built
# from tokenized_venmo and has no such column, so selecting it there fails.
(
    venmo_type_final
    .select("emoji_type")
    .withColumn("emoji_type_count", F.explode(F.col("emoji_type")))
    .groupBy("emoji_type_count")
    .count()
    .sort(F.col("count").desc())
    .show(3)
)

# Q3

In [36]:
# Register the categorized transactions so they can be queried with SQL.
venmo_type_final.createOrReplaceTempView("venmo_type_final")

# We only want the profile of transactions where the user spent money, so take user1 when the
# transaction type is 'payment' and user2 when it is 'charge'.
venmo_spending = spark.sql("""(SELECT user1 as user, type, datetime FROM venmo_type_final WHERE transaction_type = 'payment') UNION ALL (SELECT user2 as user, type,datetime FROM venmo_type_final WHERE transaction_type = 'charge') """)

# Explode the "type" array to one row per (user, category), count rows per pair, and add the
# per-user sum of those counts as "total_count".
category_count = venmo_spending.select('user', F.explode('type').alias("type"))\
                                       .groupBy('user','type')\
                                       .count()\
                                       .select('user','type', 'count',F.sum('count').over(Window.partitionBy("user")).alias('total_count'))\
                                       .sort('user', 'type')
category_count.show() # 'count' = rows for this (user, category); 'total_count' = sum of 'count' over all of the user's categories

In [37]:
# proportion = 'count' / 'total_count', rounded to 2 decimals
category_count = category_count.select('user','type',round(((col("count") /col("total_count"))),2).alias("proportion"))

In [38]:
# Pivot to one row per user with one column per category; missing categories are filled with 0.
category_count = category_count.groupBy('user')\
                      .pivot("type").sum("proportion").fillna(0).sort("user")


# Q4

In [40]:
# Create the month indicator for each transaction.
# A user's first transaction defines time 0; a transaction 1-30 days later is tagged month 1,
# 31-60 days later month 2, and so on -- hence ceil(days / 30).

window = Window.partitionBy('user').orderBy('datetime')

# Refer to venmo_spending's own "datetime" column by name. The original used `venmo.datetime`,
# a column of a different DataFrame whose variable is also rebound later in this notebook
# (PageRank section), which makes the cell fail or silently depend on execution order.
first_txn_time = F.first(F.col('datetime')).over(window)
venmo_spending_dyn = venmo_spending.withColumn(
    "month_passed",
    F.ceil(F.datediff(F.col('datetime'), first_txn_time) / 30),
)

In [41]:
# Save as a single-partition parquet dataset, overwriting any previous output.
venmo_spending_dyn.coalesce(1).write.mode('overwrite').format("parquet").save('/FileStore/tables/venmo.parquet')

In [42]:
# First, count each user's transactions per category in each month.
# Read the dataset directory written above rather than one hard-coded part file: the part-file
# name embeds a write-specific id, so it no longer exists once the dataset is overwritten.
venmo_spending_dyn = spark.read.parquet('/FileStore/tables/venmo.parquet')
user_dynamic_profile = (
    venmo_spending_dyn
    .select('user', 'month_passed', F.explode('type').alias("type"))
    .groupBy('user', 'month_passed', 'type')
    .count()
    # total_count = sum of the category counts within one (user, month)
    .select('user', 'month_passed', 'type', 'count',
            F.sum('count').over(Window.partitionBy("user", "month_passed")).alias('total_count'))
    .sort('user', 'month_passed', 'type')
)

In [43]:
# Pivot the transaction counts: one row per (user, month_passed, total_count), one column per
# category, missing categories filled with 0.
user_dynamic_profile = user_dynamic_profile.groupBy('user','month_passed',"total_count")\
                       .pivot("type").sum("count").fillna(0).sort("user","month_passed")
user_dynamic_profile.show(20)

In [44]:
# Users may not transact in every month of the 12-month lifetime, so build the full
# (user, month 0..12) grid and left join the monthly profile onto it.

venmo_spending.createOrReplaceTempView("venmo_spending")
# Every distinct user crossed with the months 0 through 12.
month_cycle = spark.sql("""SELECT distinct v.user,sub.month
             FROM venmo_spending v CROSS JOIN (select 0 as month union all select 1 as month union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9 union all select 10 union all select 11 union all select 12) as sub  
            ORDER BY v.user,sub.month""")

user_dynamic_profile.createOrReplaceTempView("user_dynamic_profile")
month_cycle.createOrReplaceTempView("month_cycle")

# Months without any transaction get 0 for the total and for every category.
# NOTE(review): the query names each pivoted category column explicitly, so it presumably
# requires every category to occur at least once in the data -- confirm.
accum_profile = spark.sql("""
                SELECT l.user, l.month, ifnull(r.total_count,0) as total_count
                      ,ifnull(r.Activity,0) as Activity, ifnull(r.Cash,0) as Cash, ifnull(r.Event,0) as Event, ifnull(r.Food,0) as Food, ifnull(r.Illegal,0) as Illegal, 
                      ifnull(r.Others,0) as Others, ifnull(r.People,0) as People, ifnull(r.Transportation,0) as Transportation, ifnull(r.Travel,0) as Travel,
                      ifnull(r.Utility,0) as Utility
                FROM month_cycle l LEFT JOIN user_dynamic_profile r
                ON l.user = r.user AND l.month = r.month_passed """)

In [45]:
# Calculate each category's proportion of the month's total count, rounded to 2 decimals.
# ifnull(..., 0) turns a NULL ratio into 0.
accum_profile.createOrReplaceTempView("accum_profile")

accum_profile_prop = spark.sql(
         """SELECT user, month,
            ifnull( round(Activity/ total_count ,2),0) as activity_perc,
            ifnull( round(Food/ total_count ,2),0) as food_perc,
            ifnull( round(Illegal/ total_count ,2),0) as illegal_perc,
             ifnull( round(Others/ total_count ,2),0) as others_perc,
             ifnull( round(People/ total_count ,2),0) as people_perc,
             ifnull( round(Utility/ total_count ,2),0)  as utility_perc,
            ifnull( round(Cash/ total_count ,2),0) as cash_perc,
            ifnull( round(Event/ total_count ,2),0)  as event_perc,
            ifnull( round(Transportation/ total_count ,2),0)  as transp_perc,
             ifnull( round(Travel/ total_count ,2),0)  as travel_perc
            FROM accum_profile
            ORDER BY user, month """    
                 )

In [46]:
# write the result file to parquet
# accum_profile_prop.coalesce(1).write.mode('overwrite').format("parquet").save('/FileStore/tables/accum_profile_prop.parquet')

In [47]:
# Last step: compute the mean and standard deviation of the transaction proportion for each
# category in each month interval.

# First unpivot the ten proportion columns into (category, proportion) rows.
accum_profile_prop.createOrReplaceTempView("accum_profile_prop")
accum_profile_unpivot = spark.sql("""SELECT user,month,
             stack(10, 'activity_perc',activity_perc,
                    'food_perc', food_perc, 
                    'illegal_perc', illegal_perc,
                    'others_perc',others_perc, 
                    'people_perc',people_perc, 
                    'utility_perc',utility_perc, 
                    'cash_perc',cash_perc, 
                    'event_perc',event_perc, 
                    'transp_perc',transp_perc, 
                    'travel_perc',travel_perc) as (category,proportion)
            FROM accum_profile_prop""")

# Then compute, per (month, category), the mean and twice the standard deviation ("sd_2").
accum_profile_unpivot.createOrReplaceTempView("accum_profile_unpivot")
accum_profile_sum = spark.sql("""SELECT month,category,
                          mean(proportion) as mean,
                          2*stddev(proportion) as sd_2
                          FROM accum_profile_unpivot
                          GROUP BY month, category
                          ORDER BY month, category 
                          """)

In [48]:
# Display up to 130 rows, ordered by category then month, to check that the result makes sense.
accum_profile_sum.sort('category','month').show(130)

In [49]:
# matplotlib needs a pandas DataFrame, so convert the Spark result first.
accum_profile_sum_pd = accum_profile_sum.toPandas()

# One line per category: month on the x-axis, mean proportion on the y-axis,
# error bars of two standard deviations (the "sd_2" column).
fig, ax = plt.subplots(figsize=(20,10))

for key, group in accum_profile_sum_pd.groupby('category'):
    # Sort by month so each line is drawn left to right whatever the row order is.
    group.sort_values('month').plot('month', 'mean', yerr='sd_2', label=key, ax=ax)

# Label the figure so it can be read on its own.
ax.set_xlabel('Months since first transaction')
ax.set_ylabel('Mean proportion of transactions (error bars: 2 s.d.)')
ax.set_title('Spending profile by category over the user lifetime')

plt.show()
# we can see that the category 'other' is higher than other lines, indicating that most of the description includes category 'others' which are not included in our current dictionary.

# Social Network Analysis

In [51]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import csv
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.functions import array, col, explode, struct, lit
from pyspark.sql.functions import array_contains
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

In [52]:
# Read the Venmo transaction sample (parquet) and preview it ordered by user1.
venmo_data = spark.read.parquet('/FileStore/tables/VenmoSample_snappy-e020d.parquet')
venmo_data.sort('user1').show()

In [53]:
# Create the undirected friendship edges as the union of (user1, user2) and (user2, user1),
# so every transaction yields an edge in both directions; duplicates are removed.
venmo_data.createOrReplaceTempView('venmo_data')

friending_t = spark.sql("SELECT distinct user, friend\
                               FROM  \
                               (SELECT user1 as user, user2 as friend\
                               FROM venmo_data \
                               UNION \
                               SELECT user2 as user, user1 as friend\
                               FROM venmo_data) as sub")
# Cache the edge list and register it as a temp view for the SQL queries below.
friending = friending_t.cache()
friending.createOrReplaceTempView('friending')
friending.sort('user','friend').show()

# Q5

Write a script to find a user’s friends and friends of friends.
(Friend definition: a user’s friend is someone who has transacted with the user, either sending money to the user or receiving money from the user.) Describe your algorithm and calculate its computational complexity. Can you do it better?

#### - First method: try running sql in spark

In [57]:
# First attempt: plain SQL in Spark.
# For each user, join their friends to those friends' friends (fof). A fof must not already be
# one of the user's direct friends and must not be the user themself.
fof =  spark.sql("SELECT distinct l.user, l.friend, r.friend as fof, 1 as connect\
                          FROM friending l LEFT JOIN friending r \
                          ON l.friend = r.user \
                          WHERE r.friend NOT IN (SELECT friend \
                                                 FROM friending \
                                                 WHERE user = l.user)\
                          AND r.friend !=l.user \
                          ORDER BY l.user, l.friend, fof")

In [58]:
# Print the physical plan of the friends-of-friends query. `fof` already holds exactly this
# query, so explain it directly instead of repeating the SQL text.
fof.explain()

#### - Second, try a Spark-specific SQL-like join

In [60]:
# Second reference to the same edge DataFrame, used as the right-hand side of the self-join.
friending_copy = friending

# Normal (non-broadcast) left outer self-join: user -> friend -> friend's friend, dropping
# rows where the friend's friend is the user themself. Only the plan is printed.
friending.alias("user").\
         join(friending_copy.alias("friend"), col("user.friend") == col("friend.user"),"left_outer").\
         filter(col('user.user')!=col('friend.friend')).explain()

#### - Third, try a broadcast join, which is specific to Spark

In [62]:
# When a table is small enough to fit into the memory of a single worker node (with some
# breathing room), the join can be optimized: instead of a big-table-to-big-table
# communication strategy, a broadcast join is often more efficient.
# Same self-join as above, with the right-hand side marked for broadcast; only the plan is printed.
friending.alias("user").\
        join(broadcast(friending_copy.alias("friend")), col("user.friend") == col("friend.user"),"left_outer").\
        filter(col('user.user')!=col('friend.friend')).explain()

In [63]:
# Display the first 100 friends-of-friends rows.
fof.show(100)

#### Second method

# Q6

## 1) find dynamic user network

In [68]:
venmo_data.createOrReplaceTempView('venmo_data')

# Timestamped friendship edges in both directions (user1 -> user2 and user2 -> user1),
# de-duplicated on (user, friend, datetime).
dynamic_friending = spark.sql("SELECT distinct user, friend, datetime\
                               FROM  \
                               (SELECT user1 as user, user2 as friend, datetime \
                               FROM venmo_data \
                               UNION \
                               SELECT user2 as user, user1 as friend, datetime \
                               FROM venmo_data) ")
dynamic_friending.createOrReplaceTempView('dynamic_friending')
dynamic_friending.sort('user','friend').show()

In [69]:
# Create the month indicator for each edge: days since the user's first "datetime"
# (first value in the per-user window ordered by datetime), divided by 30 and rounded up.
window = Window.partitionBy('user').orderBy('datetime')
dynamic_friending = dynamic_friending.withColumn("month_passed", F.ceil((F.datediff(dynamic_friending.datetime, 
                                   first(dynamic_friending.datetime).over(window)))/30) )
dynamic_friending.sort('month_passed','user','friend').show()

In [70]:
# Degree = number of distinct friends of each user within each month.
dynamic_degree = dynamic_friending.groupBy('user','month_passed').agg(countDistinct('friend').alias('degree')).sort('user','month_passed')
dynamic_degree.show()

In [71]:
# For each user and month, find the friends of each friend in that same month (fof). A fof must
# not be the user, and must not be a direct friend of the user -- the NOT IN subquery checks
# the user's friends across all months, not only the current one.
dynamic_friending.createOrReplaceTempView('dynamic_friending')

dynamic_fof =  spark.sql("SELECT distinct l.user, l.friend, l.month_passed, r.friend as fof \
                          FROM dynamic_friending l LEFT JOIN dynamic_friending r \
                          ON l.friend = r.user AND l.month_passed = r.month_passed AND r.friend !=l.user\
                          WHERE r.friend NOT IN (SELECT friend \
                                                 FROM dynamic_friending \
                                                 WHERE user = l.user)\
                          ORDER BY l.month_passed, l.user, l.friend, fof")
dynamic_fof.sort('user', 'month_passed','friend').show(100)
# +----+-------+------------+--------+
# |user| friend|month_passed|     fof|
# +----+-------+------------+--------+
# |   3|     43|           0|   20530|
# |   3| 263437|           1|  266763|
# |   3| 567957|           5|  281136|
# |   3| 567957|           5| 1375797|
# |   3| 567957|           5|  615598|
# |   8| 900433|           2| 2734056|

In [72]:
# fof_degree = number of distinct friends-of-friends of each user within each month.
dynamic_fofdegree = dynamic_fof.groupBy('user','month_passed').agg(countDistinct('fof').alias('fof_degree'))

In [73]:
dynamic_fofdegree.createOrReplaceTempView('dynamic_fofdegree')
dynamic_degree.createOrReplaceTempView('dynamic_degree')

# Combine degree and fof_degree per (user, month); a missing fof_degree becomes 0.
# Only months 0 through 12 are kept.
dynamic_degree_fofdegree = spark.sql("SELECT l.user, l.month_passed, l.degree , ifnull(r.fof_degree,0) as fof_degree\
FROM dynamic_degree l LEFT JOIN dynamic_fofdegree r \
ON l.user =r.user AND l.month_passed = r.month_passed \
WHERE l.month_passed <13 ")

dynamic_degree_fofdegree.sort('user','month_passed').show()

In [74]:
# NOTE(review): the query above already aliases this expression `as fof_degree`, so no column
# with the auto-generated name should exist and this rename looks like a no-op -- confirm.
dynamic_degree_fofdegree= dynamic_degree_fofdegree.withColumnRenamed("ifnull(r.`fof_degree`, 0)", "fof_degree")

In [75]:
# Display the combined degree table ordered by user and month.
dynamic_degree_fofdegree.sort('user','month_passed').show()

In [76]:
# Confirm how many distinct users are in the degree DataFrame.
dynamic_degree.select(countDistinct("user")).show()

In [77]:
# export the result set 
# dynamic_degree_fofdegree.coalesce(1).write.mode('overwrite').format("parquet").save('/FileStore/tables/dynamic_fof_degree.parquet')

## 2) find dynamic clustering coefficient

In [79]:
# Cache the monthly edge list and register it under three names for a three-way self-join.
dynamic_friending= dynamic_friending.cache()
dynamic_friending.createOrReplaceTempView('f1')
dynamic_friending.createOrReplaceTempView('f2')
dynamic_friending.createOrReplaceTempView('f3')

# Walk tri_1 -> tri_2 -> tri_3 within one month (tri_3 must differ from tri_1), then look for
# an edge from tri_3 back to tri_1. back_tri_1 is non-null only when that closing edge exists.
dyn_triangle_t = spark.sql("""SELECT distinct f1.month_passed, f1.user as tri_1, f1.friend as tri_2 
                                 ,f2.friend as tri_3, f3.friend as back_tri_1
                          FROM f1 
                          LEFT JOIN f2 ON f1.month_passed = f2.month_passed AND f1.friend = f2.user AND f1.user !=f2.friend
                          LEFT JOIN f3 ON f2.month_passed = f3.month_passed AND f2.friend = f3.user AND f3.friend = f1.user  
                          ORDER BY f1.month_passed, tri_1,  tri_2, tri_3""")
dyn_triangle = dyn_triangle_t.cache()

In [80]:
dyn_triangle.createOrReplaceTempView('dyn_triangle')

# Count closed paths per (month, user) and halve the count.
# NOTE(review): the division by 2 presumably corrects for each triangle being found twice
# (once per traversal direction) -- confirm.
count_trian_t = spark.sql("""SELECT month_passed, tri_1 as user , count(*)/2 as tri_count
                          FROM dyn_triangle
                          WHERE tri_2 is not null AND tri_3 is not null and back_tri_1 is not null
                          GROUP BY month_passed,user
                          ORDER BY user, month_passed""")
count_trian = count_trian_t.cache()
# count_trian.sort('user').show(1000)
# +------------+------+---------+
# |month_passed|  user|tri_count|
# +------------+------+---------+
# |           0|    12|      1.0|
# |           0|   112|      1.0|
# |          12|  1293|      1.0|
# |           7|  2210|      1.0|

In [81]:
count_trian.createOrReplaceTempView('count_trian')
dynamic_degree.createOrReplaceTempView('dynamic_degree')

# Attach the triangle count to each (month, user) degree row; no triangles becomes 0.
# Only months 0 through 12 are kept.
trian_triple = spark.sql("""SELECT d.month_passed, d.user, d.degree, ifnull(c.tri_count,0) as tri_count
                            FROM dynamic_degree d LEFT JOIN count_trian c
                            ON d.month_passed = c.month_passed AND d.user = c.user
                            WHERE d.month_passed <13
                            """
                            )
trian_triple.sort('month_passed','user').show()

In [82]:
# Potential triple count for a user with n friends: nC2 = n * (n - 1) / 2.
# Computed in closed form instead of n! / (2 * (n - 2)!): Spark's factorial() returns NULL for
# inputs above 20, which made triple_count NULL -- and the clustering coefficient 0 -- for
# every user with more than 20 friends in a month.
pair_count = col('degree') * (col('degree') - 1) / 2
triple_count = trian_triple.withColumn(
    'triple_count',
    F.when(col('degree') >= 2, pair_count).otherwise(0),
)

In [83]:
# Clustering coefficient cc = tri_count / triple_count, rounded to 2 decimals;
# 0 when triple_count is not greater than 0.
cluster_coef = triple_count.withColumn('cc',F.when(col("triple_count")>0, round(col('tri_count')/col('triple_count'),2)).otherwise(0))
cluster_coef.show(20)

In [84]:
# Keep only the columns needed for the output.
cluster_coef_output = cluster_coef.select('month_passed','user','cc')

In [85]:
# Display the clustering coefficients ordered by user and month.
cluster_coef_output.sort('user','month_passed').show()

In [86]:
#cluster_coef_output.coalesce(1).write.mode('overwrite').format("parquet").save('/FileStore/tables/cluster_coef_final.parquet')

In [87]:
# Convert the (month_passed, user, cc) columns to pandas for plotting.
cluster_coef_pd = cluster_coef.select(col('month_passed'),col('user'),col('cc')).toPandas()

In [88]:
# Violin plot of the clustering coefficient (y) for each month (x).
f,ax = plt.subplots(figsize=(20, 8))
sns.set(style="whitegrid")
sns.violinplot(data=cluster_coef_pd,x='month_passed',y='cc')

## 3) calculate the pagerank

In [90]:
#!pip install networkx
import networkx as nx

In [91]:
# Re-read the Venmo transaction sample (parquet). This rebinds `venmo`, which earlier held the
# cached copy of the same file.
venmo = spark.read.parquet('/FileStore/tables/VenmoSample_snappy-e020d.parquet')
venmo.show(100)

In [92]:
# To calculate each user's PageRank at each point in time, first build the directed edges:
# user1 -> user2 for 'payment' rows and user2 -> user1 for 'charge' rows.

venmo.createOrReplaceTempView('venmo')

# `venmo` is rebound here from the raw transactions to the directed edge list.
venmo = spark.sql("""SELECT distinct user1 as user, user2 as friend, datetime, transaction_type
FROM venmo
WHERE transaction_type = 'payment'
UNION 
SELECT distinct user2 as user, user1 as friend, datetime, transaction_type
FROM venmo
WHERE transaction_type = 'charge' """) 


# the result includes all transactions with directed edge 

In [93]:
# Create the month indicator for each edge: days since the user's first "datetime"
# (first value in the per-user window ordered by datetime), divided by 30 and rounded up.
window = Window.partitionBy('user').orderBy('datetime')
venmo_with_month= venmo.withColumn("month_passed", F.ceil((F.datediff(venmo.datetime, 
                                   first(venmo.datetime).over(window)))/30))

In [94]:
# Compute PageRank on the directed graph of each month (0 to 12) and stack the results.
schema = StructType([StructField("user", LongType(), True), StructField("pagerank", DoubleType(), True), StructField("month_passed", StringType(), False)])

# nx.pagerank_scipy was removed in NetworkX 3.0, where nx.pagerank is the SciPy-based
# implementation; use whichever the installed version provides.
_pagerank = getattr(nx, "pagerank_scipy", nx.pagerank)

pagerank_rows = []
for i in range(0, 13):
    # Bring this month's (user, friend) edges to the driver; NetworkX runs locally.
    month_edges = (
        venmo_with_month
        .filter(venmo_with_month.month_passed == i)
        .select('user', 'friend')
        .rdd.map(tuple)
        .collect()
    )
    if not month_edges:
        # A month without edges contributes no rows. Previously the empty result made
        # toDF() fail because no schema could be inferred.
        continue
    G = nx.DiGraph()
    G.add_edges_from(month_edges)
    PR = _pagerank(G, alpha=0.85)
    # The month is stored as a string to match the declared schema above.
    pagerank_rows.extend((node, float(score), str(i)) for node, score in PR.items())

# Build the result once with the explicit schema instead of unioning 13 inferred DataFrames.
pagerank_df = spark.createDataFrame(pagerank_rows, schema)

In [95]:
# Save the PageRank results as a single-partition parquet dataset, overwriting any previous output.
pagerank_df.coalesce(1).write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/pagerank_df.parquet')

In [96]:
# Number of (user, month) PageRank rows produced.
pagerank_df.count()