In [0]:
# List the top level of the raw data-lake bucket (Python form of `%fs ls`).
display(dbutils.fs.ls('s3://s3-belisco-turma-6-production-data-lake-raw'))

# Aula de Spark

In [0]:
# Same listing of the raw bucket as the notebook's first cell (Python form of `%fs ls`).
display(dbutils.fs.ls('s3://s3-belisco-turma-6-production-data-lake-raw'))

# Batch

In [0]:
# Batch read of every JSON file under the raw atomic_events prefix (schema is inferred).
df = spark.read.json('s3://s3-belisco-turma-6-production-data-lake-raw/atomic_events/')
display(df)

In [0]:
# Expose the batch DataFrame to the %sql cells as `atomic_events`.
df.createOrReplaceTempView(name='atomic_events')

In [0]:
%sql
-- Number of events per calendar day and UTM source in the batch view.
SELECT
  date(event_timestamp) AS event_date,
  utm_source,
  count(1)
FROM atomic_events
GROUP BY date(event_timestamp), utm_source
ORDER BY event_date, utm_source

In [0]:
from pyspark.sql import functions as F

# Derive a date column from the event timestamp, then write the batch as Parquet partitioned
# by that date. coalesce(1) funnels the whole write through a single task.
df = df.withColumn('event_date', F.to_date('event_timestamp'))
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .partitionBy('event_date')
    .parquet('s3://s3-belisco-turma-6-production-data-lake-processed/atomic_events_batch/')
)

In [0]:
# Read the Parquet output back. event_date is recovered from the partition directories.
df = spark.read.parquet('s3://s3-belisco-turma-6-production-data-lake-processed/atomic_events_batch/')
display(df)

# Streaming

In [0]:
# File-based streaming sources need an explicit schema, so borrow the one a batch read infers.
data_schema = spark.read.json('s3://s3-belisco-turma-6-production-data-lake-raw/atomic_events/').schema
data_schema

In [0]:
# Incremental (streaming) read of the same raw prefix, using the schema captured in the previous cell.
df = spark.readStream.schema(data_schema).json('s3://s3-belisco-turma-6-production-data-lake-raw/atomic_events/')


In [0]:
# Live preview of the streaming DataFrame opened in the previous cell.
display(df)

In [0]:
from pyspark.sql import functions as F

# Lineage and partition columns: processing time, event date, and the raw file each row came from.
df = (
    df
    .withColumn('etl_timestamp', F.current_timestamp())
    .withColumn('event_date', F.to_date(F.col('event_timestamp')))
    .withColumn('source_filename', F.input_file_name())
)

# Make the enriched stream queryable from the %sql cells.
df.createOrReplaceTempView('atomic_events_stream')

In [0]:
%sql
-- Rows contributed by each raw input file to the streaming view.
SELECT source_filename, count(1)
FROM atomic_events_stream
GROUP BY source_filename

In [0]:
%sql
-- Total rows in the streaming view.
-- 36620: presumably the count observed when this was last run; nothing checks it.
SELECT count(1)
FROM atomic_events_stream


In [0]:
# Continuously running Delta sink for the enriched event stream, partitioned by event_date.
# The checkpoint directory records progress, so a restarted query resumes from where it stopped.
(
    df.writeStream
    .format('delta')
    .partitionBy('event_date')
    .option("checkpointLocation", "dbfs:/checkpoints/atomic_events_2")
    .start('s3://s3-belisco-turma-6-production-data-lake-processed/atomic_events_streaming/')
)

In [0]:
# Trigger-once: process everything available right now as a single micro-batch, then stop the query.
# Only the streaming query stops. Whether the cluster shuts down depends on how the job or cluster
# is configured, not on this trigger.
(
    df.writeStream
    .trigger(once=True)
    .format('delta')
    .partitionBy('event_date')
    .option("checkpointLocation", "dbfs:/checkpoints/atomic_events_trigger_once2")
    .start('s3://s3-belisco-turma-6-production-data-lake-processed/atomic_events_trigger_once/')
)

In [0]:
# Batch read of the trigger-once Delta output, exposed to SQL.
df2 = spark.read.load('s3://s3-belisco-turma-6-production-data-lake-processed/atomic_events_trigger_once/', format='delta')
df2.createOrReplaceTempView('atomic_events_stream_trigger_once')

In [0]:
%sql
-- Rows per landing_date in the trigger-once output.
-- NOTE(review): no cell here adds `landing_date`. It presumably comes from the raw JSON, or from a
-- landing_date=... partition directory under the raw prefix. Verify before relying on it.
SELECT landing_date, count(1)
FROM atomic_events_stream_trigger_once
GROUP BY landing_date

# Ficou faltando

* Merge
* O que é Delta
* Airflow

In [0]:
# Raw orders extract (Parquet), exposed to SQL as `orders`.
df_orders = spark.read.parquet('s3://s3-belisco-turma-6-production-data-lake-raw/orders/public/orders')
df_orders.createOrReplaceTempView('orders')


In [0]:
%sql
-- All raw rows for one sample order.
SELECT *
FROM orders
WHERE order_id = '4bd42b9d-5fdc-4a89-b87f-2c9ec388b29c'

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank each order's rows newest-first by extraction time (row_rank == 1 is the latest row),
# then show the ranking for one sample order.
newest_first = Window.partitionBy(F.col('order_id')).orderBy(F.col('extracted_at').desc())
ranked_orders = df_orders.withColumn('row_rank', F.row_number().over(newest_first))
display(ranked_orders.filter(F.col('order_id') == '4bd42b9d-5fdc-4a89-b87f-2c9ec388b29c'))

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

