In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, explode, from_json , StructType, StringType, regexp_extract, collect_list, avg, log1p, exp, expr
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd

In [2]:
spark = SparkSession.builder\
        .master("local[6]") \
        .appName('Data Engineer Data Science Component')\
        .config('spark.ui.port', '4040')\
        .getOrCreate()

In [3]:
# spark.stop()

Read all json

In [None]:
base_dir = "./Raw_data"  # Replace with your directory path
# base_dir = './test'

df = spark.read.option("multiline", True).option("recursiveFileLookup", True).json(base_dir)

Topic Distribution Over Time

In [None]:
#Topic Distribution Over Time

Topic_Distribution_Columns = [
    col("abstracts-retrieval-response.subject-areas.subject-area").alias('subject_area'), #ดูจำนวน subject แต่ละอัน ในแต่ละปี
    col("abstracts-retrieval-response.coredata.prism:coverDate").alias('date'), # ดูปี
    col("abstracts-retrieval-response.coredata.prism:aggregationType").alias('aggregationType') #  เผื่อดูแยกแต่ละประเภทการตีพิมพ์
]

Topic_Distribution_df = df.select(*Topic_Distribution_Columns)

Topic_Distribution_df_exploded = Topic_Distribution_df.withColumn('subject_area',explode(col('subject_area').getItem('$')))

Topic_Distribution_df_year = Topic_Distribution_df_exploded.withColumn('date',regexp_extract(col('date'),r'(\d{4})',1))

# Topic_Distribution_df_year.show()

# Topic_Distribution_df_exploded.groupBy('subject_area').count().orderBy('count', ascending=False).show(10, False)

# Topic_Distribution_df_year.filter(col('subject_area').isNull()).show(10, False)


+------------+----+---------------+
|subject_area|date|aggregationType|
+------------+----+---------------+
+------------+----+---------------+



In [None]:
output_file = "./output/Topic_Distribution_Data"  # Replace with your desired output path
# Save the extracted data to CSV
try:
    Topic_Distribution_df_year.coalesce(1).write.option("header", True).mode('overwrite').csv(output_file)
    print("Data extraction complete! Check the output folder.")
except Exception as e:
    print(f"Error: {e}")

Data extraction complete! Check the output folder.


Citation network

In [None]:
#abstracts-retrieval-response.item.bibrecord.item-info.itemidlist.itemid //เอา SGR เป็นหลัก (ใช้ดู reference)
#abstracts-retrieval-response.item.bibrecord.tail.bibliography.reference[0].ref-info.refd-itemidlist.itemid //เอา SGR เป็นหลัก (ใช้ดู reference)
Citation_Network_Columns = [
    col("abstracts-retrieval-response.coredata.dc:title").alias('title'),
    col("abstracts-retrieval-response.item.bibrecord.item-info.itemidlist.itemid").alias('SGR_id'), #ดูจำนวน subject แต่ละอัน ในแต่ละปี
]
Citation_Network_df = df.select(*Citation_Network_Columns)

Citation_Network_df_exploded = Citation_Network_df.withColumn('SGR_id',explode(col('SGR_id')))

filtered_df = Citation_Network_df_exploded.filter(col("SGR_id").getItem('@idtype') == ("SGR"))

Title_SGR_ID_df = filtered_df.withColumn('SGR_id', col('SGR_id').getItem('$'))

# Title_SGR_ID_df.show(10, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|title                                                                                                                                                                                            |SGR_id     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|Nuclear imaging for localization and surgical outcome prediction in epilepsy: A review of latest discoveries and future perspectives                                                             |85145388556|
|Syntheses and anti-HIV and human cluster of differentiation 4 (CD4) down-modulating potencies of pyridine-fused cyclotriazadisulfonamide (CADA) compounds              

In [None]:
#abstracts-retrieval-response.item.bibrecord.tail.bibliography.reference[0].ref-info.refd-itemidlist.itemid //เอา SGR เป็นหลัก (ใช้ดู reference)
Reference_Column = [
    col("abstracts-retrieval-response.coredata.dc:title").alias('title'),
    col("abstracts-retrieval-response.item.bibrecord.tail.bibliography.reference").alias('Ref'),
]

Reference_df = df.select(*Reference_Column)
Reference_df_exploded = Reference_df.withColumn('Ref',explode(col('Ref')))

Reference_df_id = Reference_df_exploded.withColumn('Ref', col('Ref').getItem('ref-info').getItem('refd-itemidlist').getItem('itemid'))


schema = ArrayType(StructType([
    StructField("$", StringType(), True),
    StructField("@idtype", StringType(), True)
]))
Reference_df_id_json = Reference_df_id.withColumn('Ref', from_json(col('Ref'), schema))

