# Vnemo Transaction Analysis

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install dependencies (for future questions)
!pip install pyspark
!pip install graphframes
!pip install networkx
!pip install matplotlib seaborn
!pip install torch torch-geometric
!pip install transformers
!pip install emoji

# Import libraries (for future questions)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, udf, explode
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from graphframes import GraphFrame
from transformers import pipeline
import torch
from torch_geometric.data import Data
from pyspark.sql.types import BooleanType, ArrayType, StringType
import pandas as pd
import emoji
import re
from google.colab import files

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl.metadata (934 bytes)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7
Collecting torch-geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-

## 1. Text Analytics

### 1.1 Use the text dictionary and the emoji dictionary to classify Venmo’s transactions in your sample dataset. What is the percent of emoji only transactions? Which are the top 5 most popular emoji? Which are the top three most popular emoji categories?

In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as spark_col, lower as spark_lower, trim as spark_trim, udf, explode
from pyspark.sql.types import BooleanType, ArrayType, StringType
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import emoji
import re

# Reset Matplotlib font settings to default
plt.rcParams['font.family'] = 'DejaVu Sans'

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ2_Full") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/VenmoSample.snappy.parquet"
word_dict_path = "/content/Venmo Word Classification Dictonary BAX-423.xlsx"
emoji_dict_path = "/content/Venmo_Emoji_Classification_Dictionary.csv"

# Load Venmo dataset
try:
    df = spark.read.parquet(venmo_path)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Clean descriptions
try:
    df = df.withColumn("description", spark_lower(spark_trim(spark_col("description")))) \
           .na.drop(subset=["description"])
    print("Descriptions cleaned successfully.")
except Exception as e:
    print(f"Error cleaning descriptions: {e}")
    spark.stop()
    raise e

# Load word dictionary
try:
    word_dict_pd = pd.read_excel(word_dict_path, sheet_name="Word_Dict")
    print("Word dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading word dictionary: {e}")
    spark.stop()
    raise e

# Convert word dictionary to Spark DataFrame
word_dict_melted = []
for category in word_dict_pd.columns[:-1]:
    words = word_dict_pd[category].dropna().tolist()
    for word in words:
        word_dict_melted.append((word, category))
word_dict_df = spark.createDataFrame(word_dict_melted, ["word", "category"])

# Load emoji dictionary
try:
    emoji_dict_df = spark.read.csv(emoji_dict_path, header=True)
    print("Emoji dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading emoji dictionary: {e}")
    spark.stop()
    raise e

# Melt emoji dictionary
emoji_dict_melted = []
for col in emoji_dict_df.columns:
    emojis = emoji_dict_df.select(col).dropna().collect()
    for row in emojis:
        emoji_dict_melted.append((row[0], col))
emoji_dict_spark = spark.createDataFrame(emoji_dict_melted, ["emoji", "category"])

# Define UDF for emoji-only detection
def is_emoji_only(text):
    if not text:
        return False
    cleaned = re.sub(r'[a-zA-Z0-9\s]', '', text)
    return len(cleaned) > 0 and all(any(c in item['emoji'] for item in emoji.emoji_list(text)) for c in cleaned)
is_emoji_only_udf = udf(is_emoji_only, BooleanType())

# Flag emoji-only transactions
try:
    df = df.withColumn("is_emoji_only", is_emoji_only_udf(spark_col("description")))
    print("Emoji-only transactions flagged.")
except Exception as e:
    print(f"Error flagging emoji-only transactions: {e}")
    spark.stop()
    raise e

# Calculate percent emoji-only
total_transactions = df.count()
emoji_only_count = df.filter(spark_col("is_emoji_only")).count()
percent_emoji_only = (emoji_only_count / total_transactions) * 100 if total_transactions > 0 else 0

# Define UDF for emoji extraction
def extract_emojis(text):
    return [item['emoji'] for item in emoji.emoji_list(text)]
extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))

# Extract emojis
df_emojis = df.withColumn("emojis", extract_emojis_udf(spark_col("description")))
df_emojis_exploded = df_emojis.select(explode(spark_col("emojis")).alias("emoji"))

# Count emoji frequencies
emoji_counts = df_emojis_exploded.groupBy("emoji").count().orderBy(spark_col("count").desc())
top_5_emojis = emoji_counts.limit(5).collect()

# Debug: Find uncategorized emojis
uncategorized_emojis = df_emojis_exploded.join(emoji_dict_spark, "emoji", "left") \
    .filter(spark_col("category").isNull()) \
    .groupBy("emoji").count().orderBy(spark_col("count").desc())
top_5_uncategorized = uncategorized_emojis.limit(5).collect()
print("\nTop 5 Uncategorized Emojis (None category):")
for row in top_5_uncategorized:
    print(f"Emoji: {row['emoji']}, Count: {row['count']}")

# Map emojis to categories (exclude None)
emoji_category_counts = df_emojis_exploded.join(emoji_dict_spark, "emoji", "left") \
    .groupBy("category").count().orderBy(spark_col("count").desc())
