In [None]:
# 基于空间索引 给时序数据area打标
import findspark
findspark.init('/opt/spark')
from pyspark.sql import SparkSession
# https://sedona.apache.org/1.3.0-incubating/archive/tutorial/geospark-sql-python/
from geospark.register import upload_jars
from geospark.register import GeoSparkRegistrator
app_name = "dwd_to_dwm"

# GeoSpark ships a suite of geometry and index serializers.
# Forgetting to enable them leads to high memory consumption (the serializers reduce memory use).
# https://stackoverflow.com/questions/65213369/unable-to-configure-geospark-in-spark-session
# upload_jars()

# Resource settings applied to the session builder, in this order.
spark_conf = {
    "spark.executor.memory": "40g",
    "spark.driver.memory": "40g",
    "spark.driver.maxResultSize": "4g",
}

# Build the Hive-enabled session, applying each setting before getOrCreate().
builder = SparkSession.builder.appName(app_name).enableHiveSupport()
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

# GeoSparkRegistrator.registerAll(spark)




# Print cluster information.
# NOTE(review): the fourth label reads "cluster node count", but the value printed
# is the key set returned by getExecutorMemoryStatus(), not a number.
cluster_info = [
    ("Spark 集群名称: ", spark.conf.get("spark.app.name")),
    ("Spark 集群版本: ", spark.version),
    ("Spark ID: ", spark.sparkContext.applicationId),
    ("Spark 集群节点数: ", spark.sparkContext._jsc.sc().getExecutorMemoryStatus().keySet()),
    ("每个 Executor 的内存容量: ", spark.conf.get("spark.executor.memory")),
]
for label, value in cluster_info:
    print(label, value)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Recreate the dwm_time_series table (partitioned by area).
spark.sql("DROP TABLE IF EXISTS dwm_time_series;")

sql = '''
CREATE TABLE IF NOT EXISTS dwm_time_series
         (id bigint,
          time TIMESTAMP,
          lon float,
          lat float)
PARTITIONED BY(area int)
STORED AS PARQUET;
'''
# Execute the DDL now: `sql` is rebound to the dwm_job statement below, so
# without this call dwm_time_series would be dropped but never created.
spark.sql(sql)

# Recreate the dwm_job table (partitioned by yearMonth).
spark.sql("DROP TABLE IF EXISTS dwm_job;")
sql = """

CREATE TABLE IF NOT EXISTS dwm_job
(id bigint,start_time TIMESTAMP,end_time TIMESTAMP)
PARTITIONED BY(yearMonth string)
STORED AS PARQUET;

"""
spark.sql(sql)

# Output table (not created here; kept as a reference DDL)
#CREATE TABLE output(id SERIAL PRIMARY KEY,job_id int4);

print("初始化 dwm_time_series，dwm_job成功")

In [None]:
# Load the job table: copy id, start_time, end_time and yearMonth from dwd_job
# into dwm_job, using yearMonth as the partition column.
# NOTE(review): every partition value is dynamic here; depending on the cluster
# configuration Hive may require hive.exec.dynamic.partition.mode=nonstrict — confirm.

sql = '''
INSERT INTO dwm_job partition(yearMonth)
SELECT
 id as id,
 start_time as start_time,
 end_time as end_time,
 yearMonth as yearMonth
 FROM dwd_job;
'''
result = spark.sql(sql)

# Look at one row of the loaded table
result = spark.sql("SELECT * FROM dwm_job LIMIT 1")
result.show()

In [None]:
# Show the total number of rows in the dwd_time_series table
result = spark.sql("SELECT count(*) FROM dwd_time_series")
result.show()