Reference_df_id_exploded = Reference_df_id_json.withColumn('Ref',explode(col('Ref')))

filtered_df_SGR = Reference_df_id_exploded.filter(col("Ref").getItem('@idtype') == ("SGR"))

Reference_df_SGR = filtered_df_SGR.withColumn('Ref', col('Ref').getItem('$'))

Connection_df = Reference_df_SGR.join(Title_SGR_ID_df, Reference_df_SGR.title == Title_SGR_ID_df.title, 'left').drop(Title_SGR_ID_df.title).select('title','SGR_id','Ref')

+-----+---+
|title|Ref|
+-----+---+
+-----+---+



In [None]:
output_file_1 = "./output/Citation_network_title_id"  
output_file_2 = "./output/Citation_network_ref"  
# Save the extracted data to CSV
try:
    Title_SGR_ID_df.coalesce(1).write.option("header", True).mode('overwrite').csv(output_file_1)
    Connection_df.coalesce(1).write.option("header", True).mode('overwrite').csv(output_file_2)
    print("Data extraction complete! Check the output folder.")
except Exception as e:
    print(f"Error: {e}")

Data extraction complete! Check the output folder.


Citedby count Prediction

In [None]:
Citedby_count_Column = [
    # col("abstracts-retrieval-response.coredata.dc:identifier").alias('identifier'),
    col("abstracts-retrieval-response.coredata.prism:publicationName").alias('publication_name'),
    # col("abstracts-retrieval-response.coredata.prism:aggregationType").alias('aggregationType'),
    col("abstracts-retrieval-response.coredata.citedby-count").alias('citedby_count'),
    #เพิ่มเติม เป็นตัวแปรต้นสำหรับ predict
    col("abstracts-retrieval-response.subject-areas.subject-area").alias('subject_area'),
    col("abstracts-retrieval-response.affiliation").alias('affiliation'),
    

]
Citedby_count_df = df.select(*Citedby_count_Column)

Citedby_count_df = Citedby_count_df.withColumn(
    'citedby_count', col('citedby_count').cast('int')
)

# เก็บไว้ก่อน
# Journey_Count_By_Publication_Name = Journal_Impact_df.groupBy('publication_name').agg(
#     sum('citedby_count').alias('total_citedby_count'),                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
#     count('citedby_count').alias('number_of_articles')
#     ).orderBy('total_citedby_count', ascending=False)
# Journey_Count_By_Publication_Name.show(10, False)

In [None]:
#start encoding subject_area
Citedby_count_df_exploded = Citedby_count_df.withColumn('subject_area',explode(col('subject_area').getItem('$')))


Citedby_count_df_grouped = Citedby_count_df_exploded.groupBy('publication_name').agg(
    collect_list("subject_area").alias("subject_area_list"),
)

cv_subject = CountVectorizer(inputCol="subject_area_list", outputCol="subject_area_vector", binary=True)


encode_subject_df = cv_subject.fit(Citedby_count_df_grouped).transform(Citedby_count_df_grouped).drop('subject_area_list').join(Citedby_count_df, 'publication_name', 'left').drop('subject_area')

#start encoding affiliation
schema = ArrayType(StructType([
    StructField("affiliation-city", StringType(), True),
    StructField("@id", StringType(), True),
    StructField("affilname", StringType(), True),
    StructField("affiliation-country", StringType(), True),
    StructField("@href", StringType(), True),
]))
encode_subject_df_json = encode_subject_df.withColumn('affiliation', from_json(col('affiliation'), schema))

encode_subject_df_json_exploded = encode_subject_df_json.withColumn('affiliation',explode(col('affiliation').getItem('affilname')))

encode_subject_df_json_grouped = encode_subject_df_json_exploded.groupBy('publication_name').agg(
    collect_list("affiliation").alias("affiliation_list"),
)

cv_affiliation = CountVectorizer(inputCol="affiliation_list", outputCol="affiliation_vector", binary=True)

encode_affiliation_df = cv_affiliation.fit(encode_subject_df_json_grouped).transform(encode_subject_df_json_grouped).drop('affiliation_list').join(encode_subject_df_json, 'publication_name', 'left').drop('affiliation')


#start encoding publication_name

indexer_publication = StringIndexer(inputCol="publication_name", outputCol="publication_name_index")
encoder_publication = OneHotEncoder(inputCol="publication_name_index", outputCol="publication_name_vector")

pipeline = Pipeline(stages=[indexer_publication , encoder_publication])

encode_df = pipeline.fit(encode_affiliation_df).transform(encode_affiliation_df).drop('aggregationType' , 'aggregationType_index', 'publication_name' , 'publication_name_index', 'identifier', 'identifier_index')

# encode_df.show(10, False)

Data Prepare

