In [1]:
import os

# Version used for the Kafka connector artifacts requested below.
spark_version = '3.4.1'
# Ask spark-submit to pull the Kafka connector packages. This has to be exported
# before the SparkSession (and therefore the JVM) is started further down.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:{spark_version},org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version} pyspark-shell'

import findspark
# findspark.init() is what makes `pyspark` importable from a plain Spark installation,
# so it must run BEFORE the first pyspark import (the original imported pyspark first).
# Passing None (SPARK_HOME unset) lets findspark try to locate Spark on its own instead
# of failing with a bare KeyError.
findspark.init(os.environ.get('SPARK_HOME'))

from pyspark.sql import SparkSession

# Create (or reuse) the session used by every cell of this notebook.
spark = (
    SparkSession
    .builder
    .appName('appstream')
    .getOrCreate()
)

In [2]:
# Bare expression: the notebook renders the SparkSession object as this cell's output.
spark

## Write Stream

### Write with built-in to_json mechanics

In [3]:
# Kafka connection settings used by the read/write cells of this notebook.
# The original assigned 'localhost:9092' and overwrote it on the very next line, so
# that value never took effect; only the effective address is kept.
# NOTE(review): presumably 'localhost:9092' is the address to use when the broker is
# reachable on the local host — confirm for your environment.
kafka_servers = 'broker_kafka:29092'
topic = 'WriteTopic'

from pyspark.sql import functions as F
from pyspark.sql import types as T

In [4]:
import pandas as pd

# Read the Pokemon CSV (path is relative to the working directory) into a pandas frame.
df = pd.read_csv(filepath_or_buffer="datasets/Pokemon_full.csv")

In [5]:
# Convert the pandas frame into a Spark DataFrame.
data = spark.createDataFrame(df)

# Replace spaces in the column names with underscores.
# toDF renames every column positionally in one step, instead of stacking one
# withColumnRenamed projection per column, and the comprehension leaves no loop
# variables behind in the notebook namespace.
data = data.toDF(*[name.replace(' ', '_') for name in data.columns])


  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


In [6]:
# Rename the `pokedex_id` column to its Cyrillic spelling.
data = data.withColumnRenamed(existing='pokedex_id', new='покедекс_ид')

In [7]:
# Add a `json` column holding the listed fields of each row serialised as a JSON string.
data_w_json = data.withColumn(
    colName='json',
    col=F.to_json(F.struct('покедекс_ид', 'name', 'hp', 'attack', 'defense')),
)

In [9]:
# One row per (type, secundary_type) pair, with that group's JSON strings gathered
# into a list column named `метрики`.
df_out = (
    data_w_json
    .groupBy('type', 'secundary_type')
    .agg(F.collect_list('json').alias('метрики'))
)

In [30]:
# Batch write to Kafka: the record key is `type` cast to string, the record value is
# the whole row serialised with Spark's built-in to_json.
(
    df_out
    .selectExpr("CAST(type AS STRING) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("topic", topic)
    .save()
)

### Write with custom to_json function

In [11]:
import json
from datetime import datetime, timezone

@F.udf(returnType=T.BinaryType())
def prepare(type, secundary_type, metrics):
    """Build the UTF-8 encoded JSON message for one row.

    Args:
        type: value stored under ``body["Тип"]``. (The name shadows the
            builtin ``type``; it is kept so the UDF's signature is unchanged.)
        secundary_type: value stored under ``body["доп. тип"]``.
        metrics: JSON text that is parsed and embedded under
            ``body["метрики"]``. ``None`` (a NULL column value) is embedded
            as JSON ``null`` instead of raising a decode error.

    Returns:
        bytes: the message dumped with ``ensure_ascii=False`` (non-ASCII
        characters are written as-is, not as ``\\uXXXX`` escapes) and encoded
        as UTF-8, matching the declared ``BinaryType``.
    """
    # The format string ends in a literal "Z" (UTC designator), so the clock must be
    # read in UTC; a naive local datetime.now() would be mislabelled as UTC.
    sent_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%SZ")

    # str(None) is "None", which is not valid JSON — guard the NULL case explicitly.
    parsed_metrics = None if metrics is None else json.loads(str(metrics))

    message = {
        "header": {
            "send_at": sent_at
        },
        "body": {
            "Тип": type,
            "доп. тип": secundary_type,
            "метрики": parsed_metrics
        }
    }

    return json.dumps(message, ensure_ascii=False).encode('utf-8')

In [14]:
# First turn the `метрики` list into a JSON string (wrapped in a one-field struct),
# then let the `prepare` UDF assemble the binary message into `value`.
df_out_ser = (
    df_out
    .withColumn('метрики', F.to_json(F.struct('метрики')))
    .withColumn(
        'value',
        prepare(F.col('type'), F.col('secundary_type'), F.col('метрики')),
    )
)

In [16]:
# Decoding as US-ASCII prints replacement characters (�) for the non-ASCII text,
# because the source JSON was created with the parameter "ensure_ascii=False".
(
    df_out_ser
    .select(F.decode(F.col('value'), 'US-ASCII').alias('view'))
    .show(1, truncate=False)
)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
# Batch write to Kafka of the UDF-built binary payload only (no record key is set).
(
    df_out_ser
    .select('value')
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("topic", topic)
    .save()
)

## Read Stream

In [18]:
# Load the topic from the earliest offset over a PLAINTEXT connection.
# Note this goes through `spark.read`, i.e. a batch read, despite the variable name.
stream = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("kafka.security.protocol", "PLAINTEXT")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [19]:
# Find the greatest record timestamp in the loaded data, then keep only the records
# that carry exactly that timestamp.
last_ts = stream.agg(F.max('timestamp')).first()[0]
last_stream = stream.where(F.col('timestamp') == last_ts)

In [20]:
# Show one read-back payload decoded as US-ASCII (compare with the UTF-8 decode).
(
    last_stream
    .select(F.decode(F.col('value'), 'US-ASCII').alias('view'))
    .show(1, truncate=False)
)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|view                                                                                                                                                                                                                                                                                                                                                     |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
# Show one read-back payload decoded as UTF-8; the Cyrillic keys are rendered as text.
(
    last_stream
    .select(F.decode(F.col('value'), 'UTF-8').alias('view'))
    .show(1, truncate=False)
)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|view                                                                                                                                                                                                                                                                                                          |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"header": {"send_at": "2024-03-08 19:27:12Z"}, "body": {"Тип": "water", "доп. тип":

In [22]:
# Show one read-back payload with the binary `value` column cast to a string.
(
    last_stream
    .select(F.col('value').cast('string').alias('value'))
    .show(1, truncate=False)
)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                         |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"header": {"send_at": "2024-03-08 19:27:12Z"}, "body": {"Тип": "water", "доп. тип":