# Streaming data from a CSV file with PySpark

> A simple demo showing how to stream data from a CSV file in a local directory.

- title: "Structured Streaming with PySpark"
- toc: true
- comments: true

### Import the necessary packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Create a Spark Session

In [2]:
# Build the SparkSession for this notebook. getOrCreate() hands back an
# already-running session if there is one instead of starting a second.
# "local[4]" runs Spark on this machine with four worker threads.
spark = (
    SparkSession.builder
    .appName("ScrapedProps")
    .master("local[4]")
    .getOrCreate()
)

### Define a schema for your data (necessary for Structured Streams)

In [3]:
# Explicit schema for the CSV columns, listed in file order.
# Every column is nullable (third argument True); text columns are strings
# and all numeric columns are read as doubles.
schema1 = (
    StructType()
    .add('Property Title', StringType(), True)
    .add('Property Price', DoubleType(), True)
    .add('Property Location', StringType(), True)
    .add('Property Bedrooms', DoubleType(), True)
    .add('Property Bathrooms', DoubleType(), True)
    .add('DistanceFromCBD', DoubleType(), True)
)

### Load the data into a streaming dataframe

In [4]:
# Streaming DataFrame over the CSV files found in the source directory,
# parsed with the explicit schema1 defined for this notebook.
#
# "header" is set to False explicitly. The previous value, None, was passed
# through to the JVM as a null and only worked because Spark falls back to
# the option's default (no header row) in that case; spelling the value out
# keeps the same behaviour without relying on that fallback.
# NOTE(review): if the CSV files do start with a header row, this should be
# True instead - confirm against the data.
rentalprops = (
    spark.readStream
    .format("csv")
    .schema(schema1)
    .option("header", False)
    .load(r"/home/alloyce/sparkdata1")
)

### Check the status of the stream

In [5]:
# isStreaming reports whether this DataFrame is backed by a streaming source;
# it is expected to be True here because rentalprops was built with readStream.
rentalprops.isStreaming

True

### Check the schema of the dataframe

In [6]:
# Print the DataFrame's columns as a tree (name, type, nullability) to confirm
# that the schema supplied to the reader was applied.
rentalprops.printSchema()

root
 |-- Property Title: string (nullable = true)
 |-- Property Price: double (nullable = true)
 |-- Property Location: string (nullable = true)
 |-- Property Bedrooms: double (nullable = true)
 |-- Property Bathrooms: double (nullable = true)
 |-- DistanceFromCBD: double (nullable = true)



### Final Code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build (or reuse) a local SparkSession running on four worker threads.
spark = (
    SparkSession.builder
    .appName("ScrapedProps")
    .master("local[4]")
    .getOrCreate()
)

# Explicit schema for the CSV columns, in file order; every column is nullable.
schema1 = StructType([
    StructField('Property Title', StringType(), True),
    StructField('Property Price', DoubleType(), True),
    StructField('Property Location', StringType(), True),
    StructField('Property Bedrooms', DoubleType(), True),
    StructField('Property Bathrooms', DoubleType(), True),
    StructField('DistanceFromCBD', DoubleType(), True),
])

# Streaming DataFrame over the CSV files in the source directory.
# "header" is spelled out as False: the previous None was forwarded as a null
# and only worked because Spark then falls back to the default (no header row).
# NOTE(review): switch to True if the CSV files carry a header row - confirm.
rentalprops = (
    spark.readStream
    .format("csv")
    .schema(schema1)
    .option("header", False)
    .load(r"/home/alloyce/sparkdata1")
)

# True when the DataFrame is backed by a streaming source.
check1 = rentalprops.isStreaming

# printSchema() only prints and returns None, so assigning its result stored
# nothing useful. Print the tree for the reader, then keep the actual schema
# object (a StructType) for any later inspection.
rentalprops.printSchema()
schemacheck1 = rentalprops.schema