top_3_categories = emoji_category_counts.filter(spark_col("category").isNotNull()).limit(3).collect()

# Convert results to pandas, using simplified text labels for all emojis
emoji_names = {
    "🍕": "PizzaSlice",
    "🍻": "BeerMugs",
    "💸": "MoneyWithWings",
    "🍷": "WineGlass",
    "🎉": "PartyPopper",
    "🎈": "Balloon",
    "❤️": "RedHeart",
    "🏡": "HouseWithGarden",
    "🏠": "House",
    "😭": "CryingFace",
    "🍹": "TropicalDrink",
    "💰": "MoneyBag"
}
top_5_emojis_df = pd.DataFrame([(row["emoji"], row["count"]) for row in top_5_emojis], columns=["Emoji", "Count"])
top_5_emojis_df["Emoji"] = top_5_emojis_df["Emoji"].map(emoji_names).fillna(top_5_emojis_df["Emoji"])
top_3_categories_df = pd.DataFrame([(row["category"], row["count"]) for row in top_3_categories], columns=["Category", "Count"])

# Plot top 5 emojis with simplified text labels
plt.figure(figsize=(8, 6))
sns.barplot(x="Count", y="Emoji", data=top_5_emojis_df)
plt.title("Top 5 Most Popular Emojis in Venmo Transactions")
plt.tight_layout()
plt.savefig("/content/top_5_emojis.png")
plt.close()

# Plot top 3 categories
plt.figure(figsize=(8, 6))
sns.barplot(x="Count", y="Category", data=top_3_categories_df)
plt.title("Top 3 Most Popular Emoji Categories in Venmo Transactions")
plt.tight_layout()
plt.savefig("/content/top_3_categories.png")
plt.close()

# Print results
print(f"Percent of Emoji-Only Transactions: {percent_emoji_only:.2f}%")
print("\nTop 5 Emojis:")
for row in top_5_emojis:
    emoji_label = emoji_names.get(row['emoji'], row['emoji'])
    print(f"Emoji: {emoji_label}, Count: {row['count']}")
print("\nTop 3 Emoji Categories (excluding None):")
for row in top_3_categories:
    print(f"Category: {row['category']}, Count: {row['count']}")

# Save cleaned DataFrame for future questions
df.write.mode("overwrite").parquet("/content/venmo_cleaned.parquet")

# Stop SparkSession
spark.stop()

Venmo dataset loaded successfully.
Descriptions cleaned successfully.
Word dictionary loaded successfully.
Emoji dictionary loaded successfully.
Emoji-only transactions flagged.

Top 5 Uncategorized Emojis (None category):
Emoji: 💸, Count: 124727
Emoji: 🏠, Count: 65987
Emoji: ❤️, Count: 56701
Emoji: 🏡, Count: 30932
Emoji: 💰, Count: 28303
Percent of Emoji-Only Transactions: 32.35%

Top 5 Emojis:
Emoji: PizzaSlice, Count: 215039
Emoji: BeerMugs, Count: 145233
Emoji: MoneyWithWings, Count: 124727
Emoji: WineGlass, Count: 111157
Emoji: PartyPopper, Count: 94327

Top 3 Emoji Categories (excluding None):
Category: Food, Count: 1744262
Category: People, Count: 787257
Category: Activity, Count: 405737


### 1.2 For each user, create variables to classify their spending behavior profile into categories. For example, if a user has made 10 transactions, where 5 of them are food and the other 5 are activity, then the user’s spending profile will be 50% food and 50% activity.

In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as spark_col, lower as spark_lower, trim as spark_trim, udf, explode, split, lit, sum as spark_sum
from pyspark.sql.types import ArrayType, StringType, StructType, StructField
import pandas as pd
import emoji
import re

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ3") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/venmo_cleaned.parquet"
word_dict_path = "/content/Venmo Word Classification Dictonary BAX-423.xlsx"
emoji_dict_path = "/content/Venmo_Emoji_Classification_Dictionary.csv"

# Load cleaned Venmo dataset from Q2
try:
    df = spark.read.parquet(venmo_path)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Load word dictionary
try:
    word_dict_pd = pd.read_excel(word_dict_path, sheet_name="Word_Dict")
    print("Word dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading word dictionary: {e}")
    spark.stop()
    raise e

# Convert word dictionary to Spark DataFrame
word_dict_melted = []
for category in word_dict_pd.columns[:-1]:  # Exclude @dropdown
    words = word_dict_pd[category].dropna().tolist()
    for word in words:
        word_dict_melted.append((word, category))
word_dict_df = spark.createDataFrame(word_dict_melted, ["word", "category"]).withColumn("word", spark_lower(spark_col("word")))

# Load emoji dictionary
try:
    emoji_dict_df = spark.read.csv(emoji_dict_path, header=True)
    print("Emoji dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading emoji dictionary: {e}")
    spark.stop()
    raise e

