## Install Environment

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
!pip install pyspark



In [0]:
!ls '/usr/lib/jvm/'

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
!pip install -U pyarrow

Collecting pyarrow
[?25l  Downloading https://files.pythonhosted.org/packages/ba/3f/6cac1714fff444664603f92cb9fbe91c7ae25375880158b9e9691c4584c8/pyarrow-0.17.1-cp36-cp36m-manylinux2014_x86_64.whl (63.8MB)
[K     |████████████████████████████████| 63.8MB 63kB/s 
Installing collected packages: pyarrow
  Found existing installation: pyarrow 0.14.1
    Uninstalling pyarrow-0.14.1:
      Successfully uninstalled pyarrow-0.14.1
Successfully installed pyarrow-0.17.1


In [0]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-2.4.5-bin-hadoop2.7'

## Q0 : Your first task is to open your Venmo app, find 10 words that are not already in the dictionary and add them to it. Make sure you don’t add to the dictionary a duplicate word by hitting Control+F before adding your word.

**The word we add**

---


Course -> activity

---


Behalf -> people

---

Buy -> activity

---


Schnitzel -> food

---


Wok -> food

---


Postcard -> travel

---


Voyage -> travel

---


Reimbursement -> cash

---


Espresso -> food

---


Kroger -> food

---



## Import packages

In [0]:
from pyspark.sql.functions import udf, col,lower, regexp_replace, explode, count, split
from pyspark.ml.feature import Tokenizer 
from nltk.stem.snowball import SnowballStemmer 
from nltk.tokenize.casual import TweetTokenizer 
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import VectorAssembler

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('VenmoAnalysis').getOrCreate()

## Data Preprocessing

In [0]:
word_dict = spark.read.csv('/content/drive/Shared drives/Lucky7/venmo/Venmo Word Classification Dictonary BAX-423 - Word_Dict.csv',inferSchema = True, header = True)
word_dict =word_dict.toPandas().to_dict('list')

In [0]:
dict_word = {}
for topic,words in word_dict.items():
  for word in words:
    if word is None:
      continue
    else:
      dict_word[word] = topic

In [0]:
emoji_dict = spark.read.csv('/content/drive/Shared drives/Lucky7/venmo/Venmo_Emoji_Classification_Dictionary.csv',inferSchema = True, header = True)

In [0]:
emoji_dict.head()

Row(Event='🇦🇺', Travel='🏔', Food='🍇', Activity='👾', Transportation='🚄', People='😀', Utility='⚡')

In [0]:
emoji_dict = emoji_dict.toPandas().to_dict('list')
dict_emoji = {}
for topic,emojis in emoji_dict.items():
  for emoji in emojis:
    if emoji is None:
      continue
    else:
      dict_emoji[emoji] = topic

In [0]:
dict_emoji

In [0]:
sc = spark.sparkContext

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# to read parquet file
venmo = sqlContext.read.parquet('/content/drive/Shared drives/Lucky7/venmo/VenmoSample.snappy.parquet')

In [0]:
venmo.show(10)

+-------+-------+----------------+-------------------+---------------+-----------+--------------------+
|  user1|  user2|transaction_type|           datetime|    description|is_business|            story_id|
+-------+-------+----------------+-------------------+---------------+-----------+--------------------+
|1218774|1528945|         payment|2015-11-27 10:48:19|           Uber|      false|5657c473cd03c9af2...|
|5109483|4782303|         payment|2015-06-17 11:37:04|         Costco|      false|5580f9702b64f70ab...|
|4322148|3392963|         payment|2015-06-19 07:05:31|   Sweaty balls|      false|55835ccb1a624b14a...|
| 469894|1333620|          charge|2016-06-03 23:34:13|             🎥|      false|5751b185cd03c9af2...|
|2960727|3442373|         payment|2016-05-29 23:23:42|              ⚡|      false|574b178ecd03c9af2...|
|3977544|2709470|         payment|2016-09-29 22:12:07|     Chipotlaid|      false|57ed2f4723e064eac...|
|3766386|4209061|         payment|2016-05-20 10:31:15|kitchen cou

## Get the column of user who make the transaction

In [0]:
def exchange(x,y,z):
  if x == 'charge':
    return z
  else:
    return y

In [0]:
def exchange2(x,y,z):
  if x == 'charge':
    return y
  else:
    return z

In [0]:
import pyspark.sql.functions as F
udf_exchange = F.udf(exchange)
udf_exchange2 = F.udf(exchange2)
venmo = venmo.withColumn('user_1',udf_exchange(venmo.transaction_type,venmo.user1,venmo.user2))
venmo = venmo.withColumn('user_2',udf_exchange2(venmo.transaction_type,venmo.user1,venmo.user2))

In [0]:
venmo.show(10)

+-------+-------+----------------+-------------------+---------------+-----------+--------------------+-------+-------+
|  user1|  user2|transaction_type|           datetime|    description|is_business|            story_id| user_1| user_2|
+-------+-------+----------------+-------------------+---------------+-----------+--------------------+-------+-------+
|1218774|1528945|         payment|2015-11-27 10:48:19|           Uber|      false|5657c473cd03c9af2...|1218774|1528945|
|5109483|4782303|         payment|2015-06-17 11:37:04|         Costco|      false|5580f9702b64f70ab...|5109483|4782303|
|4322148|3392963|         payment|2015-06-19 07:05:31|   Sweaty balls|      false|55835ccb1a624b14a...|4322148|3392963|
| 469894|1333620|          charge|2016-06-03 23:34:13|             🎥|      false|5751b185cd03c9af2...|1333620| 469894|
|2960727|3442373|         payment|2016-05-29 23:23:42|              ⚡|      false|574b178ecd03c9af2...|2960727|3442373|
|3977544|2709470|         payment|2016-09

##Q1: Use the text dictionary and the emoji dictionary to classify Venmo's transactions in your sample dataset.

In [0]:
# clean texts
def clean_text(column): 
    column = lower(column)
    column = regexp_replace(column, "'ve", " have") 
    column = regexp_replace(column, "'m", " am") 
    column = regexp_replace(column, "'d", " would") 
    column = regexp_replace(column, "'s", " is") 
    column = regexp_replace(column, "can't", " can not") 
    column = regexp_replace(column, "n't", " not") 
    column = regexp_replace(column, "'re", " are") 
    column = regexp_replace(column, u"[\'\"/|\?=!#&;]", "") 
    return column 
 
venmo = venmo.select("story_id", "user_1","user_2", "datetime",clean_text(venmo.description).alias('description')) 
venmo.show(20, False) 

+------------------------+--------+-------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|story_id                |user_1  |user_2 |datetime           |description                                                                                                                                                                                                                                                                                         |
+------------------------+--------+-------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
t = TweetTokenizer()

In [0]:
# classify transaction, split every word in each description and use the first one that match
def mapping(description):
  for i in t.tokenize(description):
    if i.lower() in dict_word.keys():
      return dict_word[i.lower()]
    elif i in dict_emoji.keys():
      return dict_emoji[i]
    else:
      continue
  return 'no conclusion'

In [0]:
udf_mapping = F.udf(mapping)
venmo = venmo.withColumn('Classification',udf_mapping(venmo.description))

In [0]:
venmo.show(20)

+--------------------+--------+-------+-------------------+--------------------+---------------+
|            story_id|  user_1| user_2|           datetime|         description| Classification|
+--------------------+--------+-------+-------------------+--------------------+---------------+
|5657c473cd03c9af2...| 1218774|1528945|2015-11-27 10:48:19|                uber| Transportation|
|5580f9702b64f70ab...| 5109483|4782303|2015-06-17 11:37:04|              costco|           Food|
|55835ccb1a624b14a...| 4322148|3392963|2015-06-19 07:05:31|        sweaty balls|Illegal/Sarcasm|
|5751b185cd03c9af2...| 1333620| 469894|2016-06-03 23:34:13|                  🎥|          Event|
|574b178ecd03c9af2...| 2960727|3442373|2016-05-29 23:23:42|                   ⚡|        Utility|
|57ed2f4723e064eac...| 3977544|2709470|2016-09-29 22:12:07|          chipotlaid|  no conclusion|
|573e8503cd03c9af2...| 3766386|4209061|2016-05-20 10:31:15|     kitchen counter|        Utility|
|57461d46cd03c9af2...|  730075|

## Q2 : What is the percent of emoji only transactions? Which are the top 5 most popular emoji? Which are the top three most popular emoji categories?

In [0]:
!pip install emoji

Collecting emoji
[?25l  Downloading https://files.pythonhosted.org/packages/40/8d/521be7f0091fe0f2ae690cc044faf43e3445e0ff33c574eae752dd7e39fa/emoji-0.5.4.tar.gz (43kB)
[K     |███████▌                        | 10kB 12.6MB/s eta 0:00:01[K     |███████████████                 | 20kB 3.5MB/s eta 0:00:01[K     |██████████████████████▋         | 30kB 5.0MB/s eta 0:00:01[K     |██████████████████████████████▏ | 40kB 6.4MB/s eta 0:00:01[K     |████████████████████████████████| 51kB 2.6MB/s 
[?25hBuilding wheels for collected packages: emoji
  Building wheel for emoji (setup.py) ... [?25l[?25hdone
  Created wheel for emoji: filename=emoji-0.5.4-cp36-none-any.whl size=42176 sha256=c88233e51b941a66ad859aa1c068cdb196207a8979bce0e599a30023ad109668
  Stored in directory: /root/.cache/pip/wheels/2a/a9/0a/4f8e8cce8074232aba240caca3fade315bb49fac68808d1a9c
Successfully built emoji
Installing collected packages: emoji
Successfully installed emoji-0.5.4


In [0]:
import emoji
def only_emoji(s):
  for i in t.tokenize(s):
    if i not in emoji.UNICODE_EMOJI:
      return 0
    else:
      continue
  return 1

In [0]:
from pyspark.sql.types import IntegerType
udf_emoji = F.udf(only_emoji,IntegerType())
venmo = venmo.withColumn('only_emoji',udf_emoji(venmo.description))

In [0]:
venmo.show()

+--------------------+--------+-------+-------------------+--------------------+---------------+----------+
|            story_id|  user_1| user_2|           datetime|         description| Classification|only_emoji|
+--------------------+--------+-------+-------------------+--------------------+---------------+----------+
|5657c473cd03c9af2...| 1218774|1528945|2015-11-27 10:48:19|                uber| Transportation|         0|
|5580f9702b64f70ab...| 5109483|4782303|2015-06-17 11:37:04|              costco|           Food|         0|
|55835ccb1a624b14a...| 4322148|3392963|2015-06-19 07:05:31|        sweaty balls|Illegal/Sarcasm|         0|
|5751b185cd03c9af2...| 1333620| 469894|2016-06-03 23:34:13|                  🎥|          Event|         1|
|574b178ecd03c9af2...| 2960727|3442373|2016-05-29 23:23:42|                   ⚡|        Utility|         1|
|57ed2f4723e064eac...| 3977544|2709470|2016-09-29 22:12:07|          chipotlaid|  no conclusion|         0|
|573e8503cd03c9af2...| 376638

In [0]:
venmo.select('description').show()

In [0]:
# counting the percent of emoji only transactions
emoji_count = venmo.groupby('only_emoji').sum().collect()[0][1]
emoji_percentage = emoji_count/venmo.count()
emoji_percentage

0.24025869317573947

In [0]:
venmo[venmo.only_emoji==1].show(n=10)

+--------------------+--------+-------+-------------------+-----------+--------------+----------+
|            story_id|  user_1| user_2|           datetime|description|Classification|only_emoji|
+--------------------+--------+-------+-------------------+-----------+--------------+----------+
|5751b185cd03c9af2...| 1333620| 469894|2016-06-03 23:34:13|         🎥|         Event|         1|
|574b178ecd03c9af2...| 2960727|3442373|2016-05-29 23:23:42|          ⚡|       Utility|         1|
|5689c6bdcd03c9af2...| 5317324|3942984|2016-01-04 09:11:25|         👠| no conclusion|         1|
|561080a1cd03c9af2...| 4238868|4879587|2015-10-04 08:28:01|         🍺|          Food|         1|
|577e69e723e064eac...|11719500|8702716|2016-07-07 21:40:39|          ⛽|Transportation|         1|
|570866c2cd03c9af2...| 9414481|2869012|2016-04-09 09:19:46|         🔴| no conclusion|         1|
|528e752fd56b6bac5...|  444145| 323088|2013-11-22 05:03:43|       ✌❤🏈|        People|         1|
|5574690d5d6cc8135...|  2

### Which are the top 5 most popular emoji?

In [0]:
# top 5 most popular emoji 
# strip characters 
def remove_text(column): 
    column = lower(column) 
    column = regexp_replace(column, u"[ -.+a-zA-Z0-9]", "") 
    column = regexp_replace(column, u"[\'\"/|\?=!]", "") 
    return column 
 
emojiOnly = venmo.select(remove_text(col('description')).alias('description')) 
emojiOnly.show(10, False) 

+-----------+
|description|
+-----------+
|           |
|           |
|           |
|🎥         |
|⚡          |
|           |
|           |
|           |
|           |
|           |
+-----------+
only showing top 10 rows



In [0]:
from pyspark.sql.types import StringType,ArrayType

In [0]:
# tokenize each emoji 
def separate_emoji(text): 
    return t.tokenize(text) 
 
myfunc = udf(separate_emoji, ArrayType(StringType())) 
tokenizedEmoji = emojiOnly.withColumn("description", myfunc("description"))
tokenizedEmoji.show(50)

+------------+
| description|
+------------+
|          []|
|          []|
|          []|
|        [🎥]|
|         [⚡]|
|          []|
|          []|
|          []|
|          []|
|          []|
|        [👠]|
|          []|
|        [🍺]|
|         [⛽]|
|          []|
|          []|
|        [🔮]|
|        [🔴]|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|      [휴지]|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|          []|
|[💁, 🏻, 🍕]|
|          []|
|[🎉, 💞, 🎉]|
|          []|
|          []|
|  [✌, ❤, 🏈]|
|          []|
+------------+
only showing top 50 rows



In [0]:
tokenizedEmoji.printSchema

<bound method DataFrame.printSchema of DataFrame[description: array<string>]>

In [0]:
tokenizedEmoji\
    .select(explode("description")\
    .alias("emojis"))\
    .na.drop()\
    .groupBy("emojis")\
    .count()\
    .sort("count", ascending=False)\
    .show() 

+------+------+
|emojis| count|
+------+------+
|     ️|324701|
|    🍕|152565|
|    🍻|131386|
|    💸|122138|
|    🍷| 96442|
|    🎉| 90742|
|    🏼| 88978|
|    🏻| 82656|
|    🍺| 76615|
|    🏠| 65178|
|     ❤| 63457|
|    🍾| 59219|
|    🍹| 58875|
|    🍴| 57282|
|    🔥| 52930|
|    💃| 52011|
|    🔌| 50683|
|     ☕| 50585|
|    🍔| 49586|
|     :| 49451|
+------+------+
only showing top 20 rows



### Which are the top three most popular emoji categories?

In [0]:
venmo[venmo.only_emoji==1].select('Classification').groupby('Classification').count().sort("count", ascending=False).show()

+--------------+------+
|Classification| count|
+--------------+------+
|          Food|588235|
| no conclusion|358688|
|        People|339207|
|      Activity|129377|
|Transportation|102226|
|       Utility|102152|
|         Event| 49025|
|        Travel| 40083|
+--------------+------+



## Q3. For each user, create a variable to indicate their spending behavior profile. For example, if a user has made 10 transactions, where 5 of them are food and the other 5 are activity, then the user’s spending profile will be 50% food and 50% activity.

In [0]:
venmo.registerTempTable('venmodata')

In [0]:
user_info = sqlContext.sql("select * from (select user_1,Classification,count(*) as number from venmodata group by user_1,Classification) as B join (select user_1,count(*) as all_classification from venmodata group by user_1) as A using (user_1) order by user_1")
user_info.show()

+--------+---------------+------+------------------+
|  user_1| Classification|number|all_classification|
+--------+---------------+------+------------------+
|      10|  no conclusion|     7|                12|
|      10|       Activity|     1|                12|
|      10|           Food|     4|                12|
|10000054|  no conclusion|     1|                 2|
|10000054|Illegal/Sarcasm|     1|                 2|
| 1000006|           Food|     1|                 8|
| 1000006|  no conclusion|     7|                 8|
| 1000007|  no conclusion|     2|                 5|
| 1000007|           Food|     2|                 5|
| 1000007|       Activity|     1|                 5|
| 1000009|  no conclusion|     6|                 8|
| 1000009|           Food|     1|                 8|
| 1000009|       Activity|     1|                 8|
|  100001|  no conclusion|     2|                 4|
|  100001|        Utility|     1|                 4|
|  100001|           Food|     1|             

In [0]:
user_info = user_info.withColumn('Classification_Percentage',user_info.number/user_info.all_classification)

In [0]:
user_info.show()

+--------+---------------+------+------------------+-------------------------+
|  user_1| Classification|number|all_classification|Classification_Percentage|
+--------+---------------+------+------------------+-------------------------+
|      10|           Food|     4|                12|       0.3333333333333333|
|      10|       Activity|     1|                12|      0.08333333333333333|
|      10|  no conclusion|     7|                12|       0.5833333333333334|
|10000054|  no conclusion|     1|                 2|                      0.5|
|10000054|Illegal/Sarcasm|     1|                 2|                      0.5|
| 1000006|  no conclusion|     7|                 8|                    0.875|
| 1000006|           Food|     1|                 8|                    0.125|
| 1000007|           Food|     2|                 5|                      0.4|
| 1000007|       Activity|     1|                 5|                      0.2|
| 1000007|  no conclusion|     2|                 5|

In [0]:
user_spending_behavior = user_info.groupby('user_1').pivot('Classification').avg('Classification_Percentage')
user_spending_behavior = user_spending_behavior.fillna(0)
user_spending_behavior.show()

+--------+-------------------+-------------------+-----+-------------------+-------------------+-------------------+------+-------------------+-------------------+
|  user_1|           Activity|               Cash|Event|               Food|    Illegal/Sarcasm|     Transportation|Travel|            Utility|      no conclusion|
+--------+-------------------+-------------------+-----+-------------------+-------------------+-------------------+------+-------------------+-------------------+
|      10|0.08333333333333333|                0.0|  0.0| 0.3333333333333333|                0.0|                0.0|   0.0|                0.0| 0.5833333333333334|
|10000054|                0.0|                0.0|  0.0|                0.0|                0.5|                0.0|   0.0|                0.0|                0.5|
| 1000006|                0.0|                0.0|  0.0|              0.125|                0.0|                0.0|   0.0|                0.0|              0.875|
| 1000007|      

## Q4 : In the previous question, you got a static spending profile. However, life and social networks are evolving over time. Therefore, let’s explore how a user’s spending profile is evolving over her lifetime in Venmo. First of all, you need to analyze a user’s transactions in monthly intervals, starting from 0 (indicating their first transaction only) up to 12.

In [0]:
# get users who joined Venmo for at least 12 months
query =\
"""
SELECT
    user_1,
    "0,1,2,3,4,5,6,7,8,9,10,11,12" AS months,
    "Food,Transportation,Event,Utility,Cash,Illegal/Sarcasm,Activity,People,Travel,no conclusion" AS topic
FROM
    venmodata
WHERE
    DATEDIFF(DATE("2016-11-30"), datetime) >= 360
GROUP BY
    user_1
ORDER by user_1
"""
old_cust = spark.sql(query)\
    .withColumn("months", split(col("months"), ",\s*")\
    .cast(ArrayType(IntegerType())).alias("months"))\
    .select("user_1", "topic", explode("months").alias("months"))\
    .withColumn("topic", split(col("topic"), ",\s*")\
    .cast(ArrayType(StringType())).alias("topic"))\
    .select("user_1", "months", explode("topic").alias("topic"))
old_cust.createOrReplaceTempView("old_cust")

In [0]:
old_cust.show()

+------+------+---------------+
|user_1|months|          topic|
+------+------+---------------+
|    10|     0|           Food|
|    10|     0| Transportation|
|    10|     0|          Event|
|    10|     0|        Utility|
|    10|     0|           Cash|
|    10|     0|Illegal/Sarcasm|
|    10|     0|       Activity|
|    10|     0|         People|
|    10|     0|         Travel|
|    10|     0|  no conclusion|
|    10|     1|           Food|
|    10|     1| Transportation|
|    10|     1|          Event|
|    10|     1|        Utility|
|    10|     1|           Cash|
|    10|     1|Illegal/Sarcasm|
|    10|     1|       Activity|
|    10|     1|         People|
|    10|     1|         Travel|
|    10|     1|  no conclusion|
+------+------+---------------+
only showing top 20 rows



In [0]:
# combine user's classification, age, and count of classification at an age
query =\
"""
SELECT
    *,
    (COUNT(*) OVER (PARTITION BY user_1, Classification ORDER BY cust_age ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS topic_accu
FROM
    (SELECT
        user_1,
        Classification,
        CASE
            WHEN datetime = (MIN(datetime) OVER (PARTITION BY user_1)) THEN 0
            ELSE CEIL(DATEDIFF(datetime, (MIN(datetime) OVER (PARTITION BY user_1)))/30) 
            END AS cust_age
    FROM
        venmodata 
    WHERE user_1
        IN (SELECT user_1 FROM old_cust GROUP BY user_1))
ORDER BY user_1, cust_age, Classification
"""
pre_profile = spark.sql(query)
pre_profile.createOrReplaceTempView("pre_profile")

In [0]:
pre_profile.show(5)

In [0]:
# generate a unpivoted complete user profile
query =\
"""
SELECT
    user_1,
    months,
    topic,
    topic_accu,
    CASE
        WHEN LAST(topic_monthly_percentage) IS NULL THEN 0
        ELSE LAST(topic_monthly_percentage) END AS topic_monthly_percentage
FROM
    (SELECT
        o.user_1,
        o.months,
        o.topic,
        SUM(p.topic_accu) OVER (PARTITION BY o.user_1) AS topic_accu,
        ((SUM(topic_accu) OVER (PARTITION BY o.user_1, o.months, o.topic)) / (SUM(topic_accu) OVER (PARTITION BY o.user_1, o.months))) AS topic_monthly_percentage
    FROM
        old_cust o
    LEFT JOIN
        pre_profile p
        ON p.user_1 = o.user_1
        AND o.months = p.cust_age
        AND o.topic = p.Classification)
GROUP BY user_1, months, topic, topic_accu
ORDER BY user_1, months, topic
"""
userProfile = spark.sql(query)
userProfile.createOrReplaceTempView("userProfile")

In [0]:
# pivot user profile, make it suitable for analytical works
userProfile_pivoted = userProfile\
  .groupBy("user_1", "months", "topic_accu")\
  .pivot("topic")\
  .avg("topic_monthly_percentage")\
  .sort("user_1", "months")

userProfile_pivoted.createOrReplaceTempView("userProfile_pivoted")

In [0]:
spark.sql("select * from userProfile_pivoted").show(50)

## Q5 : Write a script to find a user’s friends and friends of friends (Friend definition: A user’s friend is someone who has transacted with the user, either sending money to the user or receiving money from the user). Describe your algorithm and calculate its computational complexity. Can you do it better?

In [0]:
# friends
query = """
select distinct * from
(select distinct user_1,user_2 from venmodata
union
select distinct user_2,user_1 from venmodata)
order by user_1
"""
Friends = spark.sql(query)

In [0]:
Friends.show(5)

+------+-------+
|user_1| user_2|
+------+-------+
|    10|  36523|
|    10|     43|
|    10| 133032|
|    10|3844713|
|    10|     13|
+------+-------+
only showing top 5 rows



In [0]:
# friends of friends
query = """
select * from
(select distinct user_1,user_2 from venmodata) as a
join
(select distinct user_2,user_1 from venmodata) as b
on a.user_2 = b.user_2 
where a.user_1<>b.user_1
order by a.user_1
"""
FriendsOfFriends = spark.sql(query)

In [0]:
FriendsOfFriends.show(5)

+------+------+------+-------+
|user_1|user_2|user_2| user_1|
+------+------+------+-------+
|    10|    43|    43|  47104|
|    10|    43|    43|1491816|
|    10|    43|    43|     96|
|    10|    43|    43|  72106|
|    10|    43|    43|  82697|
+------+------+------+-------+
only showing top 5 rows



In [0]:
# get the friends of friends different from friends
query = """
select friends.user_1,friends.user_2,fof.fofs
from
(select distinct * from
(select distinct user_1,user_2 from venmodata
union
select distinct user_2,user_1 from venmodata)) as friends
join
(select * from
(select distinct user_1,user_2 from venmodata) as a
join
(select distinct user_2,user_1 as fofs from venmodata) as b
on a.user_2 = b.user_2 
where a.user_1<>b.fofs) as fof
using (user_1)
where friends.user_2<>fof.fofs
"""
network = spark.sql(query)

In [0]:
network.show(5)

+-------+------+-------+
| user_1|user_2|   fofs|
+-------+------+-------+
|1000240|899483|1070763|
|1000240|899483| 564546|
|1000240|899483| 731901|
|1000280|627873| 385136|
|1000280|627873| 766966|
+-------+------+-------+
only showing top 5 rows



## Q6 i): Number of friends and number of friends of friends

In [0]:
# create an undirected transaction history and calculate customer's age
query = \
"""
SELECT
    *,
    CASE WHEN datetime = (MIN(datetime) OVER (PARTITION BY user1)) THEN 0
         ELSE CEIL(DATEDIFF(datetime, (MIN(datetime) OVER (PARTITION BY user1)))/30) 
         END AS cust_age
FROM
    (SELECT
        *,
        MIN(datetime) OVER (PARTITION BY user1) AS join_date
    FROM
        (SELECT
            user1, user2, datetime, story_id
        FROM
            venmodata
        UNION ALL
        SELECT
            user1 AS user2,
            user2 AS user1,
            datetime,
            story_id
        FROM
            venmodata))
"""

customerAge = spark.sql(query)
customerAge.cache()
customerAge.createOrReplaceTempView("customerAge")

In [0]:
# number of friend at each month
query = \
"""
SELECT
    user1,
    cust_age,
    count(*) OVER (PARTITION BY user1 ORDER BY cust_age ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS num_of_friend
FROM
    customerAge
GROUP BY user1, cust_age
ORDER BY user1, cust_age
"""
fof = spark.sql(query)
fof.show(5000)

In [0]:
# number of friend of friend at each month
query = \
"""
SELECT
    t1.user1,
    t1.cust_age,
    count(*) OVER (PARTITION BY t1.user1 ORDER BY t1.cust_age ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS num_of_friend_of_friend
FROM
    customerAge t1
LEFT JOIN
    customerAge t2
ON
    t2.user1 = t1.user2
WHERE
    t2.user2 != t1.user1
GROUP BY t1.user1, t1.cust_age
ORDER BY t1.user1, t1.cust_age
"""
fofof = spark.sql(query)
fofof.show(5000)

## ii) Clustering coefficient of a user's network

In [0]:
!pip install networkx



In [0]:
import networkx as nx
G = nx.Graph()

In [0]:
# get nodes
nodes = sqlContext.sql('select distinct * from (select distinct user_1 from venmodata union select distinct user_2 from venmodata)')

In [0]:
nodes.show()

+-------+
| user_1|
+-------+
|1218774|
|2954885|
|2517841|
|2073451|
|2413583|
| 629853|
|2594217|
|1912151|
|1676425|
|3817326|
|1404141|
|4109722|
|7209296|
| 244335|
|2325198|
|2515048|
|4168921|
|4414119|
| 709070|
|8671440|
+-------+
only showing top 20 rows



In [0]:
# adjust format
import numpy as np
nodes = np.array(nodes.select('user_1').collect())
nodes = nodes.flatten()

In [0]:
# add nodes to graph
G.add_nodes_from(nodes)

In [0]:
# get edges
edges = sqlContext.sql('select distinct user_1,user_2 from venmodata')

In [0]:
# adjust edges format
edges = edges.toPandas().to_records(index=False)

In [0]:
# add edges
G.add_edges_from(edges)

In [0]:
G.number_of_nodes()

3018657

In [0]:
G.number_of_edges()

4997781

In [0]:
# get clustering coefficient
clustering_coefficient = nx.clustering(G)

In [0]:
import pandas as pd
ClusteringCoefficient = pd.DataFrame(clustering_coefficient,index=['user','clustering_coefficient'])

## iii) Calculate the page rank of each user

In [0]:
from graphframes import *

In [0]:
# get nodes
nodes = sqlContext.sql('select distinct * from (select distinct user1 from venmodata union select distinct user2 from venmodata)')
nodes = nodes.select(nodes.user1.alias('id'))

In [0]:
# get edges
edges = sqlContext.sql('select distinct user1,user2 from venmodata')
edges = edges.select(edges.user1.alias('src'),edges.user1.alias('dst'))

In [0]:
g = GraphFrame(nodes, edges)

In [0]:
# get pagerank
results = g.pageRank(resetProbability=0.15, tol=0.01)
pagerank = results.vertices

## Q7: Create your dependent variable Y, i.e. the total number of transactions at lifetime point 12

In [0]:
query =\
"""
SELECT
    user_1, months,topic_accu, Activity, Cash, Event, Food, `Illegal/Sarcasm`, People, Transportation, Travel, Utility, `no conclusion`,
    friend,
    friend_of_friend,
    (friend + friend_of_friend) AS total_connections,
    pagerank,
    clustering_coefficient
FROM
    (SELECT
      upp.*,
      SUM(num_of_friend) OVER (PARTITION BY upp.user_1 ORDER BY months ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS friend,
      IF(SUM(num_of_friend_of_friend) OVER (PARTITION BY upp.user_1 ORDER BY months ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) IS NULL, 0, SUM(num_of_friend_of_friend) OVER (PARTITION BY user_1 ORDER BY months ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS friend_of_friend,
      pagerank,
      clustering_coefficient
    FROM
      userProfile_pivoted upp
    LEFT JOIN
      fof
    ON upp.user_1 = fof.user1
    AND upp.months = fof.cust_age
    LEFT JOIN
      fofof
    ON upp.user_1 = fofof.user1
    AND upp.months = fofof.cust_age
    LEFT JOIN
      pagerank p
    ON upp.user_1 = p.id
    LEFT JOIN
      ClusteringCoefficient cc
    ON cc.user_1 = upp.user_1
    ORDER BY
      user_1, months)
"""

venmoComplete = spark.sql(query)
venmoComplete.createOrReplaceTempView("venmoComplete")
sqlContext.cacheTable("venmoComplete")

## Q8: Create the recency and frequency variables. In CRM, this predictive framework is known as RFM. Here, you don’t have monetary amounts, so we will focus on just RF. Recency refers to the last time a user was active, and frequency is how often a user uses Venmo in a month. You need to compute these metrics across a user’s lifetime in Venmo (from 0 up to 12)

In [0]:
query = \
"""
SELECT
    t2.user_1,
    t2.months,
    IF(LAST(row_recency) IS NULL, 30, LAST(row_recency)) AS recency,
    IF(LAST(frequency) IS NULL, 0, LAST(frequency)) AS frequency
FROM
    (SELECT
        user1, 
        cust_age,
        DATEDIFF(end_of_month, datetime) AS row_recency,
        frequency
    FROM
        (SELECT
            *,
            (COUNT(*) OVER (PARTITION BY user1, cust_age))/30 AS frequency,
            ADD_MONTHS(join_date, cust_age) AS end_of_month
        FROM
            customerAge)
    ORDER BY
        user1, cust_age, row_recency DESC) t1
RIGHT JOIN
    (SELECT user_1, months
    FROM old_cust
    GROUP BY user_1, months
    ORDER BY user_1, months) t2
ON t1.user1 = t2.user_1 AND t1.cust_age = t2.months
GROUP BY
    t2.user_1, months
ORDER BY
        t2.user_1, months, recency DESC
"""
rfm = spark.sql(query)
rfm.cache()
rfm.createOrReplaceTempView("rfm")

In [0]:
rfm.show(5000)

## Q9:  For each user’s lifetime point, regress recency and frequency on Y. Plot the MSE for each lifetime point. In other words, your x-axis will be lifetime in months (0-12), and your y- axis will be the MSE.

In [0]:
# combine rfm to user profile
query =\
"""
SELECT
    v.user_1,
    v.months,
    recency,
    frequency,
    topic_accu,
    Activity,
    Cash,
    Event,
    Food,
    `Illegal/Sarcasm`,
    People,
    Transportation,
    Travel,
    Utility,
    `no conclusion`,
    friend,
    friend_of_friend,
    total_connections
FROM
    venmoComplete v
LEFT JOIN
    rfm r
ON v.user_1 = r.user_1 AND v.months = r.months

"""
venmoFinal = spark.sql(query)
venmoFinal.createOrReplaceTempView("venmoFinal")
venmoFinal.cache()

In [0]:
venmoFinal.show()

In [0]:
lr_data = venmoFinal.select("recency", "frequency", venmoFinal.topic_accu.alias("label"))
train, test = lr_data.randomSplit([0.7, 0.3], seed=23333)

In [0]:
assembler = VectorAssembler().setInputCols(['recency', "frequency"]).setOutputCol("features")
train01 = assembler.transform(train)
test01 = assembler.transform(test)

In [0]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

In [0]:
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3)
lr_model = lr.fit(train01)

In [0]:
# training set result
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [0]:
# validation
lr_predictions = lr_model.transform(test01)
lr_predictions.select("prediction","label","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [0]:
# MSE on test set
test_result = lr_model.evaluate(test01)
print("Mean Squared Error (MSE) on test data = %g" % test_result.meanSquaredError)

## Q10: For each user’s lifetime point, regress recency, frequency AND her spending behavior profile on Y. Plot the MSE for each lifetime point like above. Did you get any improvement?

In [0]:
lr_data02 = venmoFinal.select("recency", "frequency", "Activity", "Cash", "Event", "Food", "Illegal/Sarcasm", "People", "Transportation", "Travel", "Utility", "no conclusion" , venmoFinal.topic_accu.alias("label"))
train02, test02 = lr_data02.randomSplit([0.7, 0.3], seed=23333)

In [0]:
assembler02 = VectorAssembler().setInputCols(["recency", "frequency", "Activity", "Cash", "Event", "Food", "Illegal/Sarcasm", "People", "Transportation", "Travel", "Utility", "no conclusion"]).setOutputCol("features")
train02 = assembler02.transform(train02)
test02 = assembler02.transform(test02)
lr_model02 = lr.fit(train02)

In [0]:
# training set result
print("Coefficients: " + str(lr_model02.coefficients))
print("Intercept: " + str(lr_model02.intercept))
trainingSummary02 = lr_model02.summary
print("RMSE: %f" % trainingSummary02.rootMeanSquaredError)
print("r2: %f" % trainingSummary02.r2)

In [0]:
test_result02 = lr_model02.evaluate(test02)
print("Mean Squared Error (MSE) on test data = %g" % test_result02.meanSquaredError)

## Q11: For each user’s lifetime point, regress her social network metrics on Y. Plot the MSE for each lifetime point like above. What do you observe? How do social network metrics compare with the RF framework? What are the most informative predictors?

In [0]:
lr_data03 = venmoFinal.select("clustering_coefficient，", "pagerank", "friend", "friend_of_friend", venmoFinal.topic_accu.alias("label"))
train03, test03 = lr_data03.randomSplit([0.7, 0.3], seed=23333)

In [0]:
assembler03 = VectorAssembler().setInputCols(["clustering_coefficient，", "pagerank", "friend", "friend_of_friend"]).setOutputCol("features")
train03 = assembler03.transform(train03)
test03 = assembler03.transform(test03)
lr_model03 = lr.fit(train03)

In [0]:
# training set result
print("Coefficients: " + str(lr_model03.coefficients))
print("Intercept: " + str(lr_model03.intercept))
trainingSummary03 = lr_model03.summary
print("RMSE: %f" % trainingSummary03.rootMeanSquaredError)
print("r2: %f" % trainingSummary03.r2)

In [0]:
test_result03 = lr_model03.evaluate(test03)
print("Mean Squared Error (MSE) on test data = %g" % test_result03.meanSquaredError)

## Q12: For each user’s lifetime point, regress her social network metrics and the spending behavior of her social network on Y. Plot the MSE for each lifetime point like above. Does the spending behavior of her social network add any predictive benefit compared to Q10?

In [0]:
lr_data04 = venmoFinal.select("clustering_coefficient，", "pagerank", "friend", "friend_of_friend", "recency", "frequency", "Activity", "Cash", "Event", "Food", "Illegal/Sarcasm", "People", "Transportation", "Travel", "Utility", "no conclusion", venmoFinal.topic_accu.alias("label"))
train04, test04 = lr_data04.randomSplit([0.7, 0.3], seed=23333)

In [0]:
assembler04 = VectorAssembler().setInputCols(["clustering_coefficient，", "pagerank", "friend", "friend_of_friend", "recency", "frequency", "Activity", "Cash", "Event", "Food", "Illegal/Sarcasm", "People", "Transportation", "Travel", "Utility", "no conclusion"]).setOutputCol("features")
train04 = assembler04.transform(train04)
test04 = assembler04.transform(test04)
lr_model04 = lr.fit(train04)

In [0]:
# training set result
print("Coefficients: " + str(lr_model04.coefficients))
print("Intercept: " + str(lr_model04.intercept))
trainingSummary04 = lr_model04.summary
print("RMSE: %f" % trainingSummary04.rootMeanSquaredError)
print("r2: %f" % trainingSummary04.r2)

In [0]:
test_result04 = lr_model04.evaluate(test03)
print("Mean Squared Error (MSE) on test data = %g" % test_result04.meanSquaredError)