In [0]:
!pip install emojis
!pip install nltk
import emojis
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from pyspark.sql import SparkSession
nltk.download('stopwords')
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import  *
from pyspark.sql import DataFrame
from collections import Counter
from typing import Iterable
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# Initialise (or reuse) the SparkSession shared by every cell below.
# NOTE(review): the master is local[*]; the executor memory/cores settings may
# have no effect in local mode - confirm against the cluster actually used.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Venmo')
    .config('spark.executor.memory', '10gb')
    .config("spark.cores.max", "8")
    .config("spark.executor.cores", "6")
    .getOrCreate()
)

In [0]:
# Show every key/value pair of the active Spark configuration.
spark.sparkContext.getConf().getAll()

In [0]:
# Sanity-check the resources Spark picked up.
# Configured cores per executor; falls back to '1' when the key is not set.
print(int(spark.sparkContext.getConf().get('spark.executor.cores','1')))
# Size of the JVM SparkContext's executor memory status map (reached through the private _jsc handle).
print(spark._jsc.sc().getExecutorMemoryStatus().size())
# Number of executor info records returned by the status tracker.
print(len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()))

In [0]:
# Load the raw transactions (first CSV row holds the column names) and mark
# the frame for caching so the following cells do not re-read the file.
data = (
    spark.read
    .format("csv")
    .option("header", 'TRUE')
    .load("/FileStore/tables/All_files.csv")
    .cache()
)
display(data)

user1,user2,transaction_type,datetime,app_description,description,story_id
2482900494712832556,2.0824970011607048e+18,payment,2018-08-07T02:11:16,Venmo for iPhone,fuk ya,2.540405007614739e+18
2363395470786560486,2.538731244355585e+18,payment,2018-08-07T02:11:16,Venmo for iPhone,:venmo_dollar:,2.540405007799289e+18
2278060275531776951,1.9213155691397125e+18,payment,2018-08-07T02:11:15,Venmo for Android,🎉,2.540404999133857e+18
2085027231825920983,2.019594260709376e+18,payment,2018-08-07T02:11:15,Venmo for iPhone,Boyz,2.540405000417314e+18
2029336009900032272,2.0808953306808325e+18,payment,2018-08-07T02:11:15,Venmo for iPhone,🥩,2.5404049986976486e+18
2302008090427392306,2.22876301000704e+18,payment,2018-08-07T02:11:16,Venmo for iPhone,Internet,2.5404050089988593e+18
1843968157417472010,1.8635405498777605e+18,payment,2018-08-07T02:11:15,Venmo for iPhone,🤗,2.540405002774512e+18
2127116921470976995,1.4062062917386248e+18,payment,2018-08-07T02:11:15,Venmo for iPhone,🚕🚕🚕,2.540405002589963e+18
2354110858788864178,2.423820619087873e+18,payment,2018-08-07T02:11:16,Venmo for iPhone,🤬,2.540405008562652e+18
2308452848238592156,2.4677613138083845e+18,payment,2018-08-07T02:11:14,Venmo for Android,seltzer,2.5404049945368986e+18


In [0]:
# Increase the number of partitions to 200 (the author's aim: speed up later stages),
# then show the resulting partition count.
# NOTE(review): repartition() returns a new DataFrame, so the cache() requested on the
# previous `data` may not cover this one - confirm.
data = data.repartition(200)
data.rdd.getNumPartitions()

In [0]:
# Print the column names and types of the transactions frame.
data.printSchema()

In [0]:
# Total number of transaction rows (runs a Spark job over the whole frame).
data.count()

In [0]:
# Word dictionary: a comma-separated file with a header row, one column per category.
text_dic = spark.read.csv(
    "/FileStore/tables/Word_Classification_Dict.csv",
    header="true",
    sep=",",
)
display(text_dic)

In [0]:
# Emoji dictionary: a comma-separated file with a header row, one column per category.
emoji_dic = spark.read.csv(
    "/FileStore/tables/Emoji_Classification_Dictionary.csv",
    header="true",
    sep=",",
)
display(emoji_dic)