# Melt emoji dictionary
emoji_dict_melted = []
for col in emoji_dict_df.columns:
    emojis = emoji_dict_df.select(col).dropna().collect()
    for row in emojis:
        emoji_dict_melted.append((row[0], col))
emoji_dict_spark = spark.createDataFrame(emoji_dict_melted, ["emoji", "category"])

# Define UDF to extract words from description
def extract_words(text):
    if not text:
        return []
    # Split on whitespace, remove punctuation
    words = re.split(r'\s+', text.strip())
    return [word.lower() for word in words if word]
extract_words_udf = udf(extract_words, ArrayType(StringType()))

# Define UDF for emoji extraction
def extract_emojis(text):
    return [item['emoji'] for item in emoji.emoji_list(text)]
extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))

# Extract words and emojis from descriptions
df = df.withColumn("words", extract_words_udf(spark_col("description")))
df = df.withColumn("emojis", extract_emojis_udf(spark_col("description")))

# Explode words and join with word dictionary
df_words = df.select("user1", explode(spark_col("words")).alias("word")) \
             .join(word_dict_df, "word", "left") \
             .filter(spark_col("category").isNotNull()) \
             .select("user1", "category")

# Explode emojis and join with emoji dictionary
df_emojis = df.select("user1", explode(spark_col("emojis")).alias("emoji")) \
              .join(emoji_dict_spark, "emoji", "left") \
              .filter(spark_col("category").isNotNull()) \
              .select("user1", "category")

# Combine word and emoji categories
df_categories = df_words.union(df_emojis)

# Count category occurrences per user
category_counts = df_categories.groupBy("user1", "category").count()

# Calculate total category occurrences per user
total_counts = category_counts.groupBy("user1").agg(spark_sum("count").alias("total_count"))

# Join and calculate percentages
user_profiles = category_counts.join(total_counts, "user1") \
                              .withColumn("percentage", (spark_col("count") / spark_col("total_count") * 100).cast("float")) \
                              .select("user1", "category", "percentage") \
                              .orderBy("user1", "category")

# Save results
user_profiles.write.mode("overwrite").parquet("/content/user_spending_profiles.parquet")

# Print sample results
print("Sample of User Spending Profiles (first 10 rows):")
user_profiles.show(10, truncate=False)

# Stop SparkSession
spark.stop()

Venmo dataset loaded successfully.
Word dictionary loaded successfully.
Emoji dictionary loaded successfully.
Sample of User Spending Profiles (first 10 rows):
+-----+---------------+----------+
|user1|category       |percentage|
+-----+---------------+----------+
|3    |Food           |33.333332 |
|3    |People         |33.333332 |
|3    |Utility        |33.333332 |
|4    |Activity       |33.333332 |
|4    |Food           |33.333332 |
|4    |Illegal/Sarcasm|16.666666 |
|4    |Travel         |16.666666 |
|10   |Activity       |10.0      |
|10   |Food           |60.0      |
|10   |People         |20.0      |
+-----+---------------+----------+
only showing top 10 rows



### 1.3 In the previous question, you got a static spending profile. However, life and social networks are evolving over time. Therefore, let’s explore how a user’s spending profile is evolving over her lifetime in Venmo. First of all, you need to analyze a user’s transactions in monthly intervals, starting from 0 (indicating their first transaction only) up to 12.

In [None]:
# Ensure emoji library is installed
!pip install emoji

# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as spark_col, lower as spark_lower, trim as spark_trim, udf, explode, split, lit, sum as spark_sum, min as spark_min, floor, months_between, datediff, when
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import seaborn as sns
import emoji
import re
import numpy as np

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ4") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/venmo_cleaned.parquet"
word_dict_path = "/content/Venmo Word Classification Dictonary BAX-423.xlsx"
emoji_dict_path = "/content/Venmo_Emoji_Classification_Dictionary.csv"

# Load cleaned Venmo dataset
try:
    df = spark.read.parquet(venmo_path)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Load word dictionary
try:
    word_dict_pd = pd.read_excel(word_dict_path, sheet_name="Word_Dict")
    print("Word dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading word dictionary: {e}")
    spark.stop()
    raise e

# Convert word dictionary to Spark DataFrame
word_dict_melted = []
for category in word_dict_pd.columns[:-1]:
    words = word_dict_pd[category].dropna().tolist()
    for word in words:
        word_dict_melted.append((word, category))
word_dict_df = spark.createDataFrame(word_dict_melted, ["word", "category"]).withColumn("word", spark_lower(spark_col("word")))

# Load emoji dictionary
try:
    emoji_dict_df = spark.read.csv(emoji_dict_path, header=True)
    print("Emoji dictionary loaded successfully.")
except Exception as e:
    print(f"Error loading emoji dictionary: {e}")
    spark.stop()
    raise e

# Melt emoji dictionary
emoji_dict_melted = []
for col in emoji_dict_df.columns:
    emojis = emoji_dict_df.select(col).dropna().collect()
    for row in emojis:
        emoji_dict_melted.append((row[0], col))
