In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from twilio.rest import Client

# Create (or reuse) a local Spark session with the Kafka connector on the classpath.
# NOTE(review): the connector coordinates (_2.11:2.4.8) must match the installed
# Spark/Scala build; also, getOrCreate() may hand back an already-running session,
# in which case these builder settings may not take effect — verify.
spark = (
    SparkSession.builder
    .appName("stream1")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8")
    .master("local[*]")
    .getOrCreate()
)

# Load the account reference data: first row is the header, column types inferred.
people_df = spark.read.csv('/accounts.csv', header=True, inferSchema=True)

# Show the inferred schema so the column names/types used in the join can be checked.
people_df.printSchema()

# Function to send messages using Twilio
def send_message(to, body_in):
    account_sid = ''
    auth_token = ''
    client = Client(account_sid, auth_token)
    message = client.messages.create(
        from_='whatsapp:+14155238886',
        body=body_in,
        to='whatsapp:+' + str(to),   
    )
    return message.sid

# Defining a user-defined function (UDF) for sending messages
send_messageUDF = udf(send_message)

# Kafka parameters for streaming
kafka_params = {
    "kafka.bootstrap.servers": "localhost:9092",  # Kafka broker(s)
    "subscribe": "your2"  # Kafka topic to subscribe to
}

# Reading streaming data from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:19092") \
    .option("subscribe", "transaction_Card_2") \
    .option("group.id", "al1") \
    .load()

# Selecting columns and manipulating the DataFrame
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(timestamp AS STRING)")
df2 = df1.select("key", "value", "timestamp").withColumn("value", split(regexp_replace(col("value"), "[\\[\\]'\"]", ""), ","))
df3 = df2.withColumn("moment", col("value")[0]).withColumn("account_number_trans", col("value")[1]).withColumn("description", col("value")[3].cast('double'))
df4 = df3.join(people_df, df3.account_number_trans == people_df.account_number).select('account_number_trans', 'moment', 'description', 'PHONE_NUMBER', 'subscriber_name').where(col('account_number_trans') == 87634)

# Selecting data for sending messages
df_to_send = df4.dropDuplicates(['account_number_trans', 'moment']).selectExpr("'Your order transaction is done on '||moment||' .Get well soon' as body", "PHONE_NUMBER as to ")

# Sending messages using the Twilio UDF
df_to_send = df_to_send.withColumn('message_sid', send_messageUDF(df_to_send['to'], df_to_send['body']))

# Writing streaming data to an in-memory table
df5 = df_to_send \
    .writeStream \
    .queryName("transaction") \
    .outputMode("append") \
    .format("memory") \
    .trigger(processingTime='5 seconds') \
    .start()

# Running a query against the in-memory table and displaying the result
result = spark.sql("SELECT * FROM transaction")
result.show(truncate=False)

# Checking the status of the streaming query
df5.status

# Checking if the streaming query is active
df5.isActive

# Stopping the streaming query
df5.stop()