In [None]:
#fillna
filled_df = encode_df.fillna({'citedby_count':encode_df.agg(avg('citedby_count').alias('avg')).first()['avg']})

#drop outlier
percentile = filled_df.approxQuantile('citedby_count', [0.25, 0.75], 0)
Q1 = percentile[0]
Q3 = percentile[1]
IQR = Q3 - Q1

lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

drop_outlier_df = filled_df.filter((col('citedby_count') >= lower_bound) & (col('citedby_count') <= upper_bound))

#catergorize 0 citation
with_is_zero_df = drop_outlier_df.withColumn('is_zero_citedby_count', expr("CASE WHEN citedby_count = 0 THEN 1 ELSE 0 END"))

is_zero_indexer = StringIndexer(inputCol="is_zero_citedby_count", outputCol="is_zero_citedby_count_index")
is_zero_encoder = OneHotEncoder(inputCol="is_zero_citedby_count_index", outputCol="is_zero_citedby_count_vector")

pipeline = Pipeline(stages=[is_zero_indexer , is_zero_encoder])

final_df = pipeline.fit(with_is_zero_df).transform(with_is_zero_df).drop('is_zero_citedby_count' , 'is_zero_citedby_count_index')

In [None]:
## For tuning hyperparameter
# Columns = ["numTrees", "maxDepth", "subsamplingRate", "minInstancesPerNode", "featureSubsetStrategy", "rmse", "mae", "r2"]

# rf_param_df = pd.DataFrame(columns=Columns)

# rf_param_df = pd.read_csv('./output/RF_param.csv')

Data Science Component

In [None]:
feature_columns = ['affiliation_vector', 'subject_area_vector', 'publication_name_vector', 'is_zero_citedby_count_vector']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(final_df)
data = data.select("features", "citedby_count")

train_data, test_data = data.randomSplit([0.7, 0.3], seed=42069)

In [None]:

# gbt = GBTRegressor(
#     featuresCol="features", 
#     labelCol="citedby_count", 
#     maxIter=10,
#     maxDepth=15,
#     stepSize=0.2,
#     seed=42069
#     )

rf = RandomForestRegressor(
    featuresCol="features", 
    labelCol="citedby_count", 
    numTrees=50, 
    maxDepth=10, 
    subsamplingRate=0.8,
    minInstancesPerNode=5,
    featureSubsetStrategy="auto",
    seed=42069
)

In [None]:
# gbt_model = gbt.fit(train_data)
rf_model = rf.fit(train_data)


# gbt_predictions = gbt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)

model = rf_model
predictions = rf_predictions

In [None]:
evaluator_rmse = RegressionEvaluator(labelCol="citedby_count", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="citedby_count", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="citedby_count", predictionCol="prediction", metricName="r2")

# gbt_rmse = evaluator_rmse.evaluate(gbt_predictions)
# rf_rmse = evaluator_rmse.evaluate(rf_predictions)

# gbt_mae = evaluator_mae.evaluate(gbt_predictions)
# rf_mae = evaluator_mae.evaluate(rf_predictions)

# gbt_r2 = evaluator_r2.evaluate(gbt_predictions)
# rf_r2 = evaluator_r2.evaluate(rf_predictions)


# print(f"GBT RMSE: {gbt_rmse}  RF RMSE: {rf_rmse}")
# print(f"GBT MAE: {gbt_mae}  RF MAE: {rf_mae}")
# print(f"GBT R²: {gbt_r2}  RF R²: {rf_r2}")

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-Squared (R²): {r2}")

Root Mean Squared Error (RMSE): 3.967437275228513
Mean Absolute Error (MAE): 2.5120940200712703
R-Squared (R²): 0.31602971584446005


In [None]:
# new_row = [ rf.getNumTrees(), rf.getMaxDepth(), rf.getSubsamplingRate(), rf.getMinInstancesPerNode(), rf.getFeatureSubsetStrategy(), rmse, mae, r2]

# rf_param_df.loc[len(rf_param_df)] = new_row

In [None]:
# rf_param_df.tail()

# rf_param_df.drop(len(rf_param_df)-1, inplace=True)

# rf_param_df.to_csv('./output/RF_param.csv', index=False)

In [None]:
# predictions.orderBy("prediction", ascending=False).show(10, False)

# train_data.filter(col('citedby_count').isNull() | col('features').isNull()).show(10, False)

# predictions.describe().show()

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
output_file = "./output/Citedby_Prediction_Data.csv"  # Replace with your desired output path
# Save the extracted data to CSV
try:
    predictions.select('citedby_count', 'prediction').coalesce(1).write.option("header", True).mode('overwrite').csv(output_file)
    print("Data extraction complete! Check the output folder.")
except Exception as e:
    print(f"Error: {e}")