In [57]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col,  explode, split, concat, col, lit, from_json
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType, ArrayType, DoubleType
from time import sleep


### Schema of the messages consumed from Kafka
# Each record is a struct of nine nullable fields: six strings followed by
# three integers.
dataSchemaString = (
    StructType()
    .add("game_id", StringType(), True)
    .add("player_id", StringType(), True)
    .add("team_id", StringType(), True)
    .add("player_name", StringType(), True)
    .add("team_abbreviation", StringType(), True)
    .add("min", StringType(), True)
    .add("ast", IntegerType(), True)
    .add("stl", IntegerType(), True)
    .add("pf", IntegerType(), True)
)

# The message envelope: a single nullable "raw_json" field holding an array
# of the record struct defined above.
schema = StructType().add("raw_json", ArrayType(dataSchemaString), True)

# Spark session used only to preview the schema.
session_builder = SparkSession.builder.appName("JsonSchemaCreator")
spark = session_builder.getOrCreate()

# An empty DataFrame is enough to render the schema tree.
empty_df = spark.createDataFrame(data=[], schema=schema)
empty_df.printSchema()

root
 |-- raw_json: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- game_id: string (nullable = true)
 |    |    |-- player_id: string (nullable = true)
 |    |    |-- team_id: string (nullable = true)
 |    |    |-- player_name: string (nullable = true)
 |    |    |-- team_abbreviation: string (nullable = true)
 |    |    |-- min: string (nullable = true)
 |    |    |-- ast: integer (nullable = true)
 |    |    |-- stl: integer (nullable = true)
 |    |    |-- pf: integer (nullable = true)



In [60]:
### Define the function that writes each micro-batch of NBA results to BigQuery
def write_nba_results(df, batch_id):
    """Append one streaming micro-batch to the BigQuery results table.

    Meant to be used as a ``foreachBatch`` sink, which invokes it with the
    DataFrame of the current micro-batch and that batch's id.

    Args:
        df: DataFrame containing the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch (not used by this function).

    Returns:
        None.

    Notes:
        The write uses ``append`` mode. With ``overwrite`` every micro-batch
        replaced the entire table, so only the rows of the most recent batch
        were kept and an empty batch emptied the table.
        If the stream is replayed from the beginning (e.g. restarted without
        a persistent checkpoint), the replayed rows are appended again.
    """
    print('posted')
    # Preview of the batch in the notebook output.
    df.show()
    (
        df.write.format('bigquery')
        .option('table', 'dataengineeringcourse2023.Output_processing_pipeline.new_nba_results')
        .mode("append")
        .save()
    )

In [61]:
# Spark configuration for the streaming job.
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Streaming_pipeline")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# Create the Spark session, which is the entry point to the Spark SQL engine.
# NOTE(review): getOrCreate() returns an already running session when one
# exists in this kernel; in that case settings such as the master URL above
# may not take effect -- confirm by starting from a fresh kernel.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes for the "gs" scheme.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "temp_de2023_2124849"
spark.conf.set('temporaryGcsBucket', bucket)

# First step of the stream processing pipeline: subscribe to the Kafka topic
# fed by the producer and read it as a stream, starting from the earliest
# available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("failOnDataLoss", "false")
    .option("subscribe", "games_details")
    .option("startingOffsets", "earliest")
    .load()
)

df.printSchema()

print("After starting first spark session")

# Parse the Kafka message value as JSON using `schema`, turn every element of
# the "raw_json" array into its own row, and flatten the struct into columns.
df = (
    df.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .withColumn("exploded", explode(col("parsed_value.raw_json")))
    .select("exploded.*")
)

# Hand every micro-batch to write_nba_results.
# NOTE(review): no "checkpointLocation" option is set on this writer, so the
# query's progress is not saved to a location chosen here. To resume exactly
# where a failed query left off, add
# .option("checkpointLocation", <durable path>) before .start().
query = df.writeStream.foreachBatch(write_nba_results).start()

try:
    # Block until the query terminates. The call parentheses are required:
    # the bare attribute reference never waited, so the handlers below could
    # never run.
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")
except Exception as error:
    query.stop()

    # Stop the spark context
    spark.stop()
    print("Unexpected error")
    # Show what went wrong instead of hiding it.
    print(repr(error))
    print("Stoped the streaming query and the spark context")



root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

After starting first spark session
posted
+-------+---------+-------+-----------+-----------------+---+---+---+---+
|game_id|player_id|team_id|player_name|team_abbreviation|min|ast|stl| pf|
+-------+---------+-------+-----------+-----------------+---+---+---+---+
+-------+---------+-------+-----------+-----------------+---+---+---+---+



In [24]:
# Stop the Spark session (and its underlying SparkContext) when the work is done.
spark.stop()