# Homework 07 (streams)

## Radosław Jurczak

-------------------------------------------------

A docker network `de_network` is used, created by
```{bash}
docker network create de_network
```

Minio was run with the following command:
```{bash}
docker run -p 9000:9000 -p 9090:9090 --name minio --network=de_network -v ~/minio/data:/data -e "MINIO_ROOT_USER=admin" -e "MINIO_ROOT_PASSWORD=adminadmin" quay.io/minio/minio server /data --console-address ":9090"
```

Pulsar was instantiated with the following command:
```{bash}
docker run -it --rm \
  --name standalone-pulsar \
   --network=de_network \
  -p 6650:6650  \
  -p 8080:8080 \
  --mount source=pulsardata,target=$(pwd)/pulsar/data \
  --mount source=pulsarconf,target=$(pwd)/pulsar/conf \
  apachepulsar/pulsar:2.10.0 sh \
  -c "bin/apply-config-from-env.py \
  conf/standalone.conf && \
  bin/pulsar standalone"
```

To succesfully run the code below, you'll need to create a minio bucket called `hw7`.

The notebook was run inside docker, set up by
```{bash}
docker run \
-it -d --rm \
--network=de_network \
-p 10000:8888 -p 4041:4040 \
-v "${PWD}":/home/rj/data_engineering \
quay.io/jupyter/all-spark-notebook:spark-3.4.0
```

In [1]:
!pip install delta-spark==2.4.0 pulsar-client[avro]
!pip install randomtimestamp



In [2]:
import random 

import pulsar
import randomtimestamp
from fastavro.schema import parse_schema
from pulsar import schema as pulsar_schema
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from tqdm import tqdm