In [0]:
# Convert columns to list
# The dictionary is pulled to the driver once and every column is turned into a
# list with its nulls removed. Previously each column ran its own Spark job and
# all results except the last were discarded. Column names are taken from the
# frame itself, so the cell does not depend on whether the CSV headers carry
# trailing spaces.
text_dic_pdf = text_dic.toPandas()
{
    column_name: text_dic_pdf[column_name].dropna().tolist()
    for column_name in text_dic_pdf.columns
}

In [0]:
# Inspect the emoji dictionary as plain lists, one per category column.
# The frame is pulled to the driver once and nulls are dropped per column.
# Previously each column ran its own Spark job and only the last result was shown.
emoji_dic_pdf = emoji_dic.toPandas()
{
    column_name: emoji_dic_pdf[column_name].dropna().tolist()
    for column_name in emoji_dic_pdf.columns
}

In [0]:
# add index column and create temporary view
# Rows are ordered by user1 (descending) before the id is attached; monotonically_increasing_id()
# produces increasing, unique ids that are not guaranteed to be consecutive.
# NOTE(review): the CSV was read without schema inference, so user1 is presumably a string
# and this sort is lexicographic - confirm.
data = data.sort(col("user1").desc()).withColumn("index", monotonically_increasing_id())
# Make the frame queryable from SQL under the name "Venmo".
data.createOrReplaceTempView("Venmo")
data.show()

In [0]:
# Using flatMap() on Spark DataFrame - https://sparkbyexamples.com/spark/spark-flatmap-usage-with-example/
def _text_dic_column(column_name):
    """Collect one column of ``text_dic`` to the driver as a flat Python list."""
    single_column = text_dic.select(column_name)
    return single_column.rdd.flatMap(lambda row: row).collect()

# One word list per spending category. The column names are passed exactly as
# written below (several carry a trailing space).
people = _text_dic_column('People ')
food = _text_dic_column('Food ')
event = _text_dic_column('Event ')
activity = _text_dic_column('Activity')
travel = _text_dic_column('Travel ')
transportation = _text_dic_column('Transportation ')
utility = _text_dic_column('Utility ')
cash = _text_dic_column('Cash')
illegal = _text_dic_column('Illegal/Sarcasm ')

In [0]:
# Using flatMap() on Spark DataFrame - https://sparkbyexamples.com/spark/spark-flatmap-usage-with-example/
def _emoji_dic_column(column_name):
    """Collect one column of ``emoji_dic`` to the driver as a flat Python list."""
    single_column = emoji_dic.select(column_name)
    return single_column.rdd.flatMap(lambda row: row).collect()

# One emoji list per category.
people_emoji = _emoji_dic_column('People')
food_emoji = _emoji_dic_column('Food')
event_emoji = _emoji_dic_column('Event')
activity_emoji = _emoji_dic_column('Activity')
travel_emoji = _emoji_dic_column('Travel')
transportation_emoji = _emoji_dic_column('Transportation')
utility_emoji = _emoji_dic_column('Utility')

In [0]:
# TODO: this uses the `emojis` package instead of the provided emoji dictionary; confirm with the professor that this is allowed.
@udf
def convert_emojis(text):
    """Replace every emoji in ``text`` with its textual alias.

    The description is run through ``emojis.decode``. Every ":" is then turned
    into a space, underscores are removed, and runs of whitespace are collapsed
    to single spaces.

    Args:
        text: the raw transaction description, or None.

    Returns:
        The converted description; an empty string when ``text`` is None.
    """
    if text is None:
        # A null description cannot be decoded; an empty string lets the
        # remaining text pipeline treat the row as "no words".
        return ""
    decoded = emojis.decode(text)
    return " ".join(decoded.replace(":", " ").replace("_", "").split())

@udf
def rm_punctuation(text):
  """Strip every ``string.punctuation`` character from ``text``.

  Args:
      text: a description string, or None.

  Returns:
      The text without punctuation; an empty string when ``text`` is None.
  """
  if text is None:
    # Keep the pipeline running on null input instead of raising in the executor.
    return ""
  return text.translate(str.maketrans("", "", string.punctuation))

