In [1]:
%store -r params
%store -r secrets

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro, from_avro
from pyspark.sql.functions import col, struct, lit, unbase64, udf
from pyspark.sql.types import IntegerType, StringType, LongType
import random

In [3]:
# NOTE(review): `spark` is used by the cells below but is never assigned in the
# cells visible here, so it is presumably injected by the notebook kernel -- confirm.
# The disabled lines below would instead obtain the active session, or build one
# named "Tokenization" when none is active.
#spark = SparkSession.getActiveSession()
#spark = spark if spark else SparkSession.builder.appName("Tokenization").getOrCreate()

In [4]:
# Load the Avro schema text that from_avro/to_avro use further down.
# Avro schemas are JSON documents, so decode explicitly as UTF-8 instead of
# relying on the platform's locale-dependent default encoding.
with open(params["avro_schema_file"], encoding="utf-8") as f:
    avsc = f.read()

In [5]:
# Checkpoint directories for the tokenization job, taken from the shared params.
checkpoint_config = params["checkpoints"]

# NOTE(review): os.makedirs creates directories on the local filesystem only;
# confirm these locations are local paths rather than HDFS/object-store URIs.
read_checkpoint_location = checkpoint_config["tokenization_read"]
os.makedirs(read_checkpoint_location, exist_ok=True)

write_checkpoint_location = checkpoint_config["tokenization_write"]
os.makedirs(write_checkpoint_location, exist_ok=True)

In [6]:
# SASL/PLAIN JAAS login entry for the RED Kafka cluster, built from the restored secrets.
red_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
    secrets["RED_KAFKA_USERNAME"],
    secrets["RED_KAFKA_PASSWORD"],
)

# Streaming source: Avro-encoded records from the RED landing topic.
#
# `checkpointLocation` is intentionally not set here: it is an option of the
# streaming query (set on writeStream below) and has no effect on a reader.
#
# Spark logs a warning about `kafka.group.id` (a fixed group id can make
# concurrent or restarted queries interfere with each other). It is kept so the
# consumer group name stays the same; make sure only one query uses this group id.
raw_avro_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", secrets["RED_KAFKA_SERVERS"])
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", red_jaas_config)
    .option("subscribe", params["topic"]["red_landing"])
    .option("kafka.group.id", "tokenization")
    .load()
)

24/04/02 14:35:27 WARN KafkaSourceProvider: Kafka option 'kafka.group.id' has been set on this query, it is
 not recommended to set this option. This option is unsafe to use since multiple concurrent
 queries or sources using the same group id will interfere with each other as they are part
 of the same consumer group. Restarted queries may also suffer interference from the
 previous run having the same group id. The user should have only one query per group id,
 and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
 consumers from the previous query are marked dead by the Kafka group coordinator before the
 restarted query starts running.
    


In [7]:
@udf(returnType=LongType())
def tokenize_cc_num(cc_num):
    """Mask a card number so that only its last four digits survive.

    The result is the constant 1111111111110000 plus ``cc_num % 10000``,
    e.g. 4539578763621486 -> 1111111111111486.

    Args:
        cc_num: Card number as an integer, or None for a missing value.

    Returns:
        The tokenized card number as an integer, or None when ``cc_num`` is None.
    """
    # The source column is nullable; `None % 10000` would raise TypeError in the
    # Python worker and fail the streaming query, so pass nulls through.
    if cc_num is None:
        return None
    placeholder_prefix = 1111111111110000
    last4 = cc_num % 10000
    return placeholder_prefix + last4

In [8]:
def tokenize_df(df):
    """Tokenize the card number of each record and reshape it for a Kafka sink.

    Args:
        df: DataFrame with a struct column ``value`` containing at least the
            fields ``id`` and ``cc_num``.

    Returns:
        DataFrame with two columns: ``key`` (the record's ``id``) and ``value``
        (a struct of all flattened fields, with ``cc_num`` replaced by its
        tokenized form cast to long).
    """
    # Promote the fields of the `value` struct to top-level columns.
    flattened = df.select("value.*")
    # Overwrite the card number with its tokenized counterpart.
    masked_cc_num = tokenize_cc_num(col("cc_num")).cast(LongType())
    masked = flattened.withColumn("cc_num", masked_cc_num)
    # Re-pack every column into a single struct and key the record by `id`.
    return masked.selectExpr("id as key", "(struct(*)) as value")

In [9]:
# Decode the Avro payload of each Kafka record into a struct column.
raw_json_df = (
    raw_avro_df
    .select("key", "value")
    .withColumn("value", from_avro("value", avsc))
)

# Replace the card number with its token and rebuild the key/value pair.
tokenized_json_df = tokenize_df(raw_json_df)

# Prepare for the Kafka sink: string key, Avro-encoded value.
tokenized_avro_df = (
    tokenized_json_df
    .withColumn("key", col("key").cast(StringType()))
    .withColumn("value", to_avro(col("value"), avsc))
)

In [10]:
# SASL/PLAIN JAAS login entry for the GREEN Kafka cluster, built from the restored secrets.
green_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
    secrets["GREEN_KAFKA_USERNAME"],
    secrets["GREEN_KAFKA_PASSWORD"],
)

# Start the streaming query that publishes tokenized records to the GREEN
# landing topic, checkpointing its progress under `write_checkpoint_location`.
sQuery = (
    tokenized_avro_df.writeStream
    .format("kafka")
    .queryName("tokenization")
    .option("kafka.bootstrap.servers", secrets["GREEN_KAFKA_SERVERS"])
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", green_jaas_config)
    .option("topic", params["topic"]["green_landing"])
    .option("checkpointLocation", write_checkpoint_location)
    .outputMode("append")
    .start()
)

24/04/02 14:35:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
# Display the streaming query's current status (message, isDataAvailable, isTriggerActive).
sQuery.status

24/04/02 14:35:29 WARN KafkaSourceProvider: Kafka option 'kafka.group.id' has been set on this query, it is
 not recommended to set this option. This option is unsafe to use since multiple concurrent
 queries or sources using the same group id will interfere with each other as they are part
 of the same consumer group. Restarted queries may also suffer interference from the
 previous run having the same group id. The user should have only one query per group id,
 and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
 consumers from the previous query are marked dead by the Kafka group coordinator before the
 restarted query starts running.
    


{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [12]:
# Print the schema of the decoded, pre-tokenization stream: the Kafka key plus
# the struct produced by from_avro on the record value.
raw_json_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- trans_date_trans_time: long (nullable = true)
 |    |-- cc_num: long (nullable = true)
 |    |-- merchant: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- amt: double (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- last: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: integer (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- long: double (nullable = true)
 |    |-- city_pop: integer (nullable = true)
 |    |-- job: string (nullable = true)
 |    |-- dob: string (nullable = true)
 |    |-- trans_num: string (nullable = true)
 |    |-- unix_time: integer (nullable = true)
 |    |-- merch_lat: double (nullable = true)
 |    |-- merch_l

24/04/02 14:35:29 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, group.id, auto.offset.reset]' were supplied but are not used yet.
