# Log File - Data Transformation and Ingestion

### Loading Libraries
##### Spark Session, Dataframe Functions, Data types and Json

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

### Variables Initialization

In [2]:
cassandra_host = "cassandra"
cassandra_user = "cassandra"
cassandra_pwd  = "cassandra"
cassandra_port = 9042
key_space      = "LogAnalysis"
table_name     = "NASALog"
kafka_server   = "kafka:9092"
kafka_topic    = "nasa_logs_demo"

### Spark Session
##### Spark Session object creation with configuration data stax spark-cassandra connector and cassandra related connectivity credentials.

In [3]:
#Spark Session creation configured to interact with MongoDB
spark = SparkSession.builder.appName("pyspark-notebook").\
config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector-driver_2.12:3.0.0").\
config("spark.cassandra.connection.host",cassandra_host).\
config("spark.cassandra.auth.username",cassandra_user).\
config("spark.cassandra.auth.password",cassandra_pwd).\
getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.7/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector-driver_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f8746af2-0020-46b0-a47a-5c2ee505bb47;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-

### Get data from Kafka with Schema
##### Read data from Kafka topic vai Spark structured streaming API by providing Kafka server and Topic details.

In [4]:
#Read data from Kafka topic
split_logic = split(col("url"),"\.").getItem(1)
log_data = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers",kafka_server)\
  .option("subscribe", kafka_topic)\
  .option("startingOffsets", "earliest")\
  .load()\
  .selectExpr("split(value,',')[1] as host",
              "split(value,',')[2] as time",
              "split(value,',')[3] as method",
              "split(value,',')[4] as url",
              "split(value,',')[5] as response",
              "split(value,',')[6] as bytes"
             )\
  .withColumn("time_added",unix_timestamp())\
  .withColumn("extension",when(split_logic.isNull(),"None").otherwise(split_logic))

### Foreach Batch method
##### This method be called from Spark froeachBatch sink and writes to cassandra database. It takes micro batch(dataframe) and its the unique id as input.

In [5]:
def process_row(df, epoch_id):
    """Writes data to Cassandra and HDFS location

    Parameters
    ----------
    df : DataFrame
        Streaming Dataframe
    epoch_id : int
        Unique id for each micro batch/epoch
    """
    df.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="nasalog", keyspace="loganalysis")\
    .save() #hot path
    df.write.csv("hdfs://namenode:8020/output/nasa_logs/",mode="append") #cold path

### Cassandra Sink
##### Writes stream of delta data to Cassandra using foreachBatch sink continuosly until an interruption occurs. Stores processed indices at a checkpoint location so that it will not process the messages already processed.

In [7]:
#Writes streaming dataframe to ForeachBatch console which ingests data to Cassandra
log_data \
    .writeStream \
    .option("checkpointLocation", "checkpoint/data") \
    .foreachBatch(process_row) \
    .start() \
    .awaitTermination()

24/12/07 07:57:51 WARN StreamingQueryManager: Stopping existing streaming query [id=50dd6fc6-676c-4e7a-ace8-2f84f7f58c2c, runId=745dce51-762f-4d35-9071-420b8ff2e988], as a new run is being started.
                                                                                

KeyboardInterrupt: 