In [7]:
import os
import sys

# Set Java 17 as the Java version for PySpark
# PySpark 3.5.7 requires Java 17, can't do with Java 25
os.environ['JAVA HOME'] = '/Program Files/Eclipse Adoptium/jdk-17.0.17.10-hotspot'
os.environ['PATH'] = os.environ['JAVA_HOME'] + '/bin;' + os.environ['PATH']

# Verify Java version
import subprocess
result = subprocess.run(['java','-version'], capture_output = True,text=True)
print("Java version:", result.stderr.split('\n')[0])

# Set Python executable for PySpark
os.environ['PYSPARK_PYTHON'] = sys.executable

Java version: openjdk version "17.0.17" 2025-10-21


In [8]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [20]:
import os
print("HADOOP_HOME =", os.environ.get("HADOOP_HOME"))

if os.environ.get("HADOOP_HOME"):
    path = os.path.join(os.environ["HADOOP_HOME"], "bin", "winutils.exe")
    print("winutils exists:", os.path.exists(path), "->", path)
else:
    print("❌ Hadoop not configured in this session")

HADOOP_HOME = D:\Code\hadoop-3.4.0\hadoop-3.4.0
winutils exists: True -> D:\Code\hadoop-3.4.0\hadoop-3.4.0\bin\winutils.exe


In [10]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("5400-elt-pipeline-test")
    .master("local[*]")
    # 可选的防踩坑配置
    .config("spark.sql.warehouse.dir", "file:/D:/tmp/spark-warehouse")
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC")
    .getOrCreate()
)

print("Spark version:", spark.version)
print("SparkContext stopped?", spark.sparkContext._jsc.sc().isStopped())


Spark version: 3.5.7
SparkContext stopped? False


In [11]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark in venv") \
    .config("spark.cores.max", "4") \
    .config('spark.executor.memory', '8G') \
    .config('spark.driver.maxResultSize', '8g') \
    .config('spark.kryoserializer.buffer.max','512m') \
    .config("spark.driver.cores", "4") \
    .config("spark.pyspark.python", sys.executable)\
    .config("spark.pyspark.driver.python", sys.executable)\
    .config("spark.python.use.daemon", "false") \
    .config("spark.python.worker.reuse", "false") \
    .getOrCreate()

sc = spark.sparkContext  # SparkContext对象

print("Using Apache Spark Version", spark.version)
print(sys.executable)

Using Apache Spark Version 3.5.7
D:\Columbia\Fall2025\5400\SQL.venv\Scripts\python.exe


In [12]:
df = spark.read.format("csv").options(
    header = 'true',
    inferschema = 'true',
    treatEmptyValuesAsNulls = 'true'
).load('All_external.csv')
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Article_title: string (nullable = true)
 |-- Stock_symbol: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- Article: string (nullable = true)
 |-- Lsa_summary: string (nullable = true)
 |-- Luhn_summary: string (nullable = true)
 |-- Textrank_summary: string (nullable = true)
 |-- Lexrank_summary: string (nullable = true)



In [13]:
print("Count of all records:", df.count())

Count of all records: 29984720


In [14]:
df.columns

['Date',
 'Article_title',
 'Stock_symbol',
 'Url',
 'Publisher',
 'Author',
 'Article',
 'Lsa_summary',
 'Luhn_summary',
 'Textrank_summary',
 'Lexrank_summary']

### Bronze Layer

In [16]:
from pyspark.sql import functions as F
from pymongo import MongoClient

# -------------------------
# 1. Spark 这边：Bronze 层
# -------------------------
# 假设 df 就是你 Bronze 层要保存的 DataFrame
news_bronze = df

# 可选：减少分区数，避免小文件太多（不是必须）
news_bronze = news_bronze.coalesce(4)

print("Spark rows:", news_bronze.count())
news_bronze.printSchema()

# -------------------------
# 2. 连接 Docker 里的 Mongo
# -------------------------
client = MongoClient("mongodb://localhost:27017")
db = client["news_elt"]          # 数据库名称你可以自定义
bronze_col = db["fnspid_bronze"] # collection 名称

# 覆盖写：先清空
bronze_col.delete_many({})

# -------------------------
# 3. 流式写入 Mongo（不走 toPandas）
# -------------------------
# 注意：这里用 toLocalIterator()，会一批批拉数据，不会一次性塞满内存
count = 0
for row in news_bronze.toLocalIterator():
    doc = row.asDict(recursive=True)  # Row -> dict
    bronze_col.insert_one(doc)
    count += 1
    if count % 10000 == 0:
        print("已写入", count, "条...")

print("✅ Mongo Bronze 写入完成，总行数：", count)

# -------------------------
# 4. 验证 Mongo 里数据
# -------------------------
print("Mongo rows:", bronze_col.count_documents({}))
print("示例文档：", bronze_col.find_one())


ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝，无法连接。