def lemmatize(text):
    """Lemmatize every token of an already tokenized description.

    This is deliberately a plain Python function. It is registered as a Spark
    UDF exactly once below, with an explicit array return type. Decorating it
    with ``@udf`` as well would pass a UDF wrapper, rather than this function,
    to ``udf(...)``.

    Args:
        text: a list of tokens, or None.

    Returns:
        A list with the WordNet lemma of each token, in the same order; an
        empty list when ``text`` is None.
    """
    if text is None:
        return []
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(word) for word in text]


# Spark UDF returning array<string>; this is the callable used on DataFrame columns.
lemmatize_udf = udf(lemmatize, ArrayType(StringType()))

In [0]:
# Text preprocessing, in order: emoji -> words, punctuation removal,
# tokenisation, stop-word removal, lemmatisation. Only the final
# "text_preprocessed" column is kept; the intermediate columns are dropped.
tokenizer = Tokenizer(inputCol="rm_punctuation", outputCol="tokenized")
remover = StopWordsRemover(inputCol="tokenized", outputCol="rm_stopwords")
columns_to_drop = ['replace_emoji', 'tokenized', 'rm_stopwords', "rm_punctuation"]

data = (
    data
    .withColumn("replace_emoji", convert_emojis(col("description")))
    .withColumn("rm_punctuation", rm_punctuation(col("replace_emoji")))
)
data = remover.transform(tokenizer.transform(data))
data = (
    data
    .withColumn("text_preprocessed", lemmatize_udf(col("rm_stopwords")))
    .drop(*columns_to_drop)
)
display(data)

In [0]:
# use array to store the categories for each description, then get the most common one
@udf
def classification(text):
  """Assign a spending category to a preprocessed description.

  Every token is checked against each category word list; a token found in
  several lists counts once for each of them. The category with the highest
  count is returned.

  Args:
      text: list of preprocessed tokens, or None.

  Returns:
      One of 'People', 'Food', 'Event', 'Activity', 'Travel',
      'Transportation', 'Utility', 'Cash', 'Illegal', or "Not Classified"
      when no token matches (including null or empty input).
  """
  # Same check order as the original if-chain, so the order in which labels
  # are recorded (and therefore how ties are resolved) is unchanged.
  category_words = (
    ('People', people),
    ('Food', food),
    ('Event', event),
    ('Activity', activity),
    ('Travel', travel),
    ('Transportation', transportation),
    ('Utility', utility),
    ('Cash', cash),
    ('Illegal', illegal),
  )
  # `text or []` keeps a null token array from raising inside the executor.
  categories = [
    label
    for word in (text or [])
    for label, vocabulary in category_words
    if word in vocabulary
  ]
  if not categories:
    return "Not Classified"
  return Counter(categories).most_common(1)[0][0]

In [0]:
# Label every transaction with the category derived from its preprocessed tokens.
data = data.withColumn("classification", classification(col("text_preprocessed")))
display(data)

In [0]:
@udf("int")
def emoji_only(text):
  """Flag descriptions that consist of emojis and nothing else.

  Every emoji reported by ``emojis.get`` is removed from the text; the
  description is emoji-only when nothing is left. Comparing ``len(text)`` with
  the emoji count misses emojis made of several code points and counts an
  empty description as emoji-only.

  Args:
      text: the raw transaction description, or None.

  Returns:
      1 when the description is non-empty and made only of emojis, else 0.
  """
  if not text:
    # Null or empty descriptions contain no emoji at all.
    return 0
  remainder = text
  # Remove longer sequences first so a shorter emoji that is part of a longer
  # one does not split it.
  for found in sorted(emojis.get(text), key=len, reverse=True):
    remainder = remainder.replace(found, "")
  return 1 if remainder == "" else 0

data = data.withColumn("emoji_only", emoji_only(col("description")))
display(data)

In [0]:
# Share of transactions whose description is emoji only.
# The value is a fraction between 0 and 1, despite the variable name.
# `sum` here is pyspark.sql.functions.sum (star import), not the Python builtin.
# first()[0] fetches the number itself; show() only prints and returns None,
# which previously left the variable empty.
percent_emoji_only = data.select(sum("emoji_only") / data.count()).first()[0]
percent_emoji_only

