In [None]:
# Silence all Python warnings for the notebook session.
# NOTE(review): this also hides deprecation/configuration warnings raised by PySpark;
# consider filtering specific warning categories instead of everything.
import warnings
warnings.filterwarnings("ignore")

In [None]:
import os

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, FloatType

In [None]:
# Spark session with the Kafka connector package (Spark 3.3.1 / Scala 2.12) and the
# local SQL Server JDBC driver jar (sqljdbc42.jar) on the classpath.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
    .config("spark.jars", "sqljdbc42.jar")
    .getOrCreate()
)

In [None]:
# Display the active SparkSession (last expression of the cell is rendered by the notebook).
spark

In [None]:
# Streaming source: the "movies" Kafka topic on the local broker.
# Only options prefixed with `kafka.` are forwarded to the Kafka consumer. Spark tracks
# offsets itself (in the query checkpoint), so consumer settings such as
# enable.auto.commit / auto.offset.reset / auto.commit.interval.ms have no effect here.
# `startingOffsets` is the Spark-level replacement for auto.offset.reset. It only applies
# when the query starts without an existing checkpoint.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "movies")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Inspect the raw Kafka stream schema; the JSON payload is parsed from its 'value' column.
df.printSchema()

In [None]:
# Schema of the JSON messages: every field is read as a nullable string, and the
# numeric ones (release_year, total_views, user_reviews, user_rating) are cast later.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "show_id", "type", "title", "director", "cast", "country",
            "date_added", "release_year", "rating", "duration", "listed_in",
            "description", "source", "total_views", "user_reviews", "user_rating",
        )
    ]
)

