# Guided Capstone PySpark

#### Start a simple Spark Session

In [1]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType
import json, decimal

spark = SparkSession.builder.master("local").appName("app").getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
40,application_1628529429212_0044,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# conf.set is used to input your key value pairs.
spark.conf.set(
"fs.azure.account.key.adfrcdevstorage.blob.core.windows.net",
"xrbJuOZpG1+WGZk2yJhbNbaIjB+adtF6d8BN+3xLnYMEDtwA+B3fzi44qPaPNpr8OJgzFXehzBQFC4/ZqX6vCQ=="
)

In [3]:
#csv function
def parse_csv(line):
    record_type_pos = 2
    record = line.split(",")
    try:
        # [logic to parse records]
        if record[record_type_pos] == "T":
            event = [record[0], record[1], record[2], record[3], record[4], int(record[5]), record[6],
                     decimal.Decimal(record[7]), int(record[8]), None, None,"T"]
            return event
        elif record[record_type_pos] == "Q":
            event = [record[0], record[1], record[2], record[3], record[4], int(record[5]), record[6],
                     decimal.Decimal(record[7]), int(record[8]), decimal.Decimal(record[9]),int(record[10]), "Q"]
            return event
    
    except Exception as e:
        event = [None, None, None, None, None, None, None, None, None, None, None, "B"]
        return event
        

In [4]:
# JSON Function
def parse_json(record):
    try:
        #record = json.loads(line)
        record_type = record["event_type"]
        # logic to parse records
        if record_type == "T":
            #Get the applicable field values from json
            event = [record['trade_dt'], record['file_tm'], record['event_type'], record['symbol'],record['event_tm'],
                     int(record['event_seq_nb']),record['exchange'],None ,None,decimal.Decimal(record['price']), int(record['size']),'T']
            return event
        elif record_type == "Q":
            # [Get the applicable field values from json]
            event = [record['trade_dt'], record['file_tm'], record['event_type'], record['symbol'],record['event_tm'],
                     int(record['event_seq_nb']),record['exchange'],decimal.Decimal(record['bid_pr']),int(record['bid_size']),
                     decimal.Decimal(record['ask_pr']), int(record['ask_size']), 'Q']
            return event
        return record
    except Exception as e:
    # [save record to dummy event in bad partition]
    # [fill in the fields as None or empty string]
        event = [None, None, None, None, None, None, None, None, None, None, None, "B"]
        return event

In [5]:
# Table schema for both csv and json files below. Note, that for both csv and json files, events with type Q or T may have 
# different item/events to categorize.
schema = StructType([
    StructField("trade_dt", StringType(), True),
    StructField("arrival_tm", StringType(), True),
    StructField("rec_type", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("event_tm", StringType(), True),
    StructField("event_seq_nb", IntegerType(), True),
    StructField("trade_pr/exch", StringType(), True),
    StructField("bid_pr", DecimalType(), True),
    StructField("bid_size", IntegerType(), True),
    StructField("ask_pr", DecimalType(), True),
    StructField("ask_size", IntegerType(), True),
    StructField("partition", StringType(), True)
])

In [6]:
# SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, 
# accumulators and broadcast variables on that cluster. Only one SparkContext should be active per JVM. 
# You must stop() the active SparkContext before creating a new one.

# To load the csv text files
csv1 =spark.sparkContext.textFile("wasbs://guidedcapstone@adfrcdevstorage.blob.core.windows.net/NYSE_csv.txt")
csv2 =spark.sparkContext.textFile("wasbs://guidedcapstone@adfrcdevstorage.blob.core.windows.net/NYSE_8.06_csv.txt")

# To load the JSON text files
json1 =spark.sparkContext.textFile("wasbs://guidedcapstone@adfrcdevstorage.blob.core.windows.net/Nasdaq_8.05_json.txt")
json2 =spark.sparkContext.textFile("wasbs://guidedcapstone@adfrcdevstorage.blob.core.windows.net/Nasdaq_8.06_json.txt")

In [7]:
# To implement the map/lambda function for each of the text files. Printing parsed# will only show
# PythonRDD[26] at RDD at PythonRDD.scala:53

parsed1 = csv1.map(lambda line: parse_csv(line))
#print(parsed1.collect())
parsed2 = csv2.map(lambda line: parse_csv(line))

# We need to read the file as a JSON format BEFORE we pass it through the function
parsed3 = json1.map(lambda line: parse_json(json.loads(line)))
#print(parsed3.collect())
parsed4 = json2.map(lambda line: parse_json(json.loads(line)))

In [8]:
# Create the dataframes

csv_8_05 = spark.createDataFrame(parsed1, schema)
csv_8_06 = spark.createDataFrame(parsed2, schema)
json_8_05 = spark.createDataFrame(parsed3, schema)
json_8_06 = spark.createDataFrame(parsed4, schema)

# To test if the dataframe has been loaded with nonnull items
csv_8_05.limit(5).show()
json_8_05.limit(5).show()

+----------+--------------------+--------+------+--------------------+------------+-------------+------+--------+------+--------+---------+
|  trade_dt|          arrival_tm|rec_type|symbol|            event_tm|event_seq_nb|trade_pr/exch|bid_pr|bid_size|ask_pr|ask_size|partition|
+----------+--------------------+--------+------+--------------------+------------+-------------+------+--------+------+--------+---------+
|2020-08-05|2020-08-05 09:30:...|       Q|  SYMA|2020-08-05 09:34:...|           1|         NYSE|    75|     100|    75|     100|        Q|
|2020-08-05|2020-08-05 09:30:...|       Q|  SYMA|2020-08-05 09:40:...|           2|         NYSE|    77|     100|    79|     100|        Q|
|2020-08-05|2020-08-05 09:30:...|       Q|  SYMA|2020-08-05 09:50:...|           3|         NYSE|    77|     100|    77|     100|        Q|
|2020-08-05|2020-08-05 09:30:...|       Q|  SYMA|2020-08-05 09:57:...|           4|         NYSE|    79|     100|    80|     100|        Q|
|2020-08-05|2020-08-

In [10]:
# Export this to the file destination to view whether or not it is processing correctly. Sorted in your HDIStorage container

csv_folder = 'wasbs://guidedcapstone-2021-08-09t17-05-44-969z@guidedcapstonhdistorage.blob.core.windows.net/HdiNotebooks/csv_folder'
json_folder = 'wasbs://guidedcapstone-2021-08-09t17-05-44-969z@guidedcapstonhdistorage.blob.core.windows.net/HdiNotebooks/json_folder'

csv_8_05.write.partitionBy("partition").mode("overwrite").parquet(csv_folder)
csv_8_06.write.partitionBy("partition").mode("overwrite").parquet(csv_folder)
json_8_05.write.partitionBy("partition").mode("overwrite").parquet(json_folder)
json_8_06.write.partitionBy("partition").mode("overwrite").parquet(json_folder)
