In [1]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from datetime import datetime, timedelta, timezone
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator
import os
import subprocess

In [2]:
# Get GCP input data
PROJECT_ID = 'intricate-reef-411403'
# NOTE(review): bucket suffix "41103" differs from the project id "411403" — confirm intended.
BUCKET = "test_bucket-intricate-reef-41103"
# Project root: $PROJECT_HOME when set (e.g. inside a container), otherwise
# ~/data-engineering-zoomcamp/project. expanduser() still resolves the home directory when
# $HOME is unset, where os.getenv("HOME") + "..." would raise TypeError.
PROJECT_HOME = os.environ.get("PROJECT_HOME") or os.path.join(
    os.path.expanduser("~"), "data-engineering-zoomcamp", "project"
)
credentials_location = PROJECT_HOME + "/.google/credentials/gcp.json"

# Get file structure data
# NOTE(review): these paths start with "/." (absolute) — confirm they are not meant to be
# relative ("./project/...").
local_data_path = "/.project/data/raw/Mendeley_data/"  # this will need to change when dockerized
temp_path = "/.project/data/raw/temp/"
local_data_file = "100_Batches_IndPenSim_V3.csv"
path_to_local_home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")
gcs_input_path = "raw/"
gcs_output_path = "processed/raman_context/"
# Comma-separated list handed to spark.jars: GCS filesystem connector + BigQuery data source.
spark_jar_path = f"{PROJECT_HOME}/lib/gcs-connector-hadoop3-2.2.5.jar,{PROJECT_HOME}/lib/spark-3.5-bigquery-0.37.0.jar"

In [3]:
# start spark standalone instance with worker
start_spark_master = "cd $SPARK_HOME && ./sbin/start-master.sh --port 7078"
start_spark_worker = "cd $SPARK_HOME && ./sbin/start-worker.sh spark://127.0.0.1:7078"

start_master_process = subprocess.Popen(start_spark_master, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start_master_output, start_master_error = start_master_process.communicate()

start_worker_process = subprocess.Popen(start_spark_worker, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start_worker_output, start_worker_error = start_worker_process.communicate()

In [4]:
# Spark configuration: connect to the local standalone master on port 7078, ship the
# GCS/BigQuery connector jars, and pass service-account settings through spark.hadoop.* keys.
conf = SparkConf().setMaster("spark://127.0.0.1:7078").setAppName("process_raw_data")
for setting_key, setting_value in (
    ("spark.jars", spark_jar_path),
    ("spark.hadoop.google.cloud.auth.service.account.enable", "true"),
    ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location),
):
    conf.set(setting_key, setting_value)

In [5]:
# Create the SparkContext from `conf`, then register the GCS filesystem implementation and
# service-account credentials on its Hadoop configuration (via the private _jsc handle).
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()
gcs_hadoop_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}
for hadoop_key, hadoop_value in gcs_hadoop_settings.items():
    hadoop_conf.set(hadoop_key, hadoop_value)

24/04/14 08:38:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
# Obtain the SparkSession for the standalone cluster, reusing the running context's configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [7]:
# Gather Data
# Paths are built from the config-cell constants (BUCKET, gcs_input_path) rather than
# repeating the bucket name, so changing the bucket in one place is enough.
df_raw_values = spark.read.parquet(f"gs://{BUCKET}/{gcs_input_path}*.parquet")
df_context = spark.read.parquet(f"gs://{BUCKET}/processed/sample_context/*.parquet")
# filter columns
# Raman columns: id, flag columns, then one column per value 350..1750 (assumed to be the
# spectral axis — confirm units).
# NOTE(review): " 1-Raman spec recorded" keeps its leading space, presumably matching the raw
# header — confirm.
raman_cols = ["id", " 1-Raman spec recorded", "2-PAT control(PAT_ref:PAT ref)", "Fault flag"]
raman_cols.extend([str(i) for i in range(350, 1751)])
sample_cols = ["id", "Time (h)", "Penicillin concentration(P:g/L)", "Fault reference(Fault_ref:Fault ref)", "0 - Recipe driven 1 - Operator controlled(Control_ref:Control ref)"]
# split raw df into relevant sample values and raman measurement data; the id columns are
# renamed so the later joins against df_context (which also has "id") are unambiguous
df_samples = df_raw_values.select(sample_cols).withColumnRenamed("id", "id_sample")
df_raman = df_raw_values.select(raman_cols).withColumnRenamed("id", "id_raman")

                                                                                