In [None]:
# Decode the Kafka value as a string, parse it as JSON against `schema`, and flatten
# the resulting struct into top-level columns.
json_df = (
    df.select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

In [None]:
# Should list the 16 string fields declared in `schema`.
json_df.printSchema()

In [None]:
# Normalize empty strings in every column to real nulls.
Updateemptystring = json_df.replace(to_replace="", value=None)


In [None]:
# Strip leading AND trailing whitespace from every column (ltrim left trailing blanks).
# Values that were whitespace-only become "" after trimming; map them to null as well,
# matching the empty-string handling of the previous step.
removewhitespace = (
    Updateemptystring
    .select([trim(col(c)).alias(c) for c in Updateemptystring.columns])
    .replace("", None)
)

In [None]:
# Keep every column except 'date_added', which is not used downstream.
drop_date_added = removewhitespace.select(
    [name for name in removewhitespace.columns if name != "date_added"]
)

In [None]:
# One row per entry of the comma-separated 'listed_in' field, stored in 'Category'.
# - Splitting on r"\s*,\s*" (plus a trim) avoids a leading space on every category after
#   the first ("Dramas, Comedies" -> "Dramas", "Comedies" rather than " Comedies").
# - explode_outer keeps titles whose 'listed_in' is null (Category = null) instead of
#   silently dropping them the way explode does.
explode_listed_in = (
    drop_date_added
    .withColumn("Category", explode_outer(split(col("listed_in"), r"\s*,\s*")))
    .withColumn("Category", trim(col("Category")))
    .drop("listed_in")
)

In [None]:
# 'date_added' and 'listed_in' are gone; 'Category' holds one listed_in entry per row.
explode_listed_in.printSchema()

In [None]:
# Normalize missing directors to "unknown".
# replace() only matches the literal string "null" (kept, applied to every column as
# before). It never matches real nulls, e.g. empty strings turned into null earlier or
# fields absent from the JSON. fillna() handles those for 'director'.
director_nulls = (
    explode_listed_in
    .replace("null", "unknown")
    .fillna("unknown", subset=["director"])
)

In [None]:
# Replacing values does not change the schema.
director_nulls.printSchema()

In [None]:
# Normalize missing cast lists to "unknown".
# replace() only matches the literal string "null" (kept, applied to every column as
# before); fillna() also covers real nulls in 'cast', which replace() never matches.
cast_nulls = (
    director_nulls
    .replace("null", "unknown")
    .fillna("unknown", subset=["cast"])
)

In [None]:
# Replacing values does not change the schema.
cast_nulls.printSchema()

In [None]:
# Drop rows where BOTH 'director' and 'cast' are purely numeric (e.g. "123"), which
# indicates a malformed record.
# rlike() returns null for a null input, and filter() discards rows whose predicate is
# null. coalesce(..., False) treats a missing value as "not numeric", so rows with a null
# director and/or cast are kept rather than silently dropped.
numeric_pattern = "^[0-9]+$"

filtered_df = cast_nulls.filter(
    ~(
        coalesce(col("director").rlike(numeric_pattern), lit(False))
        & coalesce(col("cast").rlike(numeric_pattern), lit(False))
    )
)

In [None]:
# Filtering rows does not change the schema.
filtered_df.printSchema()

In [None]:
# Normalize missing descriptions to "unknown".
# replace() only matches the literal string "null" (kept, applied to every column as
# before); fillna() also covers real nulls in 'description'.
description_nulls = (
    filtered_df
    .replace("null", "unknown")
    .fillna("unknown", subset=["description"])
)

In [None]:
# Replacing values does not change the schema.
description_nulls.printSchema()

In [None]:
# 'release_year' arrives as a string; store it as an integer
# (with Spark's default non-ANSI cast, unparseable values become null).
casting_year = description_nulls.withColumn(
    "release_year",
    col("release_year").cast("int"),
)

In [None]:
# 'release_year' should now be an integer column.
casting_year.printSchema()

In [None]:
# Inclusive upper bounds, in minutes, of the "short" and "medium" duration buckets.
short_threshold, medium_threshold = 90, 150

In [None]:
# Classify titles by runtime into "short" (<= short_threshold), "medium"
# (<= medium_threshold) or "long".
# Only durations expressed in minutes (e.g. "95 min") are bucketed. Other values, such as
# season counts (presumably TV shows, e.g. "2 Seasons"), and missing or unparseable
# durations get a null category. They are no longer read as minutes or defaulted to "long".
# The minute count is extracted once into a temporary column instead of re-running the regex
# for every condition.
bins_movies = (
    casting_year
    .withColumn(
        "_duration_minutes",
        regexp_extract(col("duration"), r"(?i)^\s*(\d+)\s*min", 1).cast("integer"),
    )
    .withColumn(
        "movie_duration_category",
        when(col("_duration_minutes") <= short_threshold, "short")
        .when(col("_duration_minutes") <= medium_threshold, "medium")
        .when(col("_duration_minutes").isNotNull(), "long"),
    )
    .drop("_duration_minutes")
)

In [None]:
# Adds the 'movie_duration_category' string column.
bins_movies.printSchema()

In [None]:
# Normalize missing countries to "unknown".
# replace() only matches the literal string "null" (kept, applied to every column as
# before); fillna() also covers real nulls in 'country'.
country_nulls = (
    bins_movies
    .replace("null", "unknown")
    .fillna("unknown", subset=["country"])
)

In [None]:
# Normalize missing durations to "unknown".
# replace() only matches the literal string "null" (kept, applied to every column as
# before); fillna() also covers real nulls in 'duration'.
duration_nulls = (
    country_nulls
    .replace("null", "unknown")
    .fillna("unknown", subset=["duration"])
)

In [None]:
# Replacing values does not change the schema.
duration_nulls.printSchema()

In [None]:
# Cast the engagement metrics to numeric types: 'total_views' and 'user_reviews' to
# integers, 'user_rating' to a float.
casting_views = duration_nulls.withColumn(
    "total_views", col("total_views").cast("int")
)
Casting_reviews = casting_views.withColumn(
    "user_reviews", col("user_reviews").cast("int")
)
Casting_rating = Casting_reviews.withColumn(
    "user_rating", col("user_rating").cast("float")
)

In [None]:
# Final schema handed to the SQL Server sink: total_views/user_reviews as int, user_rating as float.
Casting_rating.printSchema()

In [None]:
def write_to_sql_server(batch_df, batch_id):
    """Append one streaming micro-batch to the SQL Server ``movies`` table over JDBC.

    Intended as the ``foreachBatch`` callback of the streaming query.

    Connection settings are read from environment variables so that credentials are
    not stored in the notebook:

    * ``MSSQL_JDBC_URL`` -- optional; defaults to the movies_DB database on
      192.168.1.11:1433 (no credentials in the URL).
    * ``MSSQL_USER`` / ``MSSQL_PASSWORD`` -- required.

    Args:
        batch_df: The micro-batch DataFrame supplied by Spark.
        batch_id: The micro-batch id supplied by Spark (used in the error message).

    Raises:
        Exception: Any error from reading the configuration or writing the batch is
            reported and re-raised. Swallowing it would let the stream move past the
            batch's offsets and silently lose its rows; re-raising stops the query so it
            can be restarted from its checkpoint.
    """
    jdbc_url = os.environ.get(
        "MSSQL_JDBC_URL",
        "jdbc:sqlserver://192.168.1.11:1433;databaseName=movies_DB;",
    )
    try:
        (
            batch_df.write
            .format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", "movies")
            .option("user", os.environ["MSSQL_USER"])
            .option("password", os.environ["MSSQL_PASSWORD"])
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .mode("append")
            .save()
        )
    except Exception as e:
        print(f"Batch {batch_id}: failed to write to SQL Server: {e}")
        raise

# Start the stream; each micro-batch is appended to SQL Server by write_to_sql_server.
# A persistent checkpoint records the processed Kafka offsets, so a restarted query
# resumes where it stopped. Without it, the query re-reads the topic from "earliest"
# and appends duplicate rows.
# The handle is kept so the query can be monitored or stopped later
# (query.status, query.lastProgress, query.stop()).
query = (
    Casting_rating.writeStream
    .foreachBatch(write_to_sql_server)
    .outputMode("append")
    .option("checkpointLocation", "checkpoints/movies_to_sqlserver")
    .start()
)
query

In [34]:
# The pipeline is built on spark.readStream, so this reports a streaming DataFrame.
Casting_rating.isStreaming

24/09/19 20:22:43 INFO Executor: Finished task 1.0 in stage 907.0 (TID 1653). 1843 bytes result sent to driver
24/09/19 20:22:43 INFO TaskSetManager: Finished task 1.0 in stage 907.0 (TID 1653) in 786 ms on 172.18.123.199 (executor driver) (1/2)
24/09/19 20:22:43 INFO Executor: Finished task 0.0 in stage 907.0 (TID 1652). 1843 bytes result sent to driver
24/09/19 20:22:43 INFO TaskSetManager: Finished task 0.0 in stage 907.0 (TID 1652) in 839 ms on 172.18.123.199 (executor driver) (2/2)
24/09/19 20:22:43 INFO TaskSchedulerImpl: Removed TaskSet 907.0, whose tasks have all completed, from pool 
24/09/19 20:22:43 INFO DAGScheduler: ResultStage 907 (start at DirectMethodHandleAccessor.java:103) finished in 0.847 s
24/09/19 20:22:43 INFO DAGScheduler: Job 907 is finished. Cancelling potential speculative or zombie tasks for this job
24/09/19 20:22:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 907: Stage finished
24/09/19 20:22:43 INFO DAGScheduler: Job 907 finished: start at