In [2]:
import pyspark.sql.dataframe
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import psycopg2

In [3]:
# Default location of the PostgreSQL JDBC driver jar on the original author's machine.
# Override it by passing `jars=` or by setting the POSTGRES_JDBC_JAR environment variable.
DEFAULT_POSTGRES_JAR = (
    "/Users/dmitrijzigunov/Library/Application Support/JetBrains/DataGrip2023.3/"
    "jdbc-drivers/PostgreSQL/42.6.0/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar"
)


def init_spark(jars=None, app_name="PySpark_Working", master="local[*]"):
    """Create (or reuse) a SparkSession with the PostgreSQL JDBC driver on the classpath.

    Args:
        jars: value for the ``spark.jars`` config. When None, the
            POSTGRES_JDBC_JAR environment variable is used if set, otherwise
            DEFAULT_POSTGRES_JAR.
        app_name: Spark application name.
        master: Spark master URL.

    Returns:
        The SparkSession from ``getOrCreate()``. If a session is already
        running, it is returned and these settings may not take effect.
    """
    import os

    if jars is None:
        jars = os.environ.get("POSTGRES_JDBC_JAR", DEFAULT_POSTGRES_JAR)
    return (
        SparkSession.builder.master(master)
        .appName(app_name)
        .config("spark.jars", jars)
        .getOrCreate()
    )


spark = init_spark()

24/01/29 11:33:19 WARN Utils: Your hostname, MacBook-Pro-M.local resolves to a loopback address: 127.0.0.1; using 192.168.1.117 instead (on interface en0)
24/01/29 11:33:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/01/29 11:33:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
def read_dtp_spark(sesspark, path_name, is_print=False) -> pyspark.sql.dataframe.DataFrame:
    """Read a multiline JSON dump of accident (DTP) cards using an explicit schema.

    The schema is nested:
    - each top-level record has an ``infoDtp`` struct;
    - ``infoDtp`` holds an array of vehicles (``ts_info``), each with an array
      of occupants (``ts_uch``);
    - ``infoDtp`` also holds a separate participant array (``uchInfo``).

    Args:
        sesspark: SparkSession used for reading.
        path_name: path or URI of the JSON input (local path or hdfs:// URI).
        is_print: when True, print the resulting schema.

    Returns:
        DataFrame laid out according to the ``record`` schema built below.
    """
    def strings(*names):
        # Nullable StringType fields, in the order given.
        return [StructField(name, StringType(), True) for name in names]

    # infoDtp.uchInfo[] — presumably participants not attached to a vehicle; TODO confirm.
    uch_info = StructType(strings(
        "ALCO", "k_UCH", "NPDD", "n_UCH", "POL", "SOP_NPDD", "s_SM", "s_T", "v_ST",
    ))

    # infoDtp.ts_info[].ts_uch[] — occupants of a vehicle.
    # NOTE(review): v_ST is IntegerType here but StringType in uch_info — confirm intended.
    driver = StructType(
        strings(
            "ALCO", "INJURED_CARD_ID", "k_UCH", "NPDD", "n_UCH", "POL",
            "SAFETY_BELT", "SOP_NPDD", "s_SEAT_GROUP", "s_SM", "s_T",
        )
        + [StructField("v_ST", IntegerType(), True)]
    )

    # infoDtp.ts_info[] — vehicles involved.
    ts_info = StructType(
        strings(
            "color", "f_sob", "g_v", "m_pov", "m_ts", "marka_ts",
            "n_ts", "o_pf", "r_rul", "t_n", "t_ts", "ts_s",
        )
        + [StructField("ts_uch", ArrayType(driver), True)]
    )

    info_dtp = StructType(
        strings(
            "CHOM", "COORD_L", "COORD_W", "dor", "dor_k", "dor_z", "factor",
            "house", "k_ul", "km", "m", "NP", "ndu", "OBJ_DTP", "osv",
            "s_dtp", "s_pch", "sdor", "spog", "street",
        )
        + [
            StructField("ts_info", ArrayType(ts_info), True),
            StructField("uchInfo", ArrayType(uch_info), True),
        ]
    )

    # Top-level record of the JSON file.
    record = StructType(
        strings("DTPV", "date", "district")
        + [
            StructField("EMTP_NUMBER", LongType(), True),
            StructField("infoDtp", info_dtp, True),
        ]
        + strings("KTS", "KUCH", "kartId", "POG", "RAN", "rowNum", "time")
    )

    frame = sesspark.read.option("multiline", "true").json(path_name, schema=record)
    if is_print:
        frame.printSchema()
    return frame

