In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
from pyspark.sql.types import *

import numpy as np
import random
import time

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv('data/transactions_small.csv').toDF('record_id','date','user_id','amount')
df = df.select(df.record_id.cast('string'),
               df.date.cast('string'),
               df.user_id.cast('string'),
               df.amount.cast('float'))

In [4]:
unique_userIds = df.select("user_id").distinct().collect()
rdd = spark.sparkContext.parallelize(unique_userIds)
unique_userId_DF = rdd.toDF()
sample = unique_userId_DF.sample(0.02,2021).collect()

In [5]:
for user_id in sample[:]:
   _df = df.where(f"user_id = {user_id[0]}")
   _df.coalesce(1).write.mode("append").option("header", "true").csv("data/unique")

In [6]:
schema = df.schema
streaming = (
    spark.readStream.schema(schema)
    .option("maxFilesPerTrigger", 100000)
    .csv("data/unique")
)

In [7]:
# uid_count = streaming.groupBy("user_id").agg({"amount": "avg"})

uid_count = streaming.groupBy("user_id").agg({"amount": "stddev"})

In [12]:
activityQuery = (
    uid_count.writeStream.queryName("uid_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)
start_time = time.time()
for x in range(20):
    _df = spark.sql(
        "SELECT * FROM uid_counts WHERE user_id != 'user_id' "
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)
end_time = time.time()
print("Total execution time: {} seconds".format(end_time - start_time))

+-------+------------------+
|user_id|    stddev(amount)|
+-------+------------------+
|  10096|22.570164773907422|
|  10089|13.529707017310592|
|  10051|19.341167849854642|
|  10219|15.060886932800063|
|  10201|24.421381846670556|
|  10135|22.867309683600794|
|  10109| 20.51821516068312|
|  10419| 22.22032029624457|
|  10161| 16.80578641081692|
|  10231|21.542464716322378|
+-------+------------------+
only showing top 10 rows

+-------+------------------+
|user_id|    stddev(amount)|
+-------+------------------+
|  10096|22.570164773907422|
|  10089|13.529707017310592|
|  10051|19.341167849854642|
|  10219|15.060886932800063|
|  10201|24.421381846670556|
|  10135|22.867309683600794|
|  10109| 20.51821516068312|
|  10419| 22.22032029624457|
|  10161| 16.80578641081692|
|  10231|21.542464716322378|
+-------+------------------+
only showing top 10 rows

+-------+------------------+
|user_id|    stddev(amount)|
+-------+------------------+
|  10096|22.570164773907422|
|  10089|13.52970701

In [13]:
activityQuery.stop()