In [2]:
# Force <pre> blocks to keep their whitespace un-wrapped, so wide table output
# stays one row per line instead of folding.
from IPython.display import HTML, display

no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

In [None]:
# Install the ncat utility non-interactively (-y).
# NOTE(review): presumably used to feed lines into the socket source on localhost:9999 — confirm.
! apt install ncat -y

In [None]:
# Write a one-line sample file (text.txt) into the current working directory.
# NOTE(review): the batch cell reads file:///home/jovyan/notebooks/text.txt, so this
# assumes the notebook is run from /home/jovyan/notebooks — confirm.
! echo "that is some text file with content inside text file with text" > text.txt

In [3]:
import os

import pyspark.sql.functions as sf
import pyspark.sql.types as st
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [4]:
# Point SPARK_HOME at the PySpark package inside the conda environment.
os.environ.update(SPARK_HOME='/opt/conda/lib/python3.8/site-packages/pyspark')
# Disabled alternative kept for reference: set an explicit Spark conf directory.
# os.environ["SPARK_CONF_DIR"] = "/opt/conda/lib/python3.8/site-packages/pyspark/conf"

In [None]:
# Create the conf directory under the PySpark install (no error if it already exists);
# the following cell writes log4j2.properties into it.
! mkdir -p /opt/conda/lib/python3.8/site-packages/pyspark/conf

In [None]:
%%writefile /opt/conda/lib/python3.8/site-packages/pyspark/conf/log4j2.properties
# Console appender: log events go to standard output.
appender.console.type = Console
appender.console.name = Console
appender.console.target = SYSTEM_OUT
# Line layout: time (HH:mm:ss.SSS), level padded to 5 chars, short logger name, message, newline.
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss.SSS} %-5p %c{1}: %m%n

# Root logger: level INFO, routed to the console appender defined above.
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = Console

## Чтение из сокета

In [None]:
# Local session for the socket-streaming section: master local[*], Hive support enabled.
session_builder = SparkSession.builder.master("local[*]").appName("Spark streaming")
spark = session_builder.enableHiveSupport().getOrCreate()

In [None]:
# Show the session object as the cell output.
spark

### Batch processing

In [None]:
# Batch-load the sample file through the `text` data source.
df_raw = spark.read.load('file:///home/jovyan/notebooks/text.txt', format='text')

In [None]:
# Print up to 20 rows with column truncation switched off.
df_raw.show(n=20, truncate=False)

In [None]:
# Print the schema of the DataFrame loaded from the text file.
df_raw.printSchema()

In [None]:
# Step 1: split every line on single spaces into an array column.
line_tokens = sf.split(sf.col('value'), ' ')
df_split = df_raw.withColumn('splitted_words', line_tokens)

# Step 2: explode the array so each token gets its own row in `word`.
df_splitted = df_split.withColumn('word', sf.explode('splitted_words'))

In [None]:
# Preview the first 10 exploded rows without truncating values.
df_splitted.show(n=10, truncate=False)

In [None]:
# Keep only the per-row token column for counting.
df_words = df_splitted.select(sf.col('word'))

In [None]:
# Word count: number of rows per distinct word, exposed as `count`.
rows_per_word = sf.count(sf.lit(1)).alias('count')
df_wc = df_words.groupBy('word').agg(rows_per_word)

In [None]:
# Show up to 10 word counts without truncating values.
df_wc.show(n=10, truncate=False)

### Complete режим

In [None]:
# Streaming source: read text from a TCP socket on localhost:9999.
socket_reader = spark.readStream.format('socket').options(host="localhost", port="9999")
df_raw_stream = socket_reader.load()

In [None]:
# Check which Python type readStream...load() returned.
type(df_raw_stream)

In [None]:
# Print the schema of the socket-source DataFrame.
df_raw_stream.printSchema()

In [None]:
# Eager batch actions such as head() are not supported on a streaming DataFrame:
# Spark raises AnalysisException because queries with streaming sources must be
# started through writeStream.start(). Catch the error so the demonstration shows
# the message without aborting a "Run All".
from pyspark.sql.utils import AnalysisException

try:
    df_raw_stream.head(10)
except AnalysisException as exc:
    print(f"head() is not available on a streaming DataFrame: {exc}")

In [None]:
# Split each incoming line on spaces and keep only the one-token-per-row `word` column.
stream_tokens = sf.split(sf.col('value'), ' ')
df_splitted_stream = (
    df_raw_stream
    .withColumn('word', sf.explode(stream_tokens))
    .select('word')
)

