<a href="https://colab.research.google.com/github/Ananya-Upadhyaya/Ananya-Upadhyaya/blob/main/Structured_Streaming_and_Delta_Tables.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "# Spark Structured Streaming and Delta Tables\n",
        "\n",
        "Spark provides support for streaming data through *Spark Structured Streaming* and extends this support through *delta tables* that can be targets (*sinks*) or *sources* of streaming data.\n",
        "\n",
        "In this exercise, you'll use Spark to ingest a stream of data from a folder of JSON files that consists of simulated status messages from devices. In a real scenario, the data could come from some other real-time source, such as a Kafka queue or an Azure Event Hub.\n",
        "\n",
        "## Create a folder for the incoming stream of data\n",
        "\n",
        "1. Ensure this notebook is attached to your Spark pool (using this **Attach to** drop-down list above).\n",
        "2. Run the cell below to create a folder named **data** to which the simulated device data will be written.\n",
        "\n",
        "    > **Note**: The first cell may take some time to run because the Spark pool must be started.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "from notebookutils import mssparkutils\n",
        "\n",
        "# Create a folder\n",
        "inputPath = '/data/'\n",
        "mssparkutils.fs.mkdirs(inputPath)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "## Use Spark Structured Streaming to query a stream of data\n",
        "\n",
        "1. Run the cell below to create a streaming dataframe that reads data from the folder based on a JSON schema that includes the name of the device and its status."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "microsoft": {}
      },
      "outputs": [],
      "source": [
        "from pyspark.sql.types import *\n",
        "from pyspark.sql.functions import *\n",
        "\n",
        "# Create a stream that reads data from the folder, using a JSON schema\n",
        "jsonSchema = StructType([\n",
        "  StructField(\"device\", StringType(), False),\n",
        "  StructField(\"status\", StringType(), False)\n",
        "])\n",
        "\n",
        "fileDF = spark.readStream.schema(jsonSchema).option(\"maxFilesPerTrigger\", 1).json(inputPath)\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "2. Wait for the cell above to complete.\n",
        "3. When the streaming dataframe has been created, you can apply a transformation query to aggregate the data and write the results to an output stream. Run the following code to filter the incoming stream for errors in the device data, and count the number of errors per device."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "countDF = fileDF.filter(\"status == 'error'\").groupBy(\"device\").count()\n",
        "query = countDF.writeStream.format(\"memory\").queryName(\"counts\").outputMode(\"complete\").start()\n",
        "print('Streaming query started.')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "4. The query output is streamed to an in-memory table. Run the cell below to use SQL to query this table and veiw the number of errors per device."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "microsoft": {
          "language": "sparksql"
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "%%sql\n",
        "select * from counts\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "5. Note that the query returns no data, because we haven't written any device status data there yet.\n",
        "6. Let's fix that by writing some status event data from a couple of simulated devices."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "device_data = '''{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev2\",\"status\":\"error\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"error\"}\n",
        "{\"device\":\"Dev2\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev2\",\"status\":\"error\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}'''\n",
        "\n",
        "mssparkutils.fs.put(inputPath + \"data.txt\", device_data, True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "7. Run the SQL query again to see the aggregated error counts (if the query still returns no data, wait a few seconds and try again!) There should be one error for device 1, and two errors for device 2."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "microsoft": {
          "language": "sparksql"
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "%%sql\n",
        "select * from counts\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "8. Review the results, noting the number of errors. Then run the following code to write more device data."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "more_data = '''{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"error\"}\n",
        "{\"device\":\"Dev2\",\"status\":\"error\"}\n",
        "{\"device\":\"Dev1\",\"status\":\"ok\"}'''\n",
        "\n",
        "mssparkutils.fs.put(inputPath + \"more-data.txt\", more_data, True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "9. Run the SQL query again (waiting a few seconds if necessary) to see the new status events reflected in the aggregations. There should now be two errors for device 1, and three errors for device 2."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "microsoft": {
          "language": "sparksql"
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "%%sql\n",
        "select * from counts\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "## Create a delta table\n",
        "\n",
        "Azure Synapse Analytics supports the Linux Foundation *Delta Lake* architecture, which builds on Spark Structured Streaming to add support for transactions, versioning, and other useful capabilities.\n",
        "\n",
        "In particular, you can create *delta tables* as a target (or *sink*) for streaming data, or as a *source* of streaming data for downstream queries.\n",
        "\n",
        "To explore this, we'll write the streaming dataframe based on the **data** folder we created previously to a new delta table, which we'll define using a path to a location in the file system.\n",
        "\n",
        "1. Run the cell below to stream the folder data to a delta table."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "delta_table_path = inputPath + 'deltatable'\n",
        "stream = fileDF.writeStream.format(\"delta\").option(\"checkpointLocation\", inputPath + 'checkpoint').start(delta_table_path)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "2. Now run the next cell to query the delta table to see the data that has been streamed to it. If at first the query returns no data, wait a few seconds and run the cell again)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "df = spark.read.format(\"delta\").load(delta_table_path)\n",
        "display(df)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "Delta tables enable you to use a feature named *time travel* to view the data at a previous point in time.\n",
        "\n",
        "4. Run the following query to retrieve the initial micro-batch of data that was streamed from the **devdata.txt** file. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "collapsed": false,
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "df = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(delta_table_path)\n",
        "display(df)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "source": [
        "5. Now that you've finished exploring Spark Structured Streaming and delta tables, stop the stream of data and clean up the files used in this exercise."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "jupyter": {
          "outputs_hidden": false,
          "source_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      },
      "outputs": [],
      "source": [
        "stream.stop()\n",
        "query.stop()\n",
        "print(\"Stream stopped\")\n",
        "mssparkutils.fs.rm(inputPath, True)"
      ]
    }
  ],
  "metadata": {
    "kernel_info": {
      "name": "synapse_pyspark"
    },
    "kernelspec": {
      "display_name": "Synapse PySpark",
      "language": "Python",
      "name": "synapse_pyspark"
    },
    "language_info": {
      "name": "python"
    },
    "save_output": true,
    "synapse_widget": {
      "state": {},
      "version": "0.1"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
