In [82]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace ,col, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, StopWordsRemover, Tokenizer
from pyspark.sql.types import StringType, StructField, StructType
import random
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.conf import SparkConf
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
import json
from datetime import datetime
from json import loads
import pandas as pd

In [11]:
# consumer = KafkaConsumer('Ehsan',bootstrap_servers=['localhost:9092'],consumer_timeout_ms = 20000)

In [83]:
# Create a Kafka consumer
consumer = KafkaConsumer(
    'Ehsan',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),consumer_timeout_ms = 20000
)

In [84]:
# Create a Spark session
conf = SparkConf().setAppName("Title_suggestion").set('spark.executor.memory', '4g').set('spark.shuffle.service.enabled', 'false').set('spark.dynamicAllocation.enabled','false')

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [85]:
# # Get the movie abstract from the user
# abstract = input("Please enter the movie abstract: ")

In [86]:
# Lists to store the summaries, titles, and ratings
summaries = []
titl = []
ratin = []

# Consume messages from Kafka
for message in consumer:
    # Parse the message value as JSON
    movie = message.value

    # Append the summary, title, and rating to the respective lists
    summaries.append(movie['summary'])
    titl.append(movie['title'])
    ratin.append(movie['rating'])

# Save the summaries in a variable
abstract = ' '.join(summaries)

# Save the titles and ratings in a DataFrame
df = pd.DataFrame({
    'Title': titl,
    'Rating': ratin
})

# Consume messages from Kafka
for message in consumer:
    # Parse the message value as JSON
    movie = message.value

    # Print the summary, title, and rating
    print(f"Title: {movie['title']}")
    print(f"Rating: {movie['rating']}")
    print(f"Summary: {movie['summary']}")
    print("\n")

# Close the consumer connection
consumer.close()

In [87]:
summaries

['ajcjHBIBKUHKJADBNCK;JBKLJBKUBHUJBSCKHKBKJB']

In [88]:
ratin[0]

'5'

In [89]:
titl[0]

'Toy Story (1995)'

In [90]:
# Sample data (replace this with your text data)
data = [(abstract,)]

# Define schema and create a DataFrame
schema = ["text"]
df = spark.createDataFrame(data, schema)


# Remove punctuation and symbols
cleaned_data = df.withColumn("cleaned_text", regexp_replace("text", "[^a-zA-Z0-9\\s]", ""))

# Tokenization - Convert cleaned text into tokens (words)
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
words_data = tokenizer.transform(cleaned_data)

# Remove stop words
stop_words = StopWordsRemover.loadDefaultStopWords("english")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stop_words)
filtered_data = stopwords_remover.transform(words_data)

# Get unique words after removing stop words
words = filtered_data.select("filtered_words").rdd.flatMap(lambda row: row.filtered_words).distinct().collect()

# Display the words after removing punctuation, symbols, and stop words
print(words)





['ajcjhbibkuhkjadbnckjbkljbkubhujbsckhkbkjb']


In [91]:
# Sample list of words (replace this with your word list)
# words = ["adventure", "mystery", "fantasy", "thriller", "comedy", "drama", "action"]

# Generate titles
titles = [' '.join(random.sample(words, min(2, len(words)))) for _ in range(4)]

# Create a DataFrame from the generated titles
title_df = spark.createDataFrame([(title,) for title in titles], ["title"])
title_df.show(truncate=False)


+-----------------------------------------+
|title                                    |
+-----------------------------------------+
|ajcjhbibkuhkjadbnckjbkljbkubhujbsckhkbkjb|
|ajcjhbibkuhkjadbnckjbkljbkubhujbsckhkbkjb|
|ajcjhbibkuhkjadbnckjbkljbkubhujbsckhkbkjb|
|ajcjhbibkuhkjadbnckjbkljbkubhujbsckhkbkjb|
+-----------------------------------------+



In [92]:
ratings = (
    spark.read.csv(
        path="E:/fall2023/5012- Big Data/project/data-sets/ratings.csv",
        sep=",",
        header=True,
        quote='"',
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    # .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
    .drop("timestamp")
    .cache()
)

In [93]:
movies = spark.read.csv(path="E:/fall2023/5012- Big Data/project/data-sets/movies.csv",
        sep=",",
        header=True,
        quote='"',
        schema="movieId INT, title STRING, genres STRING",
    )

In [94]:
# Join the data on the movieId column
data = movies.join(ratings, on="movieId")

In [95]:
# Let's say you want to update the 'summary' column to 'Excellent' when the 'rating' is '5'
data = data.withColumn('rating', when(data['title'] == titl[0], ratin[0]).otherwise(data['rating']))


In [97]:
data.show(5)

+-------+--------------------+--------------------+------+------+
|movieId|               title|              genres|userId|rating|
+-------+--------------------+--------------------+------+------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|     5|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|   4.0|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|   4.0|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|   5.0|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|   5.0|
+-------+--------------------+--------------------+------+------+
only showing top 5 rows



In [50]:
# Now you can split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3])

In [27]:
# Preprocess data
tokenizer = Tokenizer(inputCol="title", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
indexer = StringIndexer(inputCol="genres", outputCol="genresIndex")

In [28]:
# Set up regression model
lr = LinearRegression(featuresCol="features", labelCol="rating")


In [30]:
# Set up pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, indexer, lr])

In [31]:
# Train model
model = pipeline.fit(train_data)

In [32]:
# Make predictions on the test data
predictions = model.transform(test_data)


In [33]:
# Train model on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.997643


In [34]:
# Make predictions on the test data
prediction = model.transform(title_df)

In [38]:
# Sort by the 'prediction' column in ascending order
prediction_sorted = prediction.sort(col("prediction").desc())

In [39]:
prediction_sorted.select("title", "prediction").show(5)

+--------------------+----------------+
|               title|      prediction|
+--------------------+----------------+
|   stringtype import|3.51051048612971|
|   import structtype|3.51051048612971|
|stringtype pyspar...|3.51051048612971|
|structfield pyspa...|3.51051048612971|
+--------------------+----------------+



In [None]:
url="https://docs.google.com/spreadsheets/d/random_link"
import pandas as pd
df=spark.createDataFrame(pd.read_csv(url))

In [1]:
1 + 1

2

In [2]:
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [4]:
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json


ModuleNotFoundError: No module named 'pyspark.streaming.kafka'

In [5]:
spark.stop()

In [1]:
from kafka import KafkaProducer
from time import sleep
import json
from datetime import datetime
from kafka import KafkaConsumer
from json import loads

In [2]:
consumer = KafkaConsumer('Ehsan',bootstrap_servers=['localhost:9092'])

In [None]:
for message in consumer:
    print(message.value)