In [1]:
import findspark
import os
# Locate the Spark installation and make pyspark importable; the last
# expression echoes the Spark home that was found.
findspark.init()
findspark.find()


'/usr/local/spark'

In [4]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session. The Kafka source and the SQL Server
# JDBC driver are requested as packages, and HDFS on the namenode is set as
# the default filesystem.
spark = (
    SparkSession.builder
    .appName("KafkaSparkIntegration")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,com.microsoft.sqlserver:mssql-jdbc:9.4.0.jre8",
    )
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    .getOrCreate()
)


In [5]:
# Display the session summary to confirm the session was created.
spark


In [6]:
kafka_broker = "44.213.123.145:9092"
kafka_topic = "CustomerChurnProject"

# Subscribe to the topic as a streaming source and expose the binary
# key/value payloads as strings.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("subscribe", kafka_topic)
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)


In [7]:
# Mirror the stream into an in-memory table named "mystock2" so it can be
# inspected with ad-hoc SQL.
writer = (
    df.writeStream
    .format("memory")
    .queryName("mystock2")
    .outputMode("append")
    .start()
)


In [8]:
# writer.stop()


In [9]:
# Confirm that the Kafka-backed DataFrame is a streaming DataFrame.
is_streaming = df.isStreaming
print(is_streaming)


True


In [10]:
# Query the in-memory table fed by the "mystock2" streaming query and
# inspect its schema.
df_stock1 = spark.sql("SELECT * FROM mystock2")
df_stock1.printSchema()


root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [30]:
# Show the rows currently held in the memory table, without truncating the JSON payload.
df_stock1.show(truncate=False)  # Display the data


+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key|value                                                                                                                                                                                                                                                                      |
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1  |{"CustomerID": "12603", "Age": "44", "Gender": "Male", "Tenure": "2", "Usage Frequency": "6", "Support Calls": "10", "Payment Delay": "27", "Subscription Type": "Basic", "Co

In [12]:
# Check whether the DataFrame read from the memory table is streaming.
df_stock1.isStreaming


False

In [13]:
from pyspark.sql.types import *

# Schema of the JSON payload: every field is declared as a nullable string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "CustomerID",
        "Age",
        "Gender",
        "Tenure",
        "Usage Frequency",
        "Support Calls",
        "Payment Delay",
        "Subscription Type",
        "Contract Length",
        "Total Spend",
        "Last Interaction",
        "Churn",
    )
])


In [14]:
from pyspark.sql.functions import from_json, col

# Keep only the message payload and decode it into a struct column
# ("parsed_json") using the declared schema.
df_parsed = (
    df
    .selectExpr("CAST(value AS STRING) as value")
    .withColumn("parsed_json", from_json(col("value"), schema))
)


In [15]:
# Flatten the parsed struct into top-level columns. Field names are
# backtick-quoted because several of them contain spaces. "Churn" is not
# selected here.
df_final = df_parsed.select(*[
    col(f"parsed_json.`{field_name}`").alias(field_name)
    for field_name in (
        "CustomerID",
        "Age",
        "Gender",
        "Tenure",
        "Usage Frequency",
        "Support Calls",
        "Payment Delay",
        "Subscription Type",
        "Contract Length",
        "Total Spend",
        "Last Interaction",
    )
])


In [16]:
# Check that the flattened projection is still a streaming DataFrame.
df_final.isStreaming


True

In [17]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, DoubleType

# Cast each string column to its target type, keeping column names and
# column order unchanged.
typed_df = df_final.select(*[
    col(column_name).cast(target_type).alias(column_name)
    for column_name, target_type in (
        ("CustomerID", IntegerType()),
        ("Age", IntegerType()),
        ("Gender", StringType()),
        ("Tenure", IntegerType()),
        ("Usage Frequency", IntegerType()),
        ("Support Calls", IntegerType()),
        ("Payment Delay", IntegerType()),
        ("Subscription Type", StringType()),
        ("Contract Length", StringType()),
        ("Total Spend", DoubleType()),
        ("Last Interaction", IntegerType()),
    )
])


In [18]:
# Check that the typed projection is still a streaming DataFrame.
typed_df.isStreaming


True

In [19]:
# spark.catalog.clearCache()


In [20]:
from pyspark.ml import PipelineModel
# Load the saved pipeline model from the path "class_model".
# NOTE(review): the path is relative and the session sets fs.defaultFS to
# hdfs://namenode:9000, so this presumably resolves on HDFS — confirm where
# the model actually lives.
model = PipelineModel.load("class_model")


In [21]:
# Apply the loaded pipeline to the typed stream; the result carries a
# `prediction` column (selected in the SQL projection further down).
predictions = model.transform(typed_df)


In [22]:
# Check that the scored DataFrame is still a streaming DataFrame.
predictions.isStreaming


True

In [23]:
# Register the scored stream as temp view "t1" so it can be queried with SQL.
predictions.createOrReplaceTempView("t1")


