In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, udf, split, mean
from pyspark.sql import Row, DataFrame

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, udf
import pandas as pd
from ast import literal_eval
import re

import pandas as pd
import requests

def hpml1(df):
    """Return ``df`` with ``area``, ``price`` and ``bedroom`` cast to double.

    The three columns are replaced one after another, in that order,
    using ``withColumn`` with a ``cast('double')`` of the same column.
    """
    casted = df
    for numeric_col in ('area', 'price', 'bedroom'):
        casted = casted.withColumn(numeric_col, col(numeric_col).cast('double'))
    return casted

def hpml2(df):
    """Return ``df`` narrowed by a fixed sequence of range filters.

    The conditions are applied one at a time, each as its own ``filter``
    call: area, price and bedroom must be positive, area must be below
    500 and price below 20.
    """
    range_checks = ("area>0", "price>0", "bedroom>0", "area<500", "price<20")
    kept = df
    for check in range_checks:
        kept = kept.filter(check)
    return kept

def extract_data():
    """Create (or reuse) the SparkSession used by this notebook.

    The session targets the master at ``spark://spark-master:7077``, is
    named ``sparkTest``, requests the Kafka connector packages through
    ``spark.jars.packages`` and limits each executor to 512 MB of memory
    and one core.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.
    """
    # Kafka connector coordinates, passed to Spark as one comma-separated string.
    packages = ",".join([
        "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
    ])

    master = "spark://spark-master:7077"
    appName = "sparkTest"

    sparkSs = (
        SparkSession.builder
        .master(master)
        .appName(appName)
        .config("spark.jars.packages", packages)
        .config("spark.executor.memory", "512m")
        # The property name is "spark.executor.cores"; the previous singular
        # "spark.executor.core" is not a Spark setting, so the one-core
        # limit was never applied.
        .config("spark.executor.cores", "1")
        .getOrCreate()
    )

    return sparkSs

def transform_data(sparkSs):
    """Build the streaming DataFrame of cleaned posts read from Kafka.

    Subscribes to the ``post`` topic starting from the earliest offsets,
    parses each message value as JSON using an all-string schema, expands
    the parsed struct into top-level columns, then applies ``hpml1``
    followed by ``hpml2``.

    Args:
        sparkSs: Session on which ``readStream`` is called.

    Returns:
        The DataFrame returned by ``hpml2``.
    """
    # Every field is declared as a string; hpml1 casts area/price/bedroom afterwards.
    post_schema = StructType()
    for field_name in ("name", "address", "city", "district",
                       "area", "bedroom", "investor", "price"):
        post_schema.add(field_name, StringType())

    reader_options = (
        ("kafka.bootstrap.servers", "kafka:9093, kafka1:9093"),
        ("subscribe", "post"),
        ("startingOffsets", "earliest"),
        ("failOnDataLoss", "false"),
    )
    kafka_reader = sparkSs.readStream.format("kafka")
    for option_key, option_value in reader_options:
        kafka_reader = kafka_reader.option(option_key, option_value)

    raw_messages = kafka_reader.load()
    parsed_messages = raw_messages.select(
        from_json(col("value").cast("string"), post_schema).alias("parsed_value")
    )

    # Expand the parsed struct so each schema field becomes its own column.
    flattened = parsed_messages.select("parsed_value.*")
    typed = hpml1(flattened)
    return hpml2(typed)
    
def load_data(df):
    """Stream ``df`` to the console sink in append mode and block until it ends.

    The call does not return until ``awaitTermination()`` on the started
    query returns.
    """
    console_writer = df.writeStream.format("console").outputMode("append")
    running_query = console_writer.start()
    running_query.awaitTermination()

In [2]:
# Create (or reuse) the Spark session; the bare name on the last line
# displays the session as this cell's output.
spark = extract_data()
spark

:: loading settings :: url = jar:file:/usr/bin/spark-3.1.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-761a08f6-6a01-4be0-b21b-efadbbd68a64;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/sp

In [3]:
# Build the streaming DataFrame: Kafka source -> JSON parse -> casts -> range filters.
data = transform_data(spark)

In [4]:
# Print the schema to confirm that area, bedroom and price are doubles after the casts.
data.printSchema()

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- district: string (nullable = true)
 |-- area: double (nullable = true)
 |-- bedroom: double (nullable = true)
 |-- investor: string (nullable = true)
 |-- price: double (nullable = true)



In [5]:
from pyspark.sql.functions import udf, struct, to_json

# Pack every column of each row into a single JSON string named "value",
# which is the column the Kafka sink uses as the message payload.
json_payload = data.select(
    to_json(struct([data[x] for x in data.columns])).alias("value")
)

# Keep the StreamingQuery handle under a name: start() returns immediately
# and the query keeps running in the background, so without a reference it
# cannot be inspected or stopped (e.g. preprocessing_query.stop()).
preprocessing_query = (
    json_payload.writeStream
    .outputMode('append')
    .format('kafka')
    .option("kafka.bootstrap.servers", "kafka:9093,kafka1:9093")
    .option('topic', 'Preprocessing_Post')
    .option('checkpointLocation', '/opt/workspace/tmp/kafka_checkpoint')
    .start()
)

# Display the handle as the cell output, as before.
preprocessing_query

<pyspark.sql.streaming.StreamingQuery at 0x7f520034b5e0>

22/01/03 03:37:10 WARN KafkaOffsetReaderConsumer: Found incorrect offsets in some partitions (partition, previous offset, fetched offset): ArrayBuffer((post-0,344,20))
22/01/03 03:37:10 WARN KafkaOffsetReaderConsumer: Retrying to fetch latest offsets because of incorrect offsets
22/01/03 03:37:11 WARN KafkaOffsetReaderConsumer: Found incorrect offsets in some partitions (partition, previous offset, fetched offset): ArrayBuffer((post-0,344,20))
22/01/03 03:37:11 WARN KafkaOffsetReaderConsumer: Retrying to fetch latest offsets because of incorrect offsets
22/01/03 03:37:12 WARN KafkaOffsetReaderConsumer: Found incorrect offsets in some partitions (partition, previous offset, fetched offset): ArrayBuffer((post-0,344,20))
22/01/03 03:37:13 WARN KafkaMicroBatchStream: Partition post-0's offset was changed from 344 to 20, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have