In [1]:
# Import package
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
from pyspark.sql.functions import expr
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import *
from pyspark.sql.functions import count
from pyspark.sql import Window
from pyspark.sql.functions import concat
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Add a file path for Java Home
import os
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"

In [3]:
# Import Venmo data
MAX_MEMORY = '12g'
spark = SparkSession\
        .builder\
        .appName('venmo')\
        .config("spark.driver.memory", MAX_MEMORY) \
        .getOrCreate()
venmo=spark.read.parquet('VenmoSample.snappy.parquet')
venmo.show(5)

+-------+-------+----------------+-------------------+------------+-----------+--------------------+
|  user1|  user2|transaction_type|           datetime| description|is_business|            story_id|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+
|1218774|1528945|         payment|2015-11-27 02:48:19|        Uber|      false|5657c473cd03c9af2...|
|5109483|4782303|         payment|2015-06-17 04:37:04|      Costco|      false|5580f9702b64f70ab...|
|4322148|3392963|         payment|2015-06-19 00:05:31|Sweaty balls|      false|55835ccb1a624b14a...|
| 469894|1333620|          charge|2016-06-03 16:34:13|          🎥|      false|5751b185cd03c9af2...|
|2960727|3442373|         payment|2016-05-29 16:23:42|           ⚡|      false|574b178ecd03c9af2...|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+
only showing top 5 rows



**Text Analytics**

**1: Use the text dictionary and the emoji dictionary to classify Venmo’s transactions in sample dataset.**

In [4]:
# Import the stacked dictionary contained all the text and emojis
dictionary=spark.read.csv('text_all.csv',header=True)
dictionary.show(5)

+----------+---------+---------+--------+-------+---------------+-----------+-----+----------------+
|   People |    Food |   Event |Activity|Travel |Transportation |   Utility | Cash|Illegal/Sarcasm |
+----------+---------+---------+--------+-------+---------------+-----------+-----+----------------+
|    friend|     food| birthday|    ball|  beach|           lyft|       bill| atm |       addiction|
|friendship|      bbq|christmas|    boat|  place|           uber|      cable|bank |            drug|
|      baby|     bean|    happy|     bar|     la|            cab|        fee|cash |           wangs|
|       boy|    latte|     bday|    book|  world|            bus|   electric|money|            weed|
|      girl|breakfast|  wedding|    club|  hotel|            car|electricity| buck|            anal|
+----------+---------+---------+--------+-------+---------------+-----------+-----+----------------+
only showing top 5 rows



In [5]:
# Create lists for dictionary
People=[row[0] for row in dictionary.select(dictionary.columns[0]).collect()]
Food=[row[0] for row in dictionary.select(dictionary.columns[1]).collect()]
Event=[row[0] for row in dictionary.select(dictionary.columns[2]).collect()]
Activity=[row[0] for row in dictionary.select(dictionary.columns[3]).collect()]
Travel=[row[0] for row in dictionary.select(dictionary.columns[4]).collect()]
Trans=[row[0] for row in dictionary.select(dictionary.columns[5]).collect()]
Utility=[row[0] for row in dictionary.select(dictionary.columns[6]).collect()]
Cash=[row[0] for row in dictionary.select(dictionary.columns[7]).collect()]
Illegal=[row[0] for row in dictionary.select(dictionary.columns[8]).collect()]

In [6]:
# Define a function to classify the words
def word_type(x):
    categories=[]
    for var in x:
        if var in People:
            categories.append('People')
        if var in Food:
            categories.append('Food')
        if var in Event:
            categories.append('Event')
        if var in Activity:
            categories.append('Activity')
        if var in Travel:
            categories.append('Travel')
        if var in Trans:
            categories.append('Trans')
        if var in Utility:
            categories.append('Utility')
        if var in Cash:
            categories.append('Cash')
        if var in Illegal:
            categories.append('Illegal')
    return categories

In [7]:
# Tokenized the description of Venmo data
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="description", outputCol="words") 
tokenized = tokenizer.transform(venmo)
tokenized.show(5)