emoji_dict_spark = spark.createDataFrame(emoji_dict_melted, ["emoji", "category"])

# Define UDF to extract words
def extract_words(text):
    if not text:
        return []
    words = re.split(r'\s+', text.strip())
    return [word.lower() for word in words if word]
extract_words_udf = udf(extract_words, ArrayType(StringType()))

# Define UDF for emoji extraction
def extract_emojis(text):
    return [item['emoji'] for item in emoji.emoji_list(text)]
extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))

# Extract words and emojis
df = df.withColumn("words", extract_words_udf(spark_col("description")))
df = df.withColumn("emojis", extract_emojis_udf(spark_col("description")))

# Find first transaction date per user
window_spec = Window.partitionBy("user1").orderBy("datetime")
df = df.withColumn("first_transaction_date", spark_min("datetime").over(window_spec))

# Calculate month offset from first transaction
df = df.withColumn("month_offset", floor(months_between(spark_col("datetime"), spark_col("first_transaction_date")))) \
       .filter(spark_col("month_offset").between(0, 12))

# Explode words and join with word dictionary
df_words = df.select("user1", "month_offset", explode(spark_col("words")).alias("word")) \
             .join(word_dict_df, "word", "left") \
             .filter(spark_col("category").isNotNull()) \
             .select("user1", "month_offset", "category")

# Explode emojis and join with emoji dictionary
df_emojis = df.select("user1", "month_offset", explode(spark_col("emojis")).alias("emoji")) \
              .join(emoji_dict_spark, "emoji", "left") \
              .filter(spark_col("category").isNotNull()) \
              .select("user1", "month_offset", "category")

# Combine word and emoji categories
df_categories = df_words.union(df_emojis)

# Create cumulative window up to each month
cumulative_window = Window.partitionBy("user1").orderBy("month_offset") \
                         .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Count category occurrences per user and month (cumulative)
category_counts = df_categories.groupBy("user1", "month_offset", "category") \
                              .count() \
                              .withColumn("cumulative_count", spark_sum("count").over(cumulative_window))

# Calculate total cumulative category occurrences per user and month
total_counts = category_counts.groupBy("user1", "month_offset") \
                             .agg(spark_sum("cumulative_count").alias("total_count"))

# Join and calculate cumulative percentages
user_profiles = category_counts.join(total_counts, ["user1", "month_offset"]) \
                              .withColumn("percentage", (spark_col("cumulative_count") / spark_col("total_count") * 100).cast("float")) \
                              .select("user1", "month_offset", "category", "percentage")

# Get all unique users and months
users = user_profiles.select("user1").distinct()
months = user_profiles.select("month_offset").distinct()
categories = user_profiles.select("category").distinct()

# Create a cross join of users, months, and categories
user_months = users.crossJoin(months)
all_combinations = user_months.crossJoin(categories)

# Join with user profiles, filling in 0% for missing categories
full_profiles = all_combinations.join(user_profiles, ["user1", "month_offset", "category"], "left_outer") \
                               .withColumn("percentage", when(spark_col("percentage").isNull(), lit(0.0)).otherwise(spark_col("percentage"))) \
                               .select("user1", "month_offset", "category", "percentage")

# Compute average and standard deviation across all users for each month and category
stats = full_profiles.groupBy("month_offset", "category") \
                    .agg(
                        spark_sum(lit(1)).alias("user_count"),
                        spark_sum("percentage").alias("sum_percentage"),
                        spark_sum(spark_col("percentage") * spark_col("percentage")).alias("sum_squares")
                    ) \
                    .withColumn("mean_percentage", spark_col("sum_percentage") / spark_col("user_count")) \
                    .withColumn("variance", (spark_col("sum_squares") / spark_col("user_count") - (spark_col("sum_percentage") / spark_col("user_count"))**2)) \
                    .withColumn("stddev", (spark_col("variance")**0.5).cast("float")) \
                    .select("month_offset", "category", "mean_percentage", "stddev")

# Collect results for plotting
stats_pd = stats.toPandas()

# Plot: Average percentage with confidence intervals (±2 * stddev)
plt.figure(figsize=(12, 8))
categories = stats_pd['category'].unique()
for category in categories:
    cat_data = stats_pd[stats_pd['category'] == category]
    months = cat_data['month_offset']
    means = cat_data['mean_percentage']
    stddevs = cat_data['stddev']
    # Only plot categories with significant means
    if means.max() > 1:  # Arbitrary threshold to reduce clutter
        plt.errorbar(months, means, yerr=2 * stddevs, label=category, capsize=5, marker='o')
plt.xlabel("Time (Months)")
plt.ylabel("Average Percentage")
plt.title("Average Spending Profile Over Time with Confidence Intervals (±2 SD)")
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.tight_layout()
plt.savefig("/content/average_spending_profile.png")
plt.close()

# Print sample statistics
print("Sample Statistics (first 10 rows):")
stats.show(10, truncate=False)

