In [None]:
# Author: Wong Xiao Yuan

In [None]:
import pandas as pd

# Load the merged sentiment dataset from the working directory and list its
# column names as a quick look at the schema.
df = pd.read_parquet("sentiment_merged.parquet")
column_names = list(df.columns)
print(column_names)

['title', 'url', 'publishDate', 'newsCompany', 'full_text', 'sentence', 'predictedLabel', 'topic_summary', 'topic_with_lda', 'topic_with_ner', 'department', 'category', 'person_names', 'locations']


In [None]:
# task5_graph_building.ipynb
import os
from getpass import getpass

from pyspark.sql import SparkSession
from article_record_parser import ArticleRecordParser
from neo4j_graph_builder import Neo4jGraphBuilder

# Connection settings come from the environment so that no secret is stored in
# the notebook; the password falls back to an interactive prompt.
# NOTE(review): the password that used to be hardcoded in this cell has been
# exposed and should be rotated.
NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j+s://97144963.databases.neo4j.io")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD") or getpass("Neo4j password: ")


def _has_text(value):
    """Return True when `value` is a string containing non-whitespace characters."""
    return bool(value and value.strip())


def _first_by_key(rows, key):
    """Map every distinct `row[key]` to the first row (in input order) carrying it.

    Replaces the repeated `next(r for r in rows if r[key] == k)` scans, which
    were quadratic in the number of rows.
    """
    first = {}
    for row in rows:
        first.setdefault(row[key], row)
    return first


def _write_all(write, tx_func, items, label, unpack=False):
    """Run `tx_func` in one write transaction per item and print the item count.

    Args:
        write: session method that runs a transaction function
            (`execute_write`, or `write_transaction` on older drivers).
        tx_func: transaction function; it is called with the transaction
            followed by the item (or by the item's elements when `unpack`).
        items: iterable of arguments, one entry per transaction.
        label: text inserted into the summary line, e.g. "Article nodes".
        unpack: when True each item is a tuple spread into positional arguments.
    """
    items = list(items)
    for item in items:
        if unpack:
            write(tx_func, *item)
        else:
            write(tx_func, item)
    print(f"✔️  Distinct {label}: {len(items)}")


# Step 1
spark = SparkSession.builder.appName("Task5_Neo4j").getOrCreate()

