In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Build (or reuse) a Spark session whose driver host and bind address
# are set to the local machine.
spark = (
    SparkSession.builder
    .appName("Read CSV Example")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Paths of the training and test CSV files.
file_path_train = "./csv_file/combined_data_2018_2023.csv"
file_path_test = "./csv_file/final_data_prep_2024.csv"

# Both files are read with a header row and inferred column types.
csv_read_options = {"header": True, "inferSchema": True}
df_train = spark.read.csv(file_path_train, **csv_read_options)
df_test = spark.read.csv(file_path_test, **csv_read_options)

# Preview the first five rows of each dataset under its own heading.
for preview_title, preview_frame in (("Train", df_train), ("Test", df_test)):
    print(preview_title)
    preview_frame.show(5)

Train
+----+-------+--------------+-------------------+
|year|country|article_amount|subject_area_abbrev|
+----+-------+--------------+-------------------+
|2023|  Spain|          68.0|               ECON|
|2023|  Spain|          68.0|               ECON|
|2023|  India|         190.0|               VETE|
|2023|  India|         190.0|               VETE|
|2023|  India|         190.0|               VETE|
+----+-------+--------------+-------------------+
only showing top 5 rows

Test
+----+-------+--------------+-------------------+
|year|country|article_amount|subject_area_abbrev|
+----+-------+--------------+-------------------+
|2024|   Iran|          50.0|               AGRI|
|2024|   Iran|          50.0|               AGRI|
|2024|   Iran|          50.0|               AGRI|
|2024|   Iran|          50.0|               AGRI|
|2024|   Iran|          50.0|               AGRI|
+----+-------+--------------+-------------------+
only showing top 5 rows



In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
from pyspark.sql import functions as F


# Obtain the Spark session for the modelling pipeline; getOrCreate()
# hands back an already-active session when one exists.
spark = (
    SparkSession.builder
    .appName("DataSciencePipeline")
    .getOrCreate()
)

# Load both datasets from the paths assigned in an earlier cell
# (file_path_train / file_path_test). If either read fails, report the
# error, shut the session down, and re-raise so the cell stops here.
csv_load_options = {"header": True, "inferSchema": True}
try:
    df_train = spark.read.csv(file_path_train, **csv_load_options)
    df_test = spark.read.csv(file_path_test, **csv_load_options)
except Exception as load_error:
    print(f"Error loading files: {load_error}")
    spark.stop()
    raise


# Print the inferred schema of each dataset under its own heading.
for schema_title, schema_frame in (
    ("Training Data Schema:", df_train),
    ("Testing Data Schema:", df_test),
):
    print(schema_title)
    schema_frame.printSchema()

# Columns that both datasets must contain.
required_columns = ["year", "article_amount", "subject_area_abbrev", "country"]


def _missing_required(frame):
    """Return the required columns absent from ``frame``, in declaration order."""
    present = frame.columns
    return [name for name in required_columns if name not in present]


missing_columns_train = _missing_required(df_train)
missing_columns_test = _missing_required(df_test)

# Fail fast, reporting both lists, if either dataset lacks a required column.
if missing_columns_train or missing_columns_test:
    raise ValueError(f"Missing required columns. Train: {missing_columns_train}, Test: {missing_columns_test}")


def _drop_incomplete_and_normalize(frame):
    """Drop rows with nulls in the required columns, then lower-case and trim ``country``."""
    complete_rows = frame.dropna(subset=required_columns)
    normalized_country = F.trim(F.lower(F.col("country")))
    return complete_rows.withColumn("country", normalized_country)


df_train = _drop_incomplete_and_normalize(df_train)
df_test = _drop_incomplete_and_normalize(df_test)

# Restrict both datasets to the countries that occur in each of them.
train_countries = df_train.select("country").distinct()
test_countries = df_test.select("country").distinct()
common_countries = train_countries.intersect(test_countries)

df_train = df_train.join(common_countries, on="country", how="inner")
df_test = df_test.join(common_countries, on="country", how="inner")


# Fixed seed for the random forest. Without an explicit seed the trained
# model (and therefore the exported predictions and metrics) can differ
# from one kernel session to the next.
RANDOM_SEED = 42

# Encode the categorical feature and the label as numeric indices.
# handleInvalid="skip" drops rows whose value was not seen while fitting.
subject_area_indexer = StringIndexer(
    inputCol="subject_area_abbrev",
    outputCol="subject_area_indexed",
    handleInvalid="skip",
)
country_indexer = StringIndexer(
    inputCol="country",
    outputCol="country_indexed",
    handleInvalid="skip",
)

# Assemble the model inputs into a single vector column.
feature_columns = ["year", "article_amount", "subject_area_indexed"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Classifier that predicts the indexed country from the feature vector.
rf = RandomForestClassifier(
    labelCol="country_indexed",
    featuresCol="features",
    probabilityCol="probability",
    seed=RANDOM_SEED,
)

# Indexers run first so the assembler and classifier see their output columns.
pipeline = Pipeline(stages=[subject_area_indexer, country_indexer, assembler, rf])

# Fit every stage on the training data.
model = pipeline.fit(df_train)


# Score the test set and keep only the columns needed for export and
# evaluation. The result is cached because it is materialised several
# times (the pandas export below, then once per metric in the evaluation
# step); without caching, each use re-runs the full read/join/transform
# lineage.
df_test_predictions = (
    model.transform(df_test)
    .select(
        "country", "article_amount", "subject_area_abbrev", "features",
        "country_indexed", "prediction", "probability",
    )
    .cache()
)

# Collect the predictions to the driver as a pandas DataFrame for CSV export.
df_test_predictions_pd = df_test_predictions.toPandas()

# Write the predictions next to the notebook, without the pandas index.
predictions_output_path = "predictions_output.csv"
df_test_predictions_pd.to_csv(predictions_output_path, index=False)

print(f"Predictions have been exported to {predictions_output_path}.")


# Evaluator comparing the indexed true country with the predicted index.
evaluator = MulticlassClassificationEvaluator(labelCol="country_indexed", predictionCol="prediction")


def _score(metric_name):
    """Evaluate the test predictions with the evaluator set to ``metric_name``."""
    return evaluator.evaluate(df_test_predictions, {evaluator.metricName: metric_name})


# Headline metrics, kept as named values.
accuracy = _score("accuracy")
f1_score = _score("f1")

# One record per metric; the weighted metrics are appended after the
# headline ones, in the same order they are evaluated.
metrics = [
    {"metric": "accuracy", "value": accuracy},
    {"metric": "f1_score", "value": f1_score},
]
metrics.extend(
    {"metric": weighted_metric, "value": _score(weighted_metric)}
    for weighted_metric in ("weightedPrecision", "weightedRecall")
)

# Tabulate the metrics and write them out without the pandas index.
metrics_df = pd.DataFrame(metrics)

metrics_output_path = "metrics_output.csv"
metrics_df.to_csv(metrics_output_path, index=False)

print(f"Metrics have been exported to {metrics_output_path}.")

# Shut the Spark session down now that all outputs have been written.
spark.stop()


Training Data Schema:
root
 |-- year: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- article_amount: double (nullable = true)
 |-- subject_area_abbrev: string (nullable = true)

Testing Data Schema:
root
 |-- year: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- article_amount: double (nullable = true)
 |-- subject_area_abbrev: string (nullable = true)

Predictions have been exported to predictions_output.csv.
Metrics have been exported to metrics_output.csv.
