## Переменные окружения

In [4]:
import os

print("Переменные окружения для PostgreSQL:")
for key, value in os.environ.items():
    if 'POSTGRES' in key: print(f"\t{key}: {value}")

Переменные окружения для PostgreSQL:
	POSTGRES_HOST: postgres
	POSTGRES_PASSWORD: airflow
	POSTGRES_PORT: 5432
	POSTGRES_USER: airflow
	POSTGRES_DB: airflow


## Подключение к PostgreSQL

In [6]:
import sys
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

os.environ.get('PATH')
def connect_to_postgres(host=os.environ.get('POSTGRES_HOST'), 
                        port=os.environ.get('POSTGRES_PORT'), 
                        database=os.environ.get('POSTGRES_DB'), 
                        user=os.environ.get('POSTGRES_USER'), 
                        password=os.environ.get('POSTGRES_PASSWORD')):
    """ Подключение к PostgreSQL """
    try:
        connection = psycopg2.connect(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password
        )
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        return connection
    except Exception as e:
        print(f"Ошибка: {e}")
        return None

conn = connect_to_postgres()

In [7]:
def get_all_databases(connection):
    """ Получение списка всех баз данных """
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
        databases = cursor.fetchall()
        cursor.close()
        return [db[0] for db in databases]
    except Exception as e:
        print(f"Ошибка при получении списка баз данных: {e}")
        return []

databases = get_all_databases(conn)
databases

['postgres', 'airflow']

In [3]:
def get_schemas(connection, database):
    """ Получение списка схем в указанной базе данных """
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT schema_name 
            FROM information_schema.schemata 
            WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
            ORDER BY schema_name;
        """)
        schemas = cursor.fetchall()
        cursor.close()
        return [schema[0] for schema in schemas]
    except Exception as e:
        print(f"Ошибка при получении списка схем: {e}")
        return []

schemas = get_schemas(conn, databases[0])
schemas

['mart', 'pg_toast', 'public', 'raw', 'stage']

In [4]:
def get_tables_in_schema(connection, schema_name='public'):
    """ Получение списка таблиц в указанной схеме """
    try:
        cursor = connection.cursor()
        query = sql.SQL("""
            SELECT table_name
            FROM information_schema.tables 
            WHERE table_schema = %s
            ORDER BY table_name;
        """)
        cursor.execute(query, (schema_name,))
        tables = cursor.fetchall()
        cursor.close()
        return tables
    except Exception as e:
        print(f"Ошибка при получении списка таблиц: {e}")
        return []

for schema in schemas:
    print(f'{schema} - {get_tables_in_schema(conn, schema)}')

mart - []
pg_toast - []
raw - []
stage - []


In [9]:
def get_table_columns(connection, schema_name, table_name):
    """ Получение информации о колонках таблицы """
    try:
        cursor = connection.cursor()
        query = sql.SQL("""
            SELECT 
                column_name, 
                data_type, 
                is_nullable,
                character_maximum_length,
                column_default
            FROM information_schema.columns 
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position;
        """)
        cursor.execute(query, (schema_name, table_name))
        columns = cursor.fetchall()
        cursor.close()
        return columns
    except Exception as e:
        print(f"Ошибка при получении информации о колонках: {e}")
        return []

schema, table = 'public', 'ab_permission' 
# schema, table = 'raw', 'users'
print(f'{table} - {get_table_columns(conn, schema, table)}')

users - [('id', 'integer', 'YES', None, None), ('first_name', 'text', 'YES', None, None), ('last_name', 'text', 'YES', None, None), ('age', 'integer', 'YES', None, None), ('created_date', 'date', 'YES', None, None)]


## PySpark примеры

In [12]:
!ls /home/jovyan/data

raw_2020.csv  raw_2021.csv  raw_2022.csv  raw_2023.csv


In [36]:
from pyspark.sql import SparkSession

# Создаем SparkSession
spark = (
    SparkSession.builder
        .appName("CSV Read Example")
        .getOrCreate()
)

df = spark.read.csv("/home/jovyan/data/raw_2020.csv", header=True, inferSchema=True, sep=';')

# df.printSchema()

first_columns = df.columns[:10]
df.select(first_columns).orderBy('1/29/20', reverse=False).show()

print(f"Количество строк: {df.count()}")
spark.stop()

+-------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|     Country/Region|      Province/State|1/22/20|1/23/20|1/24/20|1/25/20|1/26/20|1/27/20|1/28/20|1/29/20|
+-------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|        Afghanistan|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|            Albania|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|            Algeria|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|            Andorra|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|             Angola|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|         Antarctica|                NULL|      0|      0|      0|      0|      0|      0|      0|      0|
|Antigua and Barbuda|                

In [11]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder 
        .appName("PostgreSQL Read Example") 
        .config("spark.jars", "/path/to/postgresql-42.5.0.jar") 
        .getOrCreate()
)

# table = 'raw.users'
table = "ab_permission"

postgres_url = "jdbc:postgresql://postgres:5432/airflow"
postgres_properties = {
    "user": "airflow",
    "password": "airflow",
    "driver": "org.postgresql.Driver"
}

df = (
    spark.read
        .jdbc(url=postgres_url, 
              table=table, 
              properties=postgres_properties)
)

df.show()
df.printSchema()
print(f"Количество строк: {df.count()}")

spark.stop()

+---+----------+---------+---+------------+
| id|first_name|last_name|age|created_date|
+---+----------+---------+---+------------+
|  2|     Мария|  Петрова| 25|  2023-02-20|
|  1|      Иван|   Иванов| 30|  2023-01-15|
|  3|   Алексей|  Сидоров| 35|  2023-03-10|
+---+----------+---------+---+------------+

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- created_date: date (nullable = true)

Количество строк: 3


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import to_date

spark = (
    SparkSession.builder 
        .appName("PostgreSQL Read Example") 
        .config("spark.jars", "/path/to/postgresql-42.5.0.jar") 
        .getOrCreate()
)

data = [
    (1, "Иван", "Иванов", 30, "2023-01-15"),
    (2, "Мария", "Петрова", 25, "2023-02-20"),
    (3, "Алексей", "Сидоров", 35, "2023-03-10")
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("created_date", StringType(), True)
])

df = spark.createDataFrame(data, schema=schema)
df = df.withColumn("created_date", to_date("created_date", "yyyy-MM-dd"))

print("Данные для вставки:")
df.show()

postgres_url = "jdbc:postgresql://postgres:5432/airflow"
postgres_properties = {
    "user": "airflow",
    "password": "airflow",
    "driver": "org.postgresql.Driver"
}

(df.write 
    .mode("append")
    .jdbc(url=postgres_url, 
          table="raw.users", 
          properties=postgres_properties)
)

print("Данные в базе:")
(
    spark.read
        .jdbc(url=postgres_url, 
              table='raw.users', 
              properties=postgres_properties)
).show()

spark.stop()

Данные для вставки:
+---+----------+---------+---+------------+
| id|first_name|last_name|age|created_date|
+---+----------+---------+---+------------+
|  1|      Иван|   Иванов| 30|  2023-01-15|
|  2|     Мария|  Петрова| 25|  2023-02-20|
|  3|   Алексей|  Сидоров| 35|  2023-03-10|
+---+----------+---------+---+------------+

Данные в базе:
+---+----------+---------+---+------------+
| id|first_name|last_name|age|created_date|
+---+----------+---------+---+------------+
|  2|     Мария|  Петрова| 25|  2023-02-20|
|  1|      Иван|   Иванов| 30|  2023-01-15|
|  3|   Алексей|  Сидоров| 35|  2023-03-10|
+---+----------+---------+---+------------+