# Save profiles for future use
user_profiles.write.mode("overwrite").parquet("/content/dynamic_spending_profiles.parquet")

# Stop SparkSession
spark.stop()

Venmo dataset loaded successfully.
Word dictionary loaded successfully.
Emoji dictionary loaded successfully.
Sample Statistics (first 10 rows):
+------------+---------------+-------------------+---------+
|month_offset|category       |mean_percentage    |stddev   |
+------------+---------------+-------------------+---------+
|12          |Food           |2.010389650637637  |13.377702|
|6           |Activity       |1.1555487713536534 |9.841908 |
|0           |People         |12.760329067396402 |30.773056|
|5           |Transportation |0.6818464025859586 |7.6585336|
|9           |Illegal/Sarcasm|0.39361779175503914|5.7560086|
|3           |Travel         |0.28679731027423416|4.8826413|
|0           |Utility        |10.197559496424695 |28.69671 |
|8           |People         |1.1243461408284632 |9.82256  |
|12          |Activity       |0.8003213456688366 |8.181095 |
|9           |Transportation |0.5702619742552362 |6.984153 |
+------------+---------------+-------------------+---------+
o

### 1.4 Recent foundation models can capture rich semantic meanings without hand-curated dictionaries. In this task you will use a pre-trained LLaMA 3-8B model (e.g., the Hugging Face checkpoint meta-llama/Meta-Llama-3-8B-Instruct) in Google Colab to embed each Venmo message and discover topics automatically.

In [None]:
# Install required libraries
!pip install -q transformers accelerate
!pip install -q huggingface_hub
!pip install -q hdbscan umap-learn scikit-learn

# Set the Hugging Face token directly (not recommended for security, but used per user preference)
# Note: Token regeneration is recommended, but this is the current token for now
import os
os.environ["HUGGINGFACE_HUB_TOKEN"] = "Put_your_tokens"
print("Hugging Face token set directly in code.")

# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col as spark_col, udf, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType, FloatType
import pandas as pd
import numpy as np
import emoji
import re
import unicodedata
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.metrics import adjusted_rand_score
import hdbscan
import umap
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ4") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/venmo_cleaned.parquet"
user_profiles_path = "/content/user_spending_profiles.parquet"

# Load cleaned Venmo dataset
try:
    df = spark.read.parquet(venmo_path)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Load user profiles from Q2 to get hand-assigned categories
try:
    user_profiles = spark.read.parquet(user_profiles_path)
    print("User profiles loaded successfully.")
except Exception as e:
    print(f"Error loading user profiles: {e}")
    spark.stop()
    raise e

# Sample 50k messages to manage memory constraints on CPU
df_sample = df.select("story_id", "user1", "description").distinct().sample(fraction=0.01, seed=42).limit(50000)

# Remove emojis from messages and filter out empty messages
def remove_emojis(text):
    if not text:
        return ""
    # Remove emojis using the emoji library
    text = emoji.replace_emoji(text, replace="")
    # Remove remaining Unicode emojis and symbols using regex
    text = re.sub(r'[^\x00-\x7F]+', '', text)  # Remove non-ASCII characters
    text = text.strip()
    return text if text else None  # Return None if the result is empty

remove_emojis_udf = udf(remove_emojis, StringType())
df_sample = df_sample.withColumn("text_no_emojis", remove_emojis_udf(spark_col("description")))
# Filter out messages that are empty after emoji removal
df_sample = df_sample.filter(spark_col("text_no_emojis").isNotNull())

messages = df_sample.collect()
message_ids = [row["story_id"] for row in messages]
user_ids = [row["user1"] for row in messages]
message_texts = [row["description"] for row in messages]
message_texts_no_emojis = [row["text_no_emojis"] for row in messages]
print(f"After filtering empty messages, {len(messages)} messages remain for embedding.")

# Load DistilBERT model and tokenizer (alternative due to LLaMA 3-8B access issues)
model_id = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_id, torch_dtype=torch.float16)
model = AutoModel.from_pretrained(model_id, torch_dtype=torch.float16)  # Run on CPU
model.eval()

# Function to generate embeddings in batches (CPU version)
def get_embeddings(texts, batch_size=32):
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i:i + batch_size]
        # Skip empty batches
        batch_texts = [text if text else " " for text in batch_texts]
        inputs = tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=128)  # Run on CPU
        with torch.no_grad():
            outputs = model(**inputs)
            # Use mean of hidden states as embedding
            batch_embeddings = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
        embeddings.append(batch_embeddings)
    return np.vstack(embeddings)

# Generate embeddings
embeddings = get_embeddings(message_texts_no_emojis)
print(f"Generated embeddings with shape: {embeddings.shape}")

# Cluster embeddings using HDBSCAN with adjusted parameters
clusterer = hdbscan.HDBSCAN(min_cluster_size=200, metric='euclidean')  # Increased min_cluster_size to reduce small clusters
cluster_labels = clusterer.fit_predict(embeddings)
n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)  # Exclude noise (-1)
print(f"Found {n_clusters} clusters (excluding noise).")

