# Data Transformation and Ingestion

### Loading Libraries
##### Spark Session, Dataframe Functions, Data types and Json

In [69]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
import json

### Variables Initialization

In [70]:
cassandra_host = "cassandra"
cassandra_user = "cassandra"
cassandra_pwd = "cassandra"
cassandra_port = 9042
cassandra_table = "job"
cassandra_keyspace = "jobs_analytics"
key_space = "jobs_analytics"
table_name = "job"
kafka_server = "kafka:9092"
kafka_topic = "scraped_jobs"
hdfs_folder_path = 'hdfs://namenode:8021/output1/scraped_jobs/'

### Spark Session
##### Spark Session object creation with configuration data stax spark-cassandra connector and cassandra related connectivity credentials.

In [71]:
#Spark Session creation configured to interact with Cassandra
spark = SparkSession.builder.appName("pyspark-notebook").\
config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,com.datastax.spark:spark-cassandra-connector-driver_2.12:3.0.0").\
config("spark.cassandra.connection.host",cassandra_host).\
config("spark.cassandra.auth.username",cassandra_user).\
config("spark.cassandra.auth.password",cassandra_pwd).\
getOrCreate()

### Get data from Kafka with Schema
##### Read data from Kafka topic using Spark structured streaming API by providing Kafka server and Topic details.

In [72]:
#Read data from Kafka topic
job_data = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers",kafka_server)\
  .option("subscribe", kafka_topic)\
  .option("startingOffsets", "earliest")\
  .load()

##### Set the json schema

In [73]:
# Define the schema for your JSON data
json_schema = StructType([
    StructField("TITLE", StringType(), True),  
    StructField("COMPANY_NAME", StringType(), True),
    StructField("JOB_DATE", StringType(), True),
    StructField("JOB_LINK", StringType(), True),
    StructField("JOB_LOCATION", StringType(), True),
    StructField("JOB_SENIORITY_LEVEL", StringType(), True),
    StructField("JOB_EMPLOYMENT_TYPE", StringType(), True),
    StructField("JOB_FUNCTION", StringType(), True),
    StructField("JOB_INDUSTRIES", StringType(), True),
    StructField("NUMBER_APPLICANTS", StringType(), True),
    StructField("JOB_DESCRIPTION", StringType(), True),
    StructField("NUMBER_JOBS", StringType(), True)
])

##### Convert the key and the value of the Kafka message (the actual sent data, typically serialized as JSON) to a string. It also assigns an alias "json_data" to the column value.
##### Selec the data using the json_schema we set

In [74]:
def is_integer(s):
    try:
        int(s)
        return True
    except ValueError:
        return False

job_data = job_data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as json_data")\
    .select(from_json("json_data", json_schema).alias("data"))\
    .select("data.TITLE", "data.COMPANY_NAME", "data.JOB_DATE", "data.JOB_LINK", "data.JOB_LOCATION", 
        "data.JOB_SENIORITY_LEVEL", "data.JOB_EMPLOYMENT_TYPE", "data.JOB_FUNCTION", "data.JOB_INDUSTRIES", 
        "data.NUMBER_APPLICANTS", "data.JOB_DESCRIPTION")  # didn't select  "data.NUMBER_JOBS"

# convert JOB_DATE into date column
job_data = job_data.withColumn("JOB_DATE", to_date(job_data["JOB_DATE"], "yyyy-MM-dd"))
    
# Set non-numeric values to null
job_data = job_data.withColumn(
    "NUMBER_APPLICANTS",
    when(col("NUMBER_APPLICANTS").isNotNull() & col("NUMBER_APPLICANTS").rlike(r"^\d+$"), col("NUMBER_APPLICANTS").cast("int")).otherwise(None)
)

In [75]:
job_data

DataFrame[TITLE: string, COMPANY_NAME: string, JOB_DATE: date, JOB_LINK: string, JOB_LOCATION: string, JOB_SENIORITY_LEVEL: string, JOB_EMPLOYMENT_TYPE: string, JOB_FUNCTION: string, JOB_INDUSTRIES: string, NUMBER_APPLICANTS: int, JOB_DESCRIPTION: string]

### Foreach Batch method
##### This method be called from Spark froeachBatch sink and writes to cassandra database. It takes micro batch(dataframe) and its the unique id as input.

In [76]:
def process_row(df, epoch_id):
    """Writes data to Cassandra and HDFS location

    Parameters
    ----------
    df : DataFrame
        Streaming Dataframe
    epoch_id : int
        Unique id for each micro batch/epoch
    """
    df.show(truncate=False)
    df.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=cassandra_table, keyspace=cassandra_keyspace)\
    .save() 
    df.write.csv(hdfs_folder_path,mode="append") 

### Cassandra Sink
##### Writes stream of delta data to Cassandra using foreachBatch sink continuosly until an interruption occurs. Stores processed indices at a checkpoint location so that it will not process the messages already processed.

In [79]:
#Writes streaming dataframe to ForeachBatch console which ingests data to Cassandra
job_data \
    .writeStream \
    .option("checkpointLocation", "checkpoint/data") \
    .foreachBatch(process_row) \
    .start() \
    .awaitTermination()