# Работа с подключением к Postgres и загрузка таблиц с вычислением их идентификаторов

In [5]:
def load_table(db_table, user_name="dmitrijzigunov", url_ct="jdbc:postgresql://localhost:5432/car_accidents"):
    """Read a PostgreSQL table into a Spark DataFrame over JDBC.

    Uses the module-level ``spark`` session. No password option is passed, so
    the connection relies on whatever authentication the server allows for
    ``user_name``.

    Args:
        db_table: value for the JDBC ``dbtable`` option.
        user_name: database user.
        url_ct: JDBC connection URL.

    Returns:
        A lazily evaluated DataFrame backed by the table.
    """
    jdbc_options = {
        "url": url_ct,
        "dbtable": db_table,
        "user": user_name,
        "driver": "org.postgresql.Driver",
    }
    reader = spark.read.format("jdbc").options(**jdbc_options)
    return reader.load()

# Добавление и согласование идентификаторов

In [6]:
# Alternative: `selectExpr` could be used here instead, in which case the columns
# are simply listed as comma-separated SQL expression strings.

# Build the `dtp` rows and attach an ID column.
def load_dtp_data(main_frame, id_function, is_print=False):
    """Project raw accident records onto the columns used for the ``dtp`` table.

    Args:
        main_frame: DataFrame in the layout produced by ``read_dtp_spark``.
        id_function: column function called as
            ``id_function(monotonically_increasing_id(), lit("dtp"))``; its
            result becomes the ``ID`` column.
        is_print: when True, display the first 5 rows.

    Returns:
        DataFrame with columns description, datetime, time, coord_l, coord_w,
        dor, osv, count_ts, count_parts and ID.
    """
    # NOTE(review): load_vehicle_data derives vehicle.dtp_id with the same
    # monotonically_increasing_id() numbering over the same input; the two must stay in step.
    dtp_data_loading = main_frame.select(
        col("DTPV").cast(StringType()).alias("description"),
        # `date` and `time` arrive as separate strings; join them and parse as one timestamp.
        to_timestamp(concat(col("date"), lit(" "), col("time")), "dd.MM.yyyy HH:mm").alias("datetime"),
        col("time").cast(StringType()),
        col("infoDtp.COORD_L").cast(FloatType()).alias("coord_l"),
        col("infoDtp.COORD_W").cast(FloatType()).alias("coord_w"),
        col("infoDtp.dor").cast(StringType()).alias("dor"),
        col("infoDtp.osv").cast(StringType()).alias("osv"),
        col("KTS").cast(IntegerType()).alias("count_ts"),
        col("KUCH").cast(IntegerType()).alias("count_parts"),
    ).withColumn("ID", id_function(monotonically_increasing_id(), lit("dtp")))
    if is_print:
        # show() prints the table itself and returns None, so it is not wrapped in print().
        dtp_data_loading.show(n=5)
    return dtp_data_loading
# Build the `vehicle` rows: each vehicle gets its own ID plus its accident's ID (dtp_id).
def load_vehicle_data(main_frame, id_function, is_print=False):
    """Explode ``infoDtp.ts_info`` into one row per vehicle.

    Args:
        main_frame: DataFrame in the layout produced by ``read_dtp_spark``.
        id_function: column function called as ``id_function(row_number_col, lit(table))``.
            It is used with ``"dtp"`` for the parent key and ``"vehicle"`` for this table's key.
        is_print: when True, display the first 8 rows.

    Returns:
        DataFrame with columns marka_ts, m_ts, r_rul, type_ts, car_year, color,
        dtp_id and ID.
    """
    vehicle_data_loading = (
        main_frame
        # The accident ID is computed before explode, so every vehicle inherits
        # its accident's ID. This is intended to match the numbering in load_dtp_data.
        .withColumn("ID", id_function(monotonically_increasing_id(), lit("dtp")))
        # "id" resolves to the "ID" column through Spark's (default) case-insensitive resolution.
        .select("id", explode(col("infoDtp.ts_info")).alias("ts_info"))
        .select(
            col("ts_info.marka_ts"),
            col("ts_info.m_ts"),
            col("ts_info.r_rul"),
            col("ts_info.t_ts").alias("type_ts"),
            col("ts_info.g_v").cast(IntegerType()).alias("car_year"),
            col("ts_info.color"),
            col("ID").alias("dtp_id"),
        )
        .withColumn("ID", id_function(monotonically_increasing_id(), lit("vehicle")))
    )
    if is_print:
        # show() prints the table itself and returns None, so it is not wrapped in print().
        vehicle_data_loading.show(n=8)
    return vehicle_data_loading
