In [1]:
import numpy

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session running in local mode on all cores,
# with 4g of driver memory and Hive support enabled.
session_builder = (
    SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .enableHiveSupport()
)
spark = session_builder.getOrCreate()

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Load from CSV (first line is the header; column types are inferred).
# The Windows path must be a raw string: in a normal literal "\U" starts a
# Unicode escape (a SyntaxError here) and "\r" would become a carriage return.
df = spark.read.csv(
    r"C:\Users\admin\Documents\risk_data\data\loan_info.csv",
    header=True,
    inferSchema=True,
)

# Load from a Hive table
hive_df = spark.sql("SELECT * FROM my_hive_table")

# Load from Parquet
parquet_df = spark.read.parquet("data.parquet")

In [None]:
# Write df out in Parquet format.
df.write.parquet("output.parquet")

# Save df as the table "hive_table".
df.write.saveAsTable("hive_table")

# Write df as CSV, overwriting any existing output. Mind the partitioning:
# Spark writes one part file per partition under the "output.csv" directory.
df.write.mode("overwrite").csv("output.csv")

In [None]:
# Preview the first 5 rows, then print the schema.
df.show(5)
df.printSchema()

# Project the name/age columns and keep only rows with age above 30.
older_than_30 = df.select("name", "age").filter(df["age"] > 30)
older_than_30.show()

# Per-department aggregates: average salary and maximum age.
department_stats = df.groupBy("department").agg({"salary": "avg", "age": "max"})
department_stats.show()

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Plain Python function that is wrapped as a Spark UDF below.
def square(x):
    """Return ``x`` multiplied by itself, or ``None`` when ``x`` is ``None``.

    When used as a Spark UDF, SQL NULL values arrive as ``None``; multiplying
    ``None`` raises TypeError, so nulls are propagated instead of failing.
    """
    if x is None:
        return None
    return x * x

# Wrap square as a Spark UDF whose declared return type is IntegerType.
square_udf = udf(square, returnType=IntegerType())

# Apply the UDF to the age column, store it as "squared_age", and display it.
with_squared_age = df.withColumn("squared_age", square_udf(df["age"]))
with_squared_age.show()

In [None]:
# Register df as the temporary view "people" so it can be queried with SQL.
df.createOrReplaceTempView("people")

# Average salary per department, restricted to rows with age above 25.
avg_salary_query = """
    SELECT department, AVG(salary) AS avg_salary 
    FROM people 
    WHERE age > 25 
    GROUP BY department
"""
result = spark.sql(avg_salary_query)
result.show()

In [None]:
# StorageLevel is imported in this cell because nothing else in the notebook
# brings it into scope; without it the persist() call raises NameError.
from pyspark import StorageLevel

# Cache df for data that is accessed repeatedly.
df.cache()

# persist() lets the storage level be chosen explicitly.
# NOTE(review): df was already cached on the line above, so this second
# request probably keeps the existing level and only logs a warning; call
# df.unpersist() first if a different level is really intended — confirm.
df.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
# Repartition df into 100 partitions keyed on the department column
# (intended by the author to reduce data skew).
partition_count = 100
df = df.repartition(partition_count, "department")

# Control parallelism: set the number of partitions used by SQL shuffles.
shuffle_partitions = 200
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)

In [None]:
small_df = ...  # placeholder: replace with a real small DataFrame before running

# collect() returns a list of rows, and a list has no .get(); broadcast a dict
# instead so that lookup_data can fetch a row by key.
# NOTE(review): assumes the first column of small_df is the lookup key — confirm.
lookup_table = {row[0]: row for row in small_df.collect()}
broadcast_df = spark.sparkContext.broadcast(lookup_table)


# Helper meant to be called from inside a UDF; it reads the broadcast variable.
def lookup_data(id):
    """Return the broadcast row whose key equals ``id``, or ``None`` if absent."""
    return broadcast_df.value.get(id)

In [None]:
# Read a stream from Kafka: subscribe to "topic" on the broker at host:9092.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")
    .option("subscribe", "topic")
)
stream_df = kafka_reader.load()

# Start a streaming query that writes to the console sink in append mode.
console_writer = (
    stream_df.writeStream
    .outputMode("append")
    .format("console")
)
query = console_writer.start()

# Keep running: wait here until the streaming query terminates.
query.awaitTermination()

In [None]:
spark.stop()  # Explicitly stop the Spark session to release its resources