In [142]:
import os
import json
import pandas as pd

In [143]:
# Create a list of file paths for text files
path = "/Users/lejeuje/Desktop/Spark/spark/notebooks/test data"
text_file_paths = [
    os.path.join(root, name)
    for root, dirs, files in os.walk(path)
    for name in files
    if name.endswith('.json')
    and not name.endswith('_SUCCESS.json')
]

In [144]:
data = []
def process_json(filename):
    with open(filename, 'r') as f:
        try:
            # Load the JSON data and handle potential errors
            json_data = json.load(f)
            value_dict = json.loads(json_data['value'])

            # Create a dictionary with the extracted values
            data_dict = {
                "aid": value_dict["aid"],
                "title": value_dict["title"],
                "url": value_dict["url"],
                "domain": value_dict["domain"],
                "votes": value_dict["votes"],
                "user": value_dict["user"],
                "posted_at": value_dict["posted_at"],
                "comments": value_dict["comments"],
                "source_title": value_dict["source_title"],
                "source_text": value_dict["source_text"],
                "frontpage": value_dict["frontpage"]
            }

            # Append the dictionary to the data list
            data.append(data_dict)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON file '{filename}': {e}")

# Process each JSON file
for filename in text_file_paths:
    process_json(filename)

# Create the DataFrame from the list of dictionaries
df = pd.DataFrame(data)

# Print the DataFrame
print(df)
df.to_csv('reviews150.csv', sep='\t', index=False)

          aid                                              title  \
0    39958495  Show HN: A Postgres extension to save you from...   
1    39958152  ChatGPT might get its own dedicated personal A...   
2    39959354  AI meets next-gen info stealers in social medi...   
3    39959732                                The Loneliness Cure   
4    39958129  Do people generally agree with Shaoshan Liu an...   
..        ...                                                ...   
351  39959583   Newly launched unified AI assistants open source   
352  39957731                                  Nix – A One Pager   
353  39958350           America's Next Soldiers Will Be Machines   
354  39959997  Freak winds kill three people by sucking them ...   
355  39958129  Do people generally agree with Shaoshan Liu an...   

                                                   url              domain  \
0                 https://github.com/viggy28/pg_savior  github.com/viggy28   
1    https://www.techradar.

In [145]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [146]:
# create a SparkSession 
spark = SparkSession.builder \
    .appName("Deep_learning_txt") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.cores", "2") \
    .getOrCreate()
# Remove duplicate row
sdf = spark.createDataFrame(df)
sdf=sdf.dropna()
sdf = sdf.withColumn("frontpage", col("frontpage").cast("integer"))

In [147]:
train_data, test_data = sdf.randomSplit([0.8, 0.2], seed=123)
print("Frontpage: True", train_data.filter(col("frontpage") == True).count()+test_data.filter(col("frontpage") == True).count())
print("Frontpage: False ", train_data.filter(col("frontpage") == False).count()+test_data.filter(col("frontpage") == False).count())

Frontpage: True 44
Frontpage: False  300


In [148]:
sdf = sdf.withColumn("frontpage", col("frontpage").cast("integer"))
# preprocess the data
tokenizer = Tokenizer(inputCol="title", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", locale="en_US")
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")
#string_indexer = StringIndexer(inputCol="label", outputCol="label_index")

# create model
lr = LogisticRegression(featuresCol="features", labelCol="frontpage")

In [149]:
# Grid to optimise hper parameters
param_grid = ParamGridBuilder() \
   .addGrid(count_vectorizer.vocabSize, [1000, 5000]) \
   .addGrid(lr.regParam, [0.01, 0.1]) \
   .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
   .build()

# define the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="frontpage", predictionCol="prediction")

In [150]:
# create pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, lr])

# define the cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# fit pipeline to the training data
cv_model = cv.fit(train_data)

# make predictions on the test data
predictions = cv_model.transform(test_data)

In [151]:
eval = MulticlassClassificationEvaluator(labelCol="frontpage", predictionCol="prediction")
accuracy = eval.evaluate(predictions, {evaluator.metricName: "accuracy"})
print(accuracy)

0.9852210781344639


In [153]:
# find the best model
model = cv_model.bestModel
model.save("model2")