# Spark injector

The purpose of this notebook is to build the injection mechanism using Spark Streaming.
Data is read from the network in micro-batches, so each RDD can contain multiple rows. Our job is then to convert the rows of each RDD into Spark DataFrames and save them to HDFS in CSV format.
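
As a plain-Python illustration of the micro-batch model (not Spark code), the hypothetical helper `group_into_batches` below assigns each incoming line to the batch interval in which it arrived. It assumes arrival times are given in seconds since the stream started. Intervals without data still yield an empty batch, mirroring how Spark Streaming hands an empty RDD to `foreachRDD` when nothing arrived, which is why the conversion function checks `rdd.isEmpty()`.

```python
def group_into_batches(events, interval=1.0):
  """Group (arrival_time, line) pairs into one list per batch interval.

  arrival_time is assumed to be a non-negative number of seconds since
  the stream started. Intervals that received no lines are kept as empty lists.
  """
  if not events:
    return []
  last_batch = max(int(t // interval) for t, _ in events)
  batches = [[] for _ in range(last_batch + 1)]
  for t, line in events:
    batches[int(t // interval)].append(line)
  return batches


# Three lines arriving over roughly two seconds, with a 1-second batch interval:
# the first interval holds two lines, the second is empty, the third holds one line
print(group_into_batches([(0.1, "a\tb"), (0.5, "c\td"), (2.2, "e\tf")]))
```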

In [None]:
# Locate the Spark installation (e.g. via SPARK_HOME) and make pyspark importable
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

## Conversion function

Here we define the function responsible for the CSV conversion. It receives an RDD holding one batch of rows sent over the network.
The function then takes each row, converts it to a Spark DataFrame and appends it to HDFS.
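
The per-row step can be sketched in plain Python, without Spark. The hypothetical helper `row_to_dataset` below assumes fields are separated by tab characters: it splits a row, wraps the fields in a list so they form a single-row dataset, and builds a matching DDL schema string with one `string` column per field. The column names `c0`, `c1`, … are an arbitrary choice made here; since the CSV is written with `header=False`, they do not appear in the output.

```python
def row_to_dataset(row, sep="\t"):
  """Split one row and return (rows, schema) for spark.createDataFrame.

  rows is a list holding a single list of fields; schema is a DDL string
  with one string column per field.
  """
  fields = row.split(sep)
  schema = ", ".join("c{} string".format(i) for i in range(len(fields)))
  return [fields], schema


rows, schema = row_to_dataset("a\tb\tc")
# rows   -> [['a', 'b', 'c']]
# schema -> 'c0 string, c1 string, c2 string'
```

Inside a batch, `spark.createDataFrame(rows, schema=schema)` would then give a one-row DataFrame whose width matches that row.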

In [None]:
def rdd_to_hdfs(rdd):
  # Skip empty micro-batches (Spark Streaming emits one RDD per interval, even without data)
  if not rdd.isEmpty():
    # getOrCreate() reuses the session bound to the already running SparkContext
    spark = SparkSession.builder.appName("RDD_to_HDFS").getOrCreate()

    # Bring the rows of this batch to the driver as a Python list of strings
    lst = rdd.collect()
    hdfs_path = "hdfs://localhost:54310/user/ubuntu/dataset"

    # Each row is converted and stored separately because rows may differ in their number of fields
    for s in lst:

      # Split the row on tab characters; wrapping the fields in a list gives a one-row dataset
      fields = str(s).split("\t")
      l = [fields]

      # Build a DDL schema with one string column per field, e.g. "c0 string, c1 string"
      # (one column per field, not per row, so the schema matches the row's width)
      schema = ", ".join("c{} string".format(i) for i in range(len(fields)))

      # Create the one-row DataFrame and append it to the CSV output directory on HDFS
      df = spark.createDataFrame(l, schema=schema)
      df.write.csv(hdfs_path, mode="append", header=False)
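
Creating and writing one DataFrame per row triggers a separate Spark write job for every row, which adds up quickly for large batches. One possible alternative, not what this notebook does, is to group the collected rows by their number of fields first, so that each group shares one schema and can be written as a single DataFrame. The hypothetical helper `group_rows_by_width` sketches that grouping in plain Python, again assuming tab-separated rows.

```python
from collections import defaultdict


def group_rows_by_width(lines, sep="\t"):
  """Split each line and group the resulting field lists by field count.

  Returns a dict mapping width -> list of rows with exactly that many fields.
  """
  groups = defaultdict(list)
  for line in lines:
    fields = line.split(sep)
    groups[len(fields)].append(fields)
  return dict(groups)


groups = group_rows_by_width(["a\tb", "c\td", "e\tf\tg"])
# groups -> {2: [['a', 'b'], ['c', 'd']], 3: [['e', 'f', 'g']]}
```

Inside `rdd_to_hdfs`, each `(width, rows)` pair could then become one `spark.createDataFrame(rows, schema=...)` call with a `width`-column schema, followed by a single `write.csv(..., mode="append")`.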

## Initiating Spark Streaming

In this section we connect to the streamer service on `localhost:9999` and start the streaming job.
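
`socketTextStream` acts as a TCP client: it connects to the given host and port and reads UTF-8 text, treating each newline-terminated line as one record. For a quick local test without the real streamer service, the Spark Streaming programming guide uses `nc -lk 9999`. The sketch below is a hypothetical Python stand-in (`encode_rows` and `serve_rows` are names introduced here) that listens on `localhost:9999`, waits for one connection (the Spark receiver), sends a list of tab-separated rows and then closes.

```python
import socket


def encode_rows(rows):
  """Frame rows as newline-terminated UTF-8 text, the format socketTextStream reads."""
  return "".join(row + "\n" for row in rows).encode("utf-8")


def serve_rows(rows, host="localhost", port=9999):
  """Accept a single client connection, send all rows, then close the socket."""
  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    conn, _ = server.accept()  # blocks until the Spark receiver connects
    with conn:
      conn.sendall(encode_rows(rows))
```

For example, call `serve_rows(["a\tb\tc", "d\te\tf"])` from a separate Python process before starting the streaming cell. Because this stand-in closes the connection after one send, Spark's socket receiver will typically try to reconnect afterwards; a long-running producer (or `nc -lk`) is closer to the real setup.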

In [None]:
# Create a local StreamingContext with two worker threads and a batch interval of 1 second
# (at least two threads are needed: one runs the socket receiver, the other processes the batches)
sc = SparkContext("local[2]", "RDD_to_HDFS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)

# Create a DStream that connects to hostname:port, here localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# socketTextStream already splits the incoming text on newlines, so every RDD element is one row;
# each micro-batch RDD is passed directly to the conversion function
lines.foreachRDD(rdd_to_hdfs)

# Start the computation and wait for it to terminate
ssc.start()
ssc.awaitTermination()

In [None]:
# Stop the streaming context together with the underlying SparkContext
ssc.stop(stopSparkContext=True)