In [None]:
# Select id, time, lon and lat from dwd_time_series for the rows whose time lies
# between start_time and end_time of the dwm_job row with id 1.
# NOTE(review): the job id is hard-coded to 1 and the outer SELECT only re-projects
# the inner query's columns — confirm both are intentional.
sql = '''

SELECT id,time,lon,lat
FROM (
	SELECT id,time,lon, lat
	FROM dwd_time_series as time_series
	WHERE EXISTS (
		SELECT *
		FROM dwm_job as job
		WHERE job.id = 1
			AND time_series.time BETWEEN job.start_time AND job.end_time
	)
) dwd_time_series;

'''
df_input = spark.sql(sql)
df_input.show()

In [None]:
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, FloatType, IntegerType

# Define the schema: four non-nullable data columns, built by chaining add().
schema = (
    StructType()
    .add("id", LongType(), nullable=False)
    .add("time", TimestampType(), nullable=False)
    .add("lon", FloatType(), nullable=False)
    .add("lat", FloatType(), nullable=False)
)

# Append the partition column
schema.add("area", IntegerType(), nullable=False)

# Print the schema
print(schema)

In [None]:
from pyspark.sql import Row

import rtree
# Create the R-tree index (module-level; build_rtree_idx inserts into it)
idx = rtree.index.Index()

def build_rtree_idx(row):
    """Insert one record into the module-level ``idx`` as a point.

    ``row`` must expose ``id``, ``lon`` and ``lat`` attributes. The point is
    stored as a degenerate bounding box whose min and max corners are both
    ``(lon, lat)``, keyed by ``row.id``. Returns ``None``.
    """
    point_bbox = (row.lon, row.lat, row.lon, row.lat)
    idx.insert(row.id, point_bbox)
    
def mapper(rows):
    """Re-emit every record of one partition as a new ``Row``.

    Written as a generator so it can be passed to ``mapPartitions``.

    Args:
        rows: iterable of records exposing ``id``, ``time``, ``lon`` and
            ``lat`` attributes (the rows of a single partition).

    Yields:
        ``Row(id=..., time=..., lon=..., lat=...)`` for each input record,
        in input order.
    """
    # Process the data of one partition, one record at a time.
    for row in rows:
        yield Row(id=row.id, time=row.time, lon=row.lon, lat=row.lat)
        

# Populate the driver-side R-tree index.
# rdd.map() is lazy, and even with an action it would run on the executors
# against a serialized copy of `idx`, so the inserts would never reach this
# object. Stream the rows to the driver instead; toLocalIterator() fetches
# one partition at a time rather than collecting everything at once.
for row in df_input.toLocalIterator():
    build_rtree_idx(row)

# df_output = df_input.rdd.repartition(3000).mapPartitions(mapper).toDF()


In [None]:
# Display df_input
df_input.show()

In [None]:
from pyspark.sql import SparkSession,Row

# Get (or create) a SparkSession for the map demo
spark = (
    SparkSession.builder
    .appName("MapOperation")
    .getOrCreate()
)

# Build a small PySpark DataFrame with sample data
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
]
df = spark.createDataFrame(data, schema=["Name", "Age"])

# Take the DataFrame's underlying RDD
rdd = df.rdd

# Define a function applied to each partition of the RDD (it is passed to mapPartitions and receives an iterator of rows)
def map_func(rows):
    """Rebuild every record of one partition as a ``Row`` with Name and Age.

    Intended for ``mapPartitions``, which requires the function to return an
    iterable; this generator yields one output row per input record.

    Args:
        rows: iterable of records supporting ``row["Name"]`` and
            ``row["Age"]`` lookups (the rows of a single partition).

    Yields:
        ``ResRow(name, age)`` for each input record, in input order.
    """
    # Row factory with fixed field names, created once per partition.
    ResRow = Row('Name', 'Age')
    for row in rows:
        yield ResRow(row["Name"], row["Age"])
        

# Apply the per-partition map. The resulting RDD is bound to `new_rdd` so the
# createDataFrame call below has a defined input.
new_rdd = rdd.repartition(30).mapPartitions(map_func)
df_output = new_rdd.toDF()

# Convert the RDD back into a DataFrame
new_df = spark.createDataFrame(new_rdd, ["Name", "Age"])

# Show the converted DataFrame
new_df.show()