In [24]:
# Project the input columns plus the model's `prediction` from view "t1";
# any other columns present in the view are left out.
result_df =spark.sql("""
    SELECT 
        CustomerID,
        Age,
        Gender,
        Tenure,
        `Usage Frequency`,
        `Support Calls`,
        `Payment Delay`,
        `Subscription Type`,
        `Contract Length`,
        `Total Spend`,
        `Last Interaction`,
        prediction 
    FROM t1
""")


In [25]:
# Inspect the schema of the scored projection.
result_df.printSchema()


root
 |-- CustomerID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Usage Frequency: integer (nullable = true)
 |-- Support Calls: integer (nullable = true)
 |-- Payment Delay: integer (nullable = true)
 |-- Subscription Type: string (nullable = true)
 |-- Contract Length: string (nullable = true)
 |-- Total Spend: double (nullable = true)
 |-- Last Interaction: integer (nullable = true)
 |-- prediction: double (nullable = false)



In [26]:
# Check that the SQL projection is still a streaming DataFrame.
result_df.isStreaming


True

In [27]:
def _write_batch_via_jdbc(batch_df, batch_id, sink_name, url, user, password):
    """Append one micro-batch to the ``CustomerData`` table over JDBC.

    Shared by both ``foreachBatch`` sinks so they use the same writer options
    and the same error handling.

    Args:
        batch_df: The micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: The micro-batch identifier; used only in the failure message.
        sink_name: Short label for the target; used only in the failure message.
        url: JDBC connection URL, without credentials.
        user: Login name, passed as the JDBC ``user`` option.
        password: Login password, passed as the JDBC ``password`` option.
    """
    try:
        (
            batch_df.write
            .format("jdbc")
            .option("url", url)
            # Credentials travel as connection properties, not inside the URL.
            .option("user", user)
            .option("password", password)
            .option("dbtable", "CustomerData")
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .mode("append")
            .save()
        )
    except Exception as e:
        # Best-effort, as before: the failure is reported and not re-raised,
        # so the rows of a failed batch are not written to this sink.
        print(f"[{sink_name}] batch {batch_id} failed: {e}")


def write_to_sql_server(batch_df, batch_id):
    """``foreachBatch`` sink: append a micro-batch to the SQL Server database.

    Connection settings are read from the environment on every call so that
    credentials do not have to live in the notebook:
    ``SQLSERVER_JDBC_URL``, ``SQLSERVER_USER`` and ``SQLSERVER_PASSWORD``.
    Unset variables fall back to the previously hard-coded values.

    Args:
        batch_df: The micro-batch DataFrame to write.
        batch_id: The micro-batch identifier supplied by Spark.
    """
    _write_batch_via_jdbc(
        batch_df,
        batch_id,
        "sql-server",
        os.environ.get(
            "SQLSERVER_JDBC_URL",
            "jdbc:sqlserver://192.168.1.2:1433;databaseName=Customer_Churn;",
        ),
        os.environ.get("SQLSERVER_USER", "mahmoud"),
        os.environ.get("SQLSERVER_PASSWORD", ""),
    )


def write_to_sql_azure(batch_df, batch_id):
    """``foreachBatch`` sink: append a micro-batch to the Azure SQL database.

    Connection settings are read from the environment on every call:
    ``AZURE_SQL_JDBC_URL``, ``AZURE_SQL_USER`` and ``AZURE_SQL_PASSWORD``.
    Unset variables fall back to the previously hard-coded values (empty
    user and password).

    Args:
        batch_df: The micro-batch DataFrame to write.
        batch_id: The micro-batch identifier supplied by Spark.
    """
    _write_batch_via_jdbc(
        batch_df,
        batch_id,
        "azure-sql",
        os.environ.get(
            "AZURE_SQL_JDBC_URL",
            "jdbc:sqlserver://server-sql-hassanen.database.windows.net:1433;databaseName=Customer_Churn;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;",
        ),
        os.environ.get("AZURE_SQL_USER", ""),
        os.environ.get("AZURE_SQL_PASSWORD", ""),
    )

# Start the scored stream and append every micro-batch to SQL Server.
# The StreamingQuery handle is kept so the query can be inspected or
# stopped later instead of being discarded.
sql_server_query = (
    result_df.writeStream
    .foreachBatch(write_to_sql_server)
    .start()
)

# Write the typed records (without predictions) as a raw backup to Azure SQL DB.
azure_backup_query = (
    typed_df.writeStream
    .foreachBatch(write_to_sql_azure)
    .start()
)



In [28]:
# Re-check that the scored projection is a streaming DataFrame.
result_df.isStreaming


True

In [None]:
# Persist the scored stream to HDFS as Parquet. Each micro-batch is coalesced
# to a single partition, and a batch is triggered every 30 seconds.
df_final_result = result_df.coalesce(1)
hadoopquery = (
    df_final_result.writeStream
    .outputMode("append")
    .format("parquet")
    .options(
        path="hdfs://namenode:9000/Data-Customer/",
        checkpointLocation="hdfs://namenode:9000/checkpoints-customer/",
    )
    .trigger(processingTime="30 seconds")
    .start()
)

# Block this cell until the HDFS query stops.
hadoopquery.awaitTermination()