In [None]:

# S3 bucket holding the patients/_raw, _bronze and _clean parquet datasets used below.
bucket = "output-bucket-default-7judoaym"
import pyspark

In [None]:

# Read the raw patient extract (parquet) from the bucket, then report row count and schema.
patients = spark.read.parquet("s3a://{}/patients/_raw".format(bucket))

print(patients.count())
patients.printSchema()


In [None]:

from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w
from pyspark.sql.functions import udf

def ascii_ignore(x):
    """Return `x` with every non-ASCII character removed.

    This function is wrapped as a Spark UDF, so NULL column values arrive as
    None. Those are passed straight through as None instead of raising
    AttributeError on `None.encode`, which would fail the Spark job.
    """
    if x is None:
        return None
    return x.encode('ascii', 'ignore').decode('ascii')

# Spark UDF wrapper around ascii_ignore; the return type is stated explicitly (string, the udf default).
ascii_udf = udf(ascii_ignore, returnType="string")

def trim_to_null(c, null_tokens=('', 'null')):
  """Trim and lower-case string column `c`, mapping placeholder values to NULL.

  Args:
    c: name of the column to normalise.
    null_tokens: lower-case values which, after trimming and lower-casing the
      column, are replaced by NULL. Defaults to the empty string and the
      literal text 'null'.

  Returns:
    A Column expression. It yields NULL for NULL input or for any of
    `null_tokens`; the match is case-insensitive, so 'NULL' and ' Null ' are
    caught too. Otherwise it yields the trimmed, lower-cased value.
  """
  # Lower-case before comparing. Comparing the raw trimmed value to 'null'
  # let 'NULL' and 'Null' through as real values.
  normalised = f.lower(f.trim(f.col(c)))
  return f.when(normalised.isin(*null_tokens), None).otherwise(normalised)

# Columns carried from the raw extract into the bronze layer.
keep_cols = ['id', 'birth_date', 'first_name', 'surname', 'social_sec_num', 'suffix']

# Bronze layer:
# - strip non-ASCII characters from first_name;
# - copy surname into last_name, stripping non-ASCII characters the same way;
# - normalise suffix (trim_to_null already lower-cases, so no extra f.lower is needed);
# - drop the original surname column.
people_bronze = (
    patients.select(*keep_cols)
    .withColumn("first_name", ascii_udf('first_name'))
    .withColumn("last_name", ascii_udf('surname'))
    .withColumn("suffix", trim_to_null("suffix"))
    .drop("surname")
)
people_bronze.printSchema()
people_bronze.write.format("parquet").mode("overwrite").save("s3a://%s/patients/_bronze" % bucket)

In [None]:
# Persist the bronze frame as the managed table `bronze_patients`, replacing any previous version.
(people_bronze.write
    .mode("overwrite")
    .saveAsTable("bronze_patients"))

In [None]:

# Uniqueness checks on bronze_patients. If distinct_all exceeds distinct_id, some
# id appears with differing name / birth_date / SSN values. distinct_ssn gives
# the number of distinct SSNs for comparison.
distinct_id = spark.sql("SELECT distinct id FROM bronze_patients").count()
print(distinct_id)

# DISTINCT was missing here. Without it this counted every row rather than
# distinct identity tuples, so it could not be compared with distinct_id.
distinct_all = spark.sql(
    "SELECT distinct id, first_name, last_name, birth_date, social_sec_num FROM bronze_patients"
).count()
print(distinct_all)

distinct_ssn = spark.sql("SELECT distinct social_sec_num FROM bronze_patients").count()
print(distinct_ssn)

In [None]:
# Read the `_clean` patient dataset (parquet) from the bucket, then report row count and schema.
clean = spark.read.parquet("s3a://{}/patients/_clean".format(bucket))

print(clean.count())
clean.printSchema()

In [None]:
# Persist the clean dataset as the managed table `patients_clean`, replacing any previous version.
(clean.write
    .mode("overwrite")
    .saveAsTable("patients_clean"))


In [None]:
import pyspark.sql.functions as F

df = spark.sql("select * from patients_clean order by social_sec_num")
df.printSchema()

# Fields serialised into each message payload (social_sec_num is not among them).
json_cols = ['id', 'first_name', 'last_name', 'birth_date', 'suffix']
# ignoreNullFields=False writes null fields as explicit JSON nulls instead of
# omitting them, so every payload carries the same keys.
stream_df = df.withColumn(
    'json_col',
    F.to_json(F.struct(*[F.col(c) for c in json_cols]), options={"ignoreNullFields": False}),
)
# Preview only the payload. Showing every column would print raw SSNs
# (social_sec_num) into the notebook output.
stream_df.select('id', 'json_col').show(truncate=False)

In [None]:
import os

# Confluent Cloud credentials are read from the environment instead of being
# hard-coded, because notebook source and outputs are routinely shared. The
# key/secret previously committed in this cell must be treated as leaked and rotated.
kafka_api_key = os.environ["KAFKA_API_KEY"]
kafka_api_secret = os.environ["KAFKA_API_SECRET"]
kafka_bootstrap_servers = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "pkc-419q3.us-east4.gcp.confluent.cloud:9092"
)
kafka_jaas_config = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="%s" password="%s";' % (kafka_api_key, kafka_api_secret)
)

# Batch write to the `patients` topic: message key = id (as string),
# message value = the JSON payload built in json_col.
(
    stream_df.selectExpr("CAST(id AS STRING) as key", "CAST(json_col AS STRING) as value")
    .write.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .option("topic", "patients")
    .save()
)