In [0]:
def get_emoji(text):
  """Return the emojis found in a description.

  Args:
      text: the raw transaction description, or None.

  Returns:
      A list of the emojis reported by ``emojis.get`` for the text, or None
      when the text is None or contains no emoji (later cells rely on the
      null to skip such rows).
  """
  if text is None:
    return None
  # Scan the text once and reuse the result.
  found = emojis.get(text)
  return list(found) if found else None
get_emoji_udf = udf(get_emoji, ArrayType(StringType()))

data = data.withColumn("emojis", get_emoji_udf(col("description")))
display(data)

In [0]:
# Bring the emoji arrays of all rows that have at least one emoji to the driver...
bag_of_emojis = (
    data
    .filter(col("emojis").isNotNull())
    .select("emojis")
    .rdd
    .flatMap(lambda row: row)
    .collect()
)
# ...and flatten the list of per-transaction lists into one long list.
bag_of_emojis_flatten = [symbol for per_row in bag_of_emojis for symbol in per_row]
bag_of_emojis_flatten

In [0]:
# the top 5 most popular emoji:
# (counted over the flattened list of emojis collected from all transactions)
Counter(bag_of_emojis_flatten).most_common(5)

In [0]:
def get_emoji_category(emoji):
  """Map an emoji to the first category list that contains it.

  The lists are checked in a fixed order (People, Food, Event, Activity,
  Travel, Transportation, Utility); 'Not Classified' is returned when none of
  them contains the emoji.
  NOTE(review): the emoji dictionary also has 'Cash' and 'Illegal/Sarcasm'
  columns that are not consulted here - confirm whether that is intended.
  """
  lookup_order = (
    ('People', people_emoji),
    ('Food', food_emoji),
    ('Event', event_emoji),
    ('Activity', activity_emoji),
    ('Travel', travel_emoji),
    ('Transportation', transportation_emoji),
    ('Utility', utility_emoji),
  )
  for label, members in lookup_order:
    if emoji in members:
      return label
  return 'Not Classified'

# Cache the category of every distinct emoji so each one is looked up only once,
# and keep one category label per classified emoji occurrence.
emoji_category_dic = {}
emoji_category_arr = []

for emoji in bag_of_emojis_flatten:
  category = emoji_category_dic.get(emoji)
  if category is None:
    category = get_emoji_category(emoji)
    emoji_category_dic[emoji] = category
  # Unclassified emojis are left out of the tally.
  if category != "Not Classified":
    emoji_category_arr.append(category)

emoji_category_arr

In [0]:
# the top three most popular emoji categories are Food, People, Activity
# (result noted by the original author; recomputed below from the classified emoji occurrences)
Counter(emoji_category_arr).most_common(3)

In [0]:
# Expose the classified transactions to SQL cells under the name "datatable".
data.createOrReplaceTempView("datatable")

In [0]:
%sql
-- merge user1 and user2: every transaction contributes one row per participant
-- NOTE(review): in the sample rows user1 looks like an integer string while user2 is in
-- scientific notation, so the same account may not match across the two columns - confirm.
-- Drop first so the cell can be re-run without failing on an already existing table.
DROP TABLE IF EXISTS union_table;

CREATE TABLE union_table
USING HIVE
AS 
  SELECT user1 AS user, classification
  FROM datatable
  UNION ALL
  SELECT user2 AS user, classification
  FROM datatable