In [8]:
# find most recent existing record in T_SAMPLE_CONTEXT
# Looks back 5 minutes from current_time; the result becomes the lower bound of the
# incremental window built in the next cell.
# TODO: current_time is pinned to a fixed value for reproducible runs; switch back to
# datetime.utcnow() for live/scheduled processing.
current_time = datetime.strptime("2024-04-14 02:55:00", "%Y-%m-%d %H:%M:%S")
back_time = current_time - timedelta(minutes=5)
current_ts = current_time.strftime("%Y-%m-%d %H:%M:%S")
back_ts = back_time.strftime("%Y-%m-%d %H:%M:%S")
date_range = (current_ts, back_ts)
where_clause = """sample_ts BETWEEN TO_TIMESTAMP('{1}','yyyy-MM-dd HH:mm:ss') AND TO_TIMESTAMP('{0}','yyyy-MM-dd HH:mm:ss')""".format(*date_range)
try:
    # Query the table the write cell appends to (test_schema.t_sample_context).
    most_recent_time = (
        spark.read.format("bigquery")
        .option("project", PROJECT_ID)
        .option("dataset", "test_schema")
        .option("table", "t_sample_context")
        .load()
        .where(where_clause)
        .agg(F.max("sample_ts"))
        .collect()[0][0]
    )
except Exception as e:
    # Best-effort lookup (e.g. the table may not exist yet); fall back to back_time below.
    print(f"Error - no existing records found: {e}")
    most_recent_time = None
# max() over zero matching rows yields None; without this guard strftime below would raise.
if most_recent_time is None:
    most_recent_time = back_time
most_recent_ts = most_recent_time.strftime("%Y-%m-%d %H:%M:%S")

                                                                                

In [9]:
# gather new data that has not been traced into T_SAMPLE_CONTEXT
# Half-open window (most_recent_ts, current_ts]: rows stamped exactly most_recent_ts are
# already loaded, so an inclusive lower bound would append them a second time.
date_range = (current_ts, most_recent_ts)
where_clause = """sample_ts > TO_TIMESTAMP('{1}','yyyy-MM-dd HH:mm:ss') AND sample_ts <= TO_TIMESTAMP('{0}','yyyy-MM-dd HH:mm:ss')""".format(*date_range)
# Filter into a new name so df_context stays the unfiltered source and this cell can be
# re-run (selecting "Batch Number" from an already-renamed frame would fail).
df_context_window = (
    df_context.select(["id", "Batch Number", "sample_ts"])
    .where(where_clause)
    .withColumnRenamed("Batch Number", "batch_number")
)
# The window covers at most the 5-minute lookback, so broadcasting it avoids shuffling and
# sorting the ~1400-column Raman frame for a sort-merge join (the recorded OOM trace shows
# executors failing while sorting for a shuffle).
# NOTE(review): assumes the windowed context comfortably fits in executor memory.
context_small = F.broadcast(df_context_window)
# join to raman and sample dfs
df_sample_context = df_samples.join(context_small, df_samples.id_sample == context_small.id, "inner").drop("id_sample")
df_raman_context = df_raman.join(context_small, df_raman.id_raman == context_small.id, "inner").drop("id_raman")

In [10]:
# Give the joined frames BigQuery-friendly column names. toDF renames positionally, so each
# list must follow the column order produced by the joins (measurement columns first, then
# id, batch_number, sample_ts from the context frame).
sample_colnames = [
    "time_hrs",
    "penicillin_concentration_g_l",
    "fault_reference",
    "recipe_0_or_operator_1_controlled",
    "id",
    "batch_number",
    "sample_ts",
]
raman_colnames = (
    ["1_raman_spec_recorded", "2_pat_control", "fault_flag"]
    + [str(wavenumber) for wavenumber in range(350, 1751)]
    + ["id", "batch_number", "sample_ts"]
)
# fillna(0) replaces nulls with 0 in the numeric columns
df_sample_context = df_sample_context.toDF(*sample_colnames).fillna(0)
df_raman_context = df_raman_context.toDF(*raman_colnames).fillna(0)

In [11]:
# define schema for T_SAMPLE_CONTEXT
# Field order mirrors sample_colnames; every field is nullable (StructField default).
sample_schema = T.StructType([
    T.StructField(field_name, field_type)
    for field_name, field_type in (
        ("time_hrs", T.DoubleType()),
        ("penicillin_concentration_g_l", T.DoubleType()),
        ("fault_reference", T.LongType()),
        ("recipe_0_or_operator_1_controlled", T.LongType()),
        ("id", T.IntegerType()),
        ("batch_number", T.LongType()),
        ("sample_ts", T.TimestampType()),
    )
])