# Build the `participant` rows: each participant gets its own ID plus its vehicle's ID (vehicle_id).
def load_participant_data(main_frame, id_function, is_print=False):
    """Explode vehicle occupants (``infoDtp.ts_info[].ts_uch[]``) into one row each.

    Participants listed in ``infoDtp.uchInfo`` are not included.

    Args:
        main_frame: DataFrame in the layout produced by ``read_dtp_spark``.
        id_function: column function called as ``id_function(row_number_col, lit(table))``.
            It is used with ``"vehicle"`` for the parent key and ``"participant"`` for this table's key.
        is_print: when True, display the first 50 rows.

    Returns:
        DataFrame with columns category, warnings, safety_belt (boolean), pol,
        health, vehicle_id and ID.
    """
    partic_data_loading = (
        main_frame
        .select(explode(col("infoDtp.ts_info")).alias("ts_info"))
        # Numbering over the exploded vehicles, intended to match the vehicle IDs
        # assigned in load_vehicle_data.
        .withColumn("vehicle_id", id_function(monotonically_increasing_id(), lit("vehicle")))
        .select("vehicle_id", explode(col("ts_info.ts_uch")).alias("participant"))
        .select(
            col("participant.k_UCH").alias("category"),
            col("participant.NPDD").alias("warnings"),
            col("participant.SAFETY_BELT").alias("safety_belt"),
            col("participant.POL").alias("pol"),
            col("participant.s_T").alias("health"),
            col("vehicle_id"),
        )
        # "Да" ("yes") -> True; any other value, including null, -> False.
        .withColumn("safety_belt", when(col("safety_belt") == "Да", True).otherwise(False))
        .withColumn("ID", id_function(monotonically_increasing_id(), lit("participant")))
    )
    if is_print:
        # show() prints the table itself and returns None, so it is not wrapped in print().
        partic_data_loading.show(n=50)
    return partic_data_loading

# dtp_data = load_dtp_data(dtp, generate_id_udf, True)
# vehicle_data = load_vehicle_data(dtp, generate_id_udf, True)
# partic_data = load_participant_data(dtp, generate_id_udf, True)


# Загрузка в Postgres

In [7]:
_DTP_COLUMNS = (
    "id", "description", "datetime", "coord_w", "coord_l",
    "dor", "osv", "count_ts", "count_parts",
)


def get_dtp_columns():
    """Return the column names written to the ``dtp`` table, as a new list on every call."""
    return list(_DTP_COLUMNS)

_VEHICLE_COLUMNS = (
    "id", "dtp_id", "marka_ts", "m_ts", "r_rul", "type_ts", "car_year", "color",
)


def get_vehicle_columns():
    """Return the column names written to the ``vehicle`` table, as a new list on every call."""
    return list(_VEHICLE_COLUMNS)

_PARTICIPANT_COLUMNS = (
    "id", "vehicle_id", "category", "warnings", "safety_belt", "pol", "health",
)


def get_participant_columns():
    """Return the column names written to the ``participant`` table, as a new list on every call."""
    return list(_PARTICIPANT_COLUMNS)


# Write a DataFrame to a PostgreSQL table over JDBC.
def save_table(df, db_table, mode="append", user_name="dmitrijzigunov", url_ct="jdbc:postgresql://localhost:5432/car_accidents"):
    """Write ``df`` into ``db_table`` over JDBC.

    For "dtp", "vehicle" and "participant", only the columns returned by the
    matching ``get_*_columns()`` helper are written, in that order. For any
    other table name the DataFrame is written unchanged.

    Args:
        df: Spark DataFrame to write.
        db_table: target table name (JDBC ``dbtable`` option).
        mode: Spark save mode. The default "append" adds rows, so writing the
            same data twice duplicates it.
        user_name: database user; no password option is passed.
        url_ct: JDBC connection URL.
    """
    column_getters = {
        "dtp": get_dtp_columns,
        "vehicle": get_vehicle_columns,
        "participant": get_participant_columns,
    }
    getter = column_getters.get(db_table)
    if getter is not None:
        df = df.select(getter())

    writer = (
        df.write.format("jdbc")
        .options(
            url=url_ct,
            dbtable=db_table,
            user=user_name,
            driver="org.postgresql.Driver",
        )
        .mode(mode)
    )
    writer.save()