In [3]:
spark_conf = (
    SparkConf()
    .setAppName("DemoPipeline")
    .set("spark.jars.packages", 'org.apache.hadoop:hadoop-client:3.3.4'
         ',org.apache.hadoop:hadoop-aws:3.3.4'
         ',io.delta:delta-core_2.12:2.4.0'
         ',io.streamnative.connectors:pulsar-spark-connector_2.12:3.4.0.3'
        )
    .set("spark.driver.memory", "6g")
    .set("spark.hadoop.fs.s3a.endpoint", "minio:9000")
    .set("spark.hadoop.fs.s3a.access.key", "admin")
    .set("spark.hadoop.fs.s3a.secret.key", "adminadmin" )
    .set("spark.hadoop.fs.s3a.path.style.access", "true") 
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
    .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession(sc)

In [4]:
print(f"Hadoop version = {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")
print(f"Spark version = {spark.version}")

Hadoop version = 3.3.4
Spark version = 3.4.0


In [5]:
N_EVENTS = 50_000
N_BANKS = 1_000
N_LOCATIONS = 1_000
N_COUNTRIES = 195

#### Generate static datasets:
 - time-indexed dataset of 50 000 transactions: `timestamp`, `sender`, `receiver`, `amount`; a small random subset (~2%) of transactions get null sender;
 - bank dataset for enrichment, containing detailed info about sender/receiver banks: `bank_id`, `location_id`, `full_name`, `license_number`, `country`

In [6]:
transactions = [
    [randomtimestamp.randomtimestamp(start_year=2022, end_year=2023),
     f"bank_{random.randint(0, N_BANKS-1)}",
     f"bank_{random.randint(0, N_BANKS-1)}",
     round(random.uniform(0.5, 5_000_000), 2),
    ]
    for i in tqdm(range(N_EVENTS))
]
null_sender_events = random.sample(list(range(0, N_EVENTS-1)), int(0.02*N_EVENTS))
for i in null_sender_events:
    transactions[i][1] = None
transaction_df = spark.createDataFrame(transactions, ["timestamp", "sender", "receiver", "amount"])
transaction_df.printSchema()

100%|██████████| 50000/50000 [00:00<00:00, 143089.56it/s]


root
 |-- timestamp: timestamp (nullable = true)
 |-- sender: string (nullable = true)
 |-- receiver: string (nullable = true)
 |-- amount: double (nullable = true)



In [7]:
banks = [
    (f"bank_{i}",
     f"location_{random.randint(0, N_LOCATIONS-1)}",
     f"full_name_{i}",
     int("".join(random.sample("12344567890", 7))),
     f"country_{random.randint(0, N_COUNTRIES)}",
    )
    for i in tqdm(range(N_BANKS))
]
bank_df = spark.createDataFrame(banks, ["bank_id", "location_id", "full_name", "license_id", "country"])
bank_df.printSchema()

100%|██████████| 1000/1000 [00:00<00:00, 155057.45it/s]

root
 |-- bank_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- license_id: long (nullable = true)
 |-- country: string (nullable = true)






In [8]:
transaction_df.write.format("parquet").mode("overwrite").save("s3a://hw7/transaction")
bank_df.write.format("parquet").mode("overwrite").save("s3a://hw7/bank")

#### Push the time-indexed transactions dataset to Pulsar
(if you're running the code live, please wait a while after `writeStream()` until Pulsar processes everything)

In [9]:
ref = spark.read.load("s3a://hw7/transaction")
transaction_streaming_df = spark.readStream.format('parquet').schema(ref.schema).load("s3a://hw7/transaction") 

In [10]:
avro_schema = parse_schema(
{
  "type" : "record",
  "name" : "transactions",
  "fields" : [ {
    "name" : "timestamp",
    "type" : [ {"type": "long", "logicalType": "timestamp-millis"}, "null" ]
  }, {
    "name" : "sender",
    "type" : [ "string", "null" ]
  }, {
    "name" : "receiver",
    "type" : [ "string", "null" ]
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
} 
)

client = pulsar.Client("pulsar://standalone-pulsar:6650")
producer = client.create_producer(
    topic="persistent://public/default/transactions",
    schema=pulsar_schema.AvroSchema(None, schema_definition=avro_schema)
)

In [11]:
transaction_writer = transaction_streaming_df.writeStream \
  .format("pulsar") \
  .option("service.url", "pulsar://standalone-pulsar:6650") \
  .option("admin.url", "pulsar://standalone-pulsar:8080") \
  .option("topic", "persistent://public/default/transactions") \
  .option("pulsar.client.tlsAllowInsecureConnection","true") \
  .option("pulsar.client.tlsHostnameVerificationenable","false") \
  .option("checkpointLocation", './checkpoints-pulsar1') \
    .start()

In [12]:
transaction_writer.status

{'message': 'Getting offsets from FileStreamSource[s3a://hw7/transaction]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [13]:
transaction_writer.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

#### Read data from Pulsar, dropping transactions with empty sender

In [14]:
pulsar_transaction_df = spark \
  .readStream \
  .format("pulsar") \
  .option("service.url", "pulsar://standalone-pulsar:6650") \
  .option("admin.url", "pulsar://standalone-pulsar:8080") \
  .option("topic", "persistent://public/default/transactions") \
  .option("pulsar.client.tlsAllowInsecureConnection","true") \
  .option("pulsar.client.tlsHostnameVerificationenable","false") \
  .option("checkpointLocation", './checkpoints-pulsar2') \
  .load() \
.filter(f.col("sender").isNotNull())

In [15]:
pulsar_transaction_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- sender: string (nullable = true)
 |-- receiver: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



#### Enrich the stream with information about the sender bank from `banks` dataset
(of course this is just an exercise, in real life we could want to add info about the receiver, aliasing some columns etc.)

In [16]:
pulsar_transaction_df = pulsar_transaction_df.join(
    bank_df, [pulsar_transaction_df.sender == bank_df.bank_id], "left"
)

In [17]:
pulsar_transaction_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- sender: string (nullable = true)
 |-- receiver: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- bank_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- license_id: long (nullable = true)
 |-- country: string (nullable = true)



#### Apply a window aggregation over the stream: average transaction value per sender bank; 
#### Store result to delta lake, read again from delta and display it to demonstrate that the aggregation worked

(again, if running the code live, please wait a little after `writeStream`)

In [18]:
avg_transaction_by_sender = pulsar_transaction_df.groupby(
    f.window(f.col("__publishTime"), "10 seconds", "5 seconds"),
    f.col("sender")
).agg(
    f.first("full_name").alias("sender_bank_full_name"),
    f.first("license_id").alias("sender_bank_license_number"),
    f.round(f.avg("amount"), 2).alias("average_transaction_amount")
)

In [19]:
avg_transaction_by_sender.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sender: string (nullable = true)
 |-- sender_bank_full_name: string (nullable = true)
 |-- sender_bank_license_number: long (nullable = true)
 |-- average_transaction_amount: double (nullable = true)



In [20]:
avg_df_writer = avg_transaction_by_sender.writeStream \
   .format("delta") \
   .outputMode("complete") \
   .option("checkpointLocation", "./checkpoints-avg") \
   .start("s3a://hw7/avg_transaction_by_sender/")
avg_df_writer.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [21]:
avg_df_writer.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [24]:
avg_df_writer.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [25]:
spark.read.format("delta").load("s3a://hw7/avg_transaction_by_sender/").show()

+--------------------+--------+---------------------+--------------------------+--------------------------+
|              window|  sender|sender_bank_full_name|sender_bank_license_number|average_transaction_amount|
+--------------------+--------+---------------------+--------------------------+--------------------------+
|{2023-12-27 23:33...|bank_139|        full_name_139|                   9740163|                  74260.06|
|{2023-12-27 23:33...|bank_366|        full_name_366|                   1745964|                1867323.71|
|{2023-12-27 23:33...|bank_115|        full_name_115|                   3125644|                1958504.28|
|{2023-12-27 23:33...|bank_386|        full_name_386|                   4021657|                 899371.39|
|{2023-12-27 23:33...|bank_414|        full_name_414|                   6071853|                3265484.82|
|{2023-12-27 23:33...| bank_46|         full_name_46|                    763924|                    745.93|
|{2023-12-27 23:33...|bank_6

#### Store the whole enriched transactions stream to delta lake, just in case

In [26]:
transaction_df_writer = pulsar_transaction_df.writeStream \
   .format("delta") \
   .option("checkpointLocation", "./checkpoints-full") \
   .start("s3a://hw7/transaction_enriched/")
transaction_df_writer.status

{'message': 'Initializing StreamExecution',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [27]:
transaction_df_writer.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [28]:
transaction_df_writer.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [29]:
spark.read.format("delta").load("s3a://hw7/transaction_enriched/").show()

+-------------------+--------+--------+----------+-----+--------------------+--------------------+--------------------+-----------+-------------------+--------+------------+-------------+----------+----------+
|          timestamp|  sender|receiver|    amount|__key|             __topic|         __messageId|       __publishTime|__eventTime|__messageProperties| bank_id| location_id|    full_name|license_id|   country|
+-------------------+--------+--------+----------+-----+--------------------+--------------------+--------------------+-----------+-------------------+--------+------------+-------------+----------+----------+
|2022-11-16 11:55:19|bank_222| bank_55|3659652.81| null|persistent://publ...|[08 0A 10 25 20 1...|2023-12-27 23:59:...|       null|                 {}|bank_222|location_593|full_name_222|   3467184|country_74|
|2022-03-08 17:35:32|bank_222|bank_620|4084391.72| null|persistent://publ...|[08 0A 10 26 20 F...|2023-12-27 23:59:...|       null|                 {}|bank_222|