In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, FloatType, IntegerType
from pyspark.sql.functions import from_json, to_json, col, struct
from typing import Union
import subprocess


topic_name = 'lesson3' # название топика для кафки
checkpoints = '/tmp/checkpoints/stream_read_write' # место хранения чекпойнтов внутри hdfs


def console_output(df, freq:int=5):
    """Воводит содержимое потока spark в консоль
    :param df: количество
    :param freq: периодичность вывода. Кол-во секунд
    """
    res = df.writeStream \
        .format("console") \
        .trigger(processingTime=f'{freq} seconds') \
        .outputMode("complete") \
        .options(truncate=False) \
        .start()
    
    return res

def stop_all_streams(spark):
    """Останаваливает все стримы"""
    streams = spark.streams.active
    if not streams:
        print('Нет активных потоков')
    else:
        for active_stream in spark.streams.active:
            print(f'Stopping stream: {active_stream}')
            active_stream.stop()


def console_output(df, freq:int=5):
    """Воводит содержимое потока spark в консоль
    :param df: количество
    :param freq: периодичность вывода. Кол-во секунд
    """
    return df.writeStream \
        .format("console") \
        .trigger(processingTime=f'{freq} seconds') \
        .options(truncate=False) \
        .start()

class Spark_HDFS():
    SC = None # spark context
    FS = None # hdfs FileSystem in spark
    Path = None # функция получения java идентификаторов файлов в hdfs
    
    def __init__(self, spark):
        self.SC = spark.sparkContext
        self.Path = self.SC._jvm.org.apache.hadoop.fs.Path
        self.FS = (self.SC._jvm.org
                  .apache.hadoop
                  .fs.FileSystem
                  .get(self.SC._jsc.hadoopConfiguration()) )
    
    def ls(self, path:str, recursive:bool=True) -> None:
        """Показывает список файлов в директории HDFS по заданному пути
        :param path: путь к директории внутри HDFS
        :param recursive: удалить рекурсивно
        :return: Ture (если успешно) и False (если проблема)
        """
        status = self.FS.listStatus(self.Path(path))
        for fileStatus in status:
            print(fileStatus.getPath())
    
    def rm(self, path:str, recursive:bool=True) -> bool:
        """Удаляет файл в HDFS по заданному пути
        :param path: путь к файлу внутри HDFS
        :param recursive: удалить рекурсивно
        :return: Ture (если успешно) и False (если проблема)
        """
        res = self.FS.delete(self.SC._jvm.org.apache.hadoop.fs.Path(path), True)
        msg = f'Файл успешно удалён: {path}' if res else f'Ну удалось удалить файл: {path}'
        return res

    def put(self, local_file:str, hdfs_file:str, delSrc:bool=False, overwrite:bool=True) -> bool:
        """Копирует локальный файл в HDFS
        :param local_file: путь к локальному файлу 
        :param hdfs_file: путь к файлу внутри HDFS
        :param delSrc: удалить файл-источник после успешной передачи
        :param overwrite: перезаписать файл, если он уже существует
        :return: Ture (если успешно) и False (если проблема)
        """ 
        res = True
        try:
            self.FS.copyFromLocalFile(delSrc, 
                                 overwrite, 
                                 self.Path(local_file), 
                                 self.Path(hdfs_file)
                                )
        except Exception as e:
            print(e)
            res = False

        msg = f'Файл успешно скопирован: {local_file}' if res else f'Ну удалось скопировать файл: {local_file}'
        print(msg)
        return res
    

