In [1]:
#model building. Build the model that will be used for the live prediction.

In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
#first, we will need to organize and clean the data
#step 1: convert to .json (of ander formaat? -> nee, het is echt als een .json dictionary gegeven, dus zo is logisch)
#step 2: put it in a table.

In [None]:
#script to convert all saved files to .json files, so they can actually be accessed
import os

def rename_to_json(base_dir):
    # Function to recursively find files to rename
    def find_files_to_rename(directory):
        files_to_rename = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.startswith('part-') and not file.endswith('.json'):
                    files_to_rename.append(os.path.join(root, file))
        return files_to_rename

    # Find all files to rename
    files_to_rename = find_files_to_rename(base_dir)

    # Rename files
    for old_path in files_to_rename:
        # Extract the filename and directory
        directory, filename = os.path.split(old_path)
        # Append ".json" to the filename
        new_filename = filename + '.json'
        # Construct the new path
        new_path = os.path.join(directory, new_filename)
        # Rename the file
        os.rename(old_path, new_path)
        print(f"Renamed {filename} to {new_filename}")

# Example usage:
base_directory = "C:/Users/eloua/Desktop/spark/notebooks/data/data/"
rename_to_json(base_directory)

In [5]:
#merging all the seperate .json files into one single dataframe
from pyspark.sql import SparkSession

def merge_json_files(base_dir):
    # Create a SparkSession
    spark = SparkSession.builder \
        .appName("model building") \
        .getOrCreate()

    # Function to recursively find JSON files in subfolders
    def find_json_files(directory):
        json_files = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith('.json'):
                    json_files.append(os.path.join(root, file))
        return json_files

    # Find all JSON files in subfolders
    json_files = find_json_files(base_dir)

    # Read each JSON file into a DataFrame and merge them
    merged_df = None
    for json_file in json_files:
        if merged_df is None:
            merged_df = spark.read.json(json_file)
        else:
            new_df = spark.read.json(json_file)
            merged_df = merged_df.union(new_df)

    # Show the schema of the merged DataFrame
    merged_df.printSchema()

    return merged_df

#apply it to the actual data
base_directory = "C:/Users/eloua/Desktop/spark/notebooks/data/data_klein/"
merged_dataframe = merge_json_files(base_directory)



root
 |-- aid: string (nullable = true)
 |-- comments: long (nullable = true)
 |-- domain: string (nullable = true)
 |-- frontpage: boolean (nullable = true)
 |-- posted_at: string (nullable = true)
 |-- source_text: string (nullable = true)
 |-- source_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- user: string (nullable = true)
 |-- votes: long (nullable = true)



In [6]:
merged_dataframe.show()

+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----+
|     aid|comments|              domain|frontpage|          posted_at|         source_text|        source_title|               title|                 url|         user|votes|
+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----+
|39977630|       0|    github.com/gvcgo|    false|2024-04-09 09:18:20|GitHub - gvcgo/ve...|GitHub - gvcgo/ve...|Show HN: A genera...|https://github.co...|      moqsien|    1|
|39977645|       0| github.com/rrweb-io|    false|2024-04-09 09:20:17|GitHub - rrweb-io...|GitHub - rrweb-io...|Rrweb, web sessio...|https://github.co...|     brennerm|    1|
|39977648|       0|      flexboxcss.com|    false|2024-04-09 09:20:59|Flexbox CSS - Eff...|Effortlessly crea...|Web tool to s

In [39]:
#write it to one big .parquet file
merged_dataframe.repartition(1).write.mode('overwrite').parquet("C:/Users/eloua/Desktop/spark/notebooks/data/merged_data/")

feature engineering

In [7]:
#create a variable that contains how many upvotes the user has collected in total. note that this variable should be updated regularly in a real
#application.
#we will make use of the HackerNews API to collect the data on the users first.
import requests
import json

#iterate over the dataframe to collect the user_ids of the users that actually posted a news story that we're tracking
user_ids = merged_dataframe.select("user").distinct().rdd.flatMap(lambda x: x).collect()

user_data = []