# Label clusters: Get top 10 representative messages per cluster
df_clusters = pd.DataFrame({
    "story_id": message_ids,
    "user1": user_ids,
    "message": message_texts,
    "text_no_emojis": message_texts_no_emojis,
    "cluster": cluster_labels
})

# Function to extract keywords (simple word frequency for now)
def get_top_keywords(texts, n=10):
    words = " ".join(texts).lower().split()
    words = [word for word in words if len(word) > 2]  # Ignore short words
    word_counts = pd.Series(words).value_counts()
    return word_counts.head(n).index.tolist()

# Analyze each cluster
cluster_summary = {}
for cluster_id in set(cluster_labels):
    if cluster_id == -1:  # Skip noise
        continue
    cluster_data = df_clusters[df_clusters["cluster"] == cluster_id]
    top_messages = cluster_data["message"].head(10).tolist()
    top_keywords = get_top_keywords(cluster_data["text_no_emojis"])
    # Assign a short name based on keywords
    cluster_name = "Cluster_" + str(cluster_id)  # Placeholder
    if "food" in top_keywords or "dinner" in top_keywords or "lunch" in top_keywords:
        cluster_name = "Food/Drink"
    elif "rent" in top_keywords or "bill" in top_keywords or "electricity" in top_keywords:
        cluster_name = "Bills & Utilities"
    elif "friend" in top_keywords or "thanks" in top_keywords or "bff" in top_keywords:
        cluster_name = "Social Payments"
    elif "trip" in top_keywords or "travel" in top_keywords or "flight" in top_keywords:
        cluster_name = "Travel"
    elif "cash" in top_keywords or "pay" in top_keywords or "money" in top_keywords:
        cluster_name = "Cash Transactions"
    cluster_summary[cluster_id] = {
        "name": cluster_name,
        "top_messages": top_messages,
        "top_keywords": top_keywords
    }

# Print cluster summaries
for cluster_id, summary in cluster_summary.items():
    print(f"\nCluster {cluster_id}: {summary['name']}")
    print("Top Keywords:", ", ".join(summary['top_keywords']))
    print("Top Messages:", ", ".join(summary['top_messages'][:5]))  # Show first 5 for brevity

# Map messages to hand-assigned categories
# Use a window function to find the category with the maximum percentage for each user
window_spec = Window.partitionBy("user1").orderBy(spark_col("percentage").desc())
user_profiles_ranked = user_profiles.withColumn("rank", row_number().over(window_spec))
user_dominant_category = user_profiles_ranked.filter(spark_col("rank") == 1).select("user1", spark_col("category").alias("dominant_category"))

# Join with df_clusters to assign dominant categories
df_sample_with_user = df_clusters.merge(user_dominant_category.toPandas(), on="user1", how="left")
df_sample_with_user = df_sample_with_user.dropna(subset=["dominant_category"])  # Drop rows with missing categories

# Compute Adjusted Rand Index
true_labels = df_sample_with_user["dominant_category"]
pred_labels = df_sample_with_user["cluster"]
ari_score = adjusted_rand_score(true_labels, pred_labels)
print(f"Adjusted Rand Index between clustered topics and hand-assigned categories: {ari_score:.4f}")

# Reduce embeddings to 2D with UMAP and plot
reducer = umap.UMAP(n_components=2, random_state=42)
embeddings_2d = reducer.fit_transform(embeddings)
df_clusters["x"] = embeddings_2d[:, 0]
df_clusters["y"] = embeddings_2d[:, 1]
df_clusters["cluster_label"] = df_clusters["cluster"].apply(
    lambda x: cluster_summary[x]["name"] if x != -1 else "Noise"
)

plt.figure(figsize=(10, 8))
sns.scatterplot(data=df_clusters, x="x", y="y", hue="cluster_label", palette="deep", alpha=0.6)
plt.title("Venmo Message Clusters (UMAP Projection)")
plt.xlabel("UMAP Dimension 1")
plt.ylabel("UMAP Dimension 2")
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.savefig("/content/message_clusters_umap.png")
plt.close()

# Stop SparkSession
spark.stop()

Hugging Face token set directly in code.
Venmo dataset loaded successfully.
User profiles loaded successfully.
After filtering empty messages, 36876 messages remain for embedding.
Generated embeddings with shape: (36876, 768)




Found 3 clusters (excluding noise).

Cluster 0: Cluster_0
Top Keywords: uber, uberz, ubers, ubered, uberrr, uber.
Top Messages: uber, uber, uber, uber 🚖🚖, uber

Cluster 1: Food/Drink
Top Keywords: food, groceries, foods, snacks, sandwich, for, eyelashes, beverages, food., pancakes
Top Messages: food, food, food, food, food

