In [1]:
import json
import os
from pathlib import Path
import uuid
import random
import time
from datetime import datetime, timezone

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
import pyspark.sql.functions as F

PROJECT_PATH = Path.cwd().parent
DATA_DIR = '.data'
DATA_PATH = PROJECT_PATH / DATA_DIR
DATA_PATH.mkdir(exist_ok=True)

output_folder = str(DATA_PATH)
checkpoint_path = str(Path.cwd() / 'checkpoint')

file_schema = StructType() \
    .add('id', StringType()) \
    .add('temperature', DoubleType()) \
    .add('timestamp', TimestampType())

schema_name = 'dp700_e011'
table_name = 'temperature_stream'

In [2]:
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
spark.sql(f'CREATE SCHEMA IF NOT EXISTS {schema_name}')
table_location = f'Tables/{schema_name}/{table_name}'


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/21 14:49:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
raw_stream_df = spark.readStream \
    .schema(file_schema) \
    .option('maxFilesPerTrigger', 1) \
    .json(output_folder)

transformed_stream_df = raw_stream_df \
    .withColumn('processed_timestamp',
        F.current_timestamp())

delta_stream = transformed_stream_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .option('checkpointLocation', checkpoint_path) \
    .start(f'Tables/{schema_name}/{table_name}')

25/08/21 14:49:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [4]:
# spark.range(1).select(F.current_schema()).show()
# spark.range(1).select(F.current_catalog()).show()
# for stream in spark.streams.active:
#     print(f"ID: {stream.id}, Name: {stream.name}, Active: {stream.isActive}")
# # Ensure you are in the correct schema
# spark.sql(f"USE {schema_name}")

# # See what tables are available
# print(f"Tables in schema '{schema_name}':")
# spark.sql("SHOW TABLES").show()


In [5]:
# Give the stream a moment to write the first batch of data
print("Waiting 15 seconds for the first micro-batch to complete...")
time.sleep(15)

# Now, create a table in the Spark catalog that points to your files.
# The "IF NOT EXISTS" makes this cell safe to re-run.
print(f"Creating table '{schema_name}.{table_name}'...")
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}
    USING PARQUET
    LOCATION '{table_location}'
""")
print("Table created successfully!")


Waiting 15 seconds for the first micro-batch to complete...
Creating table 'dp700_e011.temperature_stream'...


25/08/21 14:49:23 WARN HadoopFSUtils: The directory file:/workspaces/aleksei-partanen-spark-structured-streaming/nbs/spark-warehouse/dp700_e011.db/Tables/dp700_e011/temperature_stream was not found. Was it deleted very recently?


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [None]:
df = spark.sql(f'SELECT * FROM {schema_name}.{table_name}')

In [None]:
spark.stop()