# Полный цикл ETL

Чтобы узнать адрес, по которому нужно обращаться к HDFS, откройте файл
`/opt/homebrew/Cellar/hadoop/3.3.6/libexec/etc/hadoop/core-site.xml` — адрес указан в параметре `fs.defaultFS`.

In [8]:
def _max_id(db_table):
    """Return ``max(id)`` of a Postgres table (read via ``load_table``); None when the table is empty."""
    return load_table(db_table).selectExpr("max(id) as max_id").first()["max_id"]


def load_and_save(file_name, is_print=False):
    """Run the full ETL for one JSON file: read it, assign IDs, append to Postgres.

    IDs continue from the current contents of each table. The row number from
    ``monotonically_increasing_id()`` is offset by ``max(id) + 1``, or by 0 when
    the table is empty. Loading the same file twice appends its rows twice,
    because ``save_table`` is called with its default save mode.

    Args:
        file_name: path or URI of the JSON file,
            e.g. ``hdfs://localhost:9000/data/November2022.json``.
        is_print: when True, print progress and preview the first 5 rows of each table.
    """
    if is_print:
        print(f"Loading {file_name}...")
    # Only the explicitly-typed read is needed. A separate schema-inferring read
    # of the same file would be an unused extra pass over the data.
    dtp = read_dtp_spark(spark, file_name)

    # Current max(id) per table (None for an empty table), queried in this order.
    next_ids = {table: _max_id(table) for table in ("dtp", "vehicle", "participant")}
    if is_print:
        print(f"Next id key – {next_ids['dtp']} dtp, {next_ids['vehicle']} vehicle, {next_ids['participant']} participant")

    def generate_id(row_num, table_name):
        """Map a row number to a primary key for ``table_name``."""
        next_id = next_ids.get(table_name, 0)
        if next_id is None:
            next_id = 0  # empty table: start numbering at 0
        else:
            next_id += 1  # continue right after the current max(id)
        return row_num + next_id

    # LongType rather than IntegerType: monotonically_increasing_id() is 64-bit
    # (the partition index occupies the upper bits). Results of an IntegerType
    # Python UDF that do not fit in 32 bits may be silently truncated.
    generate_id_udf = udf(generate_id, LongType())

    dtp_data = load_dtp_data(dtp, generate_id_udf)
    vehicle_data = load_vehicle_data(dtp, generate_id_udf)
    partic_data = load_participant_data(dtp, generate_id_udf)

    if is_print:
        # show() prints the table itself and returns None. Interpolating it into
        # an f-string printed the table before its label, followed by "None".
        for label, frame in (("dtp", dtp_data), ("veh", vehicle_data), ("par", partic_data)):
            print(f"Loading {label}...")
            frame.show(n=5)

    save_table(dtp_data, "dtp")
    save_table(vehicle_data, "vehicle")
    save_table(partic_data, "participant")

# Рабочая область
#### При необходимости закомментируйте вызовы для уже загруженных файлов — иначе их строки будут повторно добавлены в таблицы
#### Чтобы узнать адрес, по которому нужно обращаться к HDFS, откройте файл `/opt/homebrew/Cellar/hadoop/3.3.6/libexec/etc/hadoop/core-site.xml` (параметр `fs.defaultFS`)

In [12]:
# Monthly dumps to load, in order. Comment out entries that are already in the database.
MONTHLY_DUMPS = (
    "hdfs://localhost:9000/data/November2022.json",
    "hdfs://localhost:9000/data/December2022.json",
    "hdfs://localhost:9000/data/January2023.json",
    "hdfs://localhost:9000/data/February2023.json",
)
for dump_path in MONTHLY_DUMPS:
    load_and_save(dump_path, False)