# Data discovery: Spark Streaming

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sqlfn
from pyspark.sql import types as sqlt

In [2]:
# Start (or reuse) the Spark session for this lab, running locally on two threads.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('SparkStreamingLab1')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/20 13:40:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Pin the employee schema explicitly so the column layout cannot drift between
# input files. Every column is declared nullable.
schema_employee = (
    sqlt.StructType()
    .add('employee_id', sqlt.IntegerType(), True)
    .add('department_name', sqlt.StringType(), True)
    .add('name', sqlt.StringType(), True)
    .add('last_name', sqlt.StringType(), True)
    .add('hire_timestamp', sqlt.TimestampType(), True)
)

In [4]:
# Read stream: treat the CSV files under datasets/csv/ as a streaming source,
# using the enforced schema and taking at most one file per trigger.
df_employees = (
    spark.readStream
    .schema(schema_employee)
    .format('csv')
    .option('maxFilesPerTrigger', 1)
    .option('header', True)
    .load(r'datasets/csv/')
)

In [5]:
# Is my stream activated? Sanity check that df_employees (built with
# spark.readStream) is a streaming DataFrame rather than a static one.
df_employees.isStreaming

True

In [6]:
# Show schema: print column names, types and nullability of the streaming
# DataFrame to confirm the enforced schema was applied.
df_employees.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- hire_timestamp: timestamp (nullable = true)



In [7]:
# Add aggregation: count non-null employee_id values per
# (department_name, hire_timestamp) group and keep only groups with more than
# one. The watermark on hire_timestamp uses a 10 minute lateness threshold.
# NOTE(review): hire_timestamp is itself a grouping key, so max(hire_timestamp)
# always equals it within a group -- confirm whether a time window was intended.
df_large_teams = (
    df_employees
    .withWatermark("hire_timestamp", "10 minutes")
    .groupBy('department_name', 'hire_timestamp')
    .agg(
        sqlfn.count('employee_id').alias('count'),
        sqlfn.max('hire_timestamp'),
    )
    .filter('count > 1')
)

In [8]:
# Debug sink: start a query that prints the aggregated result to the console
# in 'complete' output mode.
# NOTE(review): the cell that starts the CSV sink rebinds df_stream_large_teams,
# after which this query is only reachable through spark.streams.active.
df_stream_large_teams = (
    df_large_teams.writeStream
    .outputMode('complete')
    .format('console')
    .start()
)

23/08/20 13:41:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7ef8db03-0bc1-4a8f-aef3-b4390f1f88d6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/08/20 13:41:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [9]:
# Persist the stream: start a query that appends the aggregated rows as CSV
# files under output/large_depts/, with its checkpoint kept in
# datasets/checkpoints/.
df_stream_large_teams = (
    df_large_teams.writeStream
    .outputMode('append')
    .format('csv')
    .option("checkpointLocation", "datasets/checkpoints/")
    .option("path", "output/large_depts/")
    .start()
)

23/08/20 13:41:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [10]:
# Stop stream: shut down every streaming query still active on this session.
# The name df_stream_large_teams is bound first to the console query and then
# rebound to the CSV query, so calling .stop() on that name alone leaves the
# console query running in the background. Iterating over spark.streams.active
# stops both of them (and any other query started on this session).
for active_query in spark.streams.active:
    active_query.stop()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+--------------+-----+-------------------+
|department_name|hire_timestamp|count|max(hire_timestamp)|
+---------------+--------------+-----+-------------------+
+---------------+--------------+-----+-------------------+

23/08/20 13:41:13 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