# Iterate over user IDs and collect their info using the API
for user_id in user_ids:
    # API endpoint for user information
    api_url = f"https://hacker-news.firebaseio.com/v0/user/{user_id}.json"
    
    # Send GET request to the API
    response = requests.get(api_url)
    
    # Check if request was successful
    if response.status_code == 200:
        # Parse JSON response
        user_info = response.json()
        user_data.append(user_info)
    else:
        print(f"Failed to fetch data for user {user_id}")

# Save collected user data to a file
with open("user_data.json", "w") as f:
    json.dump(user_data, f)

In [9]:
#save it into a dataframe
user_info_df = spark.createDataFrame(user_data)
user_info_df.show()

+----------+-------------+------+--------------------+--------------------+
|   created|           id| karma|           submitted|               about|
+----------+-------------+------+--------------------+--------------------+
|1712120068|      moqsien|     3|[40072447, 400097...|                NULL|
|1426762276|     brennerm|    37|[39977645, 399627...|Freelance DevOps ...|
|1687312715| lovegrenoble|   317|[40099296, 400992...|                NULL|
|1333111965|  GeoAtreides|  1107|[40092521, 400870...|                NULL|
|1273014226|         tosh|148859|[40100561, 401002...|https:&#x2F;&#x2F...|
|1546925297|    alephnerd|  6301|[40098382, 400931...|Recovering Policy...|
|1561310375|  hubraumhugo|  6695|[40087150, 400871...|Unstructured data...|
|1694750092|   palmfacehn|   512|[40078251, 400781...|HN face palm<p>ec...|
|1208631846|       marban| 25033|[40099299, 400981...|Mostly known for ...|
|1276254589|        flojo|   230|[39977780, 399288...|                NULL|
|1709037918|

In [None]:
#create the features from the user data
#some features will need to get binned, like karma and number of submissions.

In [10]:
#merging the collected user data with the original data

# Importing required libraries
from pyspark.sql.functions import col, size

# Selecting relevant columns from user_info_df
user_info_selected = user_info_df.select("id", "created", "karma", size("submitted").alias("submitted_count")) #using the count of submissions as a
#variable

# Renaming columns to avoid conflicts during join and increase clarity
user_info_selected = user_info_selected \
    .withColumnRenamed("id", "user") \
    .withColumnRenamed("created", "user_created") \
    .withColumnRenamed("karma", "user_karma") \
    .withColumnRenamed("submitted_count", "user_submitted_count")

# Merging dataframes
enriched_dataframe = merged_dataframe.join(user_info_selected, "user", "left")

# Displaying the resulting DataFrame
enriched_dataframe.show()


+-------------+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+------------+----------+--------------------+
|         user|     aid|comments|              domain|frontpage|          posted_at|         source_text|        source_title|               title|                 url|votes|user_created|user_karma|user_submitted_count|
+-------------+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+------------+----------+--------------------+
|      moqsien|39977630|       0|    github.com/gvcgo|    false|2024-04-09 09:18:20|GitHub - gvcgo/ve...|GitHub - gvcgo/ve...|Show HN: A genera...|https://github.co...|    1|  1712120068|         3|                   9|
|     brennerm|39977645|       0| github.com/rrweb-io|    false|2024-04-09 09:20:17|GitHub - rrweb-io...|GitHub - rrweb-

In [17]:
#readablility can be relevant, since a lot of the articles will be technical, so it might be interesting to get an idea of how readability influences
#the frontpage reaching
#source title verwijderen, is problematisch als het bv een github pagina is die gedeeld wordt
import re
from textblob import TextBlob
import textstat
from pyspark.sql.functions import udf, size, split
from pyspark.sql.types import StringType

# function to preprocess text
def preprocess_text(text):
    # Remove URLs
    text = re.sub(r'http\S+', '', text)
    # Remove special characters
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Convert to lowercase
    text = text.lower()
    return text

# function to calculate sentiment score using TextBlob
def calculate_sentiment(text):
    blob = TextBlob(str(text))
    return blob.sentiment.polarity

# function to calculate readability score using textstat
def calculate_readability(text):
    return textstat.flesch_kincaid_grade(text)


# UDF for preprocessing text
preprocess_udf = udf(preprocess_text, StringType())

# Apply preprocessing and calculate word count, sentiment score, and readability score
test_dataframe_text_analysis = enriched_dataframe.withColumn("title_preprocessed", preprocess_udf("title")) \
    .withColumn("source_title_preprocessed", preprocess_udf("source_title")) \
    .withColumn("source_text_preprocessed", preprocess_udf("source_text")) \
#    .withColumn("word_count_title", size(split(col("title_preprocessed"), "\s+"))) \
#    .withColumn("word_count_source_title", size(split(col("source_title_preprocessed"), "\s+"))) \
#    .withColumn("word_count_source_text", size(split(col("source_text_preprocessed"), "\s+"))) \
#    .withColumn("sentiment_score_title", calculate_sentiment(col("title_preprocessed"))) \
#    .withColumn("sentiment_score_source_title", calculate_sentiment(col("source_title_preprocessed"))) \
#    .withColumn("sentiment_score_source_text", calculate_sentiment(col("source_text_preprocessed"))) \
#    .withColumn("readability_score_title", calculate_readability(col("title_preprocessed"))) \
#    .withColumn("readability_score_source_title", calculate_readability(col("source_title_preprocessed"))) \
#    .withColumn("readability_score_source_text", calculate_readability(col("source_text_preprocessed")))

# Show the updated DataFrame
test_dataframe_text_analysis.show()


+-------------+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+------------+----------+--------------------+--------------------+-------------------------+------------------------+
|         user|     aid|comments|              domain|frontpage|          posted_at|         source_text|        source_title|               title|                 url|votes|user_created|user_karma|user_submitted_count|  title_preprocessed|source_title_preprocessed|source_text_preprocessed|
+-------------+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-----+------------+----------+--------------------+--------------------+-------------------------+------------------------+
|      moqsien|39977630|       0|    github.com/gvcgo|    false|2024-04-09 09:18:20|GitHub - gvcgo/ve...|GitHub - gvcgo/ve..

In [18]:
# Check the data types of columns in the DataFrame
print(test_dataframe_text_analysis.dtypes)



[('user', 'string'), ('aid', 'string'), ('comments', 'bigint'), ('domain', 'string'), ('frontpage', 'boolean'), ('posted_at', 'string'), ('source_text', 'string'), ('source_title', 'string'), ('title', 'string'), ('url', 'string'), ('votes', 'bigint'), ('user_created', 'bigint'), ('user_karma', 'bigint'), ('user_submitted_count', 'int'), ('title_preprocessed', 'string'), ('source_title_preprocessed', 'string'), ('source_text_preprocessed', 'string')]


In [24]:
# Check for null values in title_preprocessed column
#test_dataframe_text_analysis.filter(col("title_preprocessed").isNull()).show()
#test_dataframe_text_analysis.filter(col("title_preprocessed").isNull()).count()

# Check for null values in source_title_preprocessed column
#test_dataframe_text_analysis.filter(col("source_title_preprocessed").isNull()).count()

# Check for null values in source_text_preprocessed column
test_dataframe_text_analysis.filter(col("source_text_preprocessed").isNull()).show()


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\eloua\AppData\Local\Temp\ipykernel_20592\2961351709.py", line 12, in preprocess_text
  File "C:\Users\eloua\anaconda3\envs\sparkenvconda\lib\re.py", line 210, in sub
    return _compile(pattern, flags).sub(repl, string, count)
TypeError: expected string or bytes-like object


In [25]:
test_dataframe_text_analysis.printSchema()


root
 |-- user: string (nullable = true)
 |-- aid: string (nullable = true)
 |-- comments: long (nullable = true)
 |-- domain: string (nullable = true)
 |-- frontpage: boolean (nullable = true)
 |-- posted_at: string (nullable = true)
 |-- source_text: string (nullable = true)
 |-- source_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- votes: long (nullable = true)
 |-- user_created: long (nullable = true)
 |-- user_karma: long (nullable = true)
 |-- user_submitted_count: integer (nullable = true)
 |-- title_preprocessed: string (nullable = true)
 |-- source_title_preprocessed: string (nullable = true)
 |-- source_text_preprocessed: string (nullable = true)



In [29]:
test_dataframe.repartition(1).write.mode('overwrite').parquet("C:/Users/eloua/Desktop/spark/notebooks/")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\eloua\Desktop\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\eloua\Desktop\spark\spark-3.5.1-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\eloua\anaconda3\envs\sparkenvconda\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 