In [None]:
# Streaming word count: rows per distinct word, exposed as `count`.
stream_rows_per_word = sf.count(sf.lit(1)).alias('count')
df_wc_stream = df_splitted_stream.groupBy('word').agg(stream_rows_per_word)

In [None]:
# Output modes other than 'complete' (presumably the alternatives for outputMode below): update, append

In [None]:
# Start the word count with the console sink in 'complete' output mode,
# then block this cell until the query terminates.
complete_query = (
    df_wc_stream.writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
complete_query.awaitTermination()

In [None]:
# Stop the current session; the next section builds a new one with
# spark.sql.shuffle.partitions configured.
spark.stop()

Давайте уменьшим число партиций при shuffle (`spark.sql.shuffle.partitions`) и посмотрим в UI

In [None]:
# Same local session as before, but with the shuffle partition count set to 4.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark streaming")
    .config("spark.sql.shuffle.partitions", 4)
    .enableHiveSupport()
)
spark = session_builder.getOrCreate()

In [None]:
# Show the re-created session object as the cell output.
spark

In [None]:
# Socket word count rebuilt on the current session, written in 'complete' mode.

# Source: text lines from localhost:9999.
df_raw_stream = spark.readStream.format('socket').options(host="localhost", port="9999").load()

# One row per space-separated token.
exploded_tokens = sf.explode(sf.split(sf.col('value'), ' '))
df_splitted_stream = df_raw_stream.withColumn('word', exploded_tokens).select('word')

# Rows per distinct word.
word_total = sf.count(sf.lit(1)).alias('count')
df_wc_stream = df_splitted_stream.groupBy('word').agg(word_total)

# Console sink; the cell blocks until the query terminates.
complete_query = df_wc_stream.writeStream.format('console').outputMode('complete').start()
complete_query.awaitTermination()

### Update режим

In [None]:
# Socket word count written to the console in 'update' output mode.

# Source: text lines from localhost:9999.
df_raw_stream = spark.readStream.format('socket').options(host="localhost", port="9999").load()

# One row per space-separated token.
exploded_tokens = sf.explode(sf.split(sf.col('value'), ' '))
df_splitted_stream = df_raw_stream.withColumn('word', exploded_tokens).select('word')

# Rows per distinct word.
word_total = sf.count(sf.lit(1)).alias('count')
df_wc_stream = df_splitted_stream.groupBy('word').agg(word_total)

# Console sink; the cell blocks until the query terminates.
update_query = df_wc_stream.writeStream.format('console').outputMode('update').start()
update_query.awaitTermination()

## Чтение из файла

### Научимся парсить JSON

In [None]:
# Batch-read a single sample device file. No schema is supplied, so the JSON
# reader derives one from the data.
# The URI uses the standard three-slash form (file:///), matching the other paths
# in this notebook; the original carried a stray fourth slash.
device_df = (
    spark.read
    .format('json')
    .load('file:///home/jovyan/notebooks/data/input/device_01.json')
)

In [None]:
# Preview up to 10 raw device events without truncating values.
device_df.show(n=10, truncate=False)

In [None]:
# Schema inference: no schema was passed to the reader, so print the one it derived.
device_df.printSchema()

In [None]:
# Turn the data.devices array into one row per element (column `device`),
# then drop the original nested `data` column.
device_element = sf.explode(sf.col('data.devices'))
exploded_df = device_df.withColumn("device", device_element).drop("data")
exploded_df.printSchema()

In [None]:
# Preview up to 10 exploded rows without truncating values.
exploded_df.show(n=10, truncate=False)

In [None]:
# Promote each field of the `device` struct to a top-level column, in this order,
# then drop the struct itself.
device_field_paths = [
    ("deviceId", "device.deviceId"),
    ("measure", "device.measure"),
    ("status", "device.status"),
    ("temperature", "device.temperature"),
]

flattened_df = exploded_df
for column_name, struct_path in device_field_paths:
    flattened_df = flattened_df.withColumn(column_name, sf.col(struct_path))
flattened_df = flattened_df.drop("device")

flattened_df.printSchema()

In [None]:
# Preview up to 10 flattened rows without truncating values.
flattened_df.show(n=10, truncate=False)

In [None]:
# Stop the batch session; the streaming section below creates a new one.
spark.stop()

