In [None]:
from time import sleep

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat,
    count,
    explode,
    from_json,
    lit,
    split,
    struct,
    sum,
    to_json,
    window,
)
from pyspark.sql.functions import slice as F_slice
from pyspark.sql.types import (
    ArrayType,
    DoubleType,
    IntegerType,
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Cluster/application settings: standalone master URL, application name,
# and one core each for the driver and for every executor.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Batch_process")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine; reuse an
# existing session if one is already running, otherwise create it.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes with Hadoop so that
# paths using the gs:// scheme can be resolved.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, implementation in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, implementation)

# Cloud Storage bucket the BigQuery connector uses for temporary export data.
# NOTE(review): nothing in the visible pipeline reads or writes BigQuery --
# confirm this setting is still needed.
bucket = "temp_de_jads"
spark.conf.set('temporaryGcsBucket', bucket)

# (field name, Spark type) pairs describing the JSON payload of one trade
# message; every field is declared nullable when the schema is built.
trade_fields = [
    ('type', StringType()),
    ('symbol_id', StringType()),
    ('sequence', IntegerType()),
    ('time_exchange', TimestampType()),
    ('time_coinapi', TimestampType()),
    ('uuid', StringType()),
    ('price', DoubleType()),
    ('size', DoubleType()),
    ('taker_side', StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in trade_fields]
)

# Names of the four underscore-separated components of `symbol_id`.
symbol_parts = ["broker", "symbol_type", "asset", "asset_quote"]
# Streaming source: subscribe to the "trade" topic, start from the latest
# offsets, and do not fail the query when Kafka reports data loss.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka1:9093",
    "subscribe": "trade",
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
}
df = spark.readStream.format("kafka").options(**kafka_source_options).load()

# Decode the Kafka `value` column to a string, parse it as JSON using
# `schema`, and flatten the parsed struct into top-level columns.
value_as_text = df.select(col('value').cast('string'))
parsed_trades = value_as_text.select(from_json(col('value'), schema).alias('data'))
trades = parsed_trades.select('data.*')

# Keep only the columns the aggregation below needs.
reduced_trades = trades.select('symbol_id', 'time_coinapi', 'size', 'taker_side', 'uuid')

# Per-minute aggregation of trades, grouped by symbol and taker side.
#
# For every 1-minute tumbling window on `time_coinapi` (with a 1-minute
# watermark for late data) compute:
#   size  -- total traded size in the window
#   count -- number of trades (non-null `uuid` values) in the window
#
# The aggregates are aliased explicitly instead of relying on Spark's
# auto-generated column names ('sum(size)', 'count(uuid)').
# `sum` and `count` are the pyspark.sql.functions aggregates, not builtins.
trades_aggregated = (
    reduced_trades
    .withWatermark('time_coinapi', '1 minutes')
    .groupBy(
        window(col('time_coinapi'), '1 minutes'),
        col('symbol_id'),
        col('taker_side'),
    )
    .agg(
        sum(col('size')).alias('size'),
        count(col('uuid')).alias('count'),
    )
    .select(
        col('size'),
        col('symbol_id'),
        col('window').alias('time_frame'),
        col('count'),
        col('taker_side'),
    )
)

# Split `symbol_id` on '_' and keep the first four tokens (slice is 1-based:
# start at element 1, take 4 elements).
symbol_tokens = F_slice(split(col('symbol_id'), '_'), 1, 4)
trades_symbol_split = trades_aggregated.withColumn('symbol_parts', symbol_tokens)

# Promote each token to its own named column.
tokens = col('symbol_parts')
trades_with_parts = (
    trades_symbol_split
    .withColumn("broker", tokens[0])
    .withColumn("symbol_type", tokens[1])
    .withColumn("asset", tokens[2])
    .withColumn("asset_quote", tokens[3])
)

# Drop the helper array and serialise every remaining column into a single
# JSON string column named `value`, which is what the Kafka sink writes.
trades_clean = (
    trades_with_parts
    .drop('symbol_parts')
    .select(to_json(struct('*')).alias('value'))
)

# Kafka sink settings: broker address, checkpoint directory for the query's
# progress/state, and the destination topic.
kafka_sink_options = {
    "kafka.bootstrap.servers": "kafka1:9093",
    "checkpointLocation": "/home/jovyan/checkpoint",
    "topic": "trades_aggregated",
}

# Publish the aggregated rows to Kafka once per minute.
# NOTE(review): in "complete" output mode Spark keeps all aggregate state and
# re-emits the entire result table on every trigger, so the watermark set on
# the aggregation cannot purge old windows. Confirm this is intended rather
# than "update" or "append".
query = (
    trades_clean.writeStream
    .format("kafka")
    .options(**kafka_sink_options)
    .outputMode("complete")
    .trigger(processingTime='1 minutes')
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()

In [None]:
# Stop the Spark session created earlier in this notebook.
spark.stop()