class Kafka():
    HOMEDIR = '/home/hsk/kafka/bin'  # путь к папке с исполняемыми файлами Кафки: kafka-topics.sh...
    SERVERS = 'localhost:9092'  # адреса серверов с портами через запятую
    
    def __init__(self, servers:str='', homedir:str=''):
        if servers:
            self.SERVERS = servers
        if homedir:
            self.HOMEDIR = homedir

    def _decode_bash_output(self, output:bytes) -> list:
        """Преобразует вывод bash команд в список
        :param name: название топика
        :return: список со строками, которые вывелись в bash
        """
        return list(filter(len, output.decode('utf-8').split('\n')))

    def topic_create(self, name:Union[str, list], retention:float=120) -> bool:
        """Создаёт топик в kafka
        :param name: название топика
        :param retention: время жизни информации в топике, секунды
        :return: Ture (если успешно) и False (если проблема)
        """
        retention *= 1000
        retention = int(retention)
        
        names = name
        if isinstance(name, str):
            names = [name]
        
        result = True
        for name in names:
            if name in self.topic_list():
                print(f'Топик уже существует: {name}')
                continue

            bashCommand = f'{self.HOMEDIR}/kafka-topics.sh --bootstrap-server {self.SERVERS} ' +\
                          f'--create --topic {name} --replication-factor 1 --partitions 1 ' +\
                          f'--config retention.ms={retention}'
            
            process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
            output, error = process.communicate()
            output = self._decode_bash_output(output)

            if name not in self.topic_list():
                if output:
                    print(output)
                print(f'Не удалось создать топик: {name}')
                result = False
            else:
                print(f'Топик успешно создан: {name}')
            
            if error:
                if output:
                    print(output)
                result = False

        return result

    def topic_list(self) -> list:
        """Список топиков в kafka
        :return: список
        """
        bashCommand = f'{self.HOMEDIR}/kafka-topics.sh --bootstrap-server {self.SERVERS} --list'

        process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
        output, error = process.communicate()
        output = self._decode_bash_output(output)
        if error or not len(output):
            return []

        return output

    def topic_delete(self, name:Union[str, list]) -> bool:
        """Удаляет топик в kafka
        :param name: название топика или список из названий
        :return: Ture - если успешно созданы все топики
                 False - если была ошибка хотя бы с одним
                        (остуствие топика не считается ошибкой)
        """
        names = name
        if isinstance(name, str):
            names = [name]
            
        result = True
        for name in names:
            if name not in self.topic_list():
                print(f'Такого топика не существует: {name}')
                continue

            bashCommand = f'{self.HOMEDIR}/kafka-topics.sh --bootstrap-server {self.SERVERS} ' +\
                          f'--delete --topic {name}'
            
            process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
            output, error = process.communicate()
            output = self._decode_bash_output(output)

            if name in self.topic_list():
                if output:
                    print(output)
                print(f'Не удалось удалить топик: {name}')
                result = False
            else:
                print(f'Топик успешно удален: {name}')

            if error:
                if output:
                    print(output)
                result = False

        return result
    
    def topic_read(self, name:Union[str, list], from_beginning:bool=False, timeout:float=1) -> bool:
        """Читает все текущие сообщения в топике kafka. После окончания не продолжает следить за потоком
        :param name: название топика или список из названий
        :return: Ture - если успешно созданы все топики
                 False - если была ошибка хотя бы с одним
                        (остуствие топика не считается ошибкой)
        """
        timeout *= 1000
        timeout = int(timeout)
        if name not in self.topic_list():
            print(f'Такого топика не существует: {name}')

        bashCommand = f'{self.HOMEDIR}/kafka-console-consumer.sh --bootstrap-server {self.SERVERS} ' +\
                      f'--topic {name} --timeout-ms {timeout}  2>/dev/null'

        if from_beginning:
            bashCommand += ' --from-beginning'
        
        process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE)
        output, error = process.communicate()
        output = self._decode_bash_output(output)
        print(output)
    
    
spark = SparkSession.builder.appName("my_spark").getOrCreate()
hdfs = Spark_HDFS(spark)
kf = Kafka()

In [None]:
# копируем файл в hdfs с локального диска (для тестов один!)
hdfs.put("db.csv", "/data/db.csv")

In [None]:
hdfs.ls('/data/')

In [None]:
# читаем из локальной системы
df = spark.read.csv(path="/data/db.csv", sep=",", header=True)

df.printSchema()

In [None]:
df.show(n=3)

In [None]:
# разделяем выйл на партифии, чтобы считать всё
df.repartition(2).write.csv(path="/data/db_re", sep=",", header=True, mode="overwrite")

In [None]:
schema = StructType() \
    .add("column_1", StringType()) \
    .add("column_2", IntegerType())

raw_files = spark \
    .readStream \
    .format("csv") \
    .schema(schema) \
    .options(path="/data/db_re",
             header=True,
             maxFilesPerTrigger=1) \
    .load()

In [None]:
out = console_output(raw_files, 5)

In [None]:
stop_all_streams(spark)

In [None]:
out.stop()

In [None]:
kf.topic_create(name=topic_name)

In [None]:
kf.topic_list()

In [None]:
# Удалим чекпоинты, чтобы начать всё заново
hdfs.rm(checkpoints)

# https://stackoverflow.com/questions/44584476/do-we-need-to-checkpoint-both-readstream-and-writestream-of-kafka-in-spark-struc
# https://sparkbyexamples.com/spark/spark-streaming-with-kafka/

# Добавляем столбец id с уникальным значением. Оно будет ключем для кафки. Передаём стрим напрямую в кафку. 
# Тут надо обратить внимание, что используется одна опция checkpointLocation
# Ещё важен порядок методов - надо ставить withColumn и другие запросы после option и csv.
raw = spark \
    .readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", True) \
    .option("maxFilesPerTrigger", 1) \
    .csv("/data/db_re") \
    .withColumn('id', F.concat(F.unix_timestamp(), F.rand())) \
    .withColumn('value', to_json(struct(schema.names))) \
    .selectExpr("id as key", "value as value") \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kf.SERVERS) \
    .option("topic", topic_name) \
    .option("checkpointLocation", checkpoints) \
    .start()

In [None]:
kf.topic_read(name='lesson3', from_beginning=True)

In [None]:
# Остановим все потоки
stop_all_streams(spark)

In [None]:
# Удаляем топик
kf.topic_delete(name=['lesson3'])