In [0]:
# Static spending profile per user.
# Assumption: the denominator does not include transactions that could not be classified.
# Inner query: for each user, the share of each category among that user's classified rows.
# Outer query: turns every non-zero share into a "<pct>% <Category>" fragment and joins the
# fragments with ", " (fragments that evaluate to NULL are left out of the CONCAT_WS result).
spend_profile_df = sqlContext.sql(
  '''
  SELECT user, CONCAT_WS(", ", 
                    CASE WHEN count_activity != 0 THEN CONCAT(ROUND(count_activity * 100), "% Activity") ELSE NULL END, 
                    CASE WHEN count_people != 0 THEN CONCAT(ROUND(count_people * 100), "% People") ELSE NULL END, 
                    CASE WHEN count_event != 0 THEN CONCAT(ROUND(count_event * 100), "% Event") ELSE NULL END, 
                    CASE WHEN count_travel != 0 THEN CONCAT(ROUND(count_travel * 100), "% Travel") ELSE NULL END, 
                    CASE WHEN count_transportation != 0 THEN CONCAT(ROUND(count_transportation * 100), "% Transportation") ELSE NULL END, 
                    CASE WHEN count_food != 0 THEN CONCAT(ROUND(count_food * 100), "% Food") ELSE NULL END, 
                    CASE WHEN count_utility != 0 THEN CONCAT(ROUND(count_utility * 100), "% Utility") ELSE NULL END, 
                    CASE WHEN count_cash != 0 THEN CONCAT(ROUND(count_cash * 100), "% Cash") ELSE NULL END,
                    CASE WHEN count_illegal != 0 THEN CONCAT(ROUND(count_illegal * 100), "% Illegal") ELSE NULL END
              ) AS spent_profile
  FROM
  (
    SELECT user,
        SUM(CASE WHEN classification == "Activity" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_activity,
        SUM(CASE WHEN classification == "Food" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_food,
        SUM(CASE WHEN classification == "People" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_people,
        SUM(CASE WHEN classification == "Event" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_event,
        SUM(CASE WHEN classification == "Travel" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_travel,
        SUM(CASE WHEN classification == "Transportation" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_transportation,
        SUM(CASE WHEN classification == "Utility" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_utility,
        SUM(CASE WHEN classification == "Cash" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_cash,
        SUM(CASE WHEN classification == "Illegal" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_illegal
    FROM union_table
    GROUP BY user
  )
  '''
)
display(spend_profile_df)

In [0]:
# merge user1 & user2
# Same stacking as union_table, but the transaction datetime is kept as well.
# NOTE(review): the next cell reassigns union_table_with_date_df from a CSV in storage,
# so this in-memory result is not what the later queries read - confirm which one is intended.
union_table_with_date_df = sqlContext.sql(
  '''
  SELECT user1 AS user, classification, datetime
  FROM datatable
  UNION ALL
  SELECT user2 AS user, classification, datetime
  FROM datatable
  '''
)

In [0]:
# read from storage
# Loads the user/classification/datetime table from a CSV on DBFS (header row, inferred column types)
# and registers it for SQL as "uniontable".
# NOTE(review): no cell in this notebook writes dbfs:/FileStore/union_table_with_date_df.csv;
# presumably it was saved in an earlier session - confirm, otherwise a fresh run fails here.
union_table_with_date_df = spark.read.format('csv').options(header='true', inferSchema='true')\
    .load('dbfs:/FileStore/union_table_with_date_df.csv')
union_table_with_date_df.createOrReplaceTempView("uniontable")

# find the first 12 life points
# assumption: 30 days a month
# Innermost query: attach each user's earliest datetime (first_day) to every row of that user.
# Middle query: life_point is 0 for rows whose datetime equals first_day, otherwise the 30-day
# bucket (1..12) of the day difference to first_day; rows more than 360 days out get no
# life_point (the CASE has no ELSE) and are removed by the final WHERE.
union_table_label_lifepoints = sqlContext.sql(
  '''
  SELECT user, classification, life_point
  FROM
  (
    SELECT user, classification,
      CASE WHEN datetime = first_day THEN 0
           WHEN DATEDIFF(datetime, first_day) <= 30 THEN 1
           WHEN DATEDIFF(datetime, first_day) <= 60 THEN 2
           WHEN DATEDIFF(datetime, first_day) <= 90 THEN 3
           WHEN DATEDIFF(datetime, first_day) <= 120 THEN 4
           WHEN DATEDIFF(datetime, first_day) <= 150 THEN 5
           WHEN DATEDIFF(datetime, first_day) <= 180 THEN 6
           WHEN DATEDIFF(datetime, first_day) <= 210 THEN 7
           WHEN DATEDIFF(datetime, first_day) <= 240 THEN 8
           WHEN DATEDIFF(datetime, first_day) <= 270 THEN 9
           WHEN DATEDIFF(datetime, first_day) <= 300 THEN 10
           WHEN DATEDIFF(datetime, first_day) <= 330 THEN 11
           WHEN DATEDIFF(datetime, first_day) <= 360 THEN 12
      END AS life_point
    FROM 
    (
      SELECT *, MIN(datetime) OVER (PARTITION BY user) AS first_day
      FROM uniontable
    )
  )
  WHERE life_point <= 12
  '''
)
display(union_table_label_lifepoints)

