In [2]:
from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json, current_timestamp


# Build (or reuse) the Spark session, then expose its SparkContext.
# Every setting lives in one mapping so the configuration can be read at a glance.
_spark_settings = {
    # Kafka connector and Postgres JDBC driver packages. Both coordinates must
    # stay in ONE comma-separated string, and the versions need to match the
    # Spark version in use (found by trial & error).
    "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.postgresql:postgresql:42.3.1",
    # Postgres config including the username and password from the compose file
    "spark.postgres.input.uri": "jdbc:postgresql://postgres_ins:5432/postgresdb.autoclaims_docs?user=postgres&password=data",
    "spark.postgres.output.uri": "jdbc:postgresql://postgres_ins:5432/postgresdb.autoclaims_docs?user=postgres&password=data",
    'spark.sql.session.timeZone': "America/New_York",
}

_session_builder = SparkSession.builder.master('local').appName('kafka-postgres-streaming')
for _option, _setting in _spark_settings.items():
    _session_builder = _session_builder.config(_option, _setting)

spark = _session_builder.getOrCreate()
sc = spark.sparkContext


In [3]:
# Subscribe to the Kafka topic as a streaming source
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "auto-claims-ingestion-topic")
    .load()
)

# Cast the binary key and value columns to strings
df1 = df.selectExpr(
    *["CAST({} AS STRING)".format(field) for field in ("key", "value")]
)

In [4]:
# Register the string-typed stream as the temporary view "message" for Spark SQL
df1.createOrReplaceTempView(name="message")

In [5]:
# Query the "message" view and stream every row to the console sink
res = spark.sql("SELECT * from message")
(
    res.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f1af949a7d0>

In [6]:
# Write the unconverted dataframe (no string casts) back into Kafka
# under another topic; listen to it with a local consumer.
ds = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    # Give this query a checkpoint directory of its own: the bare "/tmp" root is
    # shared with unrelated files and with any other query's checkpoint data.
    .option("checkpointLocation", "/tmp/checkpoints/spark-output")
    .start()
)

In [11]:
from pyspark.sql.functions import from_json, current_timestamp

# Write the message into Postgres
def foreach_batch_function(df, epoch_id):
    """Flatten one micro-batch of JSON messages and append it to Postgres.

    Parameters
    ----------
    df : DataFrame
        The micro-batch. Its ``value`` column is parsed as a JSON object
        into a string-to-string map.
    epoch_id : int
        Batch identifier supplied by ``foreachBatch``; not used here.

    Returns
    -------
    None
        The batch is written to the ``autoclaims_docs`` table via JDBC.
    """
    # JSON keys that become individual columns in the target table.
    claim_fields = [
        "months_as_customer", "age", "policy_number", "policy_bind_date",
        "policy_state", "policy_csl", "policy_deductable", "policy_annual_premium",
        "umbrella_limit", "insured_zip", "insured_sex", "insured_education_level",
        "insured_occupation", "insured_hobbies", "insured_relationship", "capital_gains",
        "capital_loss", "incident_date", "incident_type", "collision_type",
        "incident_severity", "authorities_contacted", "incident_state", "incident_city",
        "incident_location", "incident_hour_of_the_day", "number_of_vehicles_involved",
        "property_damage", "bodily_injuries", "witnesses", "police_report_available",
        "total_claim_amount", "injury_claim", "property_claim",
        # Was "value. vehicle_claim": the stray space made Spark look up the key
        # " vehicle_claim", producing a null column with a leading-space name.
        "vehicle_claim",
        "auto_make", "auto_model", "auto_year", "fraud_reported", "c39",
    ]

    # Parse the JSON string in `value` into a map of string keys to string values.
    parsed = df.withColumn(
        "value", from_json(df.value, MapType(StringType(), StringType()))
    )

    # Pull every expected key out of the map into its own column.
    flattened = parsed.select(["value." + field for field in claim_fields])

    # Add the load timestamp column.
    stamped = flattened.withColumn("load_datetimestamp", current_timestamp())

    # JDBC target.
    # NOTE(review): credentials are hard-coded here; consider supplying them
    # through configuration instead.
    db_target_url = "jdbc:postgresql://postgres_ins:5432/postgresdb"
    table = "autoclaims_docs"
    db_target_properties = {
        "driver": 'org.postgresql.Driver',
        "user": "postgres",
        "password": "data",
    }

    stamped.write.jdbc(
        url=db_target_url,
        table=table,
        properties=db_target_properties,
        mode="append",
    )

In [12]:
# Start the Postgres stream and wait for termination
postgres_query = df1.writeStream.foreachBatch(foreach_batch_function).start()
try:
    postgres_query.awaitTermination()
finally:
    # Interrupting the cell only aborts the wait, not the query itself. Stop it
    # explicitly so it does not keep writing in the background; otherwise
    # re-running this cell would start a second writer.
    postgres_query.stop()

+------------------+---+-------------+----------------+------------+----------+-----------------+---------------------+--------------+-----------+-----------+-----------------------+------------------+---------------+--------------------+-------------+------------+-------------+--------------------+--------------+-----------------+---------------------+--------------+-------------+-----------------+------------------------+---------------------------+---------------+---------------+---------+-----------------------+------------------+------------+--------------+--------------+---------+----------+---------+--------------+----+--------------------+
|months_as_customer|age|policy_number|policy_bind_date|policy_state|policy_csl|policy_deductable|policy_annual_premium|umbrella_limit|insured_zip|insured_sex|insured_education_level|insured_occupation|insured_hobbies|insured_relationship|capital_gains|capital_loss|incident_date|       incident_type|collision_type|incident_severity|authorities

KeyboardInterrupt: 