# Reto 9: Nifi + Kafka + PySpark Notebook + BD

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

## PostgreSQL

### Definición de dataframe tabla 'simpsons'

In [2]:
# Local Spark session that registers the PostgreSQL JDBC driver jar via spark.jars.
spark_postgresql = (
    SparkSession.builder
    .config("spark.jars", "/usr/local/postgresql-42.2.25.jar")
    .master("local")
    .appName("PySpark_Postgres_test")
    .getOrCreate()
)

In [3]:
# Read the 'simpsons' table of the 'simpsons' PostgreSQL database over JDBC.
df_simpsons = (
    spark_postgresql.read.format("jdbc")
    .option("url", "jdbc:postgresql://docker_test-postgres-1:5432/simpsons")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "simpsons")
    .option("user", "root")
    .option("password", "1234")
    .load()
)

In [4]:
# Print the schema of the DataFrame loaded from the 'simpsons' table.
df_simpsons.printSchema()

root
 |-- quote: string (nullable = true)
 |-- character: string (nullable = true)
 |-- image: string (nullable = true)
 |-- characterDirection: string (nullable = true)



In [6]:
# Preview the rows currently stored in the 'simpsons' table.
df_simpsons.show()

+-----+---------+-----+------------------+
|quote|character|image|characterDirection|
+-----+---------+-----+------------------+
+-----+---------+-----+------------------+



## MongoDB

### Definición de dataframe colección 'quotes'

In [7]:
 # Spark session configured with the MongoDB connector package and the
 # input/output URIs of the 'simpsons.quotes' collection.
 spark_mongodb = (
     SparkSession.builder
     .appName("PySpark_MongoDB_test")
     .master('local')
     .config("spark.driver.memory", "40g")
     .config("spark.mongodb.input.uri", "mongodb://root:1234@mongo:27017/simpsons.quotes?authSource=admin")
     .config("spark.mongodb.output.uri", "mongodb://root:1234@mongo:27017/simpsons.quotes?authSource=admin")
     .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.2')
     .getOrCreate()
 )

In [8]:
 # Load the 'quotes' collection of the 'simpsons' database through the MongoDB connector.
 df_quotes = (
     spark_mongodb.read
     .format('com.mongodb.spark.sql.DefaultSource')
     .option('uri', "mongodb://root:1234@mongo:27017/simpsons.quotes?authSource=admin")
     .load()
 )

In [9]:
# Preview the documents currently stored in the 'quotes' collection.
df_quotes.show()

++
||
++
++



## Kafka

In [10]:
# Spark session used for the Kafka structured stream.
# The Kafka connector has to match the Scala build (2.11) and the Spark 2.4.x release
# line that the MongoDB connector in this notebook targets; no 4.2.5 release exists for
# spark-sql-kafka-0-10_2.11, so that coordinate could never be resolved.
# NOTE(review): 2.4.5 is assumed here -- confirm it equals the installed pyspark version.
# NOTE(review): getOrCreate() returns an already-running session when one exists, in
# which case spark.jars.packages set here may not be applied -- verify on a fresh kernel.
spark_kafka = (
    SparkSession.builder
    .appName("test")
    .config("spark.sql.debug.maxToStringFields", "100")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
    .getOrCreate()
)

In [12]:
# Streaming source: subscribe to the 'simpsons-quotes' topic, starting from the
# earliest offsets and without failing the query when data loss is detected.
kafka_df = (
    spark_kafka.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "docker_test-kafka-1:29092")
    .option("subscribe", "simpsons-quotes")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

In [13]:
# Print the schema of the streaming DataFrame read from Kafka.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
# Decode the binary Kafka message value into a string column named 'value'
string_df = kafka_df.select(col("value").cast("string").alias("value"))
string_df.printSchema()

root
 |-- value: string (nullable = true)



In [15]:
# Parse the JSON text of each message into columns that mirror the DB schema

schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("quote", "character", "image", "characterDirection")
    ]
)

# Parse 'value' into a struct called 'data', then flatten the struct into top-level columns.
json_df = string_df.select(from_json(col("value"), schema).alias("data")).select("data.*")
json_df.printSchema()

root
 |-- quote: string (nullable = true)
 |-- character: string (nullable = true)
 |-- image: string (nullable = true)
 |-- characterDirection: string (nullable = true)



In [16]:
# Write to postgres and mongodb from topic
def write_to_postgres_and_mongo(df, batch_id):
    """Append the Homer Simpson quotes of one micro-batch to PostgreSQL and MongoDB.

    Intended as a ``foreachBatch`` callback of a streaming query.

    Args:
        df: DataFrame holding the rows of the current micro-batch; it must expose
            a ``character`` column.
        batch_id: Identifier of the micro-batch supplied by Spark (unused here).

    Raises:
        Whatever the JDBC or MongoDB writers raise; the cached batch is released
        before the error propagates.
    """
    save_mode = "append"
    url_postgres = "jdbc:postgresql://docker_test-postgres-1:5432/simpsons"
    uri_mongo = "mongodb://root:1234@mongo:27017/simpsons.quotes?authSource=admin"
    # NOTE(review): credentials are hard-coded; consider reading them from the environment.
    properties = {"user": "root", "password": "1234", "driver": "org.postgresql.Driver"}

    # Filter on character name
    filtered_df = df.filter(df['character'] == "Homer Simpson")

    # The batch is written twice; without caching, each write recomputes the filtered
    # batch (including re-reading the source), so cache it for the duration of both writes.
    filtered_df.persist()
    try:
        # Insert into PostgreSQL
        filtered_df.write.jdbc(
            url=url_postgres,
            table="public.simpsons",
            mode=save_mode,
            properties=properties,
        )
        # Insert into MongoDB
        filtered_df.write \
            .format('com.mongodb.spark.sql.DefaultSource') \
            .mode(save_mode) \
            .option("spark.mongodb.output.uri", uri_mongo) \
            .save()
    finally:
        # Always release the cached batch, even when one of the writes fails.
        filtered_df.unpersist()
        
# Start the streaming query that hands every micro-batch to write_to_postgres_and_mongo.
# foreachBatch installs its own sink and replaces any earlier .format(...) choice, so the
# former .format("console") had no effect and has been dropped.
# The StreamingQuery handle is kept so the stream can be inspected or stopped later
# (e.g. query.status, query.stop()).
# NOTE(review): no checkpointLocation is configured; with startingOffsets=earliest a
# restarted query may append already-written rows again -- confirm whether that matters.
query = (
    json_df.writeStream
    .foreachBatch(write_to_postgres_and_mongo)
    .start()
)
query

<pyspark.sql.streaming.StreamingQuery at 0x7f58369c2c90>