+-------+-------+----------------+-------------------+------------+-----------+--------------------+---------------+
|  user1|  user2|transaction_type|           datetime| description|is_business|            story_id|          words|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+---------------+
|1218774|1528945|         payment|2015-11-27 02:48:19|        Uber|      false|5657c473cd03c9af2...|         [uber]|
|5109483|4782303|         payment|2015-06-17 04:37:04|      Costco|      false|5580f9702b64f70ab...|       [costco]|
|4322148|3392963|         payment|2015-06-19 00:05:31|Sweaty balls|      false|55835ccb1a624b14a...|[sweaty, balls]|
| 469894|1333620|          charge|2016-06-03 16:34:13|          🎥|      false|5751b185cd03c9af2...|           [🎥]|
|2960727|3442373|         payment|2016-05-29 16:23:42|           ⚡|      false|574b178ecd03c9af2...|            [⚡]|
+-------+-------+----------------+-------------------+------------

In [8]:
# Use withColumn to add category
word_type_udf=F.udf(lambda y: word_type(y), ArrayType(StringType()))

In [9]:
tokenized=tokenized.withColumn('category',word_type_udf(F.col('words')))
tokenized.show(5)

+-------+-------+----------------+-------------------+------------+-----------+--------------------+---------------+---------+
|  user1|  user2|transaction_type|           datetime| description|is_business|            story_id|          words| category|
+-------+-------+----------------+-------------------+------------+-----------+--------------------+---------------+---------+
|1218774|1528945|         payment|2015-11-27 02:48:19|        Uber|      false|5657c473cd03c9af2...|         [uber]|  [Trans]|
|5109483|4782303|         payment|2015-06-17 04:37:04|      Costco|      false|5580f9702b64f70ab...|       [costco]|   [Food]|
|4322148|3392963|         payment|2015-06-19 00:05:31|Sweaty balls|      false|55835ccb1a624b14a...|[sweaty, balls]|       []|
| 469894|1333620|          charge|2016-06-03 16:34:13|          🎥|      false|5751b185cd03c9af2...|           [🎥]|  [Event]|
|2960727|3442373|         payment|2016-05-29 16:23:42|           ⚡|      false|574b178ecd03c9af2...|            [

**2: What is the percent of emoji only transactions? 14.4%**

**Which are the top 5 most popular emoji? 🍕🍻🍴🍺⛽**

**Which are the top three most popular emoji categories? Food, People, Activity**



In [10]:
# Create two columns save the emojis and words seperately
import emoji
is_emoji = lambda x: [val for val in x if val in emoji.UNICODE_EMOJI ]
is_not_emoji = lambda x: [val for val in x if val not in emoji.UNICODE_EMOJI ]
is_emoji_udf = udf(lambda z: is_emoji(z), ArrayType(StringType()))
is_not_emoji_udf = udf(lambda z: is_not_emoji(z), ArrayType(StringType()))
description_list=tokenized.select('user1','user2','datetime', is_emoji_udf('words').alias('emojis'),is_not_emoji_udf('words').alias('words'))

In [11]:
description_list.show(5)

+-------+-------+-------------------+------+---------------+
|  user1|  user2|           datetime|emojis|          words|
+-------+-------+-------------------+------+---------------+
|1218774|1528945|2015-11-27 02:48:19|    []|         [uber]|
|5109483|4782303|2015-06-17 04:37:04|    []|       [costco]|
|4322148|3392963|2015-06-19 00:05:31|    []|[sweaty, balls]|
| 469894|1333620|2016-06-03 16:34:13|  [🎥]|             []|
|2960727|3442373|2016-05-29 16:23:42|   [⚡]|             []|
+-------+-------+-------------------+------+---------------+
only showing top 5 rows



In [12]:
# Define a function to see whether the words is a empty list, if yes, then it means this description only contains emoji
def emoji_only(x):
    list_length=len(x)
    if list_length:
        return 0
    else:
        return 1

In [13]:
emoji_only_udf=udf(lambda z:emoji_only(z),IntegerType())
description_list.select('user1','user2','datetime','emojis','words',emoji_only_udf('words').alias('emoji_only')).show()

+--------+-------+-------------------+------+--------------------+----------+
|   user1|  user2|           datetime|emojis|               words|emoji_only|
+--------+-------+-------------------+------+--------------------+----------+
| 1218774|1528945|2015-11-27 02:48:19|    []|              [uber]|         0|
| 5109483|4782303|2015-06-17 04:37:04|    []|            [costco]|         0|
| 4322148|3392963|2015-06-19 00:05:31|    []|     [sweaty, balls]|         0|
|  469894|1333620|2016-06-03 16:34:13|  [🎥]|                  []|         1|
| 2960727|3442373|2016-05-29 16:23:42|   [⚡]|                  []|         1|
| 3977544|2709470|2016-09-29 15:12:07|    []|        [chipotlaid]|         0|
| 3766386|4209061|2016-05-20 03:31:15|    []|  [kitchen, counter]|         0|
|  730075| 804466|2016-05-25 21:46:45|    []|              [food]|         0|
| 5221751|4993533|2016-07-14 15:53:49|    []|             [zaxby]|         0|
| 6843582|7308338|2016-08-31 03:32:46|    []|        [fan, sucks]

In [14]:
emoji_only_table=description_list.select('user1','user2','datetime','emojis','words',emoji_only_udf('words').alias('emoji_only'))

In [15]:
emoji_only_table.where('emoji_only=1').count()

1027597

In [16]:
# The percentage of emoji only transaction
1027597/description_list.count()

0.14446467149444753

Top 5 emojis

In [45]:
is_emoji = lambda x: [val for val in x if val in emoji.UNICODE_EMOJI ]
is_emoji_udf = udf(lambda z: is_emoji(z), StringType())
emoji_string=tokenized.select('user1','user2','datetime', is_emoji_udf('words').alias('emojis'))

In [53]:
emoji_string.show(5)

+-------+-------+-------------------+------+
|  user1|  user2|           datetime|emojis|
+-------+-------+-------------------+------+
|1218774|1528945|2015-11-27 02:48:19|    []|
|5109483|4782303|2015-06-17 04:37:04|    []|
|4322148|3392963|2015-06-19 00:05:31|    []|
| 469894|1333620|2016-06-03 16:34:13|  [🎥]|
|2960727|3442373|2016-05-29 16:23:42|   [⚡]|
+-------+-------+-------------------+------+
only showing top 5 rows



In [47]:
emoji_string.createOrReplaceTempView('emoji_string')

In [54]:
# Find the top 5 popolar emojis
spark.sql('''select emojis, count(emojis) from emoji_string where emojis is not null group by emojis order by count(emojis) desc''').show(6)

+------+-------------+
|emojis|count(emojis)|
+------+-------------+
|    []|      5677672|
|  [🍕]|        55226|
|  [🍻]|        43113|
|  [🍴]|        34172|
|  [🍺]|        28163|
|   [⛽]|        23596|
+------+-------------+
only showing top 6 rows



Top 3 category

In [56]:
# Use the categorize function again to find the category for emojis
emoji_string_category=emoji_string.withColumn('category',word_type_udf(F.col('emojis')))

+--------+-------+-------------------+------+---------+
|   user1|  user2|           datetime|emojis| category|
+--------+-------+-------------------+------+---------+
| 1218774|1528945|2015-11-27 02:48:19|    []|       []|
| 5109483|4782303|2015-06-17 04:37:04|    []|       []|
| 4322148|3392963|2015-06-19 00:05:31|    []|       []|
|  469894|1333620|2016-06-03 16:34:13|  [🎥]|  [Event]|
| 2960727|3442373|2016-05-29 16:23:42|   [⚡]|[Utility]|
| 3977544|2709470|2016-09-29 15:12:07|    []|       []|
| 3766386|4209061|2016-05-20 03:31:15|    []|       []|
|  730075| 804466|2016-05-25 21:46:45|    []|       []|
| 5221751|4993533|2016-07-14 15:53:49|    []|       []|
| 6843582|7308338|2016-08-31 03:32:46|    []|       []|
| 5317324|3942984|2016-01-04 01:11:25|  [👠]| [People]|
| 1134661|1556430|2015-10-08 18:53:52|    []|       []|
| 4238868|4879587|2015-10-04 01:28:01|  [🍺]|   [Food]|
|11719500|8702716|2016-07-07 14:40:39|   [⛽]|  [Trans]|
| 3625798|5692302|2016-10-16 07:43:41|    []|      

In [57]:
# Remove the rows with empty category
emoji_string_category=emoji_string_category.filter(F.size('category') > 0) # F.size can be used to see the length of column
emoji_string_category.show()

+--------+-------+-------------------+------+----------+
|   user1|  user2|           datetime|emojis|  category|
+--------+-------+-------------------+------+----------+
|  469894|1333620|2016-06-03 16:34:13|  [🎥]|   [Event]|
| 2960727|3442373|2016-05-29 16:23:42|   [⚡]| [Utility]|
| 5317324|3942984|2016-01-04 01:11:25|  [👠]|  [People]|
| 4238868|4879587|2015-10-04 01:28:01|  [🍺]|    [Food]|
|11719500|8702716|2016-07-07 14:40:39|   [⛽]|   [Trans]|
| 5696834|1623756|2016-05-05 19:19:29|  [🎫]|[Activity]|
| 2743865|2896157|2015-06-10 02:30:11|  [🍕]|    [Food]|
| 4771561|1597177|2015-11-08 05:15:22|  [🎉]|[Activity]|
| 1217777|1080491|2015-10-24 23:52:34|  [🌮]|    [Food]|
|  349302|2592800|2016-08-07 23:46:16|  [🍵]|    [Food]|
| 2271571| 161995|2016-07-08 19:29:28|  [🎉]|[Activity]|
| 2725725|4754438|2016-10-10 22:53:07|   [☕]|    [Food]|
| 6324245| 681532|2016-05-19 03:19:46|  [🍼]|    [Food]|
| 4897128|3575816|2015-12-16 18:47:53|  [🎁]|[Activity]|
| 6245009|1383925|2016-06-03 00:46:07|  [🤓

In [58]:
# Find the top 3 category with emojis
emoji_string_category.createOrReplaceTempView('emoji_string_category')
spark.sql('''select category, count(category) from emoji_string_category group by category order by count(category) desc''').show(3)

+----------+---------------+
|  category|count(category)|
+----------+---------------+
|    [Food]|         409379|
|  [People]|         313646|
|[Activity]|         108095|
+----------+---------------+
only showing top 3 rows



**3: For each user, create a variable to indicate their spending behavior profile. For
example, if a user has made 10 transactions, where 5 of them are food and the other 5 are
activity, then the user’s spending profile will be 50% food and 50% activity.**

In [10]:
user_category=tokenized.select('user1','user2','datetime','category')
user_category.show(5)

+-------+-------+-------------------+---------+
|  user1|  user2|           datetime| category|
+-------+-------+-------------------+---------+
|1218774|1528945|2015-11-27 02:48:19|  [Trans]|
|5109483|4782303|2015-06-17 04:37:04|   [Food]|
|4322148|3392963|2015-06-19 00:05:31|       []|
| 469894|1333620|2016-06-03 16:34:13|  [Event]|
|2960727|3442373|2016-05-29 16:23:42|[Utility]|
+-------+-------+-------------------+---------+
only showing top 5 rows



In [11]:
# Remove the rows with no category 
user_category=user_category.filter(F.size('category')>0)

In [19]:
user_category.count()

3326219

In [12]:
# Order by the user id and category
user_category.createOrReplaceTempView('user_category')
user_category_order=spark.sql('''select * from user_category order by user1,category''')

In [13]:
user_category_order.show(5)

+-----+-------+-------------------+--------------------+
|user1|  user2|           datetime|            category|
+-----+-------+-------------------+--------------------+
|    3|1204190|2016-10-08 18:56:24|              [Food]|
|    3|     52|2016-09-22 08:30:09|            [People]|
|    3|7854140|2016-10-08 20:36:13|           [Utility]|
|    4| 968271|2014-02-03 22:51:33|[Activity, Activity]|
|    4| 125527|2012-12-14 21:51:12|              [Food]|
+-----+-------+-------------------+--------------------+
only showing top 5 rows



In [35]:
user1_category_order=user_category_order.select('user1', F.explode('category').alias("category"))\
                                       .groupBy('user1','category')\
                                       .count()\
                                       .select('user1','category', 'count',F.sum('count').over(Window.partitionBy("user1")).alias('total_count'))\
                                       .sort('user1', 'category')

In [36]:
# Calculate the proprotion of each category
user1_category_order=user1_category_order.select('user1','category',(100*(round(((col("count") /col("total_count"))),2))).alias("proportion_%"))

In [37]:
# Pivot the table
user1_category_order=user1_category_order.groupBy('user1')\
                       .pivot("category").sum("proportion_%").sort("user1")
user1_category_order.show()

+-----+--------+----+-----+-----+-------+------+-----------------+------------------+-------+
|user1|Activity|Cash|Event| Food|Illegal|People|            Trans|            Travel|Utility|
+-----+--------+----+-----+-----+-------+------+-----------------+------------------+-------+
|    3|    null|null| null| 33.0|   null|  33.0|             null|              null|   33.0|
|    4|    33.0|null| null| 33.0|   17.0|  null|             null|              17.0|   null|
|   10|    10.0|null| null| 60.0|   null|  20.0|             10.0|              null|   null|
|   11|    18.0|null| 27.0|  9.0|   null|  27.0|             null|               9.0|    9.0|
|   12|    20.0|null| 20.0| 20.0|   null|  null|             null|              null|   40.0|
|   13|    15.0|null| 11.0| 22.0|    4.0|  26.0|7.000000000000001|               4.0|   11.0|
|   16|    null|null| null| 86.0|   null|  null|             null|14.000000000000002|   null|
|   19|    33.0|null| 33.0| null|   null|  null|            

**4: In the previous question, you got a static spending profile. However, life and social
networks are evolving over time. Therefore, let’s explore how a user’s spending profile is
evolving over her lifetime in Venmo. First of all, you need to analyze a user’s transactions in
monthly intervals, starting from 0 (indicating their first transaction only ) up to 12.**

In [13]:
# Create table for user profile every month

# Break the datetime into months
user_profile=user_category_order.selectExpr('user1',"CAST( MONTHS_BETWEEN(datetime, FIRST_VALUE(datetime) OVER (PARTITION BY user1 ORDER BY datetime)) as INT) as month",'category')
# Filter months that are less than 12
user_profile=user_profile.filter(F.col('month')<=12).sort('month')

In [14]:
# Explode the category
user_profile=user_profile.select('user1','month', F.explode('category').alias("category"))\
                         .groupBy('user1','month','category')\
                         .count()\
                         .select('user1','month','category','count', F.sum('count').over(Window.partitionBy("user1",'month')).alias('total_count'))\
                                       .sort('user1', 'month','category')

In [15]:
# Calculate the proportion
user_profile=user_profile.select("user1","month",'category','count','total_count',(100*(round(((col("count") /col("total_count"))),2))).alias("proportion_%"))
user_profile=user_profile.select("user1","month",'category',"proportion_%")

In [18]:
# Pivot the table
user_profile_yearly = user_profile.groupBy('user1','month')\
                       .pivot("category").sum("proportion_%").sort("user1",'month')
user_profile_yearly.repartition(1).write.mode('overwrite').parquet("/Users/calvin/Desktop/Spring Quarter/Big data/Homework 2/user_profile_yearly.parquet")

In [42]:
# Calculate the mean and standard deviation
profile_count=tokenized.selectExpr('user1',"CAST( MONTHS_BETWEEN(datetime, FIRST_VALUE(datetime) OVER (PARTITION BY user1 ORDER BY datetime)) as INT) as month",'category').sort('user1','month')
profile_count=profile_count.filter(F.col('month')<=12)

In [43]:
profile_count=profile_count.select('user1','month',F.explode('category').alias("category"))\
                           .groupBy('user1','month','category')\
                           .count()\
                           .select('user1','month','category','count')

In [44]:
profile_count.createOrReplaceTempView('profile_count')
profile_mean=spark.sql('''select month, category, avg(count), stddev(count) from profile_count where month<=12 group by category, month order by category, month''')
profile_mean_pd=profile_mean.toPandas()

In [50]:
profile_mean_pd.head(5)

Unnamed: 0,month,category,avg(count),stddev_samp(CAST(count AS DOUBLE))
0,0,Activity,1.146139,0.463561
1,1,Activity,1.129598,0.41917
2,2,Activity,1.124987,0.404048
3,3,Activity,1.127956,0.440148
4,4,Activity,1.131385,0.421853
