# In [None]:
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType
from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler, CountVectorizer, StringIndexer, IndexToString
# Kafka bootstrap server the streaming reader connects to.
kafka_brokers = "bigdataanalytics-worker-1.novalocal:6667"

# Streaming reader subscribed to the "lessonpro" topic. It starts from the
# earliest offsets and is capped at 5 offsets per trigger.
# `spark` is not defined in this file; it is expected to be provided by the
# surrounding session (TODO confirm).
raw_orders = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", "lessonpro")
    .option("maxOffsetsPerTrigger", "5")
    .option("startingOffsets", "earliest")
    .load()
)

# Schema used to parse the JSON payload carried in the Kafka message value.
schema = (
    StructType()
    .add("number", IntegerType())
    .add("housing_median_age", DoubleType())
    .add("total_rooms", DoubleType())
    .add("total_bedrooms", DoubleType())
    .add("median_income", DoubleType())
    .add("ocean_proximity", StringType())
)

# Cast the Kafka "value" column to a string, parse it as JSON with `schema`,
# and keep the message offset next to the parsed struct.
value_sales = raw_orders.select(
    F.from_json(F.col("value").cast("String"), schema).alias("value"),
    "offset",
)
# Expand the parsed struct into top-level columns; the offset column is kept.
sales_flat = value_sales.select(
    F.col("value.*"),
    "offset",
)

def console_output(df, freq):
    """Start a streaming query that prints ``df`` to the console sink.

    Args:
        df: Object exposing ``writeStream`` (a streaming DataFrame).
        freq: Trigger interval in seconds; interpolated into the
            ``processingTime`` string passed to ``trigger``.

    Returns:
        Whatever ``start()`` returns for the configured writer (the running
        streaming query handle).
    """
    interval = '%s seconds' % freq
    console_writer = df.writeStream.format("console")
    console_writer = console_writer.trigger(processingTime=interval)
    console_writer = console_writer.options(truncate=True)
    return console_writer.start()

# Print the flattened stream to the console every 5 seconds, then stop the
# query. NOTE(review): start and stop back-to-back look like separate
# interactive notebook steps; confirm before running this as a script.
s = console_output(df=sales_flat, freq=5)
s.stop()

# Load the fitted ML pipeline (the original note says it is stored in HDFS).
pipeline_model = PipelineModel.load("my_LR_model8")


def writer_logic(df, epoch_id):
    """Score one micro-batch with the loaded pipeline and append it to Cassandra.

    Used as the ``foreachBatch`` callback of the streaming query. The batch is
    cached for the duration of the call because several actions consume it
    (three ``show`` calls, the model transform, and the Cassandra write). The
    cache is released in a ``finally`` block so that a failure in any of
    those steps does not leave the batch persisted.

    Args:
        df: Batch DataFrame holding the flattened Kafka records.
        epoch_id: Batch identifier passed by ``foreachBatch``; not used here.

    Returns:
        None. The result is written to the ``data_test_pytrain`` table in the
        ``lessoneight`` keyspace.
    """
    df.persist()
    try:
        print("---------I've got new batch--------")
        print("This is what I've got from Kafka:")
        df.show()
        # NOTE(review): no aggregation is computed here. The next two show()
        # calls print the same batch again; they are kept so the console
        # output stays unchanged.
        print("Here is the sums from Kafka:")
        df.show()
        df.show()

        predict = pipeline_model.transform(df)
        # Keep the input features and expose the model's "prediction" column
        # under the name "median_house_value".
        predict_short = predict.select(
            "number",
            "housing_median_age",
            "total_rooms",
            "total_bedrooms",
            "median_income",
            "ocean_proximity",
            F.col("prediction").alias("median_house_value"),
        )
        print("Here is what I've got after model transformation:")
        predict_short.show()

        # Update the historical aggregate in Cassandra (append mode).
        predict_short.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="data_test_pytrain", keyspace="lessoneight") \
            .mode("append") \
            .save()
        print("I saved the prediction and aggregation in Cassandra. Continue...")
    finally:
        # Always release the cached batch, including when scoring or the
        # Cassandra write raises.
        df.unpersist()


# Connect the Kafka-backed stream to the foreachBatch handler.
stream = (
    sales_flat.writeStream
    .trigger(processingTime="100 seconds")
    .foreachBatch(writer_logic)
    .option("checkpointLocation", "data_test_pytrain_checkpoint")
)

# Start the query, then stop it.
# NOTE(review): start and stop back-to-back look like separate interactive
# notebook steps; confirm before running this as a script.
s = stream.start()
s.stop()