### Сделаем через стриминг

In [None]:
# Session for the file-streaming section, with graceful stop on shutdown enabled.
session_builder = SparkSession.builder.master("local[*]").appName("Spark streaming")
spark = (
    session_builder
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

In [None]:
# Show the new session object as the cell output.
spark

In [None]:
# No schema is passed to the streaming reader below, so enable schema inference
# for streaming file sources.
spark.conf.set("spark.sql.streaming.schemaInference", True)

device_df = (
    spark.readStream
    .format('json')
    # Archive source files into sourceArchiveDir instead of leaving them in the input directory.
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "file:///home/jovyan/notebooks/data/archive_dir")
    # Fixed option name: the original "maxFilePerTrigger" is not a recognised option
    # and is silently ignored, so the one-file-per-micro-batch limit never applied.
    .option("maxFilesPerTrigger", 1)
    # Standard three-slash file URI (the original had a stray fourth slash).
    .load('file:///home/jovyan/notebooks/data/input/')
)

In [None]:
# Print the schema of the streaming JSON source (schemaInference was enabled above).
device_df.printSchema()

In [None]:
# One row per element of data.devices; the nested `data` column is dropped.
device_element = sf.explode(sf.col('data.devices'))
exploded_df = device_df.withColumn("device", device_element).drop("data")

# Lift the struct fields to top-level columns in a fixed order, then drop the struct.
device_field_paths = [
    ("deviceId", "device.deviceId"),
    ("measure", "device.measure"),
    ("status", "device.status"),
    ("temperature", "device.temperature"),
]
flattened_df = exploded_df
for column_name, struct_path in device_field_paths:
    flattened_df = flattened_df.withColumn(column_name, sf.col(struct_path))
flattened_df = flattened_df.drop("device")

flattened_df.printSchema()

In [None]:
# Print the flattened stream to the console in 'update' mode and block until it terminates.
console_query = (
    flattened_df.writeStream
    .outputMode("update")
    .format("console")
    .start()
)
console_query.awaitTermination()

### Запишем в файл

In [None]:
# Append the flattened stream as CSV under the output path, tracking progress in the
# checkpoint directory; the cell blocks until the query terminates.
csv_writer = (
    flattened_df.writeStream
    .outputMode("append")
    .format("csv")
    .options(
        path="file:///home/jovyan/notebooks/data/output/device_data.csv",
        checkpointLocation="file:///home/jovyan/notebooks/data/checkpoint_dir",
    )
)
csv_writer.start().awaitTermination()

In [None]:
# Stop the file-streaming session; the Kafka section below creates its own session.
spark.stop()

## Чтение из Kafka

In [5]:
# Session for the Kafka section: adds the spark-sql-kafka connector package and
# uses 4 shuffle partitions.
kafka_session_conf = {
    "spark.streaming.stopGracefullyOnShutdown": True,
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
    "spark.sql.shuffle.partitions": 4,
}

session_builder = SparkSession.builder.appName("Spark streaming with Kafka").master("local[*]")
for conf_key, conf_value in kafka_session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

In [6]:
# Show the Kafka-enabled session object as the cell output.
spark

In [7]:
# Batch read of the topic from the earliest offsets, to explore the data before streaming it.
kafka_read_options = {
    "kafka.bootstrap.servers": "kafka:29092",
    "subscribe": "device-data-2",
    "startingOffsets": "earliest",
}
kafka_df = spark.read.format("kafka").options(**kafka_read_options).load()

In [8]:
# Print the Kafka source schema: binary key/value plus topic, partition, offset and timestamp columns.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# Preview up to 10 raw Kafka records without truncating values.
kafka_df.show(n=10, truncate=False)

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
# Explicit schema for the device-event JSON carried in the Kafka `value` column.
# Types are referenced through the existing `st` alias (pyspark.sql.types) rather
# than a wildcard import, so each name's origin is visible and the notebook
# namespace is not flooded.

# One element of data.devices.
device_schema = st.StructType([
    st.StructField('deviceId', st.StringType(), True),
    st.StructField('measure', st.StringType(), True),
    st.StructField('status', st.StringType(), True),
    st.StructField('temperature', st.LongType(), True),
])

