In [None]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Spark configuration for a session created from this notebook process, with
# the Kubernetes API server as the cluster manager.
conf = SparkConf()

conf.setMaster("k8s://https://kubernetes.default:443")
conf.setAppName("Spark minIO Test")

# --- Object storage (S3A pointed at the MinIO endpoint) ---------------------
# Credentials are read from the environment instead of being committed with
# the notebook. A missing variable raises KeyError here, before any session
# is created.
conf.set("spark.hadoop.fs.s3a.endpoint", "http://minio-minio-storage:9000")
conf.set("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_ACCESS_KEY"])
conf.set("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET_KEY"])
# SparkConf.set() stringifies its value, so a Python True would be sent as
# "True"; pass the lowercase literal explicitly.
conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')

# --- Kubernetes: container image, namespace and service-account auth --------
conf.set("spark.kubernetes.container.image", "guisilveira/spark-base")
conf.set("spark.kubernetes.container.image.pullPolicy", "Always")
conf.set("spark.kubernetes.authenticate.caCertFile", "/run/secrets/kubernetes.io/serviceaccount/ca.crt")
conf.set("spark.kubernetes.authenticate.oauthTokenFile", "/run/secrets/kubernetes.io/serviceaccount/token")
conf.set("spark.kubernetes.namespace", "bigdata")
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "jupyterhub")

# --- Delta Lake SQL extension and catalog -----------------------------------
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# --- Executor sizing --------------------------------------------------------
conf.set("spark.executor.instances", "2")
conf.set("spark.executor.memory", "2g")

# --- Driver networking ------------------------------------------------------
# Fixed host name and ports for the driver and its block manager.
# NOTE(review): presumably "jupyterhub" is the service name executors use to
# reach this process - confirm against the cluster manifests.
conf.set("spark.driver.bindAddress", "0.0.0.0")
conf.set("spark.driver.host", "jupyterhub")
conf.set("spark.driver.port", 7078)
conf.set("spark.driver.blockManager.port", 7079)

# --- Hive metastore ---------------------------------------------------------
conf.set("hive.metastore.uris", "thrift://hive:9083")

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# Landing Layer

In [None]:
# Read the raw CSV data from the bronze bucket. Both sources use the same
# reader options: first row is the header, column types are inferred.
csv_reader_options = {'inferschema': 'true', 'header': 'true'}

df_titles = spark.read.options(**csv_reader_options).csv('s3a://bronze/titles')
df_credits = spark.read.options(**csv_reader_options).csv('s3a://bronze/credits')

In [None]:
# Persist both datasets to the silver bucket in Delta format.
# mode('overwrite') makes this cell re-runnable: with the default save mode
# the write raises as soon as the target path already holds data, which breaks
# a Restart & Run All.
df_titles.write.format('delta').mode('overwrite').save('s3a://silver/titles')
df_credits.write.format('delta').mode('overwrite').save('s3a://silver/credits')

# Processing Layer

In [None]:
# Load the silver data back, this time from the Delta copies.
df_titles_delta = spark.read.load('s3a://silver/titles', format='delta')
df_credits_delta = spark.read.load('s3a://silver/credits', format='delta')

In [None]:
# Expose both Delta DataFrames to Spark SQL under the view names that the
# aggregation query refers to.
for view_name, delta_frame in (
    ('df_titles', df_titles_delta),
    ('df_credits', df_credits_delta),
):
    delta_frame.createOrReplaceTempView(view_name)

In [None]:
# Per credited name: number of joined title rows of type 'MOVIE' whose
# description contains the substring "war", largest count first.
war_movies_query = '''
    SELECT credits.name, COUNT(title) AS total 
    FROM df_titles AS titles 
    INNER JOIN df_credits AS credits 
    ON titles.id = credits.id 
    WHERE 
      titles.description LIKE '%war%' 
      AND
      titles.type = 'MOVIE'
    GROUP BY credits.name
    ORDER BY total DESC
'''

df_final = spark.sql(war_movies_query)

In [None]:
# Persist the aggregated result to the gold bucket in Delta format.
# mode('overwrite') keeps the cell re-runnable; with the default save mode the
# write raises once the target path already holds data.
df_final.write.format('delta').mode('overwrite').save('s3a://gold/war_movies_participation')

# Kafka

In [None]:
# Streaming source: subscribe to the single Kafka topic "spark_input".
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-kafka-bootstrap:9092")
    .option("subscribe", "spark_input")
    .load()
)

In [None]:
# Schema used to parse the JSON message value: a string "name" and an integer
# "idade" (Portuguese for "age"). Both fields are nullable.
schema = StructType([
    StructField("name", StringType()),
    StructField("idade", IntegerType()),
])

In [None]:
# Cast the Kafka key and value columns to strings.
df_person_string = kafka_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

# Parse the value as JSON using `schema`, then flatten the resulting struct
# so its fields become top-level columns.
parsed_payload = from_json(col("value"), schema).alias("data")
df_person = df_person_string.select(parsed_payload).select("data.*")

In [None]:
# Keep only rows whose "idade" is at least 18.
min_age_predicate = 'idade >= 18'
df_person_final = df_person.select('name', 'idade').filter(min_age_predicate)

In [None]:
# Streaming sink 1: append the filtered rows to a Delta table in the gold
# bucket.
# NOTE(review): the checkpoint directory is a /tmp path rather than an s3a://
# location - confirm it is meant to survive a restart of this process.
ds = (
    df_person_final.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/tmp/kafka_demo_delta")
    .start('s3a://gold/kafka_demo')
)

In [None]:
# Streaming sink 2: serialise every row as a JSON string in a "value" column
# and publish it to the Kafka topic "spark_output".
json_value_frame = df_person_final.selectExpr("to_json(struct(*)) AS value")

ds_kafka = (
    json_value_frame.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-kafka-bootstrap:9092")
    .option("topic", "spark_output")
    .option("checkpointLocation", "/tmp/kafka_demo")
    .start()
)

In [None]:
# Batch-read the Delta table written by the streaming sink and print its
# first rows as a quick check.
df_check_kafka = spark.read.load('s3a://gold/kafka_demo', format='delta')
df_check_kafka.show()

# Hive Metastore Tables

In [None]:
# Read the gold result by table name and print its first rows.
# NOTE(review): this notebook only saves the gold data by path; nothing
# visible here registers `gold.war_movies_participation` as a table - confirm
# it is registered elsewhere.
metastore_table_name = 'gold.war_movies_participation'
df_metastore = spark.read.table(metastore_table_name)
df_metastore.show()