In [None]:
from pyspark.sql import SparkSession

# Create the SparkSession for this notebook (getOrCreate reuses one if it already exists)
spark = (
    SparkSession.builder
    .appName("SQL Join on Multiple Tables")
    .getOrCreate()
)

# 1. Read the source tables (7 of them). Each is a headered CSV file
#    stored as <DATA_DIR>/<table name>.csv.
DATA_DIR = "database"
TABLE_NAMES = [
    "aircraft_details",
    "airport_fbo_details",
    "supplier_details",
    "zts_plane_flight",
    "zts_plane_flight_1",
    "zts_plane_flight_2",
    "ai_target_jet",
]


def read_csv_table(name: str, data_dir: str = DATA_DIR):
    """Load ``<data_dir>/<name>.csv`` as a Spark DataFrame.

    The first row is used as the header and column types are inferred
    (``inferSchema``), exactly as every table in this notebook is read.

    Args:
        name: Table name; also the CSV file's base name.
        data_dir: Directory containing the CSV files.

    Returns:
        The DataFrame produced by ``spark.read``.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{data_dir}/{name}.csv")
    )


# Read every table first, then (step 2) register each one as a temp view
# with the same name so the SQL in later cells can refer to it directly.
tables = {name: read_csv_table(name) for name in TABLE_NAMES}

# 2. Register as temporary views
for view_name, frame in tables.items():
    frame.createOrReplaceTempView(view_name)

# Keep the original per-table variables for any cell that uses them directly.
table_aircraft_details = tables["aircraft_details"]
table_airport_fbo_details = tables["airport_fbo_details"]
table_supplier_details = tables["supplier_details"]
table_zts_plane_flight = tables["zts_plane_flight"]
table_zts_plane_flight_1 = tables["zts_plane_flight_1"]
table_zts_plane_flight_2 = tables["zts_plane_flight_2"]
table_ai_target_jet = tables["ai_target_jet"]


In [None]:
# 3. 执行连表查询
# 假设 table1 和 table2 有一个公共字段 "id"，table3 和 table4 也有相同的公共字段
join_query = """
WITH biao_1 AS (
        SELECT
                a.date_month,
                a.dep_icao,
                count(*) AS dep_num 
        FROM
                zts_plane_flight_1 a
                JOIN ai_target_jet b ON a.pure_reg = b.pure_reg 
        WHERE
                date_month BETWEEN '2023-12' 
                AND '2024-12' 
                AND a.dep_icao = 'ZSSS'
        GROUP BY
                a.date_month,
                a.dep_icao 
        ),
        biao_2 AS (
        SELECT
                a.date_month,
                a.arr_icao,
                count(*) AS arr_num 
        FROM
                zts_plane_flight_1 a
                JOIN ai_target_jet b ON a.pure_reg = b.pure_reg 
        WHERE
                date_month BETWEEN '2023-12' 
                AND '2024-12' 
                AND a.arr_icao = 'ZSSS' 
        GROUP BY
                a.date_month,
                a.arr_icao 
        ),
        biao_3 AS (
        SELECT
                a.date_month,
                a.dep_icao AS icao_code,
                dep_num + arr_num AS total_num 
        FROM
                biao_1 a
                LEFT JOIN biao_2 b ON a.date_month = b.date_month 
        ),
        biao_4 AS ( SELECT *, lag( total_num ) over ( ORDER BY date_month ) AS last_num FROM biao_3 ) SELECT
        *,
        concat( round(( total_num - last_num )/ last_num * 100, 2 ), '%' ) AS rate 
FROM
        biao_4 
WHERE
        last_num IS NOT NULL;
"""

join_query = '''
SELECT max_range FROM aircraft_details WHERE aircraft_model_name = "Embraer Phenom 300" LIMIT 1;
'''
icao_code = "'ZSSS'"
date_month1 = "'2023-12'"
date_month2 = "'2024-12'"
join_query = f'''
WITH biao_1 AS (
        SELECT
                a.date_month AS date_month,
                COUNT(*) AS dep_count 
        FROM
                zts_plane_flight_1 a
                JOIN ai_target_jet b ON a.pure_reg = b.pure_reg 
        WHERE
                a.dep_icao = {icao_code}
                AND a.date_month BETWEEN {date_month1} 
                AND {date_month2} 
        GROUP BY
                a.date_month 
        ),
        biao_2 AS (
        SELECT
                a.date_month AS date_month,
                COUNT(*) AS arr_count 
        FROM
                zts_plane_flight_1 a
                JOIN ai_target_jet b ON a.pure_reg = b.pure_reg 
        WHERE
                a.arr_icao = {icao_code} 
                AND a.date_month BETWEEN {date_month1} 
                AND {date_month2} 
        GROUP BY
                a.date_month 
        ),
        biao_3 AS (
        SELECT
                a.date_month,
                ( a.dep_count + b.arr_count ) AS num 
        FROM
                biao_1 a
                JOIN biao_2 b ON a.date_month = b.date_month 
        ),
        biao_4 AS ( SELECT date_month, num, lag( num ) over ( ORDER BY date_month ) AS last_num FROM biao_3 ) SELECT
        date_month,
        num,
        last_num,
        concat( round(( num - last_num )/ last_num * 100, 2 ), '%' ) AS rate 
FROM
        biao_4 
WHERE
        last_num IS NOT NULL;
'''

pure_reg = "'c6cay'"
date_month1 = "'2023-12'"
date_month2 = "'2024-12'"
join_query = f'''
WITH biao_1 AS (
        SELECT
                date_month,
                count(*) AS num 
        FROM
                zts_plane_flight_1 
        WHERE
                pure_reg = {pure_reg}  
                AND date_month BETWEEN {date_month1} 
                AND {date_month2} 
        GROUP BY
                date_month 
        ),
        biao_2 AS ( SELECT *, lag( num ) over ( ORDER BY date_month ) AS last_month_num FROM biao_1 ) SELECT
        date_month,
        num AS month_num,
        last_month_num,
        concat( round(( num - last_month_num )/ last_month_num * 100, 2 ), '%' ) AS rate 
FROM
        biao_2 
WHERE
        last_month_num IS NOT NULL 
ORDER BY
        date_month 
'''

# 4. Execute the selected query, then print a header line and the rows
result_df = spark.sql(join_query)
print("Join Query Result:")
result_df.show()

# 5. (Optional) persist the result as CSV.
# NOTE(review): Spark's DataFrameWriter typically writes a *directory* of part
# files at this path rather than a single .csv file; confirm before relying on it.
# result_df.write.format("csv").option("header", "true").save("output/joined_result.csv")

In [None]:
from pyspark.sql import SparkSession

# Minimal smoke test: build a tiny in-memory DataFrame and print it.
# getOrCreate() returns the already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("SimpleApp")
    .getOrCreate()
)

# Column names first, then one (name, age) tuple per row.
columns = ["Name", "Age"]
data = [
    ("Alice", 34),
    ("Bob", 45),
    ("Cathy", 29),
]
df = spark.createDataFrame(data, columns)

df.show()