# Full event: top-level metadata plus the nested data.devices array.
json_schema = st.StructType([
    st.StructField('customerId', st.StringType(), True),
    st.StructField('data', st.StructType([
        st.StructField('devices', st.ArrayType(device_schema, True), True),
    ]), True),
    st.StructField('eventId', st.StringType(), True),
    st.StructField('eventOffset', st.LongType(), True),
    st.StructField('eventPublisher', st.StringType(), True),
    st.StructField('eventTime', st.StringType(), True),
])

In [11]:
# The Kafka payload arrives as binary; cast it to a string to expose the JSON text.
json_df = kafka_df.selectExpr('cast(value as string)')
json_df.show(n=10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
# Parse the JSON text with the explicit schema, then expand the parsed struct
# into top-level columns.
parsed_event = sf.from_json(sf.col("value"), json_schema).alias("json_values")
streaming_df = json_df.select(parsed_event).selectExpr("json_values.*")

In [13]:
# Show a single parsed event without truncating values.
streaming_df.show(n=1, truncate=False)

+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|customerId|data                                                                     |eventId                             |eventOffset|eventPublisher|eventTime                 |
+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|CI00101   |{[{D004, C, SUCCESS, 20}, {D004, C, SUCCESS, 1}, {D002, C, SUCCESS, 21}]}|1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|
+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+



In [14]:
# One row per element of data.devices (column `device`); the nested `data` column is dropped.
device_element = sf.explode(sf.col('data.devices'))
exploded_df = streaming_df.withColumn("device", device_element).drop("data")
exploded_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- device: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [15]:
# Lift the fields of the `device` struct to top-level columns in a fixed order,
# then drop the struct.
device_field_paths = [
    ("deviceId", "device.deviceId"),
    ("measure", "device.measure"),
    ("status", "device.status"),
    ("temperature", "device.temperature"),
]

flattened_df = exploded_df
for column_name, struct_path in device_field_paths:
    flattened_df = flattened_df.withColumn(column_name, sf.col(struct_path))
flattened_df = flattened_df.drop("device")

In [16]:
# Preview up to 10 flattened device rows without truncating values.
flattened_df.show(n=10, truncate=False)

+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|customerId|eventId                             |eventOffset|eventPublisher|eventTime                 |deviceId|measure|status |temperature|
+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|D004    |C      |SUCCESS|20         |
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|D004    |C      |SUCCESS|1          |
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|D002    |C      |SUCCESS|21         |
+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+



### Сделаем через стриминг

In [17]:
# Streaming read of the same topic, starting from the earliest offsets.
kafka_stream_options = {
    "kafka.bootstrap.servers": "kafka:29092",
    "subscribe": "device-data-2",
    "startingOffsets": "earliest",
}
kafka_df_stream = spark.readStream.format("kafka").options(**kafka_stream_options).load()

In [18]:
# Same transformation chain as the batch version, applied to the streaming DataFrame.

# Binary payload -> JSON text.
json_df_stream = kafka_df_stream.selectExpr('cast(value as string)')

# JSON text -> typed columns via the explicit schema.
parsed_event = sf.from_json(sf.col("value"), json_schema).alias("json_values")
streaming_df_stream = json_df_stream.select(parsed_event).selectExpr("json_values.*")

# One row per element of data.devices.
device_element = sf.explode(sf.col('data.devices'))
exploded_df_stream = streaming_df_stream.withColumn("device", device_element).drop("data")

# Struct fields -> top-level columns, in a fixed order; then drop the struct.
device_field_paths = [
    ("deviceId", "device.deviceId"),
    ("measure", "device.measure"),
    ("status", "device.status"),
    ("temperature", "device.temperature"),
]
flattened_df_stream = exploded_df_stream
for column_name, struct_path in device_field_paths:
    flattened_df_stream = flattened_df_stream.withColumn(column_name, sf.col(struct_path))
flattened_df_stream = flattened_df_stream.drop("device")

In [19]:
# Print the schema of the flattened streaming DataFrame.
flattened_df_stream.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [20]:
# Print the flattened Kafka stream to the console in 'append' mode and block
# until the query terminates (interrupt the kernel to stop it).
#
# Fixes:
# - removed the bare `.option()` call, which raises TypeError (key and value are
#   required) before the query can start;
# - the query now has its own checkpoint directory. The original path was the
#   one already used by the CSV file-sink query; a checkpoint location must
#   belong to a single query, otherwise this one would try to resume from
#   another query's state.
(
    flattened_df_stream.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "file:///home/jovyan/notebooks/data/checkpoint_dir_kafka")
    .start()
    .awaitTermination()
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 