In [0]:
# Name of the database that holds every table created by this notebook.
db = "deltadb"

# Create the database on first run, then make it the session default so the
# unqualified table names used in later cells resolve inside it.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

Out[1]: DataFrame[]

In [0]:
import random
import uuid
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql.functions import udf, col, to_date
from pyspark.sql.types import StringType, TimestampType

def my_checkpoint_dir():
  """Return a fresh checkpoint path under /tmp/delta_demo/chkpt/.

  A UUID is used for the final path component so that two streams started
  from this notebook do not end up sharing a checkpoint directory; a small
  random integer range makes such collisions realistic after a handful of
  runs.

  Returns:
    str: a path of the form "/tmp/delta_demo/chkpt/<32 hex characters>".
  """
  return "/tmp/delta_demo/chkpt/%s" % uuid.uuid4().hex

@udf(returnType=StringType())
def random_session():
    """Return one session id chosen at random from a fixed set of three."""
    return random.choice(["sessao_#123","sessao_#456","sessao_#789"])

# Spark assumes UDFs are deterministic and may drop or duplicate invocations
# while optimizing; flag this one as non-deterministic because every call
# yields a new random value.
random_session = random_session.asNondeterministic()

@udf(returnType=StringType())
def random_ip():
    """Return one IP address chosen at random from a fixed set of five."""
    return random.choice(["123.221.14.56","16.180.70.237","10.182.189.79", "218.193.16.244","198.122.118.164"])

# Spark assumes UDFs are deterministic and may drop or duplicate invocations
# while optimizing; flag this one as non-deterministic because every call
# yields a new random value.
random_ip = random_ip.asNondeterministic()

@udf(returnType=StringType())
def random_url():
    """Return one page path chosen at random from a fixed set of five."""
    return random.choice(["/home", "/produtos", "/contato", "/sobre", "/carrinho"])

# Spark assumes UDFs are deterministic and may drop or duplicate invocations
# while optimizing; flag this one as non-deterministic because every call
# yields a new random value.
random_url = random_url.asNondeterministic()


def generate_and_append_data_stream(table_format, table_name, schema_ok=False):
    """Start a stream of synthetic web-access rows written to a table.

    Rows come from Spark's "rate" source at 500 rows per second. Each row is
    given random `ip`, `pagina` and `sessao` values plus a `data_acesso` date
    derived from the source's `timestamp` column.

    Args:
        table_format: Format passed to the stream writer (e.g. "delta").
        table_name: Name of the table the stream writes to.
        schema_ok: When True, only the four columns `ip`, `pagina`, `sessao`
            and `data_acesso` are written. When False, the columns produced
            by the rate source are kept as well.

    Returns:
        Whatever the writer's `.table(table_name)` call returns -- presumably
        the handle of the started streaming query.
    """
    rate_source = (
        spark.readStream
        .format("rate")
        .option("rowsPerSecond", 500)
        .load()
    )

    # Decorate every generated row with the synthetic access-log fields.
    events = (
        rate_source
        .withColumn("ip", random_ip())
        .withColumn("pagina", random_url())
        .withColumn("sessao", random_session())
        .withColumn("data_acesso", to_date(col("timestamp")))
    )

    if schema_ok:
        # Keep only the access-log columns, dropping what the source added.
        events = events.select("ip", "pagina", "sessao", "data_acesso")

    # Each call gets its own checkpoint location; micro-batches fire every
    # five seconds and mergeSchema is enabled on the write.
    writer = (
        events.writeStream
        .format(table_format)
        .option("checkpointLocation", my_checkpoint_dir())
        .option("mergeSchema", "true")
        .trigger(processingTime="5 seconds")
    )

    return writer.table(table_name)

In [0]:
def stop_all_streams():
    """Stop every active streaming query, then delete the checkpoint folder.

    Stopping is best-effort: a query that fails to stop is reported and the
    loop moves on to the next one, so a single failure does not leave the
    remaining streams running. Only `Exception` is caught, so an interrupt
    from the user still aborts the cell.
    """
    print("Stopping all streams")
    for query in spark.streams.active:
        try:
            query.stop()
        except Exception as exc:
            # Report instead of silently swallowing, but keep going.
            print(f"Could not stop stream {query.id}: {exc}")
    print("Stopped all streams")
    # Remove all checkpoints recursively so later runs start from scratch.
    dbutils.fs.rm("/tmp/delta_demo/chkpt/", True)


def cleanup_paths_and_tables():
    """Recursively delete the demo's working directory /tmp/delta_demo/.

    NOTE(review): despite the name, no table is dropped here -- only the
    filesystem path is removed.
    """
    demo_root = "/tmp/delta_demo/"
    dbutils.fs.rm(demo_root, recurse=True)

In [0]:
%sql

-- Recreate log_sql from scratch. IF EXISTS keeps this cell runnable on a
-- fresh database where the table has never been created.
DROP TABLE IF EXISTS log_sql;

-- The streaming writer in this notebook targets the table with the "delta"
-- format, so the table format is stated explicitly.
CREATE TABLE log_sql
(ip string,
sessao string,
pagina string,
data_acesso date)
USING DELTA;

In [0]:
%sql
-- Show the column names and types of log_sql to confirm its schema.
DESCRIBE TABLE log_sql

col_name,data_type,comment
ip,string,
sessao,string,
pagina,string,
data_acesso,date,


In [0]:
# Start the synthetic access-log stream into log_sql, writing only the four
# columns the table declares (schema_ok=True).
stream_query_A = generate_and_append_data_stream(
    table_format="delta",
    table_name="log_sql",
    schema_ok=True,
)

In [0]:
# Read log_sql as a stream and show the rows as they arrive.
display(
    spark.readStream
        .format("delta")
        .table("log_sql")
)

In [0]:
from pyspark.sql.functions import approx_count_distinct

# Streaming aggregation over log_sql: approximate number of distinct pages
# seen for each session, listed in session order.
display(
    spark.readStream.format("delta").table("log_sql")
    .groupBy("sessao")
    .agg(
        approx_count_distinct("pagina").alias("qtd_paginas_distintas")
    )
    .orderBy("sessao")
)

sessao,qtd_paginas_distintas
sessao_#123,5
sessao_#456,5
sessao_#789,5


In [0]:
# Stop the running streams (and clear their checkpoints) before querying the
# table in batch mode below.
stop_all_streams()

Stopping all streams
Stopped all streams


In [0]:
%sql
-- Batch version of the per-session aggregation: approximate number of
-- distinct pages recorded in log_sql for each session.
SELECT sessao,
       approx_count_distinct(pagina) AS qtd_paginas_distintas
  FROM log_sql
 GROUP BY sessao
 ORDER BY sessao;


sessao,qtd_paginas_distintas
sessao_#123,5
sessao_#456,5
sessao_#789,5