builder = None
session = None
try:
    # Step 2
    df = spark.read.parquet("hdfs://localhost:9000/user/student/sentiment_merged.parquet")

    # Step 3
    builder = Neo4jGraphBuilder(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)

    # Step 4
    columns = [
        'title','url','publishDate','newsCompany',
        'full_text','sentence','predictedLabel',
        'topic_with_lda','department','category'
    ]
    records = df.select(*columns).collect()
    articles = [ArticleRecordParser.parse_row(r) for r in records]

    session = builder.driver.session()
    # `write_transaction` is deprecated in the 5.x driver in favour of
    # `execute_write`; fall back to the old name on drivers that lack the new one.
    write = getattr(session, "execute_write", None) or session.write_transaction

    # ---- Phase 1: Article nodes (distinct titles) ----
    first_by_title = _first_by_key(articles, 'title')
    unique_titles = set(first_by_title)
    _write_all(write, Neo4jGraphBuilder._create_article_node,
               first_by_title.values(), "Article nodes")

    # ---- Phase 2: Company nodes (distinct names) ----
    unique_companies = {art['newsCompany'] for art in articles if art['newsCompany']}
    _write_all(write, Neo4jGraphBuilder._create_publisher_node,
               unique_companies, "Company nodes")

    # ---- Phase 3: Sentence nodes (distinct sentenceId) ----
    unique_sentences = {art['sentenceId'] for art in articles if _has_text(art['sentence'])}
    first_by_sentence_id = _first_by_key(articles, 'sentenceId')
    _write_all(write, Neo4jGraphBuilder._create_sentence_node,
               [art for sid, art in first_by_sentence_id.items() if sid in unique_sentences],
               "Sentence nodes")

    # ---- Phase 4: Sentiment nodes (distinct labels) ----
    unique_sentiments = {art['sentiment'] for art in articles if art['sentiment']}
    _write_all(write, Neo4jGraphBuilder._create_sentiment_node,
               unique_sentiments, "Sentiment nodes")

    # ---- Phase 5: Topic nodes (distinct keywords) ----
    unique_topics = {t for art in articles for t in (art['topics'] or ())}
    _write_all(write, Neo4jGraphBuilder._create_topic_node,
               unique_topics, "Topic nodes")

    # ---- Phase 6: Category nodes (distinct) ----
    unique_categories = {art['category'] for art in articles if art['category']}
    _write_all(write, Neo4jGraphBuilder._create_category_node,
               unique_categories, "Category nodes")

    # ---- Phase 7: Department nodes (distinct) ----
    unique_departments = {art['department'] for art in articles if art['department']}
    _write_all(write, Neo4jGraphBuilder._create_department_node,
               unique_departments, "Department nodes")

    # ---- Phase 8: PUBLISHED_BY rels (distinct pairs) ----
    # Skip articles without a company: Phase 2 creates no Company node for them,
    # so there is nothing to link to.
    pub_pairs = {(art['title'], art['newsCompany']) for art in articles if art['newsCompany']}
    _write_all(write, Neo4jGraphBuilder._create_published_by_rel,
               pub_pairs, "PUBLISHED_BY relationships", unpack=True)

    # ---- Phase 9: HAS_SENTENCE rels (distinct pairs) ----
    hs_pairs = {(art['title'], art['sentenceId']) for art in articles if _has_text(art['sentence'])}
    _write_all(write, Neo4jGraphBuilder._create_has_sentence_rel,
               hs_pairs, "HAS_SENTENCE relationships", unpack=True)

    # ---- Phase 10: HAS_SENTIMENT rels (distinct pairs) ----
    hse_pairs = {(art['sentenceId'], art['sentiment']) for art in articles if art['sentiment']}
    _write_all(write, Neo4jGraphBuilder._create_has_sentiment_rel,
               hse_pairs, "HAS_SENTIMENT relationships", unpack=True)

    # ---- Phase 11: HAS_TOPIC rels (distinct pairs) ----
    ht_pairs = {(art['title'], t) for art in articles for t in (art['topics'] or ())}
    _write_all(write, Neo4jGraphBuilder._create_has_topic_rel,
               ht_pairs, "HAS_TOPIC relationships", unpack=True)

    # ---- Phase 12: BELONGS_TO_CATEGORY rels (distinct pairs) ----
    bc_pairs = {(art['title'], art['category']) for art in articles if art['category']}
    _write_all(write, Neo4jGraphBuilder._create_belongs_to_category_rel,
               bc_pairs, "BELONGS_TO_CATEGORY relationships", unpack=True)

    # ---- Phase 13: BELONGS_TO_DEPARTMENT rels (distinct pairs) ----
    bd_pairs = {(art['title'], art['department']) for art in articles if art['department']}
    _write_all(write, Neo4jGraphBuilder._create_belongs_to_department_rel,
               bd_pairs, "BELONGS_TO_DEPARTMENT relationships", unpack=True)
finally:
    # Step 6
    # Release every resource even when a read or write above fails.
    if session is not None:
        session.close()
    if builder is not None:
        builder.close()
    spark.stop()


