## Stream Structured Data

*After some initial hiccups, a local Spark installation was used.*

In the first part of the exercise, you will create a simple Spark streaming program that reads an input stream from a file source. The file source stream reader reads data from a directory on a file system. When a new file is added to the folder, Spark adds that file’s data to the input data stream.

You can find the input data for this exercise in the baby-names/streaming directory. This directory contains the baby names CSV file randomized and split into 98 individual files. You will use these files to simulate incoming streaming data.

### a. Count the Number of Females

In the first part of the exercise, you will create a Spark program that monitors an incoming directory. To simulate streaming data, you will copy CSV files from the baby-names/streaming directory into the incoming directory. Since you will be loading CSV data, you will need to define a schema before you initialize the streaming dataframe.
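The schema can also be written out by hand instead of being inferred from a static read. The sketch below is one way to do that, assuming the files carry the columns `state`, `sex`, `year`, `name`, and `count`; that layout is an assumption, so check it against the header row of your files. `open_csv_stream` and `BABY_NAMES_DDL` are hypothetical names used only for illustration.

```python
# Hypothetical column layout; verify it against the header row of your CSV files.
BABY_NAMES_DDL = "state STRING, sex STRING, year INT, name STRING, count INT"


def open_csv_stream(spark, directory, ddl=BABY_NAMES_DDL):
    """Return a streaming DataFrame over CSV files that land in `directory`."""
    return (
        spark.readStream
        .schema(ddl)               # by default, streaming file sources need a schema up front
        .option("header", True)    # treat the first line of each file as a header, not data
        .csv(directory)
    )
```

With an active session this would be called as `open_csv_stream(spark, ipstreaming_file_path)`. Passing the schema as a DDL string keeps the example short; a `StructType` works in the same position.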

From this input data stream, you will create a simple output data stream that counts the number of females and writes it to the console. About every 10 seconds, copy a new file into the directory and report the console output. Do this for the first ten files.
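The Structured Streaming guide asks that files be placed into a monitored directory atomically, which on most file systems means a move rather than a slow in-place write. The sketch below copies each file into a staging directory first and then renames it into the monitored directory. `feed_files` and its parameters are hypothetical, and the rename is only atomic when the staging and destination directories are on the same file system.

```python
import os
import shutil
import time


def feed_files(src_dir, dest_dir, staging_dir, limit=10, delay=10.0):
    """Move copies of up to `limit` CSV files into `dest_dir`, one every `delay` seconds."""
    os.makedirs(dest_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)
    # Sort so that "the first ten files" is the same set on every run.
    names = sorted(n for n in os.listdir(src_dir) if n.endswith(".csv"))[:limit]
    for i, name in enumerate(names):
        staged = os.path.join(staging_dir, name)
        shutil.copy(os.path.join(src_dir, name), staged)   # full copy outside the watched dir
        os.replace(staged, os.path.join(dest_dir, name))   # single rename into the watched dir
        if i < len(names) - 1:
            time.sleep(delay)
    return names
```

For this exercise the call might look like `feed_files(file_path, ipstreaming_file_path, '/home/dsc650-master/staging')`, where the staging path is a made-up example.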

In [2]:
# import libraries
import glob
import os
import shutil

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
from time import sleep
from pyspark.sql.functions import window

# create Spark context
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [3]:
# define directories
file_path = r'/home/dsc650-master/data/baby-names/streaming'
ipstreaming_file_path = r'/home/dsc650-master/input_streaming'
batchstream_file_path = r'/home/dsc650-master/batch_streaming'

In [4]:
# define the schema from the static files
spark = SparkSession.builder.appName('strstream').getOrCreate()
static = spark.read.csv(file_path, header = True)
dataschema = static.schema

# check structure
static.printSchema()

In [5]:
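# read the stream from the input directory; header=True keeps each file's
# header row out of the data, matching the static read above
streaming = spark.readStream.schema(dataschema).option("header", True).csv(ipstreaming_file_path)

# count records per sex (the row where sex = 'F' is the number of females)
counts = streaming.groupBy("sex").count()
counts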

In [6]:
# stop any earlier "counts" query so the name can be reused on a re-run
for q in spark.streams.active:
    if q.name == "counts":
        q.stop()

In [7]:
streamingquery = counts.writeStream.queryName("counts").format("memory").outputMode("complete").start()
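
The exercise asks for console output, while the query above writes to the in-memory sink and is read back with SQL. A sketch of the console-sink alternative follows; `start_console_counts` is a hypothetical helper name. In a notebook, the console sink prints to the driver's standard output, which may be the terminal that launched Jupyter rather than the cell output.

```python
def start_console_counts(counts_df, query_name="counts_console"):
    """Start a query that prints the full counts table after each micro-batch."""
    return (
        counts_df.writeStream
        .queryName(query_name)
        .format("console")
        .outputMode("complete")   # an aggregation without a watermark needs complete or update mode
        .start()
    )
```

Called as `start_console_counts(counts)`, this returns a `StreamingQuery` that can be stopped with `.stop()` like any other.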

In [8]:
# print active streams
spark.streams.active

In [9]:
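# the static DataFrame is not a stream (False); the streaming DataFrame is (True)
print(static.isStreaming, streaming.isStreaming)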

In [10]:
# display stream progress
for x in range(10):
    spark.sql("SELECT * FROM counts").show()
    sleep(1)

In [11]:
# list the CSV files in sorted order so that "the first ten" is well defined
fileslist = sorted(f for f in os.listdir(file_path) if f.endswith(".csv"))
print(fileslist[:10])

# the same files as full paths
files_list = sorted(glob.glob(os.path.join(file_path, "*.csv")))
print(files_list)

In [12]:
# display source and destination paths
print(os.path.basename(file_path))
print(os.path.dirname(files_list[1]))
print(os.path.split(files_list[1]))

filename = os.path.splitext(files_list[1])[0]
print(filename)

# dirname is absolute here, so join discards 'input_streaming' and returns it unchanged
print(os.path.join('input_streaming', os.path.dirname(files_list[1])))

src_path = os.path.join(file_path, fileslist[4])

dest_path = os.path.join(ipstreaming_file_path, fileslist[4])

print(src_path, dest_path)

In [13]:
# copy the first ten files one at a time and display the counts after each
for file in fileslist[:10]:

    print(file)
    src_path = os.path.join(file_path, file)
    dest_path = os.path.join(ipstreaming_file_path, file)

    shutil.copy(src_path, dest_path)
    print("File copied \n")

    print("Check counts \n ")
    sleep(2)
    spark.sql("SELECT * FROM counts").show()
    sleep(2)
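
Fixed sleeps are a guess at how long a micro-batch takes. `StreamingQuery.processAllAvailable()` blocks until the data currently available at the source has been processed; the Spark documentation describes it as intended for testing. The sketch below uses it in place of the sleeps. `copy_and_show` is a hypothetical helper, and the table name assumes the in-memory query is called `counts`.

```python
import os
import shutil


def copy_and_show(spark, query, src_dir, dest_dir, file_names, table="counts"):
    """Copy each file into the watched directory and show the table once it is processed."""
    for name in file_names:
        shutil.copy(os.path.join(src_dir, name), os.path.join(dest_dir, name))
        query.processAllAvailable()   # block until pending input has been processed
        spark.sql(f"SELECT * FROM {table}").show()
```

In this notebook the call would be `copy_and_show(spark, streamingquery, file_path, ipstreaming_file_path, fileslist[:10])`. With a long processing-time trigger the call waits for the next trigger, so each iteration can take up to one interval.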

### b. Micro-Batching

Repeat the last step, but use a micro-batch trigger so that processing runs every 30 seconds. About every 10 seconds, copy a new file into the directory and report the console output. Do this for the first ten files. How does the output differ from the previous example?
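
To get a feel for what to expect, the arithmetic can be worked through in plain Python. This is an idealized model, not Spark's scheduler: it assumes the trigger fires at exact multiples of 30 seconds, that a file is picked up by the first trigger at or after its arrival, and that processing takes no time. `batch_assignment` is a hypothetical helper.

```python
import math
from collections import defaultdict


def batch_assignment(arrival_times, trigger_interval):
    """Map each trigger time to the arrivals it would pick up (idealized model)."""
    batches = defaultdict(list)
    for t in arrival_times:
        # A file is picked up by the first trigger at or after its arrival time.
        trigger = math.ceil(t / trigger_interval) * trigger_interval
        batches[trigger].append(t)
    return dict(batches)


arrivals = [10 * i for i in range(1, 11)]   # ten files, one every 10 seconds
print(batch_assignment(arrivals, 30))
# {30: [10, 20, 30], 60: [40, 50, 60], 90: [70, 80, 90], 120: [100]}
```

Under these assumptions the counts table changes about once per three files instead of after every file, so consecutive checks often show the same totals. Real timings will drift with when the query started and how long each batch takes.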

In [15]:
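# import package
from pyspark.streaming import StreamingContext

# define a streaming context with a 30-second batch interval; this is the older
# DStream API and is not required for the trigger-based micro-batches below
stc = StreamingContext(sc, 30)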

In [16]:
# read the stream from the micro-batch input directory, skipping header rows
csvmb = spark.readStream.schema(dataschema).option("header", True).csv(batchstream_file_path)
batch_counts = csvmb.groupBy("sex").count()

# get count of females: the row of the grouped counts where sex = 'F'
# (lazy; the writer below still streams the full per-sex table)
female_counts = batch_counts.where("sex = 'F'")

In [17]:
# define batch writer
microbatch_writer = (
    batch_counts.writeStream
    .trigger(processingTime="30 seconds")
    .queryName("batch_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [18]:
# check that the batch writer is running
microbatch_writer.isActive

In [19]:
# copy the first ten files about 10 seconds apart and display the micro-batch result
for file in fileslist[:10]:

    print(file)
    src_path = os.path.join(file_path, file)
    dest_path = os.path.join(batchstream_file_path, file)

    shutil.copy(src_path, dest_path)
    print("File copied \n")

    print("Check the counts \n ")
    sleep(1)
    spark.sql("SELECT * FROM batch_counts").show()
    sleep(10)