In [12]:
# define schema for T_RAMAN_CONTEXT
# Context/key columns, then flag columns, then one DOUBLE field per value 350..1750.
# NOTE(review): this order puts id/batch_number/sample_ts first, whereas raman_colnames puts
# them last — confirm the writer matches columns by name.
raman_col_types = [
    T.StructField(field_name, field_type)
    for field_name, field_type in (
        ("id", T.IntegerType()),
        ("batch_number", T.LongType()),
        ("sample_ts", T.TimestampType()),
        ("1_raman_spec_recorded", T.LongType()),
        ("2_pat_control", T.LongType()),
        ("fault_flag", T.LongType()),
    )
]
wavelengths = [T.StructField(str(wavenumber), T.DoubleType()) for wavenumber in range(350, 1751)]
raman_col_types.extend(wavelengths)
raman_schema = T.StructType(raman_col_types)

In [None]:
# add new sample context data to table
# Appends to <PROJECT_ID>.test_schema.t_sample_context, staging through the GCS bucket
# BUCKET; the table is created on first write (CREATE_IF_NEEDED).
# writeDisposition is deliberately not set: WRITE_TRUNCATE contradicted mode("append") and
# the incremental window built above — truncating would discard every previously loaded row.
# NOTE(review): confirm the connector honours the "schema" option; if not, cast
# df_sample_context to sample_schema before writing.
(
    df_sample_context.write.format("bigquery")
    .option("temporaryGcsBucket", BUCKET)
    .option("table", PROJECT_ID + ".test_schema.t_sample_context")
    .option("createDisposition", "CREATE_IF_NEEDED")
    .option("schema", sample_schema.json())
    .mode("append")
    .save()
)

In [13]:
# add new Raman context data to table
# Appends to <PROJECT_ID>.test_schema.t_raman_context, staging through the GCS bucket
# BUCKET; the table is created on first write (CREATE_IF_NEEDED).
# writeDisposition is deliberately not set: WRITE_TRUNCATE contradicted mode("append") and
# the incremental window built above — truncating would discard every previously loaded row.
# NOTE(review): confirm the connector honours the "schema" option; if not, cast
# df_raman_context to raman_schema before writing.
(
    df_raman_context.write.format("bigquery")
    .option("temporaryGcsBucket", BUCKET)
    .option("table", PROJECT_ID + ".test_schema.t_raman_context")
    .option("createDisposition", "CREATE_IF_NEEDED")
    .option("schema", raman_schema.json())
    .mode("append")
    .save()
)

24/04/14 08:39:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/04/14 08:40:24 WARN TaskSetManager: Lost task 2.0 in stage 6.0 (TID 10) (10.168.0.7 executor 0): java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:64)
	at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:71)
	at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.<init>(NonBlockedDecompressorStream.java:36)
	at org.apache.parquet.hadoop.codec.SnappyCodec.createInputStream(SnappyCodec.java:75)
	at org.apache.parquet.hadoop.CodecFactory$HeapBytesDecompressor.decompress(CodecFactory.java:112)
	at org.apache.parquet.hadoop.ColumnChunkPageReadStore$ColumnChunkPageReader.readDictionaryPage(ColumnChunkPageReadStore.java:236)
	at org.apache.spark.sql.execution.datasources.parquet.Vectorize

[[34m2024-04-14T08:40:47.622+0000[0m] {[34mjava_gateway.py:[0m1066} ERROR[0m - KeyboardInterrupt while sending command.[0m
Traceback (most recent call last):
  File "/home/jdelzio/data-engineering-zoomcamp/project/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jdelzio/data-engineering-zoomcamp/project/.venv/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
[[34m2024-04-14T08:40:47.630+0000[0m] {[34mclientserver.py:[0m543} INFO[0m - Closing down clientserver connection[0m


KeyboardInterrupt: 

24/04/14 08:40:59 WARN TaskSetManager: Lost task 3.1 in stage 6.0 (TID 15) (10.168.0.7 executor 1): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:289)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:375)
	at org.apache.spark.s

In [14]:
# Stop the SparkSession (and its underlying SparkContext) before shutting down the
# standalone master and worker in the following cells.
spark.stop()

In [15]:
# Stop Local Standalone cluster
# Runs Spark's stop-master.sh from $SPARK_HOME (IPython shell magic) to stop the master
# started with start-master.sh at the top of the notebook.
# NOTE(review): the worker is stopped in the next cell; stopping the worker first (reverse
# of start order) is the more conventional shutdown sequence — consider swapping.
!cd $SPARK_HOME && ./sbin/stop-master.sh

IOStream.flush timed out
stopping org.apache.spark.deploy.master.Master


In [16]:
# Stop Worker
# Runs Spark's stop-worker.sh from $SPARK_HOME (IPython shell magic) to stop the worker
# started with start-worker.sh at the top of the notebook.
!cd $SPARK_HOME && ./sbin/stop-worker.sh

IOStream.flush timed out
stopping org.apache.spark.deploy.worker.Worker