+--------------------------------+---------------+----------+----------------------------------------------------------------------------------------------------------------------------+-------------+-------------------+-------------------+--------------------------------------+-------------------------------+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

23/10/02 03:30:26 ERROR MicroBatchExecution: Query [id = 3035b345-3517-4ea7-b0d1-359829c6634a, runId = 27415d7d-7da8-44fc-8ac5-3f3e0a4e858f] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_14/1590744430.py", line 15, in process_row
    .options(table=cassandra_table, keyspace=cassandra_keyspace)\
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py", line 825, in save
    self._jwrite.save()
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", l

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_14/1590744430.py", line 15, in process_row
    .options(table=cassandra_table, keyspace=cassandra_keyspace)\
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py", line 825, in save
    self._jwrite.save()
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/dist-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o801.save.
: com.datastax.spark.connector.datasource.CassandraCatalogException: Attempting to write to C* Table but missing
primary key columns: [job_location,company_name,job_date,job_seniority_level,title]
	at com.datastax.spark.connector.datasource.CassandraWriteBuilder.<init>(CassandraWriteBuilder.scala:44)
	at com.datastax.spark.connector.datasource.CassandraTable.newWriteBuilder(CassandraTable.scala:69)
	at org.apache.spark.sql.execution.datasources.v2.BatchWriteHelper.newWriteBuilder(WriteToDataSourceV2Exec.scala:346)
	at org.apache.spark.sql.execution.datasources.v2.BatchWriteHelper.newWriteBuilder$(WriteToDataSourceV2Exec.scala:341)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.newWriteBuilder(WriteToDataSourceV2Exec.scala:253)
	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:259)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:54)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:335)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


=== Streaming Query ===
Identifier: [id = 3035b345-3517-4ea7-b0d1-359829c6634a, runId = 27415d7d-7da8-44fc-8ac5-3f3e0a4e858f]
Current Committed Offsets: {KafkaV2[Subscribe[scraped_jobs]]: {"scraped_jobs":{"0":450}}}
Current Available Offsets: {KafkaV2[Subscribe[scraped_jobs]]: {"scraped_jobs":{"0":452}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [TITLE#4104, COMPANY_NAME#4105, JOB_DATE#4126, JOB_LINK#4107, JOB_LOCATION#4108, JOB_SENIORITY_LEVEL#4109, JOB_EMPLOYMENT_TYPE#4110, JOB_FUNCTION#4111, JOB_INDUSTRIES#4112, CASE WHEN (isnotnull(NUMBER_APPLICANTS#4113) AND NUMBER_APPLICANTS#4113 RLIKE ^\d+$) THEN cast(NUMBER_APPLICANTS#4113 as int) ELSE cast(null as int) END AS NUMBER_APPLICANTS#4138, JOB_DESCRIPTION#4114]
+- Project [TITLE#4104, COMPANY_NAME#4105, to_date(JOB_DATE#4106, Some(yyyy-MM-dd)) AS JOB_DATE#4126, JOB_LINK#4107, JOB_LOCATION#4108, JOB_SENIORITY_LEVEL#4109, JOB_EMPLOYMENT_TYPE#4110, JOB_FUNCTION#4111, JOB_INDUSTRIES#4112, NUMBER_APPLICANTS#4113, JOB_DESCRIPTION#4114]
   +- Project [data#4102.TITLE AS TITLE#4104, data#4102.COMPANY_NAME AS COMPANY_NAME#4105, data#4102.JOB_DATE AS JOB_DATE#4106, data#4102.JOB_LINK AS JOB_LINK#4107, data#4102.JOB_LOCATION AS JOB_LOCATION#4108, data#4102.JOB_SENIORITY_LEVEL AS JOB_SENIORITY_LEVEL#4109, data#4102.JOB_EMPLOYMENT_TYPE AS JOB_EMPLOYMENT_TYPE#4110, data#4102.JOB_FUNCTION AS JOB_FUNCTION#4111, data#4102.JOB_INDUSTRIES AS JOB_INDUSTRIES#4112, data#4102.NUMBER_APPLICANTS AS NUMBER_APPLICANTS#4113, data#4102.JOB_DESCRIPTION AS JOB_DESCRIPTION#4114]
      +- Project [from_json(StructField(TITLE,StringType,true), StructField(COMPANY_NAME,StringType,true), StructField(JOB_DATE,StringType,true), StructField(JOB_LINK,StringType,true), StructField(JOB_LOCATION,StringType,true), StructField(JOB_SENIORITY_LEVEL,StringType,true), StructField(JOB_EMPLOYMENT_TYPE,StringType,true), StructField(JOB_FUNCTION,StringType,true), StructField(JOB_INDUSTRIES,StringType,true), StructField(NUMBER_APPLICANTS,StringType,true), StructField(JOB_DESCRIPTION,StringType,true), StructField(NUMBER_JOBS,StringType,true), json_data#4098, Some(Etc/UTC)) AS data#4102]
         +- Project [cast(key#4084 as string) AS key#4099, cast(value#4085 as string) AS json_data#4098]
            +- StreamingDataSourceV2Relation [key#4084, value#4085, topic#4086, partition#4087, offset#4088L, timestamp#4089, timestampType#4090], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@2fb415f7, KafkaV2[Subscribe[scraped_jobs]]
