## Task 1 

### Streaming from TCP Socket


In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import logging

# logging.debug("-> start")

# Build (or reuse) a local SparkSession that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Read lines over a network stream")
    .getOrCreate()
)

### Print Schema

In [2]:
# Streaming DataFrame backed by the TCP socket source on localhost:12345.
# Nothing is read yet: data only flows once a streaming query is started.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 12345)
    .load()
)

# Print the schema of the streaming DataFrame, as the section heading promises.
df.printSchema()

### After processing, you can write the DataFrame to console.

In [3]:
# Start a streaming query that appends each newly received row to the console.
query = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# Block for at most 10 seconds; the returned flag tells whether the query
# terminated within that window.
query.awaitTermination(10)

False

In [4]:
# The 10-second awaitTermination() call above returned False, i.e. the query
# had not terminated on its own, so stop it explicitly.
query.stop()

## Task 2

Let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and then transform that DataFrame to calculate word counts.

### Create DataFrame representing the stream of input lines from connection to localhost:9999


In [5]:
# getOrCreate() hands back the already-running session when one exists,
# so repeating the builder call here is safe.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Read lines over a network stream")
    .getOrCreate()
)

# Streaming DataFrame of text lines received from the socket at localhost:9999.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

### Split the lines into words


In [6]:
from pyspark.sql.functions import explode, split

# Split every incoming line on single spaces and emit one row per token,
# exposing the tokens in a column named "word".
words = lines.select(
    explode(split(lines["value"], ' ')).alias('word')
)

### Generate running word count


In [7]:
# Running count of occurrences per distinct word.
wordCounts = (
    words
    .groupBy("word")
    .count()
)

### Start running the query that prints the running counts to the console


In [9]:
# "complete" output mode re-emits the whole aggregated result table
# on every trigger, which is what a running word count needs.
query = (
    wordCounts.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

# Wait up to 10 seconds; the result says whether the query ended in that time.
query.awaitTermination(10)

False

In [10]:
# The word-count query was still running after the 10-second wait above
# (awaitTermination returned False), so stop it explicitly.
query.stop()

### Read CSV files (e.g. "test1.csv") as a stream from the input directory

In [11]:
# Directory watched by the file source; each CSV file in it is read as stream input.
inputDirectory = '/home/ubuntu/pyspark/ds/'

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# File-based streaming sources need an explicit schema; all three columns are nullable.
recordSchema = (
    StructType()
    .add('Name', StringType(), True)
    .add('Departments', StringType(), True)
    .add('salary', IntegerType(), True)
)

df2 = (
    spark.readStream
    .schema(recordSchema)
    .format("csv")
    .load(inputDirectory)
)

### Writing Spark Streaming to Console


In [12]:
# Append newly read CSV rows to the console, showing up to 10 rows per batch
# without truncating long cell values.
query = (
    df2.writeStream
    .format("console")
    .outputMode("append")
    .options(truncate=False, numRows=10)
    .start()
)

# Wait up to 10 seconds; the result says whether the query ended in that time.
query.awaitTermination(10)

False

In [13]:
# The CSV streaming query had not terminated within the 10-second wait above
# (awaitTermination returned False), so stop it explicitly.
query.stop()