In [0]:
# read from storage
# Load the persisted life-point table (header row, inferred column types) and register THAT
# frame for SQL as "union_table_subset". Previously the view was created from the in-memory
# union_table_label_lifepoints, so the frame loaded here was never used.
# NOTE(review): assumes the CSV is a saved copy of union_table_label_lifepoints - confirm.
union_table_qualified_subset = spark.read.format('csv').options(header='true', inferSchema='true')\
    .load('dbfs:/FileStore/union_table_label_lifepoints.csv')
union_table_qualified_subset.createOrReplaceTempView("union_table_subset")

%sql
--  Share of each classification per (user, life_point).
--  Assumption: the denominator does not include transactions that could not be classified.
--  The table is dropped and rebuilt on every run, then displayed.
DROP TABLE IF EXISTS classfication_by_time;

CREATE TABLE classfication_by_time
USING HIVE
AS 
  SELECT user, life_point,
    SUM(CASE WHEN classification == "Activity" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_activity,
    SUM(CASE WHEN classification == "Food" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_food,
    SUM(CASE WHEN classification == "People" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_people,
    SUM(CASE WHEN classification == "Event" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_event,
    SUM(CASE WHEN classification == "Travel" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_travel,
    SUM(CASE WHEN classification == "Transportation" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_transportation,
    SUM(CASE WHEN classification == "Utility" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_utility,
    SUM(CASE WHEN classification == "Cash" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_cash,
    SUM(CASE WHEN classification == "Illegal" THEN 1 ELSE 0 END)/SUM(CASE WHEN classification != "Not Classified" THEN 1 ELSE 0 END) AS count_illegal
  FROM union_table_subset
  GROUP BY user, life_point;
    
SELECT * FROM classfication_by_time

In [0]:
# create profile by user, life_point
# Every non-zero category share stored in classfication_by_time becomes a "<pct>% <Category>"
# fragment; the fragments are joined with ", " and the rows are ordered by user, then life_point.
dynamic_profile = sqlContext.sql(
  '''
  SELECT user, life_point,
             CONCAT_WS(", ", 
                    CASE WHEN count_activity != 0 THEN CONCAT(ROUND(count_activity * 100), "% Activity") ELSE NULL END, 
                    CASE WHEN count_people != 0 THEN CONCAT(ROUND(count_people * 100), "% People") ELSE NULL END, 
                    CASE WHEN count_event != 0 THEN CONCAT(ROUND(count_event * 100), "% Event") ELSE NULL END, 
                    CASE WHEN count_travel != 0 THEN CONCAT(ROUND(count_travel * 100), "% Travel") ELSE NULL END, 
                    CASE WHEN count_transportation != 0 THEN CONCAT(ROUND(count_transportation * 100), "% Transportation") ELSE NULL END, 
                    CASE WHEN count_food != 0 THEN CONCAT(ROUND(count_food * 100), "% Food") ELSE NULL END, 
                    CASE WHEN count_utility != 0 THEN CONCAT(ROUND(count_utility * 100), "% Utility") ELSE NULL END, 
                    CASE WHEN count_cash != 0 THEN CONCAT(ROUND(count_cash * 100), "% Cash") ELSE NULL END,
                    CASE WHEN count_illegal != 0 THEN CONCAT(ROUND(count_illegal * 100), "% Illegal") ELSE NULL END
              ) AS spent_profile
  FROM classfication_by_time
  ORDER BY user, life_point
  '''
)
display(dynamic_profile)