ORDERS_RAW_PATH = 's3://s3-belisco-turma-6-production-data-lake-raw/orders/public/orders'
ORDERS_DELTA_PATH = 's3://s3-belisco-turma-6-production-data-lake-processed/orders'

# Capture the raw schema once. The streaming read in the merge cell reuses `data_schema`,
# because file-based streaming sources need an explicit schema.
data_schema = spark.read.format('parquet').load(ORDERS_RAW_PATH).schema

df = (
    spark.read.format('parquet').schema(data_schema).load(ORDERS_RAW_PATH)
    .withColumn('etl_timestamp', F.current_timestamp())
    .withColumn('created_date', F.to_date(F.col('created_at')))
    .withColumn('source_filename', F.input_file_name())
)

# Seed the Delta table with a single row. This gives `DeltaTable.forPath` in the merge cell an
# existing table with the right schema and partitioning.
# `.save()` uses the default "error if exists" mode, so an unguarded seed fails on every re-run.
# Seed only when no Delta table exists yet, so the cell survives Restart & Run All.
if not DeltaTable.isDeltaTable(spark, ORDERS_DELTA_PATH):
    df.limit(1).write.format('delta').partitionBy('created_date').save(ORDERS_DELTA_PATH)


In [0]:
# Current contents of the processed orders Delta table.
display(spark.read.load("s3://s3-belisco-turma-6-production-data-lake-processed/orders", format='delta'))

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.window import Window

from pyspark.sql import functions as F

deltaTable = DeltaTable.forPath(spark, path='s3://s3-belisco-turma-6-production-data-lake-processed/orders')


def _latest_per_order(microbatch):
    """Return only the newest row (highest `extracted_at`) for each `order_id` in `microbatch`."""
    newest_first = Window.partitionBy(F.col('order_id')).orderBy(F.col('extracted_at').desc())
    return (
        microbatch
        .withColumn('row_rank', F.row_number().over(newest_first))
        .filter(F.col('row_rank') == 1)
        .drop("row_rank")
    )


def _upsert(microbatch, batch_id):
    """foreachBatch handler: merge one micro-batch of raw order rows into the orders Delta table.

    Args:
        microbatch: DataFrame holding the rows of this micro-batch.
        batch_id: micro-batch id supplied by Structured Streaming (not used here).

    Rows are first reduced to one per order_id, because a Delta MERGE fails when several source
    rows match the same target row. An existing order is overwritten only when its newest row
    has Op = 'U'. Orders not yet in the table are inserted.
    """
    # NOTE(review): existing orders are never changed by rows whose Op is anything other than 'U'
    # (e.g. deletes, if Op follows a CDC I/U/D convention), and new keys are inserted whatever
    # their Op. Confirm this is intended.
    latest = _latest_per_order(microbatch)
    (
        deltaTable.alias("target")
        .merge(latest.alias("updates"), "target.order_id = updates.order_id")
        .whenMatchedUpdateAll("updates.Op='U'")
        .whenNotMatchedInsertAll()
        .execute()
    )


df = (
    spark.readStream.format('parquet').schema(data_schema)
    .load('s3://s3-belisco-turma-6-production-data-lake-raw/orders/public/orders')
    .withColumn('etl_timestamp', F.current_timestamp())
    .withColumn('created_date', F.to_date(F.col('created_at')))
    .withColumn('source_filename', F.input_file_name())
)

# No .format() here: foreachBatch supplies the sink itself, and every Delta write happens inside
# `_upsert`. Keep the query handle so the stream can be stopped later (orders_upsert_query.stop()).
orders_upsert_query = (
    df.writeStream
    .option("checkpointLocation", "dbfs:/checkpoints/orders")
    .outputMode('update')
    .foreachBatch(_upsert)
    .start()
)
orders_upsert_query



In [0]:
# Snapshot of the merged orders table, exposed to SQL as `orders_merge` for the checks that follow.
df_final = spark.read.load('s3://s3-belisco-turma-6-production-data-lake-processed/orders', format='delta')
df_final.createOrReplaceTempView('orders_merge')

In [0]:
%sql
-- The merged row for the first sample order (compare with its ranked raw rows).
SELECT *
FROM orders_merge
WHERE order_id = '4bd42b9d-5fdc-4a89-b87f-2c9ec388b29c'

In [0]:
%sql
-- Merged table: distinct orders vs. total rows (the two match when there is one row per order).
SELECT count(DISTINCT order_id), count(1)
FROM orders_merge

In [0]:
%sql
-- Same comparison on the raw `orders` view, where an order can appear in several rows.
SELECT count(DISTINCT order_id), count(1)
FROM orders

In [0]:
%sql
-- Merged row for a second sample order.
-- 94849: presumably a count observed when this was last run; nothing checks it.
SELECT *
FROM orders_merge
WHERE order_id = 'e57bf20b-9b8c-4482-8a21-30676b82458b'

In [0]:
%sql
-- Repeats the previous query, presumably to see the row after more micro-batches were merged.
-- 94849: presumably a count observed when this was last run; nothing checks it.
SELECT *
FROM orders_merge
WHERE order_id = 'e57bf20b-9b8c-4482-8a21-30676b82458b'

In [0]:
# Reload the raw orders and rank the rows of the second sample order newest-first,
# to compare them with its merged row.
df_orders = spark.read.parquet('s3://s3-belisco-turma-6-production-data-lake-raw/orders/public/orders')
newest_first = Window.partitionBy(F.col('order_id')).orderBy(F.col('extracted_at').desc())
display(
    df_orders
    .withColumn('row_rank', F.row_number().over(newest_first))
    .filter(F.col('order_id') == 'e57bf20b-9b8c-4482-8a21-30676b82458b')
)