25/04/18 16:05:42 WARN Utils: Your hostname, LAPTOP-EFVFE10M. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/18 16:05:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/18 16:05:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  session.write_transaction(Neo4jGraphBuilder._create_article_node,


✔️  Distinct Article nodes: 46
✔️  Distinct Company nodes: 2


  session.write_transaction(Neo4jGraphBuilder._create_publisher_node, c)
  session.write_transaction(Neo4jGraphBuilder._create_sentence_node, art)


✔️  Distinct Sentence nodes: 743
✔️  Distinct Sentiment nodes: 3


  session.write_transaction(Neo4jGraphBuilder._create_sentiment_node, s)
  session.write_transaction(Neo4jGraphBuilder._create_topic_node, t)


✔️  Distinct Topic nodes: 53


  session.write_transaction(Neo4jGraphBuilder._create_category_node, c)


✔️  Distinct Category nodes: 8
✔️  Distinct Department nodes: 3


  session.write_transaction(Neo4jGraphBuilder._create_department_node, d)
  session.write_transaction(


✔️  Distinct PUBLISHED_BY relationships: 46


  session.write_transaction(


✔️  Distinct HAS_SENTENCE relationships: 743


  session.write_transaction(


✔️  Distinct HAS_SENTIMENT relationships: 743


  session.write_transaction(


✔️  Distinct HAS_TOPIC relationships: 230


  session.write_transaction(


✔️  Distinct BELONGS_TO_CATEGORY relationships: 46


  session.write_transaction(


✔️  Distinct BELONGS_TO_DEPARTMENT relationships: 46


In [None]:
from pyspark.sql.functions import explode, col

# assume parsed_df is your DataFrame with schema:
# ['title','url','publishDate','full_text','sentence','sentenceId',
#  'newsCompany','sentiment','topics','people','category','department']
# NOTE(review): `parsed_df` is not created in any cell visible in this
# notebook; it must be defined before this cell is run.

# The column -> type-string mapping does not change inside the loop, so build it once.
column_types = dict(parsed_df.dtypes)

for c in parsed_df.columns:
    # if it's an array column, explode it first so that both the count and the
    # listing refer to individual elements rather than to whole arrays
    if column_types[c].startswith("array"):
        values = parsed_df.select(explode(col(c)).alias(c))
    else:
        values = parsed_df.select(c)

    distinct_values = values.distinct()
    print(f"\n--- Distinct values for column `{c}` ({distinct_values.count()} total) ---")
    distinct_values.orderBy(c).show(truncate=False)



--- Distinct values for column `title` (46 total) ---
+----------------------------------------------------------------------------------------------------------------+
|title                                                                                                           |
+----------------------------------------------------------------------------------------------------------------+
|139 more staff eyed for scam response centre                                                                    |
|16-year-old boy who made AI porn of schoolmates nabbed                                                          |
|All parties must respect Islam’s position as the Federal Religion, says Mohd Na’im                              |
|Allow parties involved to resolve temple relocation harmoniously, says Fahmi                                    |
|Banks foiled nearly RM800mil in fraudulent transactions                                                         |
|CMA amendment enhances o

In [None]:
from neo4j import GraphDatabase

import os
from getpass import getpass

# Connection settings come from the environment so that no secret is stored in
# the notebook; the password falls back to an interactive prompt.
# NOTE(review): the password that used to be hardcoded here has been exposed
# and should be rotated.
URI      = os.environ.get("NEO4J_URI", "neo4j+s://97144963.databases.neo4j.io")
USER     = os.environ.get("NEO4J_USER", "neo4j")
PASSWORD = os.environ.get("NEO4J_PASSWORD") or getpass("Neo4j password: ")

# Display names used for the Python types listed here; any other Python type
# is reported under its own class name.
TYPE_MAP = dict(
    str="String",
    int="Long",
    float="Double",
    bool="Boolean",
)

def map_type(value):
    """Return a display name for the type of `value`.

    Scalars are looked up in `TYPE_MAP` by their Python class name, falling
    back to the class name itself. Lists are rendered as `List<T>`, where `T`
    is the single element type, a parenthesised, sorted, comma-separated list
    of element types when they differ, or `Unknown` when the list has no
    non-None elements.
    """
    def _display_name(obj):
        python_name = type(obj).__name__
        return TYPE_MAP.get(python_name, python_name)

    if not isinstance(value, list):
        return _display_name(value)

    # None entries carry no type information, so they are ignored.
    names = sorted({_display_name(item) for item in value if item is not None})
    if not names:
        return "List<Unknown>"
    if len(names) == 1:
        return f"List<{names[0]}>"
    return "List<(" + ", ".join(names) + ")>"

# Print every node label together with the property names and mapped types of
# one sample node carrying that label. Only a single node per label is
# inspected, so properties that exist on other nodes only are not listed.
# The driver is used as a context manager so that it is closed even when a
# query raises.
with GraphDatabase.driver(URI, auth=(USER, PASSWORD)) as driver:
    with driver.session() as session:
        labels = [record["label"] for record in session.run("CALL db.labels() YIELD label")]

        print("=== Graph Schema: Node labels with their properties & types ===\n")
        for lbl in labels:
            # Double any backtick so the label stays a single quoted identifier.
            quoted_label = lbl.replace("`", "``")
            rec = session.run(f"MATCH (n:`{quoted_label}`) RETURN n LIMIT 1").single()
            if rec is None:
                # The label exists but no node currently carries it.
                continue
            node = rec["n"]
            print(f"({lbl})")
            for key, val in dict(node).items():
                print(f"  +{key}: {map_type(val)}")
            print()


=== Graph Schema: Node labels with their properties & types ===

(Article)
  +full_text: String
  +publishDate: String
  +title: String
  +url: String

(Company)
  +name: String

(Sentence)
  +id: String
  +text: String

(Sentiment)
  +label: String

(Topic)
  +keyword: String

(Category)
  +name: String

(Department)
  +name: String



In [None]:

# Obtain a Spark session for this row-count check.
spark = (
    SparkSession.builder
    .appName("CountParquetRows")
    .getOrCreate()
)

# Read the merged sentiment dataset from HDFS.
df = spark.read.parquet("hdfs://localhost:9000/user/student/sentiment_merged.parquet")

# Report the total number of rows, then preview the data.
row_count = df.count()
print(f"Total rows in DataFrame: {row_count}")
df.show()

# Report how many partitions back the DataFrame.
partition_count = df.rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")


Total rows in DataFrame: 5095
+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------+----------+--------------------+------------+---------+
|               title|                 url|         publishDate|    newsCompany|           full_text|            sentence|predictedLabel|       topic_summary|      topic_with_lda|topic_with_ner|department|            category|person_names|locations|
+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------+----------+--------------------+------------+---------+
|16-year-old boy w...|https://www.thest...|2025-04-09T23:00:00Z|The Star Online|johor baru: a 16-...|johor baru: a 16-...|       neutral|Topic 2: said, vi...|Topic 2: said, vi...|Limited Places|  Politics|International P

In [None]:
from pyspark.sql.functions import col, explode, regexp_replace, split

# 1) Remove the leading "Topic <n>:" label, then split the remainder on commas
#    into an array column.
topic_keywords = regexp_replace(col("topic_with_lda"), r"^Topic\s*\d+:\s*", "")
cleaned = df.withColumn("topics_array", split(topic_keywords, r",\s*"))

# 2) One row per keyword, without empty strings, deduplicated and sorted.
exploded_topics = cleaned.select(explode(col("topics_array")).alias("topic"))
distinct_topics_df = (
    exploded_topics
    .where(col("topic") != "")
    .distinct()
    .orderBy("topic")
)

# 3) Show the keywords and report how many there are.
distinct_topics_df.show(truncate=False)
topic_total = distinct_topics_df.count()
print(f"Total distinct topics: {topic_total}")


+--------------+
|topic         |
+--------------+
|action        |
|activities    |
|also          |
|anwar         |
|april         |
|ayer          |
|campaign      |
|children      |
|commission    |
|communications|
|companies     |
|content       |
|datuk         |
|digital       |
|facebook      |
|fahmi         |
|financial     |
|g             |
|internet      |
|investigation |
+--------------+
only showing top 20 rows

Total distinct topics: 53