Cluster 2: Bills & Utilities
Top Keywords: rent, and, rent., rents
Top Messages: rent, rent, rent, rent, 🏠💸 rent
Adjusted Rand Index between clustered topics and hand-assigned categories: -0.0047


  warn(


## 2. Social Network Analytics

### Q5: Write a script to identify each user’s friends and friends of friends. (Definition: A friend is anyone who has transacted with the user—either by sending or receiving money.)
*   Describe your algorithm clearly.
*   Calculate its computational complexity.
*   Can you improve its efficiency?

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, collect_set, explode, array_except, array,)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ5") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/VenmoSample.snappy.parquet"

# Load cleaned Venmo dataset
try:
    df = spark.read.parquet(venmo_path)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Make the relation between users undirected, flip each edge by adding its reverse
allusers = (
    df.select(col("user1").alias("u"), col("user2").alias("v"))
      .union(df.select(col("user2").alias("u"), col("user1").alias("v")))
      .distinct())

# Compute direct friends, group by u first and find all the v it has
friends = allusers.groupBy("u") \
               .agg(collect_set("v").alias("friends"))

# Turns each user’s friend list into one row per user per friend
friend_list = friends.select(
    col("u").alias("user"),
    explode("friends").alias("friend"))

# For each of user's friends, looks up their friends, aggregates back into the listed created above
friend_of_friends = (
    friend_list
    .join(friends.withColumnRenamed("u", "friend"), on="friend")
    .select(
        col("user"),
        explode("friends").alias("fof_candidate"))
    .groupBy("user")
    .agg(collect_set("fof_candidate").alias("fof_all")))

# Clean the friend_of_friends by removing any repeating direct friends and user themself
result = (
    friends
    .select(col("u").alias("user"), "friends")
    .join(friend_of_friends, on="user", how="left")
    .withColumn(
        "friends_of_friends",
        # first remove direct friends, then remove the user
        array_except(
          array_except(col("fof_all"), col("friends")),
          array(col("user"))))
    .select("user", "friends", "friends_of_friends"))

# Show the results
result.show(truncate=False)

In [None]:
# Calculate its computational complexity
from pyspark.sql.functions import countDistinct, size, avg

# Calculate the number of users
num_users = result.select("user").distinct().count()

# Average number of direct friends per user
avg_friends = result.select(size("friends").alias("num_friends")) \
                    .agg(avg("num_friends")).first()[0]

# Total number of friend-of-friend pairs
fof_pairs = result.select("user", explode("friends_of_friends").alias("fof"))
num_fof_pairs = fof_pairs.count()

print(f"Number of users (n): {num_users}")
print(f"Average number of friends per user (d): {avg_friends:.2f}")
print(f"Estimated n × d²: {int(num_users * (avg_friends ** 2))}")
print(f"Actual number of user and their friend of friend pairs: {num_fof_pairs}")

In [None]:
# Stop SparkSession
spark.stop()

### Q6: Now that you have each user’s list of friends and friends of friends, you're ready to compute various social network metrics. Using the same dynamic framework from earlier, calculate the following metrics over the user’s lifetime on Venmo, from month 0 through month 12.
*   i) Number of friends and number of friends of friends.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, least, floor, months_between,min as spark_min,
                                   explode, collect_set, size, lit, coalesce, array,
                                   posexplode, when, coalesce, expr, desc)
from pyspark.sql.window import Window

# Initialize the Spark Session
spark = SparkSession.builder \
    .appName("VenmoAnalysisQ6") \
    .config("spark.driver.memory","8g") \
    .getOrCreate()

# File paths
venmo_path = "/content/VenmoSample.snappy.parquet"

# Load cleaned Venmo dataset, sample with 1% of the data
try:
    df = spark.read.parquet(venmo_path).sample(fraction=0.01, seed=42)
    print("Venmo dataset loaded successfully.")
except Exception as e:
    print(f"Error loading Venmo dataset: {e}")
    spark.stop()
    raise e

# Join all the users together
users1 = df.select(col("user1").alias("user"), "datetime")
users2 = df.select(col("user2").alias("user"), "datetime")
all_users = users1.union(users2)

# Compute each user’s first‐ever transaction datetime
first_month = (all_users
              .withColumn("first_month", spark_min("datetime").over(Window.partitionBy("user")))
              .select("user","first_month")
              .distinct())

# Build the undirected between users and friends with datetime, with months limit to 12
edge1 = (df.join(first_month, df.user1 == first_month.user)
        # Keep 12 months since the first transcation
        .withColumn("month", least(floor(months_between(col("datetime"), col("first_month"))),lit(12)))
        .select(col("user1").alias("user"), col("user2").alias("friend"), "month"))

edge1 = (df.join(first_month, df.user2 == first_month.user)
        .withColumn("month", least(floor(months_between(col("datetime"), col("first_month"))),lit(12)))
        .select(col("user2").alias("user"), col("user1").alias("friend"), "month"))

edges = edge1.union(edge1).distinct()

