# PySpark Transaction Analytics – Mini Project

This notebook demonstrates the use of PySpark for scalable data processing and analytics
on a large synthetic transactional dataset.

The objective is to compute distributed behavioral metrics and identify high-activity
patterns that resemble use cases commonly found in fintech and AML environments, such as
transaction monitoring, user behavior analysis and risk segmentation.

The focus of this project is on distributed data transformations and aggregations rather
than advanced modeling.


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SyntheticTransactionData") \
    .getOrCreate()


## Data Generation

Synthetic transactional data is generated with PySpark: 500,000 transactions spread
across 10,000 users, each with an amount, currency, country, merchant category and
channel. The structure resembles typical transaction logs used in financial services.



In [None]:
from pyspark.sql.functions import (
    col, rand, expr, monotonically_increasing_id
)

NUM_TRANSACTIONS = 500_000
NUM_USERS = 10_000

df = spark.range(NUM_TRANSACTIONS)

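df = (
    df.withColumn("transaction_id", monotonically_increasing_id())
      .withColumn("user_id", (rand() * NUM_USERS).cast("int"))
      .withColumn("amount", (rand() * 500).cast("double"))
      .withColumn("currency", expr("CASE WHEN rand() < 0.8 THEN 'EUR' ELSE 'USD' END"))
      # One uniform draw per multi-branch column: every rand() call is an independent
      # draw, so repeating rand() in each WHEN branch would skew the cumulative thresholds.
      .withColumn("u_country", rand())
      .withColumn("u_category", rand())
      .withColumn(
          "country",
          expr("""
              CASE
                  WHEN u_country < 0.6 THEN 'ES'
                  WHEN u_country < 0.8 THEN 'FR'
                  ELSE 'DE'
              END
          """)
      )
      .withColumn(
          "merchant_category",
          expr("""
              CASE
                  WHEN u_category < 0.3 THEN 'groceries'
                  WHEN u_category < 0.5 THEN 'electronics'
                  WHEN u_category < 0.7 THEN 'travel'
                  ELSE 'other'
              END
          """)
      )
      .withColumn(
          "channel",
          expr("""
              CASE
                  WHEN rand() < 0.5 THEN 'online'
                  ELSE 'pos'
              END
          """)
      )
      # Helper columns are only needed while sampling.
      .drop("u_country", "u_category")
)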
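
One detail worth checking when categorical columns are built from `CASE WHEN rand() < ...` chains: each `rand()` call is an independent draw, so cumulative-looking thresholds only give the intended shares when every branch compares the same uniform value. The plain-Python sketch below (no Spark required; both helper names are hypothetical) computes the resulting shares for the 0.6 / 0.8 thresholds used for `country` under both readings.

```python
def shares_independent_draws(thresholds):
    """Shares when every WHEN branch compares a fresh uniform draw."""
    shares = []
    remaining = 1.0  # probability of reaching the current branch
    for t in thresholds:
        shares.append(remaining * t)
        remaining *= 1.0 - t
    shares.append(remaining)  # ELSE branch
    return shares


def shares_single_draw(thresholds):
    """Shares when all branches compare the same uniform draw."""
    shares = []
    previous = 0.0
    for t in thresholds:
        shares.append(max(t - previous, 0.0))
        previous = max(previous, t)
    shares.append(1.0 - previous)  # ELSE branch
    return shares


country_thresholds = [0.6, 0.8]  # ES, FR, otherwise DE
independent = shares_independent_draws(country_thresholds)
single = shares_single_draw(country_thresholds)

print([round(s, 2) for s in independent])  # [0.6, 0.32, 0.08]
print([round(s, 2) for s in single])       # [0.6, 0.2, 0.2]
```

With independent draws the country shares come out at roughly 60% / 32% / 8%, whereas a single shared draw gives 60% / 20% / 20%.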


## Data Validation

The schema and a sample of records are inspected to confirm column names and
data types before proceeding with the analysis. The raw dataset is then written
to Parquet.


In [None]:
df.printSchema()
df.show(5)


In [None]:
df.write.mode("overwrite").parquet("/content/transactions_parquet")


## User-Level Aggregated Metrics

User-level aggregations are a common first step in transaction monitoring.
Here we compute basic behavioral metrics per user.


In [None]:
# Note: importing sum shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import count, sum, avg

user_metrics = (
    df.groupBy("user_id")
      .agg(
          count("*").alias("num_transactions"),
          sum("amount").alias("total_amount"),
          avg("amount").alias("avg_amount")
      )
)

user_metrics.show(5)


## Global Distributions and Thresholds

Understanding the global distribution of transaction amounts helps define
thresholds for identifying extreme or unusual behavior.


In [None]:
df.select("amount").describe().show()

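# The third argument is the relative rank error: at 0.01 the p99 estimate may fall
# anywhere between the 98th percentile and the maximum.
p95, p99 = df.approxQuantile("amount", [0.95, 0.99], 0.01)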
p95, p99
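
The last argument of `approxQuantile` is a relative error on rank, not on value. According to the Spark documentation, for N rows, probability p and error err, the returned element has an exact rank between floor((p - err) * N) and ceil((p + err) * N). The plain-Python sketch below (the helper name `rank_window` is hypothetical) shows what that window means for 500,000 rows at two error settings.

```python
import math


def rank_window(num_rows, probability, relative_error):
    """Rank bounds implied by approxQuantile's documented error contract."""
    # round() removes floating-point noise before floor/ceil are applied.
    low = math.floor(round((probability - relative_error) * num_rows, 6))
    high = math.ceil(round((probability + relative_error) * num_rows, 6))
    # Ranks cannot fall outside the dataset.
    return max(low, 0), min(high, num_rows)


NUM_TRANSACTIONS = 500_000

window_loose = rank_window(NUM_TRANSACTIONS, 0.99, 0.01)
window_tight = rank_window(NUM_TRANSACTIONS, 0.99, 0.001)

print(window_loose)  # (490000, 500000)
print(window_tight)  # (494500, 495500)
```

A smaller error narrows the window around the requested percentile, at the price of a more expensive computation.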


## High-Activity Behavioral Patterns

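Rather than performing fraud detection, we identify high-activity patterns
that may warrant further investigation in a real monitoring system.

Because the synthetic data is uniform (about 50 transactions per user on average,
with amounts spread evenly between 0 and 500), the thresholds used here are
illustrative and are expected to match few or no users. Real data with heavy-tailed
behavior would populate these segments.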


In [None]:
high_volume_users = user_metrics.filter(col("num_transactions") > 100)
high_volume_users.count()


In [None]:
high_avg_amount_users = user_metrics.filter(col("avg_amount") > p99)
high_avg_amount_users.show(5)
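
A back-of-the-envelope check helps judge thresholds like these on uniform synthetic data. Assuming each of the 500,000 transactions picks one of 10,000 users uniformly at random and amounts are uniform on [0, 500), the per-user transaction count is binomial and the per-user average amount is approximately normal. The plain-Python sketch below (variable names are illustrative) expresses a count threshold of 100 and the theoretical 99th percentile of the amount in standard deviations from the respective means.

```python
import math

NUM_TRANSACTIONS = 500_000
NUM_USERS = 10_000
MAX_AMOUNT = 500.0

# Transactions per user ~ Binomial(n=NUM_TRANSACTIONS, p=1/NUM_USERS).
p = 1.0 / NUM_USERS
count_mean = NUM_TRANSACTIONS * p
count_std = math.sqrt(NUM_TRANSACTIONS * p * (1.0 - p))
count_z = (100 - count_mean) / count_std

# Amounts ~ Uniform(0, MAX_AMOUNT): mean MAX/2, standard deviation MAX/sqrt(12).
amount_mean = MAX_AMOUNT / 2.0
amount_std = MAX_AMOUNT / math.sqrt(12.0)
# Standard deviation of a user's average over a typical number of transactions.
avg_std = amount_std / math.sqrt(count_mean)
theoretical_p99 = 0.99 * MAX_AMOUNT
avg_z = (theoretical_p99 - amount_mean) / avg_std

print(f"transactions per user: mean={count_mean:.1f}, std={count_std:.2f}, z(100)={count_z:.1f}")
print(f"average amount per user: mean={amount_mean:.1f}, std={avg_std:.2f}, z(p99)={avg_z:.1f}")
```

Under these assumptions both thresholds sit far out in the tails (about 7 and 12 standard deviations above the mean), which is why such filters are expected to return few or no rows on this dataset.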


## Segmentation by Country and Channel

Segmenting transactions by geography and channel allows comparison of
behavioral patterns across different contexts.


In [None]:
country_channel_stats = (
    df.groupBy("country", "channel")
      .agg(
          count("*").alias("num_transactions"),
          avg("amount").alias("avg_amount")
      )
)

country_channel_stats.show()


## Temporal Activity Analysis

Temporal aggregations are commonly used for monitoring transaction volume
and detecting unusual spikes in activity. The generated data has no event time,
so each transaction is first assigned a random day in 2024.


In [None]:
from pyspark.sql.functions import to_date

df = df.withColumn(
    "timestamp",
    expr("timestampadd(DAY, cast(rand()*365 as int), timestamp('2024-01-01'))")
)

daily_volume = (
    df.withColumn("date", to_date("timestamp"))
      .groupBy("date")
      .agg(
          count("*").alias("num_transactions"),
          sum("amount").alias("total_amount")
      )
      .orderBy("date")
)

daily_volume.show(5)
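
To make the range of the synthetic timestamps explicit: `cast(rand()*365 as int)` yields integer offsets from 0 to 364, so the generated dates run from 2024-01-01 through 2024-12-30. Since 2024 is a leap year, 2024-12-31 is never produced. The plain-Python sketch below reproduces that arithmetic and the expected average daily volume, assuming the offsets are uniformly distributed.

```python
from datetime import date, timedelta

NUM_TRANSACTIONS = 500_000
START = date(2024, 1, 1)
NUM_OFFSETS = 365  # int(rand() * 365) takes the values 0..364

first_day = START + timedelta(days=0)
last_day = START + timedelta(days=NUM_OFFSETS - 1)
expected_per_day = NUM_TRANSACTIONS / NUM_OFFSETS

print(first_day, last_day)      # 2024-01-01 2024-12-30
print(round(expected_per_day))  # 1370
```

Daily counts that deviate strongly from this average would indicate a problem in the generation step rather than a genuine activity spike.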


## Persist Aggregated Outputs

Aggregated datasets are persisted in Parquet format to simulate downstream
consumption by analytics or monitoring systems.



In [None]:
user_metrics.write.mode("overwrite").parquet("/content/output/user_metrics")
country_channel_stats.write.mode("overwrite").parquet("/content/output/country_channel_stats")
daily_volume.write.mode("overwrite").parquet("/content/output/daily_volume")


## Conclusion

This mini project demonstrates how PySpark can be used to generate and analyze
large-scale transactional data using distributed processing.

The workflow focuses on aggregation, segmentation and behavioral metrics that
are commonly used in fintech and AML environments as a foundation for monitoring
and risk assessment systems.