# Creat a loop to compute the cumulative friends up to 12 months
results = []
for m in range(0,13):
    # Filter for cumulative graph up to month m
    sub = edges.filter(col("month") <= lit(m))

    # Calculate direct friends by month m
    direct_friend = (sub
                 .groupBy("user")
                 .agg(collect_set("friend").alias("friends")))

    # Calculate friends‐of‐friends by month m
    fof = (direct_friend
            .select(col("user"), explode("friends").alias("f"))
            .join(direct_friend.select(col("user").alias("f"), col("friends").alias("f_friends")),on="f", how="left")
            .select(col("user"), explode("f_friends").alias("fof"))
            .filter(col("fof") != col("user"))
            .groupBy("user")
            .agg(collect_set("fof").alias("fof_set")))

    # Join all the results
    df_month = (direct_friend.join(fof, on="user", how="left")
              .select(col("user"),
                      lit(m).alias("month"),
                      size("friends").alias("num_friends"),
                      size(coalesce(col("fof_set"), array().cast("array<long>"))).alias("num_fof")))

    results.append(df_month)

# Union all months
dynamic = results[0]
for part in results[1:]:
    dynamic = dynamic.union(part)

In [None]:
# Show the result
dynamic.show(20, truncate=False)

###    ii) Clustering coefficient of each user’s network

In [None]:
# Change the loop in previous questions to calculate the clustering coefficient
results = []

for m in range(0, 13):
    # Filter for cumulative graph up to month m
    sub = edges.filter(col("month") <= lit(m))

    # Direct friends
    direct_friend = sub.groupBy("user") \
                .agg(collect_set("friend").alias("friends"))

    # Friends‐of‐friends
    fof = (direct_friend
          .select(col("user"), explode("friends").alias("f"))
          .join(direct_friend.select(col("user").alias("f"), col("friends").alias("f_friends")),on="f", how="left")
          .select(col("user"), explode("f_friends").alias("fof"))
          .filter(col("fof") != col("user"))
          .groupBy("user")
          .agg(collect_set("fof").alias("fof_set")))

    # Build  unique friend‐pairs for each user
    friend_i = direct_friend.select("user", posexplode("friends").alias("i","friend1"))
    friend_j = direct_friend.select("user", posexplode("friends").alias("j","friend2"))

    pairs = (
      friend_i
      .join(friend_j, on="user")
      .filter(col("i") < col("j")) # Keep only rows with i < j, to have unique friend pairs
      .select("user","friend1","friend2"))

    # Count actual links among friends
    links = (pairs
      .join(
        sub.select(col("user").alias("user1"), col("friend").alias("user2")).distinct(),
        (col("friend1")==col("user1")) & (col("friend2")==col("user2")),how="inner")
      .groupBy("user")
      .count()
      .withColumnRenamed("count","Links"))

    # Bring it all together into dfm
    df_month = (direct_friend
      .join(fof, on="user", how="left")
      .join(links, on="user", how="left")
      .select(
         col("user"),
         lit(m).alias("month"),
         size("friends").alias("num_friends"),
         size("fof_set").alias("num_fof"),
         # replace NULL L with 0
         coalesce(col("Links"), lit(0)).alias("Links")
      )
      .withColumn(
        "clustering_coeff",
        # Clustering Coefficient: Links / (k*(k-1)/2), but guard divide‐by‐zero
        when(col("num_friends") >= 2,
             col("Links") / (col("num_friends")*(col("num_friends")-1)/2)
        ).otherwise(lit(0.0))
      ))

    results.append(df_month)

# Union all months into one DataFrame
dynamic_net = results[0]
for r in results[1:]:
    dynamic_net = dynamic_net.union(r)

In [None]:
# Show the result
dynamic_net.orderBy(desc("month")).show(20, truncate=False)

### iii) PageRank of each user

In [None]:
# As GraphFrames is not supported with Spark 3.5, we use Networkx for this problem
import networkx as nx
from pyspark.sql import Row
from pyspark.sql.functions import col, coalesce, lit, desc

# Collect as edge list
edges = df.select("user1", "user2").rdd.map(lambda row: (row.user1, row.user2)).collect()

# Build undirected graph using networkx
graph = nx.Graph()
graph.add_edges_from(edges)

# Compute PageRank
pr_dict = nx.pagerank(graph, alpha=0.85, max_iter=100)

# Convert to Spark DataFrame
pr_rows = [Row(user=int(k), pagerank=float(v)) for k, v in pr_dict.items()]
pr_df = spark.createDataFrame(pr_rows)

# Join PageRank into each user's per-month profile
dynamic_pagerank = dynamic_net.join(pr_df, on="user", how="left")

# Replace null or unsupported values in num_fof with 0
dynamic_pagerank = dynamic_pagerank.withColumn("num_fof",
    coalesce(col("num_fof"), lit(0)))

# Show result
dynamic_pagerank.orderBy(desc("month")).show(20, truncate=False)

## Predictive Analytics with MLlib

### Q7: Create the dependent variable Y, defined as the total number of transactions a user makes during their first 12 months on Venmo.